diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index 704dea56f4..9381ad842a 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -63,6 +64,7 @@ public class TestCommandListener implements CommandListener { private final TestListener listener; private final Lock lock = new ReentrantLock(); private final Condition commandCompletedCondition = lock.newCondition(); + private final Condition commandAnyEventCondition = lock.newCondition(); private final boolean observeSensitiveCommands; private boolean ignoreNextSucceededOrFailedEvent; private static final CodecRegistry CODEC_REGISTRY_HACK; @@ -223,22 +225,12 @@ private List getEvents(final Class type, } } - public List waitForStartedEvents(final int numEvents) { - lock.lock(); - try { - while (!hasCompletedEvents(numEvents)) { - try { - if (!commandCompletedCondition.await(TIMEOUT, TimeUnit.SECONDS)) { - throw new MongoTimeoutException("Timeout waiting for event"); - } - } catch (InterruptedException e) { - throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e); - } - } - return getEvents(CommandStartedEvent.class, numEvents); - } finally { - lock.unlock(); - } + private long getEventCount(final Class eventClass, final Predicate matcher) { + return getEvents().stream() + .filter(eventClass::isInstance) + .map(eventClass::cast) + .filter(matcher) + .count(); } public void waitForFirstCommandCompletion() { @@ -287,6 +279,7 @@ else if (!observeSensitiveCommands) { addEvent(new CommandStartedEvent(event.getRequestContext(), event.getOperationId(), event.getRequestId(), event.getConnectionDescription(), event.getDatabaseName(), event.getCommandName(), event.getCommand() == null ? null : getWritableClone(event.getCommand()))); + commandAnyEventCondition.signal(); } finally { lock.unlock(); } @@ -312,6 +305,7 @@ else if (!observeSensitiveCommands) { event.getResponse() == null ? null : event.getResponse().clone(), event.getElapsedTime(TimeUnit.NANOSECONDS))); commandCompletedCondition.signal(); + commandAnyEventCondition.signal(); } finally { lock.unlock(); } @@ -334,6 +328,7 @@ else if (!observeSensitiveCommands) { try { addEvent(event); commandCompletedCondition.signal(); + commandAnyEventCondition.signal(); } finally { lock.unlock(); } @@ -428,4 +423,22 @@ private void assertEquivalence(final CommandStartedEvent actual, final CommandSt assertEquals(expected.getDatabaseName(), actual.getDatabaseName()); assertEquals(expected.getCommand(), actual.getCommand()); } + + public void waitForEvents(final Class eventClass, final Predicate matcher, final int count) + throws TimeoutException { + lock.lock(); + try { + while (getEventCount(eventClass, matcher) < count) { + try { + if (!commandAnyEventCondition.await(TIMEOUT, TimeUnit.SECONDS)) { + throw new MongoTimeoutException("Timeout waiting for command event"); + } + } catch (InterruptedException e) { + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e); + } + } + } finally { + lock.unlock(); + } + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java index e81ebc025d..7e6e86fb01 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java @@ -146,6 +146,10 @@ public String toString() { }; } + public static ContextElement ofWaitForCommandEvents(final String client, final BsonDocument commandEvent, final int count) { + return new EventCountContext("Wait For Command Events", client, commandEvent, count); + } + public static ContextElement ofTopologyEvents(final String client, final BsonArray expectedEvents, final List actualEvents) { return new ContextElement() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 61a418ad3a..b2718b4b2d 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -37,6 +37,7 @@ import com.mongodb.event.ServerHeartbeatSucceededEvent; import com.mongodb.event.TestServerMonitorListener; import com.mongodb.internal.connection.TestClusterListener; +import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.internal.connection.TestServerListener; import com.mongodb.lang.NonNull; @@ -223,6 +224,26 @@ public void waitForConnectionPoolEvents(final String client, final BsonDocument } } + public void waitForCommandEvents(final String clientId, final BsonDocument expectedCommandEvent, final int count, + final TestCommandListener clientCommandListener) { + context.push(ContextElement.ofWaitForCommandEvents(clientId, expectedCommandEvent, count)); + try { + switch (expectedCommandEvent.getFirstKey()) { + case "commandStartedEvent": + BsonDocument properties = expectedCommandEvent.getDocument(expectedCommandEvent.getFirstKey()); + String commandName = properties.getString("commandName").getValue(); + clientCommandListener.waitForEvents(CommandStartedEvent.class, commandStartedEvent -> + commandName.equals(commandStartedEvent.getCommandName()), count); + break; + default: + throw new UnsupportedOperationException("Unsupported event: " + expectedCommandEvent.getFirstKey()); + } + context.pop(); + } catch (TimeoutException e) { + fail(context.getMessage("Timed out waiting for connection pool events")); + } + } + public void assertConnectionPoolEventCount(final String client, final BsonDocument event, final int count, final List events) { context.push(ContextElement.ofConnectionPoolEventCount(client, event, count)); Class eventClass; diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 3364cf4183..b977c80be4 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -779,6 +779,9 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi context.getEventMatcher().waitForServerMonitorEvents(clientId, TestServerMonitorListener.eventType(eventName), event, count, entities.getServerMonitorListener(clientId)); break; + case "commandStartedEvent": + context.getEventMatcher().waitForCommandEvents(clientId, event, count, entities.getClientCommandListener(clientId)); + break; default: throw new UnsupportedOperationException("Unsupported event: " + eventName); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 6d559e0d66..d5cf994f92 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -76,10 +76,6 @@ public static void applyCustomizations(final TestDef def) { .test("client-side-operations-timeout", "timeoutMS behaves correctly for GridFS download operations", "timeoutMS applied to entire download, not individual parts"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5815") - .test("client-side-operations-timeout", "WaitQueueTimeoutError does not clear the pool", - "WaitQueueTimeoutError does not clear the pool"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5491") .testContains("client-side-operations-timeout", "dropIndex") .when(() -> !serverVersionLessThan(8, 3))