Skip to content

Commit e460c4b

Browse files
authored
Merge pull request #10 from realratchet/main
Change default behaviour on unix to spawn instead of fork
2 parents 84735cd + 47bd6df commit e460c4b

1 file changed

Lines changed: 23 additions & 9 deletions

File tree

mplite/__init__.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import io
2+
import sys
23
import multiprocessing
34
import traceback
45
import time
@@ -7,10 +8,10 @@
78
from itertools import count
89

910

10-
major, minor, patch = 1, 2, 3
11+
major, minor, patch = 1, 2, 4
1112
__version_info__ = (major, minor, patch)
1213
__version__ = '.'.join(str(i) for i in __version_info__)
13-
14+
default_context = None if "pytest" in sys.modules else "spawn"
1415

1516
class Task(object):
1617
task_id_counter = count(start=1)
@@ -46,13 +47,25 @@ def execute(self):
4647
return error
4748

4849

49-
class Worker(multiprocessing.Process):
50-
def __init__(self, name, tq, rq):
51-
super().__init__(group=None, target=self.update, name=name, daemon=False)
50+
class Worker(object):
51+
def __init__(self, ctx, name, tq, rq):
52+
self.ctx = ctx
5253
self.exit = multiprocessing.Event()
5354
self.tq = tq # workers task queue
5455
self.rq = rq # workers result queue
5556

57+
self.process = ctx.Process(group=None, target=self.update, name=name, daemon=False)
58+
59+
def start(self):
60+
self.process.start()
61+
62+
def is_alive(self):
63+
return self.process.is_alive()
64+
65+
@property
66+
def exitcode(self):
67+
return self.process.exitcode
68+
5669
def update(self):
5770
while True:
5871
try:
@@ -73,10 +86,11 @@ def update(self):
7386

7487

7588
class TaskManager(object):
76-
def __init__(self, cpu_count=None) -> None:
89+
def __init__(self, cpu_count=None, context=default_context) -> None:
90+
self._ctx = multiprocessing.get_context(context)
7791
self._cpus = multiprocessing.cpu_count() if cpu_count is None else cpu_count
78-
self.tq = multiprocessing.Queue()
79-
self.rq = multiprocessing.Queue()
92+
self.tq = self._ctx.Queue()
93+
self.rq = self._ctx.Queue()
8094
self.pool: list[Worker] = []
8195
self._open_tasks = 0
8296

@@ -89,7 +103,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): # signature requires these, thou
89103

90104
def start(self):
91105
for i in range(self._cpus): # create workers
92-
worker = Worker(name=str(i), tq=self.tq, rq=self.rq)
106+
worker = Worker(self._ctx, name=str(i), tq=self.tq, rq=self.rq)
93107
self.pool.append(worker)
94108
worker.start()
95109
while not all(p.is_alive() for p in self.pool):

0 commit comments

Comments
 (0)