Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
dab974b
Adding print_tree method to FlowGraph
Aug 5, 2025
8aa0b74
Adding method print_tree() to FlowGraph
Aug 6, 2025
3ead94a
Adding method print_tree() to FlowGraph
Aug 6, 2025
f2a4f55
Merge branch 'main' into feat/print_tree
Bennylave Aug 6, 2025
16f3f54
Merge branch 'main' into feat/print_tree
Bennylave Aug 7, 2025
e158075
Changes to node ordering in print_tree method
Aug 18, 2025
56be3d3
Merge branch 'feat/print_tree'
Aug 18, 2025
e317563
Merge branch 'main' into feat/print_tree
Bennylave Aug 18, 2025
667ee1d
Changes to node ordering in print_tree method
Aug 18, 2025
f9702fa
Changes to node ordering of print_tree
Aug 18, 2025
d98ad7b
Changes to node ordering of print_tree
Aug 18, 2025
5143849
retrigger checks
Aug 18, 2025
633c955
Merge branch 'main' into feat/print_tree
Aug 18, 2025
83a72d1
Adding tests for print_tree method
Aug 20, 2025
df09644
Adding tests for print_tree method
Aug 20, 2025
5487298
Adding tests for print_tree method
Aug 20, 2025
90e237f
Adding tests for print_tree method
Aug 20, 2025
f3a70d9
Adding tests for print_tree method
Aug 20, 2025
128e14c
Adding tests for print_tree method
Aug 20, 2025
fc6c787
Adding tests for print_tree method
Aug 20, 2025
c8b2a45
Adding tests for print_tree method
Aug 20, 2025
efcedca
Adding tests for print_tree method
Aug 20, 2025
25b3aa0
Merging skip_nodes and determine_execution_order
Aug 20, 2025
f913b4f
Merging skip_nodes and determine_execution_order
Aug 20, 2025
fcfb487
Merging skip_nodes and determine_execution_order
Aug 20, 2025
ba95c9e
Merging skip_nodes and determine_execution_order
Aug 20, 2025
503fdd2
Merging skip_nodes and determine_execution_order
Aug 20, 2025
6ec7f0f
Merging skip_nodes and determine_execution_order
Aug 20, 2025
dcf799b
Merging skip_nodes and determine_execution_order
Aug 20, 2025
06b044b
Adding new graph_tree method with tests
Aug 21, 2025
da5984c
Adding new graph_tree method with tests
Aug 21, 2025
c3d1688
Adding new graph_tree method with tests
Aug 21, 2025
6134820
Adding new graph_tree method with tests
Aug 21, 2025
8f8ff5a
Adding new graph_tree method with tests
Aug 21, 2025
3c9111c
Adding new graph_tree method with tests
Aug 21, 2025
88b1048
Adding new graph_tree method with tests
Aug 21, 2025
3e1cace
Adding new graph_tree method with tests
Aug 21, 2025
0a7faa8
Adding new graph_tree method with tests
Aug 21, 2025
2eef537
Adding new graph_tree method with tests
Aug 21, 2025
e7e37e1
Adding new graph_tree method with tests
Aug 21, 2025
cb9eab5
Adding new graph_tree method with tests
Aug 21, 2025
1f610aa
Adding new graph_tree method with tests
Aug 21, 2025
7a220dd
Adding new graph_tree method with tests
Aug 21, 2025
16be304
Adding new graph_tree method with tests
Aug 21, 2025
1a81d94
Adding new graph_tree method with tests
Aug 22, 2025
e737cc7
Adding new graph_tree method with tests
Aug 22, 2025
0f71671
Adding new graph_tree method with tests
Aug 22, 2025
eb3f47c
Refactoring graph_tree method
Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flowfile/flowfile/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from subprocess import Popen
from flowfile_core.flowfile.flow_graph import FlowGraph
from tempfile import TemporaryDirectory
import flowfile as ff

# Configuration
FLOWFILE_HOST: str = os.environ.get("FLOWFILE_HOST", "127.0.0.1")
Expand Down
157 changes: 89 additions & 68 deletions flowfile_core/flowfile_core/flowfile/flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
from flowfile_core.flowfile.utils import snake_case_to_camel_case
from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise
from flowfile_core.flowfile.flow_node.flow_node import FlowNode
from flowfile_core.flowfile.util.execution_orderer import determine_execution_order
from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan
from flowfile_core.flowfile.util.graph_tree import add_undrawn_nodes, build_flow_paths, build_node_info, calculate_depth, define_node_connections, draw_merged_paths, draw_standalone_paths, group_nodes_by_depth, trace_path
from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser
from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher,
ExternalDatabaseWriter,
Expand Down Expand Up @@ -227,6 +228,8 @@ def __init__(self,
elif input_flow is not None:
self.add_datasource(input_file=input_flow)

skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes())

def add_node_promise(self, node_promise: input_schema.NodePromise):
"""Adds a placeholder node to the graph that is not yet fully configured.

Expand All @@ -243,64 +246,6 @@ def placeholder(n: FlowNode = None):
self.add_node_step(node_id=node_promise.node_id, node_type=node_promise.node_type, function=placeholder,
setting_input=node_promise)

def print_tree(self, show_schema=False, show_descriptions=False):
"""
Print flow_graph as a tree.

One entry is appended to the output text per node in ``self.nodes``, in the
form ``<Operation> (id=<node_id>)`` followed by extra detail chosen by the
flags below. The assembled text is written to stdout with ``print``.

Args:
    show_schema: When True, append `` -> [<schema>]`` to each entry.
    show_descriptions: When True, append ``: <description>`` to each entry.

Returns:
    None (the result of ``print``).

Raises:
    ValueError: If ``show_schema`` and ``show_descriptions`` are both True.
"""
# Highest node id in the graph; compared against each node id further down
# to decide whether a connector is appended after the entry.
max_node_id = max(self._node_db.keys())

tree = ""
# NOTE(review): `tabs` is never reassigned in this block, so the connector
# padding `" "*3*(tabs-1)` below always evaluates to an empty string.
tabs = 0
# NOTE(review): `tab_counter` is incremented per node but never read.
tab_counter = 0
for node in self.nodes:
tab_counter += 1
node_input = node.setting_input
# The label is parsed out of the node's string form: the second
# "("-separated piece, minus its last character, with underscores turned
# into spaces and title-cased.
operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title()

# "Formula" nodes are displayed under the name "With Columns".
if operation == "Formula":
operation = "With Columns"

tree += str(operation) + " (id=" + str(node_input.node_id) + ")"

# The two display modes are mutually exclusive.
if show_descriptions & show_schema:
raise ValueError('show_descriptions and show_schema cannot be True simultaneously')
if show_descriptions:
tree += ": " + str(node_input.description)
elif show_schema:
tree += " -> ["
if operation == "Manual Input":
# Schema text is "<name>: <data_type>" for every column, comma separated.
schema = ", ".join([str(i.name) + ": " + str(i.data_type) for i in node_input.raw_data_format.columns])
tree += schema
elif operation == "With Columns":
# NOTE(review): `schema` is first assigned in the "Manual Input" branch
# above; this branch and the two below read it and appear to rely on an
# earlier node having set it - TODO confirm.
tree_with_col_schema = ", " + node_input.function.field.name + ": " + node_input.function.field.data_type
tree += schema + tree_with_col_schema
elif operation == "Filter":
# The column name is taken from the filter text between index 1 and the
# first "]".
index = node_input.filter_input.advanced_filter.find("]")
filtered_column = str(node_input.filter_input.advanced_filter[1:index])
# NOTE(review): the pattern below is a plain string, not an f-string, so
# "{str(filtered_column)}" is not interpolated and `filtered_column` is
# effectively unused - TODO confirm the intent.
schema = re.sub('({str(filtered_column)}: [A-Za-z0-9]+\,\s)', "", schema)
tree += schema
elif operation == "Group By":
# Remove "<old_name>: <type>, " from the schema text for every
# aggregation column.
for col in node_input.groupby_input.agg_cols:
schema = re.sub(str(col.old_name) + ': [a-z0-9]+\, ', "", schema)
tree += schema
tree += "]"
else:
# Neither flag set: append operation-specific detail instead.
if operation == "Manual Input":
tree += ": " + str(node_input.raw_data_format.data)
elif operation == "With Columns":
tree += ": " + str(node_input.function)
elif operation == "Filter":
tree += ": " + str(node_input.filter_input.advanced_filter)
elif operation == "Group By":
# Columns whose agg is "groupby" are listed as keys; every other
# column is listed as "<agg>(<old_name>)".
tree += ": groupby=[" + ", ".join([col.old_name for col in node_input.groupby_input.agg_cols if col.agg == "groupby"]) + "], "
tree += "agg=[" + ", ".join([str(col.agg) + "(" + str(col.old_name) + ")" for col in node_input.groupby_input.agg_cols if col.agg != "groupby"]) + "]"

# Every node except the one with the highest id is followed by a connector
# on a new line.
if node_input.node_id < max_node_id:
tree += "\n" + "# " + " "*3*(tabs-1) + "|___ "
print("\n"*2)

# `print` returns None, so this method returns None as well.
return print(tree)

def apply_layout(self, y_spacing: int = 150, x_spacing: int = 200, initial_y: int = 100):
"""Calculates and applies a layered layout to all nodes in the graph.

Expand Down Expand Up @@ -369,6 +314,89 @@ def __repr__(self):
settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings)
return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}"

def print_tree(self):
    """Print the flow graph as an ASCII tree showing the DAG relationships.

    When ``self._node_db`` is empty, ``"Empty flow graph"`` is printed and the
    method returns. Otherwise the drawing is assembled line by line with the
    ``graph_tree`` helpers, a numbered listing of the nodes in execution
    order is appended, and everything is written to stdout in one ``print``
    call.

    Returns:
        None. All output goes to stdout.
    """
    if not self._node_db:
        print("Empty flow graph")
        return

    # Build node information
    node_info = build_node_info(self.nodes)

    # The return value is not used here, so this call is made for whatever it
    # records on node_info (presumably each node's depth - verify in graph_tree).
    for node_id in node_info:
        calculate_depth(node_id, node_info)

    # Header of the visualization
    lines = [
        "=" * 80,
        "Flow Graph Visualization",
        "=" * 80,
        "",
    ]

    # Track which nodes connect to what, then build the flow paths once.
    merge_points = define_node_connections(node_info)
    paths = build_flow_paths(node_info, self._flow_starts, merge_points)

    drawn_nodes = set()
    merge_drawn = set()

    # Split the paths into those ending in a merge point with more than one
    # entry in merge_points, and standalone paths.
    paths_by_merge = {}
    standalone_paths = []
    for path in paths:
        ends_in_merge = (
            len(path) > 1
            and path[-1] in merge_points
            and len(merge_points[path[-1]]) > 1
        )
        if ends_in_merge:
            paths_by_merge.setdefault(path[-1], []).append(path)
        else:
            standalone_paths.append(path)

    # Draw merged paths
    draw_merged_paths(node_info, merge_points, paths_by_merge, merge_drawn, drawn_nodes, lines)

    # Draw standalone paths
    draw_standalone_paths(drawn_nodes, standalone_paths, lines, node_info)

    # Add undrawn nodes
    add_undrawn_nodes(drawn_nodes, node_info, lines)

    # Best effort: a failure while computing or listing the execution order is
    # reported as a line in the output instead of aborting the print.
    try:
        _, ordered_nodes = compute_execution_plan(
            nodes=self.nodes,
            flow_starts=self._flow_starts + self.get_implicit_starter_nodes(),
        )
        if ordered_nodes:
            for i, node in enumerate(ordered_nodes, 1):
                lines.append(f" {i:3d}. {node_info[node.node_id]['label']}")
    except Exception as e:
        lines.append(f" Could not determine execution order: {e}")

    # Print everything
    print("\n".join(lines))

def get_nodes_overview(self):
"""Gets a list of dictionary representations for all nodes in the graph."""
output = []
Expand Down Expand Up @@ -1576,18 +1604,11 @@ def run_graph(self) -> RunInformation | None:
self.end_datetime = None
self.latest_run_info = None
self.flow_logger.info('Starting to run flowfile flow...')
skip_nodes = [node for node in self.nodes if not node.is_correct]
skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes])
execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if
node not in skip_nodes],
flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes())

skip_node_message(self.flow_logger, skip_nodes)
execution_order_message(self.flow_logger, execution_order)
performance_mode = self.flow_settings.execution_mode == 'Performance'
if self.flow_settings.execution_location == 'local':
OFFLOAD_TO_WORKER.value = False
elif self.flow_settings.execution_location == 'remote':
OFFLOAD_TO_WORKER.value = True
for node in execution_order:
node_logger = self.flow_logger.get_node_logger(node.node_id)
if self.flow_settings.is_canceled:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
from flowfile_core.flowfile.flow_node.flow_node import FlowNode
from flowfile_core.configs import logger
from collections import deque, defaultdict
from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip

def compute_execution_plan(nodes: List[FlowNode], flow_starts: List[FlowNode] = None):
    """Work out which nodes to skip and in what order to run the rest.

    ``determine_nodes_to_skip`` is asked which of ``nodes`` should be left
    out; every other node is then passed to ``determine_execution_order``.

    Args:
        nodes: The nodes to plan for.
        flow_starts: Forwarded unchanged to ``determine_execution_order``.

    Returns:
        A ``(skip_nodes, execution_order)`` tuple.
    """
    skip_nodes = determine_nodes_to_skip(nodes=nodes)

    # Keep the original ordering of the nodes that are not skipped.
    runnable_nodes = []
    for candidate in nodes:
        if candidate in skip_nodes:
            continue
        runnable_nodes.append(candidate)

    execution_order = determine_execution_order(all_nodes=runnable_nodes, flow_starts=flow_starts)
    return skip_nodes, execution_order



def determine_execution_order(all_nodes: List[FlowNode], flow_starts: List[FlowNode] = None) -> List[FlowNode]:
Expand Down
Loading
Loading