Skip to content

Commit

Permalink
add the test
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Feb 6, 2024
1 parent 0166791 commit 394fe76
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import javax.annotation.Nonnull;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Clock;
import java.util.Optional;
Expand All @@ -44,33 +46,42 @@ public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason,
throws Exception {
var query =
"select * from t_flink_autoscaler_event_handler "
+ "where job_key = ? and reason = ? and event_key = ? "
+ "order by create_time desc limit 1";
+ "where job_key = ? and reason = ? and event_key = ? ";

try (var pstmt = conn.prepareStatement(query)) {
pstmt.setString(1, jobKey);
pstmt.setString(2, reason);
pstmt.setString(3, eventKey);

var rs = pstmt.executeQuery();
AutoScalerEvent event = null;
if (rs.next()) {
event =
new AutoScalerEvent(
rs.getLong("id"),
rs.getTimestamp("create_time").toInstant(),
rs.getTimestamp("update_time").toInstant(),
rs.getString("job_key"),
rs.getString("reason"),
rs.getString("event_type"),
rs.getString("message"),
rs.getInt("event_count"),
rs.getString("event_key"));
// A better approach of finding the latestEvent is sql query desc the id and limit 1,
// but the limit syntax is different for different databases.
AutoScalerEvent latestEvent = null;
while (rs.next()) {
var currentEvent = generateEvent(rs);
if (latestEvent == null || latestEvent.getId() < currentEvent.getId()) {
// If the current event is newer than the latestEvent, then update the
// latestEvent.
latestEvent = currentEvent;
}
}
return Optional.ofNullable(event);
return Optional.ofNullable(latestEvent);
}
}

private AutoScalerEvent generateEvent(ResultSet rs) throws SQLException {
return new AutoScalerEvent(
rs.getLong("id"),
rs.getTimestamp("create_time").toInstant(),
rs.getTimestamp("update_time").toInstant(),
rs.getString("job_key"),
rs.getString("reason"),
rs.getString("event_type"),
rs.getString("message"),
rs.getInt("event_count"),
rs.getString("event_key"));
}

public void createEvent(
String jobKey,
String reason,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.event;

import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;

import org.junit.jupiter.api.Test;

/** The abstract IT case for {@link JdbcAutoScalerEventHandler}. */
abstract class AbstractJdbcAutoscalerEventHandlerITCase implements DatabaseTest {

@Test
void test() {
// todo test all if else conditions.
}
}

/** Test {@link JdbcAutoScalerEventHandler} via Derby. */
class DerbyJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements DerbyTestBase {}

/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.6.x. */
class MySQL56JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements MySQL56TestBase {}

/** Test {@link JdbcAutoScalerEventHandler} via MySQL 5.7.x. */
class MySQL57JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements MySQL57TestBase {}

/** Test {@link JdbcAutoScalerEventHandler} via MySQL 8.x. */
class MySQL8JdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements MySQL8TestBase {}

/** Test {@link JdbcAutoScalerEventHandler} via Postgre SQL. */
class PostgreSQLJdbcAutoscalerEventHandlerITCase extends AbstractJdbcAutoscalerEventHandlerITCase
implements PostgreSQLTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
Expand Down Expand Up @@ -129,6 +130,10 @@ private void assertEvent(
}
}

/** Test {@link JdbcEventInteractor} via Derby. */
class DerbyJdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase
implements DerbyTestBase {}

/** Test {@link JdbcEventInteractor} via MySQL 5.6.x. */
class MySQL56JdbcEventInteractorITCase extends AbstractJdbcEventInteractorITCase
implements MySQL56TestBase {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.event;

import org.apache.flink.autoscaler.event.AutoScalerEventHandler;

import java.sql.Connection;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

/** Countable {@link JdbcEventInteractor}. */
class CountableJdbcEventInteractor extends JdbcEventInteractor {

private final AtomicLong queryCounter;
private final AtomicLong createCounter;
private final AtomicLong updateCounter;

public CountableJdbcEventInteractor(Connection conn) {
super(conn);
queryCounter = new AtomicLong();
createCounter = new AtomicLong();
updateCounter = new AtomicLong();
}

@Override
public Optional<AutoScalerEvent> queryLatestEvent(String jobKey, String reason, String eventKey)
throws Exception {
queryCounter.incrementAndGet();
return super.queryLatestEvent(jobKey, reason, eventKey);
}

@Override
public void createEvent(
String jobKey,
String reason,
AutoScalerEventHandler.Type type,
String message,
String eventKey)
throws Exception {
createCounter.incrementAndGet();
super.createEvent(jobKey, reason, type, message, eventKey);
}

@Override
public void updateEvent(long id, String message, int eventCount) throws Exception {
updateCounter.incrementAndGet();
super.updateEvent(id, message, eventCount);
}

public void assertCounters(
long expectedQueryCounter, long expectedUpdateCounter, long expectedCreateCounter) {
assertThat(queryCounter).hasValue(expectedQueryCounter);
assertThat(updateCounter).hasValue(expectedUpdateCounter);
assertThat(createCounter).hasValue(expectedCreateCounter);
}
}

0 comments on commit 394fe76

Please sign in to comment.