Skip to content

Commit

Permalink
Add the test case for quick cleaning.
Browse files Browse the repository at this point in the history
  • Loading branch information
RocMarshal committed Aug 23, 2024
1 parent d5f1f35 commit 63edf33
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,41 @@ void testScalingEventWithParallelismChange() throws Exception {
});
}

@Test
void testQuickCleanExpiredEvents() throws Exception {
// Simulate the no-seq ids that is lacking of most of continuous ids.
final Duration ttl = Duration.ofDays(1L);
final int expiredNum = 2;
eventHandler = new JdbcAutoScalerEventHandler<>(jdbcEventInteractor, ttl);
// Init expired records.
initTestingEventHandlerRecords(expiredNum);

var expiredInstant = jdbcEventInteractor.getCurrentInstant();
// The clock to clean all expired data.
Clock fixedClock =
Clock.fixed(
expiredInstant.plus(ttl).plus(Duration.ofMillis(1)),
ZoneId.systemDefault());
jdbcEventInteractor.setClock(fixedClock);

long maxId =
jdbcEventInteractor.queryMinEventIdByCreateTime(
Timestamp.from(fixedClock.instant()))
+ expiredNum
- 1;
try (Connection connection = getConnection();
PreparedStatement ps =
connection.prepareStatement(
"update t_flink_autoscaler_event_handler set id = ? where id = ?")) {
ps.setLong(1, Long.MAX_VALUE / 2);
ps.setLong(2, maxId);
ps.execute();
}

eventHandler.cleanExpiredEvents();
jdbcEventInteractor.assertDeleteExpiredCounter(2L);
}

private static Stream<Arguments> getExpiredEventHandlersCaseMatrix() {
return Stream.of(
Arguments.of(false, 128, Duration.ofMinutes(2), 10),
Expand Down Expand Up @@ -635,14 +670,18 @@ private void assertScalingEvent(
class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements DerbyTestBase {

@Disabled("Closed due to the 'LIMIT' clause is not supported in Derby.")
@Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
@Override
void testCleanExpiredEvents(
boolean tryIdNotSequential,
int expiredRecordsNum,
Duration eventHandlerTtl,
int unexpiredRecordsNum)
throws Exception {}

@Disabled("Disabled due to the 'LIMIT' clause is not supported in Derby.")
@Override
void testQuickCleanExpiredEvents() throws Exception {}
}

/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */
Expand All @@ -659,4 +698,9 @@ class MySQL8JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEvent

/** Test {@link JdbcAutoScalerEventHandler} via Postgre SQL. */
class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements PostgreSQLTestBase {}
implements PostgreSQLTestBase {

@Disabled("Disabled due to the column 'id' can only be updated to DEFAULT.")
@Override
void testQuickCleanExpiredEvents() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;

import java.sql.Connection;
import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -31,12 +32,14 @@ class CountableJdbcEventInteractor extends JdbcEventInteractor {
private final AtomicLong queryCounter;
private final AtomicLong createCounter;
private final AtomicLong updateCounter;
private final AtomicLong deleteExpiredCounter;

public CountableJdbcEventInteractor(Connection conn) {
super(conn);
queryCounter = new AtomicLong();
createCounter = new AtomicLong();
updateCounter = new AtomicLong();
deleteExpiredCounter = new AtomicLong();
}

@Override
Expand Down Expand Up @@ -64,10 +67,21 @@ public void updateEvent(long id, String message, int eventCount) throws Exceptio
super.updateEvent(id, message, eventCount);
}

@Override
int deleteExpiredEventsByIdRangeAndDate(long startId, long endId, Timestamp timestamp)
throws Exception {
deleteExpiredCounter.incrementAndGet();
return super.deleteExpiredEventsByIdRangeAndDate(startId, endId, timestamp);
}

public void assertCounters(
long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) {
assertThat(queryCounter).hasValue(expectedQueryCounter);
assertThat(updateCounter).hasValue(expectedUpdateCounter);
assertThat(createCounter).hasValue(expectedCreateCounter);
}

public void assertDeleteExpiredCounter(long expectedCounter) {
assertThat(deleteExpiredCounter).hasValue(expectedCounter);
}
}

0 comments on commit 63edf33

Please sign in to comment.