treff7es
Contributor

The DataHub Airflow plugin is preventing task instance history records from being saved to Airflow's database for failed tasks that get retried in Airflow 2.10.5. This causes task retries to not appear in the main Airflow UI, which relies on these history records.

Symptoms:

  • Task retries are not visible in Airflow UI
  • TaskInstanceHistory.record_ti() is called successfully but records don't get saved to database
  • When DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD=false, severe errors occur:
    RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")

Root Cause

The issue is caused by database session sharing between Airflow's internal operations and the DataHub plugin listener hooks.

When Airflow calls listener hooks such as on_task_instance_failed(), it passes a session parameter that holds Airflow's active SQLAlchemy database session. Once the DataHub plugin holds this session object, it can interfere with Airflow's strict transaction boundaries and prevent TaskInstanceHistory.record_ti() from committing successfully.

In threaded mode: The session is handed to a worker thread, so the same session is accessed from more than one thread.

In non-threaded mode: The plugin runs synchronously, and accessing the shared session triggers Airflow's HA lock protection errors.
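To make the threaded case concrete, here is a minimal sketch of a session object ending up in a worker thread. FakeSession and this simplified run_in_thread are hypothetical stand-ins written for illustration; they are not Airflow's or the plugin's actual code. The stand-in only records whether it was touched from a thread other than the one that created it.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; remembers its owning thread."""

    def __init__(self):
        self.owner = threading.get_ident()
        self.foreign_access = False

    def execute(self, statement):
        # Flag any use from a thread other than the creator.
        if threading.get_ident() != self.owner:
            self.foreign_access = True
        return statement


def run_in_thread(f):
    """Simplified decorator (assumption): run the hook in a worker thread."""
    def wrapper(*args, **kwargs):
        worker = threading.Thread(target=f, args=args, kwargs=kwargs)
        worker.start()
        worker.join()
    return wrapper


@run_in_thread
def on_task_instance_failed(previous_state, task_instance, session):
    # The hook now holds the caller's session inside the worker thread.
    session.execute("SELECT 1")


session = FakeSession()
on_task_instance_failed("running", "my_task", session)
print(session.foreign_access)
```

A real SQLAlchemy Session is not designed to be shared across threads, which is why forwarding it unchanged is risky.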

Solution

This PR implements type-based SQLAlchemy session filtering in the run_in_thread decorator to prevent the DataHub plugin from accessing Airflow's database session context.

Key Changes

Before (problematic):

def safe_execution(*args, **kwargs):
    # Plugin receives and can access Airflow's session
    f(*args, **kwargs)  # session parameter passed through

After (fixed):

def safe_execution(*args, **kwargs):
    from sqlalchemy.orm import Session

    # Filter out SQLAlchemy session objects by type
    filtered_args = tuple(arg for arg in args if not isinstance(arg, Session))
    filtered_kwargs = {k: v for k, v in kwargs.items() if not isinstance(v, Session)}

    f(*filtered_args, **filtered_kwargs)  # No session access

Why This Works

  1. Transaction Isolation: Prevents plugin from accessing Airflow's database session, eliminating transaction conflicts
  2. Position Independent: Works regardless of parameter names or positions
  3. Preserves Functionality: Plugin still receives task_instance, dag_run, etc. for metadata extraction
  4. Mode Agnostic: Fixes issues in both threaded and non-threaded execution modes
  5. Future-Proof: Type checking does not depend on Airflow's parameter names or order, so it should keep working if the listener signatures change
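The points above can be tried out with a small self-contained sketch. The decorator name without_sessions and the two hook functions are hypothetical, and the placeholder Session class is used only when SQLAlchemy is not installed, so that the sketch runs anywhere. It shows the session being dropped whether it arrives positionally or by keyword.

```python
import functools

try:
    from sqlalchemy.orm import Session
except ImportError:
    class Session:  # placeholder (assumption), only so the sketch runs without SQLAlchemy
        pass


def without_sessions(f):
    """Hypothetical wrapper: call f with every Session instance removed."""
    @functools.wraps(f)
    def safe_execution(*args, **kwargs):
        filtered_args = tuple(a for a in args if not isinstance(a, Session))
        filtered_kwargs = {k: v for k, v in kwargs.items() if not isinstance(v, Session)}
        return f(*filtered_args, **filtered_kwargs)
    return safe_execution


@without_sessions
def hook_optional(previous_state, task_instance, session=None):
    return previous_state, task_instance, session


@without_sessions
def hook_required(previous_state, task_instance, session):
    return previous_state, task_instance, session


db = Session()
by_position = hook_optional("running", "my_task", db)
by_keyword = hook_optional("running", "my_task", session=db)

# A hook that declares session as a required parameter no longer gets a value for it.
try:
    hook_required("running", "my_task", db)
    required_failed = False
except TypeError:
    required_failed = True
```

One thing this sketch suggests checking: once the session is stripped, a wrapped hook needs a default for its session parameter (or must not declare it), otherwise the call raises TypeError. Dropping a positional argument also shifts any positional arguments that follow it.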

Safety Analysis

What the plugin still receives (needed for functionality):

  • task_instance → task metadata (task_id, dag_id, etc.)
  • previous_state → state transition information
  • dag_run → DAG execution context

What gets filtered out (causes conflicts):

  • session → direct SQLAlchemy database access

The DataHub plugin doesn't perform direct SQL operations that require Airflow's session; it only needs metadata from the task/DAG objects for DataHub API calls.

Testing

  • Type-based filtering correctly identifies SQLAlchemy sessions
  • Fallback mechanism works if SQLAlchemy import fails
  • Plugin functionality preserved (metadata extraction works)
  • Compatible with both threading modes
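The fallback for a failed SQLAlchemy import is not shown in this description. One hypothetical shape for it, sketched here purely as an assumption, is to fall back to matching the class name when the import fails. The function build_session_predicate and the nonexistent module name used to simulate the failure are invented for this example.

```python
import importlib


def build_session_predicate(module_name="sqlalchemy.orm"):
    """Return a function that decides whether an object is a session (hypothetical)."""
    try:
        session_cls = importlib.import_module(module_name).Session
    except (ImportError, AttributeError):
        # Fallback (assumption): match any class named "Session" in the object's MRO.
        def by_name(obj):
            return any(cls.__name__ == "Session" for cls in type(obj).__mro__)
        return by_name
    return lambda obj: isinstance(obj, session_cls)


# Simulate a missing SQLAlchemy by pointing at a module that does not exist.
is_session = build_session_predicate("no_such_module_for_demo")


class Session:  # stand-in object for the demonstration
    pass


class TaskInstance:  # stand-in object for the demonstration
    pass


session_detected = is_session(Session())
task_instance_detected = is_session(TaskInstance())
```

Name matching is looser than isinstance and could catch unrelated classes that happen to be called Session, so it is best treated as a last resort.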

Expected Impact

After this fix:

  • ✅ TaskInstanceHistory.record_ti() completes successfully
  • ✅ Task retry history appears in Airflow UI
  • ✅ No "UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!" errors
  • ✅ DataHub plugin continues normal metadata extraction
  • ✅ Works with DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD set to either true or false


github-actions bot added the "ingestion" label (PR or Issue related to the ingestion of metadata) on Sep 10, 2025
datahub-cyborg bot added the "needs-review" label (Label for PRs that need review from a maintainer) on Sep 10, 2025

codecov bot commented Sep 10, 2025

❌ 10 Tests Failed:

Tests completed | Failed | Passed | Skipped
3760            | 10     | 3750   | 48
View the top 3 failed test(s) by shortest run time
tests.integration.test_plugin::test_airflow_plugin[v2_basic_iolets]
Stack Traces | 37.6s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)
Urn removed, urn:li:dataProcessInstance:5d666eaf9015a31b3e305e8bc2dba078
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.basic_iolets,prod),run_data_task):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_datahub_emitter_operator_jinja_template_dag]
Stack Traces | 38s run time
Metadata files differ (use `pytest --update-golden-files` to update):
{'iterable_item_removed': {'root[10]': {'aspect': {'json': {'fineGrainedLineages': [],
                                                            'inputDatajobs': [],
                                                            'inputDatasets': [],
                                                            'outputDatasets': []}},
                                        'aspectName': 'dataJobInputOutput',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataJob',
                                        'entityUrn': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_emitter_operator_jinja_template_dag,prod),datahub_emitter_operator_jinja_template_dag_task)'},
                           'root[11]': {'aspect': {'json': {'lastModified': {'actor': 'urn:li:corpuser:airflow',
                                                                             'time': 0},
                                                            'ownerTypes': {},
                                                            'owners': [{'owner': 'urn:li:corpuser:airflow',
                                                                        'source': {'type': 'SERVICE'},
                                                                        'type': 'DEVELOPER'}]}},
                                        'aspectName': 'ownership',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataJob',
                                        'entityUrn': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_emitter_operator_jinja_template_dag,prod),datahub_emitter_operator_jinja_template_dag_task)'},
                           'root[12]': {'aspect': {'json': {'tags': [{'tag': 'urn:li:tag:example_tag'}]}},
                                        'aspectName': 'globalTags',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataJob',
                                        'entityUrn': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_emitter_operator_jinja_template_dag,prod),datahub_emitter_operator_jinja_template_dag_task)'},
                           'root[13]': {'aspect': {'json': {'created': {'actor': 'urn:li:corpuser:datahub',
                                                                        'time': 1743622429751},
                                                            'customProperties': {'dag_id': 'datahub_emitter_operator_jinja_template_dag',
                                                                                 'duration': '<duration>',
                                                                                 'end_date': '<end_date>',
                                                                                 'execution_date': '2023-09-27 '
                                                                                                   '21:34:38+00:00',
                                                                                 'external_executor_id': 'None',
                                                                                 'log_url': 'http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=datahub_emitter_operator_jinja_template_dag_task&dag_id=datahub_emitter_operator_jinja_template_dag&map_index=-1',
                                                                                 'max_tries': '0',
                                                                                 'operator': 'DatahubEmitterOperator',
                                                                                 'orchestrator': 'airflow',
                                                                                 'priority_weight': '1',
                                                                                 'run_id': 'manual_run_test',
                                                                                 'start_date': '<start_date>',
                                                                                 'state': 'running',
                                                                                 'task_id': 'datahub_emitter_operator_jinja_template_dag_task',
                                                                                 'try_number': '0'},
                                                            'externalUrl': 'http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=datahub_emitter_operator_jinja_template_dag_task&dag_id=datahub_emitter_operator_jinja_template_dag&map_index=-1',
                                                            'name': 'datahub_emitter_operator_jinja_template_dag_datahub_emitter_operator_jinja_template_dag_task_manual_run_test',
                                                            'type': 'BATCH_AD_HOC'}},
                                        'aspectName': 'dataProcessInstanceProperties',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataProcessInstance',
                                        'entityUrn': 'urn:li:dataProcessInstance:c3af9aae54b864f2542b99a50e0402e3'},
                           'root[14]': {'aspect': {'json': {'parentTemplate': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_emitter_operator_jinja_template_dag,prod),datahub_emitter_operator_jinja_template_dag_task)',
                                                            'upstreamInstances': []}},
                                        'aspectName': 'dataProcessInstanceRelationships',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataProcessInstance',
                                        'entityUrn': 'urn:li:dataProcessInstance:c3af9aae54b864f2542b99a50e0402e3'},
                           'root[15]': {'aspect': {'json': {'attempt': 1,
                                                            'partitionSpec': {'partition': 'FULL_TABLE_SNAPSHOT',
                                                                              'type': 'FULL_TABLE'},
                                                            'status': 'STARTED',
                                                            'timestampMillis': 1743622429751}},
                                        'aspectName': 'dataProcessInstanceRunEvent',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataProcessInstance',
                                        'entityUrn': 'urn:li:dataProcessInstance:c3af9aae54b864f2542b99a50e0402e3'},
                           'root[25]': {'aspect': {'json': {'partitionSpec': {'partition': 'FULL_TABLE_SNAPSHOT',
                                                                              'type': 'FULL_TABLE'},
                                                            'result': {'nativeResultType': 'airflow',
                                                                       'type': 'SUCCESS'},
                                                            'status': 'COMPLETE',
                                                            'timestampMillis': 1743622430015}},
                                        'aspectName': 'dataProcessInstanceRunEvent',
                                        'changeType': 'UPSERT',
                                        'entityType': 'dataProcessInstance',
                                        'entityUrn': 'urn:li:dataProcessInstance:c3af9aae54b864f2542b99a50e0402e3'},
                           'root[8]': {'aspect': {'json': {'customProperties': {'depends_on_past': 'False',
                                                                                'downstream_task_ids': '[]',
                                                                                'email': "['[email protected]']",
                                                                                'execution_timeout': 'datetime.timedelta(seconds=300)',
                                                                                'inlets': '[]',
                                                                                'label': "'datahub_emitter_operator_jinja_template_dag_task'",
                                                                                'openlineage_run_facet_unknownSourceAttribute': '{"_producer": '
                                                                                                                                '"https://github..../OpenLineage/tree/1.30.1/integration/airflow", '
                                                                                                                                '"_schemaURL": '
                                                                                                                                '"https://openlineage..../spec/2-0-2/OpenLineage.json#/$defs/BaseFacet", '
                                                                                                                                '"unknownItems": '
                                                                                                                                '[{"name": '
                                                                                                                                '"DatahubEmitterOperator", '
                                                                                                                                '"properties": '
                                                                                                                                '{"depends_on_past": '
                                                                                                                                'false, '
                                                                                                                                '"downstream_task_ids": '
                                                                                                                                '"[]", '
                                                                                                                                '"execution_timeout": '
                                                                                                                                '"<<non-serializable: '
                                                                                                                                'timedelta>>", '
                                                                                                                                '"executor_config": '
                                                                                                                                '{}, '
                                                                                                                                '"ignore_first_depends_on_past": '
                                                                                                                                'true, '
                                                                                                                                '"mapped": '
                                                                                                                                'false, '
                                                                                                                                '"operator_class": '
                                                                                                                                '"datahub_airflow_plugin.operators.datahub.DatahubEmitterOperator", '
                                                                                                                                '"owner": '
                                                                                                                                '"airflow", '
                                                                                                                                '"priority_weight": '
                                                                                                                                '1, '
                                                                                                                                '"queue": '
                                                                                                                                '"default", '
                                                                                                                                '"retries": '
                                                                                                                                '0, '
                                                                                                                                '"retry_exponential_backoff": '
                                                                                                                                'false, '
                                                                                                                                '"task_id": '
                                                                                                                                '"datahub_emitter_operator_jinja_template_dag_task", '
                                                                                                                                '"trigger_rule": '
                                                                                                                                '"all_success", '
                                                                                                                                '"upstream_task_ids": '
                                                                                                                                '"[]", '
                                                                                                                                '"wait_for_downstream": '
                                                                                                                                'false, '
                                                                                                                                '"weight_rule": '
                                                                                                                                '"downstream"}, '
                                                                                                                                '"type": '
                                                                                                                                '"operator"}]}',
                                                                                'outlets': '[]',
                                                                                'sla': 'None',
                                                                                'task_id': "'datahub_emitter_operator_jinja_template_dag_task'",
                                                                                'trigger_rule': '<TriggerRule.ALL_SUCCESS: '
                                                                                                "'all_success'>",
                                                                                'wait_for_downstream': 'False'},
                                                           'env': 'PROD',
                                                           'externalUrl': 'http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=datahub_emitter_operator_jinja_template_dag&_flt_3_task_id=datahub_emitter_operator_jinja_template_dag_task',
                                                           'name': 'datahub_emitter_operator_jinja_template_dag_task',
                                                           'type': {'string': 'COMMAND'}}},
                                       'aspectName': 'dataJobInfo',
                                       'changeType': 'UPSERT',
                                       'entityType': 'dataJob',
                                       'entityUrn': 'urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_emitter_operator_jinja_template_dag,prod),datahub_emitter_operator_jinja_template_dag_task)'}}}
tests.integration.test_plugin::test_airflow_plugin[v2_custom_operator_dag]
Stack Traces | 38.3s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)
Urn removed, urn:li:dataProcessInstance:07a4aaeffa3875a24cccd1fec6fc7c8c

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.custom_operator_dag,prod),custom_task_id):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
2nd <dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed
2nd <dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_custom_operator_sql_parsing]
Stack Traces | 39.2s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataProcessInstance:e9904feabede38cc4dc2bc74e51c6d77
Urn removed, urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.default.my_output_table,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.default.my_input_table,PROD)

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,custom_operator_sql_parsing,prod),transform_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
2nd <dataJobInputOutput> removed
<dataJobInfo> removed
2nd <dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_snowflake_operator]
Stack Traces | 40.4s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_test_database.datahub_test_schema.processed_costs,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_test_database.datahub_test_schema.costs,PROD)
Urn removed, urn:li:dataProcessInstance:3161034cc84e16a7c5e1906225734747

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_bigquery_insert_job_operator]
Stack Traces | 40.4s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:bigquery,test_project.test_dataset.processed_costs,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:bigquery,test_project.test_dataset.costs,PROD)
Urn removed, urn:li:dataProcessInstance:ad5cf6e007be4b127a8729275f118c3b
Urn removed, urn:li:dataProcessInstance:c2b6bc237a75fe628ce6bf2c282b0e3c

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,bigquery_insert_job_operator,prod),select_with_destination_config):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,bigquery_insert_job_operator,prod),insert_query_without_destination):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_athena_operator]
Stack Traces | 40.5s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.processed_costs,PROD)
Urn removed, urn:li:dataProcessInstance:9cd4fbcec3a50a4988ffc5841beaf0ad
Urn removed, urn:li:dataset:(urn:li:dataPlatform:athena,athena_db.costs,PROD)

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,athena_operator,prod),transform_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_simple_dag_no_datajob_lineage]
Stack Traces | 41.3s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae
Urn removed, urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task):
<globalTags> removed
<ownership> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1):
<globalTags> removed
<ownership> removed
<dataPlatformInstance> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_simple_dag]
Stack Traces | 43.7s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)
Urn removed, urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae
Urn removed, urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf
Urn removed, urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed
tests.integration.test_plugin::test_airflow_plugin[v2_sqlite_operator]
Stack Traces | 48.3s run time
Metadata files differ (use `pytest --update-golden-files` to update):
Urn removed, urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)
Urn removed, urn:li:dataProcessInstance:07285de22276959612189d51336cc21a
Urn removed, urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869
Urn removed, urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)
Urn removed, urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372
Urn removed, urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b
Urn removed, urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.sqlite_operator,prod),populate_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.sqlite_operator,prod),cleanup_processed_costs):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.sqlite_operator,prod),cleanup_costs):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.sqlite_operator,prod),transform_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed

Urn changed, urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.sqlite_operator,prod),create_cost_table):
<globalTags> removed
<ownership> removed
<dataJobInputOutput> removed
<dataPlatformInstance> removed
<dataJobInfo> removed
