Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for top-level table properties on table creation #34205

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void process(ProcessContext c) {}

// And digging to check whether the window is ready
when(mockEvaluationContext.createSideInputReader(anyList())).thenReturn(mockSideInputReader);
when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(false);
when(mockSideInputReader.isReady(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(false);

IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ public void testPartialGroupByKeyWithCombinerAndSideInputs() throws Exception {
ImmutableList.of(
WindowedValue.valueInGlobalWindow(KV.of("hi", 4)),
WindowedValue.valueInGlobalWindow(KV.of("there", 5))));
when(mockSideInputFetcher.storeIfBlocked(ArgumentMatchers.<WindowedValue<KV<String, Integer>>>any()))
when(mockSideInputFetcher.storeIfBlocked(
ArgumentMatchers.<WindowedValue<KV<String, Integer>>>any()))
.thenReturn(false, false, false, true);

pgbkParDoFn.startBundle(receiver);
Expand Down Expand Up @@ -360,7 +361,8 @@ public void testCreateWithCombinerAndStreamingSideInputs() throws Exception {

when(mockSideInputReader.isEmpty()).thenReturn(false);
when(mockStreamingStepContext.stateInternals()).thenReturn((StateInternals) mockStateInternals);
when(mockStateInternals.state(ArgumentMatchers.<StateNamespace>any(), ArgumentMatchers.<StateTag>any()))
when(mockStateInternals.state(
ArgumentMatchers.<StateNamespace>any(), ArgumentMatchers.<StateTag>any()))
.thenReturn(mockState);
when(mockState.read()).thenReturn(Maps.newHashMap());

Expand Down Expand Up @@ -394,7 +396,8 @@ public void testCoderSizeEstimationWithNonLazyObserver() throws Exception {
return null;
})
.when(mockCoder)
.registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
.registerByteSizeObserver(
ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
CoderSizeEstimator<String> estimator = new CoderSizeEstimator(mockCoder);
assertEquals(5, estimator.estimateSize("apple"));
}
Expand All @@ -410,7 +413,8 @@ public void testCoderSizeEstimationWithLazyObserver() throws Exception {
return null;
})
.when(mockCoder)
.registerByteSizeObserver(ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());
.registerByteSizeObserver(
ArgumentMatchers.eq("apple"), ArgumentMatchers.<ElementByteSizeObserver>any());

// Encode the input to the output stream
doAnswer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ public RemoteEnvironment createEnvironment(Environment environment, String worke
String containerId = null;
InstructionRequestHandler instructionHandler = null;
try {
LOG.info("Running Docker command: image={}, opts={}, args={}", containerImage, dockerOptsBuilder.build(), argsBuilder.build());
LOG.info(
"Running Docker command: image={}, opts={}, args={}",
containerImage,
dockerOptsBuilder.build(),
argsBuilder.build());
containerId = docker.runImage(containerImage, dockerOptsBuilder.build(), argsBuilder.build());
LOG.debug("Created Docker Container with Container ID {}", containerId);
// Wait on a client from the gRPC server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,11 @@ public class BigQueryIO {
private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";

/**
* Regular expression that matches BigQuery Table IDs.
* Supports Unicode characters in categories:
* - L (letter)
* - M (mark)
* - N (number)
* As well as:
* - Underscore (_)
* - Dash (-)
* - Dollar sign ($)
* - At sign (@)
* - Space
* Regular expression that matches BigQuery Table IDs. Supports Unicode characters in categories:
*
* <ul>
*   <li>L (letter)
*   <li>M (mark)
*   <li>N (number)
* </ul>
*
* <p>As well as:
*
* <ul>
*   <li>Underscore (_)
*   <li>Dash (-)
*   <li>Dollar sign ($)
*   <li>At sign (@)
*   <li>Space
* </ul>
*
* The pattern requires 1-1024 characters matching these categories.
* <p>The pattern requires 1-1024 characters matching these categories.
*/
private static final String TABLE_REGEXP = "[-_\\p{L}\\p{N}\\p{M}$@ ]{1,1024}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergTableCreateConfig {
public abstract class IcebergTableCreateConfig implements Serializable {

/** Schema for the destination, in the event that it must be dynamically created. */
@Pure
Expand All @@ -33,6 +36,9 @@ public abstract class IcebergTableCreateConfig {
@Pure
public abstract PartitionSpec getPartitionSpec();

@Nullable
public abstract Map<String, String> getTableProperties();

@Pure
public Builder builder() {
return new AutoValue_IcebergTableCreateConfig.Builder();
Expand All @@ -44,6 +50,8 @@ public abstract static class Builder {

public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);

public abstract Builder setTableProperties(@Nullable Map<String, String> tableProperties);

@Pure
public abstract IcebergTableCreateConfig build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.iceberg.RecordWriterManager.DestinationState;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -277,7 +279,8 @@ static String getPartitionDataPath(
* implementation. Although it is expected, some implementations may not support creating a table
* using the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
private Table getOrCreateTable(
TableIdentifier identifier, Schema dataSchema, IcebergDestination icebergDestination) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
synchronized (TABLE_CACHE) {
Expand All @@ -288,7 +291,19 @@ private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
Map<String, String> tableProperties;
if (icebergDestination.getTableCreateConfig() != null) {
tableProperties = icebergDestination.getTableCreateConfig().getTableProperties();
} else {
tableProperties = new HashMap<>();
}
table =
catalog.createTable(
identifier,
tableSchema,
partitionSpec,
tableProperties != null ? tableProperties : new HashMap<>());
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
Expand Down Expand Up @@ -319,7 +334,7 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
icebergDestination,
destination -> {
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table = getOrCreateTable(identifier, row.getSchema());
Table table = getOrCreateTable(identifier, row.getSchema(), destination.getValue());
return new DestinationState(destination.getValue(), table);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down
Loading