diff --git a/dags/data_utils/grist/grist_dump_document.py b/dags/data_utils/grist/grist_dump_document.py
index 97b7c10..dc847c9 100644
--- a/dags/data_utils/grist/grist_dump_document.py
+++ b/dags/data_utils/grist/grist_dump_document.py
@@ -1,18 +1,20 @@
-from typing import Tuple, List, Any, Dict
 import hashlib
+from typing import Any, Dict, List, Tuple
+
 import pandas as pd
+from grist_api import GristDocAPI
 from sqlalchemy.engine import Connection
 from sqlalchemy.sql import text as sql_text
-from grist_api import GristDocAPI
-from .grist_types import GristTypes
-from .grist_helper import list_grist_tables, sanitize_identifier
-from ..postgres_helper import get_postgres_connection
+import sqlalchemy
+
+from .grist_helper import list_grist_tables, sanitize_identifier
+from .grist_types import GristTypes
 
 # =========================
 # === Internal helpers ===
 # =========================
+
 
 def _safe_cname(name: str) -> str:
     """
     Produce a lowercase, Postgres-safe constraint name with max length 63.
@@ -35,7 +37,9 @@ def _fetch_grist_columns(api: GristDocAPI, table_id: str) -> List[Dict[str, Any]
     return columns_response.json()["columns"]
 
 
-def _build_grist_types(columns: List[Dict[str, Any]], columns_to_explode: List[str]) -> List[GristTypes]:
+def _build_grist_types(
+    columns: List[Dict[str, Any]], columns_to_explode: List[str]
+) -> List[GristTypes]:
     """
     Build GristTypes instances with the 'explode' flag set per column id.
     """
@@ -59,7 +63,9 @@ def _fetch_table_records_df(api: GristDocAPI, table_id: str) -> pd.DataFrame:
     return df
 
 
-def _apply_column_transforms(df: pd.DataFrame, column_types: List[GristTypes]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
+def _apply_column_transforms(
+    df: pd.DataFrame, column_types: List[GristTypes]
+) -> Tuple[pd.DataFrame, Dict[str, Any]]:
     """
     Apply per-column transformations via GristTypes.modify_df and collect explicit SQL dtypes.
     Additionally, normalize REF columns (0 -> NULL) and ensure pandas nullable dtype where needed.
@@ -81,7 +87,9 @@ def _apply_column_transforms(df: pd.DataFrame, column_types: List[GristTypes]) -
         # Ensure pandas can carry NULLs (nullable Int64 if numeric-like)
         try:
             df[c.id] = pd.to_numeric(df[c.id], errors="ignore")
-            if pd.api.types.is_integer_dtype(df[c.id]) or pd.api.types.is_float_dtype(df[c.id]):
+            if pd.api.types.is_integer_dtype(
+                df[c.id]
+            ) or pd.api.types.is_float_dtype(df[c.id]):
                 df[c.id] = df[c.id].astype("Int64")
         except Exception:
             # Keep object dtype with NAs if conversion fails
@@ -100,7 +108,9 @@ def _table_regclass(connection: Connection, schema: str, table_name: str) -> Any
     ).scalar()
 
 
-def _prepare_main_table(connection: Connection, schema: str, table_name: str, if_exists: str) -> str:
+def _prepare_main_table(
+    connection: Connection, schema: str, table_name: str, if_exists: str
+) -> str:
     """
     Decide the pandas.to_sql behavior for the main table and TRUNCATE when 'replace'
     is requested and the table already exists. Returns the effective if_exists ("append" or original).
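# --- Editor's aside (illustration only, not part of the patch) --------------
# _safe_cname's body lies outside the hunk above; this sketch only illustrates
# the contract stated in its docstring (lowercase, Postgres-safe, max 63 chars).
# The truncation length and the md5 suffix are assumptions, not the real code.
import hashlib

def safe_cname_sketch(name: str) -> str:
    cleaned = name.lower()
    if len(cleaned) <= 63:
        return cleaned
    digest = hashlib.md5(cleaned.encode("utf-8")).hexdigest()[:8]
    return f"{cleaned[:54]}_{digest}"  # 54 + 1 + 8 = 63 characters

# Long constraint names such as fk_<table>_<column> stay unique per input while
# respecting Postgres's 63-character identifier limit.
# -----------------------------------------------------------------------------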
@@ -108,15 +118,23 @@ def _prepare_main_table(connection: Connection, schema: str, table_name: str, if
     main_reg = _table_regclass(connection, schema, table_name)
     effective = if_exists
     if if_exists == "replace" and main_reg is not None:
-        connection.execute(sql_text(f"""
+        connection.execute(
+            sql_text(f"""
             TRUNCATE TABLE {schema}."{table_name}" RESTART IDENTITY CASCADE;
-        """))
+        """)
+        )
         effective = "append"
     return effective
 
 
-def _write_main_table(connection: Connection, schema: str, table_name: str,
-                      df: pd.DataFrame, dtype_map: Dict[str, Any], if_exists: str) -> None:
+def _write_main_table(
+    connection: Connection,
+    schema: str,
+    table_name: str,
+    df: pd.DataFrame,
+    dtype_map: Dict[str, Any],
+    if_exists: str,
+) -> None:
     """
     Write the main DataFrame to Postgres using pandas.to_sql with the provided dtype map.
     """
@@ -156,7 +174,9 @@ def _pk_ddl(schema: str, table_name: str):
     """)
 
 
-def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes], prefix: str) -> List[Any]:
+def _direct_fk_ddls(
+    schema: str, main_table: str, column_types: List[GristTypes], prefix: str
+) -> List[Any]:
     """
     For each REF column, compose an idempotent DO-block to create a FK from main_table(col) to ref_table(id).
     """
@@ -168,7 +188,8 @@ def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes]
         ref_table = f"{prefix}_{ref_base}"
         fk_name = _safe_cname(f"fk_{main_table}_{c.id}")
 
-        ddls.append(sql_text(f"""
+        ddls.append(
+            sql_text(f"""
         DO $$
         BEGIN
           IF NOT EXISTS (
@@ -186,7 +207,8 @@ def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes]
             REFERENCES {schema}."{ref_table}" ("id");
           END IF;
         END$$;
-        """))
+        """)
+        )
 
     return ddls
 
@@ -201,8 +223,8 @@ def _build_exploded_df(df: pd.DataFrame, col_id: str) -> pd.DataFrame:
         tmp = df[["id", col_id]].copy()
         child = (
             tmp.explode(col_id)
-                .rename(columns={"id": "source", col_id: target_col})
-                .reset_index(drop=True)
+            .rename(columns={"id": "source", col_id: target_col})
+            .reset_index(drop=True)
         )
     else:
         child = pd.DataFrame(columns=["source", target_col])
@@ -213,7 +235,9 @@ def _build_exploded_df(df: pd.DataFrame, col_id: str) -> pd.DataFrame:
     return child
 
 
-def _prepare_exploded_table(connection: Connection, schema: str, table_name: str, if_exists: str) -> str:
+def _prepare_exploded_table(
+    connection: Connection, schema: str, table_name: str, if_exists: str
+) -> str:
     """
     Decide pandas.to_sql behavior for an exploded child table and TRUNCATE when 'replace'
     is requested and the table already exists. Returns effective if_exists.
@@ -221,16 +245,24 @@ def _prepare_exploded_table(connection: Connection, schema: str, table_name: str
     reg = _table_regclass(connection, schema, table_name)
     effective = if_exists
     if if_exists == "replace" and reg is not None:
-        connection.execute(sql_text(f"""
+        connection.execute(
+            sql_text(f"""
             TRUNCATE TABLE {schema}."{table_name}" RESTART IDENTITY;
-        """))
+        """)
+        )
         effective = "append"
     return effective
 
 
-def _write_exploded_table(connection: Connection, schema: str, table_name: str,
-                          df_exploded: pd.DataFrame, exploded_sql_type: Any | None,
-                          target_col: str, if_exists: str) -> None:
+def _write_exploded_table(
+    connection: Connection,
+    schema: str,
+    table_name: str,
+    df_exploded: pd.DataFrame,
+    exploded_sql_type: Any | None,
+    target_col: str,
+    if_exists: str,
+) -> None:
     """
     Write a child df (exploded) to Postgres using pandas.to_sql. Optionally pass dtype for target column.
""" @@ -275,12 +307,15 @@ def _exploded_fk_source(schema: str, exploded_table: str, main_table: str): """) -def _exploded_fk_target(schema: str, exploded_table: str, target_col: str, - ref_table: str): +def _exploded_fk_target( + schema: str, exploded_table: str, target_col: str, ref_table: str +): """ Compose idempotent DO-block for FK: exploded_table(target_) -> ref_table(id). """ - fk_tgt = _safe_cname(f"fk_{exploded_table}_target_{target_col.replace('target_', '')}") + fk_tgt = _safe_cname( + f"fk_{exploded_table}_target_{target_col.replace('target_', '')}" + ) return sql_text(f""" DO $$ BEGIN @@ -306,6 +341,7 @@ def _exploded_fk_target(schema: str, exploded_table: str, target_col: str, # === Public helper (single table + children) # ========================================== + def _dump_grist_table_to_postgres( api: GristDocAPI, connection: Connection, @@ -341,13 +377,17 @@ def _dump_grist_table_to_postgres( # 5) Prepare and write main table (TRUNCATE+append when needed) main_if_exists = _prepare_main_table(connection, schema, main_table_name, if_exists) - _write_main_table(connection, schema, main_table_name, df, dtype_map, main_if_exists) + _write_main_table( + connection, schema, main_table_name, df, dtype_map, main_if_exists + ) # 6) Stage PK DDL pk_constraints.append(_pk_ddl(schema, main_table_name)) # 7) Stage direct FK DDLs - fk_constraints.extend(_direct_fk_ddls(schema, main_table_name, column_types, prefix)) + fk_constraints.extend( + _direct_fk_ddls(schema, main_table_name, column_types, prefix) + ) # 8) Explode refList columns and stage child FKs for c in column_types: @@ -361,19 +401,32 @@ def _dump_grist_table_to_postgres( df_exploded = _build_exploded_df(df, c.id) # Prepare and write exploded table - exploded_if_exists = _prepare_exploded_table(connection, schema, exploded_table_name, if_exists) + exploded_if_exists = _prepare_exploded_table( + connection, schema, exploded_table_name, if_exists + ) exploded_sql_type = getattr(c, "exploded_sql_type", None) _write_exploded_table( - connection, schema, exploded_table_name, - df_exploded, exploded_sql_type, target_col, exploded_if_exists + connection, + schema, + exploded_table_name, + df_exploded, + exploded_sql_type, + target_col, + exploded_if_exists, ) # Stage child FKs: source->parent, optional target->referenced - fk_constraints.append(_exploded_fk_source(schema, exploded_table_name, main_table_name)) + fk_constraints.append( + _exploded_fk_source(schema, exploded_table_name, main_table_name) + ) if c.exploded_ref_table: ref_base = sanitize_identifier(c.exploded_ref_table) ref_table_name = f"{prefix}_{ref_base}" - fk_constraints.append(_exploded_fk_target(schema, exploded_table_name, target_col, ref_table_name)) + fk_constraints.append( + _exploded_fk_target( + schema, exploded_table_name, target_col, ref_table_name + ) + ) print(f"Succesfully dump {grist_table_name}") return pk_constraints, fk_constraints @@ -397,27 +450,32 @@ def drop_tables_with_prefix(connection, schema: str, prefix: str): tables = [row[0] for row in result] if not tables: - print(f"Aucune table à supprimer dans le schéma '{schema}' avec le préfixe '{prefix}'.") + print( + f"Aucune table à supprimer dans le schéma '{schema}' avec le préfixe '{prefix}'." 
+            )
             return
 
         for table in tables:
             fq_table_name = f'"{schema}"."{table}"'
-            drop_query = sql_text(f'DROP TABLE IF EXISTS {fq_table_name} CASCADE')
+            drop_query = sql_text(f"DROP TABLE IF EXISTS {fq_table_name} CASCADE")
             connection.execute(drop_query)
             print(f"Dropped table {fq_table_name}")
 
     except Exception as e:
-        print(f"Erreur lors du drop des tables avec préfixe '{prefix}' dans le schéma '{schema}': {e}")
+        print(
+            f"Erreur lors du drop des tables avec préfixe '{prefix}' dans le schéma '{schema}': {e}"
+        )
 
 
 # ==========================================
 # === Dump ===
 # ==========================================
+
 
 def dump_document_to_postgres(
     api: GristDocAPI,
-    connection_name: str,
-    database: str,
+    engine: sqlalchemy.engine.Engine,
     prefix: str,
+    tables: List[str] | None = None,
     columns_to_explode: List[Tuple[str, str]] | None = None,
     schema: str = "grist",
     if_exists: str = "replace",
@@ -425,7 +483,20 @@ def dump_document_to_postgres(
     verify_constraints: bool = False,
 ) -> None:
     """
-    Two-pass orchestration with transactional commits:
+    Dump an entire Grist document to a Postgres database.
+
+    Args:
+        api: the GristDocAPI instance used to fetch the document.
+        engine: the SQLAlchemy engine used to write to the target database.
+        prefix: string prepended to each table name. You do not need to include an underscore.
+        tables: the list of table names to export. If not provided, all tables are exported.
+        columns_to_explode: list of (table_name, column_name) pairs to explode.
+            Only a column of type reference list or choice list can be exploded.
+            When a column is exploded, a new auxiliary table is created
+            to represent this one-to-many relationship.
+        schema: the Postgres schema (namespace) in which the tables are created / updated.
+
+    Technically, we use a two-pass orchestration with transactional commits:
     1) Create/refresh all physical tables (main + exploded) and COLLECT DDL (PKs, FKs).
     2) Execute ALL PK DDLs first (idempotent).
     3) Execute ALL FK DDLs after (idempotent).
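# --- Editor's aside (illustration only, not part of the patch) --------------
# Self-contained example of what "exploding" a reference-list column produces,
# mirroring _build_exploded_df above. The data and the exact "target_transport"
# column name are made up for illustration.
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "Transport": [[10, 11], [12]]})
child = (
    df[["id", "Transport"]]
    .explode("Transport")
    .rename(columns={"id": "source", "Transport": "target_transport"})
    .reset_index(drop=True)
)
# child now holds one row per (parent id, referenced id) pair:
#   (source=1, target_transport=10), (1, 11), (2, 12)
# -----------------------------------------------------------------------------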
@@ -439,16 +510,18 @@ def dump_document_to_postgres(
         for t, c in columns_to_explode:
             explode_map.setdefault(t, []).append(c)
 
-    engine = get_postgres_connection(connection_name, database)
-
     with engine.begin() as connection:
         drop_tables_with_prefix(connection, schema=schema, prefix=prefix)
 
     all_pk_ddl: List[Any] = []
     all_fk_ddl: List[Any] = []
 
-    with engine.begin() as connection:
+    if tables is None:
         grist_tables = list_grist_tables(api, include_metadata=include_metadata)
+    else:
+        grist_tables = tables
+
+    with engine.begin() as connection:
         for table_id in grist_tables:
             cols_for_this_table = explode_map.get(table_id, [])
             pk_ddl, fk_ddl = _dump_grist_table_to_postgres(
@@ -468,5 +541,3 @@ def dump_document_to_postgres(
             connection.execute(ddl)
         for ddl in all_fk_ddl:
             connection.execute(ddl)
-
-
diff --git a/dags/data_utils/grist/suivi_ca.py b/dags/data_utils/grist/suivi_ca.py
deleted file mode 100644
index b4ed757..0000000
--- a/dags/data_utils/grist/suivi_ca.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from airflow.models import Variable
-
-from ..postgres_helper import (
-    dump_data_to_postgres,
-    get_postgres_connection,
-)
-from .grist_helper import fetch_grist_table_data, get_grist_api
-
-# Retrieve the connection object using Airflow's BaseHook
-grist_ca_doc_id = Variable.get("grist_suivi_ca_doc_id")
-if not isinstance(grist_ca_doc_id, str):
-    raise ValueError("grist_suivi_ca_doc_id variable not set")
-api = get_grist_api("grist_osp", grist_ca_doc_id)
-
-
-def fetch_and_dump_data(connection_name):
-    df = fetch_grist_table_data(api, "SUIVI_CLIENTS")
-
-    engine = get_postgres_connection(connection_name, "aggregated_client_data")
-    connection = engine.connect()
-    table_name = "grist_suivi_ca"
-
-    dump_data_to_postgres(connection, df, table_name, if_exists="replace")
-
-    connection.close()
diff --git a/dags/grist_dump_ca.py b/dags/grist_dump_ca.py
deleted file mode 100644
index f509483..0000000
--- a/dags/grist_dump_ca.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import pendulum
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from data_utils.alerting.alerting import task_failed
-from data_utils.grist.suivi_ca import fetch_and_dump_data
-
-connection_name="main_db_cluster_name"
-
-with DAG(
-    dag_id="grist_suivi_ca",
-    default_args={"owner": "airflow"},
-    schedule="45 0 * * *",
-    start_date=pendulum.datetime(2024, 11, 15, tz="UTC"),
-    catchup=False,
-) as dag:
-    fetch_grist_data = PythonOperator(
-        task_id='fetch_and_dump_grist_data',
-        python_callable=fetch_and_dump_data,
-        op_args=[f"{connection_name}"],
-        dag=dag,
-        on_failure_callback=task_failed,
-    )
-
-    fetch_grist_data
\ No newline at end of file
diff --git a/dags/grist_marseille_eco_citoyennete.py b/dags/grist_marseille_eco_citoyennete.py
index ba85be3..d46f249 100644
--- a/dags/grist_marseille_eco_citoyennete.py
+++ b/dags/grist_marseille_eco_citoyennete.py
@@ -1,13 +1,13 @@
-
 import pendulum
 from airflow import DAG
+from airflow.models import Variable
 from airflow.operators.python import PythonOperator
 from data_utils.alerting.alerting import task_failed
 from data_utils.grist.grist_dump_document import dump_document_to_postgres
-from data_utils.grist.grist_helper import _get_grist_api
-from airflow.models import Variable
+from data_utils.grist.grist_helper import get_grist_api
+from data_utils.postgres_helper import get_postgres_connection
 
-connection_name="main_db_cluster_name"
+connection_name = "main_db_cluster_name"
 
 with DAG(
     dag_id="grist_marseille_eco_citoyennete",
@@ -19,13 +19,19 @@ ) as dag:
[("Mobilite", "Transport")] doc_id = Variable.get("grist_marseille_eco-citoyennete") - api = _get_grist_api("grist_marseille", doc_id) + api = get_grist_api("grist_marseille", doc_id) + engine = get_postgres_connection(connection_name, "marseille") fetch_grist_data = PythonOperator( - task_id='fetch_and_dump_grist_data', + task_id="fetch_and_dump_grist_data", python_callable=dump_document_to_postgres, - op_args=[api, f"{connection_name}", "marseille", "eco_citoyennete", columns_to_explode], + op_kwargs={ + "api": api, + "engine": engine, + "prefix": "eco_citoyennete", + "columns_to_explode": columns_to_explode, + }, dag=dag, on_failure_callback=task_failed, ) - fetch_grist_data \ No newline at end of file + fetch_grist_data diff --git a/dags/grist_suivi_ca.py b/dags/grist_suivi_ca.py new file mode 100644 index 0000000..ec67d40 --- /dev/null +++ b/dags/grist_suivi_ca.py @@ -0,0 +1,36 @@ +import pendulum +from airflow import DAG +from airflow.models import Variable +from airflow.operators.python import PythonOperator +from data_utils.alerting.alerting import task_failed +from data_utils.grist.grist_dump_document import dump_document_to_postgres +from data_utils.grist.grist_helper import get_grist_api +from data_utils.postgres_helper import get_postgres_connection + +connection_name = "main_db_cluster_name" + +with DAG( + dag_id="grist_suivi_ca", + default_args={"owner": "airflow"}, + schedule="45 0 * * *", + start_date=pendulum.datetime(2024, 11, 15, tz="UTC"), + catchup=False, + tags=["grist"], +) as dag: + doc_id = Variable.get("grist_suivi_ca_doc_id") + api = get_grist_api("grist_osp", doc_id) + engine = get_postgres_connection(connection_name, "aggregated_client_data") + fetch_grist_data = PythonOperator( + task_id="fetch_and_dump_grist_data", + python_callable=dump_document_to_postgres, + op_kwargs={ + "api": api, + "engine": engine, + "prefix": "ca", + "tables": ["Clients", "Prestations"], + }, + dag=dag, + on_failure_callback=task_failed, + ) + + fetch_grist_data