Skip to content

Commit

Permalink
Add docs about withQueryFn, logic to detect other functions, and new … (
Browse files Browse the repository at this point in the history
#34127)

* Add docs about withQueryFn, logic to detect other functions, and new FindQueryTest class

* switch check logic to expand phase and update test and update doc links

* remove exception thrown in split method

* revert line change to split

* change out exception thrown

* fix spotless java failures
  • Loading branch information
derrickaw authored Mar 3, 2025
1 parent a994291 commit 468aa4e
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -141,6 +142,11 @@ public class MongoDbIO {

private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);

public static final String ERROR_MSG_QUERY_FN =
" class is not supported. "
+ "Please provide one of the predefined classes in the MongoDbIO package "
+ "such as FindQuery or AggregationQuery.";

/** Read data from MongoDB. */
public static Read read() {
return new AutoValue_MongoDbIO_Read.Builder()
Expand Down Expand Up @@ -312,9 +318,16 @@ public Read withBucketAuto(boolean bucketAuto) {
return builder().setBucketAuto(bucketAuto).build();
}

/** Sets a queryFn. */
/**
* Sets a queryFn. The provided queryFn must be one of the predefined classes in the MongoDbIO
* package such as {@link FindQuery#FindQuery} or {@link AggregationQuery#AggregationQuery}.
*/
public Read withQueryFn(
SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> queryBuilderFn) {
checkArgument(
Arrays.asList(AutoValue_FindQuery.class, AutoValue_AggregationQuery.class)
.contains(queryBuilderFn.getClass()),
String.format("[%s]" + ERROR_MSG_QUERY_FN, queryBuilderFn.getClass().getName()));
return builder().setQueryFn(queryBuilderFn).build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

/** Builds a MongoDB FindQueryTest object. */
@AutoValue
public abstract class FindQueryTest
implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {

@Pure
abstract @Nullable BsonDocument filters();

@Pure
abstract int limit();

@Pure
abstract List<String> projection();

private static Builder builder() {
return new AutoValue_FindQueryTest.Builder()
.setLimit(0)
.setProjection(Collections.emptyList())
.setFilters(new BsonDocument());
}

abstract Builder toBuilder();

public static FindQueryTest create() {
return builder().build();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setFilters(@Nullable BsonDocument filters);

abstract Builder setLimit(int limit);

abstract Builder setProjection(List<String> projection);

abstract FindQueryTest build();
}

/** Sets the filters to find. */
private FindQueryTest withFilters(BsonDocument filters) {
return toBuilder().setFilters(filters).build();
}

/** Convert the Bson filters into a BsonDocument via default encoding. */
static BsonDocument bson2BsonDocument(Bson filters) {
return filters.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry());
}

/** Sets the filters to find. */
public FindQueryTest withFilters(Bson filters) {
return withFilters(bson2BsonDocument(filters));
}

/** Sets the limit of documents to find. */
public FindQueryTest withLimit(int limit) {
return toBuilder().setLimit(limit).build();
}

/** Sets the projection. */
public FindQueryTest withProjection(List<String> projection) {
checkArgument(projection != null, "projection can not be null");
return toBuilder().setProjection(projection).build();
}

@Override
public MongoCursor<Document> apply(MongoCollection<Document> collection) {
return collection
.find()
.filter(filters())
.limit(limit())
.projection(Projections.include(projection()))
.iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -83,6 +84,8 @@ public class MongoDbIOTest {

@Rule public final TestPipeline pipeline = TestPipeline.create();

@Rule public transient ExpectedException thrown = ExpectedException.none();

@BeforeClass
public static void beforeClass() throws Exception {
port = NetworkTestHelper.getAvailableLocalPort();
Expand Down Expand Up @@ -422,6 +425,20 @@ public void testUpdate() {
assertEquals("India", out.get("country"));
}

@Test
public void testUnknownQueryFnClass() throws IllegalArgumentException {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"[org.apache.beam.sdk.io.mongodb.AutoValue_FindQueryTest]" + MongoDbIO.ERROR_MSG_QUERY_FN);

pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE_NAME)
.withCollection(COLLECTION_NAME)
.withQueryFn(FindQueryTest.create().withFilters(Filters.eq("scientist", "Einstein"))));
}

private static List<Document> createDocuments(final int n, boolean addId) {
final String[] scientists =
new String[] {
Expand Down

0 comments on commit 468aa4e

Please sign in to comment.