Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37153] Monitor late event count in temporal join #25999

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
Expand Down Expand Up @@ -84,6 +85,9 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private static final String RIGHT_STATE_NAME = "right";
private static final String REGISTERED_TIMER_STATE_NAME = "timer";
private static final String TIMERS_STATE_NAME = "timers";
private static final String RIGHT_LATE_ELEMENTS_METRIC_NAME = "rightNumLateRecords";

private transient Counter rightNumLateRecords;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that we already have numLateRecordsDropped metric. Is this sufficient?
You are proposing a new metric for a subset of operators (ones with 2 inputs) and only for one side, where all the processing is the same for the left and right sides at the moment.
I wonder if the metric should be when the late record is dropped in line with the existing metric
We would need documentation to be added for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that we already have numLateRecordsDropped metric. Is this sufficient?

Yes, but not for temporal join. There is numLateRecordsDropped metric in window operators or cep operator.
What is more, late events ARE NOT dropped in this operator, that is why I named it rightNumLateRecordsDropped.

You are proposing a new metric for a subset of operators (ones with 2 inputs) and only for one side, where all the processing is the same for the left and right sides at the moment.

I just wanted to prompt a discussion with this PR :) We can easily add such metric for left side as well.


private final boolean isLeftOuterJoin;
private final InternalTypeInfo<RowData> leftType;
Expand Down Expand Up @@ -174,6 +178,9 @@ public void open() throws Exception {
outRow = new JoinedRowData();
rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount());
collector = new TimestampedCollector<>(output);

this.rightNumLateRecords =
getRuntimeContext().getMetricGroup().counter(RIGHT_LATE_ELEMENTS_METRIC_NAME);
}

@Override
Expand All @@ -194,6 +201,10 @@ public void processElement2(StreamRecord<RowData> element) throws Exception {
registerSmallestTimer(rowTime); // Timer to clean up the state

registerProcessingCleanupTimer();

if (rowTime < timerService.currentWatermark()) {
rightNumLateRecords.inc();
}
}

@Override
Expand Down