Skip to content

Commit

Permalink
fix for adding unexpected Empty Records in Nested Arrays in BigQueryIO (#34102)
Browse files Browse the repository at this point in the history

* fix for adding unexpected Empty Records in Nested Arrays

* comment

* Add test

* spotless

* new test, reference bug.

* should not produce null elements for repeated value.

* fix test

* changes after review

* changes after review

* changes after review

* changes after review
  • Loading branch information
stankiewicz authored Mar 4, 2025
1 parent 0faa7b8 commit 14b1fa5
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,10 @@ void addMessage(
appendClientInfo = getAppendClientInfo(true, null);
}
@Nullable TableRow unknownFields = payload.getUnknownFields();
if (unknownFields != null) {
if (unknownFields != null && !unknownFields.isEmpty()) {
try {
// TODO(34145, radoslaws): concat works for unknownFields of primitive types, but
// will cause issues with nested and repeated fields
payloadBytes =
payloadBytes.concat(
Preconditions.checkStateNotNull(appendClientInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,16 @@ public static DynamicMessage messageFromMap(
// For STRUCT fields, we add a placeholder to unknownFields using the getNestedUnknown
// supplier (in case we encounter unknown nested fields). If the placeholder comes out
// to be empty, we should clean it up
if (fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT)
&& unknownFields != null
&& unknownFields.get(key) instanceof Map
&& ((Map<?, ?>) unknownFields.get(key)).isEmpty()) {
if ((fieldSchemaInformation.getType().equals(TableFieldSchema.Type.STRUCT)
&& unknownFields != null)
&& ((unknownFields.get(key) instanceof Map
&& ((Map<?, ?>) unknownFields.get(key)).isEmpty()) // single struct, empty
|| (unknownFields.get(key)
instanceof List // repeated struct, empty list or list with empty structs
&& (((List<?>) unknownFields.get(key)).isEmpty()
|| ((List<?>) unknownFields.get(key))
.stream()
.allMatch(row -> row == null || ((Map<?, ?>) row).isEmpty()))))) {
unknownFields.remove(key);
}
} catch (Exception e) {
Expand All @@ -551,7 +557,6 @@ public static DynamicMessage messageFromMap(
e);
}
}

if (changeType != null) {
builder.setField(
Preconditions.checkStateNotNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -40,6 +41,7 @@
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1671,6 +1673,86 @@ public void testIgnoreUnknownRepeatedNestedField() throws Exception {
assertEquals("foobar_doubly_nested", unknownDoublyNestedStruct.get("unknown_doubly_nested"));
}

/**
 * Converts a row whose repeated RECORD elements only use fields declared in the schema and checks
 * that the caller-supplied unknown-fields row is still empty afterwards.
 */
@Test
public void testIgnoreUnknownRepeatedNestedFieldWithNoUnknowns() throws Exception {
  // Schema: a top-level STRING "foo" plus a REPEATED RECORD "repeated1" with two STRING keys.
  TableFieldSchema fooField = new TableFieldSchema().setName("foo").setType("STRING");
  TableFieldSchema requiredKey =
      new TableFieldSchema().setName("key1").setType("STRING").setMode("REQUIRED");
  TableFieldSchema optionalKey = new TableFieldSchema().setName("key2").setType("STRING");
  TableFieldSchema repeatedRecord =
      new TableFieldSchema()
          .setName("repeated1")
          .setMode("REPEATED")
          .setType("RECORD")
          .setFields(ImmutableList.of(requiredKey, optionalKey));

  List<TableFieldSchema> topLevelFields = new ArrayList<>();
  topLevelFields.add(fooField);
  topLevelFields.add(repeatedRecord);
  TableSchema tableSchema = new TableSchema().setFields(topLevelFields);

  // Input row: both repeated elements carry only key1/key2, i.e. nothing outside the schema.
  TableCell firstElement = new TableCell().set("key1", "valueA").set("key2", "valueC");
  TableCell secondElement = new TableCell().set("key1", "valueB").set("key2", "valueD");
  TableRow inputRow =
      new TableRow().set("foo", "bar").set("repeated1", ImmutableList.of(firstElement, secondElement));

  Descriptor protoDescriptor =
      TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true, false);
  TableRowToStorageApiProto.SchemaInformation schemaInfo =
      TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);

  // Collector handed to the converter; it is expected to stay empty for this input.
  TableRow unknownCollector = new TableRow();
  DynamicMessage converted =
      TableRowToStorageApiProto.messageFromTableRow(
          schemaInfo, protoDescriptor, inputRow, true, false, unknownCollector, null, -1);

  // Both schema fields ("foo" and "repeated1") are present on the resulting message.
  assertEquals(2, converted.getAllFields().size());
  assertTrue(unknownCollector.isEmpty());
}

/**
 * Converts a row where only the second element of a repeated RECORD carries a field that is not
 * in the schema, and checks that the unknown-fields row holds one entry per repeated element: an
 * empty row for the first element and the undeclared value for the second.
 */
@Test
public void testIgnoreUnknownRepeatedNestedFieldWithUnknownInRepeatedField() throws Exception {
  // Schema: a top-level STRING "foo" plus a REPEATED RECORD "repeated1" with two STRING keys.
  TableFieldSchema fooField = new TableFieldSchema().setName("foo").setType("STRING");
  TableFieldSchema requiredKey =
      new TableFieldSchema().setName("key1").setType("STRING").setMode("REQUIRED");
  TableFieldSchema optionalKey = new TableFieldSchema().setName("key2").setType("STRING");
  TableFieldSchema repeatedRecord =
      new TableFieldSchema()
          .setName("repeated1")
          .setMode("REPEATED")
          .setType("RECORD")
          .setFields(ImmutableList.of(requiredKey, optionalKey));

  List<TableFieldSchema> topLevelFields = new ArrayList<>();
  topLevelFields.add(fooField);
  topLevelFields.add(repeatedRecord);
  TableSchema tableSchema = new TableSchema().setFields(topLevelFields);

  // Input row: the second repeated element additionally sets "unknown", which the schema lacks.
  TableCell knownOnlyElement = new TableCell().set("key1", "valueA").set("key2", "valueC");
  TableCell elementWithUnknown =
      new TableCell().set("key1", "valueB").set("key2", "valueD").set("unknown", "valueE");
  TableRow inputRow =
      new TableRow()
          .set("foo", "bar")
          .set("repeated1", ImmutableList.of(knownOnlyElement, elementWithUnknown));

  Descriptor protoDescriptor =
      TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true, false);
  TableRowToStorageApiProto.SchemaInformation schemaInfo =
      TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);

  TableRow unknownCollector = new TableRow();
  DynamicMessage converted =
      TableRowToStorageApiProto.messageFromTableRow(
          schemaInfo, protoDescriptor, inputRow, true, false, unknownCollector, null, -1);

  assertEquals(2, converted.getAllFields().size());
  assertFalse(unknownCollector.isEmpty());

  // One entry per repeated element, so positions line up with the input list.
  List<?> unknownRepeated = (List<?>) unknownCollector.get("repeated1");
  assertEquals(2, unknownRepeated.size());
  Object firstEntry = unknownRepeated.get(0);
  assertNotNull(firstEntry);
  Object secondEntry = unknownRepeated.get(1);
  assertNotNull(secondEntry);

  // First element had nothing undeclared; second one keeps its undeclared value.
  assertTrue(((TableRow) firstEntry).isEmpty());
  assertEquals("valueE", ((TableRow) secondEntry).get("unknown"));
}

@Test
public void testCdcFields() throws Exception {
TableRow tableRow =
Expand Down

0 comments on commit 14b1fa5

Please sign in to comment.