Skip to content

Conversation

Bennylave
Copy link
Collaborator

@Bennylave Bennylave commented Aug 26, 2025

Related issue: #100

The goal of this PR is to allow for connectivity to Google Cloud from Flowfile as it was done before with AWS.

The tool we intend to use for testing purposes is Fake GCS Server

Bernardo Fernandes and others added 30 commits August 5, 2025 13:11
* adding landing page

* adding docs

* Fix the docs refs and simplify FlowGraph ID input

* Improving structure

* stashing documentation

* Minor clean up that I found during writing documentation

* stashing changes

* Adding doc strings

* Fixing breaking change

* Improving consistency in local execution

* Fixing issue with complex expressions in group by

* Add missing str conversions and handle complex group columns

* Add full documentation

* Adding the mkdocstrings to pyproject

* adding documentatino

* fix the response code to fix tests

* Fix missing reference in landing

* remove breakpoint

* updating the index.md

* Fix add links to design-concepts.md and remove test code

* add manual trigger of documentation workflow

* Improving documentation and links

* change order mermaid from td to lr"

* removing duplicate workflow_dispatch

* adding pyproject to mkdocs

* improving documentation

* adding clarification

* improved introduction

* small typo

* restructure of documentation

* fixing the structure

* Change complete restructure of documentation

* Adding index pages

* fixing links

* Finetuning documentation

* Removing sort from documentation

* Fix documentation quickstart.md
* Fix small issues in logging and missing functions

* fixing layout quickstart and index.html

* Minor improvements full local and documentation

* Small bug fixes

* Check if reverting the settings resolves failing tests

* quick fix for introduced failing tests by improvement

* bumping version

* ensure example data is set in single process worker

* Improve test so it better represents
* Fixing handling of pivot

* Fixing test for pivot table
* Adding print_tree method to FlowGraph

* Adding method print_tree() to FlowGraph

* Adding method print_tree() to FlowGraph

* Changes to node ordering in print_tree method

* Changes to node ordering in print_tree method

* Changes to node ordering of print_tree

* Changes to node ordering of print_tree

* retrigger checks

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Refactoring graph_tree method

---------

Co-authored-by: Bernardo Fernandes <[email protected]>
* Migrating to pl-fuzzy-frame-match

* adding fuzzy match

* Adding fuzzy match method to flowgraph

* Schema callback changes in fuzzy match

* Fixing tests and increasing overlap between generator and flowfile

* Adapted pl-fuzzy-frame-match changes in branch

* fix issue with test

* adding prints to the test to debug

* Make the schema_callback.py threadsafe and the object in fuzzy matching as well.

* remove warning in _handle_fuzzy_match

* increasing version fuzzy frame match

* reverting change in the execution

* Improve threading and order fuzzy match results based on incoming data
* Adding test file for print_tree

* Small refactor that adds types to functions
* removing auto to improve maintainability

* Ensure the offload per worker is determined per graph and there is no dependency on a global variable.

* Small improvement in logging

* Removing global change in tests

* skipping test in docker
* Fix small issues in logging and missing functions

* fixing layout quickstart and index.html

* Minor improvements full local and documentation

* Small bug fixes

* Check if reverting the settings resolves failing tests

* quick fix for introduced failing tests by improvement

* bumping version

* ensure example data is set in single process worker

* Improve test so it better represents
* Fixing handling of pivot

* Fixing test for pivot table
* Adding print_tree method to FlowGraph

* Adding method print_tree() to FlowGraph

* Adding method print_tree() to FlowGraph

* Changes to node ordering in print_tree method

* Changes to node ordering in print_tree method

* Changes to node ordering of print_tree

* Changes to node ordering of print_tree

* retrigger checks

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Adding tests for print_tree method

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Merging skip_nodes and determine_execution_order

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Adding new graph_tree method with tests

* Refactoring graph_tree method

---------

Co-authored-by: Bernardo Fernandes <[email protected]>
* Migrating to pl-fuzzy-frame-match

* adding fuzzy match

* Adding fuzzy match method to flowgraph

* Schema callback changes in fuzzy match

* Fixing tests and increasing overlap between generator and flowfile

* Adapted pl-fuzzy-frame-match changes in branch

* fix issue with test

* adding prints to the test to debug

* Make the schema_callback.py threadsafe and the object in fuzzy matching as well.

* remove warning in _handle_fuzzy_match

* increasing version fuzzy frame match

* reverting change in the execution

* Improve threading and order fuzzy match results based on incoming data
* Adding test file for print_tree

* Small refactor that adds types to functions
* removing auto to improve maintainability

* Ensure the offload per worker is determined per graph and there is no dependency on a global variable.

* Small improvement in logging

* Removing global change in tests

* skipping test in docker
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think something went wrong when merging with master. There is some code duplication

GCS_BUCKET_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs")
GCS_ENDPOINT_URL = f"http://{GCS_HOST}:{GCS_PORT}"
GCS_SERVER_NAME = os.environ.get("TEST_GCS_CONTAINER", "test-gcs-server")
GCS_ROOT_USER = "GCS"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GCS_BUCKET_NAME and GCS_SERVER_NAME refer to the same env var, intentional?
Also, the test-gcs is the default, but not created as a bucket.

blob.upload_from_filename(current_metadata_filename.encode(), content_type=guess_content_type(metadata_content))

# 4. version-hint.text (some Iceberg readers look for this)
blob = bucket.bloc(f"{metadata_dir}/version-hint.text")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a typo? Must be bucket.blob

arrow_table = df.to_arrow()

_create_delta_lake_table(gcs_client, arrow_table, bucket_name)
_create_iceberg_table(df, bucket_name, endpoint_url, gcs_client)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing argument. Can you make this keyword argument? It's pretty hard to debug which one is missing




def _create_delta_lake_table(gcs_client, arrow_table: pa.Table, bucket_name: str):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not work; you should first create the deltalake table locally and next upload it.

def _create_delta_lake_table(gcs_client, arrow_table: pa.Table, bucket_name: str):
    """Creates Delta Lake table by writing locally then uploading to GCS."""
    import tempfile
    import os
    logger.info("Writing Delta Lake table via local filesystem...")

    with tempfile.TemporaryDirectory() as temp_dir:
        # Write Delta table to local filesystem (this will work)
        local_delta_path = f"{temp_dir}/delta-table"

        # Use local filesystem - no storage_options needed
        write_deltalake(local_delta_path, arrow_table, mode='overwrite')
        logger.info(f"✅ Delta table created locally at: {local_delta_path}")

        # Upload all Delta Lake files to GCS
        bucket = gcs_client.bucket(bucket_name)
        uploaded_files = []
        breakpoint()
        for root, dirs, files in os.walk(local_delta_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                rel_path = os.path.relpath(local_file_path, local_delta_path)
                gcs_path = f"delta-lake-table/{rel_path}"

                blob = bucket.blob(gcs_path)
                blob.upload_from_filename(local_file_path)
                uploaded_files.append(gcs_path)
                logger.info(f"Uploaded: {gcs_path}")

        logger.info(f"✅ Delta Lake table uploaded to GCS with {len(uploaded_files)} files")

The nicest thing would be if you could use the same logic as is used in S3 data generator. But I couldn't get that to work. Which can be problematic for testing, since the gcs implementation in Polars also relies on deltalake (delta-rs) library.

Perhaps you can try without anonymous access and provide a credential. I will also check it out further.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants