Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>3.0.1</version>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<version>3.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -299,15 +299,15 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<vertx.version>4.5.13</vertx.version>
<vertx.version>5.0.5</vertx.version>
<jsonschema2pojo_output_dir>${project.build.directory}/generated-sources/jsonschema2pojo</jsonschema2pojo_output_dir>
<httpcomponents.version>4.5.14</httpcomponents.version>
<lombok.version>1.18.30</lombok.version>
<postgres.version>42.7.2</postgres.version>
<liquibase.version>4.9.1</liquibase.version>
<lombok.version>1.18.42</lombok.version>
<postgres.version>42.7.8</postgres.version>
<liquibase.version>5.0.1</liquibase.version>
<kafkaclients.version>3.9.1</kafkaclients.version>
<junit.version>4.13.2</junit.version>
<data-import-processing-core.version>4.5.0-SNAPSHOT</data-import-processing-core.version>
<data-import-processing-core.version>5.0.0-SNAPSHOT</data-import-processing-core.version>
<folio-module-descriptor-validator.version>1.0.0</folio-module-descriptor-validator.version>
</properties>

Expand Down Expand Up @@ -495,13 +495,16 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<version>3.5.4</version>
<configuration>
<!-- useSystemClassLoader=false is still required to avoid classloading issues;
see https://issues.folio.org/browse/FOLIO-1609
and https://issues.apache.org/jira/browse/SUREFIRE-1588
-->
<useSystemClassLoader>false</useSystemClassLoader>
<systemPropertyVariables>
<api.version>1.44</api.version>
</systemPropertyVariables>
<excludes>
<exclude>**/api/**Examples.class</exclude>
</excludes>
Expand Down
49 changes: 23 additions & 26 deletions src/main/java/org/folio/inventory/InventoryVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.lang.invoke.MethodHandles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.common.WebRequestDiagnostics;
Expand All @@ -25,10 +28,6 @@
import org.folio.inventory.resources.UpdateOwnershipApi;
import org.folio.inventory.storage.Storage;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -77,33 +76,31 @@ public void start(Promise<Void> started) {
new UpdateOwnershipApi(storage, client, consortiumService, snapshotService, new InventoryClientFactoryImpl()).register(router);
new TenantItems(client).register(router);

Handler<AsyncResult<HttpServer>> onHttpServerStart = result -> {
if (result.succeeded()) {
log.info("Listening on {}", server.actualPort());
started.complete();
} else {
started.fail(result.cause());
}
};

server.requestHandler(router)
.listen(config.getInteger("port"), onHttpServerStart);
.listen(config.getInteger("port"))
.onSuccess(httpServer -> {
log.info("Listening on {}", httpServer.actualPort());
started.complete();
})
.onFailure(started::fail);
}

@Override
public void stop(Promise<Void> stopped) {
final Logger log = LogManager.getLogger(MethodHandles.lookup().lookupClass());

PostgresClientFactory.closeAll();

log.info("Stopping inventory module");
server.close(result -> {
if (result.succeeded()) {
log.info("Inventory module stopped");
stopped.complete();
} else {
stopped.fail(result.cause());
}
});
log.info("Stopping inventory module...");
Future<Void> dbCloseFuture = PostgresClientFactory.closeAll();
Future<Void> serverCloseFuture = server.close();
Future.all(dbCloseFuture, serverCloseFuture)
.mapEmpty()
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("Inventory module and all resources stopped successfully.");
stopped.complete();
} else {
log.error("Failed to stop inventory module cleanly", ar.cause());
stopped.fail(ar.cause());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -32,7 +32,7 @@ public void start(Promise<Void> startPromise) {
var marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, getMappingMetadataCache());
var marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, getMappingMetadataCache());

CompositeFuture.all(
Future.all(
marcBibConsumerWrapper.start(marcBibInstanceHridSetKafkaHandler, constructModuleName()),
marcHoldingsConsumerWrapper.start(marcHoldingsRecordHridSetKafkaHandler, constructModuleName())
)
Expand Down
65 changes: 21 additions & 44 deletions src/main/java/org/folio/inventory/common/VertxAssistant.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,17 @@ public Vertx getVertx() {

public void stop() {
CompletableFuture<Void> stopped = new CompletableFuture<>();

stop(stopped);

stopped.join();
}

public void stop(final CompletableFuture<Void> stopped) {

if (vertx != null) {
vertx.close(res -> {
if (res.succeeded()) {
stopped.complete(null);
} else {
stopped.completeExceptionally(res.cause());
}
}
);
vertx.close()
.onSuccess(v -> stopped.complete(null))
.onFailure(stopped::completeExceptionally);
} else {
stopped.complete(null);
}
}

Expand All @@ -68,24 +62,17 @@ public void deployVerticle(String verticleClass,
long startTime = System.currentTimeMillis();

DeploymentOptions options = new DeploymentOptions();

options.setConfig(new JsonObject(config));
options.setWorker(true);
options.setThreadingModel(ThreadingModel.WORKER);
options.setInstances(verticleInstancesNumber);

vertx.deployVerticle(verticleClass, options, result -> {
if (result.succeeded()) {
vertx.deployVerticle(verticleClass, options)
.onSuccess(deploymentId -> {
long elapsedTime = System.currentTimeMillis() - startTime;

log.info(String.format(
"%s deployed in %s milliseconds", verticleClass, elapsedTime));

deployed.complete(result.result());
} else {
deployed.completeExceptionally(result.cause());
}
});

log.info(String.format("%s deployed in %s milliseconds", verticleClass, elapsedTime));
deployed.complete(deploymentId);
})
.onFailure(deployed::completeExceptionally);
}

public void deployVerticle(Supplier<Verticle> verticleSupplier,
Expand All @@ -100,30 +87,20 @@ public void deployVerticle(Supplier<Verticle> verticleSupplier,
.setThreadingModel(ThreadingModel.WORKER)
.setInstances(verticleInstancesNumber);

vertx.deployVerticle(verticleSupplier, options, result -> {
if (result.succeeded()) {
long elapsedTime = System.currentTimeMillis() - startTime;

log.info("{} deployed in {} milliseconds", verticleClass, elapsedTime);

deployed.complete(result.result());
} else {
deployed.completeExceptionally(result.cause());
}
});

vertx.deployVerticle(verticleSupplier, options)
.onSuccess(result -> {
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("{} deployed in {} milliseconds", verticleClass, elapsedTime);
deployed.complete(result);
}).onFailure(deployed::completeExceptionally);
}

public void undeployVerticle(String deploymentId,
CompletableFuture<Void> undeployed) {

vertx.undeploy(deploymentId, result -> {
if (result.succeeded()) {
undeployed.complete(null);
} else {
undeployed.completeExceptionally(result.cause());
}
});
vertx.undeploy(deploymentId)
.onSuccess(result -> undeployed.complete(null))
.onFailure(undeployed::completeExceptionally);
}

public CompletableFuture<Void> undeployVerticle(String deploymentId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ private WebRequestDiagnostics() {

public static void outputDiagnostics(RoutingContext routingContext) {

log.info(String.format("Handling %s %s", routingContext.request().method().name(),
routingContext.normalisedPath()));
log.info("Handling {} {}", routingContext.request().method().name(), routingContext.normalizedPath());

outputHeaders(routingContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.folio.inventory.common.dao.PostgresConnectionOptions.convertToPsqlStandard;

public class PostgresClientFactory {
private static final Logger LOGGER = LogManager.getLogger(PostgresClientFactory.class);

private static final Map<String, PgPool> POOL_CACHE = new HashMap<>();
private static final Map<String, Pool> POOL_CACHE = new HashMap<>();

/**
* Such field is temporary solution which is used to allow resetting the pool in tests.
Expand All @@ -35,12 +36,12 @@ public PostgresClientFactory(Vertx vertx) {
}

/**
* Get {@link PgPool}.
* Get {@link Pool}.
*
* @param tenantId tenant id.
* @return pooled database client.
*/
public PgPool getCachedPool(String tenantId) {
public Pool getCachedPool(String tenantId) {
return getCachedPool(this.vertx, tenantId);
}

Expand All @@ -56,7 +57,7 @@ public Future<RowSet<Row>> execute(String sql, Tuple tuple, String tenantId) {
return future.compose(x -> preparedQuery(sql, tenantId).execute(tuple));
}

private PgPool getCachedPool(Vertx vertx, String tenantId) {
private Pool getCachedPool(Vertx vertx, String tenantId) {
// assumes a single thread Vert.x model so no synchronized needed
if (POOL_CACHE.containsKey(tenantId) && !shouldResetPool) {
LOGGER.debug("Using existing database connection pool for tenant {}.", tenantId);
Expand All @@ -68,9 +69,7 @@ private PgPool getCachedPool(Vertx vertx, String tenantId) {
}
LOGGER.info("Creating new database connection pool for tenant {}.", tenantId);
PgConnectOptions connectOptions = PostgresConnectionOptions.getConnectionOptions(tenantId);
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(PostgresConnectionOptions.getMaxPoolSize());
PgPool pgPool = PgPool.pool(vertx, connectOptions, poolOptions);
Pool pgPool = Pool.pool(vertx, connectOptions, PostgresConnectionOptions.getPoolOptions());
POOL_CACHE.put(tenantId, pgPool);

return pgPool;
Expand All @@ -83,11 +82,20 @@ private PreparedQuery<RowSet<Row>> preparedQuery(String sql, String tenantId) {
}

/**
* close all {@link PgPool} clients.
* close all {@link Pool} clients.
*/
public static void closeAll() {
POOL_CACHE.values().forEach(SqlClient::close);
POOL_CACHE.clear();
public static Future<Void> closeAll() {
List<Future<?>> closeFutures = POOL_CACHE.values()
.stream()
.map(SqlClient::close)
.collect(Collectors.toList());

return Future.all(closeFutures)
.onSuccess(v -> {
POOL_CACHE.clear();
LOGGER.info("All SQL pools closed and cache cleared.");
})
.mapEmpty();
}

/**
Expand Down
Loading