Skip to content

Commit ab2bc8b

Browse files
authored
Improve task naming for Coiled executor (#811)
* Improve task naming for Coiled executor
* Store (user) function name in DAG
* Use func_name from DAG
1 parent 2338790 commit ab2bc8b

File tree

5 files changed: 26 additions (+26) and 17 deletions (-17)

cubed/core/plan.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def _new(
103103

104104
first_cubed_i = min(i for i, s in enumerate(stack_summaries) if s.is_cubed())
105105
first_cubed_summary = stack_summaries[first_cubed_i]
106+
func_name = first_cubed_summary.name
106107

107108
op_name_unique = gensym()
108109

@@ -112,9 +113,9 @@ def _new(
112113
op_name_unique,
113114
name=op_name_unique,
114115
op_name=op_name,
116+
func_name=func_name,
115117
type="op",
116118
stack_summaries=stack_summaries,
117-
op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}",
118119
hidden=hidden,
119120
)
120121
# array
@@ -143,9 +144,9 @@ def _new(
143144
op_name_unique,
144145
name=op_name_unique,
145146
op_name=op_name,
147+
func_name=func_name,
146148
type="op",
147149
stack_summaries=stack_summaries,
148-
op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}",
149150
hidden=hidden,
150151
primitive_op=primitive_op,
151152
pipeline=primitive_op.pipeline,
@@ -216,7 +217,7 @@ def _create_lazy_zarr_arrays(self, dag):
216217
name=name,
217218
op_name=op_name,
218219
type="op",
219-
op_display_name=name,
220+
func_name="",
220221
primitive_op=primitive_op,
221222
pipeline=primitive_op.pipeline,
222223
)
@@ -379,7 +380,8 @@ def visualize(
379380
tooltip = f"name: {n}\n"
380381
node_type = d.get("type", None)
381382
if node_type == "op":
382-
label = d["op_display_name"]
383+
func_name = d["func_name"]
384+
label = f"{n}\n{func_name}".strip()
383385
op_name = d["op_name"]
384386
if op_name == "blockwise":
385387
d["style"] = '"rounded,filled"'

cubed/diagnostics/rich.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def on_compute_start(self, event):
4444
progress_tasks = {}
4545
for name, node in visit_nodes(event.dag):
4646
num_tasks = node["primitive_op"].num_tasks
47-
op_display_name = node["op_display_name"].replace("\n", " ")
47+
func_name = node["func_name"]
48+
op_display_name = f"{name} {func_name}"
4849
progress_task = progress.add_task(
4950
f"{op_display_name}", start=False, total=num_tasks
5051
)

cubed/diagnostics/tqdm.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,15 @@ def on_compute_start(self, event):
1818
from tqdm.auto import tqdm
1919

2020
# find the maximum display width so we can align bars below
21-
max_op_display_name = (
22-
max(
23-
len(node["op_display_name"].replace("\n", " "))
24-
for _, node in visit_nodes(event.dag)
25-
)
26-
+ 1 # for the colon
21+
max_op_display_name = max(
22+
len(f"{name} {node['func_name']}:") for name, node in visit_nodes(event.dag)
2723
)
2824

2925
self.pbars = {}
3026
for i, (name, node) in enumerate(visit_nodes(event.dag)):
3127
num_tasks = node["primitive_op"].num_tasks
32-
op_display_name = node["op_display_name"].replace("\n", " ") + ":"
28+
func_name = node["func_name"]
29+
op_display_name = f"{name} {func_name}:"
3330
# note double curlies to get literal { and } for tqdm bar format
3431
bar_format = (
3532
f"{{desc:{max_op_display_name}}} {{percentage:3.0f}}%|{{bar}}{{r_bar}}"

cubed/runtime/executors/coiled.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
from cubed.spec import Spec
1515

1616

17-
def make_coiled_function(func, coiled_kwargs):
18-
return coiled.function(**coiled_kwargs)(execution_stats(func))
17+
def make_coiled_function(func, name, coiled_kwargs):
18+
return coiled.function(**coiled_kwargs)(execution_stats(func, name=name))
1919

2020

2121
class CoiledExecutor(DagExecutor):
@@ -42,7 +42,12 @@ def execute_dag(
4242
for name, node in visit_nodes(dag):
4343
handle_operation_start_callbacks(callbacks, name)
4444
pipeline = node["pipeline"]
45-
coiled_function = make_coiled_function(pipeline.function, merged_kwargs)
45+
# this name will show up on the dask dashboard - need to replace '-' as anything after it is suppressed
46+
func_name = node["func_name"]
47+
op_display_name = f"{name} {func_name}".replace("-", "_")
48+
coiled_function = make_coiled_function(
49+
pipeline.function, op_display_name, merged_kwargs
50+
)
4651
if minimum_workers is not None:
4752
coiled_function.cluster.adapt(minimum=minimum_workers)
4853
# coiled expects a sequence (it calls `len` on it)

cubed/runtime/utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,14 @@ def execute_with_timing(function, *args, **kwargs):
5757
)
5858

5959

60-
def execution_stats(func):
60+
def execution_stats(func, name=None):
6161
"""Decorator to measure timing information and peak memory usage of a function call."""
6262

63-
return partial(execute_with_stats, func)
63+
def wrapper(*args, **kwargs):
64+
return execute_with_stats(func, *args, **kwargs)
65+
66+
wrapper.__name__ = name or func.__name__
67+
return wrapper
6468

6569

6670
def execution_timing(func):

0 commit comments

Comments
 (0)