feat(datafusion): Support insert_into in IcebergTableProvider #1511

Draft · wants to merge 36 commits into base: main

Commits (36)
a5593b4
Support Datafusion insert_into
CTTY Jun 26, 2025
558b402
cleanup
CTTY Jul 15, 2025
847a2bb
minor
CTTY Jul 15, 2025
b067656
minor
CTTY Jul 15, 2025
f52a698
clippy ftw
CTTY Jul 15, 2025
d367a7c
minor
CTTY Jul 16, 2025
99af430
minor
CTTY Jul 16, 2025
2f9efa8
i luv cleaning up
CTTY Jul 16, 2025
9d7c1c3
fmt not working?
CTTY Jul 16, 2025
41a75bd
Merge branch 'main' into ctty/df-insert
CTTY Jul 16, 2025
e25f888
do not expose serde
CTTY Jul 16, 2025
b554701
cut it down
CTTY Jul 16, 2025
77b349b
Use stricter wrapper data file wrapper
CTTY Jul 16, 2025
88afe82
fix partitioning, and fmt ofc
CTTY Jul 16, 2025
295e9b6
minor
CTTY Jul 17, 2025
92588f5
partitioned shall not pass
CTTY Jul 17, 2025
7db9432
implement children and with_new_children for write node, fix fmt
CTTY Jul 17, 2025
6bd624c
Merge branch 'main' into ctty/df-insert
CTTY Jul 17, 2025
8c78046
get row counts from data files directly
CTTY Jul 17, 2025
724ec7d
Update crates/integrations/datafusion/src/physical_plan/write.rs
CTTY Jul 21, 2025
2f56169
Update crates/integrations/datafusion/src/physical_plan/commit.rs
CTTY Jul 21, 2025
273a164
Merge branch 'main' into ctty/df-insert
CTTY Jul 21, 2025
53b8b82
fix fmt, input boundedness
CTTY Jul 21, 2025
d2168f2
make data_files constant
CTTY Jul 21, 2025
59a3428
use format version when serde datafiles
CTTY Jul 21, 2025
3b4dc9d
use try_new instead
CTTY Jul 21, 2025
2b1c3df
minor
CTTY Jul 21, 2025
db20df1
coalesce partitions
CTTY Jul 21, 2025
e56ab4e
minor
CTTY Jul 21, 2025
04a44b3
fmt
CTTY Jul 21, 2025
c5b1c38
rolling
CTTY Jul 22, 2025
0f9bce0
rolling in the deep
CTTY Jul 22, 2025
d8f05cf
rolls the unit tests
CTTY Jul 22, 2025
1ea4a0f
could have it all for tests
CTTY Jul 22, 2025
5001e07
Merge branch 'main' into ctty/df-insert
CTTY Jul 22, 2025
078f458
Merge branch 'main' into ctty/df-insert
CTTY Jul 23, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions crates/iceberg/src/arrow/nan_val_cnt_visitor.rs
@@ -159,6 +159,9 @@ impl NanValueCountVisitor {
let arrow_arr_partner_accessor = ArrowArrayAccessor {};

let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
// todo remove these log lines
println!("----StructArray from record stream: {:?}", struct_arr);
println!("----Schema.as_struct from table: {:?}", schema.as_struct());
Comment on lines +163 to +164

Contributor: We should use `log` here.

Contributor Author: This is for testing only; I'm planning to remove these log lines.
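A minimal sketch of the reviewer's suggestion, assuming the `log` facade is (or becomes) available as a dependency here; the messages then only show up when the embedding application installs a logger with the debug level enabled:

```rust
// Hypothetical replacement for the temporary println! calls above; same
// format arguments, but routed through the `log` facade at debug level.
log::debug!("StructArray from record stream: {:?}", struct_arr);
log::debug!("Schema.as_struct from table: {:?}", schema.as_struct());
```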

visit_struct_with_partner(
schema.as_struct(),
&struct_arr,
14 changes: 12 additions & 2 deletions crates/iceberg/src/arrow/value.rs
@@ -440,10 +440,12 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
Ok(schema_partner)
}

// TODO: generate field_pos in DataFusion instead of passing it in here
Contributor Author: I found it tricky to handle this case: the input from DataFusion won't have field IDs, and we will need to assign them manually. Maybe there is a way to do name mapping here?

Contributor: Could you help me understand why we need to change this?

Contributor Author: This is a temporary hack for an issue that I don't know exactly how to fix yet: the RecordBatch from DataFusion won't have PARQUET_FIELD_ID_META_KEY in its schema's metadata, which causes the schema visiting to fail here.

I'm thinking maybe we can bind the schema in DataFusion via name mapping, but I haven't had the chance to explore that further.

Contributor: Why do we need to convert the RecordBatch's schema to an Iceberg schema?

Contributor: The method you mentioned is typically used to convert a Parquet file's schema to an Iceberg schema.

Contributor Author: This method is used when the ParquetWriter writes a RecordBatch. When it counts NaN values, it needs to walk through both the RecordBatch's schema and the Iceberg schema in a partner fashion:

.compute(self.schema.clone(), batch_c)?;

Basically the call stack is NanValueCountVisitor::compute -> visit_struct_with_partner -> ArrowArrayAccessor::field_partner -> get_field_id

Contributor: Thanks for the explanation, that makes sense to me. We need a separate issue to solve this.
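A hedged sketch of the name-mapping idea discussed above: before handing a batch to the writer, re-attach PARQUET_FIELD_ID_META_KEY metadata to the Arrow schema by matching top-level field names against the Iceberg schema, so the existing id-based lookup can succeed. The helper name and the `field_by_name` accessor are illustrative assumptions, and nested fields would need a recursive variant:

```rust
use arrow_schema::{Field, Schema as ArrowSchema};
use iceberg::spec::Schema as IcebergSchema;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

/// Hypothetical helper: tag each top-level Arrow field with the id of the
/// Iceberg field of the same name, so id-based schema visiting works even
/// when DataFusion supplies a schema without field-id metadata.
fn attach_field_ids_by_name(arrow: &ArrowSchema, table_schema: &IcebergSchema) -> ArrowSchema {
    let fields: Vec<Field> = arrow
        .fields()
        .iter()
        .map(|f| {
            let mut metadata = f.metadata().clone();
            if let Some(iceberg_field) = table_schema.field_by_name(f.name()) {
                // The same metadata key the Arrow <-> Iceberg schema code uses
                // to carry field ids.
                metadata.insert(
                    PARQUET_FIELD_ID_META_KEY.to_string(),
                    iceberg_field.id.to_string(),
                );
            }
            f.as_ref().clone().with_metadata(metadata)
        })
        .collect();
    ArrowSchema::new(fields)
}
```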

fn field_partner<'a>(
&self,
struct_partner: &'a ArrayRef,
field: &NestedField,
field_pos: Option<usize>,
) -> Result<&'a ArrayRef> {
let struct_array = struct_partner
.as_any()
@@ -455,6 +457,14 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
)
})?;

// todo remove unneeded log lines
println!(
"!!!Accessor struct array from struct partner: {:?}",
struct_array
);

println!("!!!field: {:?}", field);

let field_pos = struct_array
.fields()
.iter()
@@ -463,12 +473,12 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
.map(|id| id == field.id)
.unwrap_or(false)
})
- .ok_or_else(|| {
+ .unwrap_or(field_pos.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Field id {} not found in struct array", field.id),
)
- })?;
+ })?);

Ok(struct_array.column(field_pos))
}
14 changes: 9 additions & 5 deletions crates/iceberg/src/spec/manifest/_serde.rs
@@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;

use super::{Datum, ManifestEntry, Schema, Struct};
- use crate::spec::{Literal, RawLiteral, StructType, Type};
+ use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
use crate::{Error, ErrorKind};

#[derive(Serialize, Deserialize)]
@@ -40,7 +40,7 @@ impl ManifestEntryV2 {
snapshot_id: value.snapshot_id,
sequence_number: value.sequence_number,
file_sequence_number: value.file_sequence_number,
- data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
+ data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
})
}

@@ -74,7 +74,7 @@ impl ManifestEntryV1 {
Ok(Self {
status: value.status as i32,
snapshot_id: value.snapshot_id.unwrap_or_default(),
- data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
+ data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
})
}

@@ -129,9 +129,13 @@ impl DataFileSerde {
pub fn try_from(
value: super::DataFile,
partition_type: &StructType,
- is_version_1: bool,
+ format_version: FormatVersion,
) -> Result<Self, Error> {
- let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
+ let block_size_in_bytes = if format_version == FormatVersion::V1 {
+ Some(0)
+ } else {
+ None
+ };
Ok(Self {
content: value.content as i32,
file_path: value.file_path,
8 changes: 6 additions & 2 deletions crates/iceberg/src/spec/manifest/data_file.rs
@@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
let mut writer = AvroWriter::new(&avro_schema, writer);

for data_file in data_files {
- let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
- .resolve(&avro_schema)?;
+ let value = to_value(DataFileSerde::try_from(
+ data_file,
+ partition_type,
+ FormatVersion::V1,
+ )?)?
+ .resolve(&avro_schema)?;
writer.append(value)?;
}

151 changes: 150 additions & 1 deletion crates/iceberg/src/spec/manifest/mod.rs
@@ -33,7 +33,7 @@ use super::{
Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
UNASSIGNED_SEQUENCE_NUMBER,
};
- use crate::error::Result;
+ use crate::error::{Error, ErrorKind, Result};

/// A manifest contains metadata and a list of entries.
#[derive(Debug, PartialEq, Eq, Clone)]
@@ -119,12 +119,45 @@ impl Manifest {
}
}

/// Serialize a DataFile to a JSON string.
pub fn serialize_data_file_to_json(
data_file: DataFile,
partition_type: &super::StructType,
format_version: FormatVersion,
) -> Result<String> {
let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
serde_json::to_string(&serde).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
format!("Failed to serialize DataFile to JSON: {}", e),
)
})
}

/// Deserialize a DataFile from a JSON string.
pub fn deserialize_data_file_from_json(
json: &str,
partition_spec_id: i32,
partition_type: &super::StructType,
schema: &Schema,
) -> Result<DataFile> {
let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
format!("Failed to deserialize JSON to DataFile: {}", e),
)
})?;

serde.try_into(partition_spec_id, partition_type, schema)
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;

use arrow_array::StringArray;
use tempfile::TempDir;

use super::*;
@@ -1056,4 +1089,120 @@ mod tests {
assert!(!partitions[2].clone().contains_null);
assert_eq!(partitions[2].clone().contains_nan, Some(false));
}

#[test]
fn test_data_file_serialization() {
// Create a simple schema
let schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_fields(vec![
crate::spec::NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long))
.into(),
crate::spec::NestedField::required(
2,
"name",
Type::Primitive(PrimitiveType::String),
)
.into(),
])
.build()
.unwrap();

// Create a partition spec
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_partition_field("id", "id_partition", crate::spec::Transform::Identity)
.unwrap()
.build()
.unwrap();

// Get partition type from the partition spec
let partition_type = partition_spec.partition_type(&schema).unwrap();

// Create a vector of DataFile objects
let data_files = vec![
DataFileBuilder::default()
.content(crate::spec::DataContentType::Data)
.file_format(DataFileFormat::Parquet)
.file_path("path/to/file1.parquet".to_string())
.file_size_in_bytes(1024)
.record_count(100)
.partition_spec_id(1)
.partition(Struct::empty())
.column_sizes(HashMap::from([(1, 512), (2, 512)]))
.value_counts(HashMap::from([(1, 100), (2, 100)]))
.null_value_counts(HashMap::from([(1, 0), (2, 0)]))
.build()
.unwrap(),
DataFileBuilder::default()
.content(crate::spec::DataContentType::Data)
.file_format(DataFileFormat::Parquet)
.file_path("path/to/file2.parquet".to_string())
.file_size_in_bytes(2048)
.record_count(200)
.partition_spec_id(1)
.partition(Struct::empty())
.column_sizes(HashMap::from([(1, 1024), (2, 1024)]))
.value_counts(HashMap::from([(1, 200), (2, 200)]))
.null_value_counts(HashMap::from([(1, 10), (2, 5)]))
.build()
.unwrap(),
];

// Serialize the DataFile objects
let serialized_files = data_files
.into_iter()
.map(|f| {
let json =
serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap();
println!("Test serialized data file: {}", json);
json
})
.collect::<Vec<String>>();

// Verify we have the expected number of serialized files
assert_eq!(serialized_files.len(), 2);

// Verify each serialized file contains expected data
for json in &serialized_files {
assert!(json.contains("path/to/file"));
Contributor: nit: Why not assert the JSON output? We could use a snapshot test to make it easier; see https://docs.rs/expect-test/latest/expect_test/

Contributor Author: I think a snapshot test makes more sense.
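A minimal sketch of the suggested snapshot assertion, assuming expect-test is added as a dev-dependency; the empty expected literal is a placeholder that the tooling fills in:

```rust
use expect_test::expect;

// Sketch only: with expect-test the expected JSON lives inline in the test.
// Running the test once with UPDATE_EXPECT=1 rewrites the empty literal below
// with the actual serialized output, which then serves as the snapshot.
let expected = expect![[r#""#]];
expected.assert_eq(&serialized_files[0]);
```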

assert!(json.contains("parquet"));
assert!(json.contains("record_count"));
assert!(json.contains("file_size_in_bytes"));
}

// Convert Vec<String> to StringArray and print it
let string_array = StringArray::from(serialized_files.clone());
println!("StringArray: {:?}", string_array);

// Now deserialize the JSON strings back into DataFile objects
println!("\nDeserializing back to DataFile objects:");
let deserialized_files: Vec<DataFile> = serialized_files
.into_iter()
.map(|json| {
let data_file = deserialize_data_file_from_json(
&json,
partition_spec.spec_id(),
&partition_type,
&schema,
)
.unwrap();

println!("Deserialized DataFile: {:?}", data_file);
data_file
})
.collect();

// Verify we have the expected number of deserialized files
assert_eq!(deserialized_files.len(), 2);

// Verify the deserialized files have the expected properties
for file in &deserialized_files {
assert_eq!(file.content_type(), crate::spec::DataContentType::Data);
assert_eq!(file.file_format(), DataFileFormat::Parquet);
assert!(file.file_path().contains("path/to/file"));
assert!(file.record_count() == 100 || file.record_count() == 200);
}
}
}
11 changes: 8 additions & 3 deletions crates/iceberg/src/spec/schema/visitor.rs
@@ -192,7 +192,12 @@ pub trait PartnerAccessor<P> {
/// Get the struct partner from schema partner.
fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
/// Get the field partner from struct partner.
- fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>;
+ fn field_partner<'a>(
+ &self,
+ struct_partner: &'a P,
+ field: &NestedField,
+ field_pos: Option<usize>,
+ ) -> Result<&'a P>;
/// Get the list element partner from list partner.
fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>;
/// Get the map key partner from map partner.
@@ -253,8 +258,8 @@ pub fn visit_struct_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAc
accessor: &A,
) -> Result<V::T> {
let mut results = Vec::with_capacity(s.fields().len());
- for field in s.fields() {
- let field_partner = accessor.field_partner(partner, field)?;
+ for (pos, field) in s.fields().iter().enumerate() {
+ let field_partner = accessor.field_partner(partner, field, Some(pos))?;
visitor.before_struct_field(field, field_partner)?;
let result = visit_type_with_partner(&field.field_type, field_partner, visitor, accessor)?;
visitor.after_struct_field(field, field_partner)?;
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
@@ -119,6 +119,13 @@ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeou
/// Default value for total maximum retry time (ms).
pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes

/// Default file format for data files
pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
/// Default file format for delete files
pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
/// Default value for data file format
pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

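The new properties above feed the write path's choice of data file format; a minimal sketch of that lookup, assuming the table's properties are exposed as a plain string map:

```rust
use std::collections::HashMap;

// Sketch only: resolve the configured data file format name, falling back to
// the documented default ("parquet") when write.format.default is unset.
// The keys are the constants introduced in this file.
fn resolve_write_format(properties: &HashMap<String, String>) -> &str {
    properties
        .get(PROPERTY_DEFAULT_FILE_FORMAT)
        .map(String::as_str)
        .unwrap_or(PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT)
}
```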
2 changes: 2 additions & 0 deletions crates/iceberg/src/writer/base_writer/mod.rs
@@ -19,3 +19,5 @@

pub mod data_file_writer;
pub mod equality_delete_writer;
/// Module providing writers that can automatically roll over to new files based on size thresholds.
pub mod rolling_writer;
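Since the rolling_writer module is only declared here, a self-contained conceptual sketch of the rolling pattern may help; this is not the crate's actual API, just the bookkeeping it implies: track bytes written and signal when the current data file should be closed and a new one started.

```rust
/// Conceptual sketch of a size-based rolling policy (hypothetical type, not
/// the crate's rolling writer API).
struct RollingPolicy {
    target_size_bytes: u64,
    written_in_current_file: u64,
}

impl RollingPolicy {
    fn new(target_size_bytes: u64) -> Self {
        Self { target_size_bytes, written_in_current_file: 0 }
    }

    /// Record a write of `bytes`; returns true when the caller should close
    /// the current data file and start a new one before the next write.
    fn record_write(&mut self, bytes: u64) -> bool {
        self.written_in_current_file += bytes;
        if self.written_in_current_file >= self.target_size_bytes {
            self.written_in_current_file = 0;
            true
        } else {
            false
        }
    }
}
```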