Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions presto-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<mongo-java.version>3.12.14</mongo-java.version>
<mongo-java.version>5.6.5</mongo-java.version>
<mongo-server.version>1.47.0</mongo-server.version>
<air.check.skip-modernizer>true</air.check.skip-modernizer>
</properties>
Expand All @@ -28,7 +28,7 @@

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongo-java.version}</version>
<exclusions>
<exclusion>
Expand All @@ -38,6 +38,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>${mongo-java.version}</version>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>${mongo-java.version}</version>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import jakarta.inject.Singleton;

import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static java.util.Objects.requireNonNull;

Expand All @@ -46,38 +52,74 @@ public static MongoSession createMongoSession(TypeManager typeManager, MongoClie
{
requireNonNull(config, "config is null");

MongoClientOptions.Builder options = MongoClientOptions.builder()
.connectionsPerHost(config.getConnectionsPerHost())
.connectTimeout(config.getConnectionTimeout())
.socketTimeout(config.getSocketTimeout())
.socketKeepAlive(config.getSocketKeepAlive())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socketKeepAlive is removed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The socketKeepAlive() method has been removed from the driver API

  • Socket keep-alive is now always enabled by default and controlled at the OS level

.maxWaitTime(config.getMaxWaitTime())
.minConnectionsPerHost(config.getMinConnectionsPerHost())
.writeConcern(config.getWriteConcern().getWriteConcern());
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

String connectionString = buildConnectionString(config);
settingsBuilder.applyConnectionString(new ConnectionString(connectionString));

if (!config.getCredentials().isEmpty()) {
if (config.getCredentials().size() > 1) {
throw new IllegalArgumentException(
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this okay to fail if multiple credentials are present?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what the intention was for having credential config as a list?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list of credentials might be introduced to support different credentials per seed/host.

But in the current version of mongodb driver, (3.12.14) itself, the all the MongoClient constructors with list of credentials were deprecated.
Image

"Multiple credentials are not supported. Only one credential can be configured.");
}
settingsBuilder.credential(config.getCredentials().get(0));
}

settingsBuilder.applyToConnectionPoolSettings(builder -> builder
.maxSize(config.getConnectionsPerHost())
.minSize(config.getMinConnectionsPerHost())
.maxWaitTime(config.getMaxWaitTime(), TimeUnit.MILLISECONDS));

settingsBuilder.applyToSocketSettings(builder -> builder
.connectTimeout(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)
.readTimeout(config.getSocketTimeout(), TimeUnit.MILLISECONDS));

settingsBuilder.writeConcern(config.getWriteConcern().getWriteConcern());

settingsBuilder.readPreference(configureReadPreference(config));

if (config.getRequiredReplicaSetName() != null) {
options.requiredReplicaSetName(config.getRequiredReplicaSetName());
settingsBuilder.applyToClusterSettings(builder ->
builder.requiredReplicaSetName(config.getRequiredReplicaSetName()));
}

configureReadPreference(options, config);
configureSsl(options, config);
configureSsl(settingsBuilder, config);

MongoClient client = new MongoClient(config.getSeeds(), config.getCredentials(), options.build());
MongoClient client = MongoClients.create(settingsBuilder.build());

return new MongoSession(typeManager, client, config);
}

private static void configureReadPreference(MongoClientOptions.Builder options, MongoClientConfig config)
private static String buildConnectionString(MongoClientConfig config)
{
StringBuilder connectionString = new StringBuilder("mongodb://");

connectionString.append(config.getSeeds().stream()
.map(addr -> addr.getHost() + ":" + addr.getPort())
.collect(Collectors.joining(",")));

if (!config.getCredentials().isEmpty()) {
connectionString.append("/")
.append(config.getCredentials().get(0).getSource());
}

// Enable replica set discovery when replica set name is configured or multiple seeds are provided
if (config.getRequiredReplicaSetName() != null || config.getSeeds().size() > 1) {
connectionString.append("?directConnection=false");
}
return connectionString.toString();
}
private static ReadPreference configureReadPreference(MongoClientConfig config)
{
if (config.getReadPreferenceTags().isEmpty()) {
options.readPreference(config.getReadPreference().getReadPreference());
return config.getReadPreference().getReadPreference();
}
else {
options.readPreference(config.getReadPreference().getReadPreferenceWithTags(config.getReadPreferenceTags()));
return config.getReadPreference().getReadPreferenceWithTags(config.getReadPreferenceTags());
}
}

private static void configureSsl(MongoClientOptions.Builder options, MongoClientConfig config)
private static void configureSsl(MongoClientSettings.Builder settings, MongoClientConfig config)
{
if (config.isTlsEnabled()) {
SslContextProvider sslContextProvider = new SslContextProvider(
Expand All @@ -88,8 +130,9 @@ private static void configureSsl(MongoClientOptions.Builder options, MongoClient

sslContextProvider.buildSslContext()
.ifPresent(sslContext -> {
options.sslContext(sslContext);
options.sslEnabled(true);
settings.applyToSslSettings(builder -> builder
.enabled(true)
.context(sslContext));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.MongoClient;
import com.mongodb.MongoNamespace;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@
public enum WriteConcernType
{
ACKNOWLEDGED(WriteConcern.ACKNOWLEDGED),
FSYNC_SAFE(WriteConcern.FSYNC_SAFE),
FSYNCED(WriteConcern.FSYNCED),
JOURNAL_SAFE(WriteConcern.JOURNAL_SAFE),
JOURNALED(WriteConcern.JOURNALED),
MAJORITY(WriteConcern.MAJORITY),
NORMAL(WriteConcern.NORMAL),
REPLICA_ACKNOWLEDGED(WriteConcern.REPLICA_ACKNOWLEDGED),
REPLICAS_SAFE(WriteConcern.REPLICAS_SAFE),
SAFE(WriteConcern.SAFE),
UNACKNOWLEDGED(WriteConcern.UNACKNOWLEDGED);

private final WriteConcern writeConcern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import de.bwaldvogel.mongo.MongoServer;
import io.airlift.tpch.TpchTable;

Expand Down Expand Up @@ -48,7 +48,7 @@ private MongoQueryRunner(Session session, int workers)

server = new MongoServer(new SyncMemoryBackend());
address = server.bind();
client = new MongoClient(new ServerAddress(address));
client = MongoClients.create("mongodb://" + address.getHostString() + ":" + address.getPort());
}

public static MongoQueryRunner createMongoQueryRunner(TpchTable<?>... tables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoClient;
import io.airlift.tpch.TpchTable;
import org.bson.Document;
import org.testng.annotations.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

import com.facebook.presto.plugin.base.security.SslContextProvider;
import com.facebook.presto.tests.SslKeystoreManager;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientSettings;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.net.ssl.SSLContext;

import java.io.File;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.tests.SslKeystoreManager.SSL_STORE_PASSWORD;
import static com.facebook.presto.tests.SslKeystoreManager.getKeystorePath;
Expand Down Expand Up @@ -124,66 +125,82 @@ public void testSslContextProviderIntegration()
}

@Test
public void testMongoClientOptionsWithTlsEnabled()
public void testMongoClientSettingsWithTlsEnabled()
{
MongoClientConfig config = new MongoClientConfig()
.setSeeds("localhost:27017");
configureTlsProperties(config);

MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
.connectionsPerHost(config.getConnectionsPerHost())
.connectTimeout(config.getConnectionTimeout())
.socketTimeout(config.getSocketTimeout())
.socketKeepAlive(config.getSocketKeepAlive())
.maxWaitTime(config.getMaxWaitTime())
.minConnectionsPerHost(config.getMinConnectionsPerHost())
.writeConcern(config.getWriteConcern().getWriteConcern());
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

// Connection pool settings
settingsBuilder.applyToConnectionPoolSettings(builder -> builder
.maxSize(config.getConnectionsPerHost())
.minSize(config.getMinConnectionsPerHost())
.maxWaitTime(config.getMaxWaitTime(), TimeUnit.MILLISECONDS));

// Socket settings
settingsBuilder.applyToSocketSettings(builder -> builder
.connectTimeout(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)
.readTimeout(config.getSocketTimeout(), TimeUnit.MILLISECONDS));

// Write concern
settingsBuilder.writeConcern(config.getWriteConcern().getWriteConcern());

// Configure SSL
if (config.isTlsEnabled()) {
SslContextProvider sslContextProvider = createSslContextProvider(config);

sslContextProvider.buildSslContext().ifPresent(sslContext -> {
optionsBuilder.sslContext(sslContext);
optionsBuilder.sslEnabled(true);
settingsBuilder.applyToSslSettings(builder -> builder
.enabled(true)
.context(sslContext));
});
}

MongoClientOptions options = optionsBuilder.build();
MongoClientSettings settings = settingsBuilder.build();

assertTrue(options.isSslEnabled(), "SSL should be enabled in MongoClientOptions");
assertNotNull(options.getSslContext(), "SSL context should be set in MongoClientOptions");
assertTrue(settings.getSslSettings().isEnabled(), "SSL should be enabled in MongoClientSettings");
assertNotNull(settings.getSslSettings().getContext(), "SSL context should be set in MongoClientSettings");
}

@Test
public void testMongoClientOptionsWithTlsDisabled()
public void testMongoClientSettingsWithTlsDisabled()
{
MongoClientConfig config = new MongoClientConfig()
.setSeeds("localhost:27017")
.setTlsEnabled(false);

MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
.connectionsPerHost(config.getConnectionsPerHost())
.connectTimeout(config.getConnectionTimeout())
.socketTimeout(config.getSocketTimeout())
.socketKeepAlive(config.getSocketKeepAlive())
.maxWaitTime(config.getMaxWaitTime())
.minConnectionsPerHost(config.getMinConnectionsPerHost())
.writeConcern(config.getWriteConcern().getWriteConcern());
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

// Connection pool settings
settingsBuilder.applyToConnectionPoolSettings(builder -> builder
.maxSize(config.getConnectionsPerHost())
.minSize(config.getMinConnectionsPerHost())
.maxWaitTime(config.getMaxWaitTime(), TimeUnit.MILLISECONDS));

// Socket settings
settingsBuilder.applyToSocketSettings(builder -> builder
.connectTimeout(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)
.readTimeout(config.getSocketTimeout(), TimeUnit.MILLISECONDS));

// Write concern
settingsBuilder.writeConcern(config.getWriteConcern().getWriteConcern());

// Configure SSL
if (config.isTlsEnabled()) {
SslContextProvider sslContextProvider = createSslContextProvider(config);

sslContextProvider.buildSslContext().ifPresent(sslContext -> {
optionsBuilder.sslContext(sslContext);
optionsBuilder.sslEnabled(true);
settingsBuilder.applyToSslSettings(builder -> builder
.enabled(true)
.context(sslContext));
});
}

MongoClientOptions options = optionsBuilder.build();
MongoClientSettings settings = settingsBuilder.build();

assertFalse(options.isSslEnabled(), "SSL should be disabled in MongoClientOptions");
assertFalse(settings.getSslSettings().isEnabled(), "SSL should be disabled in MongoClientSettings");
}

@Test
Expand Down Expand Up @@ -245,24 +262,30 @@ public void testFullMongoClientCreationFlow()
Optional<SSLContext> sslContext = sslContextProvider.buildSslContext();
assertTrue(sslContext.isPresent(), "SSL context should be created");

// Build MongoClientOptions
MongoClientOptions.Builder optionsBuilder = MongoClientOptions.builder()
.connectionsPerHost(config.getConnectionsPerHost())
.connectTimeout(config.getConnectionTimeout())
.readPreference(config.getReadPreference().getReadPreference());
// Build MongoClientSettings
MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();

settingsBuilder.applyToConnectionPoolSettings(builder -> builder
.maxSize(config.getConnectionsPerHost()));

settingsBuilder.applyToSocketSettings(builder -> builder
.connectTimeout(config.getConnectionTimeout(), TimeUnit.MILLISECONDS));

settingsBuilder.readPreference(config.getReadPreference().getReadPreference());

sslContext.ifPresent(ctx -> {
optionsBuilder.sslContext(ctx);
optionsBuilder.sslEnabled(true);
settingsBuilder.applyToSslSettings(builder -> builder
.enabled(true)
.context(ctx));
});

MongoClientOptions options = optionsBuilder.build();
MongoClientSettings settings = settingsBuilder.build();

// Verify final options
assertTrue(options.isSslEnabled(), "SSL should be enabled");
assertNotNull(options.getSslContext(), "SSL context should be set");
assertEquals(options.getConnectionsPerHost(), 50);
assertEquals(options.getConnectTimeout(), 5000);
// Verify final settings
assertTrue(settings.getSslSettings().isEnabled(), "SSL should be enabled");
assertNotNull(settings.getSslSettings().getContext(), "SSL context should be set");
assertEquals(settings.getConnectionPoolSettings().getMaxSize(), 50);
assertEquals(settings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS), 5000);
}

@Test
Expand Down
Loading
Loading