Skip to content

Commit 1336cbc

Browse files
committed
Optimize dynamic DAG updates to avoid loading large serialized DAGs (#57592)
When updating dynamic DAGs (those without task instances), Airflow was loading the entire `SerializedDagModel` object from the database, which could contain megabytes of JSON data, just to update a few fields which was completely unnecesary. This change replaces the object-loading approach with a direct SQL UPDATE statement, significantly improving performance for deployments with large or frequently-changing dynamic DAGs. The optimization uses SQLAlchemy's update() construct to modify only the necessary columns (_data, _data_compressed, dag_hash) without fetching the existing row, reducing both database load and network transfer. Additionally, removed an unnecessary session.merge() call on dag_version, as the object is already tracked by the session after being loaded. (cherry picked from commit 27c9b94)
1 parent 9c8ae23 commit 1336cbc

File tree

2 files changed

+102
-9
lines changed

2 files changed

+102
-9
lines changed

airflow-core/src/airflow/models/serialized_dag.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import sqlalchemy_jsonfield
2929
import uuid6
30-
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_
30+
from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update
3131
from sqlalchemy.orm import backref, foreign, relationship
3232
from sqlalchemy.sql.expression import func, literal
3333
from sqlalchemy_utils import UUIDType
@@ -427,14 +427,23 @@ def write_dag(
427427
# This is for dynamic DAGs that the hashes changes often. We should update
428428
# the serialized dag, the dag_version and the dag_code instead of a new version
429429
# if the dag_version is not associated with any task instances
430-
latest_ser_dag = cls.get(dag.dag_id, session=session)
431-
if TYPE_CHECKING:
432-
assert latest_ser_dag is not None
433-
# Update the serialized DAG with the new_serialized_dag
434-
latest_ser_dag._data = new_serialized_dag._data
435-
latest_ser_dag._data_compressed = new_serialized_dag._data_compressed
436-
latest_ser_dag.dag_hash = new_serialized_dag.dag_hash
437-
session.merge(latest_ser_dag)
430+
431+
# Use direct UPDATE to avoid loading the full serialized DAG
432+
result = session.execute(
433+
update(cls)
434+
.where(cls.dag_version_id == dag_version.id)
435+
.values(
436+
{
437+
cls._data: new_serialized_dag._data,
438+
cls._data_compressed: new_serialized_dag._data_compressed,
439+
cls.dag_hash: new_serialized_dag.dag_hash,
440+
}
441+
)
442+
)
443+
444+
if result.rowcount == 0:
445+
# No rows updated - serialized DAG doesn't exist
446+
return False
438447
# The dag_version and dag_code may not have changed, still we should
439448
# do the below actions:
440449
# Update the latest dag version

airflow-core/tests/unit/models/test_serialized_dag.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,3 +544,87 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self):
544544
# Verify that the original data still has fileloc (method shouldn't modify original)
545545
assert "fileloc" in test_data["dag"]
546546
assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
547+
548+
def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
549+
"""
550+
Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
551+
This preserves the null-check fix from PR #56422 and tests the direct UPDATE path.
552+
"""
553+
with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag:
554+
EmptyOperator(task_id="task1")
555+
556+
# Write the DAG first
557+
lazy_dag = LazyDeserializedDAG.from_dag(dag)
558+
SDM.write_dag(
559+
dag=lazy_dag,
560+
bundle_name="test_bundle",
561+
bundle_version=None,
562+
session=session,
563+
)
564+
session.commit()
565+
566+
# Get the dag_version
567+
dag_version = session.scalar(
568+
select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag").limit(1)
569+
)
570+
assert dag_version is not None
571+
572+
# Manually delete SerializedDagModel (simulates edge case)
573+
session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete()
574+
session.commit()
575+
576+
# Verify no SerializedDagModel exists
577+
assert SDM.get("test_missing_serdag", session=session) is None
578+
579+
# Try to update - should return False gracefully (not crash)
580+
result = SDM.write_dag(
581+
dag=lazy_dag,
582+
bundle_name="test_bundle",
583+
bundle_version=None,
584+
min_update_interval=None,
585+
session=session,
586+
)
587+
588+
assert result is False # Should return False when SerializedDagModel is missing
589+
590+
def test_dynamic_dag_update_success(self, dag_maker, session):
591+
"""
592+
Test that dynamic DAG update successfully updates the serialized DAG hash
593+
when no task instances exist.
594+
"""
595+
with dag_maker(dag_id="test_dynamic_success", session=session) as dag:
596+
EmptyOperator(task_id="task1")
597+
598+
# Write the DAG first
599+
lazy_dag = LazyDeserializedDAG.from_dag(dag)
600+
result1 = SDM.write_dag(
601+
dag=lazy_dag,
602+
bundle_name="test_bundle",
603+
bundle_version=None,
604+
session=session,
605+
)
606+
session.commit()
607+
608+
assert result1 is True
609+
initial_sdag = SDM.get("test_dynamic_success", session=session)
610+
assert initial_sdag is not None
611+
initial_hash = initial_sdag.dag_hash
612+
613+
# Modify the DAG (add a task)
614+
EmptyOperator(task_id="task2", dag=dag)
615+
lazy_dag_updated = LazyDeserializedDAG.from_dag(dag)
616+
617+
# Write again - should use UPDATE path (no task instances yet)
618+
result2 = SDM.write_dag(
619+
dag=lazy_dag_updated,
620+
bundle_name="test_bundle",
621+
bundle_version=None,
622+
session=session,
623+
)
624+
session.commit()
625+
626+
# Verify update succeeded
627+
assert result2 is True
628+
updated_sdag = SDM.get("test_dynamic_success", session=session)
629+
assert updated_sdag.dag_hash != initial_hash # Hash should change
630+
assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now

0 commit comments

Comments
 (0)