Skip to content

Commit 7295498

Browse files
author
Jens Hahne
committedApr 4, 2023
Refactoring scheduler to new params strategy
1 parent f8abf2e commit 7295498

14 files changed

+666
-94
lines changed
 

‎blockops/iteration.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
from blockops.block import BlockOperator, I
1313
from blockops.run import PintRun
1414
from blockops.taskPool import TaskPool
15-
from blockops.schedule import getSchedule
1615
from blockops.utils.expr import getCoeffsFromFormula
1716
from blockops.graph import PintGraph
17+
from blockops.scheduler import getSchedule
1818

1919

2020
# -----------------------------------------------------------------------------
@@ -202,36 +202,36 @@ def __call__(self, nIter, nBlocks=None, u0=None, initSol=False, predSol=None):
202202
else:
203203
return u[:, 1:]
204204

205-
def getRuntime(self, N, K, nProc, schedule_type='BLOCK-BY-BLOCK'):
205+
def getRuntime(self, N, K, nProc, schedulerType='BLOCK-BY-BLOCK'):
206206
K = self.checkK(N=N, K=K)
207207
run = PintRun(blockIteration=self, nBlocks=N, kMax=K)
208208
pool = TaskPool(run=run)
209-
schedule = getSchedule(taskPool=pool, nProc=nProc, nPoints=N + 1, schedule_type=schedule_type)
209+
schedule = getSchedule(taskPool=pool, nProc=nProc, nPoints=N + 1, schedulerType=schedulerType)
210210
return schedule.getRuntime()
211211

212-
def getPerformances(self, N, K, nProc=None, schedule_type='BLOCK-BY-BLOCK', verbose=False):
212+
def getPerformances(self, N, K, nProc=None, schedulerType='BLOCK-BY-BLOCK', verbose=False):
213213

214214
seqPropCost = self.propagator.cost
215215
if (seqPropCost is None) or (seqPropCost == 0):
216216
raise ValueError(
217217
'no cost given for fine propagator,'
218218
' cannot measure performances')
219-
runtime_ts = seqPropCost * N
219+
runtimeTs = seqPropCost * N
220220

221221
K = self.checkK(N=N, K=K)
222-
print(f' -- computing {schedule_type} cost for K={K}')
222+
print(f' -- computing {schedulerType} cost for K={K}')
223223

224224
run = PintRun(blockIteration=self, nBlocks=N, kMax=K)
225225
pool = TaskPool(run=run)
226226
schedule = getSchedule(
227227
taskPool=pool, nProc=nProc, nPoints=N + 1,
228-
schedule_type=schedule_type)
228+
schedulerType=schedulerType)
229229
runtime = schedule.getRuntime()
230230
nProc = schedule.nProc
231231

232232
if verbose:
233233
graph = PintGraph(N, max(K), pool)
234-
optimal_runtime = graph.longestPath()
234+
optimalRuntime = graph.longestPath()
235235
print('=============================')
236236
if self.name is None:
237237
print(f'Block iteration: {self.update}')
@@ -241,14 +241,14 @@ def getPerformances(self, N, K, nProc=None, schedule_type='BLOCK-BY-BLOCK', verb
241241
print(f'Predictor: {self.predictor}')
242242
print(f'N={N}, K={K} \n')
243243
print(f'Runtime of schedule={schedule.NAME} for nProc={nProc}: {runtime}')
244-
print(f'Runtime time-stepping: {runtime_ts} (This is currently not the correct value)')
245-
print(f'Speedup of schedule={schedule.NAME} for nProc={nProc}: {(runtime_ts / runtime):.2f} \n')
246-
print(f'Theoretical lower runtime bound: {optimal_runtime}')
244+
print(f'Runtime time-stepping: {runtimeTs} (This is currently not the correct value)')
245+
print(f'Speedup of schedule={schedule.NAME} for nProc={nProc}: {(runtimeTs / runtime):.2f} \n')
246+
print(f'Theoretical lower runtime bound: {optimalRuntime}')
247247
print(
248-
f'Theoretical maximum speedup compared to time stepping: {(runtime_ts / optimal_runtime):.2f} (This is currently not the correct value)')
248+
f'Theoretical maximum speedup compared to time stepping: {(runtimeTs / optimalRuntime):.2f} (This is currently not the correct value)')
249249
print('=============================')
250250

251-
speedup = runtime_ts / runtime
251+
speedup = runtimeTs / runtime
252252
efficiency = speedup / nProc
253253
return speedup, efficiency, nProc
254254

@@ -261,12 +261,12 @@ def plotGraph(self, N: int, K, figSize: tuple = (6.4, 4.8), saveFig: str = ""):
261261
figSize=figSize, saveFig=saveFig)
262262
return run, pool, pintGraph
263263

264-
def plotSchedule(self, N: int, K, nProc: int, schedule_type: str = 'BLOCK-BY-BLOCK', figSize: tuple = (8, 4.8),
264+
def plotSchedule(self, N: int, K, nProc: int, schedulerType: str = 'BLOCK-BY-BLOCK', figSize: tuple = (8, 4.8),
265265
saveFig: str = ""):
266266
K = self.checkK(N=N, K=K)
267267
run = PintRun(blockIteration=self, nBlocks=N, kMax=K)
268268
pool = TaskPool(run=run)
269-
schedule = getSchedule(taskPool=pool, nProc=nProc, nPoints=N + 1, schedule_type=schedule_type)
269+
schedule = getSchedule(taskPool=pool, nProc=nProc, nPoints=N + 1, schedulerType=schedulerType)
270270
schedule.plot(figName=None if self.name is None else self.name + f' ({schedule.NAME} schedule)',
271271
figSize=figSize, saveFig=saveFig)
272272

‎blockops/run.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,9 @@ def createExpressions(self):
239239
self.blockRules[(n + 1, k + 1)] = {'result': res, 'rule': rule}
240240

241241
def factorizeBlockRules(self) -> None:
242+
"""
243+
Factorizes the block rules and saves everything in a dictionary
244+
"""
242245
for key, value in self.blockRules.items():
243246
self.facBlockRules[key] = {'rule': self.factorize(rule=value['rule'], res=value['result']),
244247
'result': value['result']
@@ -254,10 +257,11 @@ def factorize(self, rule, res: sy.Symbol) -> dict:
254257
The rule to compute the block iteration for n and k
255258
res : sy.Symbol
256259
The name of the result
257-
n : int
258-
Current block
259-
k : int
260-
Current iteration
260+
261+
Returns
262+
-------
263+
ruleDict : dict
264+
Dictionary representing factorized expression
261265
"""
262266

263267
# If rule is just a copy of another task

‎blockops/scheduler/__init__.py

+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import pkgutil
2+
import numpy as np
3+
from typing import Dict
4+
from matplotlib.lines import Line2D
5+
import matplotlib.pyplot as plt
6+
from matplotlib.patches import Rectangle
7+
from abc import ABC, abstractmethod
8+
9+
from blockops.utils.params import ParamClass, setParams
10+
from blockops.utils.params import PositiveInteger, TaskPoolParam
11+
from blockops.taskPool import TaskPool
12+
13+
14+
class ScheduledTask:
15+
"""
16+
Helper class containing information about scheduled task
17+
"""
18+
19+
def __init__(self, proc: int, start: float, end: float, name: str, color: str = 'gray') -> None:
20+
"""
21+
Constructor for ScheduledTask
22+
23+
Parameters
24+
----------
25+
proc : int
26+
Process associated to this task
27+
start : float
28+
Start point
29+
end : float
30+
End point
31+
name : str
32+
Name of the task
33+
color : str
34+
Color of this task
35+
"""
36+
self.proc = proc
37+
self.start = start
38+
self.name = name
39+
self.end = end
40+
self.color = color
41+
42+
43+
@setParams(
44+
taskPool=TaskPoolParam(),
45+
nProc=PositiveInteger(),
46+
nPoints=PositiveInteger()
47+
)
48+
class Scheduler(ABC, ParamClass):
49+
"""
50+
Base class for a scheduler (assigns start points to tasks)
51+
52+
Parameters
53+
----------
54+
taskPool : TaskPool
55+
Task pool containing all tasks to be scheduled
56+
nProc : int
57+
The number of processes
58+
nPoints : int
59+
The number of blocks
60+
"""
61+
62+
NAME = None # unique name for the schedule
63+
IDS = set() # list of ids that can be used for usage of this schedule
64+
65+
def __init__(self, taskPool: TaskPool, nProc: int, nPoints: int) -> None:
66+
"""
67+
Constructor for abstract scheduler class
68+
"""
69+
self.schedule = {}
70+
self.taskPool = taskPool
71+
self.makespan = 0
72+
self.nProc = nProc
73+
self.startPointProc = np.zeros(self.nProc)
74+
self.nPoints = nPoints
75+
76+
@abstractmethod
77+
def computeSchedule(self):
78+
"""
79+
Abstract method.
80+
81+
Child classes overwrites the method to define schedule
82+
"""
83+
pass
84+
85+
@staticmethod
86+
def getDefaultNProc(N):
87+
"""Return a default number of processors for a given number of block"""
88+
return None
89+
90+
def getRuntime(self) -> float:
91+
"""
92+
Returns the runtime of a schedule
93+
94+
Returns
95+
-------
96+
self.makespan : float
97+
Runtime of schedule
98+
"""
99+
return self.makespan
100+
101+
def plot(self, figName: str, figSize: tuple = (8, 4.8), saveFig: str = "") -> None:
102+
"""
103+
Helper function to plot a schedule
104+
105+
Parameters
106+
----------
107+
figName : str
108+
Figure name
109+
figSize : tuple
110+
Figure size
111+
saveFig : str
112+
Saves the plot in the file *saveFig* if not equal to ""
113+
"""
114+
fig, ax = plt.subplots(1, 1, figsize=figSize, num=figName)
115+
colors = {}
116+
for key, value in self.schedule.items():
117+
time = value.end - value.start
118+
if time > 0:
119+
operation = value.name
120+
rec = Rectangle((value.start, value.proc + .225), time, .5, color='k', fc=value.color)
121+
ax.add_patch(rec)
122+
if value.color not in colors:
123+
colors[value.color] = operation
124+
ax.set_xlim(0, self.makespan)
125+
ax.set_ylim(0, self.nProc)
126+
ax.set_yticks(np.linspace(self.nProc - 1, 0, self.nProc) + 0.5)
127+
ax.set_yticklabels(['P' + str(i) for i in range(self.nProc - 1, -1, -1)])
128+
ax.set_ylabel(ylabel='Processor rank')
129+
ax.set_xlabel(xlabel='Computation cost')
130+
leg = [Line2D([0],
131+
[0],
132+
marker='o',
133+
color='w',
134+
label=value,
135+
markerfacecolor=key,
136+
markersize=15) for key, value in colors.items()]
137+
plt.legend(handles=leg,
138+
title='Task description',
139+
loc='upper center',
140+
bbox_to_anchor=(0.5, 1.25),
141+
ncol=5,
142+
fancybox=True,
143+
shadow=True,
144+
numpoints=1,
145+
fontsize=16)
146+
if saveFig != "":
147+
fig.savefig(saveFig, bbox_inches='tight', pad_inches=0.5)
148+
plt.show()
149+
150+
151+
# Dictionary to store all the scheduler implementations
152+
SCHEDULER: Dict[str, Scheduler] = {}
153+
154+
155+
def register(cls: Scheduler) -> Scheduler:
156+
for stringID in cls.IDS:
157+
SCHEDULER[stringID] = cls
158+
return cls
159+
160+
161+
# Import submodules to register Scheduler classes in SCHEDULER
162+
__all__ = [name for name in locals().keys() if not name.startswith('__')]
163+
for loader, moduleName, _ in pkgutil.walk_packages(__path__):
164+
__all__.append(moduleName)
165+
__import__(__name__ + '.' + moduleName)
166+
167+
168+
def getSchedule(taskPool: TaskPool, nProc: int, nPoints: int, schedulerType: str) -> Scheduler:
169+
"""
170+
Helper function to get scheduler and compute schedule
171+
172+
Parameters
173+
----------
174+
taskPool : str
175+
Figure name
176+
nProc: int, None
177+
Number of procs
178+
nPoints: int
179+
Number of blocks
180+
schedulerType: str
181+
Name of the scheduler
182+
183+
Returns
184+
-------
185+
scheduler : Scheduler
186+
Scheduler containing schedule
187+
"""
188+
SchedulerClass = SCHEDULER[schedulerType]
189+
nProc = SchedulerClass.getDefaultNProc(nPoints - 1) if nProc is None else nProc
190+
scheduler = SchedulerClass(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
191+
scheduler.computeSchedule()
192+
return scheduler

‎blockops/scheduler/listScheduler.py

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import sympy as sy
2+
from abc import ABC, abstractmethod
3+
4+
from blockops.scheduler import Scheduler, TaskPool
5+
from blockops.utils.params import setParams
6+
7+
@setParams(
8+
)
9+
class listScheduler(Scheduler, ABC):
10+
"""
11+
Abstract list scheduler class
12+
"""
13+
14+
NAME = ""
15+
IDS = {}
16+
17+
def __init__(self, taskPool: TaskPool, nProc: int, nPoints: int) -> None:
18+
"""
19+
Constructor for PinTBlockByBlock scheduler
20+
"""
21+
super().__init__(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
22+
23+
# Use three list to divide tasks:
24+
# - availableTasks: All prerequisites are fulfilled
25+
# - notAvailableTasks: At least one prerequisite is not fulfilled
26+
# - finishedTasks: Finished tasks
27+
self.availableTasks = set([key for key, value in self.taskPool.pool.items() if len(value.dep) == 0])
28+
self.notAvailableTasks = set([key for key, value in self.taskPool.pool.items() if len(value.dep) > 0])
29+
self.finishedTasks = set([])
30+
31+
@abstractmethod
32+
def pickTask(self):
33+
"""
34+
Abstract method
35+
Contains logic to pick the next task to schedule.
36+
"""
37+
raise NotImplementedError()
38+
39+
@abstractmethod
40+
def assignTask(self, taskName: sy.Symbol) -> None:
41+
"""
42+
Abstract method for assigning task
43+
44+
Parameters
45+
----------
46+
taskName : sy.Symbol
47+
The name of the task to be scheduled
48+
"""
49+
raise NotImplementedError()
50+
51+
def updateLists(self, taskName: sy.Symbol) -> None:
52+
"""
53+
Updates all three lists after task *taskName* is scheduled
54+
55+
Parameters
56+
----------
57+
taskName : sy.Symbol
58+
The name of the last scheduled task
59+
"""
60+
# Remove task from available task and add to finished
61+
self.finishedTasks.add(taskName)
62+
self.availableTasks.remove(taskName)
63+
64+
# Iterating on all following tasks
65+
task = self.taskPool.getTask(name=taskName)
66+
for item in task.followingTasks:
67+
folTask = self.taskPool.getTask(name=item)
68+
# Check if all dependencies are finished
69+
if sum(el in self.finishedTasks for el in folTask.dep) == len(folTask.dep):
70+
# Check if task is not already finished or available
71+
if item not in self.finishedTasks and item not in self.availableTasks:
72+
# Add task to available tasks and remove from non available
73+
self.availableTasks.add(item)
74+
self.notAvailableTasks.remove(item)
75+
76+
def computeSchedule(self) -> None:
77+
"""
78+
Main routine to compute a schedule.
79+
The scheduler is based on the following routine:
80+
81+
As long as there are tasks that can be scheduled:
82+
- Choose one tasks based on logic (see pickTasks)
83+
- Assign task, i.e. compute the earliest possible start
84+
on the corresponding process
85+
- Update the three lists of tasks
86+
"""
87+
while len(self.availableTasks) != 0:
88+
taskName = self.pickTask()
89+
self.assignTask(taskName=taskName)
90+
self.updateLists(taskName=taskName)

‎blockops/scheduler/lowestCostFirst.py

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import numpy as np
2+
import sympy as sy
3+
4+
from blockops.scheduler import register, ScheduledTask, TaskPool
5+
from blockops.scheduler.listScheduler import listScheduler
6+
from blockops.utils.params import setParams
7+
8+
9+
@register
10+
@setParams(
11+
)
12+
class LowestCostFirst(listScheduler):
13+
"""
14+
Calculates a schedule using a list approach where the task with the
15+
lowest cost is schedules first. This assumes that the cheapest tasks
16+
corresponds to lower lever tasks which helps most to enable new
17+
computations.
18+
"""
19+
NAME = "LowestCostFirst"
20+
IDS = {"LOWEST-COST-FIRST", "LCF"}
21+
22+
def __init__(self, taskPool: TaskPool, nProc: int, nPoints: int) -> None:
23+
"""
24+
Constructor for LowestCostFirst scheduler
25+
"""
26+
super().__init__(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
27+
28+
def pickTask(self) -> sy.Symbol:
29+
"""
30+
Contains logic to pick the next task to schedule.
31+
32+
Selects tasks based on the following priotirization:
33+
- Task with lowest costs
34+
35+
Returns
36+
-------
37+
taskName : sy.Symbol
38+
Name of the task to be scheduled next
39+
"""
40+
tmp = [[self.taskPool.getTask(item).cost,
41+
self.taskPool.getTask(item).iteration,
42+
self.taskPool.getTask(item).block, item] for item in self.availableTasks]
43+
return min(tmp, key=lambda x: (x[0], x[1], x[2]))[3]
44+
45+
def assignTask(self, taskName: sy.Symbol) -> None:
46+
"""
47+
Computes the earliest starting point for the task *taskName*.
48+
The starting point depends on the prerequisite tasks of *taskName*.
49+
50+
Parameters
51+
----------
52+
taskName : sy.Symbol
53+
The name of the task to be scheduled
54+
"""
55+
# Get task
56+
task = self.taskPool.getTask(taskName)
57+
58+
# Compute the minimum start time based on the prerequisites
59+
minimal_start_time = 0
60+
for depTask in task.dep:
61+
if self.schedule[depTask].end > minimal_start_time:
62+
minimal_start_time = self.schedule[depTask].end
63+
64+
# Get the first process who is free for the minimal start time
65+
tmp = np.where(self.startPointProc <= minimal_start_time)[0]
66+
if len(tmp) > 0:
67+
proc = tmp[0]
68+
else:
69+
proc = np.argmin(self.startPointProc)
70+
minimal_start_time = self.startPointProc[proc]
71+
self.schedule[taskName] = ScheduledTask(proc=proc,
72+
start=minimal_start_time,
73+
end=minimal_start_time + task.cost,
74+
name=task.opType,
75+
color=task.color)
76+
self.startPointProc[proc] = self.schedule[taskName].end
77+
78+
# Update minimum runtime if required
79+
if self.schedule[taskName].end > self.makespan:
80+
self.makespan = self.schedule[taskName].end

‎blockops/scheduler/optimal.py

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import numpy as np
2+
import sympy as sy
3+
4+
from blockops.scheduler import register, ScheduledTask, TaskPool
5+
from blockops.scheduler.listScheduler import listScheduler
6+
from blockops.utils.params import setParams
7+
8+
9+
@register
10+
@setParams(
11+
)
12+
class Optimal(listScheduler):
13+
"""
14+
Calculates an optimal schedule using a simple greedy approach.
15+
Assumes unlimited processes and does not minimize the number of processes.
16+
"""
17+
NAME = "Optimal"
18+
IDS = {"OPTIMAL", "OPT"}
19+
20+
def __init__(self, taskPool: TaskPool, nProc: int, nPoints: int) -> None:
21+
"""
22+
Constructor for optimal scheduler
23+
"""
24+
super().__init__(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
25+
self.startPointProc = np.zeros(20000)
26+
27+
def pickTask(self) -> sy.Symbol:
28+
"""
29+
Contains logic to pick the next task to schedule
30+
31+
Choose the cheapest task (Typically corresponds to
32+
coarse solves that often allow new tasks)
33+
Random choices are possible
34+
35+
Returns
36+
-------
37+
taskName : sy.Symbol
38+
Name of the task to be scheduled next
39+
"""
40+
tmp = [[self.taskPool.getTask(item).cost, item] for item in self.availableTasks]
41+
return min(tmp, key=lambda x: x[0])[1]
42+
43+
def assignTask(self, taskName):
44+
"""
45+
Computes the earliest starting point for the task *taskName*.
46+
Assigns the task to process as soon as possible
47+
"""
48+
task = self.taskPool.getTask(taskName)
49+
minimal_start_time = 0
50+
for depTask in task.dep:
51+
if self.schedule[depTask].end > minimal_start_time:
52+
minimal_start_time = self.schedule[depTask].end
53+
# Get the first process who is free for the minimal start time
54+
proc = next(x[0] for x in enumerate(self.startPointProc) if x[1] <= minimal_start_time)
55+
self.schedule[taskName] = ScheduledTask(proc=proc,
56+
start=minimal_start_time,
57+
end=minimal_start_time + task.cost,
58+
name=task.opType,
59+
color=task.color)
60+
self.startPointProc[proc] = self.schedule[taskName].end
61+
if self.schedule[taskName].end > self.makespan:
62+
self.makespan = self.schedule[taskName].end
63+
64+
def computeSchedule(self):
65+
"""
66+
Main routine to compute a schedule.
67+
Calls the parent function and updates the number of processes used
68+
"""
69+
super(Optimal, self).computeSchedule()
70+
self.nProc = len(np.where(self.startPointProc != 0)[0])
+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import numpy as np
2+
import sympy as sy
3+
4+
from blockops.scheduler import register, ScheduledTask, TaskPool
5+
from blockops.scheduler.listScheduler import listScheduler
6+
from blockops.utils.params import setParams
7+
8+
9+
@register
10+
@setParams(
11+
)
12+
class PinTBlockByBlock(listScheduler):
13+
"""
14+
Computes standard parallel-in-time schedule based on block-by-block basis
15+
"""
16+
17+
NAME = "PinT Block-by-Block"
18+
IDS = {'BLOCK-BY-BLOCK', 'BbB'}
19+
20+
def __init__(self, taskPool: TaskPool, nProc: int, nPoints: int) -> None:
21+
"""
22+
Constructor for PinTBlockByBlock scheduler
23+
"""
24+
super().__init__(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
25+
26+
# Compute which time points are located on which process
27+
self.distribution = np.array([int(self.nPoints / self.nProc + 1)] * (self.nPoints % self.nProc) +
28+
[int(self.nPoints / self.nProc)] * (self.nProc - self.nPoints % self.nProc))
29+
self.pointToProc = {}
30+
start = 0
31+
for i in range(self.nProc):
32+
for j in range(start, start + self.distribution[i]):
33+
self.pointToProc[j] = i
34+
start += self.distribution[i]
35+
36+
def pickTask(self) -> sy.Symbol:
37+
"""
38+
Contains logic to pick the next task to schedule.
39+
40+
Selects tasks based on the following priotirization:
41+
- Earliest iteration first
42+
- Latest time first
43+
44+
Returns
45+
-------
46+
taskName : sy.Symbol
47+
Name of the task to be scheduled next
48+
"""
49+
50+
# Pick the first task
51+
taskName = next(iter(self.availableTasks))
52+
task = self.taskPool.getTask(name=taskName)
53+
tmpIt = task.iteration
54+
tmpB = task.block
55+
56+
# Iterates over all tasks and checks if another task has higher priority
57+
for item in self.availableTasks:
58+
tmp = self.taskPool.getTask(name=item)
59+
if tmp.iteration < tmpIt:
60+
taskName = item
61+
task = tmp
62+
tmpIt = task.iteration
63+
tmpB = task.block
64+
elif tmp.iteration == tmpIt:
65+
if tmp.block > tmpB:
66+
taskName = item
67+
task = tmp
68+
tmpIt = task.iteration
69+
tmpB = task.block
70+
else:
71+
continue
72+
return taskName
73+
74+
def assignTask(self, taskName: sy.Symbol) -> None:
75+
"""
76+
Computes the earliest starting point for the task *taskName*.
77+
The starting point depends on the prerequisite tasks of *taskName*
78+
and the process associated with the task
79+
80+
Parameters
81+
----------
82+
taskName : sy.Symbol
83+
The name of the task to be scheduled
84+
"""
85+
86+
# Get task object
87+
task = self.taskPool.getTask(taskName)
88+
89+
# Compute earliest start point
90+
possibleStartTime = self.startPointProc[self.pointToProc[task.block]]
91+
tmp_commu = 0
92+
for depTask in task.dep:
93+
if self.schedule[depTask].end + tmp_commu > possibleStartTime:
94+
possibleStartTime = self.schedule[depTask].end + tmp_commu
95+
96+
self.schedule[taskName] = ScheduledTask(proc=self.pointToProc[task.block],
97+
start=possibleStartTime,
98+
end=possibleStartTime + task.cost,
99+
name=task.opType,
100+
color=task.color)
101+
102+
self.startPointProc[self.pointToProc[task.block]] = self.schedule[taskName].end
103+
104+
# Update makespan if required
105+
if self.schedule[taskName].end > self.makespan:
106+
self.makespan = self.schedule[taskName].end
107+
108+
@staticmethod
109+
def getDefaultNProc(N: int) -> int:
110+
"""
111+
Returns the standard choice of processes for this scheduler
112+
based on the number of blocks
113+
114+
Parameters
115+
----------
116+
N : int
117+
TODO
118+
119+
Returns
120+
-------
121+
N : int
122+
TODO
123+
"""
124+
return N

‎blockops/utils/params.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
import copy
1111
import numpy as np
1212

13+
from blockops.taskPool import TaskPool
14+
15+
1316
# -----------------------------------------------------------------------------
1417
# Base classes
1518
# -----------------------------------------------------------------------------
@@ -34,7 +37,7 @@ class Parameter(object):
3437
value = None
3538

3639
def error(self, value, reason):
37-
reason+= f" ({self.pType})"
40+
reason += f" ({self.pType})"
3841
raise ParamError(self.name, value, reason)
3942

4043
@property
@@ -102,7 +105,7 @@ def extractParamDocs(cls, *names):
102105
docLines = docs[iStart:].splitlines()[2:]
103106
descr = []
104107
for line in docLines:
105-
if line.startswith(8*' '):
108+
if line.startswith(8 * ' '):
106109
descr.append(line.strip())
107110
elif line.strip() == '':
108111
continue
@@ -131,7 +134,7 @@ def wrapper(cls):
131134

132135
# Ignore **kwargs type arguments
133136
sigParams = {name: par for name, par in sig.parameters.items()
134-
if par.kind != par.VAR_KEYWORD}
137+
if par.kind != par.VAR_KEYWORD}
135138

136139
# Add parameter object to the class PARAMS dictionnary
137140
for name, pType in kwargs.items():
@@ -272,3 +275,12 @@ def check(self, value):
272275
if not isinstance(value, bool):
273276
self.error(value, "not of boolean type")
274277
return True
278+
279+
280+
class TaskPoolParam(Parameter):
281+
"""Accepts a taskPool class"""
282+
283+
def check(self, value):
284+
if not isinstance(value, TaskPool):
285+
self.error(value, "not of TaskPool type")
286+
return True

‎notebook/01_baseTuto.ipynb

+47-47
Large diffs are not rendered by default.

‎notebook/04_schedules.ipynb

+17-17
Large diffs are not rendered by default.

‎blockops/schedule.py ‎old/schedule.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ class LowestCostFirst(Schedule):
192192
that the cheapest tasks corresponds to lower lever tasks which helps most to enable new computations.
193193
"""
194194
NAME = "LowestCostFirst"
195-
IDS = {"LOWEST-COST-FIRST", "LCF"}
195+
IDS = {"LOWEST-COST-FIRST", "LCF", "LowestCostFirst"}
196196

197197
def __init__(self, *args: object, **kwargs: object):
198198
super().__init__(*args, **kwargs)
@@ -313,12 +313,12 @@ def computeSchedule(self):
313313
self.nProc = len(np.where(self.startPointProc != 0)[0])
314314

315315

316-
def getSchedule(taskPool: TaskPool, nProc: int, nPoints: int, schedule_type: str
316+
def getSchedule(taskPool: TaskPool, nProc: int, nPoints: int, scheduleType: str
317317
) -> Schedule:
318-
if schedule_type not in SCHEDULE_TYPES:
318+
if scheduleType not in SCHEDULE_TYPES:
319319
raise Exception(f"Schedule {type} not implemented, must be in {list(SCHEDULE_TYPES.keys())}")
320320
else:
321-
ScheduleClass = SCHEDULE_TYPES[schedule_type]
321+
ScheduleClass = SCHEDULE_TYPES[scheduleType]
322322
nProc = ScheduleClass.getDefaultNProc(nPoints - 1) if nProc is None else nProc
323323
schedule = ScheduleClass(taskPool=taskPool, nProc=nProc, nPoints=nPoints)
324324
schedule.computeSchedule()

‎scripts/03_pintIter.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
efficiencyLCF = np.zeros(nBlocks + 1)
7575
for k in reqIters:
7676
speedupLCF[k], efficiencyLCF[k], _ = algo.getPerformances(
77-
N=nBlocks, K=k, nProc=nBlocks + 1, schedule_type='LCF')
77+
N=nBlocks, K=k, nProc=nBlocks + 1, schedulerType='LCF')
7878
nSpeedup = speedupLCF[nIter]
7979
nEfficiency = efficiencyLCF[nIter]
8080

@@ -89,7 +89,7 @@
8989
efficiencyBbB = np.zeros(nBlocks + 1)
9090
for k in reqIters:
9191
speedupBbB[k], efficiencyBbB[k], _ = algo.getPerformances(
92-
N=nBlocks, K=k, schedule_type='BbB')
92+
N=nBlocks, K=k, schedulerType='BbB')
9393
nSpeedup = speedupBbB[nIter]
9494
nEfficiency = efficiencyBbB[nIter]
9595

‎scripts/04_runtimeScreening.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@
3939
for blockIter in [parareal,pararealSC,abj, abgs]:
4040
print(f'N: {nBlocks}, K: {k}, method: {method}, blockIter {blockIter.name}')
4141
start = time.time()
42-
a = blockIter.getRuntime(N=nBlocks, K=k, nProc=nBlocks, schedule_type=method)
42+
a = blockIter.getRuntime(N=nBlocks, K=k, nProc=nBlocks, schedulerType=method)
4343
print(f'{blockIter.name} runtime for N={nBlocks}, K={k}: {time.time()-start}, {a}')

‎scripts/05_speedupPFASST.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
efficiencyBbB = np.zeros(nBlocks + 1)
7676
for k in reqIters:
7777
speedupBbB[k], efficiencyBbB[k], _ = algo.getPerformances(
78-
N=nBlocks, K=k, schedule_type='BbB')
78+
N=nBlocks, K=k, schedulerType='BbB')
7979
nSpeedup = speedupBbB[nIter]
8080
nEfficiency = efficiencyBbB[nIter]
8181

0 commit comments

Comments
 (0)
Please sign in to comment.