Airflow 2 to 3 auto migration rules #41641
Comments
The |
I pinned the issue - this way it will show up at the top of "Issues" list in the repo |
Thanks! |
We can just go over all the significant newsfragments and create a rule for them or keep some reasoning why it doesn't require one |
We should add something for the public API change too. API v1 won't work anymore. Those endpoints are being changed as part of AIP-84 to a new FastAPI-based app. GitHub project for it: https://github.com/orgs/apache/projects/414 |
Issue here to regroup Rest API breaking changes #43378 |
I have started prototyping a small package based on LibCST to build a Python 2to3-like tool for Airflow 2to3 that does simple and straightforward replacements. My main motivation was around a lot of our users in our Airflow instance using Dags
Operators
Sample file
import datetime

from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.timetables.events import EventsTimetable

with DAG(
    dag_id="my_dag_name",
    default_view="tree",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval="@daily",
    concurrency=2,
):
    op = EmptyOperator(
        task_id="task", task_concurrency=1, trigger_rule="none_failed_or_skipped"
    )

@dag(
    default_view="graph",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
    max_active_tasks=2,
    full_filepath="/tmp/test_dag.py"
)
def my_decorated_dag():
    op = EmptyOperator(task_id="task")

my_decorated_dag()
Sample usage
python -m libcst.tool codemod dag_fixer.DagFixerCommand -u 1 tests/test_dag.py
Calculating full-repo metadata...
Executing codemod...
reformatted -
All done! ✨ 🍰 ✨
1 file reformatted.
--- /home/karthikeyan/stuff/python/libcst-tut/tests/test_dag.py
+++ /home/karthikeyan/stuff/python/libcst-tut/tests/test_dag.py
@@ -10,6 +10,6 @@
     dag_id="my_dag_name",
-    default_view="tree",
+    default_view="grid",
     start_date=datetime.datetime(2021, 1, 1),
-    schedule_interval="@daily",
-    concurrency=2,
+    schedule="@daily",
+    max_active_tasks=2,
 ):
@@ -23,5 +23,4 @@
     start_date=datetime.datetime(2021, 1, 1),
-    schedule_interval=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
+    schedule=EventsTimetable(event_dates=[datetime.datetime(2022, 4, 5)]),
     max_active_tasks=2,
-    full_filepath="/tmp/test_dag.py"
 )
Finished codemodding 1 files!
- Transformed 1 files successfully.
- Skipped 0 files.
- Failed to codemod 0 files.
- 0 warnings were generated. |
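For readers who want to see the shape of such a rewrite without installing anything, here is a minimal sketch of the same keyword changes using the standard library's `ast.NodeTransformer` instead of LibCST. This is a swapped-in technique, not the prototype above: `ast.unparse` (Python 3.9+) drops comments and the original formatting, which is precisely what LibCST preserves. The names `RENAMED_KWARGS`, `REMOVED_KWARGS`, `VALUE_CHANGES`, `DagKwargFixer` and `fix_source` are hypothetical, and the mappings are only the ones visible in the diff above.

```python
import ast

# Mappings read off the sample diff; not an exhaustive or official list.
RENAMED_KWARGS = {"schedule_interval": "schedule", "concurrency": "max_active_tasks"}
REMOVED_KWARGS = {"full_filepath"}
VALUE_CHANGES = {("default_view", "tree"): "grid"}
DAG_CALLABLES = {"DAG", "dag"}  # DAG(...) and the @dag(...) decorator


class DagKwargFixer(ast.NodeTransformer):
    """Rename, drop, or re-value keyword arguments of DAG(...)/dag(...) calls."""

    def visit_Call(self, node: ast.Call) -> ast.Call:
        self.generic_visit(node)  # process nested calls first
        if not (isinstance(node.func, ast.Name) and node.func.id in DAG_CALLABLES):
            return node
        kept = []
        for kw in node.keywords:
            if kw.arg in REMOVED_KWARGS:
                continue  # drop the argument entirely
            if isinstance(kw.value, ast.Constant):
                new_value = VALUE_CHANGES.get((kw.arg, kw.value.value))
                if new_value is not None:
                    kw.value = ast.Constant(value=new_value)
            kw.arg = RENAMED_KWARGS.get(kw.arg, kw.arg)
            kept.append(kw)
        node.keywords = kept
        return node


def fix_source(source: str) -> str:
    """Return the rewritten source (formatting and comments are not preserved)."""
    tree = DagKwargFixer().visit(ast.parse(source))
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)
```

Calling `fix_source` on the sample file's text would apply the same four changes as the diff, but it only matches calls written literally as `DAG(...)` or `dag(...)`; aliased imports are out of scope for this sketch.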
NICE! @tirkarthi -> you should start a thread about it on the devlist and propose adding it to the repo. The sooner we start working on it and let people test it, the better it will be. And we can already start adding not only the newsfragments but also rules to the migration tools (cc: @vikramkoka @kaxil ) - we can even think about keeping a database of old-way dags, running such a migration tool on them and letting the Airflow 3 scheduler process them (and maybe even execute them) as part of our CI. This would tremendously help with maintaining and updating such a tool if we make it a part of our CI pipeline. |
BTW. I like a lot how simple it is with libCST - we previously used a quite a bit more complex tool from Facebook that allowed refactoring at scale in parallel (https://github.com/facebookincubator/Bowler), but it was rather brittle to develop rules for, and it had some weird problems and missing features. One thing that was very useful is that it had nice "parallelism" features, which allowed refactoring 1000s of files in seconds (but also made it difficult to debug). I think if we get it working with libCST, it will be way more generic and maintainable, and we can easily add parallelism later on when/if we see it is slow. |
One small watchout though - such a tool should have a way to isolate rules, so that they are not in a single big method - some abstraction that will allow us to easily develop and selectively apply (or skip) different rules - see https://github.com/apache/airflow/tree/v1-10-test/airflow/upgrade where we have documentation and information about the upgrade check we did for the Airflow 1 -> 2 migration. Also we have to discuss whether it should be a separate repo or whether it should be in airflow's monorepo. Both have pros and cons - in 1.10 we chose to keep it in the 1.10 branch of airflow, because it imported some of the airflow code and that was easier, but we could likely create a new repo for it, add CI there and keep it there. We even have this archived repo https://github.com/apache/airflow-upgrade-check which we never used and archived; we could re-open it. We also have https://pypi.org/project/apache-airflow-upgrade-check/ - a package in PyPI - and we could release new upgrade check versions (2.* ?) with "apache-airflow>=2.11.0" as a dependency. All that should likely be discussed on the devlist :) |
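One way the rule isolation described above could look, purely as a sketch: each rule is a small class with its own identifier, collected in a registry, and the runner accepts `select` and `skip` sets. Every name here (`Rule`, `RenamedKwargRule`, `RULES`, `register`, `run_rules`, and the ids `R001`/`R002`) is hypothetical and not taken from the Airflow 1 -> 2 upgrade check or any existing tool. The rules only report findings through the standard library `ast`; they do not rewrite code.

```python
import ast
from dataclasses import dataclass
from typing import Iterator, List, Optional, Set, Type


@dataclass(frozen=True)
class Finding:
    rule_id: str
    lineno: int
    message: str


class Rule:
    """Base class: one subclass per rule, so rules stay independent."""

    rule_id = ""

    def check(self, tree: ast.AST) -> Iterator[Finding]:
        raise NotImplementedError


RULES: List[Type[Rule]] = []


def register(cls: Type[Rule]) -> Type[Rule]:
    """Class decorator that adds a rule to the registry."""
    RULES.append(cls)
    return cls


class RenamedKwargRule(Rule):
    """Shared helper for rules that flag a renamed keyword argument."""

    old = ""
    new = ""

    def check(self, tree: ast.AST) -> Iterator[Finding]:
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                for kw in node.keywords:
                    if kw.arg == self.old:
                        msg = f"{self.old}= -> {self.new}="
                        yield Finding(self.rule_id, kw.value.lineno, msg)


@register
class ScheduleIntervalRule(RenamedKwargRule):
    rule_id, old, new = "R001", "schedule_interval", "schedule"


@register
class ConcurrencyRule(RenamedKwargRule):
    rule_id, old, new = "R002", "concurrency", "max_active_tasks"


def run_rules(
    source: str,
    select: Optional[Set[str]] = None,
    skip: Set[str] = frozenset(),
) -> List[Finding]:
    """Run registered rules, honouring the select/skip sets of rule ids."""
    tree = ast.parse(source)
    findings: List[Finding] = []
    for rule_cls in RULES:
        rid = rule_cls.rule_id
        if (select is not None and rid not in select) or rid in skip:
            continue
        findings.extend(rule_cls().check(tree))
    return sorted(findings, key=lambda f: (f.lineno, f.rule_id))
```

With this layout, `run_rules(source, skip={"R001"})` applies everything except the `schedule_interval` rule, and adding a rule means adding one class rather than touching a shared method.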
Thanks @potiuk for the details. I will start a discussion on this at the devlist and continue there. Bowler looks interesting. Using
|
Don't be deceived by it :). It was helpful for the Providers' migration at some point in time, but it had many rough edges - debugging a problem was a nightmare until we learned how to do it properly, and it had some annoying limitations - you had to learn completely new non-standard abstractions (an SQLAlchemy-like DSL to perform modifications) which did not cover all the refactorings we wanted to do. We had to really dig deep into the code and find workarounds for things we wanted to do when the authors of Bowler had not thought about them. And sometimes those were nasty workarounds.
query = (
    Query(<paths to modify>)
    .select_function("old_name")
    .rename("new_name")
    .diff(interactive=True)
)
The example I remember is that we could not easily rename some of the object types because it was not "foreseen" (can't remember exactly) - we had a few surprises there. Also Bowler seems to have been unmaintained for > 3 years, which means that it's unlikely to handle some constructs even in 3.9+ Airflow. What I like about libcst is that it is a really "low-level" interface that you program in Python rather than in an abstract DSL - similar to "ast". You write actual Python code to perform what you want rather than relying on incomplete abstractions, even if you have to copy & paste rename code between different "rules" (for example), which you can then abstract away as a 'common' util if you need to, so no big deal. |
BTW. Codemod .... has also not been maintained for 5 years. Not that it is a disqualification - but they list |
I tried to use libcst in airflow as a tiny POC of this issue here
|
👀 👀 Me ❤️ it (but I doubt we want to invest in it, as it might be difficult to maintain unless we find quite a few committers who are somewhat ruff proficient, to at least be able to review the code). But it's tempting, I must admit. To be honest, while I'd love to finally get a serious Rust project, I think it's not worth it: we are talking about a one-time migration, and even for 10,000 dags it will take at most single minutes, which we could maybe turn into under one minute with Rust - so not a big gain for a lot of pain :). Or at least this is what my intuition tells me.
I think parallelism will do the job nicely. My intuition (and this is just intuition and an understanding of some limits and the speed of certain operations) is that we will get from multiple 10s of minutes (when running such a migration sequentially) to single minutes when we run the migration in parallel using multiple processors and processes - even with Python and libcst. This task is really suitable for such parallelisation because each file is a complete, independent task that can be run in complete isolation from all other tasks - so spawning multiple parallel interpreters, ideally forking them right after all the imports and common code are loaded so that they use shared memory for those, should do the job nicely (at least intuitively).
Using Rust for that might be classic premature optimisation - we likely won't need it :). But it would be worth making some calculations and getting some "numbers" for a big installation - i.e. how many dags of what size are out there, and how long it takes to parse them all with libcst and write them back (even unmodified or with a simple modification). I presume that parsing and writing back will be the bulk of the job, and modifications will add very little overhead as they will mostly operate on in-memory data structures. |
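The parallelism argument above rests on each file being an independent unit of work. A rough sketch of that structure with `concurrent.futures` follows. `transform`, `migrate_file` and `migrate_all` are hypothetical names, and `transform` is a deliberately naive string replacement standing in for a real rule engine (for example LibCST transformers). No timing claims are implied by the sketch.

```python
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Callable, Dict, Iterable


def transform(source: str) -> str:
    """Placeholder for the real rewrite step; naive on purpose."""
    return source.replace("schedule_interval=", "schedule=")


def migrate_file(path: str) -> bool:
    """Rewrite one file in place and return True if its content changed."""
    file_path = Path(path)
    original = file_path.read_text(encoding="utf-8")
    updated = transform(original)
    if updated == original:
        return False  # avoid touching files that need no change
    file_path.write_text(updated, encoding="utf-8")
    return True


def migrate_all(
    paths: Iterable[str],
    executor_factory: Callable = ProcessPoolExecutor,
) -> Dict[str, bool]:
    """Migrate every file independently, one task per file."""
    path_list = list(paths)
    with executor_factory() as pool:
        # chunksize batches several files per worker round-trip
        results = pool.map(migrate_file, path_list, chunksize=16)
        return dict(zip(path_list, results))
```

Because no state is shared between files, swapping `executor_factory` (for instance to `ThreadPoolExecutor` while debugging) changes nothing else. When calling this from a script with the default process pool, the call should sit under an `if __name__ == "__main__":` guard so worker processes can import the module safely.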
Yep, totally agree. I just want to raise this idea which might be interesting. 👀
Yep, I think you're right. My previous default deferrable script took around 10 sec to process ~400 operators. Using ast for checking took around 1 sec |
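As an illustration of the `ast`-based checking mentioned above, here is a small sketch that flags imports matching some of the rules listed in this issue's description. `MOVED`, `REMOVED_PREFIXES` and `find_moved_imports` are hypothetical names. The entries are a subset copied from the description, and treating everything under `airflow.contrib` as affected is an assumption based on the `airflow.contrib.*` entry there.

```python
import ast
from typing import List, Tuple

# Old import path -> new import path, copied from the issue description.
MOVED = {
    "airflow.sensors.external_task.ExternalTaskSensorLink":
        "airflow.sensors.external_task.ExternalDagLink",
    "airflow.models.ImportError": "airflow.models.errors.ParseImportError",
}
# Prefixes listed without a replacement in the description.
REMOVED_PREFIXES = ("airflow.contrib",)


def find_moved_imports(source: str) -> List[Tuple[int, str, str]]:
    """Return (line, old_path, new_path) tuples; new_path is '' if unknown."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [f"{node.module}.{alias.name}" for alias in node.names]
        elif isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        else:
            continue
        for name in names:
            if name in MOVED:
                hits.append((node.lineno, name, MOVED[name]))
            elif any(name == p or name.startswith(p + ".") for p in REMOVED_PREFIXES):
                hits.append((node.lineno, name, ""))
    return sorted(hits)
```

This only inspects import statements, so attribute access such as `airflow.models.ImportError` after a plain `import airflow.models` would be missed by the sketch.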
Description
Why
As we're introducing breaking changes to the main branch, it would be good to start recording the changes for which migration tools could help our users move from Airflow 2 to 3.
The breaking changes can be found at https://github.com/apache/airflow/pulls?q=is%3Apr+label%3Aairflow3.0%3Abreaking.
Rules
airflow.sensors.external_task.ExternalTaskSensorLink → airflow.sensors.external_task.ExternalDagLink (ExternalTaskSensorLink, #41391 (comment))
airflow.models.taskMixin.TaskMixin → airflow.models.taskMixin.DependencyMixin (TaskMixin class, #41394)
airflow.contrib.* (contrib, #41366)
airflow.models.ImportError → airflow.models.errors.ParseImportError (ImportError from airflow.models, #41367)
Related issues
No response