from datetime import datetime, timezone


def append_version_changes_to_grist_history(
    api: GristDocAPI,
    history_table_name: str,
    changes_df: pd.DataFrame,
    dt_provider: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    chunk_size: int = 200,
):
    """
    Append version-change events to a Grist history table.

    One record is sent per row of ``changes_df``. Every record of a call shares
    the same ``Update_Date``: the value returned by ``dt_provider``, expressed
    in UTC, truncated to the minute and serialized as an ISO-8601 string
    (e.g. ``2025-12-17T14:30:00+00:00``).

    Parameters
    ----------
    api : GristDocAPI
        Client whose ``sync_table`` method receives the records.
    history_table_name : str
        Name of the Grist table that stores the history.
    changes_df : pd.DataFrame
        Detected changes. Columns read: Namespace, Name, Host, Image,
        Old_Version, New_Version. Missing columns are sent as empty strings.
        The frame is not modified.
    dt_provider : Callable[[], datetime]
        Returns the event timestamp. A naive datetime is taken to be UTC; an
        aware one is converted to UTC.
    chunk_size : int
        Forwarded unchanged to ``api.sync_table``.

    Returns
    -------
    None
        Nothing is sent when ``changes_df`` is ``None`` or empty.
    """
    from types import SimpleNamespace

    if changes_df is None or changes_df.empty:
        logging.info("No version changes to append to history.")
        return

    # Column specs handed to sync_table: [grist column id, record attribute, grist type].
    # The record attribute mapping below is derived from these specs so the
    # two can never drift apart.
    #
    # The key contains Update_Date, i.e. the minute of *this* run: a rerun
    # within the same minute maps onto the same keys, a later rerun produces
    # new history rows.
    #
    # NOTE(review): Update_Date is sent as an ISO string while declared
    # "DateTime" here. Confirm that the GristDocAPI implementation in use
    # accepts strings for DateTime columns (some clients expect a datetime
    # object or an epoch number) -- TODO verify against the client library.
    key_cols = [
        ["Namespace", "namespace", "Text"],
        ["Name", "name", "Text"],
        ["Update_Date", "update_date", "DateTime"],
        ["Previous_Version", "previous_version", "Text"],
        ["New_Version", "new_version", "Text"],
    ]
    other_cols = [
        ("Host", "host", "Text"),
        ("Image", "image", "Text"),
        ("Change_Type", "change_type", "Text"),
    ]
    attr_by_column = {spec[0]: spec[1] for spec in (*key_cols, *other_cols)}

    # Event timestamp: always UTC, minute precision.
    stamp = dt_provider()
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    else:
        stamp = stamp.astimezone(timezone.utc)
    stamp = stamp.replace(second=0, microsecond=0)

    def _clean_text(column: str) -> pd.Series:
        """Return *column* as stripped strings, or blanks when it is absent."""
        if column not in changes_df.columns:
            return pd.Series("", index=changes_df.index, dtype=object)
        return changes_df[column].fillna("").astype(str).str.strip()

    # reindex() returns a new frame, so the caller's DataFrame is left untouched;
    # identity columns that are absent are created and filled with "".
    events = changes_df.reindex(
        columns=["Namespace", "Name", "Host", "Image"], fill_value=""
    )
    events["Update_Date"] = stamp.isoformat()
    events["Previous_Version"] = _clean_text("Old_Version")
    events["New_Version"] = _clean_text("New_Version")
    # No previous version means the platform was not known before.
    events["Change_Type"] = [
        "updated" if previous else "new" for previous in events["Previous_Version"]
    ]

    # Missing values (NA/NaN) are sent as None.
    safe_events = events.astype(object).where(events.notna(), None)

    records = [
        SimpleNamespace(
            **{attr: row[column] for column, attr in attr_by_column.items()}
        )
        for row in safe_events.to_dict("records")
    ]

    api.sync_table(
        history_table_name,
        records,
        key_cols=key_cols,
        other_cols=other_cols,
        grist_fetch=None,
        chunk_size=chunk_size,
        filters=None,
    )

    logging.info(
        "Appended %d history event(s) to '%s'.", len(records), history_table_name
    )