fix(scheduler_job_runner/asset): fix how asset dag warning is added #43873

Open: Lee-W wants to merge 5 commits into main

@Lee-W Lee-W commented Nov 11, 2024

Why

The current logic for adding DAG warnings when activating assets is incorrect.

What

The correct logic is:

  1. Find the warnings that should exist after this round
  2. Delete the warnings that are no longer needed
  3. Update the warnings that already exist and add new warnings that do not yet exist
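
The three steps above can be sketched with plain dictionaries. This is only an illustration, not the code in this PR: `plan_warning_changes` is a hypothetical helper, and both mappings (DAG ID to warning message) are assumed stand-ins for the stored warnings and for the warnings computed in this round.

```python
from __future__ import annotations


def plan_warning_changes(
    existing: dict[str, str],
    warnings_to_have: dict[str, str],
) -> tuple[list[str], list[str], list[str]]:
    """Return (to_delete, to_update, to_add) as sorted lists of DAG IDs.

    `existing` and `warnings_to_have` are hypothetical in-memory stand-ins
    that map a DAG ID to its warning message.
    """
    # Step 1 is the caller's job: `warnings_to_have` holds every warning
    # that should exist after this round.

    # Step 2: stored warnings that are no longer wanted get deleted.
    to_delete = sorted(existing.keys() - warnings_to_have.keys())

    # Step 3a: warnings that already exist get their message updated.
    to_update = sorted(existing.keys() & warnings_to_have.keys())

    # Step 3b: warnings that do not exist yet get added.
    to_add = sorted(warnings_to_have.keys() - existing.keys())

    return to_delete, to_update, to_add
```

With `existing = {"dag_a": "old", "dag_b": "stale"}` and `warnings_to_have = {"dag_a": "new", "dag_c": "fresh"}`, the plan deletes `dag_b`, updates `dag_a`, and adds `dag_c`.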

Related PR: #43693 (that PR changed the logic incorrectly; this PR fixes it)



@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Nov 11, 2024
@Lee-W Lee-W changed the title fix(scheduler_job_runner/asset): fix how asset dag warnning is added fix(scheduler_job_runner/asset): fix how asset dag warning is added Nov 11, 2024
@Lee-W Lee-W requested a review from uranusjr November 11, 2024 10:49
    def _get_first_item(x: Sequence[Any]) -> Any:
        return x[0]

    warnings_to_have = {
Member Author

@uranusjr, I'm not sure whether we want to warn about a case like this 🤔 It kinda makes the logic a bit more complicated. The messages are just concatenated into one.

    schedule=(
        Asset(name="asset1_producer", uri="s3://bucket/asset1_producer1")
        | Asset(name="asset1_producer", uri="s3://bucket/asset1_producer2")
        | Asset(name="asset1_producer", uri="s3://bucket/asset1_producer3")
        | Asset(name="asset1_producer", uri="s3://bucket/asset1_producer4")
    ),

Member

Do you mean we should reduce the warnings from 3 to 1?

Member Author

In the existing logic, we only raise the latest one (if I'm not mistaken). In this implementation, the error will look like "Cannot activate asset Asset(...); ... is already associated to ...\nCannot activate asset Asset(...); ... is already associated to ...".

I don't think we can add multiple DagWarning entries unless we want to remove the unique constraint 🤔
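
To make the trade-off concrete, here is a rough sketch of concatenating several conflict messages into a single warning per DAG. Everything in it is an assumption for illustration: `AssetRef`, `build_warning_messages`, the "first URI seen for a name wins" rule, and the message wording are hypothetical and are not taken from the Airflow code base.

```python
from __future__ import annotations

from collections import defaultdict
from typing import NamedTuple


class AssetRef(NamedTuple):
    """Hypothetical record: one asset referenced by one DAG."""

    dag_id: str
    name: str
    uri: str


def build_warning_messages(refs: list[AssetRef]) -> dict[str, str]:
    """Return one concatenated warning message per DAG ID."""
    # Assumption: the first URI seen for a name is the one that is activated.
    active_uri_by_name: dict[str, str] = {}
    messages: dict[str, list[str]] = defaultdict(list)

    for ref in refs:
        active_uri = active_uri_by_name.setdefault(ref.name, ref.uri)
        if active_uri != ref.uri:
            # Illustrative wording only, loosely modelled on the quoted message.
            messages[ref.dag_id].append(
                f"Cannot activate asset name={ref.name!r} uri={ref.uri!r}; "
                f"name is already associated to {active_uri!r}"
            )

    # One warning per DAG: join every message with a newline.
    return {dag_id: "\n".join(msgs) for dag_id, msgs in messages.items()}
```

For the four-asset schedule quoted earlier in this thread, this sketch yields a single entry for the DAG whose message contains three lines, one for each asset that could not be activated.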

@Lee-W Lee-W added the airflow3.0:candidate Potential candidates for Airflow 3.0 label Nov 11, 2024