-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_dispatcher.py
More file actions
126 lines (102 loc) · 3.15 KB
/
task_dispatcher.py
File metadata and controls
126 lines (102 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import logging
import time
import threading
import multiprocessing
import argparse
class Data:
    """Simple payload object that records its creation wall-clock time.

    The timestamp is captured once, at construction, as the human-readable
    string produced by ``time.ctime()``.
    """

    def __init__(self):
        # Snapshot the creation time; str(self) reports exactly this value.
        self.time = time.ctime()

    def __str__(self):
        return self.time
from celery import Celery
def str2bool(v):
    """Parse a command-line flag value into a boolean (argparse ``type=``).

    Accepts an actual bool unchanged, or (case-insensitively) the strings
    yes/true/t/y/1 for True and no/false/f/n/0 for False.

    Raises:
        argparse.ArgumentTypeError: for any unrecognized value.
    """
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
# Verbose logging so the Celery dispatch/result round-trips below are visible.
logging.basicConfig(level=logging.DEBUG)
local = '127.0.0.1'  # Redis broker host (local instance)
port = 6379  # default Redis port
url = f'redis://{local}:{port}/0'  # broker URL, Redis database 0
# Module-level Celery client; results go back through the same Redis instance.
app = Celery('tasks', broker=url)
# Allow pickle alongside json so custom objects (e.g. Data) can round-trip
# through the result backend.
config = {'CELERY_RESULT_BACKEND': url,
'CELERY_ACCEPT_CONTENT': ['pickle', 'json'],
'CELERY_RESULT_SERIALIZER': 'pickle'
}
app.conf.update(config)
def new_app():
    """Create a fresh Celery client configured like the module-level app.

    Each worker thread/process calls this so it gets its own app instance
    pointed at the same Redis broker/result backend.
    """
    client = Celery('tasks', broker=url)
    client.conf.update({
        'CELERY_RESULT_BACKEND': url,
        'CELERY_ACCEPT_CONTENT': ['pickle', 'json'],
        'CELERY_RESULT_SERIALIZER': 'pickle',
    })
    return client
def manage_sqrt_task(b: threading.Barrier, value):
    """Dispatch ``tasks.square_root(value)`` via Celery and log the result.

    Waits on *b* first so all sibling threads send their tasks at
    (roughly) the same moment.
    """
    client = new_app()
    b.wait()  # synchronize with the other workers before dispatching
    async_result = client.send_task('tasks.square_root', args=(value,))
    logging.info(async_result.get())
def manage_data_m(b: multiprocessing.Barrier):
    """Fetch ``tasks.data`` via Celery from a worker process and log the elapsed time.

    Waits on *b* so all sibling processes issue their requests at
    (roughly) the same moment, then times the full send/receive round-trip.
    """
    app_ = new_app()
    # Fix: use the public accessor instead of the internal
    # multiprocessing.process submodule.
    name = multiprocessing.current_process().name
    b.wait()  # synchronize with the other workers before timing starts
    t = time.time()
    result = app_.send_task('tasks.data', args=())
    data = result.get()  # blocks until the remote worker returns its payload
    delta = time.time() - t
    # Fix: include the process name in the log line, matching the
    # threaded variant (manage_data_t); previously `name` was unused.
    logging.info(f'{name} The Overall time taken is {delta}')
def manage_data_t(b: threading.Barrier):
    """Fetch ``tasks.data`` via Celery from a worker thread and log the elapsed time.

    Waits on *b* so all sibling threads issue their requests at
    (roughly) the same moment, then times the full send/receive round-trip.
    """
    app_ = new_app()
    # Fix: threading.currentThread()/getName() are deprecated aliases
    # (since Python 3.10); use the modern accessor and attribute.
    name = threading.current_thread().name
    b.wait()  # synchronize with the other workers before timing starts
    t = time.time()
    result = app_.send_task('tasks.data', args=())
    data = result.get()  # blocks until the remote worker returns its payload
    delta = time.time() - t
    logging.info(f'{name} The Overall time taken is {delta}')
def count_down(cnt: int, b: threading.Barrier):
    """CPU-bound busy loop for benchmarking threads.

    Waits on *b* so every worker starts together, decrements *cnt* to
    zero, then prints the elapsed wall-clock time.
    """
    b.wait()  # release all workers simultaneously
    started = time.time()
    while cnt > 0:
        cnt -= 1
    elapsed = time.time() - started
    print(f"time taken {elapsed}")
def count_down_m(cnt: int, b: multiprocessing.Barrier):
    """CPU-bound busy loop for benchmarking processes.

    Waits on *b* so every worker starts together, announces its start
    time, decrements *cnt* to zero, then prints the elapsed wall-clock time.
    """
    b.wait()  # release all workers simultaneously
    print(f'Starting {time.ctime()}')
    started = time.time()
    while cnt > 0:
        cnt -= 1
    elapsed = time.time() - started
    print(f"time taken {elapsed}")
if __name__ == '__main__':
    # CLI: choose threaded vs. process-based dispatch of the Celery
    # 'tasks.data' benchmark, with --number concurrent workers.
    parser = argparse.ArgumentParser()
    # Fix: both help strings were a copy-pasted "increase output verbosity".
    parser.add_argument("--parallel", type=str2bool, required=True,
                        help="use processes (true) instead of threads (false)")
    parser.add_argument("--number", type=int, required=True,
                        help="number of concurrent workers to launch")
    args = parser.parse_args()
    n = args.number
    # Fix: removed unused `cnt = 500_000_000` leftover (the count_down
    # helpers are not dispatched here).
    tasks = []
    if not args.parallel:
        print('running in threaded mode')
        # Barrier sized to the worker count so all start together.
        b = threading.Barrier(n)
        for _ in range(n):
            tasks.append(threading.Thread(target=manage_data_t, args=(b,)))
    else:
        print('running in parallel mode')
        b = multiprocessing.Barrier(n)
        for _ in range(n):
            tasks.append(multiprocessing.Process(target=manage_data_m, args=(b,)))
    # Start everything first, then wait for all workers to finish.
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()