Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Add InsertRetryPolicy for non-successful BigQuery insertAll responses #34222

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,20 @@ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {

private final TableReference ref;
private final Boolean skipInvalidRows;
private final Boolean ignoreUnkownValues;
private final Boolean ignoreUnknownValues; // Fixed typo from ignoreUnkownValues
private final Bigquery client;
private final FluentBackoff rateLimitBackoffFactory;
private final List<TableDataInsertAllRequest.Rows> rows;
private final AtomicLong maxThrottlingMsec;
private final Sleeper sleeper;
private final StreamingInsertsMetrics result;
// Nullable new fields
private final @Nullable InsertRetryPolicy retryPolicy;
private final @Nullable List<FailsafeValueInSingleWindow<TableRow, TableRow>> originalRows;
private final @Nullable List<ValueInSingleWindow<?>> failedInserts;
private final @Nullable ErrorContainer<?> errorContainer;

// Original 9-parameter constructor
InsertBatchofRowsCallable(
TableReference ref,
Boolean skipInvalidRows,
Expand All @@ -980,67 +986,128 @@ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
AtomicLong maxThrottlingMsec,
Sleeper sleeper,
StreamingInsertsMetrics result) {
this(
ref,
skipInvalidRows,
ignoreUnknownValues,
client,
rateLimitBackoffFactory,
rows,
maxThrottlingMsec,
sleeper,
result,
null,
null,
null,
null);
}

// New 13-parameter constructor
InsertBatchofRowsCallable(
TableReference ref,
Boolean skipInvalidRows,
Boolean ignoreUnknownValues,
Bigquery client,
FluentBackoff rateLimitBackoffFactory,
List<TableDataInsertAllRequest.Rows> rows,
AtomicLong maxThrottlingMsec,
Sleeper sleeper,
StreamingInsertsMetrics result,
@Nullable InsertRetryPolicy retryPolicy,
@Nullable List<FailsafeValueInSingleWindow<TableRow, TableRow>> originalRows,
@Nullable List<ValueInSingleWindow<?>> failedInserts,
@Nullable ErrorContainer<?> errorContainer) {
this.ref = ref;
this.skipInvalidRows = skipInvalidRows;
this.ignoreUnkownValues = ignoreUnknownValues;
this.ignoreUnknownValues = ignoreUnknownValues;
this.client = client;
this.rateLimitBackoffFactory = rateLimitBackoffFactory;
this.rows = rows;
this.maxThrottlingMsec = maxThrottlingMsec;
this.sleeper = sleeper;
this.result = result;
this.retryPolicy = retryPolicy;
this.originalRows = originalRows;
this.failedInserts = failedInserts;
this.errorContainer = errorContainer;
}

@Override
public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
TableDataInsertAllRequest content = new TableDataInsertAllRequest();
content.setRows(rows);
content.setSkipInvalidRows(skipInvalidRows);
content.setIgnoreUnknownValues(ignoreUnkownValues);
content.setIgnoreUnknownValues(ignoreUnknownValues);

final Bigquery.Tabledata.InsertAll insert =
client
.tabledata()
.insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
.setPrettyPrint(false);

// A backoff for rate limit exceeded errors.
BackOff backoff1 = BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
long totalBackoffMillis = 0L;
while (true) {
ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
Instant start = Instant.now();
try {
List<TableDataInsertAllResponse.InsertErrors> response =
insert.execute().getInsertErrors();
if (response == null || response.isEmpty()) {
TableDataInsertAllResponse response = insert.execute();
List<TableDataInsertAllResponse.InsertErrors> responseErrors =
response.getInsertErrors();
if (responseErrors == null || responseErrors.isEmpty()) {
serviceCallMetric.call("ok");
} else {
for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
for (TableDataInsertAllResponse.InsertErrors insertErrors : responseErrors) {
for (ErrorProto insertError : insertErrors.getErrors()) {
serviceCallMetric.call(insertError.getReason());
}
}
}
result.updateSuccessfulRpcMetrics(start, Instant.now());
return response;
} catch (IOException e) {
return responseErrors;
} catch (GoogleJsonResponseException e) {
GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
if (errorInfo == null) {
serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
result.updateFailedRpcMetrics(start, start, BigQuerySinkMetrics.UNKNOWN);
throw e;
}
String errorReason = errorInfo.getReason();
String errorReason = errorInfo != null ? errorInfo.getReason() : "unknown";
serviceCallMetric.call(errorReason);
result.updateFailedRpcMetrics(start, Instant.now(), errorReason);
/**
* TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
* ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
* GoogleCloudDataproc/hadoop-connectors
*/
if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
&& !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {

// Only apply retry policy and DLQ logic if retryPolicy is provided
if (retryPolicy != null) {
InsertRetryPolicy.Context context = new InsertRetryPolicy.Context(null, e);
if (!retryPolicy.shouldRetry(context)) {
// Non-retriable non-200 response: send all rows in this batch to DLQ if configured
LOG.warn(
"Non-retriable HTTP error {} for insertAll, sending to DLQ: {}",
e.getStatusCode(),
e.getMessage());
List<InsertErrors> syntheticErrors = new ArrayList<>();
for (int i = 0; i < rows.size(); i++) {
InsertErrors error =
new InsertErrors()
.setIndex((long) i)
.setErrors(
ImmutableList.of(
new ErrorProto()
.setReason(errorReason)
.setMessage(e.getMessage())));
syntheticErrors.add(error);
if (originalRows != null
&& failedInserts != null
&& errorContainer != null
&& i < originalRows.size()) {
@SuppressWarnings("unchecked")
List rawFailedInserts = (List) failedInserts;
@SuppressWarnings("unchecked")
ErrorContainer rawErrorContainer = (ErrorContainer) errorContainer;
rawErrorContainer.add(rawFailedInserts, error, ref, originalRows.get(i));
}
}
return syntheticErrors; // Return errors to mark batch as failed
}
}

// Retryable non-200 response or no retry policy
if (!ApiErrorExtractor.INSTANCE.rateLimited(e) && !errorReason.equals(QUOTA_EXCEEDED)) {
if (ApiErrorExtractor.INSTANCE.badRequest(e)
&& e.getMessage().contains(NO_ROWS_PRESENT)) {
LOG.error(
Expand All @@ -1049,8 +1116,9 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
+ " or 0 to disable timeouts",
e.getCause());
}
throw e;
throw e; // Let the outer loop handle non-rate-limit errors
}

try (QuotaEventCloseable qec =
new QuotaEvent.Builder()
.withOperation("insert_all")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,34 @@ public abstract class InsertRetryPolicy implements Serializable {
/**
* Contains information about a failed insert.
*
* <p>Currently only the list of errors returned from BigQuery. In the future this may contain
* more information - e.g. how many times this insert has been retried, and for how long.
* <p>Contains the list of errors returned from BigQuery for per-element failures within a
* successful (200 OK) response, and an optional exception for non-successful (non-200) HTTP
* responses. In the future, this may include additional details, such as retry attempts or
* elapsed time.
*/
public static class Context {
// A list of all errors corresponding to an attempted insert of a single record.
final TableDataInsertAllResponse.InsertErrors errors;
// Exception thrown for non-successful (non-200) HTTP responses, if applicable.
final Throwable exception;

public TableDataInsertAllResponse.InsertErrors getInsertErrors() {
return errors;
}

public Throwable getException() {
return exception;
}

// Constructor for per-element errors (existing behavior)
public Context(TableDataInsertAllResponse.InsertErrors errors) {
this(errors, null);
}

// New constructor for both per-element errors and exceptions
public Context(TableDataInsertAllResponse.InsertErrors errors, Throwable exception) {
this.errors = errors;
this.exception = exception;
}
}

Expand Down Expand Up @@ -71,12 +86,18 @@ public boolean shouldRetry(Context context) {
};
}

/** Retry all failures except for known persistent errors. */
/** Retry all failures except for known persistent errors or non-retryable exceptions. */
public static InsertRetryPolicy retryTransientErrors() {
return new InsertRetryPolicy() {
@Override
public boolean shouldRetry(Context context) {
if (context.getInsertErrors().getErrors() != null) {
// Check for non-200 response exceptions first
if (context.getException() != null) {
// For now, assume non-200 responses are non-retryable unless specified otherwise
return false; // Could be refined later based on exception type (e.g., 503 vs 400)
}
// Existing logic for per-element errors
if (context.getInsertErrors() != null && context.getInsertErrors().getErrors() != null) {
for (ErrorProto error : context.getInsertErrors().getErrors()) {
if (error.getReason() != null && PERSISTENT_ERRORS.contains(error.getReason())) {
return false;
Expand Down
Loading