diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 19a8ee1f0778..a4c704a30210 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -962,14 +962,20 @@ static class InsertBatchofRowsCallable implements Callable> { private final TableReference ref; private final Boolean skipInvalidRows; - private final Boolean ignoreUnkownValues; + private final Boolean ignoreUnknownValues; // Fixed typo from ignoreUnkownValues private final Bigquery client; private final FluentBackoff rateLimitBackoffFactory; private final List rows; private final AtomicLong maxThrottlingMsec; private final Sleeper sleeper; private final StreamingInsertsMetrics result; + // Nullable new fields + private final @Nullable InsertRetryPolicy retryPolicy; + private final @Nullable List> originalRows; + private final @Nullable List> failedInserts; + private final @Nullable ErrorContainer errorContainer; + // Original 9-parameter constructor InsertBatchofRowsCallable( TableReference ref, Boolean skipInvalidRows, @@ -980,15 +986,50 @@ static class InsertBatchofRowsCallable implements Callable> { AtomicLong maxThrottlingMsec, Sleeper sleeper, StreamingInsertsMetrics result) { + this( + ref, + skipInvalidRows, + ignoreUnknownValues, + client, + rateLimitBackoffFactory, + rows, + maxThrottlingMsec, + sleeper, + result, + null, + null, + null, + null); + } + + // New 13-parameter constructor + InsertBatchofRowsCallable( + TableReference ref, + Boolean skipInvalidRows, + Boolean ignoreUnknownValues, + Bigquery client, + FluentBackoff rateLimitBackoffFactory, + List rows, + AtomicLong maxThrottlingMsec, + Sleeper sleeper, + StreamingInsertsMetrics result, + @Nullable InsertRetryPolicy retryPolicy, + @Nullable List> originalRows, + @Nullable List> failedInserts, + @Nullable ErrorContainer errorContainer) { this.ref = ref; this.skipInvalidRows = skipInvalidRows; - this.ignoreUnkownValues = ignoreUnknownValues; + this.ignoreUnknownValues = ignoreUnknownValues; this.client = client; this.rateLimitBackoffFactory = rateLimitBackoffFactory; this.rows = rows; this.maxThrottlingMsec = maxThrottlingMsec; this.sleeper = sleeper; this.result = result; + this.retryPolicy = retryPolicy; + this.originalRows = originalRows; + this.failedInserts = failedInserts; + this.errorContainer = errorContainer; } @Override @@ -996,7 +1037,7 @@ public List call() throws Exception { TableDataInsertAllRequest content = new TableDataInsertAllRequest(); content.setRows(rows); content.setSkipInvalidRows(skipInvalidRows); - content.setIgnoreUnknownValues(ignoreUnkownValues); + content.setIgnoreUnknownValues(ignoreUnknownValues); final Bigquery.Tabledata.InsertAll insert = client @@ -1004,43 +1045,69 @@ public List call() throws Exception { .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content) .setPrettyPrint(false); - // A backoff for rate limit exceeded errors. BackOff backoff1 = BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff()); long totalBackoffMillis = 0L; while (true) { ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref); Instant start = Instant.now(); try { - List response = - insert.execute().getInsertErrors(); - if (response == null || response.isEmpty()) { + TableDataInsertAllResponse response = insert.execute(); + List responseErrors = + response.getInsertErrors(); + if (responseErrors == null || responseErrors.isEmpty()) { serviceCallMetric.call("ok"); } else { - for (TableDataInsertAllResponse.InsertErrors insertErrors : response) { + for (TableDataInsertAllResponse.InsertErrors insertErrors : responseErrors) { for (ErrorProto insertError : insertErrors.getErrors()) { serviceCallMetric.call(insertError.getReason()); } } } result.updateSuccessfulRpcMetrics(start, Instant.now()); - return response; - } catch (IOException e) { + return responseErrors; + } catch (GoogleJsonResponseException e) { GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e); - if (errorInfo == null) { - serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN); - result.updateFailedRpcMetrics(start, start, BigQuerySinkMetrics.UNKNOWN); - throw e; - } - String errorReason = errorInfo.getReason(); + String errorReason = errorInfo != null ? errorInfo.getReason() : "unknown"; serviceCallMetric.call(errorReason); result.updateFailedRpcMetrics(start, Instant.now(), errorReason); - /** - * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by - * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of - * GoogleCloudDataproc/hadoop-connectors - */ - if (!ApiErrorExtractor.INSTANCE.rateLimited(e) - && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) { + + // Only apply retry policy and DLQ logic if retryPolicy is provided + if (retryPolicy != null) { + InsertRetryPolicy.Context context = new InsertRetryPolicy.Context(null, e); + if (!retryPolicy.shouldRetry(context)) { + // Non-retriable non-200 response: send all rows in this batch to DLQ if configured + LOG.warn( + "Non-retriable HTTP error {} for insertAll, sending to DLQ: {}", + e.getStatusCode(), + e.getMessage()); + List syntheticErrors = new ArrayList<>(); + for (int i = 0; i < rows.size(); i++) { + InsertErrors error = + new InsertErrors() + .setIndex((long) i) + .setErrors( + ImmutableList.of( + new ErrorProto() + .setReason(errorReason) + .setMessage(e.getMessage()))); + syntheticErrors.add(error); + if (originalRows != null + && failedInserts != null + && errorContainer != null + && i < originalRows.size()) { + @SuppressWarnings("unchecked") + List rawFailedInserts = (List) failedInserts; + @SuppressWarnings("unchecked") + ErrorContainer rawErrorContainer = (ErrorContainer) errorContainer; + rawErrorContainer.add(rawFailedInserts, error, ref, originalRows.get(i)); + } + } + return syntheticErrors; // Return errors to mark batch as failed + } + } + + // Retryable non-200 response or no retry policy + if (!ApiErrorExtractor.INSTANCE.rateLimited(e) && !errorReason.equals(QUOTA_EXCEEDED)) { if (ApiErrorExtractor.INSTANCE.badRequest(e) && e.getMessage().contains(NO_ROWS_PRESENT)) { LOG.error( @@ -1049,8 +1116,9 @@ public List call() throws Exception { + " or 0 to disable timeouts", e.getCause()); } - throw e; + throw e; // Let the outer loop handle non-rate-limit errors } + try (QuotaEventCloseable qec = new QuotaEvent.Builder() .withOperation("insert_all") diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java index edaa88d87b7c..7d34c6ac7954 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java @@ -28,19 +28,34 @@ public abstract class InsertRetryPolicy implements Serializable { /** * Contains information about a failed insert. * - *

Currently only the list of errors returned from BigQuery. In the future this may contain - * more information - e.g. how many times this insert has been retried, and for how long. + *

Contains the list of errors returned from BigQuery for per-element failures within a + * successful (200 OK) response, and an optional exception for non-successful (non-200) HTTP + * responses. In the future, this may include additional details, such as retry attempts or + * elapsed time. */ public static class Context { // A list of all errors corresponding to an attempted insert of a single record. final TableDataInsertAllResponse.InsertErrors errors; + // Exception thrown for non-successful (non-200) HTTP responses, if applicable. + final Throwable exception; public TableDataInsertAllResponse.InsertErrors getInsertErrors() { return errors; } + public Throwable getException() { + return exception; + } + + // Constructor for per-element errors (existing behavior) public Context(TableDataInsertAllResponse.InsertErrors errors) { + this(errors, null); + } + + // New constructor for both per-element errors and exceptions + public Context(TableDataInsertAllResponse.InsertErrors errors, Throwable exception) { this.errors = errors; + this.exception = exception; } } @@ -71,12 +86,18 @@ public boolean shouldRetry(Context context) { }; } - /** Retry all failures except for known persistent errors. */ + /** Retry all failures except for known persistent errors or non-retryable exceptions. */ public static InsertRetryPolicy retryTransientErrors() { return new InsertRetryPolicy() { @Override public boolean shouldRetry(Context context) { - if (context.getInsertErrors().getErrors() != null) { + // Check for non-200 response exceptions first + if (context.getException() != null) { + // For now, assume non-200 responses are non-retryable unless specified otherwise + return false; // Could be refined later based on exception type (e.g., 503 vs 400) + } + // Existing logic for per-element errors + if (context.getInsertErrors() != null && context.getInsertErrors().getErrors() != null) { for (ErrorProto error : context.getInsertErrors().getErrors()) { if (error.getReason() != null && PERSISTENT_ERRORS.contains(error.getReason())) { return false;