File tree 3 files changed +27
-2
lines changed
3 files changed +27
-2
lines changed Original file line number Diff line number Diff line change 27
27
import os
28
28
import sys
29
29
import types
30
+ import warnings
30
31
from pathlib import Path
31
32
from typing import TYPE_CHECKING , Any , Iterable
32
33
@@ -431,6 +432,17 @@ def initialize_ti_deps_plugins():
431
432
registered_ti_dep_classes = {}
432
433
433
434
for plugin in plugins :
435
+ if not plugin .ti_deps :
436
+ continue
437
+
438
+ from airflow .exceptions import RemovedInAirflow3Warning
439
+
440
+ warnings .warn (
441
+ "Using custom `ti_deps` on operators has been removed in Airflow 3.0" ,
442
+ RemovedInAirflow3Warning ,
443
+ stacklevel = 1 ,
444
+ )
445
+
434
446
registered_ti_dep_classes .update (
435
447
{qualname (ti_dep .__class__ ): ti_dep .__class__ for ti_dep in plugin .ti_deps }
436
448
)
Original file line number Diff line number Diff line change 28
28
29
29
import pytest
30
30
31
+ from airflow .exceptions import RemovedInAirflow3Warning
31
32
from airflow .hooks .base import BaseHook
32
33
from airflow .listeners .listener import get_listener_manager
33
34
from airflow .plugins_manager import AirflowPlugin
@@ -270,6 +271,17 @@ class AirflowAdminMenuLinksPlugin(AirflowPlugin):
270
271
),
271
272
]
272
273
274
+ def test_deprecate_ti_deps (self ):
275
+ class DeprecatedTIDeps (AirflowPlugin ):
276
+ name = "ti_deps"
277
+
278
+ ti_deps = [mock .MagicMock ()]
279
+
280
+ with mock_plugin_manager (plugins = [DeprecatedTIDeps ()]), pytest .warns (RemovedInAirflow3Warning ):
281
+ from airflow import plugins_manager
282
+
283
+ plugins_manager .initialize_ti_deps_plugins ()
284
+
273
285
def test_should_not_warning_about_fab_plugins (self , caplog ):
274
286
class AirflowAdminViewsPlugin (AirflowPlugin ):
275
287
name = "test_admin_views_plugin"
Original file line number Diff line number Diff line change @@ -1612,7 +1612,7 @@ class DummyTask(BaseOperator):
1612
1612
) as dag :
1613
1613
DummyTask (task_id = "task1" )
1614
1614
1615
- with pytest .raises (SerializationError ):
1615
+ with pytest .raises (SerializationError ), pytest . warns ( RemovedInAirflow3Warning ) :
1616
1616
SerializedBaseOperator .serialize_operator (dag .task_dict ["task1" ])
1617
1617
1618
1618
def test_error_on_unregistered_ti_dep_deserialization (self ):
@@ -1644,7 +1644,8 @@ class DummyTask(BaseOperator):
1644
1644
with DAG (dag_id = "test_serialize_custom_ti_deps" , schedule = None , start_date = execution_date ) as dag :
1645
1645
DummyTask (task_id = "task1" )
1646
1646
1647
- serialize_op = SerializedBaseOperator .serialize_operator (dag .task_dict ["task1" ])
1647
+ with pytest .warns (RemovedInAirflow3Warning ):
1648
+ serialize_op = SerializedBaseOperator .serialize_operator (dag .task_dict ["task1" ])
1648
1649
1649
1650
assert serialize_op ["deps" ] == [
1650
1651
"airflow.ti_deps.deps.mapped_task_upstream_dep.MappedTaskUpstreamDep" ,
You can’t perform that action at this time.
0 commit comments