[Managed Iceberg] unbounded source #33504

Open
wants to merge 36 commits into base: master
Changes from 10 commits
Commits (36)
bb87511
initial
ahmedabu98 Jan 6, 2025
853de4d
let CombinedScanTask do splitting (based on Parquet row groups)
ahmedabu98 Jan 7, 2025
69fd988
perf improv
ahmedabu98 Jan 7, 2025
da2f33f
create one read task descriptor per snapshot range
ahmedabu98 Jan 8, 2025
73c8992
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Jan 22, 2025
81ca709
some improvements
ahmedabu98 Jan 25, 2025
e319d76
use GiB for streaming, Redistribute for batch; update docs
ahmedabu98 Jan 30, 2025
c25cd75
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Jan 30, 2025
af1ec85
use static value
ahmedabu98 Feb 3, 2025
f5d3268
add some test
ahmedabu98 Feb 3, 2025
df40239
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 3, 2025
43ab88f
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 4, 2025
20db0ee
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 7, 2025
622625f
add a java doc; don't use static block to create coder
ahmedabu98 Feb 10, 2025
4c25d3f
spotless
ahmedabu98 Feb 10, 2025
8666166
add options: from/to timestamp, starting strategy, and streaming toggle
ahmedabu98 Feb 13, 2025
297c309
trigger integration tests
ahmedabu98 Feb 13, 2025
5e3a2cc
small test fix
ahmedabu98 Feb 13, 2025
8b131fd
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Feb 14, 2025
887eff1
scan every snapshot individually; use snapshot commit timestamp to ma…
ahmedabu98 Feb 25, 2025
6cfc2d8
new schematransform for cdc streaming; add watermark configs
ahmedabu98 Mar 3, 2025
fbad86e
cleanup
ahmedabu98 Mar 3, 2025
50f9497
add guava import
ahmedabu98 Mar 3, 2025
4f1f40b
remove iceberg_cdc_read from xlang auto-wrapper gen
ahmedabu98 Mar 3, 2025
633365c
fix javadoc
ahmedabu98 Mar 3, 2025
37485f1
cleanup
ahmedabu98 Mar 3, 2025
4ede0e8
spotless
ahmedabu98 Mar 4, 2025
db9fd63
use CDC schema for batch and streaming; re-introduce boolean 'streami…
ahmedabu98 Mar 4, 2025
79ab16a
add to CHANGES.md and discussion docs
ahmedabu98 Mar 4, 2025
06a4cee
spotless
ahmedabu98 Mar 4, 2025
132034f
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Mar 4, 2025
795c87c
address review comments about java docs
ahmedabu98 Mar 5, 2025
c6461c9
remove raw guava dep
ahmedabu98 Mar 5, 2025
7dbf3e1
add another test for read utils
ahmedabu98 Mar 5, 2025
5263a13
Merge branch 'master' of https://github.com/ahmedabu98/beam into iceb…
ahmedabu98 Mar 6, 2025
40fe4ab
use cached schemas
ahmedabu98 Mar 7, 2025
4 changes: 3 additions & 1 deletion sdks/java/io/iceberg/bqms/build.gradle
@@ -21,7 +21,9 @@ plugins {

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
shadowClosure: {},
shadowClosure: {
relocate "com.google.auth", getJavaRelocatedPath("bqms.com.google.auth")
},
exportJavadoc: false,
publish: false, // it's an intermediate jar for io-expansion-service
validateShadowJar: false
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -50,6 +50,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-hadoop:$parquet_version"
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/CreateReadTasksDoFn.java
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Scans the given {@link SnapshotRange}, and creates multiple {@link ReadTask}s. Each task
* represents a portion of a data file that was appended within the snapshot range.
*/
class CreateReadTasksDoFn extends DoFn<SnapshotRange, KV<ReadTaskDescriptor, ReadTask>> {
private static final Logger LOG = LoggerFactory.getLogger(CreateReadTasksDoFn.class);
private static final Counter numFileScanTasks =
Metrics.counter(CreateReadTasksDoFn.class, "numFileScanTasks");
private final IcebergCatalogConfig catalogConfig;

CreateReadTasksDoFn(IcebergCatalogConfig catalogConfig) {
this.catalogConfig = catalogConfig;
}

@ProcessElement
public void process(
@Element SnapshotRange range, OutputReceiver<KV<ReadTaskDescriptor, ReadTask>> out)
throws IOException, ExecutionException {
Table table = TableCache.get(range.getTableIdentifier(), catalogConfig.catalog());
@Nullable Long fromSnapshot = range.getFromSnapshotExclusive();
long toSnapshot = range.getToSnapshot();

LOG.info("Planning to scan snapshot range ({}, {}]", fromSnapshot, toSnapshot);
IncrementalAppendScan scan =
table
.newIncrementalAppendScan()
.toSnapshot(toSnapshot)
.option(TableProperties.SPLIT_SIZE, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT));
if (fromSnapshot != null) {
scan = scan.fromSnapshotExclusive(fromSnapshot);
}

try (CloseableIterable<CombinedScanTask> combinedScanTasks = scan.planTasks()) {
for (CombinedScanTask combinedScanTask : combinedScanTasks) {
// A single DataFile can be broken up into multiple FileScanTasks
// if it is large enough.
for (FileScanTask fileScanTask : combinedScanTask.tasks()) {
ReadTask task =
ReadTask.builder()
.setTableIdentifierString(range.getTableIdentifierString())
.setFileScanTaskJson(ScanTaskParser.toJson(fileScanTask))
.setByteSize(fileScanTask.sizeBytes())
.build();
ReadTaskDescriptor descriptor =
ReadTaskDescriptor.builder()
.setTableIdentifierString(range.getTableIdentifierString())
.build();
out.output(KV.of(descriptor, task));
numFileScanTasks.inc();
}
}
}
}
}
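For reference, the DoFn above delegates split planning to Iceberg's incremental append scan API. The following standalone sketch shows that underlying planning step outside of Beam; it is an illustration, not code from this PR, and the catalog properties, table name, and snapshot IDs are hypothetical placeholders.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;

public class IncrementalAppendScanSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Hadoop catalog and table; substitute your own warehouse and table.
    Map<String, String> props = new HashMap<>();
    props.put("type", "hadoop");
    props.put("warehouse", "/tmp/warehouse");
    Catalog catalog = CatalogUtil.buildIcebergCatalog("demo", props, new Configuration());
    Table table = catalog.loadTable(TableIdentifier.of("db", "my_table"));

    // Plan an incremental append scan over a snapshot range, mirroring what
    // CreateReadTasksDoFn does for each SnapshotRange element.
    IncrementalAppendScan scan =
        table
            .newIncrementalAppendScan()
            .fromSnapshotExclusive(1L) // hypothetical starting snapshot (exclusive)
            .toSnapshot(42L); // hypothetical ending snapshot (inclusive)

    try (CloseableIterable<CombinedScanTask> combinedTasks = scan.planTasks()) {
      for (CombinedScanTask combined : combinedTasks) {
        // A single large DataFile may be broken up into multiple FileScanTasks.
        for (FileScanTask fileTask : combined.tasks()) {
          System.out.println(fileTask.file().path() + " -> " + fileTask.sizeBytes() + " bytes");
        }
      }
    }
  }
}

Each FileScanTask planned this way is what the DoFn serializes via ScanTaskParser.toJson and emits as a ReadTask keyed by its ReadTaskDescriptor.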
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -51,26 +51,25 @@ public static Builder builder() {
}

public org.apache.iceberg.catalog.Catalog catalog() {
if (cachedCatalog != null) {
return cachedCatalog;
if (cachedCatalog == null) {
String catalogName = getCatalogName();
if (catalogName == null) {
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
}
Map<String, String> catalogProps = getCatalogProperties();
if (catalogProps == null) {
catalogProps = Maps.newHashMap();
}
Map<String, String> confProps = getConfigProperties();
if (confProps == null) {
confProps = Maps.newHashMap();
}
Configuration config = new Configuration();
for (Map.Entry<String, String> prop : confProps.entrySet()) {
config.set(prop.getKey(), prop.getValue());
}
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
}
String catalogName = getCatalogName();
if (catalogName == null) {
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
}
Map<String, String> catalogProps = getCatalogProperties();
if (catalogProps == null) {
catalogProps = Maps.newHashMap();
}
Map<String, String> confProps = getConfigProperties();
if (confProps == null) {
confProps = Maps.newHashMap();
}
Configuration config = new Configuration();
for (Map.Entry<String, String> prop : confProps.entrySet()) {
config.set(prop.getKey(), prop.getValue());
}
cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
return cachedCatalog;
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -81,10 +81,15 @@
* template to use dynamic destinations (see the `Dynamic Destinations` section below for details). </td>
* </tr>
* <tr>
* <td> {@code triggering_frequency_seconds} </td> <td> {@code int} </td> <td> Required for streaming writes. Roughly every
* <td> {@code triggering_frequency_seconds} </td>
* <td> {@code int} </td>
* <td>
* <p><b>Sink:</b> Required for streaming writes. Roughly every
* {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
* Generally, a higher value will produce fewer, larger data files.
* </td>
* <p><b>Source:</b> Enables streaming reads. Roughly every {@code triggering_frequency_seconds} duration, the source
* will scan the table for new snapshots and read new records.
* </td>
* </tr>
* <tr>
* <td> {@code catalog_name} </td> <td> {@code str} </td> <td> The name of the catalog. Defaults to {@code apache-beam-<VERSION>}. </td>
@@ -101,6 +106,20 @@
* implementation, but <a href="https://iceberg.apache.org/docs/latest/configuration/#hadoop-configuration">this list</a>
* is a good starting point.
* </tr>
* <tr>
* <td> {@code from_snapshot_exclusive} </td>
* <td> {@code long} </td>
* <td> For the source; starts reading from this snapshot ID (exclusive). If unset, it will start reading from the
* oldest snapshot (inclusive).
* </td>
* </tr>
* <tr>
* <td> {@code to_snapshot} </td>
* <td> {@code long} </td>
* <td> For the source; reads up to this snapshot ID (inclusive). If unset and the source is bounded, it will read
* up to the current snapshot (inclusive). If unset and the source is unbounded, it will continue polling for new snapshots forever.
* </td>
* </tr>
* </table>
*
* <p><b>Additional configuration options are provided in the `Pre-filtering Options` section below,
@@ -405,6 +424,12 @@ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row

abstract @Nullable TableIdentifier getTableIdentifier();

abstract @Nullable Long getFromSnapshotExclusive();

abstract @Nullable Long getToSnapshot();

abstract @Nullable Duration getTriggeringFrequency();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -413,29 +438,56 @@ abstract static class Builder {

abstract Builder setTableIdentifier(TableIdentifier identifier);

abstract Builder setFromSnapshotExclusive(@Nullable Long fromSnapshotExclusive);

abstract Builder setToSnapshot(@Nullable Long toSnapshot);

abstract Builder setTriggeringFrequency(Duration triggeringFrequency);

abstract ReadRows build();
}

public ReadRows from(TableIdentifier tableIdentifier) {
return toBuilder().setTableIdentifier(tableIdentifier).build();
}

public ReadRows fromSnapshotExclusive(@Nullable Long fromSnapshotExclusive) {
return toBuilder().setFromSnapshotExclusive(fromSnapshotExclusive).build();
}

public ReadRows toSnapshot(@Nullable Long toSnapshot) {
return toBuilder().setToSnapshot(toSnapshot).build();
}

public ReadRows withTriggeringFrequency(Duration triggeringFrequency) {
return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");

Table table = getCatalogConfig().catalog().loadTable(tableId);

return input.apply(
Read.from(
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(getCatalogConfig())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(tableId)
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
.build())));
IcebergScanConfig scanConfig =
IcebergScanConfig.builder()
.setCatalogConfig(getCatalogConfig())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(tableId)
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
.setFromSnapshotExclusive(getFromSnapshotExclusive())
.setToSnapshot(getToSnapshot())
.build();
if (getTriggeringFrequency() != null
|| scanConfig.getToSnapshot() != null
|| scanConfig.getFromSnapshotExclusive() != null) {
return input
.apply(new IncrementalScanSource(scanConfig, getTriggeringFrequency()))
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()));
}

return input.apply(Read.from(new ScanSource(scanConfig)));
}
}
}
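For reference, a rough usage sketch of the new ReadRows options. It assumes the existing IcebergIO.readRows(IcebergCatalogConfig) entry point (not shown in this diff) and a hypothetical Hadoop catalog; it is not code from this PR. Setting a triggering frequency or a snapshot bound makes expand() choose the unbounded IncrementalScanSource path instead of the bounded ScanSource.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

public class StreamingIcebergReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Hypothetical catalog properties; adjust to your environment.
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "/tmp/warehouse");

    IcebergCatalogConfig catalogConfig =
        IcebergCatalogConfig.builder()
            .setCatalogName("demo")
            .setCatalogProperties(catalogProps)
            .build();

    // A triggering frequency (or a from/to snapshot bound) switches the read
    // to the incremental, unbounded source.
    PCollection<Row> rows =
        pipeline.apply(
            IcebergIO.readRows(catalogConfig)
                .from(TableIdentifier.of("db", "my_table"))
                .fromSnapshotExclusive(1L) // hypothetical snapshot ID
                .withTriggeringFrequency(Duration.standardSeconds(60)));

    pipeline.run();
  }
}

Through the Managed API, the same options surface as the triggering_frequency_seconds, from_snapshot_exclusive, and to_snapshot configuration fields documented in the table above.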