diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 2829ad5825..e55e1791ae 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -37,10 +37,11 @@ mod tests; /// and in `TableChanges` when built with [`LogSegment::for_table_changes`]. /// /// [`Snapshot`]: crate::snapshot::Snapshot -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) struct LogSegment { pub end_version: Version, + pub checkpoint_version: Option, pub log_root: Url, /// Sorted commit files in the log segment (ascending) pub ascending_commit_files: Vec, @@ -49,12 +50,18 @@ pub(crate) struct LogSegment { } impl LogSegment { - fn try_new( - ascending_commit_files: Vec, + pub(crate) fn try_new( + mut ascending_commit_files: Vec, checkpoint_parts: Vec, log_root: Url, end_version: Option, ) -> DeltaResult { + // Commit file versions must be greater than the most recent checkpoint version if it exists + let checkpoint_version = checkpoint_parts.first().map(|checkpoint_file| { + ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version); + checkpoint_file.version + }); + // We require that commits that are contiguous. In other words, there must be no gap between commit versions. require!( ascending_commit_files @@ -68,35 +75,37 @@ impl LogSegment { // There must be no gap between a checkpoint and the first commit version. Note that // that all checkpoint parts share the same version. - if let (Some(checkpoint_file), Some(commit_file)) = - (checkpoint_parts.first(), ascending_commit_files.first()) + if let (Some(checkpoint_version), Some(commit_file)) = + (checkpoint_version, ascending_commit_files.first()) { require!( - checkpoint_file.version + 1 == commit_file.version, + checkpoint_version + 1 == commit_file.version, Error::InvalidCheckpoint(format!( "Gap between checkpoint version {} and next commit {}", - checkpoint_file.version, commit_file.version, + checkpoint_version, commit_file.version, )) ) } // Get the effective version from chosen files - let version_eff = ascending_commit_files + let effective_version = ascending_commit_files .last() .or(checkpoint_parts.first()) .ok_or(Error::generic("No files in log segment"))? .version; if let Some(end_version) = end_version { require!( - version_eff == end_version, + effective_version == end_version, Error::generic(format!( "LogSegment end version {} not the same as the specified end version {}", - version_eff, end_version + effective_version, end_version )) ); } + Ok(LogSegment { - end_version: version_eff, + end_version: effective_version, + checkpoint_version, log_root, ascending_commit_files, checkpoint_parts, @@ -122,7 +131,7 @@ impl LogSegment { ) -> DeltaResult { let time_travel_version = time_travel_version.into(); - let (mut ascending_commit_files, checkpoint_parts) = + let (ascending_commit_files, checkpoint_parts) = match (checkpoint_hint.into(), time_travel_version) { (Some(cp), None) => list_log_files_with_checkpoint(&cp, storage, &log_root, None)?, (Some(cp), Some(end_version)) if cp.version <= end_version => { @@ -131,11 +140,6 @@ impl LogSegment { _ => list_log_files_with_version(storage, &log_root, None, time_travel_version)?, }; - // Commit file versions must be greater than the most recent checkpoint version if it exists - if let Some(checkpoint_file) = checkpoint_parts.first() { - ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version); - } - LogSegment::try_new( ascending_commit_files, checkpoint_parts, @@ -360,8 +364,12 @@ impl LogSegment { )?)) } - // Get the most up-to-date Protocol and Metadata actions - pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> { + // Do a lightweight protocol+metadata log replay to find the latest Protocol and Metadata in + // the LogSegment + pub(crate) fn protocol_and_metadata( + &self, + engine: &dyn Engine, + ) -> DeltaResult<(Option, Option)> { let data_batches = self.replay_for_metadata(engine)?; let (mut metadata_opt, mut protocol_opt) = (None, None); for batch in data_batches { @@ -377,7 +385,12 @@ impl LogSegment { break; } } - match (metadata_opt, protocol_opt) { + Ok((metadata_opt, protocol_opt)) + } + + // Get the most up-to-date Protocol and Metadata actions + pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> { + match self.protocol_and_metadata(engine)? { (Some(m), Some(p)) => Ok((m, p)), (None, Some(_)) => Err(Error::MissingMetadata), (Some(_), None) => Err(Error::MissingProtocol), @@ -430,12 +443,16 @@ fn list_log_files( Err(_) => true, })) } + /// List all commit and checkpoint files with versions above the provided `start_version` (inclusive). /// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type /// `(Vec, Vec)`. The commit files are guaranteed to be sorted in /// ascending order by version. The elements of `checkpoint_parts` are all the parts of the same /// checkpoint. Checkpoint parts share the same version. -fn list_log_files_with_version( +// TODO: encode some of these guarantees in the output types. e.g. we could have: +// - SortedCommitFiles: Vec, is_ascending: bool, end_version: Version +// - CheckpointParts: Vec, checkpoint_version: Version (guarantee all same version) +pub(crate) fn list_log_files_with_version( storage: &dyn StorageHandler, log_root: &Url, start_version: Option, diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index b4e7882932..f94b7b7365 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -154,7 +154,7 @@ fn write_parquet_to_store( /// Writes all actions to a _delta_log parquet checkpoint file in the store. /// This function formats the provided filename into the _delta_log directory. -fn add_checkpoint_to_store( +pub(crate) fn add_checkpoint_to_store( store: &Arc, data: Box, filename: &str, diff --git a/kernel/src/path.rs b/kernel/src/path.rs index bfa3956e16..e2533d777e 100644 --- a/kernel/src/path.rs +++ b/kernel/src/path.rs @@ -15,7 +15,7 @@ const MULTIPART_PART_LEN: usize = 10; /// The number of characters in the uuid part of a uuid checkpoint const UUID_PART_LEN: usize = 36; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] enum LogPathFileType { @@ -38,7 +38,7 @@ enum LogPathFileType { Unknown, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] struct ParsedLogPath { diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 0077d6a973..8b0bc86fd6 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -7,7 +7,7 @@ use tracing::{debug, warn}; use url::Url; use crate::actions::{Metadata, Protocol}; -use crate::log_segment::LogSegment; +use crate::log_segment::{self, LogSegment}; use crate::scan::ScanBuilder; use crate::schema::{Schema, SchemaRef}; use crate::table_configuration::TableConfiguration; @@ -21,6 +21,7 @@ const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; /// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they /// have a defined schema (which may change over time for any given table), specific version, and /// frozen log segment. +#[derive(PartialEq, Eq)] pub struct Snapshot { log_segment: LogSegment, table_configuration: TableConfiguration, @@ -43,13 +44,21 @@ impl std::fmt::Debug for Snapshot { } impl Snapshot { + fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self { + Self { + log_segment, + table_configuration, + } + } + /// Create a new [`Snapshot`] instance for the given version. /// /// # Parameters /// /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. - /// - `version`: target version of the [`Snapshot`] + /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest + /// version of the table. pub fn try_new( table_root: Url, engine: &dyn Engine, @@ -67,6 +76,156 @@ impl Snapshot { Self::try_new_from_log_segment(table_root, log_segment, engine) } + /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you + /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the + /// snapshot to a later version. + /// + /// We implement a simple heuristic: + /// 1. if the new version == existing version, just return the existing snapshot + /// 2. if the new version < existing version, error: there is no optimization to do here + /// 3. list from (existing checkpoint version + 1) onward (or just existing snapshot version if + /// no checkpoint) + /// 4. a. if new checkpoint is found: just create a new snapshot from that checkpoint (and + /// commits after it) + /// b. if no new checkpoint is found: do lightweight P+M replay on the latest commits (after + /// ensuring we only retain commits > any checkpoints) + /// + /// # Parameters + /// + /// - `existing_snapshot`: reference to an existing [`Snapshot`] + /// - `engine`: Implementation of [`Engine`] apis. + /// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest + /// version of the table. + pub fn try_new_from( + existing_snapshot: Arc, + engine: &dyn Engine, + version: impl Into>, + ) -> DeltaResult> { + let old_log_segment = &existing_snapshot.log_segment; + let old_version = existing_snapshot.version(); + let new_version = version.into(); + if let Some(new_version) = new_version { + if new_version == old_version { + // Re-requesting the same version + return Ok(existing_snapshot.clone()); + } + if new_version < old_version { + // Hint is too new: error since this is effectively an incorrect optimization + return Err(Error::Generic(format!( + "Requested snapshot version {} is older than snapshot hint version {}", + new_version, old_version + ))); + } + } + + let log_root = old_log_segment.log_root.clone(); + let storage = engine.storage_handler(); + + // Start listing just after the previous segment's checkpoint, if any + let listing_start = old_log_segment.checkpoint_version.unwrap_or(0) + 1; + + // Check for new commits + let (new_ascending_commit_files, checkpoint_parts) = + log_segment::list_log_files_with_version( + storage.as_ref(), + &log_root, + Some(listing_start), + new_version, + )?; + + // NB: we need to check both checkpoints and commits since we filter commits at and below + // the checkpoint version. Example: if we have a checkpoint + commit at version 1, the log + // listing above will only return the checkpoint and not the commit. + if new_ascending_commit_files.is_empty() && checkpoint_parts.is_empty() { + match new_version { + Some(new_version) if new_version != old_version => { + // No new commits, but we are looking for a new version + return Err(Error::Generic(format!( + "Requested snapshot version {} is newer than the latest version {}", + new_version, old_version + ))); + } + _ => { + // No new commits, just return the same snapshot + return Ok(existing_snapshot.clone()); + } + } + } + + // create a log segment just from existing_checkpoint.version -> new_version + // OR could be from 1 -> new_version + let mut new_log_segment = LogSegment::try_new( + new_ascending_commit_files, + checkpoint_parts, + log_root.clone(), + new_version, + )?; + + let new_end_version = new_log_segment.end_version; + if new_end_version < old_version { + // we should never see a new log segment with a version < the existing snapshot + // version, that would mean a commit was incorrectly deleted from the log + return Err(Error::Generic(format!( + "Unexpected state: The newest version in the log {} is older than the old version {}", + new_end_version, old_version))); + } + if new_end_version == old_version { + // No new commits, just return the same snapshot + return Ok(existing_snapshot.clone()); + } + + if new_log_segment.checkpoint_version.is_some() { + // we have a checkpoint in the new LogSegment, just construct a new snapshot from that + let snapshot = Self::try_new_from_log_segment( + existing_snapshot.table_root().clone(), + new_log_segment, + engine, + ); + return Ok(Arc::new(snapshot?)); + } + + // after this point, we incrementally update the snapshot with the new log segment. + // first we remove the 'overlap' in commits, example: + // + // old logsegment checkpoint1-commit1-commit2-commit3 + // 1. new logsegment commit1-commit2-commit3 + // 2. new logsegment commit1-commit2-commit3-commit4 + // 3. new logsegment checkpoint2+commit2-commit3-commit4 + // + // retain does + // 1. new logsegment [empty] -> caught above + // 2. new logsegment [commit4] + // 3. new logsegment [checkpoint2-commit3] -> caught above + new_log_segment + .ascending_commit_files + .retain(|log_path| old_version < log_path.version); + + // we have new commits and no new checkpoint: we replay new commits for P+M and then + // create a new snapshot by combining LogSegments and building a new TableConfiguration + let (new_metadata, new_protocol) = new_log_segment.protocol_and_metadata(engine)?; + let table_configuration = TableConfiguration::try_new_from( + existing_snapshot.table_configuration(), + new_metadata, + new_protocol, + new_log_segment.end_version, + )?; + // NB: we must add the new log segment to the existing snapshot's log segment + let mut ascending_commit_files = old_log_segment.ascending_commit_files.clone(); + ascending_commit_files.extend(new_log_segment.ascending_commit_files); + // we can pass in just the old checkpoint parts since by the time we reach this line, we + // know there are no checkpoints in the new log segment. + let combined_log_segment = LogSegment::try_new( + ascending_commit_files, + old_log_segment.checkpoint_parts.clone(), + log_root, + new_version, + )?; + Ok(Arc::new(Snapshot::new( + combined_log_segment, + table_configuration, + ))) + } + /// Create a new [`Snapshot`] instance. pub(crate) fn try_new_from_log_segment( location: Url, @@ -201,11 +360,20 @@ mod tests { use object_store::memory::InMemory; use object_store::path::Path; use object_store::ObjectStore; + use serde_json::json; + + use crate::arrow::array::StringArray; + use crate::arrow::record_batch::RecordBatch; + use crate::parquet::arrow::ArrowWriter; + use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; use crate::engine::default::filesystem::ObjectStoreStorageHandler; + use crate::engine::default::DefaultEngine; use crate::engine::sync::SyncEngine; use crate::path::ParsedLogPath; + use crate::utils::test_utils::string_array_to_engine_data; + use test_utils::{add_commit, delta_path_for_version}; #[test] fn test_snapshot_read_metadata() { @@ -243,6 +411,211 @@ mod tests { assert_eq!(snapshot.schema(), expected); } + // interesting cases for testing Snapshot::new_from: + // 1. new version < existing version + // 2. new version == existing version + // 3. new version > existing version AND + // a. log segment hasn't changed + // b. log segment for old..=new version has a checkpoint (with new protocol/metadata) + // b. log segment for old..=new version has no checkpoint + // i. commits have (new protocol, new metadata) + // ii. commits have (new protocol, no metadata) + // iii. commits have (no protocol, new metadata) + // iv. commits have (no protocol, no metadata) + #[tokio::test] + async fn test_snapshot_new_from() -> DeltaResult<()> { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + + let engine = SyncEngine::new(); + let old_snapshot = Arc::new(Snapshot::try_new(url.clone(), &engine, Some(1)).unwrap()); + // 1. new version < existing version: error + let snapshot_res = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(0)); + assert!(matches!( + snapshot_res, + Err(Error::Generic(msg)) if msg == "Requested snapshot version 0 is older than snapshot hint version 1" + )); + + // 2. new version == existing version + let snapshot = Snapshot::try_new_from(old_snapshot.clone(), &engine, Some(1)).unwrap(); + let expected = old_snapshot.clone(); + assert_eq!(snapshot, expected); + + // tests Snapshot::new_from by: + // 1. creating a snapshot with new API for commits 0..=2 (based on old snapshot at 0) + // 2. comparing with a snapshot created directly at version 2 + // + // the commits tested are: + // - commit 0 -> base snapshot at this version + // - commit 1 -> final snapshots at this version + // + // in each test we will modify versions 1 and 2 to test different scenarios + fn test_new_from(store: Arc) -> DeltaResult<()> { + let url = Url::parse("memory:///")?; + let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new())); + let base_snapshot = Arc::new(Snapshot::try_new(url.clone(), &engine, Some(0))?); + let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1))?; + let expected = Snapshot::try_new(url.clone(), &engine, Some(1))?; + assert_eq!(snapshot, expected.into()); + Ok(()) + } + + // TODO: unify this and lots of stuff in LogSegment tests and test_utils + async fn commit(store: &InMemory, version: Version, commit: Vec) { + let commit_data = commit + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n"); + add_commit(store, version, commit_data).await.unwrap(); + } + + // for (3) we will just engineer custom log files + let store = Arc::new(InMemory::new()); + // everything will have a starting 0 commit with commitInfo, protocol, metadata + let commit0 = vec![ + json!({ + "commitInfo": { + "timestamp": 1587968586154i64, + "operation": "WRITE", + "operationParameters": {"mode":"ErrorIfExists","partitionBy":"[]"}, + "isBlindAppend":true + } + }), + json!({ + "protocol": { + "minReaderVersion": 1, + "minWriterVersion": 2 + } + }), + json!({ + "metaData": { + "id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9", + "format": { + "provider": "parquet", + "options": {} + }, + "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}", + "partitionColumns": [], + "configuration": {}, + "createdTime": 1587968585495i64 + } + }), + ]; + commit(store.as_ref(), 0, commit0.clone()).await; + // 3. new version > existing version + // a. no new log segment + let url = Url::parse("memory:///")?; + let engine = DefaultEngine::new( + Arc::new(store.fork()), + Arc::new(TokioBackgroundExecutor::new()), + ); + let base_snapshot = Arc::new(Snapshot::try_new(url.clone(), &engine, Some(0))?); + let snapshot = Snapshot::try_new_from(base_snapshot.clone(), &engine, None)?; + let expected = Snapshot::try_new(url.clone(), &engine, Some(0))?; + assert_eq!(snapshot, expected.into()); + // version exceeds latest version of the table = err + assert!(matches!( + Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(1)), + Err(Error::Generic(msg)) if msg == "Requested snapshot version 1 is newer than the latest version 0" + )); + + // b. log segment for old..=new version has a checkpoint (with new protocol/metadata) + let store_3a = store.fork(); + let mut checkpoint1 = commit0.clone(); + commit(&store_3a, 1, commit0.clone()).await; + checkpoint1[1] = json!({ + "protocol": { + "minReaderVersion": 2, + "minWriterVersion": 5 + } + }); + checkpoint1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; + + let handler = engine.json_handler(); + let json_strings: StringArray = checkpoint1 + .into_iter() + .map(|json| json.to_string()) + .collect::>() + .into(); + let parsed = handler + .parse_json( + string_array_to_engine_data(json_strings), + crate::actions::get_log_schema().clone(), + ) + .unwrap(); + let checkpoint = ArrowEngineData::try_from_engine_data(parsed).unwrap(); + let checkpoint: RecordBatch = checkpoint.into(); + + // Write the record batch to a Parquet file + let mut buffer = vec![]; + let mut writer = ArrowWriter::try_new(&mut buffer, checkpoint.schema(), None)?; + writer.write(&checkpoint)?; + writer.close()?; + + store + .put( + &delta_path_for_version(1, "checkpoint.parquet"), + buffer.into(), + ) + .await + .unwrap(); + test_new_from(store_3a.into())?; + + // c. log segment for old..=new version has no checkpoint + // i. commits have (new protocol, new metadata) + let store_3c_i = Arc::new(store.fork()); + let mut commit1 = commit0.clone(); + commit1[1] = json!({ + "protocol": { + "minReaderVersion": 2, + "minWriterVersion": 5 + } + }); + commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; + commit(store_3c_i.as_ref(), 1, commit1).await; + test_new_from(store_3c_i.clone())?; + + // new commits AND request version > end of log + let url = Url::parse("memory:///")?; + let engine = DefaultEngine::new(store_3c_i, Arc::new(TokioBackgroundExecutor::new())); + let base_snapshot = Arc::new(Snapshot::try_new(url.clone(), &engine, Some(0))?); + assert!(matches!( + Snapshot::try_new_from(base_snapshot.clone(), &engine, Some(2)), + Err(Error::Generic(msg)) if msg == "LogSegment end version 1 not the same as the specified end version 2" + )); + + // ii. commits have (new protocol, no metadata) + let store_3c_ii = store.fork(); + let mut commit1 = commit0.clone(); + commit1[1] = json!({ + "protocol": { + "minReaderVersion": 2, + "minWriterVersion": 5 + } + }); + commit1.remove(2); // remove metadata + commit(&store_3c_ii, 1, commit1).await; + test_new_from(store_3c_ii.into())?; + + // iii. commits have (no protocol, new metadata) + let store_3c_iii = store.fork(); + let mut commit1 = commit0.clone(); + commit1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; + commit1.remove(1); // remove protocol + commit(&store_3c_iii, 1, commit1).await; + test_new_from(store_3c_iii.into())?; + + // iv. commits have (no protocol, no metadata) + let store_3c_iv = store.fork(); + let commit1 = vec![commit0[0].clone()]; + commit(&store_3c_iv, 1, commit1).await; + test_new_from(store_3c_iv.into())?; + + Ok(()) + } + #[test] fn test_read_table_with_last_checkpoint() { let path = std::fs::canonicalize(PathBuf::from( diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 2236fd5cb0..b11d6a87dd 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -111,7 +111,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { pub struct TableChanges { pub(crate) log_segment: LogSegment, table_root: Url, - end_snapshot: Snapshot, + end_snapshot: Arc, start_version: Version, schema: Schema, } @@ -149,9 +149,12 @@ impl TableChanges { // Both snapshots ensure that reading is supported at the start and end version using // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. - let start_snapshot = - Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; - let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; + let start_snapshot = Arc::new(Snapshot::try_new( + table_root.as_url().clone(), + engine, + Some(start_version), + )?); + let end_snapshot = Snapshot::try_new_from(start_snapshot.clone(), engine, end_version)?; // Verify CDF is enabled at the beginning and end of the interval using // [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 1bbada86dc..6392c4b5f7 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -33,7 +33,7 @@ use crate::{DeltaResult, Error, Version}; /// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the /// table is supported. #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct TableConfiguration { metadata: Metadata, protocol: Protocol, @@ -89,6 +89,31 @@ impl TableConfiguration { }) } + pub(crate) fn try_new_from( + table_configuration: &Self, + new_metadata: Option, + new_protocol: Option, + new_version: Version, + ) -> DeltaResult { + // simplest case: no new P/M, just return the existing table configuration with new version + if new_metadata.is_none() && new_protocol.is_none() { + return Ok(Self { + version: new_version, + ..table_configuration.clone() + }); + } + + // note that while we could pick apart the protocol/metadata updates and validate them + // individually, instead we just re-parse so that we can recycle the try_new validation + // (instead of duplicating it here). + Self::try_new( + new_metadata.unwrap_or_else(|| table_configuration.metadata.clone()), + new_protocol.unwrap_or_else(|| table_configuration.protocol.clone()), + table_configuration.table_root.clone(), + new_version, + ) + } + /// The [`Metadata`] for this table at this version. #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) fn metadata(&self) -> &Metadata { @@ -248,6 +273,7 @@ mod test { use crate::actions::{Metadata, Protocol}; use crate::table_features::{ReaderFeature, WriterFeature}; + use crate::table_properties::TableProperties; use super::TableConfiguration; @@ -332,4 +358,78 @@ mod test { assert!(!table_config.is_deletion_vector_supported()); assert!(!table_config.is_deletion_vector_enabled()); } + + #[test] + fn test_try_new_from() { + let schema_string =r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#.to_string(); + let metadata = Metadata { + configuration: HashMap::from_iter([( + "delta.enableChangeDataFeed".to_string(), + "true".to_string(), + )]), + schema_string: schema_string.clone(), + ..Default::default() + }; + let protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeature::DeletionVectors]), + Some([WriterFeature::DeletionVectors]), + ) + .unwrap(); + let table_root = Url::try_from("file:///").unwrap(); + let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap(); + + let new_metadata = Metadata { + configuration: HashMap::from_iter([ + ( + "delta.enableChangeDataFeed".to_string(), + "false".to_string(), + ), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ]), + schema_string, + ..Default::default() + }; + let new_protocol = Protocol::try_new( + 3, + 7, + Some([ReaderFeature::DeletionVectors, ReaderFeature::V2Checkpoint]), + Some([ + WriterFeature::DeletionVectors, + WriterFeature::V2Checkpoint, + WriterFeature::AppendOnly, + ]), + ) + .unwrap(); + let new_version = 1; + let new_table_config = TableConfiguration::try_new_from( + &table_config, + Some(new_metadata.clone()), + Some(new_protocol.clone()), + new_version, + ) + .unwrap(); + + assert_eq!(new_table_config.version(), new_version); + assert_eq!(new_table_config.metadata(), &new_metadata); + assert_eq!(new_table_config.protocol(), &new_protocol); + assert_eq!(new_table_config.schema(), table_config.schema()); + assert_eq!( + new_table_config.table_properties(), + &TableProperties { + enable_change_data_feed: Some(false), + enable_deletion_vectors: Some(true), + ..Default::default() + } + ); + assert_eq!( + new_table_config.column_mapping_mode(), + table_config.column_mapping_mode() + ); + assert_eq!(new_table_config.table_root(), table_config.table_root()); + } }