-
Notifications
You must be signed in to change notification settings - Fork 402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36039][autoscaler] Support clean historical event handler records in JDBC event handler #865
Conversation
0dddc09
to
ecabeee
Compare
...alone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
Show resolved
Hide resolved
...src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java
Outdated
Show resolved
Hide resolved
...lone/src/main/java/org/apache/flink/autoscaler/standalone/AutoscalerEventHandlerFactory.java
Outdated
Show resolved
Hide resolved
...er-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
Outdated
Show resolved
Hide resolved
...er-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/event/JdbcEventInteractor.java
Outdated
Show resolved
Hide resolved
Executors.newSingleThreadScheduledExecutor( | ||
new ThreadFactoryBuilder() | ||
.setNameFormat("jdbc-autoscaler-events-cleaner") | ||
.setDaemon(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it's false here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the comments.
Please let me know what are you referring is to the explicit call to ".setDaemon(false)" due the default value is false
here, or are you suggesting that it should be set to true
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious what you think about this? In which cases should we use true and in which cases should we use false.
And in the current case, why do you use false? (I don't care if false is set manually or is the default value).
It is necessary for developers to know the meaning of every line of code they write and why.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx for the clarify.
- IIUC, There is no explicit exit operation in the entire service, so the default value of false can be used here. It is not necessary to set it to true to automatically exit after waiting for other threads to finish.
- Explicit call for readability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no explicit exit operation in the entire service
Jdbc event handler doesn't know who call itself, it doesn't know Autoscaler Standalone, and doesn't know k8s operator.
String message, | ||
String eventKey, | ||
Timestamp createTime) | ||
throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need introduce new method for testing, you could setClock to control the timestamp.
Check AbstractJdbcAutoscalerEventHandlerITCase#testEventIntervalWithoutMessageKey
quickQuery("select create_time from t_flink_autoscaler_event_handler")) { | ||
assertThat(countResultSet).isNotNull(); | ||
assertThat(countResultSet.next()).isTrue(); | ||
assertThat(countResultSet.getInt(1)).isOne(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we extract the unexpired record number into one parameter? It could cover more cases.
assertThat(createTimeResultSet.next()).isTrue(); | ||
assertThat(createTimeResultSet.getTimestamp(1)).isEqualTo(createTimeOfUnExpiredRecord); | ||
} catch (SQLException e) { | ||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't understand why need this catch?
AutoScalerEventHandler.Type.Normal, | ||
"message", | ||
"eventKey", | ||
createTimeOfUnExpiredRecord); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unexpired is a word, right?
final Clock clock = Clock.systemDefaultZone(); | ||
initExpiredEventHandlerRecords(eventHandlerTtl, expiredRecordsNum, clock); | ||
final Timestamp createTimeOfUnExpiredRecord = Timestamp.from(clock.instant()); | ||
jdbcEventInteractor.createEvent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't insert event via event handler? It's a little wired that creating event hanldler after inserting data. In addion, It could cover more cases.
return null; | ||
} | ||
var result = new ExpiredEventsResult(resultSet.getInt(1), resultSet.getLong(2)); | ||
resultSet.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't need it, copy from the comment of java.sql.ResultSet#close
. Let us keep all queries are same style.
Note: A ResultSet object is automatically closed by the Statement object that generated it when that Statement object is closed, re-executed, or is used to retrieve the next result from a sequence of multiple results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this comment not addressed?
CI is failed, please take a look as well, thanks~ |
9d058a4
to
c57e30b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @1996fanrui for the quick review.
I updated it based on the comments. PTAL~
var startId = expiredResult.minId; | ||
var endId = Math.min(startId + batch, expiredResult.maxId); | ||
while (startId <= expiredResult.maxId) { | ||
jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, endId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The limit
is not supported in postgres delete clause.
So, I try to use the current way
Thread.sleep(restInterval); | ||
} | ||
log.info( | ||
"Deleted expired {} event handler records successfully whose id is smaller than {}.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
smaller than or equal to ?
long maxId; | ||
|
||
public ExpiredEventsResult(int expiredRecords, long minId, long maxId) { | ||
this.expiredRecords = expiredRecords; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After introducing minId, the expiredRecords
is not needed anymore, right?
Also, please update the comment of ExpiredEventsResult as well.
return null; | ||
} | ||
var result = new ExpiredEventsResult(resultSet.getInt(1), resultSet.getLong(2)); | ||
resultSet.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this comment not addressed?
@Nullable | ||
ExpiredEventsResult queryExpiredEventsAndMaxId(Duration eventHandlerTtl) throws Exception { | ||
var query = | ||
"SELECT COUNT(1) records_num, min(id) min_existed_id, max(id) max_target_id " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you unify the name of min_existed_id and max_target_id?
var batch = 4098; | ||
var startId = expiredResult.minId; | ||
var endId = Math.min(startId + batch, expiredResult.maxId); | ||
while (startId <= expiredResult.maxId) { | ||
jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, endId); | ||
startId += batch; | ||
endId = Math.min(endId + batch, expiredResult.maxId); | ||
log.debug("Deleted expired event handler records by batch successfully."); | ||
Thread.sleep(10L); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the loop is pretty easier than while loop, and we don't need to introduce a series of variables.
var batch = 4098; | |
var startId = expiredResult.minId; | |
var endId = Math.min(startId + batch, expiredResult.maxId); | |
while (startId <= expiredResult.maxId) { | |
jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, endId); | |
startId += batch; | |
endId = Math.min(endId + batch, expiredResult.maxId); | |
log.debug("Deleted expired event handler records by batch successfully."); | |
Thread.sleep(10L); | |
} | |
var batch = 4098; | |
for (var startId = expiredResult.minId; startId <= expiredResult.maxId; startId += batch) { | |
jdbcEventInteractor.deleteExpiredEventsByIdRange(startId, Math.min(start + batch, expiredResult.maxId)); | |
log.debug("Deleted expired event handler records by batch successfully."); | |
Thread.sleep(10L); | |
} |
Arguments.of(1024, Duration.ofMinutes(2), 1), | ||
Arguments.of(1024 * 5, Duration.ofMinutes(2), 2), | ||
Arguments.of(1024, Duration.ofMinutes(100), 3), | ||
Arguments.of(1024 * 5, Duration.ofMinutes(100), 100)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test don't cover the case that unexpiredRecordsNum
is 0.
try (Connection con = getConnection(); | ||
PreparedStatement ps = | ||
con.prepareStatement("delete from t_flink_autoscaler_event_handler")) { | ||
ps.execute(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you find the t_flink_autoscaler_event_handler
table has data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, Here would be some data resulted by the parameterized test running due to the shared database instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, PostgreSQLExtension and MySQLExtension have afterEach
, it cleans all data.
But they missed t_flink_autoscaler_event_handler
table. I submitted #866 to fix it.
@MethodSource("getExpiredEventHandlersCaseMatrix") | ||
@ParameterizedTest | ||
void testCleanExpiredEvents( | ||
int expiredRecordsNum, Duration eventHandlerTtl, int unexpiredRecordsNum) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little wired that JdbcAutoScalerEventHandler#eventHandlerTtl
is always 0, but it could clean expired records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for comments.
IIUC, Maybe it's accepted, because in the current case, we just want to test the method cleanExpiredEvents
of JdbcAutoScalerEventHandler instead of the whole logic of the JdbcAutoScalerEventHandler
.
If needed, I'd like to do some change for the stable test cases.
Arguments.of(1024, Duration.ofMinutes(100), 3), | ||
Arguments.of(1024, Duration.ofMinutes(100), 0), | ||
Arguments.of(1024 * 5, Duration.ofMinutes(100), 100), | ||
Arguments.of(1024 * 5, Duration.ofMinutes(100), 0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could try run unexpiredRecordsNum = 0
locally, all records cannot be cleaned.
In addition, it's needed to add the case that expiredRecordsNum = 0.
06212b7
to
8f23741
Compare
+ " SELECT id FROM t_flink_autoscaler_event_handler " | ||
+ " WHERE create_time >= ? ORDER BY id ASC LIMIT 1)"; | ||
+ " WHERE create_time < ? ORDER BY id DESC LIMIT 1)"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand, desc will scan most of data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx for the comments.
Made some change for it. PTAL~ :)
c1d80a0
to
3605359
Compare
int deleted = 0, batch = 4098; | ||
var count = 0L; | ||
boolean cleanable = true; | ||
for (long startId = minId; deleted == batch || cleanable; startId += batch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- IIUC,
deleted == batch
isn't needed. If they are not equal, you updated the cleanable, so we check cleanable is enough. - If 1 is acceptable, we don't need the
for loop
.while(true) loop
is enough, and we can break the loop ifcleanable
is false. - If 1 is acceptable,
deleted
can be defined inside ofloop
. - If 2 is acceptable, we don't need
cleanable
. We can checkjdbcEventInteractor.queryMinEventIdByCreateTime(date) == null
, if it's true, break directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for comments.
IIUC, deleted == batch isn't needed. If they are not equal, you updated the cleanable, so we check cleanable is enough.
Yes, it does overall. But here's a small difference in this condition.
The deleted==batch
would reduce the sql query from the DB,
jdbcEventInteractor.queryMinEventIdByCreateTime(date)
This query function is more like searching for the starting point of an interval composed of consecutive IDs that meet the deletion criteria. the deleted==batch
would drive the deletion action in the consecutive IDs.
Please correct me if I'm wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the code, jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null;
is only called when deleted!=batch
.
It means jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null;
won't be called when deleted==batch
. And for loop
will delete next round.
I don't understand why updating deleted == batch || cleanable
to cleanable
will query more times from Database.
return; | ||
} | ||
int deleted = 0, batch = 4098; | ||
var count = 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var count = 0L; | |
var deletedTotalCount = 0L; |
log.warn("No expired event handlers queried at {}", new Date()); | ||
return; | ||
} | ||
int deleted = 0, batch = 4098; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about renaming it to batchSize?
} | ||
log.debug( | ||
"Deleted expired {} event handler records by batch successfully.", deleted); | ||
Thread.sleep(10L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about sleep 1s?
One minute could clean 4096 * 60 = 240 k, it's totally enough.
var sql = | ||
"SELECT id from t_flink_autoscaler_event_handler " | ||
+ " where id = (SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1) " | ||
+ " and create_time < ?"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SELECT id, create_time FROM t_flink_autoscaler_event_handler order by id asc limit 1
is enough, we can check create_time in memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the comment.
I conducted some tests based on the above logic, and this SQL statement has these two characteristics:
- It would run a few times per triggering..
- Running on a large amount of data takes very little time. Even equivalent to this sentence
SELECT id FROM t_flink_autoscaler_event_handler order by id asc limit 1
How about keeping it as now? of course, I'd like to updated it if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about keeping it as now? of course, I'd like to updated it if needed.
It's up to you, both of them are fine for me.
This sql only query the first row, so the performance isn't my concern.
I leave this comment because sql looks weird at first glance.
startId, endId, date); | ||
count += deleted; | ||
if (deleted != batch) { | ||
cleanable = jdbcEventInteractor.queryMinEventIdByCreateTime(date) != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, if it's not null, we should update the startId = jdbcEventInteractor.queryMinEventIdByCreateTime(date)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And please add some tests to cover this case if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added.
.getCurrentInstant() | ||
.minusMillis(eventHandlerTtl.toMillis())); | ||
try { | ||
Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could define a nested loops:
- The outer loop check
Long minId = jdbcEventInteractor.queryMinEventIdByCreateTime(date)
, andminId != null
- The inner loop check
deleted == batchSize
.
106c4b3
to
e3defd6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM assuming the CI is green.
Hey @huyuanfeng2018 , as I know, your employer also needs this feature. Would you mind reviewing this PR as well? thank you very much.
d443243
to
63edf33
Compare
…rds in JDBC event handler
Updated based on review comments Addressed comments Updated based on review comments Addressed review comments.
63edf33
to
a9b42bd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to @RocMarshal for contribution and @1996fanrui for review, the code looks good to me!
What is the purpose of the change
Support clean historical event handler records in JDBC event handler
Brief change log
Verifying this change
Tested locally.
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors
: (yes / no)Documentation