Replies: 2 comments 3 replies
-
This looks like your "something" clashes with the scheduler while running initdb. It looks like you are doing something that you should absolutely not do - running initdb while the scheduler is running. Why are you doing this?
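For illustration only (an assumed deployment pattern, not something confirmed in this thread): if a one-off `airflow db upgrade` genuinely has to run in an environment with several replicas, one way to keep it from ever running twice or overlapping other writers is a PostgreSQL advisory lock taken around the migration/re-serialization step. The DSN and the lock key below are hypothetical:

```python
# Sketch: let exactly one process run the one-time DB upgrade step.
# DSN and lock key (724242) are placeholders, not values from this thread.
import subprocess

import psycopg2

conn = psycopg2.connect("dbname=airflow user=airflow host=metadata-db")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("SELECT pg_try_advisory_lock(724242)")
    if cur.fetchone()[0]:
        # Only the lock holder runs the migration; everyone else skips it.
        subprocess.run(["airflow", "db", "upgrade"], check=True)
        cur.execute("SELECT pg_advisory_unlock(724242)")
conn.close()
```

Even then, the safer pattern is to run it once at deploy time, before any scheduler starts, never as part of normal operation.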
3 replies
-
I have the same issue when running this on MWAA Airflow. I created my DAGs via decorators. Were you able to get this resolved? @rdjouder
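For reference, the decorator style mentioned here is the TaskFlow API; a minimal self-contained example (the DAG and task names are made up):

```python
# Minimal TaskFlow-style DAG, Airflow 2.x.
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def example_taskflow():
    @task
    def hello():
        print("hello")

    hello()

example_taskflow()  # module-level call registers the DAG
```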
0 replies
-
Hello,
Almost the same use case as "dynamic dag creation race condition"; adding details and refreshing that discussion thread.
Setup: Airflow 2.5.2, PostgreSQL 13, Python 3.8, max_tis_per_query = 256, two scheduler instances. DAGs are created dynamically from a dedicated PostgreSQL table, and their number may reach 20k. When too many DAGs are created at a time (300), the scheduler hits race conditions during DAG serialisation and restarts. The error and traceback are below, after a sketch of the generation pattern.
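A minimal sketch of the generation pattern described above, assuming a hypothetical definitions table (the real generator file is dynamic_dag_creator_chunk2.py; the table and column names here are made up):

```python
# One generator file that emits many DAGs from rows in a PostgreSQL table.
from datetime import datetime

import psycopg2

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Hypothetical DSN and table; the real setup reads its own dedicated table.
conn = psycopg2.connect("dbname=dag_defs user=airflow host=metadata-db")
with conn, conn.cursor() as cur:
    cur.execute("SELECT dag_id, cron FROM dag_definitions")
    rows = cur.fetchall()
conn.close()

for dag_id, cron in rows:
    with DAG(dag_id=dag_id, schedule=cron,
             start_date=datetime(2023, 1, 1), catchup=False) as d:
        EmptyOperator(task_id="placeholder")
    # Expose each DAG at module level so the DagBag discovers it.
    globals()[dag_id] = d
```

The failing insert and the traceback: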
[SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, processor_subdir, owners, description, default_view, schedule_interval, timetable_description, max_active_tasks, max_active_runs, has_task_concurrency_limits, has_import_errors, next_dagrun, next_dagrun_data_interval_start, next_dagrun_data_interval_end, next_dagrun_create_after) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_parsed_time)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(processor_subdir)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s, %(timetable_description)s, %(max_active_tasks)s, %(max_active_runs)s, %(has_task_concurrency_limits)s, %(has_import_errors)s, %(next_dagrun)s, %(next_dagrun_data_interval_start)s, %(next_dagrun_data_interval_end)s, %(next_dagrun_create_after)s)]
[parameters: ({'dag_id': ...]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
ERROR [airflow.models.dagbag.DagBag] Failed to write serialized DAG: /usr/local/airflow/dags/dynamic_dag_creator_chunk2.py
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in
sys.exit(main())
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/main.py", line 48, in main
args.func(args)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/db_command.py", line 37, in initdb
db.initdb()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 75, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/db.py", line 697, in initdb
upgradedb(session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/db.py", line 1588, in upgradedb
_reserialize_dags(session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/db.py", line 840, in _reserialize_dags
dagbag.sync_to_db(session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 639, in sync_to_db
...
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_pkey"
DETAIL: Key (dag_id)=(PredictiveThreshold_os.cpu.load_XXX_0) already exists.
Seeing 4-5 occurrences of this kind of error, then the scheduler restarts.
Is there a way to tune my scheduler config to prevent such issues?
The issue does not occur when DAGs are submitted at a lower pace, one DAG every 0.1 s.
DAG parameters are submitted through a REST service; which back-pressure framework would you recommend?
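Not a recommendation of any specific framework, but as a minimal illustration of the kind of throttle that would enforce the pace observed to be safe above (one submission every 0.1 s): a token-spacing limiter on the submission path. `RateLimiter` and the demo loop are hypothetical, not part of the described service:

```python
# Space out DAG-definition submissions to at most `rate` per second.
import threading
import time

class RateLimiter:
    def __init__(self, rate: float):
        self.interval = 1.0 / rate
        self._lock = threading.Lock()
        self._next_slot = time.monotonic()

    def acquire(self) -> None:
        # Reserve the next submission slot, then sleep until it arrives.
        with self._lock:
            now = time.monotonic()
            wait = self._next_slot - now
            self._next_slot = max(now, self._next_slot) + self.interval
        if wait > 0:
            time.sleep(wait)

limiter = RateLimiter(rate=10)  # matches the safe pace of one DAG per 0.1 s

if __name__ == "__main__":
    for i in range(30):
        limiter.acquire()
        print(f"would insert DAG definition {i}")  # stand-in for the psql write
```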