diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py
index b3e4df9d..b417c263 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_graph.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -526,6 +526,8 @@ def analysis_preparation(flowfile_table: FlowDataEngine):
                     node_id=node.node_id,
                     flow_id=self.flow_id,
                 )
+                node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref,
+                                                                      n=min(sample_size, number_of_records))
         return flowfile_table
diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
index 3a8f1a18..6cd7339c 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
@@ -734,6 +734,19 @@
+        def example_data_generator():
+            example_data = None
+
+            def get_example_data():
+                nonlocal example_data
+                if example_data is None:
+                    example_data = resulting_data.get_sample(100).to_arrow()
+                return example_data
+            return get_example_data
+
         resulting_data = self.get_resulting_data()
+
         if not performance_mode:
             self.results.example_data_generator = example_data_generator()
         self.node_schema.result_schema = self.results.resulting_data.schema
diff --git a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py
index 4e2ba5d3..6e0edb6b 100644
--- a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py
+++ b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py
@@ -7,6 +7,21 @@
 from pathlib import Path
 
 
+def find_parent_directory(target_dir_name, start_path=None):
+    """Navigate up directories until finding the target directory"""
+    current_path = Path(start_path) if start_path else Path.cwd()
+
+    while current_path != current_path.parent:
+        if current_path.name == target_dir_name:
+            return current_path
+        current_path = current_path.parent
+
+    raise FileNotFoundError(f"Directory '{target_dir_name}' not found")
+
+
diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py
index 41483d2c..8a31915a 100644
--- a/flowfile_core/tests/flowfile/test_flowfile.py
+++ b/flowfile_core/tests/flowfile/test_flowfile.py
@@ -266,8 +266,13 @@ def test_opening_parquet_file(flow_logger: FlowLogger):
 def test_running_performance_mode():
     graph = create_graph()
     from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
     add_node_promise_on_type(graph, 'read', 1, 1)
     from flowfile_core.configs.flow_logger import main_logger
+    received_table = input_schema.ReceivedTable(
+        file_type='parquet', name='table.parquet',
+        path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
@@ -286,6 +293,7 @@
     graph.flow_settings.execution_mode = 'Development'
     slow = graph.run_graph()
 
+    assert slow.node_step_result[1].run_time > fast.node_step_result[1].run_time, 'Performance mode should be faster'
@@ -1152,12 +1180,16 @@ def tracking_method(*args, **kwargs):
     handle_run_info(result)
 
 
 @pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running so database reader cannot be tested")
 def test_complex_cloud_write_scenario():
+    ensure_cloud_storage_connection_is_available_and_get_connection()
     handler = FlowfileHandler()
+    flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")
     graph = handler.get_flow(flow_id)
     node = graph.get_node(3)
diff --git a/pyproject.toml b/pyproject.toml
index afa93ebd..1de35c82 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,6 +47,7 @@ python-jose = "^3.4.0"
 bcrypt = "^4.3.0"
 connectorx = "^0.4.2"
 polars_simed = "^0.3.4"
+requests = "^2.32.5"
 # Only install pendulum for Python versions below 3.12
 pendulum = { version = "2.1.2", markers = "python_version < '3.12'" }
@@ -58,7 +59,9 @@ httpx = "^0.28.1"
 tqdm = "^4.67.1"
 s3fs = "^2025.7.0"
 pl-fuzzy-frame-match = ">=0.4.0"
-
+google-auth = "^2.40.0"
+google-cloud-storage = "^3.0.0"
+azure-storage-blob = "^12.26.0"
 
 [tool.poetry.scripts]
 flowfile_worker = "flowfile_worker.main:run"
@@ -70,6 +73,8 @@ stop_postgres = "test_utils.postgres.commands:stop_postgres"
 flowfile = "flowfile.__main__:main"
 start_minio = "test_utils.s3.commands:start_minio"
 stop_minio = "test_utils.s3.commands:stop_minio"
+start_gcs = "test_utils.gcs.commands:start_gcs"
+stop_gcs = "test_utils.gcs.commands:stop_gcs"
 
 [tool.poetry.group.dev.dependencies]
diff --git a/test_utils/gcs/__init__.py b/test_utils/gcs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/test_utils/gcs/commands.py b/test_utils/gcs/commands.py
new file mode 100644
index 00000000..50eda916
--- /dev/null
+++ b/test_utils/gcs/commands.py
@@ -0,0 +1,45 @@
+import logging
+
+# Set up logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    datefmt='%Y-%m-%d %H:%M:%S'
+)
+logger = logging.getLogger("gcs_commands")
+
+
+def start_gcs():
+    """Start the fake GCS server container for testing."""
+    from . import fixtures
+    if not fixtures.is_docker_available():
+        logger.warning("Docker is not available. Cannot start the GCS container.")
+        print("\n" + "=" * 50)
+        print("SKIPPING: Docker is not available on this system")
+        print("Tests requiring Docker will need to be skipped")
+        print("=" * 50 + "\n")
+        return 0  # Return success to allow the pipeline to continue
+
+    if fixtures.start_gcs():
+        print(f"Fake GCS server started at http://localhost:{fixtures.GCS_PORT}")
+        return 0
+    return 1
+
+
+def stop_gcs():
+    """Stop the fake GCS server container."""
+    from . import fixtures
+
+    if not fixtures.is_docker_available():
+        logger.warning("Docker is not available. Cannot stop the GCS container.")
+        print("\n" + "=" * 50)
+        print("SKIPPING: Docker is not available on this system")
+        print("Tests requiring Docker will need to be skipped")
+        print("=" * 50 + "\n")
+        return 0
+
+    if fixtures.stop_gcs():
+        print("Fake GCS server stopped successfully")
+        return 0
+    return 1
\ No newline at end of file
diff --git a/test_utils/gcs/data_generator.py b/test_utils/gcs/data_generator.py
new file mode 100644
index 00000000..66885967
--- /dev/null
+++ b/test_utils/gcs/data_generator.py
@@ -0,0 +1,283 @@
+import logging
+import io
+import os
+import tempfile
+import mimetypes
+
+# Third-party libraries
+from google.cloud import storage
+from google.auth.credentials import AnonymousCredentials
+import google.auth.transport.requests
+import polars as pl
+import pyarrow as pa
+from deltalake import write_deltalake
+from pyiceberg.catalog import load_catalog
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+GCS_HOST = os.environ.get("TEST_GCS_HOST", "localhost")
+GCS_PORT = int(os.environ.get("TEST_GCS_PORT", 4443))
+GCS_BUCKET_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs")
+GCS_ENDPOINT_URL = f"http://{GCS_HOST}:{GCS_PORT}"
+FAKE_GCS_SERVER_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs-server")
+
+
+def _create_single_csv_file(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """Creates a single CSV file from a DataFrame and uploads it to GCS."""
+    logger.info("Writing single-file CSV...")
+    csv_buffer = io.BytesIO()
+    df.write_csv(csv_buffer)
+    csv_buffer.seek(0)
+    bucket = gcs_client.bucket(bucket_name)
+    blob = bucket.blob('single-file-csv/data.csv')
+    blob.upload_from_file(csv_buffer, content_type='text/csv')
+
+
+def _create_multi_file_csv(gcs_client, df: pl.DataFrame, bucket_name: str, num_files: int = 10):
+    """Creates multiple CSV files from a DataFrame and uploads them to GCS."""
+    logger.info(f"Writing {num_files} CSV files...")
+    data_size = len(df)
+    rows_per_file = data_size // num_files
+    bucket = gcs_client.bucket(bucket_name)
+    for i in range(num_files):
+        sub_df = df.slice(i * rows_per_file, rows_per_file)
+        csv_buffer = io.BytesIO()
+        sub_df.write_csv(csv_buffer)
+        csv_buffer.seek(0)
+        blob = bucket.blob(f'multi-file-csv/part_{i:02d}.csv')
+        blob.upload_from_file(csv_buffer, content_type='text/csv')
+
+
+def _create_single_file_json(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """Creates a single newline-delimited JSON file from a DataFrame and uploads it to GCS."""
+    logger.info("Writing single-file JSON...")
+    json_buffer = io.BytesIO()
+    df.write_ndjson(json_buffer)
+    json_buffer.seek(0)
+    bucket = gcs_client.bucket(bucket_name)
+    blob = bucket.blob('single-file-json/data.json')
+    blob.upload_from_file(json_buffer, content_type='application/json')
+
+
+def _create_multi_file_json(gcs_client, df: pl.DataFrame, bucket_name: str, num_files: int = 10):
+    """Creates multiple newline-delimited JSON files from a DataFrame and uploads them to GCS."""
+    logger.info(f"Writing {num_files} JSON files...")
+    data_size = len(df)
+    rows_per_file = data_size // num_files
+    bucket = gcs_client.bucket(bucket_name)
+    for i in range(num_files):
+        sub_df = df.slice(i * rows_per_file, rows_per_file)
+        json_buffer = io.BytesIO()
+        sub_df.write_ndjson(json_buffer)
+        json_buffer.seek(0)
+        blob = bucket.blob(f'multi-file-json/part_{i:02d}.json')
+        blob.upload_from_file(json_buffer, content_type='application/json')
+
+
+def _create_single_parquet_file(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """Creates a single Parquet file from a DataFrame and uploads it to GCS."""
+    logger.info("Writing single-file Parquet...")
+    parquet_buffer = io.BytesIO()
+    df.write_parquet(parquet_buffer)
+    parquet_buffer.seek(0)
+    bucket = gcs_client.bucket(bucket_name)
+    blob = bucket.blob('single-file-parquet/data.parquet')
+    blob.upload_from_file(parquet_buffer, content_type='application/vnd.apache.parquet')
+
+
+def _create_multi_parquet_file(gcs_client, df: pl.DataFrame, bucket_name: str, num_files: int = 10):
+    """Creates multiple Parquet files from a DataFrame and uploads them to GCS."""
+    logger.info(f"Writing {num_files} Parquet files...")
+    data_size = len(df)
+    rows_per_file = data_size // num_files
+    bucket = gcs_client.bucket(bucket_name)
+    for i in range(num_files):
+        sub_df = df.slice(i * rows_per_file, rows_per_file)
+        parquet_buffer = io.BytesIO()
+        sub_df.write_parquet(parquet_buffer)
+        parquet_buffer.seek(0)
+        blob = bucket.blob(f'multi-file-parquet/part_{i:02d}.parquet')
+        blob.upload_from_file(parquet_buffer, content_type='application/vnd.apache.parquet')
+
+
+def guess_content_type(filename):
+    """Guess a content type; added because mimetypes doesn't know about Parquet by default."""
+    if filename.endswith(".parquet"):
+        return "application/vnd.apache.parquet"
+    if filename.endswith(".avro"):
+        return "application/avro"
+    return mimetypes.guess_type(filename)[0] or "application/octet-stream"
+
+
+def _create_delta_lake_table(gcs_client, arrow_table: pa.Table, bucket_name: str):
+    """Creates a Delta Lake table from a PyArrow table and uploads it to GCS."""
+    logger.info("Writing Delta Lake table...")
+    bucket = gcs_client.bucket(bucket_name)
+    # Stage the Delta table in a local temporary directory, then upload every file it produced.
+    with tempfile.TemporaryDirectory() as temp_dir:
+        write_deltalake(temp_dir, arrow_table)
+        for root, _, files in os.walk(temp_dir):
+            for f in files:
+                local_path = os.path.join(root, f)
+                rel_path = os.path.relpath(local_path, temp_dir)
+                blob = bucket.blob(f"delta-lake-table/{rel_path}")
+                blob.upload_from_filename(local_path, content_type=guess_content_type(local_path))
+                logger.info(f"Uploaded {rel_path} with content_type={blob.content_type}")
+
+
+def _create_iceberg_table(df: pl.DataFrame, bucket_name: str, endpoint_url: str, gcs_client):
+    """Creates an Apache Iceberg table and writes stable metadata pointers next to it."""
+    logger.info("Writing Apache Iceberg table with stable metadata pointers...")
+    # Configure the catalog properties for GCS access
+    catalog_props = {
+        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
+        "gcs.endpoint": endpoint_url,
+    }
+
+    # Bucket handle used to upload the extra metadata pointer files
+    bucket = gcs_client.bucket(bucket_name)
+
+    # Use the SQL catalog with an in-memory SQLite database for storing metadata pointers
+    catalog = load_catalog(
+        "default",
+        **{
+            "type": "sql",
+            "uri": "sqlite:///:memory:",  # Use an in-memory SQL DB for the catalog
+            "warehouse": f"https://storage.googleapis.com/{bucket_name}/iceberg_warehouse",
+            **catalog_props,
+        }
+    )
+    table_identifier = ("default_db", "iceberg_table")
+    # Create a namespace (like a schema or database) for the table
+    try:
+        catalog.drop_namespace("default_db")
+    except Exception:
+        pass  # Ignore if the namespace doesn't exist
+    catalog.create_namespace("default_db")
+    try:
+        catalog.load_table(table_identifier)
+        catalog.drop_table(table_identifier)
+    except Exception:
+        pass
+
+    # Create the table schema and object first
+    schema = df.to_arrow().schema
+    table = catalog.create_table(identifier=table_identifier, schema=schema)
+
+    # Use the simplified write_iceberg method from Polars
+    df.write_iceberg(table, mode='overwrite')
+
+    # Write additional, predictable metadata pointers so readers don't need the catalog.
+    # Get the current metadata location from the table
+    current_metadata = table.metadata_location
+    logger.info(f"Original metadata location: {current_metadata}")
+
+    # Extract just the object key part
+    current_metadata_key = current_metadata.replace(f"https://storage.googleapis.com/{bucket_name}/", "")
+
+    # Read the current metadata file back from the bucket
+    metadata_content = bucket.blob(current_metadata_key).download_as_bytes()
+
+    # Get the metadata directory
+    metadata_dir = "/".join(current_metadata_key.split("/")[:-1])
+
+    # Write it to standardized locations
+    # 1. metadata.json in the metadata folder (this is what pl.scan_iceberg expects)
+    blob = bucket.blob(f"{metadata_dir}/metadata.json")
+    blob.upload_from_string(metadata_content, content_type="application/json")
+
+    logger.info(f"Created stable metadata.json at: https://storage.googleapis.com/{bucket_name}/{metadata_dir}/metadata.json")
+
+    # 2. current.json as an additional pointer
+    blob = bucket.blob(f"{metadata_dir}/current.json")
+    blob.upload_from_string(metadata_content, content_type="application/json")
+
+    # 3. VERSION file that contains the current metadata filename
+    current_metadata_filename = current_metadata_key.split("/")[-1]
+    blob = bucket.blob(f"{metadata_dir}/VERSION")
+    blob.upload_from_string(current_metadata_filename, content_type="text/plain")
+
+    # 4. version-hint.text (some Iceberg readers look for this)
+    blob = bucket.blob(f"{metadata_dir}/version-hint.text")
+    blob.upload_from_string(current_metadata_filename, content_type="text/plain")
+
+    table_base = "iceberg_warehouse/default_db.db/iceberg_table"
+    logger.info(f"""
+✅ Iceberg table created with stable access patterns:
+   - Versioned metadata: https://storage.googleapis.com/{bucket_name}/{current_metadata_key}
+   - Latest metadata:    https://storage.googleapis.com/{bucket_name}/{table_base}/metadata/metadata.json
+   - Current pointer:    https://storage.googleapis.com/{bucket_name}/{table_base}/metadata/current.json
+   - Version hint:       https://storage.googleapis.com/{bucket_name}/{table_base}/metadata/version-hint.text
+
+   Read with: pl.scan_iceberg('gs://{bucket_name}/{table_base}/metadata/metadata.json').collect()
+""")
+
+
+def populate_test_data(endpoint_url: str, bucket_name: str):
+    """
+    Populates a fake GCS bucket with a variety of large-scale test data formats.
+
+    Args:
+        endpoint_url (str): The HTTP endpoint of the fake GCS server.
+        bucket_name (str): The name of the bucket to populate.
+    """
+    logger.info("🚀 Starting data population...")
+
+    # AuthorizedSession is a requests.Session subclass; disable certificate
+    # verification because the local fake GCS endpoint is plain HTTP.
+    transport = google.auth.transport.requests.AuthorizedSession(AnonymousCredentials())
+    transport.verify = False
+
+    # --- GCS client ---
+    gcs_client = storage.Client(
+        project="test-project",
+        credentials=AnonymousCredentials(),
+        client_options={"api_endpoint": endpoint_url},
+        _http=transport,
+    )
+
+    # --- Data Generation ---
+    data_size = 100_000
+    df = pl.DataFrame({
+        "id": range(1, data_size + 1),
+        "name": [f"user_{i}" for i in range(1, data_size + 1)],
+        "value": [i * 10.5 for i in range(1, data_size + 1)],
+        "category": ["A", "B", "C", "D", "E"] * (data_size // 5)
+    })
+    logger.info(f"Generated a Polars DataFrame with {data_size} rows.")
+
+    # --- Execute Data Population Scenarios ---
+    _create_single_csv_file(gcs_client, df, bucket_name)
+    _create_multi_file_csv(gcs_client, df, bucket_name)
+    _create_single_file_json(gcs_client, df, bucket_name)
+    _create_multi_file_json(gcs_client, df, bucket_name)
+    _create_single_parquet_file(gcs_client, df, bucket_name)
+    _create_multi_parquet_file(gcs_client, df, bucket_name)
+
+    # Convert to a PyArrow table once for Delta Lake
+    arrow_table = df.to_arrow()
+
+    _create_delta_lake_table(gcs_client, arrow_table, bucket_name)
+    _create_iceberg_table(df, bucket_name, endpoint_url, gcs_client)
+
+    logger.info("✅ All test data populated successfully.")
+
+
+if __name__ == '__main__':
+    populate_test_data(endpoint_url=GCS_ENDPOINT_URL,
+                       bucket_name=GCS_BUCKET_NAME)
\ No newline at end of file
diff --git a/test_utils/gcs/demo_data_generator.py b/test_utils/gcs/demo_data_generator.py
new file mode 100644
index 00000000..bf15f5f2
--- /dev/null
+++ b/test_utils/gcs/demo_data_generator.py
@@ -0,0 +1,178 @@
+import logging
+import io
+import os
+import tempfile
+import random
+from datetime import datetime, timedelta
+
+# Third-party libraries
+from google.cloud import storage
+from google.auth.credentials import AnonymousCredentials
+import polars as pl
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# --- Fake GCS Server Configuration ---
+GCS_HOST = os.environ.get("TEST_GCS_HOST", "localhost")
+GCS_PORT = int(os.environ.get("TEST_GCS_PORT", 4443))
+GCS_ENDPOINT_URL = f"http://{GCS_HOST}:{GCS_PORT}"
+
+# --- Data Generation Functions ---
+
+def _create_sales_data(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """
+    Creates partitioned Parquet files for the sales data based on year and month:
+    gs://data-lake/sales/year=YYYY/month=MM/
+    """
+    logger.info("Writing partitioned sales data...")
+    # Use Polars' built-in (PyArrow) partitioning.
+    # A temporary local directory is needed to stage the partitioned files before uploading.
+    with tempfile.TemporaryDirectory() as temp_dir:
+        df.write_parquet(
+            temp_dir,
+            use_pyarrow=True,
+            pyarrow_options={"partition_cols": ["year", "month"]}
+        )
+        # Walk through the local directory and upload files to GCS
+        bucket = gcs_client.bucket(bucket_name)
+        for root, _, files in os.walk(temp_dir):
+            for file in files:
+                if file.endswith(".parquet"):
+                    local_path = os.path.join(root, file)
+                    # Construct the GCS key to match the desired structure
+                    relative_path = os.path.relpath(local_path, temp_dir)
+                    gcs_key = f"data-lake/sales/{relative_path.replace(os.path.sep, '/')}"
+                    blob = bucket.blob(gcs_key)
+                    blob.upload_from_filename(local_path, content_type='application/parquet')
+    logger.info(f"Finished writing sales data to gs://{bucket_name}/data-lake/sales/")
+
+
+def _create_customers_data(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """
+    Creates a Parquet file for the customers data.
+    gs://data-lake/customers/
+    """
+    logger.info("Writing customers Parquet data...")
+    parquet_buffer = io.BytesIO()
+    df.write_parquet(parquet_buffer)
+    parquet_buffer.seek(0)
+    bucket = gcs_client.bucket(bucket_name)
+    blob = bucket.blob('data-lake/customers/customers.parquet')
+    blob.upload_from_file(parquet_buffer, content_type='application/parquet')
+    logger.info(f"Finished writing customers data to gs://{bucket_name}/data-lake/customers/")
+
+
+def _create_orders_data(gcs_client, df: pl.DataFrame, bucket_name: str):
+    """
+    Creates a pipe-delimited CSV file for the orders data.
+    gs://raw-data/orders/
+    """
+    logger.info("Writing orders CSV data...")
+    csv_buffer = io.BytesIO()
+    # Write with pipe delimiter and header
+    df.write_csv(csv_buffer, separator="|")
+    csv_buffer.seek(0)
+    bucket = gcs_client.bucket(bucket_name)
+    blob = bucket.blob('raw-data/orders/orders.csv')
+    blob.upload_from_file(csv_buffer, content_type='text/csv')
+    logger.info(f"Finished writing orders data to gs://{bucket_name}/raw-data/orders/")
+
+
+def _create_products_data(df: pl.DataFrame):
+    """
+    Creates a local Parquet file for the products data.
+    """
+    logger.info("Writing local products Parquet data...")
+    # Create a directory for local data if it doesn't exist
+    local_data_dir = "local_data"
+    os.makedirs(local_data_dir, exist_ok=True)
+    file_path = os.path.join(local_data_dir, "local_products.parquet")
+    df.write_parquet(file_path)
+    logger.info(f"Finished writing products data to {file_path}")
+
+
+def create_demo_data(endpoint_url: str, bucket_name: str):
+    """
+    Populates a GCS bucket with test data matching the schemas from the examples.
+    """
+    logger.info("🚀 Starting data population for flowfile examples...")
+
+    gcs_client = storage.Client(
+        project="test-project",
+        credentials=AnonymousCredentials(),
+        client_options={"api_endpoint": endpoint_url}
+    )
+
+    # --- Generate Core DataFrames ---
+    DATA_SIZE = 15_000  # Increased data size for more variety
+    START_DATE = datetime(2022, 1, 1)
+    END_DATE = datetime(2024, 12, 31)
+    TOTAL_DAYS = (END_DATE - START_DATE).days
+
+    # States for region mapping
+    states = ["CA", "OR", "WA", "NY", "NJ", "PA", "TX", "FL", "GA", "IL", "OH", "MI"]
+
+    # Generate base sales data across multiple years
+    sales_data = {
+        "order_id": range(1, DATA_SIZE + 1),
+        "customer_id": [random.randint(100, 299) for _ in range(DATA_SIZE)],
+        "product_id": [random.randint(1, 100) for _ in range(DATA_SIZE)],
+        "order_date": [START_DATE + timedelta(days=random.randint(0, TOTAL_DAYS)) for _ in range(DATA_SIZE)],
+        "quantity": [random.randint(1, 5) for _ in range(DATA_SIZE)],
+        "unit_price": [round(random.uniform(10.0, 500.0), 2) for _ in range(DATA_SIZE)],
+        "discount_rate": [random.choice([0.0, 0.1, 0.15, 0.2, None]) for _ in range(DATA_SIZE)],
+        "status": [random.choice(["completed", "pending", "cancelled"]) for _ in range(DATA_SIZE)],
+        "customer_lifetime_value": [random.uniform(500, 20000) for _ in range(DATA_SIZE)],
+        "state": [random.choice(states) for _ in range(DATA_SIZE)],
+    }
+    sales_df = pl.from_dict(sales_data).with_columns([
+        pl.col("order_date").dt.year().alias("year"),
+        pl.col("order_date").dt.month().alias("month"),
+        # The 'amount' column in the example seems to be the price before discount
+        pl.col("unit_price").alias("amount")
+    ])
+
+    # Generate customers DataFrame
+    unique_customer_ids = sales_df["customer_id"].unique().to_list()
+    customers_df = pl.DataFrame({
+        "customer_id": unique_customer_ids,
+        "customer_segment": [random.choice(["VIP", "Regular", "New"]) for _ in unique_customer_ids]
+    })
+
+    # Generate products DataFrame
+    unique_product_ids = sales_df["product_id"].unique().to_list()
+    # Create a map of product_id to unit_price from the first occurrence in sales_df
+    product_price_map = sales_df.group_by("product_id").agg(pl.first("unit_price")).to_dict(as_series=False)
+    price_dict = dict(zip(product_price_map['product_id'], product_price_map['unit_price']))
+
+    products_df = pl.DataFrame({
+        "product_id": unique_product_ids,
+        "product_category": [random.choice(["Electronics", "Books", "Clothing", "Home Goods"]) for _ in unique_product_ids],
+        "unit_price": [price_dict.get(pid) for pid in unique_product_ids]
+    })
+
+    # Generate orders DataFrame for the CSV file (subset of sales)
+    orders_df = sales_df.select(["customer_id", "product_id", "quantity", "discount_rate"])
+
+    logger.info(f"Generated {len(sales_df)} sales records across {sales_df['year'].n_unique()} years, for {len(customers_df)} customers, and {len(products_df)} products.")
+
+    # --- Write Data to GCS and the Local Filesystem ---
+    _create_sales_data(gcs_client, sales_df, bucket_name)
+    _create_customers_data(gcs_client, customers_df, bucket_name)
+    _create_orders_data(gcs_client, orders_df, bucket_name)
+    _create_products_data(products_df)
+
+    logger.info("✅ All test data populated successfully.")
+
+
+if __name__ == '__main__':
+    # The bucket that will be created and populated
+    BUCKET = "flowfile-demo-data"
+
+    create_demo_data(
+        endpoint_url=GCS_ENDPOINT_URL,
+        bucket_name=BUCKET
+    )
diff --git a/test_utils/gcs/fixtures.py b/test_utils/gcs/fixtures.py
new file mode 100644
index 00000000..d2eb745a
--- /dev/null
+++ b/test_utils/gcs/fixtures.py
@@ -0,0 +1,230 @@
+import os
+import time
+import subprocess
+import logging
+import shutil
+from contextlib import contextmanager
+from typing import Any, Dict, Generator
+
+from google.cloud import storage
+from google.auth.credentials import AnonymousCredentials
+from google.auth.transport.requests import AuthorizedSession
+
+from test_utils.gcs.data_generator import populate_test_data
+from test_utils.gcs.demo_data_generator import create_demo_data
+
+
+logger = logging.getLogger("gcs_fixture")
+
+GCS_HOST = os.environ.get("TEST_GCS_HOST", "localhost")
+GCS_PORT = int(os.environ.get("TEST_GCS_PORT", 4443))
+GCS_BUCKET_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs")
+GCS_ENDPOINT_URL = f"http://{GCS_HOST}:{GCS_PORT}"
+GCS_SERVER_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs-server")
+
+# Operating system detection
+IS_MACOS = os.uname().sysname == 'Darwin' if hasattr(os, 'uname') else False
+IS_WINDOWS = os.name == 'nt'
+
+
+def get_gcs_client():
+    """Get a google-cloud-storage client for the fake GCS server."""
+    # AuthorizedSession is a requests.Session subclass; disable certificate
+    # verification because the local test endpoint is plain HTTP.
+    transport = AuthorizedSession(AnonymousCredentials())
+    transport.verify = False
+
+    try:
+        gcs_client = storage.Client(
+            project="test-project",
+            credentials=AnonymousCredentials(),
+            client_options={"api_endpoint": GCS_ENDPOINT_URL},
+            _http=transport)
+    except Exception as e:
+        logger.error(f"Failed to create GCS client: {e}")
+        raise
+
+    return gcs_client
+
+
+def wait_for_gcs_server(max_retries=30, interval=1):
+    """Wait for the GCS server to be ready"""
+    for i in range(max_retries):
+        try:
+            client = get_gcs_client()
+            client.list_buckets()
+            logger.info("GCS server is ready")
+            return True
+        except Exception:
+            if i < max_retries - 1:
+                logger.info("Retrying connection to GCS server...")
+                time.sleep(interval)
+                continue
+    return False
+
+
+def is_container_running(container_name: str) -> bool:
+    """Check if the fake GCS server container is already running"""
+    try:
+        result = subprocess.run(
+            ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Names}}"],
+            capture_output=True,
+            text=True,
+            check=True
+        )
+        return container_name in result.stdout.strip()
+    except subprocess.CalledProcessError:
+        return False
+
+
+def stop_gcs() -> bool:
+    """Stop the fake GCS server container and remove its data volume for a clean shutdown."""
+    container_name = GCS_SERVER_NAME
+    volume_name = f"{container_name}-data"
+
+    if not is_container_running(container_name):
+        logger.info(f"Container '{container_name}' is not running.")
+        # Attempt to remove the volume in case it was left orphaned
+        try:
+            subprocess.run(["docker", "volume", "rm", volume_name], check=False, capture_output=True)
+        except Exception:
+            pass  # Ignore errors if the volume doesn't exist
+        return True
+
+    logger.info(f"Stopping and cleaning up container '{container_name}' and volume '{volume_name}'...")
+    try:
+        # Stop and remove the container
+        subprocess.run(["docker", "stop", container_name], check=True, capture_output=True)
+        subprocess.run(["docker", "rm", container_name], check=True, capture_output=True)
+
+        # Remove the associated volume to clear all data
+        subprocess.run(["docker", "volume", "rm", volume_name], check=True, capture_output=True)
+
+        logger.info("✅ GCS server container and data volume successfully removed.")
+        return True
+    except subprocess.CalledProcessError as e:
+        stderr = e.stderr.decode()
+        if "no such volume" in stderr:
+            logger.info("Volume was already removed or never created.")
+            return True
+        logger.error(f"❌ Failed to clean up GCS server resources: {stderr}")
+        return False
+
+
+def create_test_buckets():
+    """Create test buckets and populate them with sample data"""
+    client = get_gcs_client()
+
+    # Create test buckets
+    buckets = ['test-bucket', 'flowfile-test', 'sample-data', 'worker-test-bucket', 'demo-bucket']
+    for bucket in buckets:
+        try:
+            client.create_bucket(bucket)
+            logger.info(f"Created bucket: {bucket}")
+        except Exception:
+            logger.info(f"Could not create bucket: {bucket}")
+
+
+def is_docker_available() -> bool:
+    """
+    Check if Docker is available on the system.
+
+    Returns:
+        bool: True if Docker is available and working, False otherwise
+    """
+    # Skip Docker on macOS and Windows in CI
+    if (IS_MACOS or IS_WINDOWS) and os.environ.get('CI', '').lower() in ('true', '1', 'yes'):
+        logger.info("Skipping Docker on macOS/Windows in CI environment")
+        return False
+
+    # If the docker executable is not in PATH
+    if shutil.which("docker") is None:
+        logger.warning("Docker executable not found in PATH")
+        return False
+
+    # Try a simple docker command
+    try:
+        result = subprocess.run(
+            ["docker", "info"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            timeout=5,
+            check=False  # Don't raise an exception on a non-zero return code
+        )
+
+        if result.returncode != 0:
+            logger.warning("Docker is not operational")
+            return False
+
+        return True
+    except (subprocess.SubprocessError, OSError):
+        logger.warning("Error running Docker command")
+        return False
+
+
+def start_gcs() -> bool:
+    """Start the fake GCS server container and seed it with test data"""
+    if is_container_running(GCS_SERVER_NAME):
+        logger.info(f"Container {GCS_SERVER_NAME} is already running")
+        return True
+    try:
+        # Start fake-gcs-server over plain HTTP, persisting objects in a named volume
+        subprocess.run([
+            "docker", "run", "-d",
+            "--name", GCS_SERVER_NAME,
+            "-p", f"{GCS_PORT}:4443",
+            "-v", f"{GCS_SERVER_NAME}-data:/data",
+            "fsouza/fake-gcs-server",
+            "-scheme", "http",
+            "-backend", "filesystem",
+            "-filesystem-root", "/data"
+        ], check=True)
+
+        # Wait for the GCS server to be ready
+        if wait_for_gcs_server():
+            try:
+                create_test_buckets()
+            except Exception as e:
+                logger.error(f"Failed to create test buckets: {e}")
+            try:
+                populate_test_data(endpoint_url=GCS_ENDPOINT_URL,
+                                   bucket_name="test-bucket")
+            except Exception as e:
+                logger.error(f"Failed to populate test data: {e}")
+            try:
+                create_demo_data(endpoint_url=GCS_ENDPOINT_URL,
+                                 bucket_name="demo-bucket")
+            except Exception as e:
+                logger.error(f"Failed to create demo data: {e}")
+            return True
+        return False
+
+    except Exception as e:
+        logger.error(f"Failed to start the GCS server: {e}")
+        stop_gcs()
+        return False
+
+
+@contextmanager
+def managed_gcs() -> Generator[Dict[str, Any], None, None]:
+    """Context manager for the GCS server container with full connection info"""
+    if not start_gcs():
+        yield {}
+        return
+
+    try:
+        connection_info = {
+            "endpoint_url": GCS_ENDPOINT_URL,
+            "host": GCS_HOST,
+            "port": GCS_PORT,
+            "console_port": GCS_PORT,
+            "connection_string": GCS_ENDPOINT_URL,
+        }
+        yield connection_info
+    finally:
+        # Optionally keep the container running for debugging
+        if os.environ.get("KEEP_GCS_RUNNING", "false").lower() != "true":
+            stop_gcs()
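
Reviewer note: a minimal, hypothetical sketch of how the new fixtures might be consumed from a test module, assuming the `test_utils.gcs.fixtures` package added above and the `test-bucket` contents seeded by `populate_test_data`; the test name and assertions are illustrative only and are not part of this change.

```python
# Hypothetical usage sketch (not part of the diff): exercise the fake GCS server
# through the fixtures added above. Assumes test_utils.gcs.fixtures is importable
# and that start_gcs() has seeded 'test-bucket' with the single-file CSV.
import io

import polars as pl
import pytest

from test_utils.gcs.fixtures import get_gcs_client, is_docker_available, managed_gcs


@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available")
def test_single_file_csv_is_seeded():
    with managed_gcs() as gcs_info:
        assert gcs_info, "fake-gcs-server did not start"

        client = get_gcs_client()
        blob = client.bucket("test-bucket").blob("single-file-csv/data.csv")
        df = pl.read_csv(io.BytesIO(blob.download_as_bytes()))

        # populate_test_data() writes a 100_000-row frame with these columns
        assert df.columns == ["id", "name", "value", "category"]
        assert len(df) == 100_000
```

Outside of pytest, the same container can be managed with the new Poetry scripts (`start_gcs` / `stop_gcs`), mirroring the existing MinIO helpers.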