
Commit 1bf6d27

Refactor concurrency of incremental streams (#21)
1 parent 0589677

File tree

3 files changed: +142 -128


crates/iceberg/src/arrow/incremental.rs

Lines changed: 115 additions & 115 deletions
@@ -34,7 +34,7 @@ use crate::metadata_columns::{RESERVED_FIELD_ID_UNDERSCORE_POS, row_pos_field};
 use crate::runtime::spawn;
 use crate::scan::ArrowRecordBatchStream;
 use crate::scan::incremental::{
-    AppendedFileScanTask, IncrementalFileScanTask, IncrementalFileScanTaskStream,
+    AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams,
 };
 use crate::{Error, ErrorKind, Result};

@@ -57,120 +57,6 @@ pub type CombinedIncrementalBatchRecordStream =
 /// Stream type for obtaining a separate stream of appended and deleted record batches.
 pub type UnzippedIncrementalBatchRecordStream = (ArrowRecordBatchStream, ArrowRecordBatchStream);

-impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
-    for IncrementalFileScanTaskStream
-{
-    /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns a
-    /// stream of Arrow `RecordBatch`es containing the data from the files.
-    fn stream(self, reader: ArrowReader) -> Result<CombinedIncrementalBatchRecordStream> {
-        let (appends, deletes) =
-            StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(self, reader)?;
-
-        let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch)));
-        let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch)));
-
-        Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream)
-    }
-}
-
-impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
-    for IncrementalFileScanTaskStream
-{
-    /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns two
-    /// separate streams of Arrow `RecordBatch`es containing appended data and deleted records.
-    fn stream(self, reader: ArrowReader) -> Result<UnzippedIncrementalBatchRecordStream> {
-        let (appends_tx, appends_rx) =
-            channel::<Result<RecordBatch>>(reader.concurrency_limit_data_files);
-        let (deletes_tx, deletes_rx) =
-            channel::<Result<RecordBatch>>(reader.concurrency_limit_data_files);
-
-        let batch_size = reader.batch_size;
-
-        spawn(async move {
-            let _ = self
-                .try_for_each_concurrent(reader.concurrency_limit_data_files, |task| {
-                    let file_io = reader.file_io.clone();
-                    match task {
-                        IncrementalFileScanTask::Append(append_task) => {
-                            let appends_tx = appends_tx.clone();
-                            Box::pin(async move {
-                                spawn(async move {
-                                    let record_batch_stream = process_incremental_append_task(
-                                        append_task,
-                                        batch_size,
-                                        file_io,
-                                    )
-                                    .await;
-
-                                    process_record_batch_stream(
-                                        record_batch_stream,
-                                        appends_tx,
-                                        "failed to read appended record batch",
-                                    )
-                                    .await;
-                                });
-                                Ok(())
-                            })
-                                as Pin<Box<dyn futures::Future<Output = Result<()>> + Send>>
-                        }
-                        IncrementalFileScanTask::Delete(deleted_file_task) => {
-                            let deletes_tx = deletes_tx.clone();
-                            Box::pin(async move {
-                                spawn(async move {
-                                    let file_path = deleted_file_task.data_file_path().to_string();
-                                    let total_records =
-                                        deleted_file_task.base.record_count.unwrap_or(0);
-
-                                    let record_batch_stream = process_incremental_deleted_file_task(
-                                        file_path,
-                                        total_records,
-                                        batch_size,
-                                    );
-
-                                    process_record_batch_stream(
-                                        record_batch_stream,
-                                        deletes_tx,
-                                        "failed to read deleted file record batch",
-                                    )
-                                    .await;
-                                });
-                                Ok(())
-                            })
-                                as Pin<Box<dyn futures::Future<Output = Result<()>> + Send>>
-                        }
-                        IncrementalFileScanTask::PositionalDeletes(file_path, delete_vector) => {
-                            let deletes_tx = deletes_tx.clone();
-                            Box::pin(async move {
-                                spawn(async move {
-                                    let record_batch_stream = process_incremental_delete_task(
-                                        file_path,
-                                        delete_vector,
-                                        batch_size,
-                                    );
-
-                                    process_record_batch_stream(
-                                        record_batch_stream,
-                                        deletes_tx,
-                                        "failed to read deleted record batch",
-                                    )
-                                    .await;
-                                });
-                                Ok(())
-                            })
-                                as Pin<Box<dyn futures::Future<Output = Result<()>> + Send>>
-                        }
-                    }
-                })
-                .await;
-        });
-
-        Ok((
-            Box::pin(appends_rx) as ArrowRecordBatchStream,
-            Box::pin(deletes_rx) as ArrowRecordBatchStream,
-        ))
-    }
-}
-
 async fn process_incremental_append_task(
     task: AppendedFileScanTask,
     batch_size: Option<usize>,

@@ -330,3 +216,117 @@ fn process_incremental_deleted_file_task(

     Ok(Box::pin(stream) as ArrowRecordBatchStream)
 }
+
+impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
+    for IncrementalFileScanTaskStreams
+{
+    /// Takes separate streams of appended and deleted file scan tasks and reads all the files.
+    /// Returns a combined stream of Arrow `RecordBatch`es containing the data from the files.
+    fn stream(self, reader: ArrowReader) -> Result<CombinedIncrementalBatchRecordStream> {
+        let (appends, deletes) =
+            StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(self, reader)?;
+
+        let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch)));
+        let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch)));
+
+        Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream)
+    }
+}
+
+impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
+    for IncrementalFileScanTaskStreams
+{
+    /// Takes separate streams of appended and deleted file scan tasks and reads all the files.
+    /// Returns two separate streams of Arrow `RecordBatch`es containing appended data and deleted records.
+    fn stream(self, reader: ArrowReader) -> Result<UnzippedIncrementalBatchRecordStream> {
+        let (appends_tx, appends_rx) =
+            channel::<Result<RecordBatch>>(reader.concurrency_limit_data_files);
+        let (deletes_tx, deletes_rx) =
+            channel::<Result<RecordBatch>>(reader.concurrency_limit_data_files);
+
+        let batch_size = reader.batch_size;
+
+        let (append_stream, delete_stream) = self;
+
+        // Process append tasks
+        let file_io = reader.file_io.clone();
+        spawn(async move {
+            let _ = append_stream
+                .try_for_each_concurrent(reader.concurrency_limit_data_files, |append_task| {
+                    let file_io = file_io.clone();
+                    let appends_tx = appends_tx.clone();
+                    async move {
+                        spawn(async move {
+                            let record_batch_stream =
+                                process_incremental_append_task(append_task, batch_size, file_io)
+                                    .await;
+
+                            process_record_batch_stream(
+                                record_batch_stream,
+                                appends_tx,
+                                "failed to read appended record batch",
+                            )
+                            .await;
+                        });
+                        Ok(())
+                    }
+                })
+                .await;
+        });
+
+        // Process delete tasks
+        spawn(async move {
+            let _ = delete_stream
+                .try_for_each_concurrent(reader.concurrency_limit_data_files, |delete_task| {
+                    let deletes_tx = deletes_tx.clone();
+                    async move {
+                        match delete_task {
+                            DeleteScanTask::DeletedFile(deleted_file_task) => {
+                                spawn(async move {
+                                    let file_path = deleted_file_task.data_file_path().to_string();
+                                    let total_records =
+                                        deleted_file_task.base.record_count.unwrap_or(0);
+
+                                    let record_batch_stream = process_incremental_deleted_file_task(
+                                        file_path,
+                                        total_records,
+                                        batch_size,
+                                    );
+
+                                    process_record_batch_stream(
+                                        record_batch_stream,
+                                        deletes_tx,
+                                        "failed to read deleted file record batch",
+                                    )
+                                    .await;
+                                });
+                            }
+                            DeleteScanTask::PositionalDeletes(file_path, delete_vector) => {
+                                spawn(async move {
+                                    let record_batch_stream = process_incremental_delete_task(
+                                        file_path,
+                                        delete_vector,
+                                        batch_size,
+                                    );
+
+                                    process_record_batch_stream(
+                                        record_batch_stream,
+                                        deletes_tx,
+                                        "failed to read deleted record batch",
+                                    )
+                                    .await;
+                                });
+                            }
+                        }
+                        Ok(())
+                    }
+                })
+                .await;
+        });
+
+        Ok((
+            Box::pin(appends_rx) as ArrowRecordBatchStream,
+            Box::pin(deletes_rx) as ArrowRecordBatchStream,
+        ))
+    }
+}
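The heart of the change is visible above: instead of one stream of `IncrementalFileScanTask` variants matched inside a single `try_for_each_concurrent` loop, the appended and deleted halves of the tuple are fanned out by two independent loops, each forwarding results into a bounded `mpsc` channel whose receiver becomes one of the output streams. Below is a minimal, self-contained sketch of that fan-out pattern, assuming `futures` and `tokio` as dependencies; unlike the real code, which spawns a task per file via `crate::runtime::spawn` and forwards entire record-batch streams, this toy version just sends one value per task.

```rust
use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    // Stand-in for reader.concurrency_limit_data_files: at most `limit`
    // tasks run at once, and the channel buffers `limit` results.
    let limit = 4;
    let tasks = futures::stream::iter((0..8).map(Ok::<i32, ()>)).boxed();
    let (tx, rx) = channel::<i32>(limit);

    tokio::spawn(async move {
        let _ = tasks
            .try_for_each_concurrent(limit, |n| {
                let mut tx = tx.clone();
                async move {
                    // Stand-in for process_incremental_append_task plus
                    // process_record_batch_stream.
                    let _ = tx.send(n * 10).await;
                    Ok(())
                }
            })
            .await;
        // The original `tx` and all clones drop here, closing the channel
        // and terminating the receiver-backed output stream.
    });

    let out: Vec<i32> = rx.collect().await;
    println!("{out:?}");
}
```

The bounded capacity gives backpressure: if the consumer of the receiver stream falls behind, senders suspend at `send` instead of buffering unboundedly, which is why the diff sizes both channels with `concurrency_limit_data_files`.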

crates/iceberg/src/scan/incremental/mod.rs

Lines changed: 12 additions & 11 deletions
@@ -426,7 +426,8 @@ impl IncrementalTableScan {
     }

     /// Plans the files to be read in this incremental table scan.
-    pub async fn plan_files(&self) -> Result<IncrementalFileScanTaskStream> {
+    /// Returns separate streams for appended and deleted records.
+    pub async fn plan_files(&self) -> Result<IncrementalFileScanTaskStreams> {
         let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
         let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;

@@ -584,21 +585,22 @@ impl IncrementalTableScan {
             }
         }

-        // Build final task list with net changes
+        // Build final task lists with net changes
         // We filter out tasks for files that appear in both sets (cancelled out)
-        let mut final_tasks: Vec<IncrementalFileScanTask> = Vec::new();
+        let mut final_append_tasks: Vec<AppendedFileScanTask> = Vec::new();
+        let mut final_delete_tasks: Vec<DeleteScanTask> = Vec::new();

         // Add net append tasks (only files not in deleted_files)
         for append_task in append_tasks {
             if !deleted_files.contains(append_task.data_file_path()) {
-                final_tasks.push(IncrementalFileScanTask::Append(append_task));
+                final_append_tasks.push(append_task);
             }
         }

         // Add net delete tasks (only files not in appended_files)
         for delete_task in delete_tasks {
             if !appended_files.contains(delete_task.data_file_path()) {
-                final_tasks.push(IncrementalFileScanTask::Delete(delete_task));
+                final_delete_tasks.push(DeleteScanTask::DeletedFile(delete_task));
             }
         }

@@ -641,14 +643,13 @@ impl IncrementalTableScan {
                 .with_source(e)
             })?;

-            let positional_delete_task =
-                IncrementalFileScanTask::PositionalDeletes(path, delete_vector_inner);
-            final_tasks.push(positional_delete_task);
+            final_delete_tasks.push(DeleteScanTask::PositionalDeletes(path, delete_vector_inner));
         }

-        // We actually would not need a stream here, but we can keep it compatible with
-        // other scan types.
-        Ok(futures::stream::iter(final_tasks).map(Ok).boxed())
+        let append_stream = futures::stream::iter(final_append_tasks).map(Ok).boxed();
+        let delete_stream = futures::stream::iter(final_delete_tasks).map(Ok).boxed();
+
+        Ok((append_stream, delete_stream))
     }

     /// Returns an [`CombinedIncrementalBatchRecordStream`] for this incremental table scan.
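The net-change logic above is set subtraction on data file paths: a file that is both appended and deleted within the scanned snapshot range cancels out and produces no task in either stream. A toy illustration with plain string paths standing in for the real scan tasks (all names here are hypothetical):

```rust
use std::collections::HashSet;

fn main() {
    // b.parquet was appended and then deleted within the scan range,
    // so it should yield neither an append nor a delete task.
    let appended = vec!["a.parquet", "b.parquet"];
    let deleted = vec!["b.parquet", "c.parquet"];

    let appended_files: HashSet<&str> = appended.iter().copied().collect();
    let deleted_files: HashSet<&str> = deleted.iter().copied().collect();

    // Net appends: appended but not also deleted.
    let net_appends: Vec<&str> = appended
        .iter()
        .copied()
        .filter(|p| !deleted_files.contains(p))
        .collect();

    // Net deletes: deleted but not also appended.
    let net_deletes: Vec<&str> = deleted
        .iter()
        .copied()
        .filter(|p| !appended_files.contains(p))
        .collect();

    assert_eq!(net_appends, vec!["a.parquet"]);
    assert_eq!(net_deletes, vec!["c.parquet"]);
}
```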

crates/iceberg/src/scan/incremental/task.rs

Lines changed: 15 additions & 2 deletions
@@ -111,8 +111,21 @@ impl DeletedFileScanTask {
     }
 }

-/// The stream of incremental file scan tasks.
-pub type IncrementalFileScanTaskStream = BoxStream<'static, Result<IncrementalFileScanTask>>;
+/// The streams of appended and deleted file scan tasks.
+pub type IncrementalFileScanTaskStreams = (
+    BoxStream<'static, Result<AppendedFileScanTask>>,
+    BoxStream<'static, Result<DeleteScanTask>>,
+);
+
+/// A delete scan task, which can be a deleted data file or positional deletes.
+#[derive(Debug, Clone)]
+pub enum DeleteScanTask {
+    /// A deleted data file.
+    DeletedFile(DeletedFileScanTask),
+    /// Positional deletes (deleted records of a data file). First argument is the file path,
+    /// second the delete vector.
+    PositionalDeletes(String, DeleteVector),
+}

 /// An incremental file scan task, which can be an appended data file, deleted data file,
 /// or positional deletes.
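For orientation, here is a hedged sketch of what consuming the new tuple type could look like from a caller's perspective. The task types are simplified stand-ins (the real `AppendedFileScanTask`, `DeletedFileScanTask`, and `DeleteVector` carry far more state, and in the crate `plan_files` constructs the streams for you):

```rust
use futures::stream::BoxStream;
use futures::StreamExt;

type Result<T> = std::result::Result<T, String>;

// Simplified stand-ins for the crate's task types.
struct AppendedFileScanTask { path: String }
struct DeletedFileScanTask { path: String }
struct DeleteVector;

enum DeleteScanTask {
    DeletedFile(DeletedFileScanTask),
    PositionalDeletes(String, DeleteVector),
}

// Mirrors the new type alias: one stream per kind of change.
type IncrementalFileScanTaskStreams = (
    BoxStream<'static, Result<AppendedFileScanTask>>,
    BoxStream<'static, Result<DeleteScanTask>>,
);

#[tokio::main]
async fn main() {
    let (mut appends, mut deletes): IncrementalFileScanTaskStreams = (
        futures::stream::iter(vec![Ok(AppendedFileScanTask { path: "a.parquet".into() })])
            .boxed(),
        futures::stream::iter(vec![
            Ok(DeleteScanTask::DeletedFile(DeletedFileScanTask { path: "b.parquet".into() })),
            Ok(DeleteScanTask::PositionalDeletes("c.parquet".into(), DeleteVector)),
        ])
        .boxed(),
    );

    while let Some(task) = appends.next().await {
        match task {
            Ok(t) => println!("append: {}", t.path),
            Err(e) => eprintln!("append error: {e}"),
        }
    }
    while let Some(task) = deletes.next().await {
        match task {
            Ok(DeleteScanTask::DeletedFile(t)) => println!("deleted file: {}", t.path),
            Ok(DeleteScanTask::PositionalDeletes(p, _)) => println!("positional deletes in {p}"),
            Err(e) => eprintln!("delete error: {e}"),
        }
    }
}
```

Draining the halves one after the other keeps the sketch short; the `CombinedIncrementalBatchRecordStream` impl in the first file instead merges the two with `futures::stream::select`, so append and delete batches interleave as they become ready.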
