Skip to content

Commit 909db9b

Browse files
committed
Use multiprocess if available
1 parent 90f19fa commit 909db9b

File tree

9 files changed

+53
-28
lines changed

9 files changed

+53
-28
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Changes
1515
- Fix #373: read DOIT_CONFIG from TOML.
1616
- Fix #405: Add Task attribute `meta`.
1717
- Fix #349: Handle passing task args in "single" task execution.
18+
- Fix #369: Support `multiprocess` if manually installed.
1819

1920
0.33.1 (*2020-09-04*)
2021
=====================

doc/cmd_run.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ parallel execution
240240
This allows different tasks to be run in parallel, as long any dependencies are met.
241241
By default the `multiprocessing <http://docs.python.org/library/multiprocessing.html>`_
242242
module is used.
243+
If the `multiprocess <https://pypi.org/project/multiprocess/>`_ module is installed,
244+
it will be used instead.
243245
So the same restrictions also apply to the use of multiprocessing in `doit`.
244246

245247
.. code-block:: console

doc/install.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,15 @@ If you are using python 2::
1616
$ pip install "doit<0.30"
1717

1818

19-
If you want to use non-local plugins you need to install `setuptools` as well.
19+
If you want to use non-local plugins you need to install `setuptools` as well::
2020

2121
$ pip install doit[plugins]
2222

23+
If you are running with multiple processes and get a ``PicklingError``,
24+
installing `multiprocess` may resolve the issue::
25+
26+
$ pip install doit[multiprocess]
27+
2328

2429
Source
2530
^^^^^^

doit/cmd_auto.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import os
55
import time
66
import sys
7-
from multiprocessing import Process
87
from subprocess import call
98

9+
from .compat import Process
1010
from .exceptions import InvalidCommand
1111
from .cmdparse import CmdParse
1212
from .filewatch import FileModifyWatcher

doit/compat.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,30 @@
11
"""stuff dealing with incompatibilities between python versions"""
22

3+
try:
4+
from multiprocess import Process, Queue as MQueue
5+
HAS_MULTIPROCESS = True
6+
except ImportError:
7+
from multiprocessing import Process, Queue as MQueue
8+
HAS_MULTIPROCESS = False
9+
Process # pyflakes
10+
MQueue # pyflakes
11+
12+
13+
def is_multiprocessing_available():
14+
# see: http://bugs.python.org/issue3770
15+
# not available on BSD systens
16+
try:
17+
if HAS_MULTIPROCESS:
18+
import multiprocess.synchronize
19+
multiprocess
20+
else:
21+
import multiprocessing.synchronize
22+
multiprocessing
23+
except ImportError: # pragma: no cover
24+
return False
25+
else:
26+
return True
27+
328

429
def get_platform_system():
530
"""return platform.system

doit/runner.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""Task runner"""
22

3-
from multiprocessing import Process, Queue as MQueue
43
from threading import Thread
54
import pickle
65
import queue
76

87
import cloudpickle
98

9+
from .compat import is_multiprocessing_available, MQueue, Process
1010
from .exceptions import InvalidTask, CatchedException
1111
from .exceptions import TaskFailed, SetupError, DependencyError, UnmetDependency
1212
from .task import Stream, DelayedLoaded
@@ -327,16 +327,7 @@ class MRunner(Runner):
327327
@staticmethod
328328
def available():
329329
"""check if multiprocessing module is available"""
330-
# see: https://bitbucket.org/schettino72/doit/issue/17
331-
# http://bugs.python.org/issue3770
332-
# not available on BSD systens
333-
try:
334-
import multiprocessing.synchronize
335-
multiprocessing # pyflakes
336-
except ImportError: # pragma: no cover
337-
return False
338-
else:
339-
return True
330+
return is_multiprocessing_available()
340331

341332
def __init__(self, dep_manager, reporter,
342333
continue_=False, always_execute=False,

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
extras_require={
6565
':sys.platform == "darwin"': ['macfsevents'],
6666
':sys.platform == "linux"': ['pyinotify'],
67+
'multiprocess': ['multiprocess'],
6768
'plugins': ['setuptools'],
6869
'toml': ['toml >=0.10.1']
6970
},

tests/test_cmd_auto.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import time
2-
from multiprocessing import Process
32

43
import pytest
54

5+
from doit.compat import Process
66
from doit.cmdparse import DefaultUpdate
77
from doit.task import Task
88
from doit.cmd_base import TaskLoader

tests/test_runner.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import os
22
import sys
33
import pickle
4-
from multiprocessing import Queue
54
import platform
65
from unittest.mock import Mock
76

87
import pytest
98

9+
from doit.compat import MQueue
1010
from doit.exceptions import CatchedException, InvalidTask
1111
from doit.dependency import DbmDB, Dependency
1212
from doit.reporter import ConsoleReporter
@@ -557,7 +557,7 @@ def testSystemExitRaises(self, reporter, RunnerClass, dep_manager):
557557
class TestMReporter(object):
558558
class MyRunner(object):
559559
def __init__(self):
560-
self.result_q = Queue()
560+
self.result_q = MQueue()
561561

562562
def testReporterMethod(self, reporter):
563563
fake_runner = self.MyRunner()
@@ -696,8 +696,8 @@ def test_all_processes(self, reporter, monkeypatch, dep_manager):
696696
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
697697
run = runner.MRunner(dep_manager, reporter, num_process=2)
698698
run._run_tasks_init(td)
699-
result_q = Queue()
700-
task_q = Queue()
699+
result_q = MQueue()
700+
task_q = MQueue()
701701

702702
proc_list = run._run_start_processes(task_q, result_q)
703703
run.finish()
@@ -714,8 +714,8 @@ def test_less_processes(self, reporter, monkeypatch, dep_manager):
714714
td = TaskDispatcher({'t1':t1}, [], ['t1'])
715715
run = runner.MRunner(dep_manager, reporter, num_process=2)
716716
run._run_tasks_init(td)
717-
result_q = Queue()
718-
task_q = Queue()
717+
result_q = MQueue()
718+
task_q = MQueue()
719719

720720
proc_list = run._run_start_processes(task_q, result_q)
721721
run.finish()
@@ -732,8 +732,8 @@ def test_waiting_process(self, reporter, monkeypatch, dep_manager):
732732
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
733733
run = runner.MRunner(dep_manager, reporter, num_process=2)
734734
run._run_tasks_init(td)
735-
result_q = Queue()
736-
task_q = Queue()
735+
result_q = MQueue()
736+
task_q = MQueue()
737737

738738
proc_list = run._run_start_processes(task_q, result_q)
739739
run.finish()
@@ -779,10 +779,10 @@ def test_task_not_picklabe_thread(self, reporter, dep_manager):
779779
class TestMRunner_execute_task(object):
780780
def test_hold(self, reporter, dep_manager):
781781
run = runner.MRunner(dep_manager, reporter)
782-
task_q = Queue()
782+
task_q = MQueue()
783783
task_q.put(runner.JobHold()) # to test
784784
task_q.put(None) # to terminate function
785-
result_q = Queue()
785+
result_q = MQueue()
786786
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
787787
run.finish()
788788
# nothing was done
@@ -792,10 +792,10 @@ def test_full_task(self, reporter, dep_manager):
792792
# test execute_task_subprocess can receive a full Task object
793793
run = runner.MRunner(dep_manager, reporter)
794794
t1 = Task('t1', [simple_result])
795-
task_q = Queue()
795+
task_q = MQueue()
796796
task_q.put(runner.JobTask(t1)) # to test
797797
task_q.put(None) # to terminate function
798-
result_q = Queue()
798+
result_q = MQueue()
799799
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
800800
run.finish()
801801
# check result
@@ -812,10 +812,10 @@ def test_full_task_fail(self, reporter, dep_manager):
812812
# test execute_task_subprocess can receive a full Task object
813813
run = runner.MRunner(dep_manager, reporter)
814814
t1 = Task('t1', [simple_fail])
815-
task_q = Queue()
815+
task_q = MQueue()
816816
task_q.put(runner.JobTask(t1)) # to test
817817
task_q.put(None) # to terminate function
818-
result_q = Queue()
818+
result_q = MQueue()
819819
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
820820
run.finish()
821821
# check result

0 commit comments

Comments
 (0)