From 37241bce72db6788141798ca01889e5443de07ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Thu, 16 Jan 2025 13:12:01 +0100 Subject: [PATCH] [FLINK-37153] Monitor late event count in temporal join Expose rightNumLateRecords metric. --- .../join/temporal/TemporalRowTimeJoinOperator.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index 940e15c2b4d03..94d2b20e5f6ba 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -84,6 +85,9 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS private static final String RIGHT_STATE_NAME = "right"; private static final String REGISTERED_TIMER_STATE_NAME = "timer"; private static final String TIMERS_STATE_NAME = "timers"; + private static final String RIGHT_LATE_ELEMENTS_METRIC_NAME = "rightNumLateRecords"; + + private transient Counter rightNumLateRecords; private final boolean isLeftOuterJoin; private final InternalTypeInfo leftType; @@ -174,6 +178,9 @@ public void open() throws Exception { outRow = new JoinedRowData(); rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount()); collector = new TimestampedCollector<>(output); + + this.rightNumLateRecords = + getRuntimeContext().getMetricGroup().counter(RIGHT_LATE_ELEMENTS_METRIC_NAME); } @Override @@ -194,6 +201,10 @@ public void processElement2(StreamRecord element) throws Exception { registerSmallestTimer(rowTime); // Timer to clean up the state registerProcessingCleanupTimer(); + + if (rowTime < timerService.currentWatermark()) { + rightNumLateRecords.inc(); + } } @Override