Skip to content

Add support for awaiting CommandStartedEvent in Unified Test Runner. #1790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -63,6 +64,7 @@ public class TestCommandListener implements CommandListener {
private final TestListener listener;
private final Lock lock = new ReentrantLock();
private final Condition commandCompletedCondition = lock.newCondition();
private final Condition commandAnyEventCondition = lock.newCondition();
private final boolean observeSensitiveCommands;
private boolean ignoreNextSucceededOrFailedEvent;
private static final CodecRegistry CODEC_REGISTRY_HACK;
Expand Down Expand Up @@ -223,22 +225,12 @@ private <T extends CommandEvent> List<T> getEvents(final Class<T> type,
}
}

public List<CommandStartedEvent> waitForStartedEvents(final int numEvents) {
lock.lock();
try {
while (!hasCompletedEvents(numEvents)) {
try {
if (!commandCompletedCondition.await(TIMEOUT, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Timeout waiting for event");
}
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e);
}
}
return getEvents(CommandStartedEvent.class, numEvents);
} finally {
lock.unlock();
}
private <T extends CommandEvent> long getEventCount(final Class<T> eventClass, final Predicate<T> matcher) {
return getEvents().stream()
.filter(eventClass::isInstance)
.map(eventClass::cast)
.filter(matcher)
.count();
}

public void waitForFirstCommandCompletion() {
Expand Down Expand Up @@ -287,6 +279,7 @@ else if (!observeSensitiveCommands) {
addEvent(new CommandStartedEvent(event.getRequestContext(), event.getOperationId(), event.getRequestId(),
event.getConnectionDescription(), event.getDatabaseName(), event.getCommandName(),
event.getCommand() == null ? null : getWritableClone(event.getCommand())));
commandAnyEventCondition.signal();
} finally {
lock.unlock();
}
Expand All @@ -312,6 +305,7 @@ else if (!observeSensitiveCommands) {
event.getResponse() == null ? null : event.getResponse().clone(),
event.getElapsedTime(TimeUnit.NANOSECONDS)));
commandCompletedCondition.signal();
commandAnyEventCondition.signal();
} finally {
lock.unlock();
}
Expand All @@ -334,6 +328,7 @@ else if (!observeSensitiveCommands) {
try {
addEvent(event);
commandCompletedCondition.signal();
commandAnyEventCondition.signal();
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -428,4 +423,22 @@ private void assertEquivalence(final CommandStartedEvent actual, final CommandSt
assertEquals(expected.getDatabaseName(), actual.getDatabaseName());
assertEquals(expected.getCommand(), actual.getCommand());
}

public <T extends CommandEvent> void waitForEvents(final Class<T> eventClass, final Predicate<T> matcher, final int count)
throws TimeoutException {
lock.lock();
try {
while (getEventCount(eventClass, matcher) < count) {
try {
if (!commandAnyEventCondition.await(TIMEOUT, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Timeout waiting for command event");
}
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e);
}
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ public String toString() {
};
}

public static ContextElement ofWaitForCommandEvents(final String client, final BsonDocument commandEvent, final int count) {
return new EventCountContext("Wait For Command Events", client, commandEvent, count);
}

public static ContextElement ofTopologyEvents(final String client, final BsonArray expectedEvents,
final List<?> actualEvents) {
return new ContextElement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.TestClusterListener;
import com.mongodb.internal.connection.TestCommandListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.connection.TestServerListener;
import com.mongodb.lang.NonNull;
Expand Down Expand Up @@ -223,6 +224,26 @@ public void waitForConnectionPoolEvents(final String client, final BsonDocument
}
}

public void waitForCommandEvents(final String clientId, final BsonDocument expectedCommandEvent, int count,
final TestCommandListener clientCommandListener) {
context.push(ContextElement.ofWaitForCommandEvents(clientId, expectedCommandEvent, count));
try {
switch (expectedCommandEvent.getFirstKey()) {
case "commandStartedEvent":
BsonDocument properties = expectedCommandEvent.getDocument(expectedCommandEvent.getFirstKey());
String commandName = properties.getString("commandName").getValue();
clientCommandListener.waitForEvents(CommandStartedEvent.class, commandStartedEvent ->
commandName.equals(commandStartedEvent.getCommandName()), count);
break;
default:
throw new UnsupportedOperationException("Unsupported event: " + expectedCommandEvent.getFirstKey());
}
context.pop();
} catch (TimeoutException e) {
fail(context.getMessage("Timed out waiting for connection pool events"));
}
}

public void assertConnectionPoolEventCount(final String client, final BsonDocument event, final int count, final List<Object> events) {
context.push(ContextElement.ofConnectionPoolEventCount(client, event, count));
Class<?> eventClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,9 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi
context.getEventMatcher().waitForServerMonitorEvents(clientId, TestServerMonitorListener.eventType(eventName), event, count,
entities.getServerMonitorListener(clientId));
break;
case "commandStartedEvent":
context.getEventMatcher().waitForCommandEvents(clientId, event, count, entities.getClientCommandListener(clientId));
break;
default:
throw new UnsupportedOperationException("Unsupported event: " + eventName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ public static void applyCustomizations(final TestDef def) {
.test("client-side-operations-timeout", "timeoutMS behaves correctly for GridFS download operations",
"timeoutMS applied to entire download, not individual parts");

def.skipJira("https://jira.mongodb.org/browse/JAVA-5815")
.test("client-side-operations-timeout", "WaitQueueTimeoutError does not clear the pool",
"WaitQueueTimeoutError does not clear the pool");

def.skipJira("https://jira.mongodb.org/browse/JAVA-5491")
.testContains("client-side-operations-timeout", "dropIndex")
.when(() -> !serverVersionLessThan(8, 3))
Expand Down