Skip to content

Internal heartbeat Timer, use Virtual Thread for Java 21 #135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 29, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions blackbox-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.ebean</groupId>
<artifactId>ebean-datasource-parent</artifactId>
<version>9.6</version>
</parent>

<artifactId>blackbox-tests</artifactId>

<properties>
<maven.compiler.release>21</maven.compiler.release>
</properties>

<dependencies>
<dependency>
<groupId>io.ebean</groupId>
<artifactId>ebean-datasource</artifactId>
<version>9.6</version>
</dependency>

<dependency>
<groupId>io.avaje</groupId>
<artifactId>junit</artifactId>
<version>1.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.ebean</groupId>
<artifactId>ebean-test-containers</artifactId>
<version>7.8</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.17</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk-platform-logging</artifactId>
<version>2.0.17</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.avaje</groupId>
<artifactId>avaje-slf4j-jpl</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.example.tests;

import io.ebean.datasource.DataSourceBuilder;
import io.ebean.datasource.DataSourcePool;
import io.ebean.test.containers.PostgresContainer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class Java21TrimAndShutdownTest {

private static Logger log = LoggerFactory.getLogger(Java21TrimAndShutdownTest.class);

@BeforeAll
static void before() {
PostgresContainer.builder("15")
.port(9999)
.containerName("pool_test")
.dbName("app")
.user("db_owner")
.build()
.startWithDropCreate();
}

@Test
void test() throws InterruptedException, SQLException {
Properties clientInfo = new Properties();
clientInfo.setProperty("ApplicationName", "my-test");

DataSourcePool pool = DataSourceBuilder.create()
.url("jdbc:postgresql://127.0.0.1:9999/app")
.username("db_owner")
.password("test")
.clientInfo(clientInfo)
.maxInactiveTimeSecs(2)
.heartbeatFreqSecs(1)
.trimPoolFreqSecs(1)
.build();

List<Connection> connectionList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
connectionList.add(pool.getConnection());
}

// close them slowly to allow multiple trims
for (Connection connection : connectionList) {
connection.rollback();
connection.close();
Thread.sleep(200);
}

log.info("----------- Sleep allowing trim -------------");
Thread.sleep(9_000);
log.info("----------- Shutdown pool -------------");
pool.shutdown();
}
}
19 changes: 19 additions & 0 deletions blackbox-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<configuration scan="true" scanPeriod="10 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="java.lang" level="WARN"/>
<logger name="io.ebean" level="INFO"/>
<logger name="io.avaje.config" level="TRACE"/>
<logger name="io.ebean.docker" level="DEBUG"/>
<logger name="io.ebean.test" level="TRACE"/>
<logger name="io.ebean.datasource" level="TRACE"/>

</configuration>
Original file line number Diff line number Diff line change
@@ -26,6 +26,12 @@
*/
final class ConnectionPool implements DataSourcePool {

@FunctionalInterface
interface Heartbeat {

void stop();
}

private static final String APPLICATION_NAME = "ApplicationName";
private final ReentrantLock heartbeatLock = new ReentrantLock(false);
private final ReentrantLock notifyLock = new ReentrantLock(false);
@@ -80,7 +86,7 @@ final class ConnectionPool implements DataSourcePool {
private final int waitTimeoutMillis;
private final int pstmtCacheSize;
private final PooledConnectionQueue queue;
private Timer heartBeatTimer;
private Heartbeat heartbeat;
private int heartbeatPoolExhaustedCount;
private final ExecutorService executor;

@@ -161,13 +167,6 @@ void pstmtCacheMetrics(PstmtCache pstmtCache) {
pscRem.add(pstmtCache.removeCount());
}

final class HeartBeatRunnable extends TimerTask {
@Override
public void run() {
heartBeat();
}
}

@Override
public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException("We do not support java.util.logging");
@@ -387,7 +386,7 @@ private void trimIdleConnections() {
* This is called by the HeartbeatRunnable which should be scheduled to
* run periodically (every heartbeatFreqSecs seconds).
*/
private void heartBeat() {
void heartbeat() {
trimIdleConnections();
if (validateOnHeartbeat) {
testConnection();
@@ -727,11 +726,10 @@ private void startHeartBeatIfStopped() {
heartbeatLock.lock();
try {
// only start if it is not already running
if (heartBeatTimer == null) {
if (heartbeat == null) {
int freqMillis = heartbeatFreqSecs * 1000;
if (freqMillis > 0) {
heartBeatTimer = new Timer(name + ".heartBeat", true);
heartBeatTimer.scheduleAtFixedRate(new HeartBeatRunnable(), freqMillis, freqMillis);
heartbeat = ExecutorFactory.newHeartBeat(this, freqMillis);
}
}
} finally {
@@ -743,9 +741,9 @@ private void stopHeartBeatIfRunning() {
heartbeatLock.lock();
try {
// only stop if it was running
if (heartBeatTimer != null) {
heartBeatTimer.cancel();
heartBeatTimer = null;
if (heartbeat != null) {
heartbeat.stop();
heartbeat = null;
}
} finally {
heartbeatLock.unlock();
Original file line number Diff line number Diff line change
@@ -4,13 +4,15 @@
import io.ebean.datasource.DataSourceFactory;
import io.ebean.datasource.DataSourcePool;

import static java.util.Objects.requireNonNullElse;

/**
* DataSourceFactory implementation that is service loaded.
*/
public final class ConnectionPoolFactory implements DataSourceFactory {

@Override
public DataSourcePool createPool(String name, DataSourceConfig config) {
return new ConnectionPool(name, config);
return new ConnectionPool(requireNonNullElse(name, ""), config);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,65 @@
package io.ebean.datasource.pool;

import io.ebean.datasource.pool.ConnectionPool.Heartbeat;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class ExecutorFactory {

static ExecutorService newExecutor() {
return Executors.newSingleThreadExecutor();
return Executors.newSingleThreadExecutor(factory());
}

private static ThreadFactory factory() {
return runnable -> {
Thread thread = new Thread(runnable);
thread.setName("datasource.reaper");
return thread;
};
}

/**
* Return a new Heartbeat for the pool.
*/
static Heartbeat newHeartBeat(ConnectionPool pool, int freqMillis) {
final Timer timer = new Timer(nm(pool.name()), true);
timer.scheduleAtFixedRate(new HeartbeatTask(pool), freqMillis, freqMillis);
return new TimerHeartbeat(timer);
}

private static String nm(String poolName) {
return poolName.isEmpty() ? "datasource.heartbeat" : "datasource." + poolName + ".heartbeat";
}

private static final class TimerHeartbeat implements Heartbeat {

private final Timer timer;

private TimerHeartbeat(Timer timer) {
this.timer = timer;
}

@Override
public void stop() {
timer.cancel();
}
}

private static final class HeartbeatTask extends TimerTask {

private final ConnectionPool pool;

private HeartbeatTask(ConnectionPool pool) {
this.pool = pool;
}

@Override
public void run() {
pool.heartbeat();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,67 @@
package io.ebean.datasource.pool;

import io.ebean.datasource.pool.ConnectionPool.Heartbeat;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool: I didn't know this method until now:
Rolands note: Classes will be located in META-INF/versions/21
https://docs.oracle.com/javase/10/docs/specs/jar/jar.html#multi-release-jar-files


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

final class ExecutorFactory {

static ExecutorService newExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
ThreadFactory factory = Thread.ofVirtual().name("datasource.reaper").factory();
return Executors.newThreadPerTaskExecutor(factory);
}

static Heartbeat newHeartBeat(ConnectionPool pool, int freqMillis) {
return new VTHeartbeat(pool, freqMillis).start();
}

private static final class VTHeartbeat implements Heartbeat {

private final AtomicBoolean running = new AtomicBoolean(false);
private final ConnectionPool pool;
private final int freqMillis;
private final Thread thread;

private VTHeartbeat(ConnectionPool pool, int freqMillis) {
this.pool = pool;
this.freqMillis = freqMillis;
this.thread = Thread.ofVirtual()
.name(nm(pool.name()))
.unstarted(this::run);
}

private static String nm(String poolName) {
return poolName.isEmpty() ? "datasource.heartbeat" : "datasource." + poolName + ".heartbeat";
}

private void run() {
while (running.get()) {
try {
Thread.sleep(freqMillis);
pool.heartbeat();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
// continue heartbeat
Log.warn("Error during heartbeat", e);
}
}
}

private Heartbeat start() {
running.set(true);
thread.start();
return this;
}

@Override
public void stop() {
running.set(false);
thread.interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*
* @author Roland Praml, Foconis Analytics GmbH
*/
@Disabled
public class NetworkOutageTest {

static void openPort(int port) throws Exception {
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -22,4 +22,18 @@
<module>ebean-datasource-api</module>
</modules>

<profiles>
<profile>
<id>central</id>
</profile>
<profile>
<id>default</id>
<activation>
<jdk>[21,]</jdk>
</activation>
<modules>
<module>blackbox-tests</module>
</modules>
</profile>
</profiles>
</project>