Skip to content

Commit

Permalink
new test, reference bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
stankiewicz committed Mar 3, 2025
1 parent 84c85fd commit b4c03d8
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,10 @@ void addMessage(
appendClientInfo = getAppendClientInfo(true, null);
}
@Nullable TableRow unknownFields = payload.getUnknownFields();
if (unknownFields != null) {
if (unknownFields != null && !unknownFields.isEmpty()) {
try {
// TODO(34145, radoslaws): concat will work for unknownFields that are primitive type,
// will cause issues with nested and repeated fields
payloadBytes =
payloadBytes.concat(
Preconditions.checkStateNotNull(appendClientInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Predicate;
Expand Down Expand Up @@ -544,8 +545,10 @@ public static DynamicMessage messageFromMap(
unknownFields.remove(key);
} else if (unknownFields.get(key) instanceof List) { // repeated
((List<?>) unknownFields.get(key))
.removeIf(next -> next instanceof Map && ((Map<?, ?>) next).isEmpty());
if (((List<?>) unknownFields.get(key)).isEmpty()) {
.replaceAll(
next -> next instanceof Map && ((Map<?, ?>) next).isEmpty() ? null : next);
if (((List<?>) unknownFields.get(key)).isEmpty()
|| ((List<?>) unknownFields.get(key)).stream().allMatch(Objects::isNull)) {
unknownFields.remove(key);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -1673,7 +1675,7 @@ public void testIgnoreUnknownRepeatedNestedField() throws Exception {
}

@Test
public void testIgnoreUnknownRepeatedNestedField2() throws Exception {
public void testIgnoreUnknownRepeatedNestedFieldWithNoUnknowns() throws Exception {

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("foo").setType("STRING"));
Expand Down Expand Up @@ -1705,11 +1707,52 @@ public void testIgnoreUnknownRepeatedNestedField2() throws Exception {
TableRowToStorageApiProto.messageFromTableRow(
schemaInformation, descriptor, tableRow, true, false, unknown, null, -1);
assertEquals(2, msg.getAllFields().size());

System.out.println(unknown);
assertTrue(unknown.isEmpty());
}

@Test
public void testIgnoreUnknownRepeatedNestedFieldWithUknownInRepeatedField() throws Exception {

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("foo").setType("STRING"));
fields.add(
new TableFieldSchema()
.setName("repeated1")
.setMode("REPEATED")
.setType("RECORD")
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("key1").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("key2").setType("STRING"))));
TableSchema schema = new TableSchema().setFields(fields);
TableRow tableRow =
new TableRow()
.set("foo", "bar")
.set(
"repeated1",
ImmutableList.of(
new TableCell().set("key1", "valueA").set("key2", "valueC"),
new TableCell()
.set("key1", "valueB")
.set("key2", "valueD")
.set("unknown", "valueE")));

Descriptor descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, true, false);
TableRowToStorageApiProto.SchemaInformation schemaInformation =
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(schema);
TableRow unknown = new TableRow();
DynamicMessage msg =
TableRowToStorageApiProto.messageFromTableRow(
schemaInformation, descriptor, tableRow, true, false, unknown, null, -1);
assertEquals(2, msg.getAllFields().size());
assertFalse(unknown.isEmpty());
assertEquals(2, ((List<?>) unknown.get("repeated1")).size());
assertNull(((List<?>) unknown.get("repeated1")).get(0));
assertNotNull(((List<?>) unknown.get("repeated1")).get(1));
assertEquals("valueE", ((TableRow) ((List<?>) unknown.get("repeated1")).get(1)).get("unknown"));
}

@Test
public void testCdcFields() throws Exception {
TableRow tableRow =
Expand Down

0 comments on commit b4c03d8

Please sign in to comment.