[FLINK-37153] Monitor late event count in temporal join #25999

Open
wants to merge 1 commit into
base: master

grzegorz8
Contributor

What is the purpose of the change

In temporal join, events on the right side (build side, versioned table) are not discarded even if they arrive late (event timestamp < current watermark), which is generally good.

As far as I understand, late events can sometimes lead to incorrect results. This happens when the latest right-side event is delayed for some reason, and as the watermark progresses, a left-side event gets joined with the most recently known right-side event (stored in Flink's state), which might not be the true latest one.
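To make the scenario concrete, here is a minimal, self-contained sketch of it. It is a toy model for a single join key, not the operator's actual code: the class `StaleTemporalJoinDemo`, its methods, and the sample values are all hypothetical and exist only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an event-time temporal join for one key. Hypothetical, not Flink code. */
final class StaleTemporalJoinDemo {

    // Right-side (versioned table) state: version timestamp -> value.
    private final TreeMap<Long, String> rightVersions = new TreeMap<>();

    /** Stores a right-side row; late rows are stored as well, never discarded. */
    void onRightRow(long timestamp, String value) {
        rightVersions.put(timestamp, value);
    }

    /** Returns the newest known right-side version with timestamp <= leftTimestamp, or null. */
    String joinLeftRow(long leftTimestamp) {
        Map.Entry<Long, String> version = rightVersions.floorEntry(leftTimestamp);
        return version == null ? null : version.getValue();
    }

    /** Returns {value actually joined, value that would have been joined without the delay}. */
    static String[] runScenario() {
        StaleTemporalJoinDemo join = new StaleTemporalJoinDemo();
        join.onRightRow(10L, "rate=1.10");

        // Assume the watermark has advanced past 20, so the left row at t=20 is joined now.
        String emitted = join.joinLeftRow(20L);

        // The right-side row for t=15 arrives only afterwards (timestamp < watermark).
        join.onRightRow(15L, "rate=1.25");
        String expected = join.joinLeftRow(20L);

        return new String[] {emitted, expected};
    }

    public static void main(String[] args) {
        String[] result = runScenario();
        System.out.println("joined with:      " + result[0]);
        System.out.println("true latest was:  " + result[1]);
    }
}
```

In this sketch the left row at t=20 is emitted with the version from t=10, although the version from t=15 was the true latest one at that time. Nothing in the output signals that this happened.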

I believe it would be valuable to have a metric that tracks the number of late events on the right side, so that users can tell whether such a situation might have occurred.

Brief change log

Expose rightNumLateRecords metric in temporal join.
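
The counting logic can be sketched roughly as follows. This is a simplified, standalone illustration under stated assumptions, not the code in this PR: `SimpleCounter` is a hypothetical stand-in for a metrics counter, `RightLateRecordCounterSketch` is a hypothetical stand-in for the right-input handling, and "late" is taken to mean event timestamp < current watermark, as described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a metrics counter, so the sketch runs without Flink. */
final class SimpleCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

/** Toy right-input handler: counts late rows but still keeps them in state. */
final class RightLateRecordCounterSketch {
    private final SimpleCounter rightNumLateRecords = new SimpleCounter();
    private final List<Long> rightState = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Watermarks only move forward. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    void onRightRow(long eventTimestamp) {
        // Assumption: a row is late when its timestamp is below the current watermark.
        if (eventTimestamp < currentWatermark) {
            rightNumLateRecords.inc();
        }
        // The row is stored regardless; late rows are not dropped.
        rightState.add(eventTimestamp);
    }

    long lateCount() {
        return rightNumLateRecords.getCount();
    }

    int storedRows() {
        return rightState.size();
    }

    public static void main(String[] args) {
        RightLateRecordCounterSketch op = new RightLateRecordCounterSketch();
        op.onRightRow(5L);   // on time: no watermark seen yet
        op.onWatermark(10L);
        op.onRightRow(7L);   // late: 7 < 10
        op.onRightRow(12L);  // on time
        System.out.println("late=" + op.lateCount() + ", stored=" + op.storedRows());
    }
}
```

The point of the sketch is that the counter is purely observational: the late row is counted and then stored exactly like any other row.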

Verifying this change

This change is a trivial rework.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not documented)

@@ -84,6 +85,9 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private static final String RIGHT_STATE_NAME = "right";
private static final String REGISTERED_TIMER_STATE_NAME = "timer";
private static final String TIMERS_STATE_NAME = "timers";
private static final String RIGHT_LATE_ELEMENTS_METRIC_NAME = "rightNumLateRecords";

private transient Counter rightNumLateRecords;
Contributor

I notice that we already have numLateRecordsDropped metric. Is this sufficient?
You are proposing a new metric for a subset of operators (ones with 2 inputs) and only for one side, where all the processing is the same for the left and right sides at the moment.
I wonder whether the metric should instead count late records at the point where they are dropped, in line with the existing metric.
Documentation would also need to be added for this.

Contributor Author

> I notice that we already have numLateRecordsDropped metric. Is this sufficient?

That metric exists, but not for temporal join: numLateRecordsDropped is exposed by the window operators and the CEP operator.
What is more, late events ARE NOT dropped in this operator, which is why I named the metric rightNumLateRecords rather than rightNumLateRecordsDropped.

> You are proposing a new metric for a subset of operators (ones with 2 inputs) and only for one side, where all the processing is the same for the left and right sides at the moment.

I just wanted to prompt a discussion with this PR :) We can easily add such a metric for the left side as well.

@flinkbot
Collaborator

flinkbot commented Jan 16, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

3 participants