Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions pypesto/engine/multi_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Union

import cloudpickle as pickle
Expand Down Expand Up @@ -57,6 +58,8 @@ def execute(
) -> list[Any]:
"""Pickle tasks and distribute work over parallel processes.

    Tasks are pickled at submission time; results are collected as workers complete them.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's true. ProcessPoolExecutor.submit is non-blocking.
If it were true, the progress bar handling wouldn't work, because it would only start after the tasks have been submitted.

Closes #1499?

No. For closing that, after submitting the initial n_procs tasks, a new task should only be submitted after a previous one finishes.

Nothing against merging this nonetheless, but please keep the other issue open.


Parameters
----------
tasks:
Expand All @@ -69,21 +72,29 @@ def execute(
A list of results.
"""
n_tasks = len(tasks)

pickled_tasks = [pickle.dumps(task) for task in tasks]

n_procs = min(self.n_procs, n_tasks)
logger.debug(f"Parallelizing on {n_procs} processes.")

ctx = multiprocessing.get_context(method=self.method)

with ctx.Pool(processes=n_procs) as pool:
results = list(
tqdm(
pool.imap(work, pickled_tasks),
total=len(pickled_tasks),
enable=progress_bar,
),
)
# Use ProcessPoolExecutor for on-demand pickling
with ProcessPoolExecutor(
max_workers=n_procs, mp_context=ctx
) as executor:
# Submit tasks and track futures
future_to_index = {
executor.submit(work, pickle.dumps(task)): i
for i, task in enumerate(tasks)
}

# Collect results in original order
results = [None] * n_tasks
for future in tqdm(
as_completed(future_to_index),
total=n_tasks,
enable=progress_bar,
):
index = future_to_index[future]
results[index] = future.result()

return results