[Managed Iceberg] unbounded source #33504

Open

ahmedabu98 wants to merge 36 commits into master
Conversation

@ahmedabu98 (Contributor) commented Jan 6, 2025

Unbounded (streaming) source for Managed Iceberg.

See design doc for high level overview: https://s.apache.org/beam-iceberg-incremental-source

Fixes #33092

ahmedabu98 marked this pull request as draft January 6, 2025 18:16
ahmedabu98 marked this pull request as ready for review January 30, 2025 21:09

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@ahmedabu98 (Contributor, Author) commented:

R: @kennknowles
R: @regadas

Can y'all take a look? I still have to write some tests, but it's at a good spot for a first round of reviews. I ran a bunch of pipelines (w/Legacy DataflowRunner) at different scales and the throughput/scalability looks good.


github-actions bot commented Feb 3, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@kennknowles (Member) left a comment

Overall, I think all the pieces are in the right place. Just a question about why the SDF is structured the way it is, and a couple of code-level comments.

This seems like something you want to test a lot of different ways before it gets into a release. Maybe get another set of eyes like @chamikaramj or @Abacn too. But I'm approving and leaving to your judgment.

@kennknowles (Member) left a comment

Wait, actually I forgot that I want to have the discussion about the high-level toggle between the incremental scan source and the bounded source.

github-actions bot added the build label Feb 13, 2025
…rk progress; convert GiB output iterable to list because of RunnerV2 bug
github-actions bot added the model label Mar 3, 2025
@chamikaramj (Contributor) left a comment

Thanks!

* <tr>
* <td> {@code to_timestamp} </td>
* <td> {@code long} </td>
* <td> Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds).
Contributor commented:

Is this also optional (similar to to_snapshot)?

ahmedabu98 (Author) replied:

Yes, all the new configuration parameters are optional
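
For illustration, a minimal sketch of what an append-only CDC read could look like through the Managed API with the optional end-point parameters simply left out. The table and catalog values below are placeholders, and the PCollectionRowTuple wiring is assumed from the usual SchemaTransform pattern, not taken from this PR:

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class IcebergCdcReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Only the table/catalog settings are specified; to_snapshot, to_timestamp, and the other
    // new parameters are optional and omitted here.
    Map<String, Object> config =
        Map.of(
            "table", "db.my_table",            // placeholder
            "catalog_name", "my_catalog",      // placeholder
            "catalog_properties",
                Map.of("type", "hadoop", "warehouse", "gs://my-bucket/warehouse"));

    PCollection<Row> cdcRows =
        PCollectionRowTuple.empty(pipeline)
            .apply(Managed.read(Managed.ICEBERG_CDC).withConfig(config))
            .getSinglePCollection();

    pipeline.run();
  }
}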

* <li>{@code earliest}: starts reading from the earliest snapshot</li>
* <li>{@code latest}: starts reading from the latest snapshot</li>
* </ul>
* <p>Defaults to {@code earliest} for batch, and {@code latest} for streaming.
Contributor commented:

By "streaming" do you mean the PipelineOption [1] the Iceberg config (defined below) or both ?

[1]

ahmedabu98 (Author) replied:

Streaming in the context of IcebergIO, so the config. I'll move the streaming row up so people see it first and can reference it.

* <td>
* The column used to derive event time to track progress. Must be of type:
* <ul>
* <li>{@code timestamp}</li>
Contributor commented:

Could you elaborate on what you mean here by the timestamp and timestamptz types?

ahmedabu98 (Author) replied:

These are Iceberg types: https://iceberg.apache.org/spec/#primitive-types. Will include this link.
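
For context, a small sketch of an Iceberg schema declaring both of those column types, using the Iceberg Java API (column names here are made up):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// "timestamp" (no time zone) vs. "timestamptz" (with time zone), per the Iceberg spec's
// primitive types; either could serve as the watermark column.
Schema schema =
    new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "event_time", Types.TimestampType.withoutZone()), // timestamp
        Types.NestedField.optional(3, "ingested_at", Types.TimestampType.withZone()));  // timestamptz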

* </td>
* </tr>
* <tr>
* <td> {@code streaming} </td>
Contributor commented:

What if both this and to_snapshot or to_timestamp are set?

ahmedabu98 (Author) replied:

Mentioned below in the "Choosing an End Point (CDC only)" section.

It will still be a streaming pipeline, which will stop by itself after processing the end snapshot. Similar to how PeriodicImpulse behaves.
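
As a quick illustration of that combination (key names as discussed in this thread; the table, catalog, and snapshot ID values are placeholders), the config would look something like:

// Streaming read that stops by itself once the snapshot given by to_snapshot has been processed.
Map<String, Object> config =
    Map.of(
        "table", "db.my_table",
        "catalog_name", "my_catalog",
        "streaming", true,
        "to_snapshot", 1234567890L); // placeholder snapshot ID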

@ahmedabu98 (Contributor, Author) commented:

@chamikaramj this is ready for another review

.discardingFiredPanes())
.apply(
GroupIntoBatches.<ReadTaskDescriptor, ReadTask>ofByteSize(
MAX_FILES_BATCH_BYTE_SIZE, ReadTask::getByteSize)
Contributor commented:

We don't really want these batches; we just want the read tasks distributed to workers without causing worker OOMs. Otherwise we're just adding latency on top of the poll latency and not really benefiting from the batch.

Ideally we could change Redistribute to autoshard, but since it is tied to GroupIntoBatches currently, what about just doing GroupIntoBatches.ofSize(1).withShardedKey()?
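
For reference, assuming readTasks is the PCollection<KV<ReadTaskDescriptor, ReadTask>> from the hunk above, the suggested alternative would look roughly like this (a sketch, not a drop-in change from the PR):

// Emit each read task as its own "batch", but let the runner autoshard the keys so the
// tasks still spread across workers instead of piling up behind one key.
readTasks.apply(
    "DistributeReadTasks",
    GroupIntoBatches.<ReadTaskDescriptor, ReadTask>ofSize(1).withShardedKey());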

ahmedabu98 (Author) replied:

I initially figured that GroupIntoBatches.ofSize(1).withShardedKey() would give us the same problem of having too many concurrent shards, but when I ran it I found it actually produces only 1 shard, processing everything sequentially. Same thing when I tried .ofByteSize(1)

.setCoder(KvCoder.of(ReadTaskDescriptor.getCoder(), ReadTask.getCoder()))
.apply(
Window.<KV<ReadTaskDescriptor, ReadTask>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
Contributor commented:

Can this trigger be removed? It seems like the GiB does the triggering, so I'm not sure this has an effect (or, if it does, whether it is intended).

String tableIdentifier = element.getKey().getTableIdentifierString();
ReadTask readTask = element.getValue();
Table table = TableCache.get(tableIdentifier, scanConfig.getCatalogConfig().catalog());
Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());
Contributor commented:

Seems like some additional things here could be cached.

Or better yet, can the schemas be built at pipeline construction time? Having well-defined schemas seems like it will help with pipeline update compatibility. Each one now is going to get a unique UUID.
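
A hedged sketch of that second idea (the class name ReadTaskFn and the empty process body are illustrative, not the PR's actual code): derive the Beam schema once when the transform is expanded and hand it to the DoFn, instead of rebuilding it per element:

// Schema is computed once at pipeline construction time (e.g. in expand()) and passed in,
// so every worker sees the same well-defined schema; this should also help with
// pipeline update compatibility.
class ReadTaskFn extends DoFn<KV<ReadTaskDescriptor, ReadTask>, Row> {
  private final Schema outputCdcSchema;

  ReadTaskFn(Schema outputCdcSchema) {
    this.outputCdcSchema = outputCdcSchema;
  }

  @ProcessElement
  public void process(
      @Element KV<ReadTaskDescriptor, ReadTask> element, OutputReceiver<Row> out) {
    // ... read the task's files and build Rows against outputCdcSchema,
    // rather than re-deriving the schema from the table here ...
  }
}

// At construction time (reusing the PR's own helpers):
// Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());
// new ReadTaskFn(ReadUtils.outputCdcSchema(dataSchema));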

scanTasksCompleted.inc();
}

// infinite skew in case we encounter some files that don't support watermark column statistics,
Contributor commented:

it seems like this will either:

  • hold the watermark
  • output late stuff that will be dropped

Contributor commented:

I don't think this would be needed (at least in this transform) if the read tasks had the right event timestamp coming in, since we would just assign that timestamp to all records within the snapshot.
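
In other words, something along these lines (a sketch, assuming the element timestamp was already set to the snapshot's commit timestamp upstream; readRecords is a hypothetical helper):

@ProcessElement
public void process(
    @Element KV<ReadTaskDescriptor, ReadTask> element,
    @Timestamp Instant snapshotTimestamp,  // event timestamp assigned when the task was created
    OutputReceiver<Row> out) {
  for (Row record : readRecords(element.getValue())) {
    // Every record in the snapshot gets the snapshot's timestamp, so no timestamp skew
    // allowance is needed in this transform.
    out.outputWithTimestamp(record, snapshotTimestamp);
  }
}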

Schema dataSchema = IcebergUtils.icebergSchemaToBeamSchema(table.schema());
Schema outputCdcSchema = ReadUtils.outputCdcSchema(dataSchema);

Instant outputTimestamp = ReadUtils.getReadTaskTimestamp(readTask, scanConfig);
Contributor commented:

It seems like this should be done when creating the read task; that way it will hold the watermark appropriately while the task is being shuffled, etc.


return isComplete
? PollResult.complete(timestampedSnapshots) // stop at specified snapshot
: PollResult.incomplete(timestampedSnapshots); // continue forever
Contributor commented:

I think we want to generate a correct watermark here using
PollResult.withWatermark
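
Something like the following, perhaps (a sketch, assuming timestampedSnapshots is ordered oldest-to-newest and that PollResult.withWatermark is the mechanism settled on):

if (timestampedSnapshots.isEmpty()) {
  // Nothing new in this poll; leave the watermark where it is.
  return PollResult.incomplete(timestampedSnapshots);
}
// Advance the watermark to the newest snapshot seen in this poll.
Instant watermark =
    timestampedSnapshots.get(timestampedSnapshots.size() - 1).getTimestamp();
return isComplete
    ? PollResult.complete(timestampedSnapshots) // stop at specified snapshot
    : PollResult.incomplete(timestampedSnapshots).withWatermark(watermark);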

@chamikaramj (Contributor) left a comment

Thanks. LGTM.

* <td> {@code operation} </td>
* <td> {@code string} </td>
* <td>
* The snapshot <a href="https://iceberg.apache.org/javadoc/0.11.0/org/apache/iceberg/DataOperations">operation</a> associated with this record. For now, only "append" is supported.
Contributor commented:

Maybe change to "APPEND" to be consistent with Iceberg.

*
* <p><b>Note</b>: This reads <b>append-only</b> snapshots. Full CDC is not supported yet.
*
* <p>The CDC <b>streaming</b> source (enabled with {@code streaming=true}) continuously polls the
Contributor commented:

We should validate (and fail) somewhere if the "streaming" flag is set here and the streaming PipelineOption [1] is not set.

[1]
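
As a rough sketch of such a check (where exactly it should live is an open question; input stands for the transform's input and streaming for the parsed Iceberg config flag, both assumed here):

// Fail fast at expansion time if the Iceberg streaming read is requested
// but the pipeline itself is not a streaming pipeline.
boolean pipelineIsStreaming =
    input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming();
if (Boolean.TRUE.equals(streaming) && !pipelineIsStreaming) {
  throw new IllegalArgumentException(
      "Iceberg CDC read was configured with streaming=true, but the pipeline is not "
          + "running in streaming mode. Please also set the streaming PipelineOption.");
}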

@@ -108,6 +110,7 @@ public class Managed {
*
* <ul>
* <li>{@link Managed#ICEBERG} : Read from Apache Iceberg tables
* <li>{@link Managed#ICEBERG_CDC} : CDC Read from Apache Iceberg tables
Contributor commented:

We should link to locations where users can find additional Javadocs related to each of these options (also for write).


Successfully merging this pull request may close these issues.

[Feature Request]: {Managed IO Iceberg} - Allow users to run streaming reads