Skip to content

[persist] Fix a bug in the projection optimization refactoring #32178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions src/persist-client/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, Schema};
use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
use mz_persist_types::part::Codec64Mut;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
Expand Down Expand Up @@ -818,7 +819,7 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
let part_len = u64::cast_from(part.part.updates.len());
match &migration {
PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
PartMigration::Codec { .. } => {
PartMigration::Schemaless { .. } => {
metrics.schema.migration_count_codec.inc();
metrics.schema.migration_len_legacy_codec.inc_by(part_len);
}
Expand Down Expand Up @@ -847,14 +848,28 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
None
};

let downcast_structured = |structured: ColumnarRecordsStructuredExt| {
let downcast_structured = |structured: ColumnarRecordsStructuredExt,
structured_only: bool| {
let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

let structured = match &migration {
PartMigration::SameSchema { .. } => structured,
PartMigration::Codec { .. } => {
return None;
PartMigration::Schemaless { read } if structured_only => {
// We don't know the source schema, but we do know the source datatype; migrate it directly.
let start = Instant::now();
let read_key = data_type::<K>(&*read.key).ok()?;
let read_val = data_type::<V>(&*read.val).ok()?;
let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
let key = key_migration.migrate(structured.key);
let val = val_migration.migrate(structured.val);
metrics
.schema
.migration_migrate_seconds
.inc_by(start.elapsed().as_secs_f64());
ColumnarRecordsStructuredExt { key, val }
}
PartMigration::Schemaless { .. } => return None,
PartMigration::Either {
write: _,
read: _,
Expand Down Expand Up @@ -902,7 +917,7 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
// The structured-only data format was added after schema ids were recorded everywhere,
// so we expect this data to be present.
downcast_structured(key_values).expect("valid schemas for structured data"),
downcast_structured(key_values, true).expect("valid schemas for structured data"),
),
// If both are available, respect the specified part decode format.
BlobTraceUpdates::Both(records, ext) => match part_decode_format {
Expand All @@ -911,11 +926,11 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
} => EitherOrBoth::Left(records),
PartDecodeFormat::Row {
validate_structured: true,
} => match downcast_structured(ext) {
} => match downcast_structured(ext, false) {
Some(decoders) => EitherOrBoth::Both(records, decoders),
None => EitherOrBoth::Left(records),
},
PartDecodeFormat::Arrow => match downcast_structured(ext) {
PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
Some(decoders) => EitherOrBoth::Right(decoders),
None => EitherOrBoth::Left(records),
},
Expand Down
11 changes: 5 additions & 6 deletions src/persist-client/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,8 @@ impl MigrationCacheMap {
pub(crate) enum PartMigration<K: Codec, V: Codec> {
/// No-op!
SameSchema { both: Schemas<K, V> },
/// This part predates writing down schema ids, so we have to decode and
/// potentially migrate it to the target schema via the legacy Codec path.
Codec { read: Schemas<K, V> },
/// We don't have a schema id for write schema.
Schemaless { read: Schemas<K, V> },
/// We have both write and read schemas, and they don't match.
Either {
write: Schemas<K, V>,
Expand All @@ -315,7 +314,7 @@ impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
fn clone(&self) -> Self {
match self {
Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
Self::Codec { read } => Self::Codec { read: read.clone() },
Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
Self::Either {
write,
read,
Expand Down Expand Up @@ -346,7 +345,7 @@ where
D: Semigroup + Codec64,
{
match (write, read.id) {
(None, _) => Ok(PartMigration::Codec { read }),
(None, _) => Ok(PartMigration::Schemaless { read }),
(Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
(Some(w), _) => {
let write = schema_cache
Expand Down Expand Up @@ -396,7 +395,7 @@ impl<K: Codec, V: Codec> PartMigration<K, V> {
pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
match self {
PartMigration::SameSchema { both } => both,
PartMigration::Codec { read } => read,
PartMigration::Schemaless { read } => read,
PartMigration::Either { read, .. } => read,
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage-operators/src/persist_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ fn filter_result(
let may_keep = result.may_contain(Datum::True);
let may_skip = result.may_contain(Datum::False) || result.may_contain(Datum::Null);
if relation_desc.len() == 0 && !may_error && !may_skip {
let Ok(mut key) = <RelationDesc as Schema<Row>>::encoder(relation_desc) else {
let Ok(mut key) = <RelationDesc as Schema<SourceData>>::encoder(relation_desc) else {
return FilterResult::Keep;
};
key.append(&Row::default());
key.append(&SourceData(Ok(Row::default())));
let key = key.finish();
let Ok(mut val) = <UnitSchema as Schema<()>>::encoder(&UnitSchema) else {
return FilterResult::Keep;
Expand Down