Commit

Always provide executor to CompletableFuture::*Async (#1243)

relud authored Apr 15, 2020
1 parent bd5f43b commit b43df65
Showing 11 changed files with 100 additions and 70 deletions.
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Function;
 import java.util.function.Predicate;

@@ -61,6 +63,10 @@ public class SinkConfig {
       STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY, STREAMING_BATCH_MAX_MESSAGES,
       STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES);
 
+  // Executor for CompletableFuture::*Async to use instead of ForkJoinPool.commonPool(), because
+  // the default parallelism is 1 in stage and prod.
+  private static final Executor DEFAULT_EXECUTOR = new ForkJoinPool(
+      Math.max(Runtime.getRuntime().availableProcessors() * 2, 10));
   // BigQuery.Write.Batch.getByteSize reports protobuf size, which can be ~1/3rd more
   // efficient than the JSON that actually gets sent over HTTP, so we default to 60% of the
   // 10MB API limit.
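This DEFAULT_EXECUTOR is the heart of the change: every CompletableFuture *Async method that omits an executor argument runs on ForkJoinPool.commonPool(), whose parallelism defaults to one less than the available processor count and therefore degrades to 1 on small instances. A minimal sketch of the two call shapes, with a hypothetical slowStage standing in for real pipeline work:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

class ExecutorDemo {

  // hypothetical stage standing in for decompress/encode work
  private static String slowStage(String s) {
    return s.toUpperCase();
  }

  public static void main(String[] args) {
    // implicit: schedules on ForkJoinPool.commonPool(), which may have parallelism 1
    CompletableFuture<String> implicit = CompletableFuture.completedFuture("msg")
        .thenApplyAsync(ExecutorDemo::slowStage);

    // explicit: schedules on a pool sized by the caller, as DEFAULT_EXECUTOR is above
    Executor executor = new ForkJoinPool(
        Math.max(Runtime.getRuntime().availableProcessors() * 2, 10));
    CompletableFuture<String> explicit = CompletableFuture.completedFuture("msg")
        .thenApplyAsync(ExecutorDemo::slowStage, executor);

    System.out.println(implicit.join() + " " + explicit.join());
  }
}

Sizing the pool at twice the processor count with a floor of 10 gives I/O-bound stages headroom even on machines where availableProcessors() is 1 or 2.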
@@ -85,7 +91,7 @@ public class SinkConfig {
   // to have a total pipeline delay (including edge and decoder) of less than 1 hour, and ideally
   // less than 10 minutes, so this plus DEFAULT_LOAD_MAX_DELAY should be less than 10 minutes.
   // Messages may be kept in memory until they ack or nack, so too much delay can cause OOM errors.
-  private static final String DEFAULT_BATCH_MAX_DELAY = "10s"; // 10 seconds
+  private static final String DEFAULT_BATCH_MAX_DELAY = "1m"; // 1 minute
   // BigQuery Load API limits maximum bytes per request to 15TB, but load requests for clustered
   // tables fail when sorting that much data, so to avoid that issue the default is lower.
   private static final long DEFAULT_LOAD_MAX_BYTES = 100_000_000_000L; // 100GB
@@ -134,10 +140,8 @@ private CompressPayload getOutputCompression(Env env) {
 
     @Override
     Output getOutput(Env env) {
-      return new Output(env, this,
-          new Pubsub.Write(env.getString(OUTPUT_TOPIC),
-              env.getInt(OUTPUT_TOPIC_EXECUTOR_THREADS, 1), b -> b,
-              getOutputCompression(env))::withoutResult);
+      return new Output(env, this, new Pubsub.Write(env.getString(OUTPUT_TOPIC), DEFAULT_EXECUTOR,
+          b -> b, getOutputCompression(env))::withoutResult);
     }
   },

@@ -150,8 +154,9 @@ Output getOutput(Env env) {
           env.getLong(BATCH_MAX_BYTES, DEFAULT_BATCH_MAX_BYTES),
           env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
           env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
-          PubsubMessageToTemplatedString.of(getGcsOutputBucket(env)), getFormat(env),
-          ignore -> CompletableFuture.completedFuture(null)).withOpenCensusMetrics());
+          PubsubMessageToTemplatedString.of(getGcsOutputBucket(env)), DEFAULT_EXECUTOR,
+          getFormat(env), ignore -> CompletableFuture.completedFuture(null))
+              .withOpenCensusMetrics());
     }
   },

@@ -163,7 +168,7 @@ Output getOutput(Env env) {
           new BigQuery.Load(getBigQueryService(env), getGcsService(env),
               env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
               env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
-              env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY),
+              env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
               // don't delete files until successfully loaded
               BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics());
     }
@@ -180,7 +185,7 @@ Output getOutput(Env env) {
               env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
               env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
               PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)),
-              getFormat(env),
+              DEFAULT_EXECUTOR, getFormat(env),
               // BigQuery Load API limits maximum load requests per table per day to 1,000 so send
               // blobInfo to pubsub and require loads be run separately to reduce maximum latency
               blobInfo -> pubsubWrite.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
@@ -198,7 +203,7 @@ Output getOutput(Env env) {
               env.getInt(BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES),
               env.getDuration(BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY),
               PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)),
-              getFormat(env)).withOpenCensusMetrics());
+              DEFAULT_EXECUTOR, getFormat(env)).withOpenCensusMetrics());
     }
   },

@@ -217,7 +222,7 @@ Output getOutput(Env env) {
         bigQueryLoad = new BigQuery.Load(bigQuery, storage,
             env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
             env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
-            env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY),
+            env.getDuration(LOAD_MAX_DELAY, DEFAULT_STREAMING_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
             // files will be recreated if not successfully loaded
             BigQuery.Load.Delete.always).withOpenCensusMetrics();
       }
@@ -227,15 +232,16 @@ Output getOutput(Env env) {
           env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
           env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
           PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)),
-          getFormat(env), blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
+          DEFAULT_EXECUTOR, getFormat(env),
+          blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
               .withOpenCensusMetrics();
       // Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration
       Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
           bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
           env.getInt(STREAMING_BATCH_MAX_MESSAGES, DEFAULT_STREAMING_BATCH_MAX_MESSAGES),
           env.getDuration(STREAMING_BATCH_MAX_DELAY, DEFAULT_STREAMING_BATCH_MAX_DELAY),
-          PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)), getFormat(env))
-              .withOpenCensusMetrics();
+          PubsubMessageToTemplatedString.forBigQuery(env.getString(OUTPUT_TABLE)),
+          DEFAULT_EXECUTOR, getFormat(env)).withOpenCensusMetrics();
       // fallbackOutput sends messages to fileOutput when rejected by streamingOutput due to size
       Function<PubsubMessage, CompletableFuture<Void>> fallbackOutput = message -> streamingOutput
          .apply(message).thenApply(CompletableFuture::completedFuture).exceptionally(t -> {
@@ -371,7 +377,7 @@ public static Pubsub.Read getInput(Output output) throws IOException {
             .setMaxOutstandingElementCount(output.type.getMaxOutstandingElementCount(output.env))
             .setMaxOutstandingRequestBytes(output.type.getMaxOutstandingRequestBytes(output.env))
             .build()),
-        getInputCompression(output.env));
+        getInputCompression(output.env), DEFAULT_EXECUTOR);
     output.env.requireAllVarsUsed();
     // Setup OpenCensus stackdriver exporter after all measurement views have been registered,
     // as seen in https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/
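The fallbackOutput chain above is cut off by the collapsed diff, but the pattern it uses is worth spelling out: wrap the success value in a completed future, substitute the fallback future in exceptionally, and flatten with thenCompose. A simplified sketch; streaming and file are hypothetical stand-ins for streamingOutput and fileOutput, and the real code falls back only for size-related rejections:

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FallbackDemo {

  // hypothetical stand-in for streamingOutput: rejects large payloads
  static CompletableFuture<Void> streaming(String msg) {
    if (msg.length() > 10) {
      return CompletableFuture.failedFuture(new IllegalArgumentException("too big"));
    }
    return CompletableFuture.completedFuture(null);
  }

  // hypothetical stand-in for fileOutput: accepts anything
  static CompletableFuture<Void> file(String msg) {
    return CompletableFuture.completedFuture(null);
  }

  static CompletableFuture<Void> withFallback(String msg) {
    return streaming(msg)
        // wrap success so both branches yield a CompletableFuture<Void>
        .thenApply(CompletableFuture::completedFuture)
        // on failure, substitute the fallback future instead of the error
        .exceptionally(t -> file(msg))
        // flatten CompletableFuture<CompletableFuture<Void>> back down
        .thenCompose(Function.identity());
  }

  public static void main(String[] args) {
    withFallback("small").join();
    withFallback("definitely too large").join();
  }
}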
@@ -24,11 +24,10 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BigQuery {
 
@@ -59,16 +58,14 @@ private static TableId getTableId(String input) {
   public static class Write
       extends BatchWrite<PubsubMessage, PubsubMessage, TableId, InsertAllResponse> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
     private final com.google.cloud.bigquery.BigQuery bigQuery;
     private final PubsubMessageToObjectNode encoder;
 
     /** Constructor. */
     public Write(com.google.cloud.bigquery.BigQuery bigQuery, long maxBytes, int maxMessages,
-        Duration maxDelay, PubsubMessageToTemplatedString batchKeyTemplate,
+        Duration maxDelay, PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
         PubsubMessageToObjectNode encoder) {
-      super(maxBytes, maxMessages, maxDelay, batchKeyTemplate);
+      super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor);
       this.bigQuery = bigQuery;
       this.encoder = encoder;
     }
@@ -153,8 +150,8 @@ public enum Delete {
 
     /** Constructor. */
     public Load(com.google.cloud.bigquery.BigQuery bigQuery, Storage storage, long maxBytes,
-        int maxFiles, Duration maxDelay, Delete delete) {
-      super(maxBytes, maxFiles, maxDelay, null);
+        int maxFiles, Duration maxDelay, Executor executor, Delete delete) {
+      super(maxBytes, maxFiles, maxDelay, null, executor);
       this.bigQuery = bigQuery;
       this.storage = storage;
       this.delete = delete;
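Both BigQuery constructors make the same mechanical move: accept an Executor and forward it to the BatchWrite super constructor, so the batching machinery never reaches for a default pool. The pattern in miniature, with illustrative class names rather than the real hierarchy:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// base class owns the pool, mirroring BatchWrite
abstract class BatchingBase {

  protected final Executor executor;

  BatchingBase(Executor executor) {
    this.executor = executor;
  }

  CompletableFuture<Void> close(Runnable work) {
    // every async stage names the injected executor
    return CompletableFuture.runAsync(work, executor);
  }
}

// subclass threads the executor upward, mirroring Write and Load
class BigQueryishWrite extends BatchingBase {

  BigQueryishWrite(Executor executor) {
    super(executor);
  }
}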
@@ -16,6 +16,7 @@
 import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -35,9 +36,10 @@ public static class Ndjson extends Write {
     private final PubsubMessageToObjectNode encoder;
 
     public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
-        PubsubMessageToTemplatedString batchKeyTemplate, PubsubMessageToObjectNode encoder,
+        PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
+        PubsubMessageToObjectNode encoder,
         Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
-      super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, batchCloseHook);
+      super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor, batchCloseHook);
       this.encoder = encoder;
     }
@@ -55,9 +57,9 @@ protected byte[] encodeInput(PubsubMessage input) {
     private final Function<BlobInfo, CompletableFuture<Void>> batchCloseHook;
 
     private Write(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
-        PubsubMessageToTemplatedString batchKeyTemplate,
+        PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
         Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
-      super(maxBytes, maxMessages, maxDelay, batchKeyTemplate);
+      super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor);
       this.storage = storage;
       this.batchCloseHook = batchCloseHook;
     }
@@ -16,7 +16,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.function.Function;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,11 +35,11 @@ public static class Read {
     /** Constructor. */
     public <T> Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<T>> output,
         Function<Subscriber.Builder, Subscriber.Builder> config,
-        Function<PubsubMessage, PubsubMessage> decompress) {
+        Function<PubsubMessage, PubsubMessage> decompress, Executor executor) {
       ProjectSubscriptionName subscription = ProjectSubscriptionName.parse(subscriptionName);
       subscriber = config.apply(Subscriber.newBuilder(subscription,
           (message, consumer) -> CompletableFuture.completedFuture(message)
-              .thenApplyAsync(decompress).thenComposeAsync(output)
+              .thenApplyAsync(decompress, executor).thenComposeAsync(output, executor)
               .whenCompleteAsync((result, exception) -> {
                 if (exception == null) {
                   consumer.ack();
@@ -49,7 +48,7 @@ public <T> Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<T>> output,
                   LOG.warn("Exception while attempting to deliver message", exception.getCause());
                   consumer.nack();
                 }
-              })))
+              }, executor)))
           .build();
     }
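In the rewritten receiver above, decompress, output, and the ack/nack completion each name the injected executor, so no stage silently falls back to commonPool(). A self-contained sketch of the same chain, with trivial placeholder stages:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ReceiverDemo {

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Function<String, String> decompress = String::trim;
    Function<String, CompletableFuture<Void>> output =
        s -> CompletableFuture.completedFuture(null);

    CompletableFuture.completedFuture(" raw message ")
        .thenApplyAsync(decompress, executor)
        .thenComposeAsync(output, executor)
        .whenCompleteAsync((result, exception) -> {
          // mirrors the Read constructor: ack on success, nack on failure
          if (exception == null) {
            System.out.println("ack");
          } else {
            System.out.println("nack: " + exception.getCause());
          }
        }, executor)
        .join();
    executor.shutdown();
  }
}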

@@ -73,10 +72,10 @@ public static class Write implements Function<PubsubMessage, CompletableFuture<S
     private final ConcurrentMap<String, Publisher> publishers = new ConcurrentHashMap<>();
 
     /** Constructor. */
-    public Write(String topicTemplate, int numThreads,
+    public Write(String topicTemplate, Executor executor,
         Function<Publisher.Builder, Publisher.Builder> config,
         Function<PubsubMessage, PubsubMessage> compress) {
-      executor = Executors.newFixedThreadPool(numThreads);
+      this.executor = executor;
       this.topicTemplate = PubsubMessageToTemplatedString.of(topicTemplate);
       this.config = config;
       this.compress = compress;
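Write's change swaps pool ownership for pool injection: instead of each writer allocating its own fixed thread pool, the caller supplies one, so every output in a process can share DEFAULT_EXECUTOR. The before-and-after shape, reduced to a sketch:

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class OwnedPoolWriter {

  private final Executor executor;

  OwnedPoolWriter(int numThreads) {
    // before: every instance allocates threads, and they multiply per output
    this.executor = Executors.newFixedThreadPool(numThreads);
  }
}

class InjectedPoolWriter {

  private final Executor executor;

  InjectedPoolWriter(Executor executor) {
    // after: the caller decides, and one pool can serve every writer
    this.executor = executor;
  }
}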
@@ -17,6 +17,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -80,18 +81,20 @@ private static Distribution exponentialDistribution(double base, double expStart
   private final MeasureLong totalMessages;
 
   protected final PubsubMessageToTemplatedString batchKeyTemplate;
+  protected final Executor executor;
 
   private final long maxBytes;
   private final int maxMessages;
-  private final long maxDelayMillis;
+  private final Duration maxDelay;
 
   /** Constructor. */
   public BatchWrite(long maxBytes, int maxMessages, Duration maxDelay,
-      PubsubMessageToTemplatedString batchKeyTemplate) {
+      PubsubMessageToTemplatedString batchKeyTemplate, Executor executor) {
     this.maxBytes = maxBytes;
     this.maxMessages = maxMessages;
-    this.maxDelayMillis = maxDelay.toMillis();
+    this.maxDelay = maxDelay;
     this.batchKeyTemplate = batchKeyTemplate;
+    this.executor = executor;
 
     // create OpenCensus measures with a class name prefix
     final String shortClassName = this.getClass().getName().replaceAll(".*[.]", "");
@@ -164,17 +167,16 @@ public CompletableFuture<Void> apply(InputT input) {
 
   public abstract class Batch {
 
-    // block this batch from completing by timeout until this future is resolved
+    // block this batch from completing by timeout until this future is completed
     final CompletableFuture<Void> init = new CompletableFuture<>();
 
-    // wait for init then setup full indicator by timeout
+    // wait for init then mark this batch full after maxDelay
     @VisibleForTesting
-    final CompletableFuture<Void> full = init.thenRunAsync(this::timeout)
-        .exceptionally(ignore -> null);
+    final CompletableFuture<Void> full = init.thenComposeAsync(v -> new TimedFuture(maxDelay));
 
     // wait for full then synchronize and close
     private final CompletableFuture<BatchResultT> result = full
-        .thenComposeAsync(this::synchronousClose);
+        .thenComposeAsync(this::synchronousClose, executor);
 
     @VisibleForTesting
     public int size = 0;
@@ -184,14 +186,6 @@ public abstract class Batch {
 
     private final long startNanos = System.nanoTime();
 
-    private void timeout() {
-      try {
-        Thread.sleep(maxDelayMillis);
-      } catch (InterruptedException e) {
-        // this is fine
-      }
-    }
-
     /**
      * Call close from a synchronized context.
      */
@@ -214,7 +208,7 @@ private synchronized Optional<CompletableFuture<Void>> add(EncodedT encodedInput
       byteSize = newByteSize;
       write(encodedInput);
       return Optional
-          .of(result.thenAcceptAsync(result -> this.checkResultFor(result, newSize - 1)));
+          .of(result.thenAcceptAsync(result -> this.checkResultFor(result, newSize - 1), executor));
     }
 
     protected void checkResultFor(BatchResultT batchResult, int index) {
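The deleted timeout() helper explains why BatchWrite needed more than signature changes: init.thenRunAsync(this::timeout) parked a thread in Thread.sleep for the entire maxDelay of every open batch, so on a commonPool with parallelism 1 (or any small pool) a single open batch could stall all other async work. A runnable sketch of that starvation mode, with illustrative pool size and sleep:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SleepStarvationDemo {

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);

    // two "batches" sleeping toward their timeout occupy both threads
    for (int i = 0; i < 2; i++) {
      CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(10_000); // what the removed timeout() did
        } catch (InterruptedException e) {
          // this is fine
        }
      }, pool);
    }

    // real work now queues behind the sleepers for ~10 seconds
    CompletableFuture.runAsync(() -> System.out.println("finally ran"), pool).join();
    pool.shutdownNow();
  }
}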
@@ -0,0 +1,24 @@
+package com.mozilla.telemetry.ingestion.sink.util;
+
+import java.time.Duration;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A non-blocking Future that will complete after some amount of time has passed.
+ */
+public class TimedFuture extends CompletableFuture<Void> {
+
+  public TimedFuture(Duration delay) {
+    new Timer().schedule(new CompleteTask(), delay.toMillis());
+  }
+
+  private class CompleteTask extends TimerTask {
+
+    @Override
+    public void run() {
+      complete(null);
+    }
+  }
+}
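Because TimedFuture is itself a CompletableFuture<Void>, a delay composes as an ordinary stage; that is how the new full future in BatchWrite.Batch ages a batch out without holding a thread. A quick usage sketch, assuming TimedFuture is on the classpath:

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

class TimedFutureDemo {

  public static void main(String[] args) {
    CompletableFuture<Void> init = new CompletableFuture<>();

    // completes 100ms after init does, without parking any pool thread
    // (the Timer thread is reclaimed once the TimedFuture is unreachable)
    CompletableFuture<Void> full = init
        .thenCompose(v -> new TimedFuture(Duration.ofMillis(100)));

    init.complete(null);
    full.join();
    System.out.println("batch aged out");
  }
}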