From 5b52f2a58df6bdd2ecbc8de4892bcbef32d4d883 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Fri, 11 Apr 2025 15:55:14 -0400
Subject: [PATCH 1/3] FIX: SourceData schema, not a Row schema

---
 src/storage-operators/src/persist_source.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs
index d45289ac7ab54..e2450db591edf 100644
--- a/src/storage-operators/src/persist_source.rs
+++ b/src/storage-operators/src/persist_source.rs
@@ -403,10 +403,10 @@ fn filter_result(
     let may_keep = result.may_contain(Datum::True);
     let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
     if relation_desc.len() == 0 && !may_error && !may_skip {
-        let Ok(mut key) = <RelationDesc as Schema<Row>>::encoder(relation_desc) else {
+        let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
             return FilterResult::Keep;
         };
-        key.append(&Row::default());
+        key.append(&SourceData(Ok(Row::default())));
         let key = key.finish();
         let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
             return FilterResult::Keep;

From 969748719539a599d1ddac6cdca995e8f926b8c1 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Fri, 11 Apr 2025 16:54:20 -0400
Subject: [PATCH 2/3] Handle data without a schema id more gracefully

We don't actually need a schema id here, since we have the data itself...
using the id to cache the migration is just an optimization.
---
 src/persist-client/src/fetch.rs  | 21 +++++++++++++++++----
 src/persist-client/src/schema.rs | 11 +++++------
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 458669143fcbf..8c7115f74747e 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -31,8 +31,9 @@ use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
 use mz_persist::location::{Blob, SeqNo};
 use mz_persist::metrics::ColumnarMetrics;
 use mz_persist_types::arrow::ArrayOrd;
-use mz_persist_types::columnar::{ColumnDecoder, Schema};
+use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
 use mz_persist_types::part::Codec64Mut;
+use mz_persist_types::schema::backward_compatible;
 use mz_persist_types::stats::PartStats;
 use mz_persist_types::{Codec, Codec64};
 use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
@@ -818,7 +819,7 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
             PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
-            PartMigration::Codec { .. } => {
+            PartMigration::Schemaless { .. } => {
                 metrics.schema.migration_count_codec.inc();
                 metrics.schema.migration_len_legacy_codec.inc_by(part_len);
             }
@@ -852,8 +853,20 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
             PartMigration::SameSchema { .. } => structured,
-            PartMigration::Codec { .. } => {
-                return None;
+            PartMigration::Schemaless { read } => {
+                // We don't know the source schema, but we do know the source datatype; migrate it directly.
+                let start = Instant::now();
+                let read_key = data_type::<K>(&*read.key).ok()?;
+                let read_val = data_type::<V>(&*read.val).ok()?;
+                let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
+                let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
+                let key = key_migration.migrate(structured.key);
+                let val = val_migration.migrate(structured.val);
+                metrics
+                    .schema
+                    .migration_migrate_seconds
+                    .inc_by(start.elapsed().as_secs_f64());
+                ColumnarRecordsStructuredExt { key, val }
             }
             PartMigration::Either {
                 write: _,
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index 9fe23ac691ad6..ea25394387b72 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -299,9 +299,8 @@ impl MigrationCacheMap {
 pub(crate) enum PartMigration<K, V> {
     /// No-op!
     SameSchema { both: Schemas<K, V> },
-    /// This part predates writing down schema ids, so we have to decode and
-    /// potentially migrate it to the target schema via the legacy Codec path.
-    Codec { read: Schemas<K, V> },
+    /// We don't have a schema id for the write schema.
+    Schemaless { read: Schemas<K, V> },
     /// We have both write and read schemas, and they don't match.
     Either {
         write: Schemas<K, V>,
@@ -315,7 +314,7 @@ impl<K, V> Clone for PartMigration<K, V> {
     fn clone(&self) -> Self {
         match self {
             Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
-            Self::Codec { read } => Self::Codec { read: read.clone() },
+            Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
             Self::Either {
                 write,
                 read,
@@ -346,7 +345,7 @@ where
     D: Semigroup + Codec64,
 {
     match (write, read.id) {
-        (None, _) => Ok(PartMigration::Codec { read }),
+        (None, _) => Ok(PartMigration::Schemaless { read }),
         (Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
         (Some(w), _) => {
             let write = schema_cache
@@ -396,7 +395,7 @@ impl<K, V> PartMigration<K, V> {
     pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
         match self {
             PartMigration::SameSchema { both } => both,
-            PartMigration::Codec { read } => read,
+            PartMigration::Schemaless { read } => read,
             PartMigration::Either { read, .. } => read,
         }
     }

From 735098b3538d2ad16e0fc4b94ab0a4b8a68440f1 Mon Sep 17 00:00:00 2001
From: Ben Kirwin <ben@kirw.in>
Date: Mon, 14 Apr 2025 20:23:48 -0400
Subject: [PATCH 3/3] Only hit the fallback path if we have only structured data

Avoid changing behaviour for older parts... probably fine, but you never know.
---
 src/persist-client/src/fetch.rs | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 8c7115f74747e..f0d7ee7cbe8ca 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -848,12 +848,13 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
             PartMigration::SameSchema { .. } => structured,
-            PartMigration::Schemaless { read } => {
+            PartMigration::Schemaless { read } if structured_only => {
                 // We don't know the source schema, but we do know the source datatype; migrate it directly.
                 let start = Instant::now();
                 let read_key = data_type::<K>(&*read.key).ok()?;
@@ -868,6 +869,7 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
                     .inc_by(start.elapsed().as_secs_f64());
                 ColumnarRecordsStructuredExt { key, val }
             }
+            PartMigration::Schemaless { .. } => return None,
             PartMigration::Either {
                 write: _,
                 read: _,
@@ -915,7 +917,7 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
             BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                 // The structured-only data format was added after schema ids were recorded everywhere,
                 // so we expect this data to be present.
-                downcast_structured(key_values).expect("valid schemas for structured data"),
+                downcast_structured(key_values, true).expect("valid schemas for structured data"),
             ),
             // If both are available, respect the specified part decode format.
             BlobTraceUpdates::Both(records, ext) => match part_decode_format {
@@ -924,11 +926,11 @@ impl<K, V, T, D> FetchedPart<K, V, T, D>
                 } => EitherOrBoth::Left(records),
                 PartDecodeFormat::Row {
                     validate_structured: true,
-                } => match downcast_structured(ext) {
+                } => match downcast_structured(ext, false) {
                     Some(decoders) => EitherOrBoth::Both(records, decoders),
                     None => EitherOrBoth::Left(records),
                 },
-                PartDecodeFormat::Arrow => match downcast_structured(ext) {
+                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                     Some(decoders) => EitherOrBoth::Right(decoders),
                     None => EitherOrBoth::Left(records),
                 },
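
The Schemaless arm in patches 2 and 3 leans entirely on backward_compatible: given only the written and the requested Arrow DataTypes, it decides whether the written data can be reinterpreted at the read type and, if so, hands back a migration to apply directly to the columns. No schema id is consulted, which is why the id-keyed migration cache is only an optimization, as the patch 2 message notes. Below is a minimal standalone sketch of that contract; it is not part of the patches, and the particular rule it exercises (adding a nullable field counts as backward compatible) is an assumption for illustration. The authoritative rules live in mz_persist_types::schema.

// Sketch only, not part of the patch series above.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields};
use mz_persist_types::schema::backward_compatible;

fn main() {
    // The datatype the part was actually written with: one non-null column.
    let written_fields = Fields::from(vec![Field::new("a", DataType::Int64, false)]);
    let written = DataType::Struct(written_fields.clone());

    // The datatype the reader's schema decodes: the same column plus a new
    // nullable one (assumed here to be a backward-compatible evolution).
    let read = DataType::Struct(Fields::from(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, true),
    ]));

    // Some(migration) iff `written` data can be read at type `read`. The
    // Schemaless arm bails out (returns None) when this fails, and patch 3
    // restricts even the attempt to parts that have only structured data.
    let Some(migration) = backward_compatible(&written, &read) else {
        println!("incompatible datatypes; fall back");
        return;
    };

    let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let data: ArrayRef = Arc::new(StructArray::new(written_fields, vec![a], None));

    // The migrated array carries the read-side datatype, just as if the part
    // had been written with a known schema id.
    let migrated = migration.migrate(data);
    assert_eq!(migrated.data_type(), &read);
}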