diff --git a/pom.xml b/pom.xml index 062fe2496..a8c63bff3 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ org.folio folio-kafka-wrapper - 3.4.0-SNAPSHOT + 4.0.0-SNAPSHOT org.projectlombok @@ -181,9 +181,9 @@ test - com.github.tomakehurst - wiremock-jre8 - 3.0.1 + org.wiremock + wiremock + 3.13.2 test @@ -299,15 +299,15 @@ UTF-8 UTF-8 - 4.5.13 + 5.0.5 ${project.build.directory}/generated-sources/jsonschema2pojo 4.5.14 - 1.18.30 - 42.7.2 - 4.9.1 + 1.18.42 + 42.7.8 + 5.0.1 3.9.1 4.13.2 - 4.5.0-SNAPSHOT + 5.0.0-SNAPSHOT 1.0.0 @@ -495,13 +495,16 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.5.4 false + + + **/api/**Examples.class diff --git a/src/main/java/org/folio/inventory/InventoryVerticle.java b/src/main/java/org/folio/inventory/InventoryVerticle.java index 81a320550..f6678e046 100644 --- a/src/main/java/org/folio/inventory/InventoryVerticle.java +++ b/src/main/java/org/folio/inventory/InventoryVerticle.java @@ -2,6 +2,9 @@ import java.lang.invoke.MethodHandles; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.inventory.common.WebRequestDiagnostics; @@ -25,10 +28,6 @@ import org.folio.inventory.resources.UpdateOwnershipApi; import org.folio.inventory.storage.Storage; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpServer; import io.vertx.core.json.JsonObject; @@ -77,33 +76,31 @@ public void start(Promise started) { new UpdateOwnershipApi(storage, client, consortiumService, snapshotService, new InventoryClientFactoryImpl()).register(router); new TenantItems(client).register(router); - Handler> onHttpServerStart = result -> { - if (result.succeeded()) { - log.info("Listening on {}", server.actualPort()); - started.complete(); - } else { - started.fail(result.cause()); - } - }; - server.requestHandler(router) - .listen(config.getInteger("port"), onHttpServerStart); + .listen(config.getInteger("port")) + .onSuccess(httpServer -> { + log.info("Listening on {}", httpServer.actualPort()); + started.complete(); + }) + .onFailure(started::fail); } @Override public void stop(Promise stopped) { final Logger log = LogManager.getLogger(MethodHandles.lookup().lookupClass()); - - PostgresClientFactory.closeAll(); - - log.info("Stopping inventory module"); - server.close(result -> { - if (result.succeeded()) { - log.info("Inventory module stopped"); - stopped.complete(); - } else { - stopped.fail(result.cause()); - } - }); + log.info("Stopping inventory module..."); + Future dbCloseFuture = PostgresClientFactory.closeAll(); + Future serverCloseFuture = server.close(); + Future.all(dbCloseFuture, serverCloseFuture) + .mapEmpty() + .onComplete(ar -> { + if (ar.succeeded()) { + log.info("Inventory module and all resources stopped successfully."); + stopped.complete(); + } else { + log.error("Failed to stop inventory module cleanly", ar.cause()); + stopped.fail(ar.cause()); + } + }); } } diff --git a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java index 947a694ea..b394ba376 100644 --- a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java @@ -4,7 +4,7 @@ import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET; import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName; -import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; import io.vertx.core.Promise; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,7 +32,7 @@ public void start(Promise startPromise) { var marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, getMappingMetadataCache()); var marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, getMappingMetadataCache()); - CompositeFuture.all( + Future.all( marcBibConsumerWrapper.start(marcBibInstanceHridSetKafkaHandler, constructModuleName()), marcHoldingsConsumerWrapper.start(marcHoldingsRecordHridSetKafkaHandler, constructModuleName()) ) diff --git a/src/main/java/org/folio/inventory/common/VertxAssistant.java b/src/main/java/org/folio/inventory/common/VertxAssistant.java index 86802dd9f..3c58a1994 100644 --- a/src/main/java/org/folio/inventory/common/VertxAssistant.java +++ b/src/main/java/org/folio/inventory/common/VertxAssistant.java @@ -35,23 +35,17 @@ public Vertx getVertx() { public void stop() { CompletableFuture stopped = new CompletableFuture<>(); - stop(stopped); - stopped.join(); } public void stop(final CompletableFuture stopped) { - if (vertx != null) { - vertx.close(res -> { - if (res.succeeded()) { - stopped.complete(null); - } else { - stopped.completeExceptionally(res.cause()); - } - } - ); + vertx.close() + .onSuccess(v -> stopped.complete(null)) + .onFailure(stopped::completeExceptionally); + } else { + stopped.complete(null); } } @@ -68,24 +62,17 @@ public void deployVerticle(String verticleClass, long startTime = System.currentTimeMillis(); DeploymentOptions options = new DeploymentOptions(); - options.setConfig(new JsonObject(config)); - options.setWorker(true); + options.setThreadingModel(ThreadingModel.WORKER); options.setInstances(verticleInstancesNumber); - vertx.deployVerticle(verticleClass, options, result -> { - if (result.succeeded()) { + vertx.deployVerticle(verticleClass, options) + .onSuccess(deploymentId -> { long elapsedTime = System.currentTimeMillis() - startTime; - - log.info(String.format( - "%s deployed in %s milliseconds", verticleClass, elapsedTime)); - - deployed.complete(result.result()); - } else { - deployed.completeExceptionally(result.cause()); - } - }); - + log.info(String.format("%s deployed in %s milliseconds", verticleClass, elapsedTime)); + deployed.complete(deploymentId); + }) + .onFailure(deployed::completeExceptionally); } public void deployVerticle(Supplier verticleSupplier, @@ -100,30 +87,20 @@ public void deployVerticle(Supplier verticleSupplier, .setThreadingModel(ThreadingModel.WORKER) .setInstances(verticleInstancesNumber); - vertx.deployVerticle(verticleSupplier, options, result -> { - if (result.succeeded()) { - long elapsedTime = System.currentTimeMillis() - startTime; - - log.info("{} deployed in {} milliseconds", verticleClass, elapsedTime); - - deployed.complete(result.result()); - } else { - deployed.completeExceptionally(result.cause()); - } - }); - + vertx.deployVerticle(verticleSupplier, options) + .onSuccess(result -> { + long elapsedTime = System.currentTimeMillis() - startTime; + log.info("{} deployed in {} milliseconds", verticleClass, elapsedTime); + deployed.complete(result); + }).onFailure(deployed::completeExceptionally); } public void undeployVerticle(String deploymentId, CompletableFuture undeployed) { - vertx.undeploy(deploymentId, result -> { - if (result.succeeded()) { - undeployed.complete(null); - } else { - undeployed.completeExceptionally(result.cause()); - } - }); + vertx.undeploy(deploymentId) + .onSuccess(result -> undeployed.complete(null)) + .onFailure(undeployed::completeExceptionally); } public CompletableFuture undeployVerticle(String deploymentId) { diff --git a/src/main/java/org/folio/inventory/common/WebRequestDiagnostics.java b/src/main/java/org/folio/inventory/common/WebRequestDiagnostics.java index 74a461e28..39d739ece 100644 --- a/src/main/java/org/folio/inventory/common/WebRequestDiagnostics.java +++ b/src/main/java/org/folio/inventory/common/WebRequestDiagnostics.java @@ -15,8 +15,7 @@ private WebRequestDiagnostics() { public static void outputDiagnostics(RoutingContext routingContext) { - log.info(String.format("Handling %s %s", routingContext.request().method().name(), - routingContext.normalisedPath())); + log.info("Handling {} {}", routingContext.request().method().name(), routingContext.normalizedPath()); outputHeaders(routingContext); diff --git a/src/main/java/org/folio/inventory/common/dao/PostgresClientFactory.java b/src/main/java/org/folio/inventory/common/dao/PostgresClientFactory.java index 5e553b387..4c081a26b 100644 --- a/src/main/java/org/folio/inventory/common/dao/PostgresClientFactory.java +++ b/src/main/java/org/folio/inventory/common/dao/PostgresClientFactory.java @@ -3,25 +3,26 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.PreparedQuery; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.PoolOptions; import io.vertx.sqlclient.SqlClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.folio.inventory.common.dao.PostgresConnectionOptions.convertToPsqlStandard; public class PostgresClientFactory { private static final Logger LOGGER = LogManager.getLogger(PostgresClientFactory.class); - private static final Map POOL_CACHE = new HashMap<>(); + private static final Map POOL_CACHE = new HashMap<>(); /** * Such field is temporary solution which is used to allow resetting the pool in tests. @@ -35,12 +36,12 @@ public PostgresClientFactory(Vertx vertx) { } /** - * Get {@link PgPool}. + * Get {@link Pool}. * * @param tenantId tenant id. * @return pooled database client. */ - public PgPool getCachedPool(String tenantId) { + public Pool getCachedPool(String tenantId) { return getCachedPool(this.vertx, tenantId); } @@ -56,7 +57,7 @@ public Future> execute(String sql, Tuple tuple, String tenantId) { return future.compose(x -> preparedQuery(sql, tenantId).execute(tuple)); } - private PgPool getCachedPool(Vertx vertx, String tenantId) { + private Pool getCachedPool(Vertx vertx, String tenantId) { // assumes a single thread Vert.x model so no synchronized needed if (POOL_CACHE.containsKey(tenantId) && !shouldResetPool) { LOGGER.debug("Using existing database connection pool for tenant {}.", tenantId); @@ -68,9 +69,7 @@ private PgPool getCachedPool(Vertx vertx, String tenantId) { } LOGGER.info("Creating new database connection pool for tenant {}.", tenantId); PgConnectOptions connectOptions = PostgresConnectionOptions.getConnectionOptions(tenantId); - PoolOptions poolOptions = new PoolOptions() - .setMaxSize(PostgresConnectionOptions.getMaxPoolSize()); - PgPool pgPool = PgPool.pool(vertx, connectOptions, poolOptions); + Pool pgPool = Pool.pool(vertx, connectOptions, PostgresConnectionOptions.getPoolOptions()); POOL_CACHE.put(tenantId, pgPool); return pgPool; @@ -83,11 +82,20 @@ private PreparedQuery> preparedQuery(String sql, String tenantId) { } /** - * close all {@link PgPool} clients. + * close all {@link Pool} clients. */ - public static void closeAll() { - POOL_CACHE.values().forEach(SqlClient::close); - POOL_CACHE.clear(); + public static Future closeAll() { + List> closeFutures = POOL_CACHE.values() + .stream() + .map(SqlClient::close) + .collect(Collectors.toList()); + + return Future.all(closeFutures) + .onSuccess(v -> { + POOL_CACHE.clear(); + LOGGER.info("All SQL pools closed and cache cleared."); + }) + .mapEmpty(); } /** diff --git a/src/main/java/org/folio/inventory/common/dao/PostgresConnectionOptions.java b/src/main/java/org/folio/inventory/common/dao/PostgresConnectionOptions.java index 7450c6343..d1acf19fb 100644 --- a/src/main/java/org/folio/inventory/common/dao/PostgresConnectionOptions.java +++ b/src/main/java/org/folio/inventory/common/dao/PostgresConnectionOptions.java @@ -1,16 +1,18 @@ package org.folio.inventory.common.dao; import io.vertx.core.buffer.Buffer; -import io.vertx.core.net.OpenSSLEngineOptions; +import io.vertx.core.net.ClientSSLOptions; import io.vertx.core.net.PemTrustOptions; import io.vertx.pgclient.PgConnectOptions; import io.vertx.pgclient.SslMode; +import io.vertx.sqlclient.PoolOptions; +import lombok.Setter; import org.apache.commons.lang3.StringUtils; import static java.lang.String.format; -import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -31,6 +33,13 @@ public class PostgresConnectionOptions { public static final String DB_SERVER_PEM = "DB_SERVER_PEM"; public static final String DB_IDLETIMEOUT = "DB_IDLETIMEOUT"; + /** + * -- SETTER -- + * For test usage only. + * + * @param newSystemProperties Map of system properties to set. + */ + @Setter private static Map systemProperties = System.getenv(); private PostgresConnectionOptions() { @@ -62,23 +71,30 @@ public static PgConnectOptions getConnectionOptions(String tenantId) { if (StringUtils.isNotBlank(getSystemProperty(DB_PASSWORD))) { pgConnectionOptions.setPassword(getSystemProperty(DB_PASSWORD)); } + if (StringUtils.isNotBlank(getSystemProperty(DB_SERVER_PEM))) { pgConnectionOptions.setSslMode(SslMode.VERIFY_FULL); - pgConnectionOptions.setHostnameVerificationAlgorithm("HTTPS"); - pgConnectionOptions.setPemTrustOptions( - new PemTrustOptions().addCertValue(Buffer.buffer(getSystemProperty(DB_SERVER_PEM)))); - pgConnectionOptions.setEnabledSecureTransportProtocols(Collections.singleton("TLSv1.3")); - pgConnectionOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions()); + + ClientSSLOptions sslClientOptions = new ClientSSLOptions() + .setHostnameVerificationAlgorithm("HTTPS") + .setTrustOptions(new PemTrustOptions().addCertValue(Buffer.buffer(getSystemProperty(DB_SERVER_PEM)))) + .setEnabledSecureTransportProtocols(Set.of("TLSv1.3")); + pgConnectionOptions.setSslOptions(sslClientOptions); + //pgConnectionOptions.setOpenSslEngineOptions(new OpenSSLEngineOptions()); } - pgConnectionOptions.setIdleTimeout(Integer.parseInt( - StringUtils.isNotBlank(getSystemProperty(DB_IDLETIMEOUT)) ? getSystemProperty(DB_IDLETIMEOUT) : DEFAULT_IDLE_TIMEOUT)); - pgConnectionOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS); if (StringUtils.isNotBlank(tenantId)) { pgConnectionOptions.addProperty(DEFAULT_SCHEMA_PROPERTY, convertToPsqlStandard(tenantId)); } return pgConnectionOptions; } + public static PoolOptions getPoolOptions() { + return new PoolOptions() + .setMaxSize(PostgresConnectionOptions.getMaxPoolSize()) + .setIdleTimeout(Integer.parseInt(StringUtils.isNotBlank(getSystemProperty(DB_IDLETIMEOUT)) ? getSystemProperty(DB_IDLETIMEOUT) : DEFAULT_IDLE_TIMEOUT)) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS); + } + public static Integer getMaxPoolSize() { return Integer.parseInt(getSystemProperty(DB_MAXPOOLSIZE) != null ? getSystemProperty(DB_MAXPOOLSIZE) : DEFAULT_MAX_POOL_SIZE); } @@ -97,12 +113,4 @@ public static String convertToPsqlStandard(String tenantId) { return format("%s_%s", tenantId.toLowerCase(), MODULE_NAME); } - /** - * For test usage only. - * - * @param newSystemProperties Map of system properties to set. - */ - public static void setSystemProperties(Map newSystemProperties) { - systemProperties = newSystemProperties; - } } diff --git a/src/main/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingHandler.java b/src/main/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingHandler.java index cdb5e0f7a..8b28d4f10 100644 --- a/src/main/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingHandler.java +++ b/src/main/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingHandler.java @@ -236,8 +236,8 @@ private void sendCompleteEventToKafka(SharingInstance sharingInstance, SharingSt producer.send(kafkaRecord) .mapEmpty() - .eventually(v -> producer.flush()) - .eventually(v -> producer.close()) + .eventually(producer::flush) + .eventually(producer::close) .onSuccess(res -> LOGGER.info("Event with type {}, was sent to kafka about sharing instance with InstanceId={}", eventType.value(), sharingInstance.getInstanceIdentifier())) .onFailure(err -> { diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java index 7aff7f64a..7d2afdf5b 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java @@ -105,33 +105,23 @@ private Future processEvent(MarcBibUpdate instanceEvent, Map promise = Promise.promise(); - io.vertx.core.Context vertxContext = Vertx.currentContext(); + io.vertx.core.Context vertxContext = Vertx.currentContext(); if(vertxContext == null) { return Future.failedFuture("handle:: operation must be executed by a Vertx thread"); } - vertxContext.owner().executeBlocking( - () -> { - var mappingMetadataDto = - mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE) - .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId))); - ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto); - return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context); - }, - r -> { - if (r.failed()) { - LOGGER.warn("handle:: Error during instance update", r.cause()); - promise.fail(r.cause()); - } else { - LOGGER.debug("handle:: Instance update was successful"); - promise.complete(r.result()); + return vertxContext.owner().executeBlocking( + () -> { + var mappingMetadataDto = + mappingMetadataCache.getByRecordTypeBlocking(jobId, context, MARC_BIB_RECORD_TYPE) + .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MSG, jobId))); + ensureEventPayloadWithMappingMetadata(metaDataPayload, mappingMetadataDto); + return instanceUpdateDelegate.handleBlocking(metaDataPayload, marcBibRecord, context); } - } - ); - - return promise.future(); + ) + .onSuccess(result -> LOGGER.debug("handle:: Instance update was successful")) + .onFailure(error -> LOGGER.warn("handle:: Error during instance update", error)); } private void processUpdateResult(AsyncResult result, @@ -190,8 +180,8 @@ private void sendEventToKafka(LinkUpdateReport linkUpdateReport, List producer.flush()) - .eventually(() -> producer.close()) + .eventually(producer::flush) + .eventually(producer::close) .onSuccess(res -> LOGGER.info("Event with type {}, jobId {} was sent to kafka", LINKS_STATS.topicName(), linkUpdateReport.getJobId())) .onFailure(err -> { var cause = err.getCause(); diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/QuickMarcKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/QuickMarcKafkaHandler.java index dfd52e782..7ad1bc333 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/QuickMarcKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/QuickMarcKafkaHandler.java @@ -162,8 +162,8 @@ private Future sendEventWithPayload(String eventPayload, String eventTy Promise promise = Promise.promise(); producer.send(producerRecord) .mapEmpty() - .eventually(v -> producer.flush()) - .eventually(v -> producer.close()) + .eventually(producer::flush) + .eventually(producer::close) .onSuccess(res -> { LOGGER.info("Event with type: {} was sent to kafka", eventType); promise.complete(true); diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateHoldingEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateHoldingEventHandler.java index 1aeb2851e..0deef1b01 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateHoldingEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateHoldingEventHandler.java @@ -1,11 +1,11 @@ package org.folio.inventory.dataimport.handlers.actions; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,6 +37,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.commons.lang.StringUtils.isNotBlank; @@ -137,7 +138,7 @@ public CompletableFuture handle(DataImportEventPayload d .compose(consortiumConfigurationOptional -> { if (consortiumConfigurationOptional.isPresent()) { return ConsortiumUtil.createShadowInstanceIfNeeded(consortiumService, storage.getInstanceCollection(context), - context, getInstanceId(dataImportEventPayload), consortiumConfigurationOptional.get()) + context, getInstanceId(dataImportEventPayload), consortiumConfigurationOptional.get()) .map(holdingsToCreate); } return Future.succeededFuture(holdingsToCreate); @@ -182,7 +183,7 @@ public boolean isEligible(DataImportEventPayload dataImportEventPayload) { private void prepareEvent(DataImportEventPayload dataImportEventPayload) { dataImportEventPayload.getEventsChain().add(dataImportEventPayload.getEventType()); dataImportEventPayload.getContext().put(HOLDINGS.value(), new JsonArray().encode()); - dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().get(0)); + dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().getFirst()); } private void fillInstanceIdIfNeeded(String instanceId, JsonObject holdingAsJson) { @@ -215,46 +216,70 @@ private void fillInstanceId(JsonObject holdingAsJson, String instanceId) { holdingAsJson.put(INSTANCE_ID_FIELD, instanceId); } - private Future> addHoldings(List holdingsList, HashMap payloadContext, Context context) { - Promise> holdingsPromise = Promise.promise(); - List createdHoldingsRecord = new ArrayList<>(); - List errors = new ArrayList<>(); - List createHoldingsRecordFutures = new ArrayList<>(); + private Future> addHoldings(List holdingsList, + HashMap payloadContext, + Context context) { + if (holdingsList.isEmpty()) { + return Future.succeededFuture(new ArrayList<>()); + } HoldingsRecordCollection holdingsRecordCollection = storage.getHoldingsRecordCollection(context); - holdingsList.forEach(holdings -> { - LOGGER.debug(format("addHoldings:: Trying to add holdings with id: %s", holdings.getId())); - Promise createPromise = Promise.promise(); - createHoldingsRecordFutures.add(createPromise.future()); - holdingsRecordCollection.add(holdings, - success -> { - createdHoldingsRecord.add(success.getResult()); - createPromise.complete(); - }, - failure -> { - errors.add(new PartialError(holdings.getId() != null ? holdings.getId() : BLANK, failure.getReason())); - if (isNotBlank(failure.getReason()) && failure.getReason().contains(UNIQUE_ID_ERROR_MESSAGE)) { - LOGGER.info("addHoldings:: Duplicated event received by Holding id: {}. Ignoring...", holdings.getId()); - createPromise.fail(new DuplicateEventException(format("Duplicated event by Holding id: %s", holdings.getId()))); + List> createFutures = holdingsList.stream() + .map(holdings -> createHoldingRecord(holdings, holdingsRecordCollection)) + .collect(Collectors.toList()); + + return Future.join(createFutures) + .compose(joinResult -> { + List createdHoldingsRecord = new ArrayList<>(); + List errors = new ArrayList<>(); + + for (int i = 0; i < joinResult.size(); i++) { + if (joinResult.succeeded(i)) { + createdHoldingsRecord.add(joinResult.resultAt(i)); } else { - LOGGER.warn(format("addHoldings:: Error posting Holdings cause %s, status code %s", failure.getReason(), failure.getStatusCode())); - createPromise.complete(); + Throwable cause = joinResult.cause(i); + if (cause instanceof HoldingsCreationException) { + errors.add(((HoldingsCreationException) cause).getError()); + } } - }); - }); - CompositeFuture.all(createHoldingsRecordFutures).onComplete(ar -> { - if (ar.succeeded()) { + } + String errorsAsStringJson = Json.encode(errors); + payloadContext.put(ERRORS, errorsAsStringJson); if (!createdHoldingsRecord.isEmpty()) { - payloadContext.put(ERRORS, errorsAsStringJson); - holdingsPromise.complete(createdHoldingsRecord); + return Future.succeededFuture(createdHoldingsRecord); } else { - holdingsPromise.fail(errorsAsStringJson); + return Future.failedFuture(errorsAsStringJson); } - } else { - holdingsPromise.fail(ar.cause()); - } - }); - return holdingsPromise.future(); + }); + } + + private Future createHoldingRecord(HoldingsRecord holdings, + HoldingsRecordCollection holdingsRecordCollection) { + Promise promise = Promise.promise(); + holdingsRecordCollection.add(holdings, + success -> promise.complete(success.getResult()), + failure -> { + PartialError error = new PartialError(holdings.getId() != null ? holdings.getId() : BLANK, failure.getReason()); + if (isNotBlank(failure.getReason()) && failure.getReason().contains(UNIQUE_ID_ERROR_MESSAGE)) { + LOGGER.info("addHoldings:: Duplicated event received by Holding id: {}. Ignoring...", holdings.getId()); + promise.fail(new DuplicateEventException(format("Duplicated event by Holding id: %s", holdings.getId()))); + } else { + LOGGER.warn("addHoldings:: Error posting Holdings cause {}, status code {}", failure.getReason(), failure.getStatusCode()); + promise.fail(new HoldingsCreationException(error)); + } + }); + return promise.future(); + } + + @Getter + private static class HoldingsCreationException extends RuntimeException { + private final PartialError error; + + public HoldingsCreationException(PartialError error) { + super(error.getError()); + this.error = error; + } + } } diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateItemEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateItemEventHandler.java index 68324cbea..97570fee6 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateItemEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateItemEventHandler.java @@ -1,6 +1,5 @@ package org.folio.inventory.dataimport.handlers.actions; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.Json; @@ -127,7 +126,7 @@ public CompletableFuture handle(DataImportEventPayload d } dataImportEventPayload.getEventsChain().add(dataImportEventPayload.getEventType()); - dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().get(0)); + dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().getFirst()); dataImportEventPayload.getContext().put(ITEM.value(), new JsonArray().encode()); String chunkId = dataImportEventPayload.getContext().get(CHUNK_ID_HEADER); @@ -153,7 +152,7 @@ public CompletableFuture handle(DataImportEventPayload d LOGGER.trace(format("handle:: Mapped items: %s", mappedItemList.encode())); Promise> createMultipleItemsPromise = Promise.promise(); List multipleItemsCreateErrors = new ArrayList<>(); - List createItemsFutures = new ArrayList<>(); + List> createItemsFutures = new ArrayList<>(); List createdItems = new ArrayList<>(); mappedItemList.forEach(e -> { @@ -180,7 +179,7 @@ public CompletableFuture handle(DataImportEventPayload d createItemsFutures.add(createItemPromise.future()); }); - CompositeFuture.all(createItemsFutures).onComplete(ar -> { + Future.all(createItemsFutures).onComplete(ar -> { if (payloadContext.containsKey(ERRORS) || !multipleItemsCreateErrors.isEmpty()) { payloadContext.put(ERRORS, Json.encode(multipleItemsCreateErrors)); } diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateHoldingEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateHoldingEventHandler.java index a18a02ed1..adc0350fe 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateHoldingEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateHoldingEventHandler.java @@ -1,6 +1,5 @@ package org.folio.inventory.dataimport.handlers.actions; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.Json; @@ -124,63 +123,101 @@ public CompletableFuture handle(DataImportEventPayload d mappingMetadataCache.get(jobExecutionId, context) .map(parametersOptional -> parametersOptional - .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MESSAGE, jobExecutionId, - recordId, chunkId)))) + .orElseThrow(() -> new EventProcessingException(format(MAPPING_METADATA_NOT_FOUND_MESSAGE, jobExecutionId, recordId, chunkId)))) .onSuccess(mappingMetadataDto -> { prepareEvent(dataImportEventPayload); MappingParameters mappingParameters = Json.decodeValue(mappingMetadataDto.getMappingParams(), MappingParameters.class); MappingManager.map(dataImportEventPayload, new MappingContext().withMappingParameters(mappingParameters)); List updatedHoldingsRecord = new ArrayList<>(); - List updatedHoldingsRecordFutures = new ArrayList<>(); isPayloadConstructed = false; convertHoldings(dataImportEventPayload); List list = List.of(Json.decodeValue(dataImportEventPayload.getContext().get(HOLDINGS.value()), HoldingsRecord[].class)); LOGGER.trace(format("handle:: Mapped holding: %s", Json.decodeValue(dataImportEventPayload.getContext().get(HOLDINGS.value())))); HoldingsRecordCollection holdingsRecordCollection = storage.getHoldingsRecordCollection(context); List expiredHoldings = new ArrayList<>(); + + List> updateFutures = new ArrayList<>(); for (HoldingsRecord holding : list) { - Promise updatePromise = Promise.promise(); - updatedHoldingsRecordFutures.add(updatePromise.future()); - holdingsRecordCollection.update(holding, - success -> { - try { - LOGGER.info(format("handle:: Successfully updated holdings with id: %s", holding.getId())); - updatedHoldingsRecord.add(holding); - constructDataImportEventPayload(updatePromise, dataImportEventPayload, list, context, errors); - } catch (Exception e) { - LOGGER.warn("handle:: Error updating inventory Holdings jobExecutionId: '{}' recordId: '{}' chunkId: '{}'", jobExecutionId, recordId, chunkId, e); - future.completeExceptionally(e); - } - }, - failure -> { - if (failure.getStatusCode() == HttpStatus.SC_CONFLICT) { - expiredHoldings.add(holding); - } else { - errors.add(new PartialError(holding.getId() != null ? holding.getId() : BLANK, failure.getReason())); - LOGGER.warn("handle:: " + format(CANNOT_UPDATE_HOLDING_ERROR_MESSAGE, holding.getId(), jobExecutionId, recordId, chunkId, failure.getReason(), failure.getStatusCode())); - } - updatePromise.complete(); - }); + updateFutures.add( + holdingsRecordCollection.updateAsync(holding) + .onSuccess(updatedHoldingsRecord::add) + .onFailure(err -> { + if (err instanceof io.vertx.core.eventbus.ReplyException re && re.failureCode() == HttpStatus.SC_CONFLICT) { + expiredHoldings.add(holding); + } else { + errors.add(new PartialError(holding.getId() != null ? holding.getId() : BLANK, err.getMessage())); + LOGGER.warn("handle:: " + format(CANNOT_UPDATE_HOLDING_ERROR_MESSAGE, holding.getId(), jobExecutionId, recordId, chunkId, err.getMessage(), (err instanceof io.vertx.core.eventbus.ReplyException re) ? re.failureCode() : -1)); + } + }) + .recover(e -> Future.succeededFuture()) + ); } - CompositeFuture.all(updatedHoldingsRecordFutures) - .onSuccess(ar -> processResults(dataImportEventPayload, updatedHoldingsRecord, expiredHoldings, future, holdingsRecordCollection, errors)) + + Future.all(updateFutures) + .compose(v -> constructDataImportEventPayloadAfterUpdatesAsync(dataImportEventPayload, list, context, errors)) + .onSuccess(v -> processResults(dataImportEventPayload, updatedHoldingsRecord, expiredHoldings, future, holdingsRecordCollection, errors)) .onFailure(e -> { - LOGGER.warn("handle:: Error in composite future for Holdings update jobExecutionId: '{}' recordId: '{}' chunkId: '{}'", jobExecutionId, recordId, chunkId, e); + LOGGER.warn("handle:: Error in the async processing chain for Holdings update jobExecutionId: '{}' recordId: '{}' chunkId: '{}'", jobExecutionId, recordId, chunkId, e); future.completeExceptionally(e); }); }) .onFailure(e -> { - LOGGER.warn("handle:: Error updating inventory Holdings by jobExecutionId: '{}' recordId: '{}' chunkId: '{}'", jobExecutionId, recordId, chunkId, e); + LOGGER.warn("handle:: Error getting mapping metadata for Holdings update by jobExecutionId: '{}' recordId: '{}' chunkId: '{}'", jobExecutionId, recordId, chunkId, e); future.completeExceptionally(e); }); } catch (Exception e) { - LOGGER.warn("handle:: Failed to update Holdings jobExecutionId: '{}'", jobExecutionId, e); + LOGGER.warn("handle:: A synchronous error occurred for Holdings update jobExecutionId: '{}'", jobExecutionId, e); future.completeExceptionally(e); } return future; } + private Future constructDataImportEventPayloadAfterUpdatesAsync(DataImportEventPayload dataImportEventPayload, List holdings, Context context, List errors) { + if (!dataImportEventPayload.getContext().containsKey(ITEM.value())) { + return Future.succeededFuture(); + } + + HashMap payloadContext = dataImportEventPayload.getContext(); + payloadContext.put(HOLDINGS.value(), Json.encodePrettily(holdings)); + + ItemCollection itemCollection = storage.getItemCollection(context); + return updateDataImportEventPayloadItemAsync(dataImportEventPayload, itemCollection, errors); + } + + private Future updateDataImportEventPayloadItemAsync(DataImportEventPayload dataImportEventPayload, ItemCollection itemCollection, List errors) { + JsonArray oldItemsAsJson = new JsonArray(dataImportEventPayload.getContext().get(ITEM.value())); + JsonArray resultedItemsList = new JsonArray(); + List> updateItemsFutures = new ArrayList<>(); + + for (int i = 0; i < oldItemsAsJson.size(); i++) { + JsonObject singleItemAsJson = getItemFromJson(oldItemsAsJson.getJsonObject(i)); + String itemId = singleItemAsJson.getString(ITEM_ID_HEADER); + Promise p = Promise.promise(); + itemCollection.findById(itemId, findResult -> { + if (Objects.nonNull(findResult)) { + JsonObject itemAsJson = new JsonObject(ItemUtil.mapToMappingResultRepresentation(findResult.getResult())); + resultedItemsList.add(itemAsJson); + } + p.complete(); + }, failure -> { + errors.add(new PartialError(itemId != null ? itemId : BLANK, failure.getReason())); + EventProcessingException processingException = + new EventProcessingException(format(CANNOT_GET_ACTUAL_ITEM_MESSAGE, itemId, failure.getReason(), failure.getStatusCode())); + LOGGER.warn("updateDataImportEventPayloadItem:: " + processingException); + p.complete(); + }); + updateItemsFutures.add(p.future()); + } + + return Future.all(updateItemsFutures) + .map(v -> { + dataImportEventPayload.getContext().put(ITEM.value(), resultedItemsList.encode()); + return null; + }); + } + + private void processResults(DataImportEventPayload dataImportEventPayload, List updatedHoldingsRecord, List expiredHoldings, CompletableFuture future, HoldingsRecordCollection holdingsRecordCollection, List errors) { OlHoldingsAccumulativeResults olAccumulativeResults = buildOLAccumulativeResults(dataImportEventPayload); olAccumulativeResults.getResultedSuccessHoldings().addAll(updatedHoldingsRecord); @@ -255,7 +292,7 @@ private void prepareEvent(DataImportEventPayload dataImportEventPayload) { holdingsJsonArray.set(i, new JsonObject().put(HOLDINGS_PATH_FIELD, holdingAsJson)); } dataImportEventPayload.getContext().put(HOLDINGS.value(), holdingsJsonArray.encode()); - dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().get(0)); + dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().getFirst()); } private void processOLError(DataImportEventPayload dataImportEventPayload, CompletableFuture future, HoldingsRecordCollection holdingsRecords, List expiredHoldings, List errors, OlHoldingsAccumulativeResults olAccumulativeResults) { @@ -305,26 +342,13 @@ private void prepareDataAndReInvokeCurrentHandler(DataImportEventPayload dataImp }); } - private void constructDataImportEventPayload(Promise promise, DataImportEventPayload dataImportEventPayload, List holdings, Context context, List errors) { - if (!isPayloadConstructed) { - isPayloadConstructed = true; - HashMap payloadContext = dataImportEventPayload.getContext(); - payloadContext.put(HOLDINGS.value(), Json.encodePrettily(holdings)); - if (payloadContext.containsKey(ITEM.value())) { - ItemCollection itemCollection = storage.getItemCollection(context); - updateDataImportEventPayloadItem(promise, dataImportEventPayload, itemCollection, errors); - } else { - promise.complete(); - } - } else { - promise.complete(); - } - } - - private void updateDataImportEventPayloadItem(Promise promise, DataImportEventPayload dataImportEventPayload, ItemCollection itemCollection, List errors) { + private void updateDataImportEventPayloadItem(Promise promise, + DataImportEventPayload dataImportEventPayload, + ItemCollection itemCollection, + List errors) { JsonArray oldItemsAsJson = new JsonArray(dataImportEventPayload.getContext().get(ITEM.value())); JsonArray resultedItemsList = new JsonArray(); - List updateItemsFutures = new ArrayList<>(); + List> updateItemsFutures = new ArrayList<>(); for (int i = 0; i < oldItemsAsJson.size(); i++) { Promise updateItemPromise = Promise.promise(); @@ -346,7 +370,8 @@ private void updateDataImportEventPayloadItem(Promise promise, DataImportE updateItemPromise.complete(); }); } - CompositeFuture.all(updateItemsFutures) + + Future.all(updateItemsFutures) .onComplete(ar -> { dataImportEventPayload.getContext().put(ITEM.value(), resultedItemsList.encode()); promise.complete(); diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateItemEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateItemEventHandler.java index f2d4dd742..6a75a451a 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateItemEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/UpdateItemEventHandler.java @@ -1,7 +1,6 @@ package org.folio.inventory.dataimport.handlers.actions; import com.fasterxml.jackson.core.JsonProcessingException; -import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.json.Json; @@ -152,7 +151,7 @@ public CompletableFuture handle(DataImportEventPayload d MappingManager.map(dataImportEventPayload, new MappingContext().withMappingParameters(mappingParameters)); ItemCollection itemCollection = storage.getItemCollection(context); - List updatedItemsRecordFutures = new ArrayList<>(); + List> updatedItemsRecordFutures = new ArrayList<>(); List updatedItemEntities = new ArrayList<>(); List errors = new ArrayList<>(); @@ -200,7 +199,7 @@ public CompletableFuture handle(DataImportEventPayload d }); } } - CompositeFuture.all(updatedItemsRecordFutures).onComplete(ar -> { + Future.all(updatedItemsRecordFutures).onComplete(ar -> { processResults(dataImportEventPayload, updatedItemEntities, expiredItems, future, itemCollection, errors); dataImportEventPayload.getContext().remove(TEMPORARY_MULTIPLE_HOLDINGS_FIELD); }); @@ -333,7 +332,7 @@ private void preparePayloadForMappingManager(DataImportEventPayload dataImportEv } dataImportEventPayload.getContext().put(ITEM.value(), itemsJsonArray.encode()); dataImportEventPayload.getEventsChain().add(dataImportEventPayload.getEventType()); - dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().get(0)); + dataImportEventPayload.setCurrentNode(dataImportEventPayload.getCurrentNode().getChildSnapshotWrappers().getFirst()); } private List validateItem(JsonObject itemAsJson, List requiredFields) { diff --git a/src/main/java/org/folio/inventory/domain/AsynchronousCollection.java b/src/main/java/org/folio/inventory/domain/AsynchronousCollection.java index dff1db312..68a78bd6f 100644 --- a/src/main/java/org/folio/inventory/domain/AsynchronousCollection.java +++ b/src/main/java/org/folio/inventory/domain/AsynchronousCollection.java @@ -4,6 +4,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; +import io.vertx.core.Future; import org.folio.inventory.common.api.request.PagingParameters; import org.folio.inventory.common.domain.Failure; import org.folio.inventory.common.domain.MultipleRecords; @@ -59,6 +60,8 @@ void update(T item, Consumer> completionCallback, Consumer failureCallback); + Future updateAsync(final T item); + default CompletableFuture update(final T item) { final CompletableFuture future = new CompletableFuture<>(); diff --git a/src/main/java/org/folio/inventory/resources/InstancesBatch.java b/src/main/java/org/folio/inventory/resources/InstancesBatch.java index d3b30a2dc..2b00aad8a 100644 --- a/src/main/java/org/folio/inventory/resources/InstancesBatch.java +++ b/src/main/java/org/folio/inventory/resources/InstancesBatch.java @@ -58,7 +58,7 @@ public void register(Router router) { */ private void createBatch(RoutingContext routingContext) { WebContext webContext = new WebContext(routingContext); - JsonObject requestBody = routingContext.getBodyAsJson(); + JsonObject requestBody = routingContext.body().asJsonObject(); JsonArray instanceCollection = requestBody.getJsonArray(BATCH_RESPONSE_FIELD_INSTANCES, new JsonArray()); log.info("Received batch of Instances, size:" + instanceCollection.size()); @@ -204,7 +204,7 @@ private Future updateRelatedRecords(List newInstanc Map mapInstanceById = newInstances.stream() .collect(Collectors.toMap(instance -> instance.getString("id"), Instance::fromJson)); - List updateRelationshipsFutures = new ArrayList<>(); + List> updateRelationshipsFutures = new ArrayList<>(); for (Instance createdInstance : createdInstances) { Instance newInstance = mapInstanceById.get(createdInstance.getId()); if (newInstance != null) { @@ -225,7 +225,7 @@ private Future updateRelatedRecords(List newInstanc }); } } - return CompositeFuture.join(updateRelationshipsFutures); + return Future.join(updateRelationshipsFutures); } catch (IllegalStateException e) { log.error("Can not update instances relationships cause: " + e); return Future.failedFuture(e); diff --git a/src/main/java/org/folio/inventory/services/SharedInstanceEventIdStorageServiceImpl.java b/src/main/java/org/folio/inventory/services/SharedInstanceEventIdStorageServiceImpl.java index f13acf559..40a9beb56 100644 --- a/src/main/java/org/folio/inventory/services/SharedInstanceEventIdStorageServiceImpl.java +++ b/src/main/java/org/folio/inventory/services/SharedInstanceEventIdStorageServiceImpl.java @@ -14,7 +14,7 @@ public class SharedInstanceEventIdStorageServiceImpl implements EventIdStorageSe private static final Logger LOGGER = LogManager.getLogger(SharedInstanceEventIdStorageServiceImpl.class); - private static final String UNIQUE_VIOLATION_SQL_STATE = "23505"; + private static final int UNIQUE_VIOLATION_SQL_STATE = 23505; private final EventIdStorageDao eventIdStorageDao; @@ -31,7 +31,7 @@ public Future store(String eventId, String tenantId) { .onSuccess(promise::complete) .onFailure(error -> { if (error instanceof PgException pgException) { - if (pgException.getCode().equals(UNIQUE_VIOLATION_SQL_STATE)) { + if (pgException.getErrorCode() == UNIQUE_VIOLATION_SQL_STATE) { promise.fail(new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record")); } else { promise.fail(error); diff --git a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleCollection.java b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleCollection.java index c0de7e4f5..959baba31 100644 --- a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleCollection.java +++ b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleCollection.java @@ -1,5 +1,6 @@ package org.folio.inventory.storage.external; +import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.http.HttpHeaders.ACCEPT; @@ -7,7 +8,10 @@ import static org.apache.http.HttpHeaders.LOCATION; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.ReplyException; +import io.vertx.core.eventbus.ReplyFailure; import io.vertx.core.http.HttpClient; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpRequest; @@ -27,7 +31,6 @@ import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -73,69 +76,60 @@ abstract class ExternalStorageModuleCollection { protected abstract String getId(T record); - public void add(T item, - Consumer> resultCallback, + public void add(T item, Consumer> resultCallback, Consumer failureCallback) { - - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(storageAddress)); - - request.sendJsonObject(mapToRequest(item), futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> { - if (response.getStatusCode() == 201) { + request + .sendJsonObject(mapToRequest(item)) + .onSuccess(response -> { + if (response.statusCode() == 201) { try { - T created = mapFromJson(response.getJson()); + T created = mapFromJson(response.bodyAsJsonObject()); resultCallback.accept(new Success<>(created)); } catch (Exception e) { LOGGER.error(e); - failureCallback.accept(new Failure(e.getMessage(), response.getStatusCode())); + failureCallback.accept(new Failure(e.getMessage(), response.statusCode())); } } else { - failureCallback.accept(new Failure(response.getBody(), response.getStatusCode())); + failureCallback.accept(new Failure(response.bodyAsString(), response.statusCode())); } - }); - + }) + .onFailure(error -> failureCallback.accept(new Failure(error.getMessage(), 500))); } public void findById(String id, Consumer> resultCallback, Consumer failureCallback) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders( webClient.getAbs(individualRecordLocation(id))); - request.send(futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> { - switch (response.getStatusCode()) { + request.send() + .onSuccess(response -> { + switch (response.statusCode()) { case 200: - JsonObject instanceFromServer = response.getJson(); - try { + JsonObject instanceFromServer = response.bodyAsJsonObject(); T found = mapFromJson(instanceFromServer); resultCallback.accept(new Success<>(found)); - break; } catch (Exception e) { LOGGER.error(e); failureCallback.accept(new Failure(e.getMessage(), 500)); - break; } + break; case 404: resultCallback.accept(new Success<>(null)); break; default: - failureCallback.accept(new Failure(response.getBody(), response.getStatusCode())); + failureCallback.accept(new Failure(response.bodyAsString(), response.statusCode())); + break; } + }) + .onFailure(error -> { + LOGGER.error("Request to find record by id '{}' failed to send", id, error); + failureCallback.accept(new Failure(error.getMessage(), -1)); }); } @@ -144,7 +138,7 @@ public void findAll( Consumer>> resultCallback, Consumer failureCallback) { - String location = String.format(storageAddress + String location = format(storageAddress + "?limit=%s&offset=%s", pagingParameters.limit, pagingParameters.offset); @@ -171,8 +165,8 @@ public void findByCql(String cqlQuery, String encodedQuery = URLEncoder.encode(cqlQuery, StandardCharsets.UTF_8); String location = - String.format("%s?query=%s", storageAddress, encodedQuery) + - String.format("&limit=%s&offset=%s", pagingParameters.limit, + format("%s?query=%s", storageAddress, encodedQuery) + + format("&limit=%s&offset=%s", pagingParameters.limit, pagingParameters.offset); find(location, resultCallback, failureCallback); @@ -181,16 +175,14 @@ public void findByCql(String cqlQuery, public void retrieveByCqlBody(CQLQueryRequestDto cqlQueryRequestDto, Consumer>> resultCallback, Consumer failureCallback) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(storageAddress + "/retrieve")); - request.sendJsonObject(JsonObject.mapFrom(cqlQueryRequestDto), futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> - interpretMultipleRecordResponse(resultCallback, failureCallback, response)); + request.sendJsonObject(JsonObject.mapFrom(cqlQueryRequestDto)) + .onSuccess(response -> interpretMultipleRecordResponse(resultCallback, failureCallback, response)) + .onFailure(error -> { + LOGGER.error("Request to {} failed to send", storageAddress, error); + failureCallback.accept(new Failure(error.getMessage(), -1)); + }); } public void update(T item, @@ -198,17 +190,42 @@ public void update(T item, Consumer failureCallback) { String location = individualRecordLocation(getId(item)); + final HttpRequest request = withStandardHeaders(webClient.putAbs(location)); + request.sendJsonObject(mapToRequest(item)) + .onSuccess(response -> interpretNoContentResponse(response, completionCallback, failureCallback)) + .onFailure(error -> { + LOGGER.error("Request to update record at {} failed to send", location, error); + failureCallback.accept(new Failure(error.getMessage(), -1)); + }); + } - final var futureResponse = new CompletableFuture>>(); - + public Future updateAsync(T item) { + String location = individualRecordLocation(getId(item)); final HttpRequest request = withStandardHeaders(webClient.putAbs(location)); + Future resultFuture = request + .sendJsonObject(mapToRequest(item)) + .compose(response -> { + if (response.statusCode() == 204) { + return Future.succeededFuture(); + } else { + String errorMessage = String.format( + "Failed to update record at %s. Expected 204, got %d: %s", + location, response.statusCode(), response.bodyAsString() + ); + LOGGER.warn(errorMessage); + return Future.failedFuture( + new ReplyException(ReplyFailure.RECIPIENT_FAILURE, response.statusCode(), errorMessage) + ); + } + }); - request.sendJsonObject(mapToRequest(item), futureResponse::complete); + resultFuture.onFailure(error -> { + if (!(error instanceof ReplyException)) { + LOGGER.error("Request to update record at {} failed to send", location, error); + } + }); - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> - interpretNoContentResponse(response, completionCallback, failureCallback)); + return resultFuture; } public void delete(String id, Consumer> completionCallback, @@ -218,17 +235,7 @@ public void delete(String id, Consumer> completionCallback, } protected String individualRecordLocation(String id) { - return String.format("%s/%s", storageAddress, id); - } - - void includeIfPresent( - JsonObject instanceToSend, - String propertyName, - String propertyValue) { - - if (propertyValue != null) { - instanceToSend.put(propertyName, propertyValue); - } + return format("%s/%s", storageAddress, id); } protected HttpRequest withStandardHeaders(HttpRequest request) { @@ -259,69 +266,64 @@ private Response mapResponse(AsyncResult> asyncResult) { } private void find(String location, - Consumer>> resultCallback, Consumer failureCallback) { - - final var futureResponse = new CompletableFuture>>(); + Consumer>> resultCallback, + Consumer failureCallback) { final HttpRequest request = withStandardHeaders(webClient.getAbs(location)); - - request.send(futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> - interpretMultipleRecordResponse(resultCallback, failureCallback, response)); + request.send() + .onSuccess(response -> interpretMultipleRecordResponse(resultCallback, failureCallback, response)) + .onFailure(error -> { + LOGGER.error("Request to find records at {} failed to send", location, error); + failureCallback.accept(new Failure(error.getMessage(), -1)); + }); } private void interpretMultipleRecordResponse( - Consumer>> resultCallback, Consumer failureCallback, - Response response) { + Consumer>> resultCallback, + Consumer failureCallback, + HttpResponse response) { - if (response.getStatusCode() == 200) { + if (response.statusCode() == 200) { try { - JsonObject wrappedRecords = response.getJson(); - + JsonObject wrappedRecords = response.bodyAsJsonObject(); List records = JsonArrayHelper.toList( wrappedRecords.getJsonArray(collectionWrapperPropertyName)); - List foundRecords = records.stream() .map(this::mapFromJson) .collect(Collectors.toList()); - MultipleRecords result = new MultipleRecords<>( foundRecords, wrappedRecords.getInteger("totalRecords")); - resultCallback.accept(new Success<>(result)); } catch (Exception e) { LOGGER.error(e); - failureCallback.accept(new Failure(e.getMessage(), response.getStatusCode())); + failureCallback.accept(new Failure(e.getMessage(), response.statusCode())); } - } else { - failureCallback.accept(new Failure(response.getBody(), response.getStatusCode())); + failureCallback.accept(new Failure(response.bodyAsString(), response.statusCode())); } } - private void deleteLocation(String location, Consumer> completionCallback, + private void deleteLocation(String location, + Consumer> completionCallback, Consumer failureCallback) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.deleteAbs(location)); - - request.send(futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> - interpretNoContentResponse(response, completionCallback, failureCallback)); + request.send() + .onSuccess(response -> interpretNoContentResponse(response, completionCallback, failureCallback)) + .onFailure(error -> { + LOGGER.error("Request to delete record at {} failed to send", location, error); + failureCallback.accept(new Failure(error.getMessage(), -1)); + }); } - private void interpretNoContentResponse(Response response, Consumer> completionCallback, Consumer failureCallback) { - if (response.getStatusCode() == 204) { + private void interpretNoContentResponse(HttpResponse response, + Consumer> completionCallback, + Consumer failureCallback) { + if (response.statusCode() == 204) { completionCallback.accept(new Success<>(null)); } else { - failureCallback.accept(new Failure(response.getBody(), response.getStatusCode())); + failureCallback.accept(new Failure(response.bodyAsString(), response.statusCode())); } } + } diff --git a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java index 8d77aa317..2ab0c39a3 100644 --- a/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java +++ b/src/main/java/org/folio/inventory/storage/external/ExternalStorageModuleInstanceCollection.java @@ -5,18 +5,15 @@ import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR; import static org.folio.inventory.support.http.ContentType.APPLICATION_JSON; -import io.vertx.core.AsyncResult; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.apache.http.HttpStatus; import org.apache.logging.log4j.LogManager; @@ -87,37 +84,44 @@ public void addBatch(List items, .put("instances", new JsonArray(jsonList)) .put("totalRecords", jsonList.size()); - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(batchAddress)); + request.sendJsonObject(batchRequest) + .onSuccess(httpResponse -> { + Response response = new Response(httpResponse.statusCode(), httpResponse.bodyAsString(), + httpResponse.getHeader("Content-Type"), httpResponse.getHeader("Location")); - request.sendJsonObject(batchRequest, futureResponse::complete); - - futureResponse - .thenCompose(this::mapAsyncResultToCompletionStage) - .thenAccept(response -> { if (isBatchResponse(response)) { try { JsonObject batchResponse = response.getJson(); JsonArray createdInstances = batchResponse.getJsonArray("instances"); + JsonArray errorMessagesArray = batchResponse.getJsonArray("errorMessages"); List instancesList = new ArrayList<>(); for (int i = 0; i < createdInstances.size(); i++) { instancesList.add(mapFromJson(createdInstances.getJsonObject(i))); } + + List errorMessages = errorMessagesArray.stream() + .filter(String.class::isInstance) + .map(String.class::cast) + .toList(); + BatchResult batchResult = new BatchResult<>(); batchResult.setBatchItems(instancesList); - batchResult.setErrorMessages(batchResponse.getJsonArray("errorMessages").getList()); + batchResult.setErrorMessages(errorMessages); resultCallback.accept(new Success<>(batchResult)); } catch (Exception e) { - LOGGER.error(e); + LOGGER.error("Failed to parse successful batch response", e); failureCallback.accept(new Failure(e.getMessage(), response.getStatusCode())); } - } else { failureCallback.accept(new Failure(response.getBody(), response.getStatusCode())); } + }) + .onFailure(error -> { + LOGGER.error("Request for batch add failed to send", error); + failureCallback.accept(new Failure(error.getMessage(), -1)); }); } @@ -175,7 +179,7 @@ private Instance modifyInstance(Instance existingInstance, JsonObject instance) private SynchronousHttpClient getSynchronousHttpClient(Context context) throws MalformedURLException { if (httpClient == null) { - httpClient = new SynchronousHttpClient(new URL(context.getOkapiLocation()), tenant, token, context.getUserId(), null, null); + httpClient = new SynchronousHttpClient(URI.create(context.getOkapiLocation()).toURL(), tenant, token, context.getUserId(), null, null); } return httpClient; diff --git a/src/main/java/org/folio/inventory/support/http/client/OkapiHttpClient.java b/src/main/java/org/folio/inventory/support/http/client/OkapiHttpClient.java index 172a9847d..2f7f1c0f6 100644 --- a/src/main/java/org/folio/inventory/support/http/client/OkapiHttpClient.java +++ b/src/main/java/org/folio/inventory/support/http/client/OkapiHttpClient.java @@ -1,7 +1,5 @@ package org.folio.inventory.support.http.client; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.http.HttpHeaders.CONTENT_TYPE; import static org.apache.http.HttpHeaders.LOCATION; @@ -9,10 +7,8 @@ import java.net.URL; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; -import io.vertx.core.AsyncResult; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; @@ -76,14 +72,10 @@ public CompletionStage post(URL url, JsonObject body) { } public CompletionStage post(String url, JsonObject body) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(url)); - - request.sendJsonObject(body, futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.sendJsonObject(body) + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage post(URL url, String body) { @@ -91,37 +83,26 @@ public CompletionStage post(URL url, String body) { } public CompletionStage post(String url, String body) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(url)); - final var buffer = body != null ? Buffer.buffer(body) : Buffer.buffer(); - - request.sendBuffer(buffer, futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.sendBuffer(buffer) + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage post(String url, String body, Map headers) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.postAbs(url)); - for (Map.Entry headerEntry : headers.entrySet()) { request.putHeader(headerEntry.getKey(), headerEntry.getValue()); } - final var buffer = body != null ? Buffer.buffer(body) : Buffer.buffer(); - - request.sendBuffer(buffer, futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.sendBuffer(buffer) + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage put(URL url, JsonObject body) { @@ -129,14 +110,10 @@ public CompletionStage put(URL url, JsonObject body) { } public CompletionStage put(String url, JsonObject body) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.putAbs(url)); - - request.sendJsonObject(body, futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.sendJsonObject(body) + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage get(URL url) { @@ -144,26 +121,18 @@ public CompletionStage get(URL url) { } public CompletionStage get(String url) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.getAbs(url)); - - request.send(futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.send() + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage get(String url, Map params) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.getAbs(url)); params.forEach(request::addQueryParam); - - request.send(futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.send() + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } public CompletionStage delete(URL url) { @@ -171,14 +140,10 @@ public CompletionStage delete(URL url) { } public CompletionStage delete(String url) { - final var futureResponse = new CompletableFuture>>(); - final HttpRequest request = withStandardHeaders(webClient.deleteAbs(url)); - - request.send(futureResponse::complete); - - return futureResponse - .thenCompose(OkapiHttpClient::mapAsyncResultToCompletionStage); + return request.send() + .map(OkapiHttpClient::mapResponse) + .toCompletionStage(); } private HttpRequest withStandardHeaders(HttpRequest request) { @@ -186,18 +151,8 @@ private HttpRequest withStandardHeaders(HttpRequest request) { return request; } - private static CompletionStage mapAsyncResultToCompletionStage( - AsyncResult> asyncResult) { - - return asyncResult.succeeded() - ? completedFuture(mapResponse(asyncResult)) - : failedFuture(asyncResult.cause()); - } - - private static Response mapResponse(AsyncResult> asyncResult) { - final var response = asyncResult.result(); - - return new Response(response.statusCode(), response.bodyAsString(), - response.getHeader(CONTENT_TYPE), response.getHeader(LOCATION)); + private static Response mapResponse(HttpResponse httpResponse) { + return new Response(httpResponse.statusCode(), httpResponse.bodyAsString(), + httpResponse.getHeader(CONTENT_TYPE), httpResponse.getHeader(LOCATION)); } } diff --git a/src/test/java/org/folio/inventory/CancelledJobExecutionConsumerVerticleTest.java b/src/test/java/org/folio/inventory/CancelledJobExecutionConsumerVerticleTest.java index d042bbff8..e14a01cdf 100644 --- a/src/test/java/org/folio/inventory/CancelledJobExecutionConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/CancelledJobExecutionConsumerVerticleTest.java @@ -1,7 +1,6 @@ package org.folio.inventory; import io.vertx.core.Future; -import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; @@ -102,10 +101,7 @@ private Future deployVerticle(CancelledJobsIdsCache cancelledJobsIdsCach } private Future undeployVerticle() { - Promise promise = Promise.promise(); - vertxAssistant.getVertx().undeploy(verticleDeploymentId, promise); - - return promise.future(); + return vertxAssistant.getVertx().undeploy(verticleDeploymentId); } private List generateJobIds(int idsNumber) { diff --git a/src/test/java/org/folio/inventory/KafkaUtility.java b/src/test/java/org/folio/inventory/KafkaUtility.java index d4950ceb4..36770d82f 100644 --- a/src/test/java/org/folio/inventory/KafkaUtility.java +++ b/src/test/java/org/folio/inventory/KafkaUtility.java @@ -1,6 +1,6 @@ package org.folio.inventory; -import liquibase.repackaged.org.apache.commons.collections4.IteratorUtils; +import org.apache.commons.collections4.IteratorUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/src/test/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingConsumerVerticleTest.java b/src/test/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingConsumerVerticleTest.java index 222840852..f0b31a995 100644 --- a/src/test/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/consortium/consumers/ConsortiumInstanceSharingConsumerVerticleTest.java @@ -1,6 +1,5 @@ package org.folio.inventory.consortium.consumers; -import io.vertx.core.Promise; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -9,19 +8,17 @@ import org.junit.Test; import org.junit.runner.RunWith; - @RunWith(VertxUnitRunner.class) public class ConsortiumInstanceSharingConsumerVerticleTest extends KafkaTest { @Test public void shouldDeployVerticle(TestContext context) { + Async async = context.async(); - Promise promise = Promise.promise(); vertxAssistant.getVertx() - .deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(), deploymentOptions, promise); - - promise.future().onComplete(ar -> { - context.assertTrue(ar.succeeded()); - async.complete(); - }); + .deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(), deploymentOptions) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); } } diff --git a/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java b/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java index a969e46c2..3eba578d7 100644 --- a/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java +++ b/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java @@ -1,5 +1,6 @@ package org.folio.inventory.consortium.handlers; +import static io.vertx.core.buffer.Buffer.buffer; import static org.folio.HttpStatus.HTTP_INTERNAL_SERVER_ERROR; import static org.folio.HttpStatus.HTTP_NO_CONTENT; import static org.folio.HttpStatus.HTTP_OK; @@ -24,7 +25,6 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; @@ -153,7 +153,7 @@ private void setupMarcHandler() { setField(marcHandler, "entitiesLinksService", entitiesLinksService); doReturn(sourceStorageClient).when(marcHandler).getSourceStorageRecordsClient(anyString(), eq(kafkaHeaders)); - var recordWithLinkedAuthorities = buildHttpResponseWithBuffer(BufferImpl.buffer(RECORD_JSON_WITH_LINKED_AUTHORITIES), HttpStatus.HTTP_OK) + var recordWithLinkedAuthorities = buildHttpResponseWithBuffer(Buffer.buffer(RECORD_JSON_WITH_LINKED_AUTHORITIES), HttpStatus.HTTP_OK) .bodyAsJson(Record.class); bibRecord = sourceStorageRecordsResponseBuffer.bodyAsJson(Record.class); doReturn(Future.succeededFuture(recordWithLinkedAuthorities)).when(marcHandler).getSourceMARCByInstanceId(any(), any(), any()); @@ -161,7 +161,7 @@ private void setupMarcHandler() { } private final HttpResponse sourceStorageRecordsResponseBuffer = - buildHttpResponseWithBuffer(BufferImpl.buffer(RECORD_JSON), HttpStatus.HTTP_OK); + buildHttpResponseWithBuffer(Buffer.buffer(RECORD_JSON), HttpStatus.HTTP_OK); @Test public void publishInstanceTest(TestContext testContext) { @@ -251,7 +251,7 @@ public void shouldSharedLocalInstanceAndUnlinkLocalAuthorityLinks(TestContext te when(instanceOperationsHelper.getInstanceById(any(), any())).thenReturn(Future.succeededFuture(instance)); when(entitiesLinksService.putInstanceAuthorityLinks(any(), any(), any())).thenReturn(Future.succeededFuture()); - var recordWithLinkedAuthorities = buildHttpResponseWithBuffer(BufferImpl.buffer(RECORD_JSON_WITH_LOCAL_LINKED_AUTHORITIES), HttpStatus.HTTP_OK) + var recordWithLinkedAuthorities = buildHttpResponseWithBuffer(buffer(RECORD_JSON_WITH_LOCAL_LINKED_AUTHORITIES), HttpStatus.HTTP_OK) .bodyAsJson(Record.class); doReturn(Future.succeededFuture(recordWithLinkedAuthorities)).when(marcHandler).getSourceMARCByInstanceId(any(), any(), any()); diff --git a/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java b/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java index f5d315480..874861277 100644 --- a/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java +++ b/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java @@ -5,7 +5,6 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -24,6 +23,7 @@ import java.util.Map; import java.util.UUID; +import static io.vertx.core.buffer.Buffer.buffer; import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer; import static org.folio.inventory.consortium.util.RestDataImportHelper.FIELD_JOB_EXECUTIONS; import static org.folio.inventory.consortium.util.RestDataImportHelper.STATUS_COMMITTED; @@ -63,7 +63,7 @@ public void initJobExecutionTest() { .put(FIELD_JOB_EXECUTIONS, new JsonArray().add(new JsonObject().put("id", expectedJobExecutionId))); HttpResponseImpl jobExecutionResponse = - buildHttpResponseWithBuffer(BufferImpl.buffer(responseBody.encode()), HttpStatus.HTTP_CREATED); + buildHttpResponseWithBuffer(buffer(responseBody.encode()), HttpStatus.HTTP_CREATED); Future> futureResponse = Future.succeededFuture(jobExecutionResponse); doAnswer(invocation -> { @@ -115,7 +115,7 @@ public void initJobExecutionFailedWithoutJobExecutionsArrayTest() { JsonObject responseBody = new JsonObject().put("jobExecutions", new JsonArray().add("")); HttpResponseImpl jobExecutionResponse = - buildHttpResponseWithBuffer(BufferImpl.buffer(responseBody.encode()), HttpStatus.HTTP_CREATED); + buildHttpResponseWithBuffer(Buffer.buffer(responseBody.encode()), HttpStatus.HTTP_CREATED); Future> futureResponse = Future.succeededFuture(jobExecutionResponse); doAnswer(invocation -> { @@ -140,7 +140,7 @@ public void initJobExecutionFailedWithJobExecutionsEmptyArrayTest() { String expectedJobExecutionId = UUID.randomUUID().toString(); Map kafkaHeaders = new HashMap<>(); HttpResponseImpl jobExecutionResponse = - buildHttpResponseWithBuffer(BufferImpl.buffer("{\"jobExecutions\":[]}"), HttpStatus.HTTP_CREATED); + buildHttpResponseWithBuffer(Buffer.buffer("{\"jobExecutions\":[]}"), HttpStatus.HTTP_CREATED); Future> futureResponse = Future.succeededFuture(jobExecutionResponse); doAnswer(invocation -> { @@ -252,7 +252,7 @@ public void getJobExecutionStatusByJobExecutionId() { String expectedJobExecutionId = UUID.randomUUID().toString(); HttpResponseImpl jobExecutionResponse = - buildHttpResponseWithBuffer(BufferImpl.buffer("{\"status\":\"" + STATUS_COMMITTED + "\"}"), HttpStatus.HTTP_OK); + buildHttpResponseWithBuffer(Buffer.buffer("{\"status\":\"" + STATUS_COMMITTED + "\"}"), HttpStatus.HTTP_OK); Future> futureResponse = Future.succeededFuture(jobExecutionResponse); doAnswer(invocation -> { diff --git a/src/test/java/org/folio/inventory/dao/PostgresClientFactoryTest.java b/src/test/java/org/folio/inventory/dao/PostgresClientFactoryTest.java index 3ac6496a6..b44ac0339 100644 --- a/src/test/java/org/folio/inventory/dao/PostgresClientFactoryTest.java +++ b/src/test/java/org/folio/inventory/dao/PostgresClientFactoryTest.java @@ -5,7 +5,7 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgPool; +import io.vertx.sqlclient.Pool; import io.vertx.pgclient.SslMode; import org.folio.inventory.common.dao.PostgresClientFactory; import org.folio.inventory.common.dao.PostgresConnectionOptions; @@ -49,9 +49,10 @@ public static void setUp() { @AfterClass public static void tearDown(TestContext context) { Async async = context.async(); - vertx.close(context.asyncAssertSuccess(res -> { - async.complete(); - })); + vertx.close() + .onComplete(context.asyncAssertSuccess(res -> { + async.complete(); + })); PgPoolContainer.setEmbeddedPostgresOptions(); PostgresClientFactory.closeAll(); } @@ -60,7 +61,7 @@ public static void tearDown(TestContext context) { public void shouldCreateCachedPool() { PostgresClientFactory postgresClientFactory = new PostgresClientFactory(vertx); - PgPool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); + Pool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); assertNotNull(cachedPool); } @@ -69,8 +70,8 @@ public void shouldCreateCachedPool() { public void shouldReturnPgPoolFromCache() { PostgresClientFactory postgresClientFactory = new PostgresClientFactory(vertx); - PgPool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); - PgPool poolFromCache = postgresClientFactory.getCachedPool(TENANT_ID); + Pool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); + Pool poolFromCache = postgresClientFactory.getCachedPool(TENANT_ID); assertNotNull(cachedPool); assertNotNull(poolFromCache); assertEquals(cachedPool, poolFromCache); @@ -80,9 +81,9 @@ public void shouldReturnPgPoolFromCache() { public void shouldResetPgPoolCache() { PostgresClientFactory postgresClientFactory = new PostgresClientFactory(vertx); - PgPool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); + Pool cachedPool = postgresClientFactory.getCachedPool(TENANT_ID); postgresClientFactory.setShouldResetPool(true); - PgPool poolFromCache = postgresClientFactory.getCachedPool(TENANT_ID); + Pool poolFromCache = postgresClientFactory.getCachedPool(TENANT_ID); assertNotNull(cachedPool); assertNotNull(poolFromCache); assertNotEquals(cachedPool, poolFromCache); @@ -123,13 +124,12 @@ public void shouldReturnInitializedConnectionOptions() { assertEquals("test", pgConnectOpts.getUser()); assertEquals("test", pgConnectOpts.getPassword()); assertEquals("test", pgConnectOpts.getDatabase()); - assertEquals(60000, pgConnectOpts.getIdleTimeout()); + //assertEquals(60000, pgConnectOpts.getIdleTimeout()); assertEquals(SslMode.VERIFY_FULL, pgConnectOpts.getSslMode()); - assertEquals("HTTPS", pgConnectOpts.getHostnameVerificationAlgorithm()); + assertEquals("HTTPS", pgConnectOpts.getSslOptions().getHostnameVerificationAlgorithm()); assertEquals(MAX_POOL_SIZE, PostgresConnectionOptions.getMaxPoolSize()); - assertNotNull(pgConnectOpts.getPemTrustOptions()); - assertEquals(expectedEnabledSecureTransportProtocols, pgConnectOpts.getEnabledSecureTransportProtocols()); - assertNotNull(pgConnectOpts.getOpenSslEngineOptions()); + assertNotNull(pgConnectOpts.getSslOptions().getTrustOptions()); + assertEquals(expectedEnabledSecureTransportProtocols, pgConnectOpts.getSslOptions().getEnabledSecureTransportProtocols()); PostgresConnectionOptions.setSystemProperties(new HashMap<>()); } diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/DataImportConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/DataImportConsumerVerticleTest.java index c232bc243..9e3ec4d83 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/DataImportConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/DataImportConsumerVerticleTest.java @@ -107,8 +107,9 @@ public class DataImportConsumerVerticleTest extends KafkaTest { public static void setUpClass(TestContext context) { EventManager.registerKafkaEventPublisher(kafkaConfig, vertxAssistant.getVertx(), 1); CancelledJobsIdsCache cancelledJobsIdsCache = new CancelledJobsIdsCache(); - vertxAssistant.getVertx().deployVerticle(() -> new DataImportConsumerVerticle(cancelledJobsIdsCache), - deploymentOptions, context.asyncAssertSuccess()); + vertxAssistant.getVertx() + .deployVerticle(() -> new DataImportConsumerVerticle(cancelledJobsIdsCache), deploymentOptions) + .onComplete(context.asyncAssertSuccess()); } @Before @@ -117,7 +118,7 @@ public void setUp() { when(mockedEventHandler.isEligible(any(DataImportEventPayload.class))).thenReturn(true); doAnswer(invocationOnMock -> { DataImportEventPayload eventPayload = invocationOnMock.getArgument(0); - eventPayload.setCurrentNode(eventPayload.getCurrentNode().getChildSnapshotWrappers().get(0)); + eventPayload.setCurrentNode(eventPayload.getCurrentNode().getChildSnapshotWrappers().getFirst()); return CompletableFuture.completedFuture(eventPayload); }).when(mockedEventHandler).handle(any(DataImportEventPayload.class)); diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java index 500d95089..46e3ce7a6 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java @@ -1,6 +1,5 @@ package org.folio.inventory.dataimport.consumers; -import io.vertx.core.Promise; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -11,15 +10,15 @@ @RunWith(VertxUnitRunner.class) public class InstanceIngressConsumerVerticleTest extends KafkaTest { + @Test public void shouldDeployVerticle(TestContext context) { Async async = context.async(); - Promise promise = Promise.promise(); - vertxAssistant.getVertx().deployVerticle(InstanceIngressConsumerVerticle.class.getName(), deploymentOptions, promise); - - promise.future().onComplete(ar -> { - context.assertTrue(ar.succeeded()); - async.complete(); - }); + vertxAssistant.getVertx() + .deployVerticle(InstanceIngressConsumerVerticle.class.getName(), deploymentOptions) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); } } diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateConsumerVerticleTest.java index e226bb41e..bb7ba37ce 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateConsumerVerticleTest.java @@ -11,16 +11,15 @@ @RunWith(VertxUnitRunner.class) public class MarcBibUpdateConsumerVerticleTest extends KafkaTest { + @Test public void shouldDeployVerticle(TestContext context) { Async async = context.async(); - - Promise promise = Promise.promise(); - vertxAssistant.getVertx().deployVerticle(MarcBibUpdateConsumerVerticle.class.getName(), deploymentOptions, promise); - - promise.future().onComplete(ar -> { - context.assertTrue(ar.succeeded()); - async.complete(); - }); + vertxAssistant.getVertx() + .deployVerticle(MarcBibUpdateConsumerVerticle.class.getName(), deploymentOptions) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); } } diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java index 29235c845..140be2218 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java @@ -1,6 +1,5 @@ package org.folio.inventory.dataimport.consumers; -import io.vertx.core.Promise; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -11,15 +10,15 @@ @RunWith(VertxUnitRunner.class) public class MarcHridSetConsumerVerticleTest extends KafkaTest { + @Test public void shouldDeployVerticle(TestContext context) { Async async = context.async(); - Promise promise = Promise.promise(); - vertxAssistant.getVertx().deployVerticle(MarcHridSetConsumerVerticle.class.getName(), deploymentOptions, promise); - - promise.future().onComplete(ar -> { - context.assertTrue(ar.succeeded()); - async.complete(); - }); + vertxAssistant.getVertx() + .deployVerticle(MarcHridSetConsumerVerticle.class.getName(), deploymentOptions) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); } } diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java index cbc458d0b..7f532ea65 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java @@ -11,7 +11,6 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; @@ -84,6 +83,7 @@ import java.util.function.Consumer; import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static io.vertx.core.buffer.Buffer.buffer; import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.completedStage; import static org.apache.http.HttpStatus.SC_CREATED; @@ -378,7 +378,7 @@ public void shouldProcessEvent(String content, String acceptInstanceId) throws I context.put(PAYLOAD_USER_ID, USER_ID); context.put(OKAPI_REQUEST_ID, REQUEST_ID); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -441,7 +441,7 @@ public void shouldProcessEventAndUpdate005Field() throws InterruptedException, E MappingManager.registerReaderFactory(fakeReaderFactory); MappingManager.registerWriterFactory(new InstanceWriterFactory()); - HttpResponse resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), SC_CREATED); + HttpResponse resp = buildHttpResponseWithBuffer(Buffer.buffer("{}"), SC_CREATED); ArgumentCaptor recordCaptor = ArgumentCaptor.forClass(Record.class); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp)); @@ -506,7 +506,7 @@ public void shouldProcessEventAndUpdateSuppressFromDiscovery() throws Interrupte context.put(PAYLOAD_USER_ID, USER_ID); context.put(OKAPI_REQUEST_ID, REQUEST_ID); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -561,7 +561,7 @@ public void shouldProcessEventAndDeleteInstanceIfFailedCreateRecord() throws Int MappingManager.registerWriterFactory(new InstanceWriterFactory()); HttpResponseImpl response = new HttpResponseImpl<>(null, HttpStatus.SC_BAD_REQUEST, "", - null, null, null, BufferImpl.buffer("{}"), null); + null, null, null, Buffer.buffer("{}"), null); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(response)); HashMap context = new HashMap<>(); @@ -627,7 +627,7 @@ public void shouldProcessConsortiumEvent() throws InterruptedException, Executio context.put(MARC_BIBLIOGRAPHIC.value(), Json.encode(record)); - Buffer buffer = BufferImpl.buffer("{\"id\": \"567859ad-505a-400d-a699-0028a1fdbf84\",\"parsedRecord\": {\"content\": \"{\\\"leader\\\":\\\"00567nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"957985c6-97e3-4038-b0e7-343ecd0b8120\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"},\"deleted\": false,\"order\": 0,\"externalIdsHolder\": {\"instanceId\": \"b5e25bc3-a5a5-474a-8333-4a728d2f3485\",\"instanceHrid\": \"in00000000028\"},\"state\": \"ACTUAL\"}"); + Buffer buffer = Buffer.buffer("{\"id\": \"567859ad-505a-400d-a699-0028a1fdbf84\",\"parsedRecord\": {\"content\": \"{\\\"leader\\\":\\\"00567nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"957985c6-97e3-4038-b0e7-343ecd0b8120\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"},\"deleted\": false,\"order\": 0,\"externalIdsHolder\": {\"instanceId\": \"b5e25bc3-a5a5-474a-8333-4a728d2f3485\",\"instanceHrid\": \"in00000000028\"},\"state\": \"ACTUAL\"}"); HttpResponse resp = buildHttpResponseWithBuffer(buffer, SC_CREATED); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp)); @@ -692,7 +692,7 @@ public void shouldProcessEventAndMarkInstanceAndRecordAsDeletedIfLeaderIsDeleted context.put(PAYLOAD_USER_ID, USER_ID); context.put(OKAPI_REQUEST_ID, REQUEST_ID); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -764,7 +764,7 @@ public void shouldProcessEventAndMarkInstanceAndRecordAsDeletedIfLeaderIsDeleted context.put(PAYLOAD_USER_ID, USER_ID); context.put(OKAPI_REQUEST_ID, REQUEST_ID); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -983,7 +983,7 @@ public void shouldNotProcessEventIfNatureContentFieldIsNotUUID() throws Interrup context.put(MARC_BIBLIOGRAPHIC.value(), Json.encode(record)); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -1008,7 +1008,7 @@ public void shouldNotProcessEventIfNatureContentFieldIsNotUUID() throws Interrup public void shouldNotProcessEventIfRecordContains999field() throws InterruptedException, ExecutionException, TimeoutException { var recordId = UUID.randomUUID().toString(); - HttpResponse resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), SC_CREATED); + HttpResponse resp = buildHttpResponseWithBuffer(Buffer.buffer("{}"), SC_CREATED); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp)); var context = new HashMap(); diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java index edbd18cda..76245386e 100644 --- a/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java @@ -4,6 +4,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static io.vertx.core.buffer.Buffer.buffer; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.completedStage; import static org.folio.ActionProfile.FolioRecord.INSTANCE; @@ -61,7 +62,6 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.core.buffer.impl.BufferImpl; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; @@ -335,11 +335,11 @@ public void setUp() throws IOException { vertx.createHttpClient(), true), vertx.createHttpClient(), consortiumServiceImpl, instanceLinkClient, snapshotService)); var recordUUID = UUID.randomUUID().toString(); - HttpResponse recordHttpResponse = buildHttpResponseWithBuffer(BufferImpl.buffer(String.format(EXISTING_SRS_CONTENT, recordUUID, recordUUID, 0)), HttpStatus.SC_OK); + HttpResponse recordHttpResponse = buildHttpResponseWithBuffer(Buffer.buffer(String.format(EXISTING_SRS_CONTENT, recordUUID, recordUUID, 0)), HttpStatus.SC_OK); when(sourceStorageClient.getSourceStorageRecordsFormattedById(any(), any())) .thenReturn(Future.succeededFuture(recordHttpResponse)); - HttpResponse snapshotHttpResponse = buildHttpResponseWithBuffer(BufferImpl.buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); + HttpResponse snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED); when(sourceStorageSnapshotsClient.postSourceStorageSnapshots(any())).thenReturn(Future.succeededFuture(snapshotHttpResponse)); doAnswer(invocationOnMock -> { @@ -389,7 +389,7 @@ public void shouldProcessEvent() throws InterruptedException, ExecutionException mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -459,7 +459,7 @@ public void shouldProcessEventAndMarkInstanceAndRecordAsDeletedIfLeaderIsDeleted mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -537,7 +537,7 @@ public void shouldProcessEventAndUnMarkInstanceAndRecordDeleted() throws Interru mockInstance(MARC_INSTANCE_SOURCE, true); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -607,7 +607,7 @@ public void shouldReplaceExistingPrecedingTitleOnInstanceUpdate() throws Interru mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer(String.format(RESPONSE_CONTENT, UUID.randomUUID(), UUID.randomUUID())); + Buffer buffer = Buffer.buffer(String.format(RESPONSE_CONTENT, UUID.randomUUID(), UUID.randomUUID())); HttpResponse resp = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_OK); when(sourceStorageClient.putSourceStorageRecordsGenerationById(any(), any())).thenReturn(Future.succeededFuture(resp)); @@ -785,7 +785,7 @@ public void shouldFailIfErrorDuringCreatingOfSnapshotForConsortiumInstance() thr when(fakeReaderFactory.createReader()).thenReturn(fakeReader); - HttpResponse snapshotHttpResponse = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), HttpStatus.SC_INTERNAL_SERVER_ERROR); + HttpResponse snapshotHttpResponse = buildHttpResponseWithBuffer(Buffer.buffer("{}"), HttpStatus.SC_INTERNAL_SERVER_ERROR); when(sourceStorageSnapshotsClient.postSourceStorageSnapshots(any())).thenReturn(Future.succeededFuture(snapshotHttpResponse)); MappingManager.registerReaderFactory(fakeReaderFactory); @@ -846,7 +846,7 @@ public void shouldUpdateSharedFolioInstanceOnCentralTenantIfPayloadContainsCentr mockInstance(FOLIO.getValue()); - Buffer buffer = BufferImpl.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); + Buffer buffer = Buffer.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); HttpResponse respForCreated = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_CREATED); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(respForCreated)); @@ -926,7 +926,7 @@ public void shouldUpdateSharedMarcInstanceOnCentralTenantIfPayloadContainsCentra mockInstance(MARC.getValue()); - Buffer buffer = BufferImpl.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); + Buffer buffer = Buffer.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); HttpResponse respForCreated = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_OK); when(sourceStorageClient.putSourceStorageRecordsGenerationById(any(), any())) @@ -1212,7 +1212,7 @@ public void shouldNotProcessEventIfNatureContentFieldIsNotUUID() throws Interrup mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -1328,7 +1328,7 @@ public void shouldNotRequestMarcRecordIfInstanceSourceIsNotMarc() throws Interru Record record = new Record().withParsedRecord(new ParsedRecord().withContent(PARSED_CONTENT)); record.setId(recordId); - Buffer buffer = BufferImpl.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); + Buffer buffer = Buffer.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); HttpResponse respForCreated = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_CREATED); when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(respForCreated)); @@ -1386,7 +1386,7 @@ public void shouldProcessEventEvenIfRecordIsNotExistsInSRS() throws InterruptedE mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -1466,7 +1466,7 @@ public void shouldUpdateInstanceWithoutRelatedMarcRecord() throws InterruptedExc mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -1551,7 +1551,7 @@ public void shouldProcessEventWithExternalEntity() throws InterruptedException, return null; }).when(instanceRecordCollection).findById(anyString(), any(Consumer.class), any(Consumer.class)); - Buffer buffer = BufferImpl.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); + Buffer buffer = Buffer.buffer(String.format(RESPONSE_CONTENT, recordId, recordId)); HttpResponse respForPass = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_OK); when(sourceStorageClient.putSourceStorageRecordsGenerationById(any(), any())).thenReturn(Future.succeededFuture(respForPass)); @@ -1616,7 +1616,7 @@ public void shouldProcessEventAndUpdateSuppressFromDiscovery() throws Interrupte mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer("{\"parsedRecord\":{" + + Buffer buffer = Buffer.buffer("{\"parsedRecord\":{" + "\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," + "\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" + "}}"); @@ -1698,7 +1698,7 @@ public void shouldRemove035FieldWhenRecordContainsHrId() throws Exception { mockInstance(MARC_INSTANCE_SOURCE); - Buffer buffer = BufferImpl.buffer(JsonObject.mapFrom(record).encode()); + Buffer buffer = Buffer.buffer(JsonObject.mapFrom(record).encode()); HttpResponse respForPass = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_OK); when(sourceStorageClient.putSourceStorageRecordsGenerationById(any(), any())).thenReturn(Future.succeededFuture(respForPass)); @@ -1770,7 +1770,7 @@ public void shouldProcessEventAnd() throws InterruptedException, ExecutionExcept mockInstance(MARC_INSTANCE_SOURCE); - HttpResponse respForPass = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), HttpStatus.SC_BAD_REQUEST); + HttpResponse respForPass = buildHttpResponseWithBuffer(Buffer.buffer("{}"), HttpStatus.SC_BAD_REQUEST); when(sourceStorageClient.putSourceStorageRecordsGenerationById(any(), any())).thenReturn(Future.succeededFuture(respForPass)); DataImportEventPayload dataImportEventPayload = new DataImportEventPayload() @@ -1905,7 +1905,7 @@ public void shouldNotUpdateLinksWhenIncomingZeroSubfieldIsSameAsExisting() { .withParsedRecord(new ParsedRecord() .withId(recordId) .withContent(new JsonObject(expectedParsedContent))); - var buffer = BufferImpl.buffer(JsonObject.mapFrom(expectedRecord).encode()); + var buffer = Buffer.buffer(JsonObject.mapFrom(expectedRecord).encode()); var respForPass = buildHttpResponseWithBuffer(buffer, HttpStatus.SC_OK); when(sourceStorageClient.getSourceStorageRecordsFormattedById(any(), any())) .thenReturn(Future.succeededFuture(respForPass)); diff --git a/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java index b0cb1759f..5082c61de 100644 --- a/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java +++ b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java @@ -2,7 +2,7 @@ import static io.vertx.core.Future.failedFuture; import static io.vertx.core.Future.succeededFuture; -import static io.vertx.core.buffer.impl.BufferImpl.buffer; +import static io.vertx.core.buffer.Buffer.buffer; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer; diff --git a/src/test/java/org/folio/inventory/instanceingress/handler/UpdateInstanceIngressEventHandlerUnitTest.java b/src/test/java/org/folio/inventory/instanceingress/handler/UpdateInstanceIngressEventHandlerUnitTest.java index ad7b33a38..7551f3e38 100644 --- a/src/test/java/org/folio/inventory/instanceingress/handler/UpdateInstanceIngressEventHandlerUnitTest.java +++ b/src/test/java/org/folio/inventory/instanceingress/handler/UpdateInstanceIngressEventHandlerUnitTest.java @@ -2,6 +2,7 @@ import static io.vertx.core.Future.failedFuture; import static io.vertx.core.Future.succeededFuture; +import static io.vertx.core.buffer.Buffer.buffer; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer; @@ -18,7 +19,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; -import io.vertx.core.buffer.impl.BufferImpl; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; @@ -435,7 +436,7 @@ public void shouldReturnFailedFuture_ifItsFailedToPutNewRecordToSRS() throws IOE doReturn(sourceStorageClient).when(handler).getSourceStorageRecordsClient(any(), any(), any(), any(), any()); var snapshot = new Snapshot().withJobExecutionId(UUID.randomUUID().toString()); doReturn(succeededFuture(snapshot)).when(snapshotService).postSnapshotInSrsAndHandleResponse(any(), any()); - var existedRecordResponse = buildHttpResponseWithBuffer(BufferImpl.buffer("{\"id\":\"5e525f1e-d373-4a07-9aff-b80856bacfef\"}"), HttpStatus.SC_OK); + var existedRecordResponse = buildHttpResponseWithBuffer(buffer("{\"id\":\"5e525f1e-d373-4a07-9aff-b80856bacfef\"}"), HttpStatus.SC_OK); doReturn(succeededFuture(existedRecordResponse)).when(sourceStorageClient).getSourceStorageRecordsFormattedById(any(), any()); var sourceStorageHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST); doReturn(succeededFuture(sourceStorageHttpResponse)).when(sourceStorageClient).putSourceStorageRecordsGenerationById(any(), any()); @@ -483,7 +484,7 @@ public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() t doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any(), any(), any()); var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_CREATED); doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any()); - var existedRecordResponse = buildHttpResponseWithBuffer(BufferImpl.buffer("{\"id\":\"5e525f1e-d373-4a07-9aff-b80856bacfef\"}"), HttpStatus.SC_OK); + var existedRecordResponse = buildHttpResponseWithBuffer(Buffer.buffer("{\"id\":\"5e525f1e-d373-4a07-9aff-b80856bacfef\"}"), HttpStatus.SC_OK); doReturn(succeededFuture(existedRecordResponse)).when(sourceStorageClient).getSourceStorageRecordsFormattedById(any(), any()); var sourceStorageHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_OK); doReturn(succeededFuture(sourceStorageHttpResponse)).when(sourceStorageClient).putSourceStorageRecordsGenerationById(any(), any()); @@ -537,7 +538,7 @@ public void shouldReturnSucceededFuture_ifProcessFinishedCorrectly() throws IOEx doReturn(succeededFuture(titles)).when(precedingSucceedingTitlesHelper).getExistingPrecedingSucceedingTitles(any(), any()); doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).deletePrecedingSucceedingTitles(any(), any()); doReturn(sourceStorageClient).when(handler).getSourceStorageRecordsClient(any(), any(), any(), any(), any()); - var existedRecordResponse = buildHttpResponseWithBuffer(BufferImpl.buffer("{\"matchedId\":\"" + initialSrsId + "\"}"), HttpStatus.SC_OK); + var existedRecordResponse = buildHttpResponseWithBuffer(Buffer.buffer("{\"matchedId\":\"" + initialSrsId + "\"}"), HttpStatus.SC_OK); doReturn(succeededFuture(existedRecordResponse)).when(sourceStorageClient).getSourceStorageRecordsFormattedById(any(), any()); var sourceStorageHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_OK); doReturn(succeededFuture(sourceStorageHttpResponse)).when(sourceStorageClient).putSourceStorageRecordsGenerationById(any(), any()); diff --git a/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcConsumerVerticleTest.java b/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcConsumerVerticleTest.java index b84619215..2dffd98d4 100644 --- a/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcConsumerVerticleTest.java @@ -1,6 +1,5 @@ package org.folio.inventory.quickmarc.consumers; -import io.vertx.core.Promise; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -12,16 +11,15 @@ @RunWith(VertxUnitRunner.class) public class QuickMarcConsumerVerticleTest extends KafkaTest { + @Test public void shouldDeployVerticle(TestContext context) { Async async = context.async(); - - Promise promise = Promise.promise(); - vertxAssistant.getVertx().deployVerticle(QuickMarcConsumerVerticle.class.getName(), deploymentOptions, promise); - - promise.future().onComplete(ar -> { - context.assertTrue(ar.succeeded()); - async.complete(); - }); + vertxAssistant.getVertx() + .deployVerticle(QuickMarcConsumerVerticle.class.getName(), deploymentOptions) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); } } diff --git a/src/test/java/support/fakes/FakeOkapi.java b/src/test/java/support/fakes/FakeOkapi.java index 2820ab080..2f8f89a77 100644 --- a/src/test/java/support/fakes/FakeOkapi.java +++ b/src/test/java/support/fakes/FakeOkapi.java @@ -7,20 +7,19 @@ import io.vertx.core.http.HttpServer; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; +import lombok.Getter; import support.fakes.processors.StorageConstraintsProcessors; import support.fakes.processors.StorageRecordPreProcessors; public class FakeOkapi extends AbstractVerticle { private static final int PORT_TO_USE = 9493; + + @Getter private static final String address = String.format("http://localhost:%s", PORT_TO_USE); private HttpServer server; - public static String getAddress() { - return address; - } - @Override public void start(Promise startFuture) { System.out.println("Starting fake modules"); @@ -49,14 +48,12 @@ public void start(Promise startFuture) { registerFakeHoldingSourcesModule(router); server.requestHandler(router) - .listen(PORT_TO_USE, result -> { - if (result.succeeded()) { - System.out.printf("Fake Okapi listening on %s%n", server.actualPort()); - startFuture.complete(); - } else { - startFuture.fail(result.cause()); - } - }); + .listen(PORT_TO_USE) + .onSuccess(httpServer -> { + System.out.printf("Fake Okapi listening on %s%n", server.actualPort()); + startFuture.complete(); + }) + .onFailure(startFuture::fail); } @Override @@ -64,14 +61,14 @@ public void stop(Promise stopFuture) { System.out.println("Stopping fake modules"); if (server != null) { - server.close(result -> { - if (result.succeeded()) { + server.close() + .onSuccess(v -> { System.out.printf("Stopped listening on %s%n", server.actualPort()); stopFuture.complete(); - } else { - stopFuture.fail(result.cause()); - } - }); + }) + .onFailure(throwable -> { + stopFuture.fail(throwable); + }); } } diff --git a/src/test/java/support/fakes/FakeStorageModule.java b/src/test/java/support/fakes/FakeStorageModule.java index 22a2c73cc..832a9a8f9 100644 --- a/src/test/java/support/fakes/FakeStorageModule.java +++ b/src/test/java/support/fakes/FakeStorageModule.java @@ -288,7 +288,7 @@ private void getMany(RoutingContext routingContext) { private void retrieveMany(RoutingContext routingContext) { WebContext context = new WebContext(routingContext); - var requestBody = routingContext.getBodyAsJson(); + var requestBody = routingContext.body().asJsonObject(); var limit = requestBody.getInteger("limit"); var offset = requestBody.getInteger("offset"); @@ -373,14 +373,14 @@ private Map getResourcesForTenant(WebContext context) { private static JsonObject getJsonFromBody(RoutingContext routingContext) { if (hasBody(routingContext)) { - return routingContext.getBodyAsJson(); + return routingContext.body().asJsonObject(); } else { return new JsonObject(); } } private static boolean hasBody(RoutingContext routingContext) { - return StringUtils.isNotBlank(routingContext.getBodyAsString()); + return StringUtils.isNotBlank(routingContext.body().asString()); } private void checkTokenHeader(RoutingContext routingContext) { @@ -528,7 +528,7 @@ private void getByExternalId(RoutingContext routingContext) { } private void emulateFailure(RoutingContext routingContext) { - endpointFailureDescriptor = routingContext.getBodyAsJson() + endpointFailureDescriptor = routingContext.body().asJsonObject() .mapTo(EndpointFailureDescriptor.class); routingContext.response().setStatusCode(201).end();