-
Notifications
You must be signed in to change notification settings - Fork 5
Adding print_tree method to FlowGraph #103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding print_tree method to FlowGraph #103
Conversation
@Bennylave , Nice! I'm not sure if I understand the question correctly, but maybe the information below helps. FlowGraph is indeed a good entry point. Every step that is added is a NodeStep. import flowfile as ff
from flowfile_core.flowfile.flow_graph import FlowGraph
graph: FlowGraph = ff.create_flow_graph()
# Explicitly creating and attaching the graph is optional, but for clarity it is nice. If you do not provide it, it generates a new FlowGraph
df_1 = ff.FlowFrame([
{"id": 1, "region": "North", "quantity": 10, "price": 150},
{"id": 2, "region": "South", "quantity": 5, "price": 300},
{"id": 3, "region": "East", "quantity": 8, "price": 200},
], flow_graph=graph)
df_2 = df_1.with_columns(flowfile_formulas=['[quantity] * [price]'], output_column_names=["total"])
df_3 = df_2.filter(flowfile_formula="[total]>1500")
df_4 = df_3.group_by(['region']).agg([
ff.col("total").sum().alias("total_revenue"),
ff.col("total").mean().alias("total_quantity"),
])
# now we can access all the nodes that were created in the graph
print(graph._node_db)
# {1: Node id: 1 (manual_input), 3: Node id: 3 (formula), 4: Node id: 4 (filter), 5: Node id: 5 (group_by)}
# you can also find the starting node(s) of the graph:
print(graph._flow_starts)
# [Node id: 1 (manual_input)]
# and from every node, you can access the next node that has a dependency on it:
print(graph.get_node(1).leads_to_nodes)
# [Node id: 3 (formula)]
# the other way around, works also:
print(graph.get_node(3).node_inputs)
# NodeStepInputs(Left Input: None, Right Input: None, Main Inputs: [Node id: 1 (manual_input)])
# you can also access the settings of the node and it's type:
print(graph.get_node(4).setting_input)
print(graph.get_node(4).node_type) |
@Edwardvaneechoud let's use the example of
If you look at the example output from this PR's method, you'll see that the The only way I could find around this is by using the I'm not sure whether it's possible or desirable to have If you have any other suggestion for a way to find the details about the operations being done on the PS : I've been using the example in the README but I am also open for other example that you'd find more adequate. |
@Bennylave Ah yes, I understand now, the pipeline in the issue is a FlowFrame, so FlowGraph can not have a dependency on FlowFrame. You can get information of the node by accessing the import flowfile as ff
from flowfile_core.flowfile.flow_graph import FlowGraph
graph: FlowGraph = ff.create_flow_graph()
pipeline = (
ff.FlowFrame([
{"id": 1, "region": "North", "quantity": 10, "price": 150},
{"id": 2, "region": "South", "quantity": 5, "price": 300},
{"id": 3, "region": "East", "quantity": 8, "price": 200},
])
.with_columns(flowfile_formulas=['[quantity] * [price]'], output_column_names=["total"])
.filter(flowfile_formula="[total]>1500")
.group_by(['region']).agg([
ff.col("total").sum().alias("total_revenue"),
ff.col("total").mean().alias("total_quantity"),
])
)
for node in pipeline.flow_graph.nodes:
print(node.setting_input)
# flow_id=4411 node_id=1 cache_results=False pos_x=100.0 pos_y=100.0 is_setup=True description='Data imported from Pyth...
# flow_id=4411 node_id=3 cache_results=False pos_x=0 pos_y=0 is_setup=True description=None user_id=None is_flow_output...
# flow_id=4411 node_id=4 cache_results=False pos_x=200.0 pos_y=150.0 is_setup=True description=None user_id=None is_flo...
# flow_id=4411 node_id=5 cache_results=False pos_x=200.0 pos_y=200.0 is_setup=True description='Aggregate after groupin... Since I am working on some documentation of how to use the api and FlowGraph and will update it soon! |
@Edwardvaneechoud is there a 1:1 match between If that's the case I need to add some other types. |
@Bennylave, No, explain uses the polars integration, and the transformation is free to use multiple strategies (for example cross join and join both map to join in polars). We can show the description field, some transformations already fill it automatically.
If no description is available we can fallback to just mentioning filter - node id. or something. Then it is just important that we generate good descriptions, but that is probably the easiest! |
@Edwardvaneechoud can you please review this? Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looked at it and have some comments:
skip_nodes
andexecution_order
are now instance attributes, which is confusing and potentially can lead to bugs.- It gives the illusion that they are always up to date. It is better to refactor them in a function, so that when you need them, you always explicitly have to recalculate them.
- It creates a shared mutable state, which might be problematic in the future. For example, when we run the graph twice without waiting for completion, it will overwrite each other state.
- We can solve this by using this part of the code (perhaps we can move this into a method for reusability).
skip_nodes = [node for node in self.nodes if not node.is_correct]
skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes])
execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if
node not in skip_nodes],
flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
- There is an issue with the function: pipeline is not defined. You can just do self._node_db.get(...,)
operation = str(pipeline.flow_graph._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title()
- Can you add tests so that we know the function is working, this will also make it easier for us to see what it is doing. I've created in my branch these tests:
flowfile_core/tests/flowfile/test_flowfile.py
@pytest.fixture
def join_graph():
graph = create_graph()
# graph.flow_settings.execution_mode = 'Performance'
left_data = [{"name": "eduward"},
{"name": "edward"},
{"name": "edwin"}]
add_manual_input(graph, data=left_data)
right_data = left_data[:1]
add_manual_input(graph, data=right_data, node_id=2)
add_node_promise_on_type(graph, 'join', 3)
left_connection = input_schema.NodeConnection.create_from_simple_input(1, 3)
right_connection = input_schema.NodeConnection.create_from_simple_input(2, 3)
right_connection.input_connection.connection_class = 'input-1'
add_connection(graph, left_connection)
add_connection(graph, right_connection)
data = get_join_data(how='inner')
graph.add_join(input_schema.NodeJoin(**data))
return graph
def test_print_tree_group_by():
graph = get_group_by_flow()
original_stdout = sys.stdout
# 2. Redirect standard output to a new StringIO object
captured_output = StringIO()
with redirect_stdout(captured_output):
graph.print_tree()
# TODO: There are many new lines in the beginning is that needed?
assert captured_output.getvalue().strip() == "'Manual Input (id=1)\n# |___ Group By (id=2)'"
def test_print_tree_join(join_graph):
graph = join_graph
original_stdout = sys.stdout
# 2. Redirect standard output to a new StringIO object
captured_output = StringIO()
with redirect_stdout(captured_output):
_= graph.print_tree()
assert captured_output.getvalue().strip() == "'Manual Input (id=1)\n# |___ Manual Input (id=2)\n# |___ Join (id=3)'"
def test_print_tree_union():
# Create a graph with multiple input nodes
graph = create_graph()
for i in range(4):
input_data = (FlowDataEngine.create_random(100).apply_flowfile_formula('random_int(0, 4)', 'groups')
.select_columns(['groups', 'Country', 'sales_data']))
add_manual_input(graph, data=input_data.to_pylist(), node_id=i)
graph.add_union(input_schema.NodeUnion(flow_id=1, node_id=5))
for i in range(4):
connection = input_schema.NodeConnection.create_from_simple_input(i, 5)
add_connection(graph, connection)
graph.print_tree()
They do not pass yet, but this way we can validate that the it works.
- Fix how multiple inputs are handled, the print_tree method only takes a step by step approach, not the connections between the steps =>
def test_print_tree_union():
graph = create_graph()
for i in range(4):
input_data = (FlowDataEngine.create_random(100).apply_flowfile_formula('random_int(0, 4)', 'groups')
.select_columns(['groups', 'Country', 'sales_data']))
add_manual_input(graph, data=input_data.to_pylist(), node_id=i)
graph.add_union(input_schema.NodeUnion(flow_id=1, node_id=5))
for i in range(4):
connection = input_schema.NodeConnection.create_from_simple_input(i, 5)
add_connection(graph, connection)
=>
Manual Input (id=0)
# |___ Manual Input (id=1)
# |___ Manual Input (id=2)
# |___ Manual Input (id=3)
# |___ Union (id=5)
I gave it gen ai and came up with this solution (which will need a refactor to make it easier to maintain, but the tree respects the dependencies).
def print_tree(self):
"""
Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art.
"""
if not self._node_db:
print("Empty flow graph")
return
# Build node information
node_info = {}
for node in self.nodes:
node_id = node.node_id
# Get node label
operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown"
label = f"{operation} (id={node_id})"
if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'):
if node.setting_input.description:
desc = node.setting_input.description
if len(desc) > 20: # Truncate long descriptions
desc = desc[:17] + "..."
label = f"{operation} ({node_id}): {desc}"
# Get inputs and outputs
inputs = {
'main': [n.node_id for n in (node.node_inputs.main_inputs or [])],
'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None,
'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None
}
outputs = [n.node_id for n in node.leads_to_nodes]
node_info[node_id] = {
'label': label,
'short_label': f"{operation} ({node_id})",
'inputs': inputs,
'outputs': outputs,
'depth': 0
}
# Calculate depth for each node
def calculate_depth(node_id, visited=None):
if visited is None:
visited = set()
if node_id in visited:
return node_info[node_id]['depth']
visited.add(node_id)
max_input_depth = -1
inputs = node_info[node_id]['inputs']
for main_id in inputs['main']:
max_input_depth = max(max_input_depth, calculate_depth(main_id, visited))
if inputs['left']:
max_input_depth = max(max_input_depth, calculate_depth(inputs['left'], visited))
if inputs['right']:
max_input_depth = max(max_input_depth, calculate_depth(inputs['right'], visited))
node_info[node_id]['depth'] = max_input_depth + 1
return node_info[node_id]['depth']
# Calculate depths for all nodes
for node_id in node_info:
calculate_depth(node_id)
# Group nodes by depth
depth_groups = {}
max_depth = 0
for node_id, info in node_info.items():
depth = info['depth']
max_depth = max(max_depth, depth)
if depth not in depth_groups:
depth_groups[depth] = []
depth_groups[depth].append(node_id)
# Sort nodes within each depth group
for depth in depth_groups:
depth_groups[depth].sort()
# Create the main flow visualization
lines = []
lines.append("=" * 80)
lines.append("Flow Graph Visualization")
lines.append("=" * 80)
lines.append("")
# Track which nodes connect to what
merge_points = {} # target_id -> list of source_ids
for node_id, info in node_info.items():
for output_id in info['outputs']:
if output_id not in merge_points:
merge_points[output_id] = []
merge_points[output_id].append(node_id)
# Build the flow paths
paths = [] # List of paths through the graph
visited_in_paths = set()
# Find all root nodes (no inputs)
root_nodes = [nid for nid, info in node_info.items()
if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']]
if not root_nodes and self._flow_starts:
root_nodes = [n.node_id for n in self._flow_starts]
# Trace paths from each root
def trace_path(node_id, current_path=None):
if current_path is None:
current_path = []
current_path = current_path + [node_id]
outputs = node_info[node_id]['outputs']
if not outputs:
# End of path
return [current_path]
# If this node has multiple outputs or connects to a merge point, branch
all_paths = []
for output_id in outputs:
if output_id in merge_points and len(merge_points[output_id]) > 1:
# This is a merge point, end this path here
all_paths.append(current_path + [output_id])
else:
# Continue the path
all_paths.extend(trace_path(output_id, current_path))
return all_paths
# Get all paths
for root_id in root_nodes:
paths.extend(trace_path(root_id))
# Find the maximum label length for each depth level
max_label_length = {}
for depth in range(max_depth + 1):
if depth in depth_groups:
max_len = max(len(node_info[nid]['label']) for nid in depth_groups[depth])
max_label_length[depth] = max_len
# Draw the paths
drawn_nodes = set()
merge_drawn = set()
# Group paths by their merge points
paths_by_merge = {}
standalone_paths = []
for path in paths:
if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1:
merge_id = path[-1]
if merge_id not in paths_by_merge:
paths_by_merge[merge_id] = []
paths_by_merge[merge_id].append(path)
else:
standalone_paths.append(path)
# Draw merged paths
for merge_id, merge_paths in paths_by_merge.items():
if merge_id in merge_drawn:
continue
merge_info = node_info[merge_id]
sources = merge_points[merge_id]
# Draw each source path leading to the merge
for i, source_id in enumerate(sources):
# Find the path containing this source
source_path = None
for path in merge_paths:
if source_id in path:
source_path = path[:path.index(source_id) + 1]
break
if source_path:
# Build the line for this path
line_parts = []
for j, nid in enumerate(source_path):
if j == 0:
line_parts.append(node_info[nid]['label'])
else:
line_parts.append(f" ──> {node_info[nid]['short_label']}")
# Add the merge arrow
if i == 0:
# First source
line = "".join(line_parts) + " ─────┐"
lines.append(line)
elif i == len(sources) - 1:
# Last source
line = "".join(line_parts) + " ─────┴──> " + merge_info['label']
lines.append(line)
# Continue with the rest of the path after merge
remaining = node_info[merge_id]['outputs']
while remaining:
next_id = remaining[0]
lines[-1] += f" ──> {node_info[next_id]['short_label']}"
remaining = node_info[next_id]['outputs']
drawn_nodes.add(next_id)
else:
# Middle sources
line = "".join(line_parts) + " ─────┤"
lines.append(line)
for nid in source_path:
drawn_nodes.add(nid)
drawn_nodes.add(merge_id)
merge_drawn.add(merge_id)
lines.append("") # Add spacing between merge groups
# Draw standalone paths
for path in standalone_paths:
if all(nid in drawn_nodes for nid in path):
continue
line_parts = []
for i, node_id in enumerate(path):
if node_id not in drawn_nodes:
if i == 0:
line_parts.append(node_info[node_id]['label'])
else:
line_parts.append(f" ──> {node_info[node_id]['short_label']}")
drawn_nodes.add(node_id)
if line_parts:
lines.append("".join(line_parts))
# Add any remaining undrawn nodes
for node_id in node_info:
if node_id not in drawn_nodes:
lines.append(node_info[node_id]['label'] + " (isolated)")
lines.append("")
lines.append("=" * 80)
lines.append("Execution Order")
lines.append("=" * 80)
try:
ordered_nodes = self.determine_execution_order()
if ordered_nodes:
for i, node in enumerate(ordered_nodes, 1):
lines.append(f" {i:3d}. {node_info[node.node_id]['label']}")
except Exception as e:
lines.append(f" Could not determine execution order: {e}")
# Print everything
output = "\n".join(lines)
print(output)
return output
It needs definitely some love and refactor but it prints the tree pretty nicely:
graph = create_graph()
for i in range(4):
input_data = (FlowDataEngine.create_random(100).apply_flowfile_formula('random_int(0, 4)', 'groups')
.select_columns(['groups', 'Country', 'sales_data']))
add_manual_input(graph, data=input_data.to_pylist(), node_id=i)
graph.add_union(input_schema.NodeUnion(flow_id=1, node_id=5))
for i in range(4):
connection = input_schema.NodeConnection.create_from_simple_input(i, 5)
add_connection(graph, connection)
graph.print_tree_v2()
result =>
_ = graph.print_tree_v2()
2025-08-19 14:23:03,985 - PipelineHandler - INFO - Starting topological sort to determine execution order
2025-08-19 14:23:03,986 - PipelineHandler - INFO - execution order:
[Node id: 0 (manual_input), Node id: 1 (manual_input), Node id: 2 (manual_input), Node id: 3 (manual_input), Node id: 5 (union)]
================================================================================
Flow Graph Visualization
================================================================================
Manual Input (id=0) ─────┐
Manual Input (id=1) ─────┤
Manual Input (id=2) ─────┤
Manual Input (id=3) ─────┴──> Union (id=5)
================================================================================
Execution Order
================================================================================
1. Manual Input (id=0)
2. Manual Input (id=1)
3. Manual Input (id=2)
4. Manual Input (id=3)
5. Union (id=5)
@Edwardvaneechoud I will be working on each of these suggestions. In summary:
I do have some questions about 1. :
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice separation 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
* Adding print_tree method to FlowGraph * Adding method print_tree() to FlowGraph * Adding method print_tree() to FlowGraph * Changes to node ordering in print_tree method * Changes to node ordering in print_tree method * Changes to node ordering of print_tree * Changes to node ordering of print_tree * retrigger checks * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Refactoring graph_tree method --------- Co-authored-by: Bernardo Fernandes <[email protected]>
* Adding print_tree method to FlowGraph * Adding method print_tree() to FlowGraph * Adding method print_tree() to FlowGraph * Changes to node ordering in print_tree method * Changes to node ordering in print_tree method * Changes to node ordering of print_tree * Changes to node ordering of print_tree * retrigger checks * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Adding tests for print_tree method * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Merging skip_nodes and determine_execution_order * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Adding new graph_tree method with tests * Refactoring graph_tree method --------- Co-authored-by: Bernardo Fernandes <[email protected]>
Adding
print_tree()
method toFlowGraph
class.Related issue : 101
Couple of notes :
Is there anyway I can access
FlowFile
attributes? Not sure if havingFlowGraph
inheriting fromFlowFile
would help here. Basically this is what is stopping me from accessing the operation type at each node (e.g: Filter, Select etc.)I couldn't find the symbols that would match what was described on the issue yet...