From b4c03d8c59705cc48e66092c62d07573817db178 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 3 Mar 2025 12:59:49 +0100 Subject: [PATCH] Add test for unknown fields in repeated nested records; reference bug 34145. --- .../StorageApiWriteUnshardedRecords.java | 4 +- .../bigquery/TableRowToStorageApiProto.java | 7 ++- .../TableRowToStorageApiProtoTest.java | 49 +++++++++++++++++-- 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 1b5f32f7e431..cbcd70753aca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -581,8 +581,10 @@ void addMessage( appendClientInfo = getAppendClientInfo(true, null); } @Nullable TableRow unknownFields = payload.getUnknownFields(); - if (unknownFields != null) { + if (unknownFields != null && !unknownFields.isEmpty()) { try { + // TODO(34145, radoslaws): concat works for unknownFields of primitive type, + // but will cause issues with nested and repeated fields payloadBytes = payloadBytes.concat( Preconditions.checkStateNotNull(appendClientInfo) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 0b294eadde37..e267b9c61071 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -54,6 +54,7 @@ import 
java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.function.Predicate; @@ -544,8 +545,10 @@ public static DynamicMessage messageFromMap( unknownFields.remove(key); } else if (unknownFields.get(key) instanceof List) { // repeated ((List) unknownFields.get(key)) - .removeIf(next -> next instanceof Map && ((Map) next).isEmpty()); - if (((List) unknownFields.get(key)).isEmpty()) { + .replaceAll( + next -> next instanceof Map && ((Map) next).isEmpty() ? null : next); + if (((List) unknownFields.get(key)).isEmpty() + || ((List) unknownFields.get(key)).stream().allMatch(Objects::isNull)) { unknownFields.remove(key); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index a3424dab3563..725f00b9a018 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1673,7 +1675,7 @@ public void testIgnoreUnknownRepeatedNestedField() throws Exception { } @Test - public void testIgnoreUnknownRepeatedNestedField2() throws Exception { + public void testIgnoreUnknownRepeatedNestedFieldWithNoUnknowns() throws Exception { List fields = new ArrayList<>(); fields.add(new TableFieldSchema().setName("foo").setType("STRING")); @@ -1705,11 +1707,52 @@ public void 
testIgnoreUnknownRepeatedNestedField2() throws Exception { TableRowToStorageApiProto.messageFromTableRow( schemaInformation, descriptor, tableRow, true, false, unknown, null, -1); assertEquals(2, msg.getAllFields().size()); - - System.out.println(unknown); assertTrue(unknown.isEmpty()); } + @Test + public void testIgnoreUnknownRepeatedNestedFieldWithUnknownInRepeatedField() throws Exception { + + List fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("foo").setType("STRING")); + fields.add( + new TableFieldSchema() + .setName("repeated1") + .setMode("REPEATED") + .setType("RECORD") + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("key1").setType("STRING").setMode("REQUIRED"), + new TableFieldSchema().setName("key2").setType("STRING")))); + TableSchema schema = new TableSchema().setFields(fields); + TableRow tableRow = + new TableRow() + .set("foo", "bar") + .set( + "repeated1", + ImmutableList.of( + new TableCell().set("key1", "valueA").set("key2", "valueC"), + new TableCell() + .set("key1", "valueB") + .set("key2", "valueD") + .set("unknown", "valueE"))); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, true, false); + TableRowToStorageApiProto.SchemaInformation schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(schema); + TableRow unknown = new TableRow(); + DynamicMessage msg = + TableRowToStorageApiProto.messageFromTableRow( + schemaInformation, descriptor, tableRow, true, false, unknown, null, -1); + assertEquals(2, msg.getAllFields().size()); + assertFalse(unknown.isEmpty()); + assertEquals(2, ((List) unknown.get("repeated1")).size()); + assertNull(((List) unknown.get("repeated1")).get(0)); + assertNotNull(((List) unknown.get("repeated1")).get(1)); + assertEquals("valueE", ((TableRow) ((List) unknown.get("repeated1")).get(1)).get("unknown")); + } + + @Test public void testCdcFields() throws Exception { TableRow tableRow =