Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 106 additions & 10 deletions dags/data_utils/grist/decidim_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from dataclasses import dataclass


@dataclass
class K8sDecidimConfig:
"""
Expand All @@ -30,16 +31,18 @@ class K8sDecidimConfig:
grist_doc_var : str
Airflow Variable name that contains the Grist doc id.
grist_table_var : str
Airflow Variable name that contains the Grist table name.
Airflow Variable name that contains the Grist snapshot table name.
grist_history_table_var : str
Airflow Variable name that contains the Grist history table name.
api_version : str
Kubernetes apiVersion of the Decidim CRD.
kind : str
Kubernetes Kind of the Decidim CRD.
"""

grist_conn_id: str
grist_doc_var: str
grist_table_var: str
grist_history_table_var: str
api_version: str = "apps.libre.sh/v1alpha1"
kind: str = "Decidim"

Expand Down Expand Up @@ -337,11 +340,11 @@ def find_version_changes(df_new: pd.DataFrame, df_old: pd.DataFrame, notify_on_n
Matching key: (Namespace, Name)

Returns columns:
Namespace, Name, Host, Old_Version, New_Version, Ready, Status, LastTransitionTime
Namespace, Name, Host, Image, Old_Version, New_Version, Ready, Status, LastTransitionTime
"""
# Normalize keys and versions to strings
for d in (df_new, df_old):
for c in ("Namespace", "Name", "Version", "Host"):
for c in ("Namespace", "Name", "Version", "Host", "Image"):
if c in d.columns:
d[c] = d[c].astype(str).fillna("").str.strip()
else:
Expand All @@ -357,33 +360,119 @@ def find_version_changes(df_new: pd.DataFrame, df_old: pd.DataFrame, notify_on_n
merged["Old_Version"] = merged["Old_Version"].fillna("")
merged["New_Version"] = merged["Version"].fillna("")

# Changed versions
changed = merged[(merged["_merge"] == "both") & (merged["Old_Version"] != merged["New_Version"])]

# Optionally include brand new platforms
if notify_on_new:
new_rows = merged[merged["_merge"] == "left_only"].copy()
new_rows["Old_Version"] = ""
new_rows["New_Version"] = new_rows["Version"].fillna("")
changed = pd.concat([changed, new_rows], ignore_index=True)

# Ensure presence of fields from df_new
for c in ["Host", "Image", "Ready", "Status", "LastTransitionTime"]:
if c not in merged.columns:
merged[c] = ""

out_cols = [
"Namespace",
"Name",
"Host",
"Image",
"Old_Version",
"New_Version",
"Ready",
"Status",
"LastTransitionTime",
]
# Ensure presence of fields from df_new
for c in ["Host", "Ready", "Status", "LastTransitionTime"]:
if c not in merged.columns:
merged[c] = ""

return changed[out_cols].reset_index(drop=True)

from datetime import datetime, timezone

def append_version_changes_to_grist_history(
api: GristDocAPI,
history_table_name: str,
changes_df: pd.DataFrame,
dt_provider: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
chunk_size: int = 200,
):
"""
Append version-change events into a Grist history table with a DateTime Update_Date.

Update_Date is stored as an ISO string (UTC) for maximal compatibility with Grist API.
Example (display depends on Grist settings): 2025-12-17 14:30
"""
if changes_df.empty:
logging.info("No version changes to append to history.")
return

df = changes_df.copy()

# Timestamp (UTC), rounded to minute -> stable display like "YYYY-MM-DD HH:MM"
now_utc = dt_provider()
if now_utc.tzinfo is None:
now_utc = now_utc.replace(tzinfo=timezone.utc)
now_utc = now_utc.replace(second=0, microsecond=0)

# Store as ISO; Grist will render it as DateTime (often in user timezone)
df["Update_Date"] = now_utc.isoformat()

# Align naming
df["Previous_Version"] = df["Old_Version"].fillna("").astype(str).str.strip()
df["New_Version"] = df["New_Version"].fillna("").astype(str).str.strip()
df["Change_Type"] = df["Previous_Version"].apply(lambda v: "new" if not v else "updated")

wanted = ["Namespace", "Name", "Host", "Image", "Update_Date", "Previous_Version", "New_Version", "Change_Type"]
for c in wanted:
if c not in df.columns:
df[c] = ""
df = df[wanted]

# Key includes Update_Date (minute precision) to avoid duplicates on rerun
key_cols = [
["Namespace", "namespace", "Text"],
["Name", "name", "Text"],
["Update_Date", "update_date", "DateTime"],
["Previous_Version", "previous_version", "Text"],
["New_Version", "new_version", "Text"],
]

other_cols = [
("Host", "host", "Text"),
("Image", "image", "Text"),
("Change_Type", "change_type", "Text"),
]

mapping = {
"namespace": "Namespace",
"name": "Name",
"host": "Host",
"image": "Image",
"update_date": "Update_Date",
"previous_version": "Previous_Version",
"new_version": "New_Version",
"change_type": "Change_Type",
}

from types import SimpleNamespace
safe_df = df.replace({pd.NA: None})
records = []
for _, row in safe_df.iterrows():
payload = {attr: row.get(src) for attr, src in mapping.items()}
records.append(SimpleNamespace(**payload))

api.sync_table(
history_table_name,
records,
key_cols=key_cols,
other_cols=other_cols,
grist_fetch=None,
chunk_size=chunk_size,
filters=None,
)

logging.info("Appended %d history event(s) to '%s'.", len(df), history_table_name)


def send_version_changes_to_n8n(webhook_url: str, changes_df: pd.DataFrame, timeout: int = 10):
"""
Expand Down Expand Up @@ -452,6 +541,7 @@ def collect_and_push_to_grist(cfg: K8sDecidimConfig):

doc_id = Variable.get(cfg.grist_doc_var)
table_name = Variable.get(cfg.grist_table_var)
history_table_name = Variable.get(cfg.grist_history_table_var)

api = GristDocAPI(doc_id, server=grist_server, api_key=grist_api_key)

Expand All @@ -464,6 +554,12 @@ def collect_and_push_to_grist(cfg: K8sDecidimConfig):
changes = find_version_changes(df_new, df_existing, notify_on_new=True)
logging.info("Detected %d version change(s).", len(changes))

if not changes.empty:
try:
append_version_changes_to_grist_history(api, history_table_name, changes)
except Exception as e:
logging.exception("Failed to append history rows to '%s'. Err=%s", history_table_name, e)

webhook_url = Variable.get("n8n_webhook_decidim_version_change", default_var="")
if webhook_url and not changes.empty:
send_version_changes_to_n8n(webhook_url, changes)
Expand Down
1 change: 1 addition & 0 deletions dags/decidim_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def run_collection_and_push():
grist_conn_id="grist_osp",
grist_doc_var="grist_decidim_document_id",
grist_table_var="grist_osp_plateformes_decidim",
grist_history_table_var="grist_osp_plateformes_decidim_history",
api_version="apps.libre.sh/v1alpha1",
kind="Decidim",
)
Expand Down