Changes from all commits (34 commits)
dab974b
Adding print_tree method to FlowGraph
Aug 5, 2025
8aa0b74
Adding method print_tree() to FlowGraph
Aug 6, 2025
3ead94a
Adding method print_tree() to FlowGraph
Aug 6, 2025
e158075
Changes to node ordering in print_tree method
Aug 18, 2025
56be3d3
Merge branch 'feat/print_tree'
Aug 18, 2025
a237c11
Adding test_utils for GCS
Aug 26, 2025
8a9a3a7
Merge remote-tracking branch 'upstream/main'
Aug 26, 2025
8509df1
Feature/add python api in docs (#106)
Edwardvaneechoud Aug 12, 2025
8128be3
Improvement/improve mobile experience docs (#107)
Edwardvaneechoud Aug 15, 2025
8e2353b
Fixing handling of pivot (#105)
Edwardvaneechoud Aug 15, 2025
c1c22ea
Adding print_tree method to FlowGraph (#103)
Bennylave Aug 22, 2025
5345bb0
Migrating to pl-fuzzy-frame-match (#108)
Edwardvaneechoud Aug 22, 2025
dc0e50b
Feature/small refactor (#109)
Edwardvaneechoud Aug 23, 2025
c74d9da
Feature/unify execution methods (#110)
Edwardvaneechoud Aug 23, 2025
982563f
Bumping version
Edwardvaneechoud Aug 24, 2025
fdc6015
Adding test_utils for GCS
Aug 26, 2025
0f1f1b4
Improvement/improve mobile experience docs (#107)
Edwardvaneechoud Aug 15, 2025
bc5c0dc
Fixing handling of pivot (#105)
Edwardvaneechoud Aug 15, 2025
1fb1f32
Adding print_tree method to FlowGraph (#103)
Bennylave Aug 22, 2025
64ef20c
Migrating to pl-fuzzy-frame-match (#108)
Edwardvaneechoud Aug 22, 2025
ea0c878
Feature/small refactor (#109)
Edwardvaneechoud Aug 23, 2025
bb673f1
Feature/unify execution methods (#110)
Edwardvaneechoud Aug 23, 2025
2c0088c
Bumping version
Edwardvaneechoud Aug 24, 2025
1c77804
Merge branch 'main' into feat/gcp_az_compatible
Aug 26, 2025
aae9a75
Resolving conflicts
Aug 26, 2025
d15b58e
Adding GCS test case
Aug 31, 2025
5e8984e
Adding GCS test case
Aug 31, 2025
c5abb72
Adding GCS test case
Aug 31, 2025
056e201
Adding GCS test case
Aug 31, 2025
eaebec6
Adding GCS test case
Aug 31, 2025
d39c59c
Adding GCS test case
Aug 31, 2025
df03a5c
Adding GCS test case
Aug 31, 2025
0115db0
Adding GCS test case
Aug 31, 2025
62be026
Adding GCS test case
Aug 31, 2025
2 changes: 2 additions & 0 deletions flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -526,6 +526,8 @@ def analysis_preparation(flowfile_table: FlowDataEngine):
node_id=node.node_id,
flow_id=self.flow_id,
)
node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref,
n=min(sample_size, number_of_records))
node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref,
n=min(sample_size, number_of_records))
return flowfile_table
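The two identical assignments above look like a merge artifact; presumably only one is intended. A deduplicated sketch of the added lines:

node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref,
                                                      n=min(sample_size, number_of_records))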
13 changes: 13 additions & 0 deletions flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
Owner comment:

I think something went wrong when merging with master. There is some code duplication.

@@ -734,6 +734,19 @@ def get_example_data():
return get_example_data
resulting_data = self.get_resulting_data()

if not performance_mode:
self.results.example_data_generator = example_data_generator()
def example_data_generator():
example_data = None

def get_example_data():
nonlocal example_data
if example_data is None:
example_data = resulting_data.get_sample(100).to_arrow()
return example_data
return get_example_data
resulting_data = self.get_resulting_data()

if not performance_mode:
self.results.example_data_generator = example_data_generator()
self.node_schema.result_schema = self.results.resulting_data.schema
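The owner's comment above flags this duplication: the added block appears twice, with the statements interleaved out of order. Reconstructed from the lines above, a deduplicated version would presumably read (a sketch, not the final merged code):

resulting_data = self.get_resulting_data()

def example_data_generator():
    example_data = None

    def get_example_data():
        # Compute the 100-row Arrow sample lazily and cache it for repeated calls
        nonlocal example_data
        if example_data is None:
            example_data = resulting_data.get_sample(100).to_arrow()
        return example_data

    return get_example_data

if not performance_mode:
    self.results.example_data_generator = example_data_generator()
self.node_schema.result_schema = self.results.resulting_data.schema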
15 changes: 15 additions & 0 deletions flowfile_core/tests/flowfile/analytics/test_analytics_processor.py
@@ -7,6 +7,21 @@
from pathlib import Path


def find_parent_directory(target_dir_name, start_path=None):
"""Navigate up directories until finding the target directory"""
current_path = Path(start_path) if start_path else Path.cwd()

while current_path != current_path.parent:
if current_path.name == target_dir_name:
return current_path
if current_path.name == target_dir_name:
return current_path
current_path = current_path.parent

raise FileNotFoundError(f"Directory '{target_dir_name}' not found")
from pathlib import Path


def find_parent_directory(target_dir_name, start_path=None):
"""Navigate up directories until finding the target directory"""
current_path = Path(start_path) if start_path else Path.cwd()
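The same merge duplication shows up here: the guard clause is repeated inside the loop, and the import and function header are repeated at the end of the hunk. After cleanup the helper would presumably collapse to this sketch:

from pathlib import Path


def find_parent_directory(target_dir_name, start_path=None):
    """Navigate up directories until finding the target directory"""
    current_path = Path(start_path) if start_path else Path.cwd()

    # Walk upward until reaching the filesystem root
    while current_path != current_path.parent:
        if current_path.name == target_dir_name:
            return current_path
        current_path = current_path.parent

    raise FileNotFoundError(f"Directory '{target_dir_name}' not found")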
32 changes: 32 additions & 0 deletions flowfile_core/tests/flowfile/test_flowfile.py
@@ -266,15 +266,22 @@ def test_opening_parquet_file(flow_logger: FlowLogger):
def test_running_performance_mode():
graph = create_graph()
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
add_node_promise_on_type(graph, 'read', 1, 1)
from flowfile_core.configs.flow_logger import main_logger
received_table = input_schema.ReceivedTable(
file_type='parquet', name='table.parquet',
path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
from flowfile_core.configs.flow_logger import main_logger
received_table = input_schema.ReceivedTable(
file_type='parquet', name='table.parquet',
path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
node_read = input_schema.NodeRead(flow_id=1, node_id=1, cache_data=False, received_file=received_table)
graph.add_read(node_read)
main_logger.warning(str(graph))
main_logger.warning(OFFLOAD_TO_WORKER)
main_logger.warning(str(graph))
main_logger.warning(OFFLOAD_TO_WORKER)
add_node_promise_on_type(graph, 'record_count', 2)
connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
add_connection(graph, connection)
@@ -286,6 +293,7 @@ def test_running_performance_mode():
graph.flow_settings.execution_mode = 'Development'
slow = graph.run_graph()


assert slow.node_step_result[1].run_time > fast.node_step_result[1].run_time, 'Performance mode should be faster'


@@ -421,6 +429,25 @@ def get_dependency_example():
return graph


def get_dependency_example():
graph = create_graph()
graph = add_manual_input(graph, data=[{'name': 'John', 'city': 'New York'},
{'name': 'Jane', 'city': 'Los Angeles'},
{'name': 'Edward', 'city': 'Chicago'},
{'name': 'Courtney', 'city': 'Chicago'}]
)
node_promise = input_schema.NodePromise(flow_id=1, node_id=2, node_type='unique')
graph.add_node_promise(node_promise)

node_connection = input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=2)
add_connection(graph, node_connection)
input_file = input_schema.NodeUnique(flow_id=1, node_id=2,
unique_input=transform_schema.UniqueInput(columns=['city'])
)
graph.add_unique(input_file)
return graph
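
This whole block is a second copy of get_dependency_example; the hunk context above ("return graph") shows the tail of the original, so presumably only one copy survives cleanup. For orientation, a usage sketch of the helper (run_graph as used elsewhere in this file):

graph = get_dependency_example()
result = graph.run_graph()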


def ensure_excel_is_read_from_arrow_object():
settings = {'flow_id': 1, 'node_id': 1, 'cache_results': True, 'pos_x': 234.37272727272727,
'pos_y': 271.5272727272727, 'is_setup': True, 'description': '',
@@ -1109,6 +1136,7 @@ def test_schema_callback_cloud_read(flow_logger):
graph.add_cloud_storage_reader(node_settings)
node = graph.get_node(1)
assert node.schema_callback._future is not None, 'Schema callback future should be set'
assert node.schema_callback._future is not None, 'Schema callback future should be set'
assert len(node.schema_callback()) == 4, 'Schema should have 4 columns'
original_schema_callback = id(node.schema_callback)
graph.add_cloud_storage_reader(node_settings)
@@ -1152,12 +1180,16 @@ def tracking_method(*args, **kwargs):
handle_run_info(result)


@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running so database reader cannot be tested")
@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running so database reader cannot be tested")
def test_complex_cloud_write_scenario():


ensure_cloud_storage_connection_is_available_and_get_connection()
handler = FlowfileHandler()

flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")

flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")
graph = handler.get_flow(flow_id)
node = graph.get_node(3)
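The duplicated imports, logger warnings, asserts, decorators, and import_flow calls in this file look like the same merge artifact. Deduplicated, the top of test_running_performance_mode would presumably read (sketch):

def test_running_performance_mode():
    graph = create_graph()
    from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
    from flowfile_core.configs.flow_logger import main_logger

    add_node_promise_on_type(graph, 'read', 1, 1)
    received_table = input_schema.ReceivedTable(
        file_type='parquet', name='table.parquet',
        path=str(find_parent_directory("Flowfile") / 'flowfile_core/tests/support_files/data/table.parquet'))
    node_read = input_schema.NodeRead(flow_id=1, node_id=1, cache_data=False, received_file=received_table)
    graph.add_read(node_read)
    main_logger.warning(str(graph))
    main_logger.warning(OFFLOAD_TO_WORKER)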
1 change: 1 addition & 0 deletions flowfile_frame/flowfile_frame/__init__.py
@@ -4,6 +4,7 @@
# Core classes
from flowfile_frame.flow_frame import FlowFrame # noqa: F401
from pl_fuzzy_frame_match.models import FuzzyMapping # noqa: F401
from pl_fuzzy_frame_match.models import FuzzyMapping # noqa: F401

from flowfile_frame.utils import create_flow_graph # noqa: F401

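The FuzzyMapping import is another doubled line; only one is needed:

from pl_fuzzy_frame_match.models import FuzzyMapping  # noqa: F401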
7 changes: 6 additions & 1 deletion pyproject.toml
@@ -47,6 +47,7 @@ python-jose = "^3.4.0"
bcrypt = "^4.3.0"
connectorx = "^0.4.2"
polars_simed = "^0.3.4"
requests = "^2.32.5"

# Only install pendulum for Python versions below 3.12
pendulum = { version = "2.1.2", markers = "python_version < '3.12'" }
@@ -58,7 +59,9 @@ httpx = "^0.28.1"
tqdm = "^4.67.1"
s3fs = "^2025.7.0"
pl-fuzzy-frame-match = ">=0.4.0"

google-auth = "^2.40.0"
google-cloud-storage = "^3.0.0"
azure-storage-blob = "^12.26.0"

[tool.poetry.scripts]
flowfile_worker = "flowfile_worker.main:run"
@@ -70,6 +73,8 @@ stop_postgres = "test_utils.postgres.commands:stop_postgres"
flowfile = "flowfile.__main__:main"
start_minio = "test_utils.s3.commands:start_minio"
stop_minio = "test_utils.s3.commands:stop_minio"
start_gcs = "test_utils.gcs.commands:start_gcs"
stop_gcs = "test_utils.gcs.commands:stop_gcs"


[tool.poetry.group.dev.dependencies]
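With these script entries, the GCS emulator can presumably be managed the same way as the existing MinIO and Postgres helpers, e.g. `poetry run start_gcs` and `poetry run stop_gcs` once the package is installed in the active environment.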
Empty file added test_utils/gcs/__init__.py
45 changes: 45 additions & 0 deletions test_utils/gcs/commands.py
@@ -0,0 +1,45 @@
import logging

# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("gcs_commands")


def start_gcs():
"""Start GCS Server container for testing"""
from . import fixtures
if not fixtures.is_docker_available():
logger.warning("Docker is not available. Cannot start GCS emulator container.")
print("\n" + "=" * 50)
print("SKIPPING: Docker is not available on this system")
print("Tests requiring Docker will need to be skipped")
print("=" * 50 + "\n")
return 0 # Return success to allow pipeline to continue


if fixtures.start_gcs():
print(f"MinIO started at http://localhost:{fixtures.GCS_PORT}")
return 0
return 1


def stop_gcs():
"""Stop MinIO container"""
from . import fixtures

if not fixtures.is_docker_available():
logger.warning("Docker is not available. Cannot stop GCS emulator container.")
print("\n" + "=" * 50)
print("SKIPPING: Docker is not available on this system")
print("Tests requiring Docker will need to be skipped")
print("=" * 50 + "\n")
return 0

if fixtures.stop_gcs():
print("MinIO stopped successfully")
return 0
return 1
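
For tests that need the emulator, the same Docker guard these commands use can gate GCS-dependent cases. A minimal sketch, assuming test_utils.gcs.fixtures exposes the helpers referenced above (is_docker_available, start_gcs, stop_gcs):

import pytest

from test_utils.gcs import fixtures


@pytest.mark.skipif(not fixtures.is_docker_available(),
                    reason="Docker is not available or not running so the GCS emulator cannot be tested")
def test_gcs_emulator_lifecycle():
    # Start the emulator container, then stop it again; both helpers return truthy on success
    assert fixtures.start_gcs(), "GCS emulator failed to start"
    assert fixtures.stop_gcs(), "GCS emulator failed to stop"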