
Commit 76bf732

Merge pull request #2684 from oliver-sanders/2680-2.0.x
job runner: limit concurrency
MetRonnie authored Apr 19, 2023
2 parents 3f41454 + 096f300 commit 76bf732
Showing 1 changed file with 80 additions and 69 deletions.
149 changes: 80 additions & 69 deletions metomi/rose/job_runner.py
@@ -148,6 +148,8 @@ def update(self, other):
 class JobRunner:
     """Runs JobProxy objects with pool of workers."""
 
+    ASYNC_SLEEP_TIME = 0.1
+
     def __init__(self, job_processor, nproc=None):
         """
         Initialise a job runner.
@@ -161,87 +163,96 @@ def __init__(self, job_processor, nproc=None):
         """
         self.job_processor = job_processor
 
-    def run(self, job_manager, *args):
-        """
-        Start the job runner with an instance of JobManager.
+    def run(self, job_manager, *args, concurrency=6):
+        """Start the job runner with an instance of JobManager.
 
         Args:
             job_manager (JobManager):
                 A JobManager object used to handle the list of jobs to be done
-
-        Outline:
-                 +------------------+
-            +----> job_manager.     +------------> FINISH
-            |    | has_jobs ?       |      No
-            |    +------------------+
-            |        |Yes
-            |    +---v--------------+
-            |    |Post-process any  |
-            |    |finished jobs     |
-            |    +---+--------------+
-            |        |
-            |    +---v--------------------------------+
-            |    | Check for ready jobs               |
-            |    | Add ready jobs to "awaiting"       |
-            |    +---+-------------------------^------+
-            |        |                         |
-            |    +---v---------------+         |
-            +----+ Are there any jobs|         |
-              No | in "awaiting"?    |         |
-                 +-------------------+         |
-                     |Yes                      |
-                 +-------------------+         |
-                 | Add jobs to event |         |
-                 | loop - wait until |         |
-                 | asyncio returns   |         |
-                 | first result.     +---------+
-                 +-------------------+
+            args:
+                Arguments to pass through to jobs / post-processing.
+            concurrency:
+                The maximum number of jobs to run concurrently.
 
         """
+        running = []
         loop = asyncio.get_event_loop()
         loop.set_exception_handler(self.job_processor.handle_event)
-        results = {}
+        loop.run_until_complete(
+            asyncio.gather(
+                self._run_jobs(running, job_manager, args, concurrency),
+                self._post_process_jobs(running, job_manager, args),
+            )
+        )
+        dead_jobs = job_manager.get_dead_jobs()
+        if dead_jobs:
+            raise JobRunnerNotCompletedError(dead_jobs)
+
+    async def _run_jobs(self, running, job_manager, args, concurrency):
+        """Run pending jobs subject to the concurrency limit.
+
+        This coroutine exits when there are no more jobs left to run.
+
+        Args:
+            running:
+                Jobs will be added to this list when run.
+            job_manager:
+                A JobManager object used to handle the list of jobs to be done
+            args:
+                Arguments to pass through to jobs / post-processing.
+            concurrency:
+                The maximum number of jobs to run concurrently.
+
+        """
         while job_manager.has_jobs():
-            # Post-process all finished jobs and handle exceptions.
-            for job_proxy, result in list(results.items()):
-                results.pop(job_proxy)
-                job_proxy.exc = result.exception()
-                job_manager.put_job(job_proxy)
-                if not job_proxy.exc:
-                    self.job_processor.post_process_job(job_proxy, *args)
-                    self.job_processor.handle_event(JobEvent(job_proxy))
-                else:
-                    self.job_processor.handle_event(job_proxy.exc)
-
-            awaiting = set()
-
-            def get_ready_jobs():
-                """Get list of jobs with satisfied dependencies"""
-                while job_manager.has_ready_jobs():
-                    job = job_manager.get_job()
-                    if job is None:
-                        break
-                    task = loop.create_task(
-                        self.job_processor.process_job(job, *args)
-                    )
-                    task.job = job
-                    awaiting.add(task)
-
-            get_ready_jobs()
-            while awaiting:
-                # Submit all tasks in awaiting to event loop, then wait until
-                # one of them completes, at which point check whether any more
-                # jobs have become ready and submit those.
-                just_completed, awaiting = loop.run_until_complete(
-                    asyncio.wait(awaiting, return_when=asyncio.FIRST_COMPLETED)
-                )
-                results.update({task.job: task for task in just_completed})
-                get_ready_jobs()
+            while len(running) < concurrency:
+                # run jobs
+                job = job_manager.get_job()
+                if job is None:
+                    # we've run out of jobs for now
+                    break
+                task = asyncio.create_task(
+                    self.job_processor.process_job(job, *args)
+                )
+                task.job = job
+                running.append(task)
+            # we've hit the concurrency limit => wait
+            await asyncio.sleep(self.ASYNC_SLEEP_TIME)

-        dead_jobs = job_manager.get_dead_jobs()
-        if dead_jobs:
-            raise JobRunnerNotCompletedError(dead_jobs)
+    async def _post_process_jobs(self, running, job_manager, args):
+        """Post process completed jobs.
+
+        This coroutine exits when there are no more jobs left to run / post
+        process.
+
+        Args:
+            running:
+                Jobs will be added to this list when run.
+            job_manager:
+                A JobManager object used to handle the list of jobs to be done
+            args:
+                Arguments to pass through to jobs / post-processing.
+
+        """
+        while running or job_manager.has_jobs():
+            if not running:
+                # wait for more tasks to be queued
+                await asyncio.sleep(self.ASYNC_SLEEP_TIME)
+                continue
+            done, _running = await asyncio.wait(
+                running,
+                return_when=asyncio.FIRST_COMPLETED
+            )
+            for task in done:
+                running.remove(task)
+                job = task.job
+                job.exc = task.exception()
+                job_manager.put_job(job)
+                if not job.exc:
+                    self.job_processor.post_process_job(job, *args)
+                    self.job_processor.handle_event(JobEvent(job))
+                else:
+                    self.job_processor.handle_event(job.exc)
 
     __call__ = run
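
Note: for readers new to this pattern, below is a minimal, standalone sketch of the arrangement the new code uses: one coroutine starts jobs while a shared "running" list stays below the concurrency limit, a second coroutine waits on that list and post-processes tasks as they finish, and asyncio.gather drives both. All names here (fake_job, run_jobs, post_process_jobs, CONCURRENCY, SLEEP_TIME) are illustrative stand-ins, not part of rose; only the shape of the logic mirrors the diff.

    import asyncio
    import random

    CONCURRENCY = 3   # maximum number of jobs in flight at once
    SLEEP_TIME = 0.1  # polling interval, mirroring ASYNC_SLEEP_TIME

    async def fake_job(name):
        """Stand-in for job_processor.process_job(): sleep a bit, return a result."""
        await asyncio.sleep(random.uniform(0.1, 0.5))
        return f"{name} done"

    async def run_jobs(pending, running):
        """Start pending jobs without letting len(running) exceed CONCURRENCY."""
        while pending:
            while pending and len(running) < CONCURRENCY:
                name = pending.pop(0)
                task = asyncio.create_task(fake_job(name))
                task.job_name = name  # attach metadata, as the diff does with task.job
                running.append(task)
            # concurrency limit reached (or nothing left to start) => wait and re-check
            await asyncio.sleep(SLEEP_TIME)

    async def post_process_jobs(pending, running):
        """Collect tasks as they finish and 'post-process' them."""
        while pending or running:
            if not running:
                # nothing in flight yet; wait for run_jobs to queue something
                await asyncio.sleep(SLEEP_TIME)
                continue
            done, _ = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                running.remove(task)
                print(task.job_name, "->", task.result())

    async def main():
        pending = [f"job-{i}" for i in range(10)]
        running = []  # shared state between the two coroutines
        await asyncio.gather(
            run_jobs(pending, running),
            post_process_jobs(pending, running),
        )

    if __name__ == "__main__":
        asyncio.run(main())

Compared with the previous design, which re-queried ready jobs each time asyncio.wait returned, the shared-list-plus-polling arrangement caps in-flight jobs directly; the trade-off is up to ASYNC_SLEEP_TIME of latency before a freed slot is refilled (an asyncio.Semaphore would be a common alternative way to enforce such a limit).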


