diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java index 6c4ae7363..fc5f2f2ac 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.function.Predicate; @@ -61,6 +63,10 @@ public class SinkConfig { STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY, STREAMING_BATCH_MAX_MESSAGES, STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES); + // Executor for CompletableFuture::*Async to use instead of ForkJoinPool.commonPool(), because + // the default parallelism is 1 in stage and prod. + private static final Executor DEFAULT_EXECUTOR = new ForkJoinPool( + Math.max(Runtime.getRuntime().availableProcessors() * 2, 10)); // BigQuery.Write.Batch.getByteSize reports protobuf size, which can be ~1/3rd more // efficient than the JSON that actually gets sent over HTTP, so we use to 60% of the // 10MB API limit by default. @@ -85,7 +91,7 @@ public class SinkConfig { // to have a total pipeline delay (including edge and decoder) of less than 1 hour, and ideally // less than 10 minutes, so this plus DEFAULT_LOAD_MAX_DELAY should be less than 10 minutes. // Messages may be kept in memory until they ack or nack, so too much delay can cause OOM errors. 
- private static final String DEFAULT_BATCH_MAX_DELAY = "10s"; // 10 seconds + private static final String DEFAULT_BATCH_MAX_DELAY = "1m"; // 1 minute // BigQuery Load API limits maximum bytes per request to 15TB, but load requests for clustered // tables fail when sorting that much data, so to avoid that issue the default is lower. private static final long DEFAULT_LOAD_MAX_BYTES = 100_000_000_000L; // 100GB @@ -134,10 +140,8 @@ private CompressPayload getOutputCompression(Env env) { @Override Output getOutput(Env env) { - return new Output(env, this, - new Pubsub.Write(env.getString(OUTPUT_TOPIC), - env.getInt(OUTPUT_TOPIC_EXECUTOR_THREADS, 1), b -> b, - getOutputCompression(env))::withoutResult); + return new Output(env, this, new Pubsub.Write(env.getString(OUTPUT_TOPIC), DEFAULT_EXECUTOR, + b -> b, getOutputCompression(env))::withoutResult); } }, @@ -150,8 +154,9 @@ Output getOutput(Env env) { env.getLong(BATCH_MAX_BYTES, DEFAULT_BATCH_MAX_BYTES), env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES), env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY), - PubsubMessageToTemplatedString.of(getGcsOutputBucket(env)), getFormat(env), - ignore -> CompletableFuture.completedFuture(null)).withOpenCensusMetrics()); + PubsubMessageToTemplatedString.of(getGcsOutputBucket(env)), DEFAULT_EXECUTOR, + getFormat(env), ignore -> CompletableFuture.completedFuture(null)) + .withOpenCensusMetrics()); } }, @@ -163,7 +168,7 @@ Output getOutput(Env env) { new BigQuery.Load(getBigQueryService(env), getGcsService(env), env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES), env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES), - env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), + env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR, // don't delete files until successfully loaded BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics()); } @@ -180,7 +185,7 @@ Output getOutput(Env env) { env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES), 
env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY), PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)), - getFormat(env), + DEFAULT_EXECUTOR, getFormat(env), // BigQuery Load API limits maximum load requests per table per day to 1,000 so send // blobInfo to pubsub and require loads be run separately to reduce maximum latency blobInfo -> pubsubWrite.apply(BlobInfoToPubsubMessage.apply(blobInfo))) @@ -198,7 +203,7 @@ Output getOutput(Env env) { env.getInt(BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES), env.getDuration(BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY), PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), - getFormat(env)).withOpenCensusMetrics()); + DEFAULT_EXECUTOR, getFormat(env)).withOpenCensusMetrics()); } }, @@ -217,7 +222,7 @@ Output getOutput(Env env) { bigQueryLoad = new BigQuery.Load(bigQuery, storage, env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES), env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES), - env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY), + env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY), DEFAULT_EXECUTOR, // files will be recreated if not successfully loaded BigQuery.Load.Delete.always).withOpenCensusMetrics(); } @@ -227,15 +232,16 @@ Output getOutput(Env env) { env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES), env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY), PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)), - getFormat(env), blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo))) + DEFAULT_EXECUTOR, getFormat(env), + blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo))) .withOpenCensusMetrics(); // Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration Function> streamingOutput = new BigQuery.Write( bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES), 
env.getInt(STREAMING_BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES), env.getDuration(STREAMING_BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY), - PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), getFormat(env)) - .withOpenCensusMetrics(); + PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), + DEFAULT_EXECUTOR, getFormat(env)).withOpenCensusMetrics(); // fallbackOutput sends messages to fileOutput when rejected by streamingOutput due to size Function> fallbackOutput = message -> streamingOutput .apply(message).thenApply(CompletableFuture::completedFuture).exceptionally(t -> { @@ -371,7 +377,7 @@ public static Pubsub.Read getInput(Output output) throws IOException { .setMaxOutstandingElementCount(output.type.getMaxOutstandingElementCount(output.env)) .setMaxOutstandingRequestBytes(output.type.getMaxOutstandingRequestBytes(output.env)) .build()), - getInputCompression(output.env)); + getInputCompression(output.env), DEFAULT_EXECUTOR); output.env.requireAllVarsUsed(); // Setup OpenCensus stackdriver exporter after all measurement views have been registered, // as seen in https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/ diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java index 9ca90a5bf..5dcc887fa 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java @@ -24,11 +24,10 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BigQuery { @@ -59,16 +58,14 @@ private static TableId 
getTableId(String input) { public static class Write extends BatchWrite { - private static final Logger LOG = LoggerFactory.getLogger(Write.class); - private final com.google.cloud.bigquery.BigQuery bigQuery; private final PubsubMessageToObjectNode encoder; /** Constructor. */ public Write(com.google.cloud.bigquery.BigQuery bigQuery, long maxBytes, int maxMessages, - Duration maxDelay, PubsubMessageToTemplatedString batchKeyTemplate, + Duration maxDelay, PubsubMessageToTemplatedString batchKeyTemplate, Executor executor, PubsubMessageToObjectNode encoder) { - super(maxBytes, maxMessages, maxDelay, batchKeyTemplate); + super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor); this.bigQuery = bigQuery; this.encoder = encoder; } @@ -153,8 +150,8 @@ public enum Delete { /** Constructor. */ public Load(com.google.cloud.bigquery.BigQuery bigQuery, Storage storage, long maxBytes, - int maxFiles, Duration maxDelay, Delete delete) { - super(maxBytes, maxFiles, maxDelay, null); + int maxFiles, Duration maxDelay, Executor executor, Delete delete) { + super(maxBytes, maxFiles, maxDelay, null, executor); this.bigQuery = bigQuery; this.storage = storage; this.delete = delete; diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java index 59b39d10f..886f2cc38 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,9 +36,10 @@ public static class Ndjson extends Write { private final PubsubMessageToObjectNode encoder; public Ndjson(Storage storage, long maxBytes, int maxMessages, 
Duration maxDelay, - PubsubMessageToTemplatedString batchKeyTemplate, PubsubMessageToObjectNode encoder, + PubsubMessageToTemplatedString batchKeyTemplate, Executor executor, + PubsubMessageToObjectNode encoder, Function> batchCloseHook) { - super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, batchCloseHook); + super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor, batchCloseHook); this.encoder = encoder; } @@ -55,9 +57,9 @@ protected byte[] encodeInput(PubsubMessage input) { private final Function> batchCloseHook; private Write(Storage storage, long maxBytes, int maxMessages, Duration maxDelay, - PubsubMessageToTemplatedString batchKeyTemplate, + PubsubMessageToTemplatedString batchKeyTemplate, Executor executor, Function> batchCloseHook) { - super(maxBytes, maxMessages, maxDelay, batchKeyTemplate); + super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor); this.storage = storage; this.batchCloseHook = batchCloseHook; } diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Pubsub.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Pubsub.java index a659f76af..3089c4ec5 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Pubsub.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Pubsub.java @@ -16,7 +16,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +35,11 @@ public static class Read { /** Constructor. 
*/ public Read(String subscriptionName, Function> output, Function config, - Function decompress) { + Function decompress, Executor executor) { ProjectSubscriptionName subscription = ProjectSubscriptionName.parse(subscriptionName); subscriber = config.apply(Subscriber.newBuilder(subscription, (message, consumer) -> CompletableFuture.completedFuture(message) - .thenApplyAsync(decompress).thenComposeAsync(output) + .thenApplyAsync(decompress, executor).thenComposeAsync(output, executor) .whenCompleteAsync((result, exception) -> { if (exception == null) { consumer.ack(); @@ -49,7 +48,7 @@ public Read(String subscriptionName, Function publishers = new ConcurrentHashMap<>(); /** Constructor. */ - public Write(String topicTemplate, int numThreads, + public Write(String topicTemplate, Executor executor, Function config, Function compress) { - executor = Executors.newFixedThreadPool(numThreads); + this.executor = executor; this.topicTemplate = PubsubMessageToTemplatedString.of(topicTemplate); this.config = config; this.compress = compress; diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java index 07cab8382..19c95526c 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/BatchWrite.java @@ -17,6 +17,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -80,18 +81,20 @@ private static Distribution exponentialDistribution(double base, double expStart private final MeasureLong totalMessages; protected final PubsubMessageToTemplatedString batchKeyTemplate; + protected final 
Executor executor; private final long maxBytes; private final int maxMessages; - private final long maxDelayMillis; + private final Duration maxDelay; /** Constructor. */ public BatchWrite(long maxBytes, int maxMessages, Duration maxDelay, - PubsubMessageToTemplatedString batchKeyTemplate) { + PubsubMessageToTemplatedString batchKeyTemplate, Executor executor) { this.maxBytes = maxBytes; this.maxMessages = maxMessages; - this.maxDelayMillis = maxDelay.toMillis(); + this.maxDelay = maxDelay; this.batchKeyTemplate = batchKeyTemplate; + this.executor = executor; // create OpenCensus measures with a class name prefix final String shortClassName = this.getClass().getName().replaceAll(".*[.]", ""); @@ -164,17 +167,16 @@ public CompletableFuture apply(InputT input) { public abstract class Batch { - // block this batch from completing by timeout until this future is resolved + // block this batch from completing by timeout until this future is completed final CompletableFuture init = new CompletableFuture<>(); - // wait for init then setup full indicator by timeout + // wait for init then mark this batch full after maxDelay @VisibleForTesting - final CompletableFuture full = init.thenRunAsync(this::timeout) - .exceptionally(ignore -> null); + final CompletableFuture full = init.thenComposeAsync(v -> new TimedFuture(maxDelay)); // wait for full then synchronize and close private final CompletableFuture result = full - .thenComposeAsync(this::synchronousClose); + .thenComposeAsync(this::synchronousClose, executor); @VisibleForTesting public int size = 0; @@ -184,14 +186,6 @@ public abstract class Batch { private final long startNanos = System.nanoTime(); - private void timeout() { - try { - Thread.sleep(maxDelayMillis); - } catch (InterruptedException e) { - // this is fine - } - } - /** * Call close from a synchronized context. 
*/
@@ -214,7 +208,7 @@ private synchronized Optional> add(EncodedT encodedInput
       byteSize = newByteSize;
       write(encodedInput);
       return Optional
-          .of(result.thenAcceptAsync(result -> this.checkResultFor(result, newSize - 1)));
+          .of(result.thenAcceptAsync(result -> this.checkResultFor(result, newSize - 1), executor));
     }
 
     protected void checkResultFor(BatchResultT batchResult, int index) {
diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/TimedFuture.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/TimedFuture.java
new file mode 100644
index 000000000..3af8e6e0c
--- /dev/null
+++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/TimedFuture.java
@@ -0,0 +1,35 @@
+package com.mozilla.telemetry.ingestion.sink.util;
+
+import java.time.Duration;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A non-blocking Future that completes with {@code null} once a delay has elapsed.
+ *
+ * <p>All instances share one daemon timer thread instead of starting a thread per future, so
+ * pending futures never keep the JVM alive. Dependents attached without an {@code *Async} method
+ * run on that shared thread and must not block, or they will delay every other pending future.
+ */
+public class TimedFuture extends CompletableFuture<Void> {
+
+  // Each java.util.Timer owns a thread, so share one daemon timer instead of one per future.
+  private static final Timer TIMER = new Timer("TimedFuture", true);
+
+  /**
+   * Schedule this future to complete after {@code delay}.
+   *
+   * @param delay time to wait before completing; zero completes as soon as the timer thread runs
+   * @throws IllegalArgumentException if {@code delay} is negative
+   */
+  public TimedFuture(Duration delay) {
+    TIMER.schedule(new TimerTask() {
+
+      @Override
+      public void run() {
+        complete(null);
+      }
+    }, delay.toMillis());
+  }
+}
diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryTest.java
index d8ef37946..219a2b6f3 100644
--- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryTest.java
+++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryTest.java
@@ -23,6 +23,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ForkJoinPool;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,7 +50,7 @@ public void mockBigQueryResponse() {
     when(bigQuery.insertAll(any())).thenReturn(response);
     when(response.getErrorsFor(anyLong())).thenReturn(ImmutableList.of());
     output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, MAX_DELAY, BATCH_KEY_TEMPLATE,
-        PubsubMessageToObjectNode.Raw.of());
+        ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of());
   }
 
   @Test
@@ -60,7 +61,7 @@ public void canReturnSuccess() {
   @Test
   public void canSendWithNoDelay() {
     output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, Duration.ofMillis(0),
-        BATCH_KEY_TEMPLATE, PubsubMessageToObjectNode.Raw.of());
+        BATCH_KEY_TEMPLATE, ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of());
     output.apply(EMPTY_MESSAGE);
     assertEquals(1, output.batches.get(BATCH_KEY).size);
   }
@@ -111,7 +112,7 @@ public void canHandleOversizeMessage() {
   public void canHandleProjectInTableId() {
     output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY,
         PubsubMessageToTemplatedString.forBigQuery("project.dataset.table"),
-        PubsubMessageToObjectNode.Raw.of());
+
ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply(EMPTY_MESSAGE).join(); assertNotNull(output.batches.get(TableId.of("project", "dataset", "table"))); } @@ -119,7 +120,7 @@ public void canHandleProjectInTableId() { @Test public void canHandleDocumentId() { output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, BATCH_KEY_TEMPLATE, - PubsubMessageToObjectNode.Raw.of()); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply(PubsubMessage.newBuilder().putAttributes("document_id", "id").build()).join(); List rows = ((BigQuery.Write.Batch) output.batches .get(BATCH_KEY)).builder.build().getRows(); @@ -133,7 +134,7 @@ public void canHandleDocumentId() { public void canHandleDynamicTableId() { output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, PubsubMessageToTemplatedString.forBigQuery("${dataset}.${table}"), - PubsubMessageToObjectNode.Raw.of()); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply(PubsubMessage.newBuilder().putAttributes("dataset", "dataset") .putAttributes("table", "table").build()).join(); assertNotNull(output.batches.get(TableId.of("dataset", "table"))); @@ -143,7 +144,7 @@ public void canHandleDynamicTableId() { public void canHandleDynamicTableIdWithEmptyValues() { output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, PubsubMessageToTemplatedString.forBigQuery("${dataset}_.${table}_"), - PubsubMessageToObjectNode.Raw.of()); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply( PubsubMessage.newBuilder().putAttributes("dataset", "").putAttributes("table", "").build()) .join(); @@ -154,7 +155,7 @@ public void canHandleDynamicTableIdWithEmptyValues() { public void canHandleDynamicTableIdWithDefaults() { output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, PubsubMessageToTemplatedString.forBigQuery("${dataset:-dataset}.${table:-table}"), - 
PubsubMessageToObjectNode.Raw.of()); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply(EMPTY_MESSAGE).join(); assertNotNull(output.batches.get(TableId.of("dataset", "table"))); } @@ -164,7 +165,7 @@ public void canHandleDynamicTableIdWithHyphens() { output = new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, PubsubMessageToTemplatedString .forBigQuery("${document_namespace}.${document_type}_${suffix}"), - PubsubMessageToObjectNode.Raw.of()); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()); output.apply(PubsubMessage.newBuilder().putAttributes("document_namespace", "my-namespace") .putAttributes("document_type", "myDocType").putAttributes("suffix", "my-suffix").build()) .join(); @@ -175,14 +176,14 @@ public void canHandleDynamicTableIdWithHyphens() { public void failsOnMissingAttributes() { new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, PubsubMessageToTemplatedString.forBigQuery("${dataset}.${table}"), - PubsubMessageToObjectNode.Raw.of()).apply(EMPTY_MESSAGE); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of()).apply(EMPTY_MESSAGE); } @Test(expected = IllegalArgumentException.class) public void failsOnInvalidTable() { new BigQuery.Write(bigQuery, MAX_BYTES, MAX_MESSAGES, NO_DELAY, - PubsubMessageToTemplatedString.forBigQuery(""), PubsubMessageToObjectNode.Raw.of()) - .apply(EMPTY_MESSAGE); + PubsubMessageToTemplatedString.forBigQuery(""), ForkJoinPool.commonPool(), + PubsubMessageToObjectNode.Raw.of()).apply(EMPTY_MESSAGE); } @Test(expected = NullPointerException.class) diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java index 3dca4d095..6241a8317 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/GcsWriteTest.java @@ 
-17,6 +17,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ForkJoinPool; import org.junit.Before; import org.junit.Test; @@ -43,7 +44,7 @@ private CompletableFuture batchCloseHook(BlobInfo ignore) { public void mockBigQueryResponse() { storage = mock(Storage.class); output = new Gcs.Write.Ndjson(storage, MAX_BYTES, MAX_MESSAGES, MAX_DELAY, BATCH_KEY_TEMPLATE, - PubsubMessageToObjectNode.Raw.of(), this::batchCloseHook); + ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of(), this::batchCloseHook); } @Test @@ -57,7 +58,8 @@ public void canReturnSuccess() { @Test public void canSendWithNoDelay() { output = new Gcs.Write.Ndjson(storage, MAX_BYTES, MAX_MESSAGES, Duration.ofMillis(0), - BATCH_KEY_TEMPLATE, PubsubMessageToObjectNode.Raw.of(), this::batchCloseHook); + BATCH_KEY_TEMPLATE, ForkJoinPool.commonPool(), PubsubMessageToObjectNode.Raw.of(), + this::batchCloseHook); output.apply(EMPTY_MESSAGE).join(); assertEquals(1, output.batches.get(BATCH_KEY).size); assertEquals(EMPTY_MESSAGE_SIZE, output.batches.get(BATCH_KEY).byteSize); diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubReadIntegrationTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubReadIntegrationTest.java index 59faad907..6adc10ac6 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubReadIntegrationTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubReadIntegrationTest.java @@ -9,6 +9,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicReference; import org.junit.Rule; import org.junit.Test; @@ -38,7 +39,7 @@ public void canReadOneMessage() { .map(channelProvider -> builder.setChannelProvider(channelProvider) 
.setCredentialsProvider(pubsub.noCredentialsProvider)) .orElse(builder), - m -> m)); + m -> m, ForkJoinPool.commonPool())); input.get().run(); @@ -68,7 +69,7 @@ public void canRetryOnException() { .map(channelProvider -> builder.setChannelProvider(channelProvider) .setCredentialsProvider(pubsub.noCredentialsProvider)) .orElse(builder), - m -> m)); + m -> m, ForkJoinPool.commonPool())); input.get().run(); diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubWriteIntegrationTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubWriteIntegrationTest.java index 5ad05f8c3..bf4afa485 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubWriteIntegrationTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/PubsubWriteIntegrationTest.java @@ -7,6 +7,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.mozilla.telemetry.ingestion.sink.util.SinglePubsubTopic; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.Rule; @@ -23,16 +24,18 @@ public class PubsubWriteIntegrationTest { @Test public void canWriteToStaticDestination() { - new Pubsub.Write(pubsub.getTopic(), 1, b -> b, m -> m).apply(PubsubMessage.newBuilder() - .setData(ByteString.copyFrom("test".getBytes(StandardCharsets.UTF_8))).build()).join(); + new Pubsub.Write(pubsub.getTopic(), ForkJoinPool.commonPool(), b -> b, m -> m) + .apply(PubsubMessage.newBuilder() + .setData(ByteString.copyFrom("test".getBytes(StandardCharsets.UTF_8))).build()) + .join(); assertEquals(ImmutableList.of("test"), pubsub.pull(1, false).stream() .map(m -> m.getData().toStringUtf8()).collect(Collectors.toList())); } @Test public void canWriteToDynamicDestination() { - new Pubsub.Write("${topic}", 1, b -> b, m -> m).apply(PubsubMessage.newBuilder() - 
.setData(ByteString.copyFrom("test".getBytes(StandardCharsets.UTF_8))) + new Pubsub.Write("${topic}", ForkJoinPool.commonPool(), b -> b, m -> m).apply(PubsubMessage + .newBuilder().setData(ByteString.copyFrom("test".getBytes(StandardCharsets.UTF_8))) .putAttributes("topic", pubsub.getTopic()).build()).join(); assertEquals(ImmutableList.of("test"), pubsub.pull(1, false).stream() .map(m -> m.getData().toStringUtf8()).collect(Collectors.toList())); diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/util/BatchWriteTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/util/BatchWriteTest.java index 7304a746c..f84780738 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/util/BatchWriteTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/util/BatchWriteTest.java @@ -4,6 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; import org.junit.Test; public class BatchWriteTest { @@ -13,7 +14,7 @@ private static class NoopBatchWrite extends BatchWrite