From d4432434cc49605fc73a372080f8d289047aa3a7 Mon Sep 17 00:00:00 2001 From: Roc Marshal Date: Fri, 23 Aug 2024 17:28:19 +0800 Subject: [PATCH] Add the test case testExpiredIdRangeQuickUpdateForCleanExpiredEvents --- ...tractJdbcAutoscalerEventHandlerITCase.java | 39 +++++++++++++++++++ .../event/CountableJdbcEventInteractor.java | 14 +++++++ 2 files changed, 53 insertions(+) diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java index e286b5b463..e2b4a9ef15 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/AbstractJdbcAutoscalerEventHandlerITCase.java @@ -446,6 +446,41 @@ void testScalingEventWithParallelismChange() throws Exception { }); } + @Test + void testQuickCleanExpiredEvents() throws Exception { + // Simulate the no-seq ids that is lacking of most of continuous ids. + final Duration ttl = Duration.ofDays(1L); + final int expiredNum = 2; + eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, ttl); + // Init expired records. + initTestingEventHandlerRecords(expiredNum); + + var expiredInstant = jdbcEventInteractor.getCurrentInstant(); + // The clock to clean all expired data. + Clock fixedClock = + Clock.fixed( + expiredInstant.plus(ttl).plus(Duration.ofMillis(1)), + ZoneId.systemDefault()); + jdbcEventInteractor.setClock(fixedClock); + + long maxId = + jdbcEventInteractor.queryMinEventIdByCreateTime( + Timestamp.from(fixedClock.instant())) + + expiredNum + - 1; + try (Connection connection = getConnection(); + PreparedStatement ps = + connection.prepareStatement( + "update t_flink_autoscaler_event_handler set id = ? where id = ?")) { + ps.setLong(1, Long.MAX_VALUE / 2); + ps.setLong(2, maxId); + ps.execute(); + } + + eventHandler.cleanExpiredEvents(); + jdbcEventInteractor.assertDeleteExpiredCounter(2L); + } + private static Stream getExpiredEventHandlersCaseMatrix() { return Stream.of( Arguments.of(false, 128, Duration.ofMinutes(2), 10), @@ -643,6 +678,10 @@ void testCleanExpiredEvents( Duration eventHandlerTtl, int unexpiredRecordsNum) throws Exception {} + + @Disabled("Closed due to the 'LIMIT' clause is not supported in Derby.") + @Override + void testQuickCleanExpiredEvents() throws Exception {} } /** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */ diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java index 664a28bdb6..39ccd76cae 100644 --- a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java +++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/event/CountableJdbcEventInteractor.java @@ -20,6 +20,7 @@ import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import java.sql.Connection; +import java.sql.Timestamp; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -31,12 +32,14 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor { private final AtomicLong queryCounter; private final AtomicLong createCounter; private final AtomicLong updateCounter; + private final AtomicLong deleteExpiredCounter; public CountableJdbcEventInteractor(Connection conn) { super(conn); queryCounter = new AtomicLong(); createCounter = new AtomicLong(); updateCounter = new AtomicLong(); + deleteExpiredCounter = new AtomicLong(); } @Override @@ -64,10 +67,21 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio super.updateEvent(id, message, eventCount); } + @Override + int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp) + throws Exception { + deleteExpiredCounter.incrementAndGet(); + return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, timestamp); + } + public void assertCounters( long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) { assertThat(queryCounter).hasValue(expectedQueryCounter); assertThat(updateCounter).hasValue(expectedUpdateCounter); assertThat(createCounter).hasValue(expectedCreateCounter); } + + public void assertDeleteExpiredCounter(long expectedCounter) { + assertThat(deleteExpiredCounter).hasValue(expectedCounter); + } }