Add bqMaxBytesPerPartition parameter to sink
Per Google case 22669857 we found that a recent batch job was failing due to
having too many tables created for `main_v4`. This increases the allowed size
for intermediate tables and exposes that value as a compile-time parameter.
jklukas committed Mar 25, 2020
1 parent 8a1b224 commit 9b0b78c
Showing 4 changed files with 23 additions and 11 deletions.
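Because the cap is now read from SinkOptions rather than hard-coded, it can be supplied at pipeline construction time like any other Beam option. The sketch below is illustrative only, assuming Beam's standard PipelineOptionsFactory behavior (the getter getBqMaxBytesPerPartition added in this commit maps to a --bqMaxBytesPerPartition flag); the LaunchSketch class, the 250 GiB value, and the omitted SinkOptions import are not part of this commit.

import org.apache.beam.sdk.options.PipelineOptionsFactory;

// SinkOptions is the interface extended in the last file of this diff; its
// project-specific import is omitted here.
public class LaunchSketch {
  public static void main(String[] args) {
    // Override the 500 GiB default with an arbitrary example value of 250 GiB.
    String[] pipelineArgs = {"--bqMaxBytesPerPartition=" + (250L * (1L << 30))};
    SinkOptions options = PipelineOptionsFactory.fromArgs(pipelineArgs).as(SinkOptions.class);
    long maxBytesPerPartition = options.getBqMaxBytesPerPartition();
    System.out.println(maxBytesPerPartition); // 268435456000 bytes
  }
}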
Changed file 1 of 4:
@@ -339,6 +339,7 @@ public static class BigQueryOutput extends Write {
     private final Duration triggeringFrequency;
     private final InputType inputType;
     private final int numShards;
+    private final long maxBytesPerPartition;
     private final ValueProvider<List<String>> streamingDocTypes;
     private final ValueProvider<List<String>> strictSchemaDocTypes;
     private final ValueProvider<String> schemasLocation;
@@ -348,7 +349,7 @@ public static class BigQueryOutput extends Write {

     /** Public constructor. */
     public BigQueryOutput(ValueProvider<String> tableSpecTemplate, BigQueryWriteMethod writeMethod,
-        Duration triggeringFrequency, InputType inputType, int numShards,
+        Duration triggeringFrequency, InputType inputType, int numShards, long maxBytesPerPartition,
         ValueProvider<List<String>> streamingDocTypes,
         ValueProvider<List<String>> strictSchemaDocTypes, ValueProvider<String> schemasLocation,
         ValueProvider<TableRowFormat> tableRowFormat, ValueProvider<String> partitioningField,
@@ -358,6 +359,7 @@ public BigQueryOutput(ValueProvider<String> tableSpecTemplate, BigQueryWriteMethod
       this.triggeringFrequency = triggeringFrequency;
       this.inputType = inputType;
       this.numShards = numShards;
+      this.maxBytesPerPartition = maxBytesPerPartition;
       this.streamingDocTypes = NestedValueProvider.of(streamingDocTypes,
           value -> Optional.ofNullable(value).orElse(Collections.emptyList()));
       this.strictSchemaDocTypes = NestedValueProvider.of(strictSchemaDocTypes,
@@ -476,10 +478,7 @@ public WithFailures.Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage>
       fileLoadsInput.ifPresent(messages -> {
         BigQueryIO.Write<KV<TableDestination, PubsubMessage>> fileLoadsWrite = baseWriteTransform
             .withMethod(BigQueryWriteMethod.file_loads.method)
-            // When writing to main_v4 in batch mode, we sometimes see memory exceeded errors for
-            // BigQuery load jobs; we have found empirically that limiting the total data size per
-            // load job to 100 GB leads to reliable performance.
-            .withMaxBytesPerPartition(100 * (1L << 30));
+            .withMaxBytesPerPartition(maxBytesPerPartition);
         if (inputType == InputType.pubsub) {
           // When using the file_loads method of inserting to BigQuery, BigQueryIO requires
           // triggering frequency if the input PCollection is unbounded (which is the case for
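For reference, withMaxBytesPerPartition is an existing knob on Beam's BigQueryIO.Write rather than something introduced by this commit; the change above only replaces its hard-coded argument with the new field. A minimal standalone sketch of a file-loads write configured the same way (the FileLoadsSketch class and the table spec are illustrative assumptions, not this repository's wrapper code):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

class FileLoadsSketch {
  static BigQueryIO.Write<TableRow> build(long maxBytesPerPartition) {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.main_v4") // hypothetical destination table spec
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Cap the bytes handed to any single load job; larger inputs are split
        // across additional temporary tables that BigQueryIO later copies into
        // the destination table.
        .withMaxBytesPerPartition(maxBytesPerPartition);
  }
}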
Changed file 2 of 4:
@@ -99,8 +99,8 @@ public Result<PDone, PubsubMessage> expand(PCollection<PubsubMessage> input) {
   public Write writeFailures(SinkOptions.Parsed options) {
     return new BigQueryOutput(options.getErrorOutput(), options.getErrorBqWriteMethod(),
         options.getParsedErrorBqTriggeringFrequency(), options.getInputType(),
-        options.getErrorBqNumFileShards(), StaticValueProvider.of(null),
-        StaticValueProvider.of(null), options.getSchemasLocation(),
+        options.getErrorBqNumFileShards(), options.getBqMaxBytesPerPartition(),
+        StaticValueProvider.of(null), StaticValueProvider.of(null), options.getSchemasLocation(),
         StaticValueProvider.of(TableRowFormat.raw), options.getErrorBqPartitioningField(),
         options.getErrorBqClusteringFields());
   }
Changed file 3 of 4:
@@ -71,10 +71,10 @@ public Write write(SinkOptions.Parsed options) {
   public Write write(SinkOptions.Parsed options) {
     return new BigQueryOutput(options.getOutput(), options.getBqWriteMethod(),
         options.getParsedBqTriggeringFrequency(), options.getInputType(),
-        options.getBqNumFileShards(), options.getBqStreamingDocTypes(),
-        options.getBqStrictSchemaDocTypes(), options.getSchemasLocation(),
-        options.getOutputTableRowFormat(), options.getBqPartitioningField(),
-        options.getBqClusteringFields());
+        options.getBqNumFileShards(), options.getBqMaxBytesPerPartition(),
+        options.getBqStreamingDocTypes(), options.getBqStrictSchemaDocTypes(),
+        options.getSchemasLocation(), options.getOutputTableRowFormat(),
+        options.getBqPartitioningField(), options.getBqClusteringFields());
   }
 };

Changed file 4 of 4:
@@ -105,6 +105,19 @@ public interface SinkOptions extends PipelineOptions {

   void setBqTriggeringFrequency(String value);

+  // When writing to main_v4 in batch mode, we sometimes see memory exceeded errors for
+  // BigQuery load jobs; we have found empirically that limiting the total data size per
+  // load job to 100 GB leads to reliable performance.
+  @Description("Maximum number of bytes to load into a single BigQuery table before rolling"
+      + " to an additional partition; this is generally only relevant when writing main_v4 in batch"
+      + " mode; we have found empirically that 500 GB per load job is low enough to avoid memory"
+      + " exceeded errors in load jobs yet high enough to avoid creating too many intermediate"
+      + " tables such that the final copy job fails")
+  @Default.Long(500 * (1L << 30))
+  Long getBqMaxBytesPerPartition();
+
+  void setBqMaxBytesPerPartition(Long value);
+
   @Description("Number of file shards to stage for BigQuery when writing via file_loads")
   @Default.Integer(100)
   int getBqNumFileShards();
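The trade-off described in the option text above is simple division: each load-job partition becomes an intermediate table, so a larger cap means fewer tables for the final copy job to merge. An illustrative calculation with a made-up input size (50 TiB is hypothetical, not a measured figure for main_v4):

class PartitionCountSketch {
  public static void main(String[] args) {
    long inputBytes = 50L * (1L << 40);  // hypothetical 50 TiB batch
    long oldCap = 100L * (1L << 30);     // previously hard-coded 100 GiB cap
    long newCap = 500L * (1L << 30);     // new 500 GiB default
    System.out.println((inputBytes + oldCap - 1) / oldCap); // 512 intermediate tables
    System.out.println((inputBytes + newCap - 1) / newCap); // 103 intermediate tables
  }
}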
