Skip to content

Commit

Permalink
simplify code by using method reference
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Jun 19, 2023
1 parent 58d9c25 commit 2a7f900
Showing 1 changed file with 74 additions and 72 deletions.
146 changes: 74 additions & 72 deletions src/test/java/io/neonbee/NeonBeeExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,16 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
throw new ParameterResolutionException("Error while finding a free port for server verticle.", e);
}

return unpack(store(extensionContext).getOrComputeIfAbsent(options.getInstanceName(),
key -> new ScopedObject<NeonBee>(
createNeonBee(options,
neonBeeInstanceConfiguration.map(NeonBeeInstanceConfiguration::clusterManager)
.orElse(NeonBeeInstanceConfiguration.ClusterManager.FAKE)),
closeNeonBee())));
return unpack(store(extensionContext)
.getOrComputeIfAbsent(
options.getInstanceName(),
key -> new ScopedObject<>(
createNeonBee(
options,
neonBeeInstanceConfiguration
.map(NeonBeeInstanceConfiguration::clusterManager)
.orElse(NeonBeeInstanceConfiguration.ClusterManager.FAKE)),
this::closeNeonBee)));
}
if (type == VertxTestContext.class) {
return newTestContext(extensionContext);
Expand Down Expand Up @@ -294,82 +298,80 @@ private NeonBee createNeonBee(NeonBeeOptions options, NeonBeeInstanceConfigurati
}

@SuppressWarnings("FutureReturnValueIgnored")
private ThrowingConsumer<NeonBee> closeNeonBee() {
return neonBee -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> errorBox = new AtomicReference<>();

// additional logic for tests, due to Hazelcast / Infinispan clusters tend to get stuck, after test
// execution finishes, thus we forcefully will terminate the clusters at some point in time
Vertx vertx = neonBee.getVertx();
ClusterManager clusterManager = vertx instanceof VertxImpl ? ((VertxImpl) vertx).getClusterManager() : null;
if (clusterManager != null) {
Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "neonbee-cluster-terminator");
thread.setDaemon(true);
return thread;
}).schedule(() -> {
if (clusterManager instanceof HazelcastClusterManager) {
LifecycleService clusterLifecycleService =
((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLifecycleService();

if (clusterLifecycleService.isRunning()) {
LOGGER.warn("Forcefully terminating Hazelcast cluster after test");
private void closeNeonBee(NeonBee neonBee) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> errorBox = new AtomicReference<>();

// additional logic for tests, due to Hazelcast / Infinispan clusters tend to get stuck, after test
// execution finishes, thus we forcefully will terminate the clusters at some point in time
Vertx vertx = neonBee.getVertx();
ClusterManager clusterManager = vertx instanceof VertxImpl ? ((VertxImpl) vertx).getClusterManager() : null;
if (clusterManager != null) {
Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "neonbee-cluster-terminator");
thread.setDaemon(true);
return thread;
}).schedule(() -> {
if (clusterManager instanceof HazelcastClusterManager) {
LifecycleService clusterLifecycleService =
((HazelcastClusterManager) clusterManager).getHazelcastInstance().getLifecycleService();

if (clusterLifecycleService.isRunning()) {
LOGGER.warn("Forcefully terminating Hazelcast cluster after test");
}

// terminate the cluster in any case, if already terminated, this call will do nothing
clusterLifecycleService.terminate();
} else if (clusterManager instanceof InfinispanClusterManager) {
BasicCacheContainer cacheContainer =
((InfinispanClusterManager) clusterManager).getCacheContainer();

if (cacheContainer instanceof EmbeddedCacheManager) {
if (!ComponentStatus.TERMINATED
.equals(((EmbeddedCacheManager) cacheContainer).getStatus())) {
LOGGER.warn("Forcefully terminating Infinispan cache manager after test");
}

// terminate the cluster in any case, if already terminated, this call will do nothing
clusterLifecycleService.terminate();
} else if (clusterManager instanceof InfinispanClusterManager) {
BasicCacheContainer cacheContainer =
((InfinispanClusterManager) clusterManager).getCacheContainer();

if (cacheContainer instanceof EmbeddedCacheManager) {
if (!ComponentStatus.TERMINATED
.equals(((EmbeddedCacheManager) cacheContainer).getStatus())) {
LOGGER.warn("Forcefully terminating Infinispan cache manager after test");
}

// terminate the cluster in any case, if already terminated, this call will do nothing
try {
((Closeable) cacheContainer).close();
} catch (IOException e) {
LOGGER.warn("Failed to close Infinispan cluster manager after test", e);
}
} else if (cacheContainer != null) {
LOGGER.warn("Unknown Infinispan cache container type {}, cannot check for termination",
cacheContainer.getClass());
try {
((Closeable) cacheContainer).close();
} catch (IOException e) {
LOGGER.warn("Failed to close Infinispan cluster manager after test", e);
}
} else if (cacheContainer != null) {
LOGGER.warn("Unknown Infinispan cache container type {}, cannot check for termination",
cacheContainer.getClass());
}
// do not reset the FakeClusterManager here, as 10 seconds after test, another test could already be
// using another instance of the FakeClusterManager, which accesses the same static variables
}, 10, TimeUnit.SECONDS);
}

neonBee.getVertx().close().onComplete(result -> {
if (result.failed()) {
errorBox.set(result.cause());
}

// if we run with a FakeClusterManager we need to reset it after the test
if (clusterManager instanceof FakeClusterManager) {
FakeClusterManager.reset();
}
// do not reset the FakeClusterManager here, as 10 seconds after test, another test could already be
// using another instance of the FakeClusterManager, which accesses the same static variables
}, 10, TimeUnit.SECONDS);
}

latch.countDown();
});
neonBee.getVertx().close().onComplete(result -> {
if (result.failed()) {
errorBox.set(result.cause());
}

if (!latch.await(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT)) {
throw new TimeoutException("Closing the Vertx context timed out");
// if we run with a FakeClusterManager we need to reset it after the test
if (clusterManager instanceof FakeClusterManager) {
FakeClusterManager.reset();
}

Throwable throwable = errorBox.get();
if (throwable != null) {
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else {
throw new VertxException(throwable);
}
latch.countDown();
});

if (!latch.await(DEFAULT_TIMEOUT_DURATION, DEFAULT_TIMEOUT_UNIT)) {
throw new TimeoutException("Closing the Vertx context timed out");
}

Throwable throwable = errorBox.get();
if (throwable != null) {
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else {
throw new VertxException(throwable);
}
};
}
}
}

0 comments on commit 2a7f900

Please sign in to comment.