[Java] Added Metrics Configuration Support to Iceberg Data Writers #34140

Status: Open · wants to merge 6 commits into base: master · Changes from 4 commits
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
@@ -59,6 +60,9 @@ class RecordWriter {
throws IOException {
this.table = table;
this.fileFormat = fileFormat;

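// Resolve the table's metrics configuration (write.metadata.metrics.* properties)
// so it can be passed to the file-format writer builders below.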
MetricsConfig metricsConfig = MetricsConfig.forTable(table);

if (table.spec().isUnpartitioned()) {
absoluteFilename =
fileFormat.addExtension(table.locationProvider().newDataLocation(filename));
@@ -80,6 +84,7 @@ class RecordWriter {
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.metricsConfig(metricsConfig) // apply the table's metrics configuration
.overwrite()
.build();
break;
@@ -90,6 +95,7 @@ class RecordWriter {
.schema(table.schema())
.withSpec(table.spec())
.withPartition(partitionKey)
.metricsConfig(metricsConfig) // apply the table's metrics configuration
.overwrite()
.build();
break;
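Note: with this change, metrics settings declared as table properties take effect on the files produced by the sink. The sketch below shows how a user might set those properties; the table handle and the `payload` column are illustrative assumptions, not part of this PR. The remaining hunks update the accompanying unit tests.

```java
import org.apache.iceberg.Table;

/** Sketch: tune per-column metrics on an existing Iceberg table. */
final class MetricsPropertiesExample {

  /** Assumes {@code table} was loaded from a catalog elsewhere. */
  static void configureMetrics(Table table) {
    table
        .updateProperties()
        // collect full stats (counts, null counts, bounds) for every column
        .set("write.metadata.metrics.default", "full")
        // opt a single (hypothetical) column out of metrics collection
        .set("write.metadata.metrics.column.payload", "none")
        .commit();
    // RecordWriter resolves these properties via MetricsConfig.forTable(table)
    // and forwards the result to the file-format writer builders.
  }
}
```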
@@ -24,6 +24,7 @@
import static org.hamcrest.Matchers.either;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@@ -46,11 +47,13 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -153,15 +156,17 @@ public void testCreateNewWriterForEachDestination() throws IOException {
assertEquals(3, writerManager.openWriters);

// dest4
// This is a new destination, but the writer manager is saturated with 3
// writers. Reject the record.
row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
writeSuccess = writerManager.write(dest4, row);
assertFalse(writeSuccess);
assertEquals(3, writerManager.openWriters);

// dest3, partition: [aaa, false]
// new partition, but the writer manager is saturated with 3 writers.
// Reject the record.
row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", false).build();
writeSuccess = writerManager.write(dest3, row);
assertFalse(writeSuccess);
@@ -214,7 +219,8 @@ public void testCreateNewWriterForEachPartition() throws IOException {
assertEquals(3, writerManager.openWriters);

// partition: [aaa, false].
// The writerManager is already saturated with three writers. This record is rejected.
row = Row.withSchema(BEAM_SCHEMA).addValues(5, "aaa123", false).build();
writeSuccess = writerManager.write(windowedDestination, row);
assertFalse(writeSuccess);
@@ -719,4 +725,113 @@ public void close() throws IOException {
throw new IOException("I am failing!");
}
}

@Test
public void testColumnSpecificMetricsCollection() throws IOException {
// Configure column-specific metrics on the table that windowedDestination
// writes to (loaded again below for verification); setting these properties
// on a separate, unused table would have no effect.
Table table =
catalog.loadTable(
TableIdentifier.of("default", "table_testColumnSpecificMetricsCollection"));
table
.updateProperties()
.set("write.metadata.metrics.column.id", "counts")
.set("write.metadata.metrics.column.name", "counts")
.commit();

RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);

Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
boolean writeSuccess = writerManager.write(windowedDestination, row);
assertTrue("Write operation should succeed", writeSuccess);
writerManager.close();

Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
writerManager.getSerializableDataFiles();
assertFalse("Data files should not be empty", dataFiles.isEmpty());

for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>> entry :
dataFiles.entrySet()) {
Table tableToCommit = catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
AppendFiles appendFiles = tableToCommit.newAppend();
for (SerializableDataFile dataFile : entry.getValue()) {
appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
}
appendFiles.commit();
tableToCommit.refresh();
}

// Load the destination table that the data was written to
Table dataTable =
catalog.loadTable(
TableIdentifier.of("default", "table_testColumnSpecificMetricsCollection"));
dataTable.refresh();

// Verify that a snapshot exists
Snapshot snapshot = dataTable.currentSnapshot();
assertNotNull("Table should have a snapshot after writing data", snapshot);

// Verify that metrics were collected for the written data file
DataFile dataFile = snapshot.addedDataFiles(dataTable.io()).iterator().next();
assertNotNull(dataFile.valueCounts());
assertNotNull(dataFile.nullValueCounts());
assertNotNull(dataFile.columnSizes());
}

@Test
public void testDefaultMetrics() throws IOException {
// Enable full default metrics on the table that windowedDestination writes to;
// setting the property on a separate, unused table would have no effect.
Table table = catalog.loadTable(TableIdentifier.of("default", "table_testDefaultMetrics"));
table.updateProperties().set("write.metadata.metrics.default", "full").commit();

// Create a RecordWriterManager
RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);

// Write a row
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
boolean writeSuccess = writerManager.write(windowedDestination, row);
assertTrue("Write operation should succeed", writeSuccess);
writerManager.close();

// Manually commit the data files to the Iceberg table
Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> dataFiles =
writerManager.getSerializableDataFiles();
assertFalse("Data files should not be empty", dataFiles.isEmpty());

for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>> entry :
dataFiles.entrySet()) {
Table tableToCommit = catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
AppendFiles appendFiles = tableToCommit.newAppend();
for (SerializableDataFile dataFile : entry.getValue()) {
appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
}
appendFiles.commit();
tableToCommit.refresh();
}

// Load the destination table that the data was written to
Table dataTable = catalog.loadTable(TableIdentifier.of("default", "table_testDefaultMetrics"));
dataTable.refresh();

// Verify that a snapshot exists
Snapshot snapshot = dataTable.currentSnapshot();
assertNotNull("Table should have a snapshot after writing data", snapshot);

// Verify the metrics maps are present
DataFile dataFile = snapshot.addedDataFiles(dataTable.io()).iterator().next();
assertNotNull(dataFile.valueCounts());
assertNotNull(dataFile.nullValueCounts());
assertNotNull(dataFile.columnSizes());

// Verify metrics are collected for all columns
Map<Integer, Long> valueCounts = dataFile.valueCounts();
Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
Map<Integer, Long> columnSizes = dataFile.columnSizes();

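// Field IDs 1..n match the schema's top-level columns in declaration order,
// since Iceberg assigns sequential field IDs when a schema is first created.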
for (int i = 1; i <= ICEBERG_SCHEMA.columns().size(); i++) {
assertTrue("Value counts should be collected for column " + i, valueCounts.containsKey(i));
assertTrue(
"Null value counts should be collected for column " + i, nullValueCounts.containsKey(i));
assertTrue("Column sizes should be collected for column " + i, columnSizes.containsKey(i));
}
}
}