[Java] Added Metrics Configuration Support to Iceberg Data Writers #34140

Open · wants to merge 6 commits into master
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
@@ -59,6 +60,9 @@ class RecordWriter {
throws IOException {
this.table = table;
this.fileFormat = fileFormat;

MetricsConfig metricsConfig = MetricsConfig.forTable(table);

if (table.spec().isUnpartitioned()) {
absoluteFilename =
fileFormat.addExtension(table.locationProvider().newDataLocation(filename));
@@ -80,6 +84,7 @@ class RecordWriter {
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.metricsConfig(metricsConfig)
.overwrite()
.build();
break;
@@ -90,6 +95,7 @@ class RecordWriter {
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.metricsConfig(metricsConfig)
.overwrite()
.build();
break;
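For context, this is the pattern the change above applies: resolve a MetricsConfig from the table's properties and hand it to the Iceberg data-write builder. A minimal sketch, assuming an OutputFile and the generic Parquet writer function; the helper name newParquetWriter is illustrative and not part of this PR:

import java.io.IOException;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

static DataWriter<Record> newParquetWriter(Table table, OutputFile outputFile) throws IOException {
  // Reads the write.metadata.metrics.* table properties, falling back to Iceberg defaults.
  MetricsConfig metricsConfig = MetricsConfig.forTable(table);
  return Parquet.writeData(outputFile)
      .createWriterFunc(GenericParquetWriter::buildWriter)
      .schema(table.schema())
      .withSpec(table.spec())
      .metricsConfig(metricsConfig) // column-level stats are collected per this config
      .overwrite()
      .build();
}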
@@ -17,19 +17,21 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -46,11 +48,13 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -153,15 +157,17 @@ public void testCreateNewWriterForEachDestination() throws IOException {
assertEquals(3, writerManager.openWriters);

// dest4
// This is a new destination, but the writer manager is saturated with 3 writers. reject the
// record
row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
writeSuccess = writerManager.write(dest4, row);
assertFalse(writeSuccess);
assertEquals(3, writerManager.openWriters);

// dest3, partition: [aaa, false]
// new partition, but the writer manager is saturated with 3 writers. reject the record
row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", false).build();
writeSuccess = writerManager.write(dest3, row);
assertFalse(writeSuccess);
@@ -214,7 +220,8 @@ public void testCreateNewWriterForEachPartition() throws IOException {
assertEquals(3, writerManager.openWriters);

// partition: [aaa, false].
// The writerManager is already saturated with three writers. This record is rejected.
row = Row.withSchema(BEAM_SCHEMA).addValues(5, "aaa123", false).build();
writeSuccess = writerManager.write(windowedDestination, row);
assertFalse(writeSuccess);
@@ -508,7 +515,7 @@ public void testIdentityPartitioning() throws IOException {
for (Schema.Field field : primitiveTypeSchema.getFields()) {
Object val = checkStateNotNull(row.getValue(field.getName()));
if (dateTypes.contains(field.getName())) {
val = URLEncoder.encode(val.toString(), UTF_8.toString());
}
expectedPartitions.add(field.getName() + "=" + val);
}
@@ -719,4 +726,136 @@ public void close() throws IOException {
throw new IOException("I am failing!");
}
}

@Test
public void testColumnSpecificMetricsCollection() throws IOException {
// Set up table with column-specific metrics enabled
TableIdentifier tableId = TableIdentifier.of("default", "test_column_metrics");
Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, PARTITION_SPEC);
table
.updateProperties()
.set("write.metadata.metrics.column.id", "counts")
.set("write.metadata.metrics.column.name", "counts")
.commit();

RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);

// Write multiple rows
Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row1));
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row2));
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row3));
writerManager.close();

Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
writerManager.getSerializableDataFiles();
assertFalse("Data files should not be empty", dataFiles.isEmpty());

// Commit using the same table instance
for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>> entry :
dataFiles.entrySet()) {
AppendFiles appendFiles = table.newAppend();
for (SerializableDataFile dataFile : entry.getValue()) {
appendFiles.appendFile(dataFile.createDataFile(table.specs()));
}
appendFiles.commit();
}
table.refresh(); // Single refresh after all commits

// Verify using the same table
Snapshot snapshot = table.currentSnapshot();
assertNotNull("Table should have a snapshot after writing data", snapshot);

// Verify metrics are collected for specified columns in one file
DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
Map<Integer, Long> valueCounts = dataFile.valueCounts();
assertNotNull("Value counts should not be null", valueCounts);
assertTrue("Value counts should exist for id (column 1)", valueCounts.containsKey(1));
assertEquals("Value count for id should be 1 in this file", 1L, valueCounts.get(1).longValue());
assertTrue("Value counts should exist for name (column 2)", valueCounts.containsKey(2));
assertEquals(
"Value count for name should be 1 in this file", 1L, valueCounts.get(2).longValue());
// Note: bool (column 3) may have metrics due to default behavior; not explicitly excluded

assertNotNull(dataFile.nullValueCounts());
assertNotNull(dataFile.columnSizes());
}
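
A note on reading the assertions above: the integer keys in valueCounts() and the other metrics maps are Iceberg field IDs, which can be mapped back to column names through the table schema. A small sketch under that assumption (table and dataFile are presumed to be in scope):

for (Map.Entry<Integer, Long> entry : dataFile.valueCounts().entrySet()) {
  // findColumnName resolves an Iceberg field ID to its column name
  String column = table.schema().findColumnName(entry.getKey());
  System.out.printf("column %s (field id %d): %d values%n", column, entry.getKey(), entry.getValue());
}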

@Test
public void testDefaultMetrics() throws IOException {
// Set up table with default metrics enabled
TableIdentifier tableId = TableIdentifier.of("default", "test_default_metrics");
Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, PARTITION_SPEC);
table.updateProperties().set("write.metadata.metrics.default", "full").commit();

// Create a RecordWriterManager
RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);

// Write multiple rows
Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row1));
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row2));
assertTrue("Write operation should succeed", writerManager.write(windowedDestination, row3));
writerManager.close();

// Manually commit the data files to the Iceberg table
Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
writerManager.getSerializableDataFiles();
assertFalse("Data files should not be empty", dataFiles.isEmpty());

// Commit using the same table instance
for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>> entry :
dataFiles.entrySet()) {
AppendFiles appendFiles = table.newAppend();
for (SerializableDataFile dataFile : entry.getValue()) {
appendFiles.appendFile(dataFile.createDataFile(table.specs()));
}
appendFiles.commit();
}
table.refresh(); // Single refresh after all commits

// Verify using the same table
Snapshot snapshot = table.currentSnapshot();
assertNotNull("Table should have a snapshot after writing data", snapshot);

// Verify metrics are collected for all columns in one file
DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
assertNotNull(dataFile.valueCounts());
assertNotNull(dataFile.nullValueCounts());
assertNotNull(dataFile.columnSizes());
assertNotNull(dataFile.lowerBounds());
assertNotNull(dataFile.upperBounds());

Map<Integer, Long> valueCounts = dataFile.valueCounts();
Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
Map<Integer, Long> columnSizes = dataFile.columnSizes();
Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();

for (int i = 1; i <= ICEBERG_SCHEMA.columns().size(); i++) {
assertTrue("Value counts should be collected for column " + i, valueCounts.containsKey(i));
assertEquals(
"Value count for column " + i + " should be 1 in this file",
1L,
valueCounts.get(i).longValue());
assertTrue(
"Null value counts should be collected for column " + i, nullValueCounts.containsKey(i));
assertTrue("Column sizes should be collected for column " + i, columnSizes.containsKey(i));
assertTrue("Lower bounds should be collected for column " + i, lowerBounds.containsKey(i));
assertTrue("Upper bounds should be collected for column " + i, upperBounds.containsKey(i));

// Verify bounds are non-null and equal (single-row file)
ByteBuffer lower = lowerBounds.get(i);
ByteBuffer upper = upperBounds.get(i);
assertNotNull("Lower bound for column " + i + " should not be null", lower);
assertNotNull("Upper bound for column " + i + " should not be null", upper);
assertTrue(
"Lower and upper bounds for column " + i + " should be equal in single-row file",
lower.equals(upper));
}
}
}
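
For reference, a hedged sketch of how the table properties exercised by these tests resolve into per-column metrics modes through MetricsConfig; the helper name and column names are assumptions for illustration:

import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes.MetricsMode;
import org.apache.iceberg.Table;

static void describeMetricsModes(Table table) {
  table
      .updateProperties()
      .set("write.metadata.metrics.default", "full") // none | counts | truncate(N) | full
      .set("write.metadata.metrics.column.id", "counts") // per-column override
      .commit();

  MetricsConfig config = MetricsConfig.forTable(table);
  MetricsMode idMode = config.columnMode("id"); // counts, from the column override
  MetricsMode nameMode = config.columnMode("name"); // full, falling back to the table default
  System.out.println("id: " + idMode + ", name: " + nameMode);
}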