black --line-length 120 . --exclude renv*
jcblemai committed Dec 14, 2022
1 parent b32405f commit 2fe689b
Showing 44 changed files with 1,048 additions and 2,757 deletions.
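The change itself is mechanical: Black was run with a 120-character line limit (up from its 88-character default), so calls that previously had to be wrapped now fit on one line and are joined. A small, self-contained illustration adapted from one of the batch/inference_job.py hunks below (the config value is a hypothetical stand-in):

    config = {"filtering": {"data_path": "data/geodata.csv"}}  # hypothetical stand-in

    # Before: the call was wrapped to respect the shorter line limit
    print(
        f"ERROR: filtering.data_path path {config['filtering']['data_path']} does not exist!"
    )

    # After `black --line-length 120 .`: the same call fits within 120 columns on one line
    print(f"ERROR: filtering.data_path path {config['filtering']['data_path']} does not exist!")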
106 changes: 29 additions & 77 deletions batch/inference_job.py
@@ -167,13 +167,12 @@
@click.option(
"--reset-chimerics-on-global-accept",
"--reset-chimerics-on-global-accept",
"reset_chimerics",
envvar="COVID_RESET_CHIMERICS",
type=bool,
default=True,
help="Flag determining whether to reset chimeric values on any global acceptances",
"reset_chimerics",
envvar="COVID_RESET_CHIMERICS",
type=bool,
default=True,
help="Flag determining whether to reset chimeric values on any global acceptances",
)

def launch_batch(
config_file,
run_id,
@@ -192,7 +191,7 @@ def launch_batch(
resume_discard_seeding,
max_stacked_interventions,
last_validation_date,
reset_chimerics
reset_chimerics,
):

config = None
@@ -211,9 +210,7 @@
if "filtering" in config:
config["filtering"]["simulations_per_slot"] = sims_per_job
if not os.path.exists(config["filtering"]["data_path"]):
print(
f"ERROR: filtering.data_path path {config['filtering']['data_path']} does not exist!"
)
print(f"ERROR: filtering.data_path path {config['filtering']['data_path']} does not exist!")
return 1
else:
print(f"WARNING: no filtering section found in {config_file}!")
@@ -258,9 +255,7 @@ def autodetect_params(config, *, num_jobs=None, sims_per_job=None, num_blocks=No
return (num_jobs, sims_per_job, num_blocks)

if "filtering" not in config or "simulations_per_slot" not in config["filtering"]:
raise click.UsageError(
"filtering::simulations_per_slot undefined in config, can't autodetect parameters"
)
raise click.UsageError("filtering::simulations_per_slot undefined in config, can't autodetect parameters")
sims_per_slot = int(config["filtering"]["simulations_per_slot"])

if num_jobs is None:
@@ -270,17 +265,10 @@ def autodetect_params(config, *, num_jobs=None, sims_per_job=None, num_blocks=No
if sims_per_job is None:
if num_blocks is not None:
sims_per_job = int(math.ceil(sims_per_slot / num_blocks))
print(
f"Setting number of blocks to {num_blocks} [via num_blocks (-k) argument]"
)
print(
f"Setting sims per job to {sims_per_job} [via {sims_per_slot} simulations_per_slot in config]"
)
print(f"Setting number of blocks to {num_blocks} [via num_blocks (-k) argument]")
print(f"Setting sims per job to {sims_per_job} [via {sims_per_slot} simulations_per_slot in config]")
else:
geoid_fname = (
pathlib.Path(config["spatial_setup"]["base_path"])
/ config["spatial_setup"]["geodata"]
)
geoid_fname = pathlib.Path(config["spatial_setup"]["base_path"]) / config["spatial_setup"]["geodata"]
with open(geoid_fname) as geoid_fp:
num_geoids = sum(1 for line in geoid_fp)

@@ -298,9 +286,7 @@ def autodetect_params(config, *, num_jobs=None, sims_per_job=None, num_blocks=No

if num_blocks is None:
num_blocks = int(math.ceil(sims_per_slot / sims_per_job))
print(
f"Setting number of blocks to {num_blocks} [via {sims_per_slot} simulations_per_slot in config]"
)
print(f"Setting number of blocks to {num_blocks} [via {sims_per_slot} simulations_per_slot in config]")

return (num_jobs, sims_per_job, num_blocks)
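For orientation, the arithmetic being reflowed in autodetect_params is plain ceiling division between slots, blocks, and simulations per job. A minimal sketch with hypothetical numbers (not taken from any config):

    import math

    sims_per_slot = 300  # hypothetical filtering::simulations_per_slot
    num_blocks = 4       # hypothetical num_blocks (-k) argument
    sims_per_job = int(math.ceil(sims_per_slot / num_blocks))  # 75 simulations per job
    # Recomputing the block count from sims_per_job reproduces the same split
    assert int(math.ceil(sims_per_slot / sims_per_job)) == num_blocks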

@@ -312,9 +298,7 @@ def get_job_queues(job_queue_prefix):
for q in resp["jobQueues"]:
queue_name = q["jobQueueName"]
if queue_name.startswith(job_queue_prefix):
job_list_resp = batch_client.list_jobs(
jobQueue=queue_name, jobStatus="PENDING"
)
job_list_resp = batch_client.list_jobs(jobQueue=queue_name, jobStatus="PENDING")
queues_with_jobs[queue_name] = len(job_list_resp["jobSummaryList"])
# Return the least-loaded queues first
return sorted(queues_with_jobs, key=queues_with_jobs.get)
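The return value above lists queue names ordered by how many jobs are already pending, so new work is sent to the least-loaded queue first. A minimal sketch with hypothetical queue names and counts:

    # Hypothetical pending-job counts, as tallied by get_job_queues()
    queues_with_jobs = {"Inference-JQ-a": 7, "Inference-JQ-b": 0, "Inference-JQ-c": 2}
    # Least-loaded queues first
    print(sorted(queues_with_jobs, key=queues_with_jobs.get))
    # -> ['Inference-JQ-b', 'Inference-JQ-c', 'Inference-JQ-a']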
@@ -363,9 +347,7 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
manifest["cmd"] = " ".join(sys.argv[:])
manifest["job_name"] = job_name
manifest["data_sha"] = subprocess.getoutput("git rev-parse HEAD")
manifest["csp_sha"] = subprocess.getoutput(
"cd COVIDScenarioPipeline; git rev-parse HEAD"
)
manifest["csp_sha"] = subprocess.getoutput("cd COVIDScenarioPipeline; git rev-parse HEAD")

# Prepare to tar up the current directory, excluding any dvc outputs, so it
# can be shipped to S3
@@ -386,25 +368,17 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
elif q == "sample_data":
for r in os.listdir("COVIDScenarioPipeline/sample_data"):
if r != "united-states-commutes":
tar.add(
os.path.join(
"COVIDScenarioPipeline", "sample_data", r
)
)
tar.add(os.path.join("COVIDScenarioPipeline", "sample_data", r))
elif not (p.startswith(".") or p.endswith("tar.gz") or p in self.outputs):
tar.add(
p,
filter=lambda x: None
if os.path.basename(x.name).startswith(".")
else x,
filter=lambda x: None if os.path.basename(x.name).startswith(".") else x,
)
tar.close()

# Upload the tar'd contents of this directory and the runner script to S3
runner_script_name = f"{job_name}-runner.sh"
local_runner_script = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "inference_runner.sh"
)
local_runner_script = os.path.join(os.path.dirname(os.path.realpath(__file__)), "inference_runner.sh")
s3_client = boto3.client("s3")
s3_client.upload_file(local_runner_script, self.s3_bucket, runner_script_name)
s3_client.upload_file(tarfile_name, self.s3_bucket, tarfile_name)
@@ -413,15 +387,11 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
# Save the manifest file to S3
with open("manifest.json", "w") as f:
json.dump(manifest, f, indent=4)
s3_client.upload_file(
"manifest.json", self.s3_bucket, f"{job_name}/manifest.json"
)
s3_client.upload_file("manifest.json", self.s3_bucket, f"{job_name}/manifest.json")

# Create job to copy output to appropriate places
copy_script_name = f"{job_name}-copy.sh"
local_runner_script = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "inference_copy.sh"
)
local_runner_script = os.path.join(os.path.dirname(os.path.realpath(__file__)), "inference_copy.sh")
s3_client.upload_file(local_runner_script, self.s3_bucket, copy_script_name)

# Prepare and launch the num_jobs via AWS Batch.
@@ -462,15 +432,11 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
cur_env_vars = base_env_vars.copy()
cur_env_vars.append({"name": "COVID_SCENARIOS", "value": s})
cur_env_vars.append({"name": "COVID_DEATHRATES", "value": d})
cur_env_vars.append(
{"name": "COVID_PREFIX", "value": f"{config['name']}/{s}/{d}"}
)
cur_env_vars.append({"name": "COVID_PREFIX", "value": f"{config['name']}/{s}/{d}"})
cur_env_vars.append({"name": "COVID_BLOCK_INDEX", "value": "1"})
cur_env_vars.append({"name": "COVID_RUN_INDEX", "value": f"{self.run_id}"})
if not (self.restart_from_s3_bucket is None):
cur_env_vars.append(
{"name": "S3_LAST_JOB_OUTPUT", "value": self.restart_from_s3_bucket}
)
cur_env_vars.append({"name": "S3_LAST_JOB_OUTPUT", "value": self.restart_from_s3_bucket})
cur_env_vars.append(
{
"name": "COVID_OLD_RUN_INDEX",
@@ -501,24 +467,12 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
cur_env_vars = base_env_vars.copy()
cur_env_vars.append({"name": "COVID_SCENARIOS", "value": s})
cur_env_vars.append({"name": "COVID_DEATHRATES", "value": d})
cur_env_vars.append(
{"name": "COVID_PREFIX", "value": f"{config['name']}/{s}/{d}"}
)
cur_env_vars.append(
{"name": "COVID_BLOCK_INDEX", "value": f"{block_idx+1}"}
)
cur_env_vars.append(
{"name": "COVID_RUN_INDEX", "value": f"{self.run_id}"}
)
cur_env_vars.append(
{"name": "COVID_OLD_RUN_INDEX", "value": f"{self.run_id}"}
)
cur_env_vars.append(
{"name": "S3_LAST_JOB_OUTPUT", "value": f"{results_path}/"}
)
cur_env_vars.append(
{"name": "JOB_NAME", "value": f"{cur_job_name}_block{block_idx}"}
)
cur_env_vars.append({"name": "COVID_PREFIX", "value": f"{config['name']}/{s}/{d}"})
cur_env_vars.append({"name": "COVID_BLOCK_INDEX", "value": f"{block_idx+1}"})
cur_env_vars.append({"name": "COVID_RUN_INDEX", "value": f"{self.run_id}"})
cur_env_vars.append({"name": "COVID_OLD_RUN_INDEX", "value": f"{self.run_id}"})
cur_env_vars.append({"name": "S3_LAST_JOB_OUTPUT", "value": f"{results_path}/"})
cur_env_vars.append({"name": "JOB_NAME", "value": f"{cur_job_name}_block{block_idx}"})
cur_job = batch_client.submit_job(
jobName=f"{cur_job_name}_block{block_idx}",
jobQueue=cur_job_queue,
@@ -567,10 +521,8 @@ def launch(self, job_name, config_file, scenarios, p_death_names, job_queues):
)

if not (self.restart_from_s3_bucket is None):
print(
f"Resuming from run id is {self.restart_from_run_id} located in {self.restart_from_s3_bucket}"
)
if (self.resume_discard_seeding):
print(f"Resuming from run id is {self.restart_from_run_id} located in {self.restart_from_s3_bucket}")
if self.resume_discard_seeding:
print(f"Discarding seeding results")
print(f"Final output will be: {results_path}/model_output/")
print(f"Run id is {self.run_id}")
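The launch() hunks above all follow the same shape: copy a base environment list, append per-scenario and per-death-rate variables, and hand the list to AWS Batch. A condensed sketch of that submit call, with hypothetical queue, job definition, and values, and no error handling:

    import boto3

    batch_client = boto3.client("batch")

    cur_env_vars = [
        {"name": "COVID_SCENARIOS", "value": "inference"},               # hypothetical scenario
        {"name": "COVID_DEATHRATES", "value": "med"},                    # hypothetical death rate
        {"name": "COVID_PREFIX", "value": "test_config/inference/med"},  # config name/scenario/death rate
        {"name": "COVID_RUN_INDEX", "value": "20221214_000000"},         # hypothetical run id
        {"name": "COVID_BLOCK_INDEX", "value": "1"},
    ]

    resp = batch_client.submit_job(
        jobName="example-job_block1",             # hypothetical
        jobQueue="Inference-JQ-a",                # hypothetical
        jobDefinition="Batch-CovidPipeline-Job",  # hypothetical
        containerOverrides={"environment": cur_env_vars},
    )
    print(f"Submitted {resp['jobId']}")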
20 changes: 5 additions & 15 deletions batch/inference_job_status.py
@@ -7,9 +7,7 @@

def process_child_jobs(parent_job_id, tracker, next_token=None):
if next_token is not None:
child_jobs = batch.list_jobs(
arrayJobId=parent_job_id, jobStatus="RUNNING", nextToken=next_token
)
child_jobs = batch.list_jobs(arrayJobId=parent_job_id, jobStatus="RUNNING", nextToken=next_token)
else:
child_jobs = batch.list_jobs(arrayJobId=parent_job_id, jobStatus="RUNNING")
tracker["RUNNING"] = tracker.get("RUNNING", 0) + len(child_jobs["jobSummaryList"])
@@ -18,9 +16,7 @@ def process_child_jobs(parent_job_id, tracker, next_token=None):

def process_parent_jobs(job_queue, parent_tracker, next_token=None):
if next_token is not None:
parent_jobs = batch.list_jobs(
jobQueue=job_queue, jobStatus="PENDING", nextToken=next_token
)
parent_jobs = batch.list_jobs(jobQueue=job_queue, jobStatus="PENDING", nextToken=next_token)
else:
parent_jobs = batch.list_jobs(jobQueue=job_queue, jobStatus="PENDING")

@@ -29,16 +25,10 @@ def process_parent_jobs(job_queue, parent_tracker, next_token=None):
tracker = {}
next_child_token = process_child_jobs(job["jobId"], tracker)
while next_child_token is not None:
next_child_token = process_child_jobs(
job["jobId"], tracker, next_child_token
)
next_child_token = process_child_jobs(job["jobId"], tracker, next_child_token)
if tracker["RUNNING"]:
print(
f"Parent job {job['jobName']} had {tracker['RUNNING']} running child jobs."
)
parent_tracker["CHILD_JOBS"] = (
parent_tracker.get("CHILD_JOBS", 0) + tracker["RUNNING"]
)
print(f"Parent job {job['jobName']} had {tracker['RUNNING']} running child jobs.")
parent_tracker["CHILD_JOBS"] = parent_tracker.get("CHILD_JOBS", 0) + tracker["RUNNING"]

return parent_jobs["nextToken"] if "nextToken" in parent_jobs else None

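Both helpers in inference_job_status.py page through AWS Batch results by feeding nextToken back into list_jobs until it is no longer returned. A minimal sketch of that pagination loop, assuming a hypothetical queue name:

    import boto3

    batch = boto3.client("batch")

    running = 0
    next_token = None
    while True:
        kwargs = {"jobQueue": "Inference-JQ-a", "jobStatus": "RUNNING"}  # hypothetical queue
        if next_token is not None:
            kwargs["nextToken"] = next_token
        resp = batch.list_jobs(**kwargs)
        running += len(resp["jobSummaryList"])
        next_token = resp.get("nextToken")
        if next_token is None:
            break
    print(f"{running} running jobs in queue")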
19 changes: 4 additions & 15 deletions batch/launch_job.py
@@ -198,20 +198,13 @@ def launch_job_inner(
tarfile_name = f"{job_name}.tar.gz"
tar = tarfile.open(tarfile_name, "w:gz")
for p in os.listdir("."):
if not (
p.startswith(".")
or p.endswith("tar.gz")
or p in dvc_outputs
or p == "batch"
):
if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"):
tar.add(p, filter=lambda x: None if x.name.startswith(".") else x)
tar.close()

# Upload the tar'd contents of this directory and the runner script to S3
runner_script_name = f"{job_name}-runner.sh"
local_runner_script = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "runner.sh"
)
local_runner_script = os.path.join(os.path.dirname(os.path.realpath(__file__)), "runner.sh")
s3_client = boto3.client("s3")
s3_client.upload_file(local_runner_script, s3_input_bucket, runner_script_name)
s3_client.upload_file(tarfile_name, s3_input_bucket, tarfile_name)
@@ -228,9 +221,7 @@ def launch_job_inner(
{"name": "S3_RESULTS_PATH", "value": results_path},
{"name": "SLOTS_PER_JOB", "value": str(slots_per_job)},
]
s3_cp_run_script = (
f"aws s3 cp s3://{s3_input_bucket}/{runner_script_name} $PWD/run-covid-pipeline"
)
s3_cp_run_script = f"aws s3 cp s3://{s3_input_bucket}/{runner_script_name} $PWD/run-covid-pipeline"
command = ["sh", "-c", f"{s3_cp_run_script}; /bin/bash $PWD/run-covid-pipeline"]
container_overrides = {
"vcpus": vcpu,
@@ -257,9 +248,7 @@ def launch_job_inner(
containerOverrides=container_overrides,
)

print(
f"Batch job with id {resp['jobId']} launched; output will be written to {results_path}"
)
print(f"Batch job with id {resp['jobId']} launched; output will be written to {results_path}")


def get_dvc_outputs():
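Both launch scripts build their S3 payload the same way: tar the working directory while skipping dotfiles, earlier tarballs, and generated outputs, then upload the archive next to the runner script. A minimal sketch of that tar step (the exclusion list and job name are hypothetical):

    import os
    import tarfile

    dvc_outputs = ["model_output", "hospitalization"]  # hypothetical DVC outputs
    tarfile_name = "example-job.tar.gz"                # hypothetical job name

    with tarfile.open(tarfile_name, "w:gz") as tar:
        for p in os.listdir("."):
            if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"):
                # A filter returning None drops hidden files nested inside added directories
                tar.add(p, filter=lambda x: None if x.name.startswith(".") else x)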
(Diffs for the remaining 41 changed files are not shown here.)
