Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 116 additions & 45 deletions dags/data_utils/grist/grist_dump_document.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from typing import Tuple, List, Any, Dict
import hashlib
from typing import Any, Dict, List, Tuple

import pandas as pd
from grist_api import GristDocAPI
from sqlalchemy.engine import Connection
from sqlalchemy.sql import text as sql_text
from grist_api import GristDocAPI
from .grist_types import GristTypes
from .grist_helper import list_grist_tables, sanitize_identifier
from ..postgres_helper import get_postgres_connection
from sqlalchemy.sql.schema import sqlalchemy

from .grist_helper import list_grist_tables, sanitize_identifier
from .grist_types import GristTypes

# =========================
# === Internal helpers ===
# =========================


def _safe_cname(name: str) -> str:
"""
Produce a lowercase, Postgres-safe constraint name with max length 63.
Expand All @@ -35,7 +37,9 @@ def _fetch_grist_columns(api: GristDocAPI, table_id: str) -> List[Dict[str, Any]
return columns_response.json()["columns"]


def _build_grist_types(columns: List[Dict[str, Any]], columns_to_explode: List[str]) -> List[GristTypes]:
def _build_grist_types(
columns: List[Dict[str, Any]], columns_to_explode: List[str]
) -> List[GristTypes]:
"""
Build GristTypes instances with the 'explode' flag set per column id.
"""
Expand All @@ -59,7 +63,9 @@ def _fetch_table_records_df(api: GristDocAPI, table_id: str) -> pd.DataFrame:
return df


def _apply_column_transforms(df: pd.DataFrame, column_types: List[GristTypes]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
def _apply_column_transforms(
df: pd.DataFrame, column_types: List[GristTypes]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""
Apply per-column transformations via GristTypes.modify_df and collect explicit SQL dtypes.
Additionally, normalize REF columns (0 -> NULL) and ensure pandas nullable dtype where needed.
Expand All @@ -81,7 +87,9 @@ def _apply_column_transforms(df: pd.DataFrame, column_types: List[GristTypes]) -
# Ensure pandas can carry NULLs (nullable Int64 if numeric-like)
try:
df[c.id] = pd.to_numeric(df[c.id], errors="ignore")
if pd.api.types.is_integer_dtype(df[c.id]) or pd.api.types.is_float_dtype(df[c.id]):
if pd.api.types.is_integer_dtype(
df[c.id]
) or pd.api.types.is_float_dtype(df[c.id]):
df[c.id] = df[c.id].astype("Int64")
except Exception:
# Keep object dtype with NAs if conversion fails
Expand All @@ -100,23 +108,33 @@ def _table_regclass(connection: Connection, schema: str, table_name: str) -> Any
).scalar()


def _prepare_main_table(connection: Connection, schema: str, table_name: str, if_exists: str) -> str:
def _prepare_main_table(
connection: Connection, schema: str, table_name: str, if_exists: str
) -> str:
"""
Decide the pandas.to_sql behavior for the main table and TRUNCATE when 'replace' is requested
and the table already exists. Returns the effective if_exists ("append" or original).
"""
main_reg = _table_regclass(connection, schema, table_name)
effective = if_exists
if if_exists == "replace" and main_reg is not None:
connection.execute(sql_text(f"""
connection.execute(
sql_text(f"""
TRUNCATE TABLE {schema}."{table_name}" RESTART IDENTITY CASCADE;
"""))
""")
)
effective = "append"
return effective


def _write_main_table(connection: Connection, schema: str, table_name: str,
df: pd.DataFrame, dtype_map: Dict[str, Any], if_exists: str) -> None:
def _write_main_table(
connection: Connection,
schema: str,
table_name: str,
df: pd.DataFrame,
dtype_map: Dict[str, Any],
if_exists: str,
) -> None:
"""
Write the main DataFrame to Postgres using pandas.to_sql with the provided dtype map.
"""
Expand Down Expand Up @@ -156,7 +174,9 @@ def _pk_ddl(schema: str, table_name: str):
""")


def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes], prefix: str) -> List[Any]:
def _direct_fk_ddls(
schema: str, main_table: str, column_types: List[GristTypes], prefix: str
) -> List[Any]:
"""
For each REF column, compose an idempotent DO-block to create a FK from main_table(col) to ref_table(id).
"""
Expand All @@ -168,7 +188,8 @@ def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes]
ref_table = f"{prefix}_{ref_base}"
fk_name = _safe_cname(f"fk_{main_table}_{c.id}")

ddls.append(sql_text(f"""
ddls.append(
sql_text(f"""
DO $$
BEGIN
IF NOT EXISTS (
Expand All @@ -186,7 +207,8 @@ def _direct_fk_ddls(schema: str, main_table: str, column_types: List[GristTypes]
REFERENCES {schema}."{ref_table}" ("id");
END IF;
END$$;
"""))
""")
)
return ddls


Expand All @@ -201,8 +223,8 @@ def _build_exploded_df(df: pd.DataFrame, col_id: str) -> pd.DataFrame:
tmp = df[["id", col_id]].copy()
child = (
tmp.explode(col_id)
.rename(columns={"id": "source", col_id: target_col})
.reset_index(drop=True)
.rename(columns={"id": "source", col_id: target_col})
.reset_index(drop=True)
)
else:
child = pd.DataFrame(columns=["source", target_col])
Expand All @@ -213,24 +235,34 @@ def _build_exploded_df(df: pd.DataFrame, col_id: str) -> pd.DataFrame:
return child


def _prepare_exploded_table(connection: Connection, schema: str, table_name: str, if_exists: str) -> str:
def _prepare_exploded_table(
connection: Connection, schema: str, table_name: str, if_exists: str
) -> str:
"""
Decide pandas.to_sql behavior for an exploded child table and TRUNCATE when 'replace'
is requested and the table already exists. Returns effective if_exists.
"""
reg = _table_regclass(connection, schema, table_name)
effective = if_exists
if if_exists == "replace" and reg is not None:
connection.execute(sql_text(f"""
connection.execute(
sql_text(f"""
TRUNCATE TABLE {schema}."{table_name}" RESTART IDENTITY;
"""))
""")
)
effective = "append"
return effective


def _write_exploded_table(connection: Connection, schema: str, table_name: str,
df_exploded: pd.DataFrame, exploded_sql_type: Any | None,
target_col: str, if_exists: str) -> None:
def _write_exploded_table(
connection: Connection,
schema: str,
table_name: str,
df_exploded: pd.DataFrame,
exploded_sql_type: Any | None,
target_col: str,
if_exists: str,
) -> None:
"""
Write a child df (exploded) to Postgres using pandas.to_sql. Optionally pass dtype for target column.
"""
Expand Down Expand Up @@ -275,12 +307,15 @@ def _exploded_fk_source(schema: str, exploded_table: str, main_table: str):
""")


def _exploded_fk_target(schema: str, exploded_table: str, target_col: str,
ref_table: str):
def _exploded_fk_target(
schema: str, exploded_table: str, target_col: str, ref_table: str
):
"""
Compose idempotent DO-block for FK: exploded_table(target_<col>) -> ref_table(id).
"""
fk_tgt = _safe_cname(f"fk_{exploded_table}_target_{target_col.replace('target_', '')}")
fk_tgt = _safe_cname(
f"fk_{exploded_table}_target_{target_col.replace('target_', '')}"
)
return sql_text(f"""
DO $$
BEGIN
Expand All @@ -306,6 +341,7 @@ def _exploded_fk_target(schema: str, exploded_table: str, target_col: str,
# === Public helper (single table + children)
# ==========================================


def _dump_grist_table_to_postgres(
api: GristDocAPI,
connection: Connection,
Expand Down Expand Up @@ -341,13 +377,17 @@ def _dump_grist_table_to_postgres(

# 5) Prepare and write main table (TRUNCATE+append when needed)
main_if_exists = _prepare_main_table(connection, schema, main_table_name, if_exists)
_write_main_table(connection, schema, main_table_name, df, dtype_map, main_if_exists)
_write_main_table(
connection, schema, main_table_name, df, dtype_map, main_if_exists
)

# 6) Stage PK DDL
pk_constraints.append(_pk_ddl(schema, main_table_name))

# 7) Stage direct FK DDLs
fk_constraints.extend(_direct_fk_ddls(schema, main_table_name, column_types, prefix))
fk_constraints.extend(
_direct_fk_ddls(schema, main_table_name, column_types, prefix)
)

# 8) Explode refList columns and stage child FKs
for c in column_types:
Expand All @@ -361,19 +401,32 @@ def _dump_grist_table_to_postgres(
df_exploded = _build_exploded_df(df, c.id)

# Prepare and write exploded table
exploded_if_exists = _prepare_exploded_table(connection, schema, exploded_table_name, if_exists)
exploded_if_exists = _prepare_exploded_table(
connection, schema, exploded_table_name, if_exists
)
exploded_sql_type = getattr(c, "exploded_sql_type", None)
_write_exploded_table(
connection, schema, exploded_table_name,
df_exploded, exploded_sql_type, target_col, exploded_if_exists
connection,
schema,
exploded_table_name,
df_exploded,
exploded_sql_type,
target_col,
exploded_if_exists,
)

# Stage child FKs: source->parent, optional target->referenced
fk_constraints.append(_exploded_fk_source(schema, exploded_table_name, main_table_name))
fk_constraints.append(
_exploded_fk_source(schema, exploded_table_name, main_table_name)
)
if c.exploded_ref_table:
ref_base = sanitize_identifier(c.exploded_ref_table)
ref_table_name = f"{prefix}_{ref_base}"
fk_constraints.append(_exploded_fk_target(schema, exploded_table_name, target_col, ref_table_name))
fk_constraints.append(
_exploded_fk_target(
schema, exploded_table_name, target_col, ref_table_name
)
)

print(f"Succesfully dump {grist_table_name}")
return pk_constraints, fk_constraints
Expand All @@ -397,35 +450,53 @@ def drop_tables_with_prefix(connection, schema: str, prefix: str):
tables = [row[0] for row in result]

if not tables:
print(f"Aucune table à supprimer dans le schéma '{schema}' avec le préfixe '{prefix}'.")
print(
f"Aucune table à supprimer dans le schéma '{schema}' avec le préfixe '{prefix}'."
)
return

for table in tables:
fq_table_name = f'"{schema}"."{table}"'
drop_query = sql_text(f'DROP TABLE IF EXISTS {fq_table_name} CASCADE')
drop_query = sql_text(f"DROP TABLE IF EXISTS {fq_table_name} CASCADE")
connection.execute(drop_query)
print(f"Dropped table {fq_table_name}")
except Exception as e:
print(f"Erreur lors du drop des tables avec préfixe '{prefix}' dans le schéma '{schema}': {e}")
print(
f"Erreur lors du drop des tables avec préfixe '{prefix}' dans le schéma '{schema}': {e}"
)


# ==========================================
# === Dump ===
# ==========================================


def dump_document_to_postgres(
api: GristDocAPI,
connection_name: str,
database: str,
engine: sqlalchemy.engine.Engine,
prefix: str,
tables=None,
columns_to_explode: List[Tuple[str, str]] | None = None,
schema: str = "grist",
if_exists: str = "replace",
include_metadata: bool = False,
verify_constraints: bool = False,
) -> None:
"""
Two-pass orchestration with transactional commits:
Dump an entire Grist document to a postgres db.
Args:
api: the GristDocApi to use to get the document.
engine: the sql engine to use to dump the databse
database: the postgres namespace where the db will be created / updated
prefix: string that will be appened to each table name. You do not need to include a underscore.
tables: the list of table names to export. If not provided, all tables will be exported
columns_to_explode: list of (table_name, column_name) that will be exploded.
Only a column of type reference list or choice list can be exploded.
When a column is exploded, a new auxiliary table will be created
to represent this one to many relationship.


Technically, we use a two-pass orchestration with transactional commits:
1) Create/refresh all physical tables (main + exploded) and COLLECT DDL (PKs, FKs).
2) Execute ALL PK DDLs first (idempotent).
3) Execute ALL FK DDLs after (idempotent).
Expand All @@ -439,16 +510,18 @@ def dump_document_to_postgres(
for t, c in columns_to_explode:
explode_map.setdefault(t, []).append(c)

engine = get_postgres_connection(connection_name, database)

with engine.begin() as connection:
drop_tables_with_prefix(connection, schema=schema, prefix=prefix)

all_pk_ddl: List[Any] = []
all_fk_ddl: List[Any] = []

with engine.begin() as connection:
if tables is None:
grist_tables = list_grist_tables(api, include_metadata=include_metadata)
else:
grist_tables = tables

with engine.begin() as connection:
for table_id in grist_tables:
cols_for_this_table = explode_map.get(table_id, [])
pk_ddl, fk_ddl = _dump_grist_table_to_postgres(
Expand All @@ -468,5 +541,3 @@ def dump_document_to_postgres(
connection.execute(ddl)
for ddl in all_fk_ddl:
connection.execute(ddl)


25 changes: 0 additions & 25 deletions dags/data_utils/grist/suivi_ca.py

This file was deleted.

Loading