Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_
from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
from sqlalchemy.sql.expression import func, literal
Expand Down Expand Up @@ -434,14 +434,23 @@ def write_dag(
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
latest_ser_dag = cls.get(dag.dag_id, session=session)
if not latest_ser_dag:

# Use direct UPDATE to avoid loading the full serialized DAG
result = session.execute(
update(cls)
.where(cls.dag_version_id == dag_version.id)
.values(
{
cls._data: new_serialized_dag._data,
cls._data_compressed: new_serialized_dag._data_compressed,
cls.dag_hash: new_serialized_dag.dag_hash,
}
)
)

if result.rowcount == 0:
# No rows updated - serialized DAG doesn't exist
return False
# Update the serialized DAG with the new_serialized_dag
latest_ser_dag._data = new_serialized_dag._data
latest_ser_dag._data_compressed = new_serialized_dag._data_compressed
latest_ser_dag.dag_hash = new_serialized_dag.dag_hash
session.merge(latest_ser_dag)
# The dag_version and dag_code may not have changed, still we should
# do the below actions:
# Update the latest dag version
Expand Down
84 changes: 84 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,3 +544,87 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self):
# Verify that the original data still has fileloc (method shouldn't modify original)
assert "fileloc" in test_data["dag"]
assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"

def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
"""
Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
This preserves the null-check fix from PR #56422 and tests the direct UPDATE path.
"""
with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag:
EmptyOperator(task_id="task1")

# Write the DAG first
lazy_dag = LazyDeserializedDAG.from_dag(dag)
SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
session=session,
)
session.commit()

# Get the dag_version
dag_version = session.scalar(
select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag").limit(1)
)
assert dag_version is not None

# Manually delete SerializedDagModel (simulates edge case)
session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete()
session.commit()

# Verify no SerializedDagModel exists
assert SDM.get("test_missing_serdag", session=session) is None

# Try to update - should return False gracefully (not crash)
result = SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
min_update_interval=None,
session=session,
)

assert result is False # Should return False when SerializedDagModel is missing

def test_dynamic_dag_update_success(self, dag_maker, session):
"""
Test that dynamic DAG update successfully updates the serialized DAG hash
when no task instances exist.
"""
with dag_maker(dag_id="test_dynamic_success", session=session) as dag:
EmptyOperator(task_id="task1")

# Write the DAG first
lazy_dag = LazyDeserializedDAG.from_dag(dag)
result1 = SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
session=session,
)
session.commit()

assert result1 is True
initial_sdag = SDM.get("test_dynamic_success", session=session)
assert initial_sdag is not None
initial_hash = initial_sdag.dag_hash

# Modify the DAG (add a task)
EmptyOperator(task_id="task2", dag=dag)
lazy_dag_updated = LazyDeserializedDAG.from_dag(dag)

# Write again - should use UPDATE path (no task instances yet)
result2 = SDM.write_dag(
dag=lazy_dag_updated,
bundle_name="test_bundle",
bundle_version=None,
session=session,
)
session.commit()

# Verify update succeeded
assert result2 is True
updated_sdag = SDM.get("test_dynamic_success", session=session)
assert updated_sdag.dag_hash != initial_hash # Hash should change
assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now