Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion notebooks/1-single-node.ipynb

Large diffs are not rendered by default.

74 changes: 59 additions & 15 deletions src/executorlib/task_scheduler/interactive/dependency_plot.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import os.path
from concurrent.futures import Future
from typing import Optional
Expand All @@ -24,6 +25,12 @@ def generate_nodes_and_edges_for_plotting(
edge_lst: list = []
hash_id_dict: dict = {}

def extend_args(funct_dict):
sig = inspect.signature(funct_dict["fn"])
args = sig.bind(*funct_dict["args"], **funct_dict["kwargs"])
funct_dict["signature"] = args.arguments
return funct_dict
Comment on lines +28 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add error handling for signature binding failures.

The sig.bind() call can raise TypeError if arguments don't match the function signature (e.g., missing required parameters, unexpected keyword arguments). Without error handling, this will crash the plotting functionality.

🔎 Proposed fix with error handling
 def extend_args(funct_dict):
-    sig = inspect.signature(funct_dict["fn"])
-    args = sig.bind(*funct_dict["args"], **funct_dict["kwargs"])
-    funct_dict["signature"] = args.arguments
-    return funct_dict
+    try:
+        sig = inspect.signature(funct_dict["fn"])
+        args = sig.bind(*funct_dict["args"], **funct_dict["kwargs"])
+        funct_dict["signature"] = args.arguments
+    except (TypeError, ValueError) as e:
+        # Fallback to kwargs if signature binding fails
+        funct_dict["signature"] = funct_dict["kwargs"]
+    return funct_dict
🤖 Prompt for AI Agents
In src/executorlib/task_scheduler/interactive/dependency_plot.py around lines 28
to 32, the call to sig.bind(...) can raise TypeError when provided args/kwargs
don't match the function signature; wrap the bind call in a try/except that
catches TypeError, record the error (e.g., add a "binding_error" or
"binding_exception" entry to funct_dict with the exception message/exception
object) and either return funct_dict with that error attached or re-raise a more
descriptive exception; ensure you do not let an unhandled TypeError crash the
caller and include enough context (function name and provided args/kwargs) in
the stored or raised message.


def add_element(arg, link_to, label=""):
"""
Add element to the node and edge lists.
Expand All @@ -39,6 +46,8 @@ def add_element(arg, link_to, label=""):
"start": hash_id_dict[future_hash_inverse_dict[arg._future]],
"end": link_to,
"label": label + str(arg._selector),
"end_label": label,
"start_label": str(arg._selector),
}
)
elif isinstance(arg, Future):
Expand All @@ -53,39 +62,71 @@ def add_element(arg, link_to, label=""):
lst_no_future = [a if not isinstance(a, Future) else "$" for a in arg]
node_id = len(node_lst)
node_lst.append(
{"name": str(lst_no_future), "id": node_id, "shape": "circle"}
{
"name": str(lst_no_future),
"value": "python_workflow_definition.shared.get_list",
"id": node_id,
"type": "function",
"shape": "box",
}
)
edge_lst.append({"start": node_id, "end": link_to, "label": label})
for i, a in enumerate(arg):
if isinstance(a, Future):
add_element(arg=a, link_to=node_id, label="ind: " + str(i))
add_element(arg=a, link_to=node_id, label=str(i))
elif isinstance(arg, dict) and any(isinstance(a, Future) for a in arg.values()):
dict_no_future = {
kt: vt if not isinstance(vt, Future) else "$" for kt, vt in arg.items()
}
node_id = len(node_lst)
node_lst.append(
{"name": str(dict_no_future), "id": node_id, "shape": "circle"}
{
"name": str(dict_no_future),
"value": "python_workflow_definition.shared.get_dict",
"id": node_id,
"type": "function",
"shape": "box",
}
)
edge_lst.append({"start": node_id, "end": link_to, "label": label})
for kt, vt in arg.items():
if isinstance(vt, Future):
add_element(arg=vt, link_to=node_id, label="key: " + kt)
add_element(arg=vt, link_to=node_id, label=kt)
else:
node_id = len(node_lst)
node_lst.append({"name": str(arg), "id": node_id, "shape": "circle"})
value_dict = {
str(n["value"]): n["id"] for n in node_lst if n["type"] == "input"
}
if str(arg) not in value_dict:
node_id = len(node_lst)
node_lst.append(
{
"name": label,
"value": arg,
"id": node_id,
"type": "input",
"shape": "circle",
}
)
else:
node_id = value_dict[str(arg)]
edge_lst.append({"start": node_id, "end": link_to, "label": label})

for k, v in task_hash_dict.items():
task_hash_modified_dict = {
k: extend_args(funct_dict=v) for k, v in task_hash_dict.items()
}

for k, v in task_hash_modified_dict.items():
hash_id_dict[k] = len(node_lst)
node_lst.append(
{"name": v["fn"].__name__, "id": hash_id_dict[k], "shape": "box"}
{
"name": v["fn"].__name__,
"type": "function",
"value": v["fn"].__module__ + "." + v["fn"].__name__,
"id": hash_id_dict[k],
"shape": "box",
}
)
for k, task_dict in task_hash_dict.items():
for arg in task_dict["args"]:
add_element(arg=arg, link_to=hash_id_dict[k], label="")

for kw, v in task_dict["kwargs"].items():
for k, task_dict in task_hash_modified_dict.items():
for kw, v in task_dict["signature"].items():
add_element(arg=v, link_to=hash_id_dict[k], label=str(kw))
Comment on lines +128 to 130
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add defensive check for "signature" key existence.

Line 147 assumes task_dict["signature"] exists, but if extend_args encounters an error (which currently has no error handling), the "signature" key may be missing, causing a KeyError.

🔎 Proposed fix with fallback
 for k, task_dict in task_hash_modified_dict.items():
-    for kw, v in task_dict["signature"].items():
+    signature = task_dict.get("signature", task_dict.get("kwargs", {}))
+    for kw, v in signature.items():
         add_element(arg=v, link_to=hash_id_dict[k], label=str(kw))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for k, task_dict in task_hash_modified_dict.items():
for kw, v in task_dict["signature"].items():
add_element(arg=v, link_to=hash_id_dict[k], label=str(kw))
for k, task_dict in task_hash_modified_dict.items():
signature = task_dict.get("signature", task_dict.get("kwargs", {}))
for kw, v in signature.items():
add_element(arg=v, link_to=hash_id_dict[k], label=str(kw))
🤖 Prompt for AI Agents
In src/executorlib/task_scheduler/interactive/dependency_plot.py around lines
146 to 148, the loop assumes task_dict["signature"] always exists which can
raise KeyError if extend_args failed; guard access by checking for the
"signature" key (e.g., use task_dict.get("signature") or an if "signature" in
task_dict) and treat a missing signature as an empty mapping so the inner loop
is skipped; optionally log or warn when signature is missing to aid debugging.


return node_lst, edge_lst
Expand Down Expand Up @@ -175,7 +216,10 @@ def plot_dependency_graph_function(

graph = nx.DiGraph()
for node in node_lst:
graph.add_node(node["id"], label=node["name"], shape=node["shape"])
if node["type"] == "input":
graph.add_node(node["id"], label=str(node["value"]), shape=node["shape"])
else:
graph.add_node(node["id"], label=str(node["name"]), shape=node["shape"])
for edge in edge_lst:
graph.add_edge(edge["start"], edge["end"], label=edge["label"])
if filename is not None:
Expand Down
8 changes: 4 additions & 4 deletions tests/test_fluxjobexecutor_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def test_many_to_one_plot(self):
Expand Down Expand Up @@ -106,7 +106,7 @@ def test_many_to_one_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(nodes), 14)
self.assertEqual(len(edges), 22)


Expand All @@ -132,7 +132,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def test_many_to_one_plot(self):
Expand Down Expand Up @@ -175,5 +175,5 @@ def test_many_to_one_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(nodes), 14)
self.assertEqual(len(edges), 22)
12 changes: 6 additions & 6 deletions tests/test_singlenodeexecutor_plot_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def test_executor_dependency_plot_filename(self):
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_many_to_one_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(nodes), 14)
self.assertEqual(len(edges), 22)

def test_future_input_dict(self):
Expand Down Expand Up @@ -186,7 +186,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def test_many_to_one_plot(self):
Expand Down Expand Up @@ -231,7 +231,7 @@ def test_many_to_one_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(nodes), 14)
self.assertEqual(len(edges), 22)


Expand All @@ -257,7 +257,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def test_many_to_one_plot(self):
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_many_to_one_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 19)
self.assertEqual(len(nodes), 14)
self.assertEqual(len(edges), 22)


Expand Down
2 changes: 1 addition & 1 deletion tests/test_testclusterexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_executor_dependency_plot(self):
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(nodes), 4)
self.assertEqual(len(edges), 4)

def tearDown(self):
Expand Down