diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index e5b644aaa5adb..b9217bb2882ac 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -1050,8 +1050,11 @@ def __init__(
             BOOLEAN_FLAG_VALUES
         )
         self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES
-        self.flags_with_values["persist_batch_columnar_format"] = ["row", "both_v2"]
-        self.flags_with_values["persist_record_schema_id"] = BOOLEAN_FLAG_VALUES
+        self.flags_with_values["persist_batch_columnar_format"] = [
+            "row",
+            "both_v2",
+            "structured",
+        ]
         self.flags_with_values["persist_batch_structured_order"] = BOOLEAN_FLAG_VALUES
         self.flags_with_values["persist_batch_builder_structured"] = BOOLEAN_FLAG_VALUES
         self.flags_with_values["persist_batch_structured_key_lower_len"] = [
diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs
index 01b31cc1aa496..978f3dd715040 100644
--- a/src/persist-client/src/batch.rs
+++ b/src/persist-client/src/batch.rs
@@ -688,13 +688,7 @@ where
                     .codecs
                     .val
                     .encode(|| V::encode(val, &mut self.val_buf));
-                validate_schema(
-                    &self.builder.write_schemas,
-                    &self.key_buf,
-                    &self.val_buf,
-                    Some(key),
-                    Some(val),
-                );
+                validate_schema(&self.builder.write_schemas, key, val);
 
                 let update = (
                     (self.key_buf.as_slice(), self.val_buf.as_slice()),
@@ -881,32 +875,18 @@ where
 // inline it at the two callers.
 pub(crate) fn validate_schema<K: Codec, V: Codec>(
     stats_schemas: &Schemas<K, V>,
-    key: &[u8],
-    val: &[u8],
-    decoded_key: Option<&K>,
-    decoded_val: Option<&V>,
+    decoded_key: &K,
+    decoded_val: &V,
 ) {
     // Attempt to catch any bad schema usage in CI. This is probably too
     // expensive to run in prod.
     if !mz_ore::assert::SOFT_ASSERTIONS.load(Ordering::Relaxed) {
         return;
     }
-    let key_valid = match decoded_key {
-        Some(key) => K::validate(key, &stats_schemas.key),
-        None => {
-            let key = K::decode(key, &stats_schemas.key).expect("valid encoded key");
-            K::validate(&key, &stats_schemas.key)
-        }
-    };
+    let key_valid = K::validate(decoded_key, &stats_schemas.key);
     let () = key_valid
         .unwrap_or_else(|err| panic!("constructing batch with mismatched key schema: {}", err));
-    let val_valid = match decoded_val {
-        Some(val) => V::validate(val, &stats_schemas.val),
-        None => {
-            let val = V::decode(val, &stats_schemas.val).expect("valid encoded val");
-            V::validate(&val, &stats_schemas.val)
-        }
-    };
+    let val_valid = V::validate(decoded_val, &stats_schemas.val);
     let () = val_valid
         .unwrap_or_else(|err| panic!("constructing batch with mismatched val schema: {}", err));
 }
@@ -1135,6 +1115,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
             BatchColumnarFormat::Both(_) => {
                 self.cfg.inline_writes_single_max_bytes.saturating_div(2)
             }
+            BatchColumnarFormat::Structured => self.cfg.inline_writes_single_max_bytes,
         };
 
         let (name, write_future) = if updates.goodbytes() < inline_threshold {
diff --git a/src/persist-client/src/cli/inspect.rs b/src/persist-client/src/cli/inspect.rs
index 29c0d3139463a..d0a1fd480b4eb 100644
--- a/src/persist-client/src/cli/inspect.rs
+++ b/src/persist-client/src/cli/inspect.rs
@@ -37,7 +37,7 @@ use crate::async_runtime::IsolatedRuntime;
 use crate::cache::StateCache;
 use crate::cli::args::{make_blob, make_consensus, StateArgs, NO_COMMIT, READ_ALL_BUILD_INFO};
 use crate::error::CodecConcreteType;
-use crate::fetch::{Cursor, EncodedPart};
+use crate::fetch::EncodedPart;
 use crate::internal::encoding::{Rollup, UntypedState};
 use crate::internal::paths::{
     BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
@@ -361,15 +361,19 @@ pub async fn blob_batch_part(
         desc,
         updates: Vec::new(),
     };
-    let mut cursor = Cursor::default();
-    while let Some(((k, v, t, d), _)) = cursor.pop(&encoded_part) {
+    let records = encoded_part.normalize(&metrics.columnar);
+    for ((k, v), t, d) in records
+        .records()
+        .expect("only implemented for records")
+        .iter()
+    {
         if out.updates.len() > limit {
             break;
         }
         out.updates.push(BatchPartUpdate {
             k: format!("{:?}", PrettyBytes(k)),
             v: format!("{:?}", PrettyBytes(v)),
-            t,
+            t: u64::from_le_bytes(t),
             d: i64::from_le_bytes(d),
         });
     }
@@ -408,8 +412,9 @@ async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
             )
             .await
             .expect("part exists");
-            let mut cursor = Cursor::default();
-            while let Some(((k, v, mut t, d), _)) = cursor.pop(&encoded_part) {
+            let part = encoded_part.normalize(&state_versions.metrics.columnar);
+            for ((k, v), t, d) in part.records().expect("codec records").iter() {
+                let mut t = <u64 as Codec64>::decode(t);
                 t.advance_by(as_of);
                 let d = <i64 as Codec64>::decode(d);
                 updates.push(((k.to_owned(), v.to_owned()), t, d));
diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index a969327219c41..007480bb7da4e 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -15,11 +15,12 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use anyhow::anyhow;
-use arrow::array::{Array, AsArray, BooleanArray};
+use arrow::array::{Array, AsArray, BooleanArray, Int64Array};
 use arrow::compute::FilterBuilder;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::trace::Description;
+use itertools::EitherOrBoth;
 use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
 use mz_ore::bytes::SegmentedBytes;
 use mz_ore::cast::CastFrom;
@@ -722,17 +723,21 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V,
 pub struct FetchedPart<K: Codec, V: Codec, T, D> {
     metrics: Arc<Metrics>,
     ts_filter: FetchBatchFilter<T>,
-    part: EncodedPart<T>,
     // If migration is Either, then the columnar one will have already been
-    // applied here.
-    structured_part: (
-        Option<<K::Schema as Schema2<K>>::Decoder>,
-        Option<<V::Schema as Schema2<V>>::Decoder>,
-    ),
-    part_decode_format: PartDecodeFormat,
+    // applied here on the structured data only.
+    part: EitherOrBoth<
+        ColumnarRecords,
+        (
+            <K::Schema as Schema2<K>>::Decoder,
+            <V::Schema as Schema2<V>>::Decoder,
+        ),
+    >,
+    timestamps: Int64Array,
+    diffs: Int64Array,
     migration: PartMigration<K, V>,
     filter_pushdown_audit: Option<LazyPartStats>,
-    part_cursor: Cursor,
+    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
+    part_cursor: usize,
     key_storage: Option<K::Storage>,
     val_storage: Option<V::Storage>,
 
@@ -781,96 +786,91 @@ impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V,
             None
         };
 
-        // TODO(parkmycar): We should probably refactor this since these columns are duplicated
-        // (via a smart pointer) in EncodedPart.
-        //
-        // For structured columnar data we need to downcast from `dyn Array`s to concrete types.
-        // Downcasting is relatively expensive so we want to do this once, which is why we do it
-        // when creating a FetchedPart.
-        let should_downcast = match part_decode_format {
-            PartDecodeFormat::Row {
-                validate_structured,
-            } => validate_structured,
-            PartDecodeFormat::Arrow => true,
-        };
-        let structured_part = match (&part.part.updates, should_downcast) {
-            // Only downcast and create decoders if we have structured data AND
-            // (an audit of the data is requested OR we'd like to decode
-            // directly from the structured data).
-            (BlobTraceUpdates::Both(_codec, structured), true) => {
-                fn decode<C: Codec>(
-                    name: &str,
-                    schema: &C::Schema,
-                    array: &Arc<dyn Array>,
-                ) -> Option<<C::Schema as Schema2<C>>::Decoder> {
-                    match Schema2::decoder_any(schema, array) {
-                        Ok(x) => Some(x),
-                        Err(err) => {
-                            tracing::error!(?err, "failed to create {} decoder", name);
-                            None
-                        }
-                    }
+        let downcast_structured = |structured: ColumnarRecordsStructuredExt| {
+            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
+
+            let structured = match &migration {
+                PartMigration::SameSchema { .. } => structured,
+                PartMigration::Codec { .. } => {
+                    return None;
                 }
-                match &migration {
-                    PartMigration::SameSchema { both } => {
-                        let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
-
-                        let key = decode::<K>("key", &*both.key, &structured.key);
-                        let val = decode::<V>("val", &*both.val, &structured.val);
-
-                        if let Some(key_decoder) = key.as_ref() {
-                            let key_size_after = key_decoder.goodbytes();
-                            let key_diff = key_size_before.saturating_sub(key_size_after);
-                            metrics
-                                .pushdown
-                                .parts_projection_trimmed_bytes
-                                .inc_by(u64::cast_from(key_diff));
-                        }
-
-                        (key, val)
-                    }
-                    PartMigration::Codec { .. } => (None, None),
-                    PartMigration::Either {
-                        _write,
-                        read,
-                        key_migration,
-                        val_migration,
-                    } => {
-                        let start = Instant::now();
-                        let key = key_migration.migrate(Arc::clone(&structured.key));
-                        let val = val_migration.migrate(Arc::clone(&structured.val));
-                        metrics
-                            .schema
-                            .migration_migrate_seconds
-                            .inc_by(start.elapsed().as_secs_f64());
-
-                        let key_before_size = ArrayOrd::new(&structured.key).goodbytes();
-                        let key_after_size = ArrayOrd::new(&key).goodbytes();
-                        let key_diff = key_before_size.saturating_sub(key_after_size);
-                        metrics
-                            .pushdown
-                            .parts_projection_trimmed_bytes
-                            .inc_by(u64::cast_from(key_diff));
-
-                        (
-                            decode::<K>("key", &*read.key, &key),
-                            decode::<V>("val", &*read.val, &val),
-                        )
-                    }
+                PartMigration::Either {
+                    write: _,
+                    read: _,
+                    key_migration,
+                    val_migration,
+                } => {
+                    let start = Instant::now();
+                    let key = key_migration.migrate(structured.key);
+                    let val = val_migration.migrate(structured.val);
+                    metrics
+                        .schema
+                        .migration_migrate_seconds
+                        .inc_by(start.elapsed().as_secs_f64());
+                    ColumnarRecordsStructuredExt { key, val }
+                }
+            };
+
+            let read_schema = migration.codec_read();
+            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
+            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);
+
+            match &key {
+                Ok(key_decoder) => {
+                    let key_size_after = key_decoder.goodbytes();
+                    let key_diff = key_size_before.saturating_sub(key_size_after);
+                    metrics
+                        .pushdown
+                        .parts_projection_trimmed_bytes
+                        .inc_by(u64::cast_from(key_diff));
+                }
+                Err(e) => {
+                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                 }
             }
-            _ => (None, None),
+
+            Some((key.ok()?, val.ok()?))
+        };
+
+        let updates = part.normalize(&metrics.columnar);
+        let timestamps = updates.timestamps().clone();
+        let diffs = updates.diffs().clone();
+        let part = match updates {
+            // If only one encoding is available, decode via that encoding.
+            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
+            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
+                // The structured-only data format was added after schema ids were recorded everywhere,
+                // so we expect this data to be present.
+                downcast_structured(key_values).expect("valid schemas for structured data"),
+            ),
+            // If both are available, respect the specified part decode format.
+            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
+                PartDecodeFormat::Row {
+                    validate_structured: false,
+                } => EitherOrBoth::Left(records),
+                PartDecodeFormat::Row {
+                    validate_structured: true,
+                } => match downcast_structured(ext) {
+                    Some(decoders) => EitherOrBoth::Both(records, decoders),
+                    None => EitherOrBoth::Left(records),
+                },
+                PartDecodeFormat::Arrow => match downcast_structured(ext) {
+                    Some(decoders) => EitherOrBoth::Right(decoders),
+                    None => EitherOrBoth::Left(records),
+                },
+            },
         };
 
         FetchedPart {
             metrics,
             ts_filter,
             part,
-            structured_part,
-            part_decode_format,
+            peek_stash: None,
+            timestamps,
+            diffs,
             migration,
             filter_pushdown_audit,
-            part_cursor: Cursor::default(),
+            part_cursor: 0,
             key_storage: None,
             val_storage: None,
             _phantom: PhantomData,
@@ -915,74 +915,88 @@ where
         val: &mut Option<V>,
         result_override: Option<(K, V)>,
     ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
-        while let Some(((k, v, mut t, d), idx)) = self.part_cursor.pop(&self.part) {
-            if !self.ts_filter.filter_ts(&mut t) {
-                continue;
-            }
-
-            let mut d = D::decode(d);
-
-            // If `filter_ts` advances our timestamp, we may end up with the same K, V, T in successive
-            // records. If so, opportunistically consolidate those out.
-            while let Some((k_next, v_next, mut t_next, d_next)) = self.part_cursor.peek(&self.part)
-            {
-                if (k, v) != (k_next, v_next) {
-                    break;
+        let mut consolidated = self.peek_stash.take();
+        loop {
+            // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
+            let next = if self.part_cursor < self.timestamps.len() {
+                let next_idx = self.part_cursor;
+                self.part_cursor += 1;
+                // These `to_le_bytes` calls were previously encapsulated by `ColumnarRecords`.
+                // TODO(structured): re-encapsulate these once we've finished the structured migration.
+                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
+                if !self.ts_filter.filter_ts(&mut t) {
+                    continue;
                 }
-
-                if !self.ts_filter.filter_ts(&mut t_next) {
-                    break;
+                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
+                if d.is_zero() {
+                    continue;
                 }
-                if t != t_next {
+                let kv = if result_override.is_none() {
+                    self.decode_kv(next_idx, key, val)
+                } else {
+                    // This will be overridden immediately below - just leave a placeholder here for now.
+                    (Err("".to_string()), Err("".to_string()))
+                };
+                (kv, t, d)
+            } else {
+                break;
+            };
+
+            // Attempt to consolidate in the next tuple, stashing it if that's not possible.
+            if let Some((kv, t, d)) = &mut consolidated {
+                let (kv_next, t_next, d_next) = &next;
+                if kv == kv_next && t == t_next {
+                    d.plus_equals(d_next);
+                    if d.is_zero() {
+                        consolidated = None;
+                    }
+                } else {
+                    self.peek_stash = Some(next);
                     break;
                 }
-
-                // All equal... consolidate!
-                self.part_cursor.idx += 1;
-                d.plus_equals(&D::decode(d_next));
+            } else {
+                consolidated = Some(next);
             }
+        }
 
-            // If multiple updates consolidate out entirely, drop the record.
-            if d.is_zero() {
-                continue;
-            }
+        let (kv, t, d) = consolidated?;
 
-            if let Some((key, val)) = result_override {
-                return Some(((Ok(key), Ok(val)), t, d));
-            }
+        // Override the placeholder result we set above with the true value.
+        if let Some((key, val)) = result_override {
+            return Some(((Ok(key), Ok(val)), t, d));
+        }
 
-            // TODO: Putting this here relies on the Codec data still being
-            // populated (i.e. for the consolidate optimization above).
-            // Eventually we'll have to rewrite this path to work entirely
-            // without Codec data, but in the meantime, putting in here allows
-            // us to see the performance impact of decoding from arrow instead
-            // of Codec.
-            //
-            // Plus, it'll likely be easier to port all the logic here to work
-            // solely on arrow data once we finish migrating things like the
-            // ConsolidatingIter.
-            if let ((Some(keys), Some(vals)), PartDecodeFormat::Arrow) =
-                (&self.structured_part, self.part_decode_format)
-            {
-                let (k, v) = self.decode_structured(idx, keys, vals, key, val);
-                return Some(((k, v), t, d));
-            }
+        Some((kv, t, d))
+    }
 
-            let (k, v) = Self::decode_codec(
-                &self.metrics,
-                self.migration.codec_read(),
-                k,
-                v,
-                key,
-                val,
-                &mut self.key_storage,
-                &mut self.val_storage,
-            );
-            // Note: We only provide structured columns, if they were originally written, and a
-            // dyncfg was specified to run validation.
-            if let (Some(keys), Some(vals)) = &self.structured_part {
-                let (k_s, v_s) = self.decode_structured(idx, keys, vals, &mut None, &mut None);
+    fn decode_kv(
+        &mut self,
+        index: usize,
+        key: &mut Option<K>,
+        val: &mut Option<V>,
+    ) -> (Result<K, String>, Result<V, String>) {
+        let decoded = self
+            .part
+            .as_ref()
+            .map_left(|codec| {
+                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
+                Self::decode_codec(
+                    &*self.metrics,
+                    self.migration.codec_read(),
+                    ck,
+                    cv,
+                    key,
+                    val,
+                    &mut self.key_storage,
+                    &mut self.val_storage,
+                )
+            })
+            .map_right(|(structured_key, structured_val)| {
+                self.decode_structured(index, structured_key, structured_val, key, val)
+            });
 
+        match decoded {
+            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                 // Purposefully do not trace to prevent blowing up Sentry.
                 let is_valid = self
                     .metrics
@@ -1003,11 +1017,12 @@ where
                 if !is_valid {
                     soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                 }
-            }
 
-            return Some(((k, v), t, d));
+                (k, v)
+            }
+            EitherOrBoth::Left(kv) => kv,
+            EitherOrBoth::Right(kv) => kv,
         }
-        None
     }
 
     fn decode_codec(
@@ -1078,7 +1093,7 @@ where
 
     fn size_hint(&self) -> (usize, Option<usize>) {
         // We don't know in advance how restrictive the filter will be.
-        let max_len = self.part.part.updates.len();
+        let max_len = self.timestamps.len();
         (0, Some(max_len))
     }
 }
@@ -1287,6 +1302,12 @@ where
             timestamps = realloc_array(&timestamps, metrics);
         }
 
+        if self.ts_rewrite.is_some() {
+            self.metrics
+                .ts_rewrite
+                .inc_by(u64::cast_from(timestamps.len()));
+        }
+
         match (codec, structured) {
             (Some((key, value)), None) => {
                 BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
@@ -1304,103 +1325,6 @@ where
     }
 }
 
-/// A pointer into a particular encoded part, with methods for fetching an update and
-/// scanning forward to the next. It is an error to use the same cursor for distinct
-/// parts.
-///
-/// We avoid implementing copy to make it hard to accidentally duplicate a cursor. However,
-/// clone is very cheap.
-#[derive(Debug, Clone, Default)]
-pub(crate) struct Cursor {
-    idx: usize,
-}
-
-impl Cursor {
-    /// Get the tuple at the specified pair of indices. If there is no such tuple,
-    /// either because we are out of range or because this tuple has been filtered out,
-    /// this returns `None`.
-    pub fn get<'a, T: Timestamp + Lattice + Codec64>(
-        &self,
-        encoded: &'a EncodedPart<T>,
-    ) -> Option<(&'a [u8], &'a [u8], T, [u8; 8])> {
-        // TODO(structured): replace before allowing structured-only parts
-        let part = encoded
-            .part
-            .updates
-            .records()
-            .expect("created cursor for non-codec data");
-        let ((k, v), t, d) = part.get(self.idx)?;
-
-        let mut t = T::decode(t);
-        // We assert on the write side that at most one of rewrite or
-        // truncation is used, so it shouldn't matter which is run first.
-        //
-        // That said, my (Dan's) intuition here is that rewrite goes first,
-        // though I don't particularly have a justification for it.
-        if let Some(ts_rewrite) = encoded.ts_rewrite.as_ref() {
-            t.advance_by(ts_rewrite.borrow());
-            encoded.metrics.ts_rewrite.inc();
-        }
-
-        // This filtering is really subtle, see the comment above for
-        // what's going on here.
-        let truncated_t = encoded.needs_truncation && {
-            !encoded.registered_desc.lower().less_equal(&t)
-                || encoded.registered_desc.upper().less_equal(&t)
-        };
-        if truncated_t {
-            return None;
-        }
-        Some((k, v, t, d))
-    }
-
-    /// A cursor points to a particular update in the backing part data.
-    /// If the update it points to is not valid, advance it to the next valid update
-    /// if there is one, and return the pointed-to data.
-    pub fn peek<'a, T: Timestamp + Lattice + Codec64>(
-        &mut self,
-        part: &'a EncodedPart<T>,
-    ) -> Option<(&'a [u8], &'a [u8], T, [u8; 8])> {
-        while !self.is_exhausted(part) {
-            let current = self.get(part);
-            if current.is_some() {
-                return current;
-            }
-            self.advance(part);
-        }
-        None
-    }
-
-    /// Similar to peek, but advance the cursor just past the end of the most recent update.
-    /// Returns the update and the `(part_idx, idx)` that is was popped at.
-    pub fn pop<'a, T: Timestamp + Lattice + Codec64>(
-        &mut self,
-        part: &'a EncodedPart<T>,
-    ) -> Option<((&'a [u8], &'a [u8], T, [u8; 8]), usize)> {
-        while !self.is_exhausted(part) {
-            let current = self.get(part);
-            let popped_idx = self.idx;
-            self.advance(part);
-            if current.is_some() {
-                return current.map(|p| (p, popped_idx));
-            }
-        }
-        None
-    }
-
-    /// Returns true if the cursor is past the end of the part data.
-    pub fn is_exhausted<T: Timestamp + Codec64>(&self, part: &EncodedPart<T>) -> bool {
-        self.idx >= part.part.updates.len()
-    }
-
-    /// Advance the cursor just past the end of the most recent update, if there is one.
-    pub fn advance<T: Timestamp + Codec64>(&mut self, part: &EncodedPart<T>) {
-        if !self.is_exhausted(part) {
-            self.idx += 1;
-        }
-    }
-}
-
 /// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
 /// itself (unlike other encodable structs) to attempt to provide stricter drop
 /// semantics on `LeasedBatchPart`, i.e. `SerdeLeasedBatchPart` is exchangeable
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index fb41da45549e5..0fd9c60125be7 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -1524,6 +1524,7 @@ impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
             BatchColumnarFormat::Both(version) => {
                 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
             }
+            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
         }
     }
 
@@ -1533,6 +1534,7 @@ impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
             proto_hollow_batch_part::Format::RowAndColumnar(version) => {
                 BatchColumnarFormat::Both(version.cast_into())
             }
+            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
         };
         Ok(format)
     }
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index b9774e8760c77..f983cfc0dbbc1 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -1422,7 +1422,7 @@ pub mod datadriven {
         BatchParts, BLOB_TARGET_SIZE, BUILDER_STRUCTURED, STRUCTURED_ORDER,
     };
     use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
-    use crate::fetch::{Cursor, EncodedPart};
+    use crate::fetch::EncodedPart;
     use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
     use crate::internal::datadriven::DirectiveArgs;
     use crate::internal::encoding::Schemas;
@@ -1894,9 +1894,10 @@ pub mod datadriven {
             )
             .await
             .expect("invalid batch part");
-            let mut cursor = Cursor::default();
-            while let Some(((k, _v, t, d), _)) = cursor.pop(&part) {
+            let part = part.normalize(&datadriven.client.metrics.columnar);
+            for ((k, _v), t, d) in part.records().expect("codec records").iter() {
                 let (k, d) = (String::decode(k, &StringSchema).unwrap(), i64::decode(d));
+                let t = u64::from_le_bytes(t);
                 write!(s, "{k} {t} {d}\n");
             }
         }
@@ -2148,10 +2149,11 @@ pub mod datadriven {
                     )
                     .await
                     .expect("invalid batch part");
+                    let part = part.normalize(&datadriven.client.metrics.columnar);
 
                     let mut updates = Vec::new();
-                    let mut cursor = Cursor::default();
-                    while let Some(((k, _v, mut t, d), _)) = cursor.pop(&part) {
+                    for ((k, _v), t, d) in part.records().expect("codec data").iter() {
+                        let mut t = u64::decode(t);
                         t.advance_by(as_of.borrow());
                         updates.push((
                             String::decode(k, &StringSchema).unwrap(),
diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto
index 6064e2e3d6846..4ef289724ac19 100644
--- a/src/persist-client/src/internal/state.proto
+++ b/src/persist-client/src/internal/state.proto
@@ -56,6 +56,7 @@ message ProtoHollowBatchPart {
   oneof format {
     google.protobuf.Empty row = 7;
     uint64 row_and_columnar = 8;
+    google.protobuf.Empty structured = 13;
   }
   optional uint64 schema_id = 12;
 
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 6cb638e7c77e2..9ed03c652dcec 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -826,8 +826,12 @@ mod tests {
             .await
             .expect("failed to fetch part")
             .expect("missing part");
-        let part =
+        let mut part =
             BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
+        // Ensure codec data is present even if it was not generated at write time.
+        let _ = part
+            .updates
+            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
         let mut updates = Vec::new();
         // TODO(bkirwi): switch to structured data in tests
         for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index d7e1be8635a77..fed263f07167b 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -306,7 +306,7 @@ pub(crate) enum PartMigration<K: Codec, V: Codec> {
     Codec { read: Schemas<K, V> },
     /// We have both write and read schemas, and they don't match.
     Either {
-        _write: Schemas<K, V>,
+        write: Schemas<K, V>,
         read: Schemas<K, V>,
         key_migration: Arc<Migration>,
         val_migration: Arc<Migration>,
@@ -319,12 +319,12 @@ impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
             Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
             Self::Codec { read } => Self::Codec { read: read.clone() },
             Self::Either {
-                _write,
+                write,
                 read,
                 key_migration,
                 val_migration,
             } => Self::Either {
-                _write: _write.clone(),
+                write: write.clone(),
                 read: read.clone(),
                 key_migration: Arc::clone(key_migration),
                 val_migration: Arc::clone(val_migration),
@@ -384,7 +384,7 @@ where
                     .inc_by(start.elapsed().as_secs_f64());
 
                 Ok(PartMigration::Either {
-                    _write: write,
+                    write,
                     read,
                     key_migration,
                     val_migration,
diff --git a/src/persist/src/indexed/columnar/arrow.rs b/src/persist/src/indexed/columnar/arrow.rs
index 5c0648578c7ba..a7ef8d2f8f239 100644
--- a/src/persist/src/indexed/columnar/arrow.rs
+++ b/src/persist/src/indexed/columnar/arrow.rs
@@ -11,18 +11,18 @@
 
 use std::ptr::NonNull;
 use std::sync::Arc;
-use std::sync::LazyLock;
 
-use arrow::array::{make_array, Array, ArrayData, ArrayRef, AsArray};
+use anyhow::anyhow;
+use arrow::array::{make_array, Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch};
 use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
-use arrow::datatypes::{DataType, Field, Schema, ToByteSlice};
+use arrow::datatypes::ToByteSlice;
 use mz_dyncfg::Config;
-use mz_ore::iter::IteratorExt;
 
 use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
+use crate::indexed::encoding::BlobTraceUpdates;
 use crate::metrics::ColumnarMetrics;
 
-/// The Arrow schema we use to encode ((K, V), T, D) tuples.
+/// Converts a [`ColumnarRecords`] into [`arrow`] columns.
 ///
 /// Both Time and Diff are presented externally to persist users as a type
 /// parameter that implements [mz_persist_types::Codec64]. Our columnar format
@@ -37,55 +37,28 @@ use crate::metrics::ColumnarMetrics;
 /// time after year 2200). Using a i64 might be a pessimization for a
 /// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
 /// u64::MAX]`, but realtime sources are overwhelmingly the common case.
-pub static SCHEMA_ARROW_RS_KVTD: LazyLock<Arc<Schema>> = LazyLock::new(|| {
-    let schema = Schema::new(vec![
-        Field::new("k", DataType::Binary, false),
-        Field::new("v", DataType::Binary, false),
-        Field::new("t", DataType::Int64, false),
-        Field::new("d", DataType::Int64, false),
-    ]);
-    Arc::new(schema)
-});
-
-/// Converts a [`ColumnarRecords`] into `(K, V, T, D)` [`arrow`] columns.
-pub fn encode_arrow_batch_kvtd(x: &ColumnarRecords) -> Vec<arrow::array::ArrayRef> {
-    let key = x.key_data.clone();
-    let val = x.val_data.clone();
-    let ts = x.timestamps.clone();
-    let diff = x.diffs.clone();
-
-    vec![Arc::new(key), Arc::new(val), Arc::new(ts), Arc::new(diff)]
-}
-
-/// Converts a [`ColumnarRecords`] and [`ColumnarRecordsStructuredExt`] pair
-/// (aka [`BlobTraceUpdates::Both`]) into [`arrow::array::Array`]s with columns
-/// [(K, V, T, D, K_S, V_S)].
-///
-/// [`BlobTraceUpdates::Both`]: crate::indexed::encoding::BlobTraceUpdates::Both
-pub fn encode_arrow_batch_kvtd_ks_vs(
-    records: &ColumnarRecords,
-    structured: &ColumnarRecordsStructuredExt,
-) -> (Vec<Arc<Field>>, Vec<Arc<dyn Array>>) {
-    let mut fields: Vec<_> = (*SCHEMA_ARROW_RS_KVTD).fields().iter().cloned().collect();
-    let mut arrays = encode_arrow_batch_kvtd(records);
-
-    {
-        let key_array = &structured.key;
-        let key_field = Field::new("k_s", key_array.data_type().clone(), false);
-
-        fields.push(Arc::new(key_field));
-        arrays.push(Arc::clone(key_array));
+pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
+    fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
+        Arc::new(a.clone())
     }
-
-    {
-        let val_array = &structured.val;
-        let val_field = Field::new("v_s", val_array.data_type().clone(), false);
-
-        fields.push(Arc::new(val_field));
-        arrays.push(Arc::clone(val_array));
-    }
-
-    (fields, arrays)
+    // For historical reasons, the codec-encoded columns are placed before T/D,
+    // and the structured-encoding columns are placed after.
+    let kv = updates
+        .records()
+        .into_iter()
+        .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
+    let td = [
+        ("t", array_ref(updates.timestamps())),
+        ("d", array_ref(updates.diffs())),
+    ];
+    let ks_vs = updates
+        .structured()
+        .into_iter()
+        .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);
+
+    // We expect all the top-level fields to be fully defined.
+    let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
+    RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
 }
 
 pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
@@ -194,84 +167,71 @@ fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
     unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
 }
 
-/// Converts an [`arrow`] [(K, V, T, D)] [`RecordBatch`] into a [`ColumnarRecords`].
-///
-/// [`RecordBatch`]: `arrow::array::RecordBatch`
-pub fn decode_arrow_batch_kvtd(
-    columns: &[Arc<dyn Array>],
-    metrics: &ColumnarMetrics,
-) -> Result<ColumnarRecords, String> {
-    let (key_col, val_col, ts_col, diff_col) = match &columns {
-        x @ &[k, v, t, d] => {
-            // The columns need to all have the same logical length.
-            if !x.iter().map(|col| col.len()).all_equal() {
-                return Err(format!(
-                    "columns don't all have equal length {k_len}, {v_len}, {t_len}, {d_len}",
-                    k_len = k.len(),
-                    v_len = v.len(),
-                    t_len = t.len(),
-                    d_len = d.len()
-                ));
-            }
-
-            (k, v, t, d)
-        }
-        _ => return Err(format!("expected 4 columns got {}", columns.len())),
-    };
-
-    let key = key_col
-        .as_binary_opt::<i32>()
-        .ok_or_else(|| "key column is wrong type".to_string())?;
-
-    let val = val_col
-        .as_binary_opt::<i32>()
-        .ok_or_else(|| "val column is wrong type".to_string())?;
-
-    let time = ts_col
-        .as_primitive_opt::<arrow::datatypes::Int64Type>()
-        .ok_or_else(|| "time column is wrong type".to_string())?;
-
-    let diff = diff_col
-        .as_primitive_opt::<arrow::datatypes::Int64Type>()
-        .ok_or_else(|| "diff column is wrong type".to_string())?;
-
-    let len = key.len();
-    let ret = ColumnarRecords {
-        len,
-        key_data: realloc_array(key, metrics),
-        val_data: realloc_array(val, metrics),
-        timestamps: realloc_array(time, metrics),
-        diffs: realloc_array(diff, metrics),
-    };
-    ret.validate()?;
-
-    Ok(ret)
-}
-
-/// Converts an arrow [(K, V, T, D)] Chunk into a ColumnarRecords.
-pub fn decode_arrow_batch_kvtd_ks_vs(
-    cols: &[Arc<dyn Array>],
-    key_col: Arc<dyn Array>,
-    val_col: Arc<dyn Array>,
+/// Converts an [`arrow`] [RecordBatch] into a [BlobTraceUpdates] and reallocate the backing data.
+pub fn decode_arrow_batch(
+    batch: &RecordBatch,
     metrics: &ColumnarMetrics,
-) -> Result<(ColumnarRecords, ColumnarRecordsStructuredExt), String> {
-    let same_length = cols
-        .iter()
-        .map(|col| col.as_ref())
-        .chain([&*key_col])
-        .chain([&*val_col])
-        .map(|col| col.len())
-        .all_equal();
-    if !same_length {
-        return Err("not all columns (included structured) have the same length".to_string());
+) -> anyhow::Result<BlobTraceUpdates> {
+    fn try_downcast<A: Array + From<ArrayData> + 'static>(
+        batch: &RecordBatch,
+        name: &'static str,
+        metrics: &ColumnarMetrics,
+    ) -> anyhow::Result<Option<A>> {
+        let Some(array_ref) = batch.column_by_name(name) else {
+            return Ok(None);
+        };
+        let col_ref = array_ref
+            .as_any()
+            .downcast_ref::<A>()
+            .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
+        let col = realloc_array(col_ref, metrics);
+        Ok(Some(col))
     }
 
-    // We always have (K, V, T, D) columns.
-    let primary_records = decode_arrow_batch_kvtd(cols, metrics)?;
-    let structured_ext = ColumnarRecordsStructuredExt {
-        key: key_col,
-        val: val_col,
+    let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
+    let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
+    let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
+        .ok_or_else(|| anyhow!("missing timestamp column"))?;
+    let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
+        .ok_or_else(|| anyhow!("missing diff column"))?;
+    let structured_key = batch
+        .column_by_name("k_s")
+        .map(|a| realloc_any(Arc::clone(a), metrics));
+    let structured_val = batch
+        .column_by_name("v_s")
+        .map(|a| realloc_any(Arc::clone(a), metrics));
+
+    let updates = match (codec_key, codec_val, structured_key, structured_val) {
+        (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
+            BlobTraceUpdates::Both(
+                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
+                ColumnarRecordsStructuredExt {
+                    key: structured_key,
+                    val: structured_val,
+                },
+            )
+        }
+        (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
+            ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
+        ),
+        (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
+            key_values: ColumnarRecordsStructuredExt {
+                key: structured_key,
+                val: structured_val,
+            },
+            timestamps,
+            diffs,
+        },
+        (k, v, ks, vs) => {
+            anyhow::bail!(
+                "unexpected mix of key/value columns: k={:?}, v={}, k_s={}, v_s={}",
+                k.is_some(),
+                v.is_some(),
+                ks.is_some(),
+                vs.is_some(),
+            );
+        }
     };
 
-    Ok((primary_records, structured_ext))
+    Ok(updates)
 }
diff --git a/src/persist/src/indexed/columnar/parquet.rs b/src/persist/src/indexed/columnar/parquet.rs
index 9d54ab3c830a7..f6f45f715aba9 100644
--- a/src/persist/src/indexed/columnar/parquet.rs
+++ b/src/persist/src/indexed/columnar/parquet.rs
@@ -12,8 +12,6 @@
 use std::io::Write;
 use std::sync::Arc;
 
-use arrow::datatypes::Schema;
-use arrow::record_batch::RecordBatch;
 use differential_dataflow::trace::Description;
 use mz_ore::bytes::SegmentedBytes;
 use mz_ore::cast::CastFrom;
@@ -30,11 +28,7 @@ use tracing::warn;
 use crate::error::Error;
 use crate::gen::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
 use crate::gen::persist::ProtoBatchFormat;
-use crate::indexed::columnar::arrow::{
-    decode_arrow_batch_kvtd, decode_arrow_batch_kvtd_ks_vs, encode_arrow_batch_kvtd,
-    encode_arrow_batch_kvtd_ks_vs, realloc_any, SCHEMA_ARROW_RS_KVTD,
-};
-use crate::indexed::columnar::ColumnarRecords;
+use crate::indexed::columnar::arrow::{decode_arrow_batch, encode_arrow_batch};
 use crate::indexed::encoding::{
     decode_trace_inline_meta, encode_trace_inline_meta, BlobTraceBatchPart, BlobTraceUpdates,
 };
@@ -128,24 +122,14 @@ pub fn encode_parquet_kvtd<W: Write + Send>(
         .set_key_value_metadata(Some(vec![metadata]))
         .build();
 
-    let (columns, schema, format) = match updates {
-        BlobTraceUpdates::Row(updates) => (
-            encode_arrow_batch_kvtd(updates),
-            Arc::clone(&*SCHEMA_ARROW_RS_KVTD),
-            "k,v,t,d",
-        ),
-        BlobTraceUpdates::Both(codec_updates, structured_updates) => {
-            let (fields, arrays) = encode_arrow_batch_kvtd_ks_vs(codec_updates, structured_updates);
-            let schema = Schema::new(fields);
-            (arrays, Arc::new(schema), "k,v,t,d,k_s,v_s")
-        }
-        BlobTraceUpdates::Structured { .. } => {
-            unimplemented!("codec data should exist before reaching parquet encoding")
-        }
+    let batch = encode_arrow_batch(updates);
+    let format = match updates {
+        BlobTraceUpdates::Row(_) => "k,v,t,d",
+        BlobTraceUpdates::Both(_, _) => "k,v,t,d,k_s,v_s",
+        BlobTraceUpdates::Structured { .. } => "t,d,k_s,v_s",
     };
 
-    let mut writer = ArrowWriter::try_new(w, Arc::clone(&schema), Some(properties))?;
-    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;
+    let mut writer = ArrowWriter::try_new(w, batch.schema(), Some(properties))?;
     writer.write(&batch)?;
 
     writer.flush()?;
@@ -179,31 +163,20 @@ pub fn decode_parquet_file_kvtd(
 
     match format_metadata {
         None => {
-            // Make sure we have all of the expected columns.
-            if SCHEMA_ARROW_RS_KVTD.fields() != schema.fields() {
-                return Err(format!("found invalid schema {:?}", schema).into());
-            }
-
             let mut ret = Vec::new();
             for batch in reader {
                 let batch = batch.map_err(|e| Error::String(e.to_string()))?;
-                ret.push(decode_arrow_batch_kvtd(batch.columns(), metrics)?);
+                ret.push(batch);
             }
             if ret.len() != 1 {
                 warn!("unexpected number of row groups: {}", ret.len());
             }
-            Ok(BlobTraceUpdates::Row(ColumnarRecords::concat(
-                &ret, metrics,
-            )))
+            let batch = ::arrow::compute::concat_batches(&schema, &ret)?;
+            let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
+            Ok(updates)
         }
-        Some(ProtoFormatMetadata::StructuredMigration(v @ 1 | v @ 2)) => {
-            if schema.fields().len() > 6 {
-                return Err(
-                    format!("expected at most 6 columns, got {}", schema.fields().len()).into(),
-                );
-            }
-
-            let batch = reader
+        Some(ProtoFormatMetadata::StructuredMigration(v @ 1..=3)) => {
+            let mut batch = reader
                 .next()
                 .ok_or_else(|| Error::String("found empty batch".to_string()))??;
 
@@ -211,45 +184,14 @@ pub fn decode_parquet_file_kvtd(
             if reader.next().is_some() {
                 return Err(Error::String("found more than one RowGroup".to_string()));
             }
-            let columns = batch.columns();
-
-            // The first 4 columns are our primary (K, V, T, D) and optionally
-            // we also have K_S and/or V_S if we wrote structured data.
-            let primary_columns = &columns[..4];
 
             // Version 1 is a deprecated format so we just ignored the k_s and v_s columns.
-            if *v == 1 {
-                let records = decode_arrow_batch_kvtd(primary_columns, metrics)?;
-                return Ok(BlobTraceUpdates::Row(records));
+            if *v == 1 && batch.num_columns() > 4 {
+                batch = batch.project(&[0, 1, 2, 3])?;
             }
 
-            let k_s_column = schema
-                .fields()
-                .iter()
-                .position(|field| field.name() == "k_s")
-                .map(|idx| realloc_any(Arc::clone(&columns[idx]), metrics));
-            let v_s_column = schema
-                .fields()
-                .iter()
-                .position(|field| field.name() == "v_s")
-                .map(|idx| realloc_any(Arc::clone(&columns[idx]), metrics));
-
-            match (k_s_column, v_s_column) {
-                (Some(ks), Some(vs)) => {
-                    let (records, structured_ext) =
-                        decode_arrow_batch_kvtd_ks_vs(primary_columns, ks, vs, metrics)?;
-                    Ok(BlobTraceUpdates::Both(records, structured_ext))
-                }
-                (ks, vs) => {
-                    warn!(
-                        "unable to read back structured data! version={v} ks={} vs={}",
-                        ks.is_some(),
-                        vs.is_some()
-                    );
-                    let records = decode_arrow_batch_kvtd(primary_columns, metrics)?;
-                    Ok(BlobTraceUpdates::Row(records))
-                }
-            }
+            let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
+            Ok(updates)
         }
         unknown => Err(format!("unkown ProtoFormatMetadata, {unknown:?}"))?,
     }
diff --git a/src/persist/src/indexed/encoding.rs b/src/persist/src/indexed/encoding.rs
index 44ecb7ccc0de1..9f43d2f10e910 100644
--- a/src/persist/src/indexed/encoding.rs
+++ b/src/persist/src/indexed/encoding.rs
@@ -60,6 +60,9 @@ pub enum BatchColumnarFormat {
     /// with a schema of `(k, k_c, v, v_c, t, d)`, where `k` are the serialized bytes and `k_c` is
     /// nested columnar data.
     Both(usize),
+    /// Rows are encoded to a columnar struct. The batch is written down as Parquet
+    /// with a schema of `(t, d, k_s, v_s)`, where `k_s` is nested columnar data.
+    Structured,
 }
 
 impl BatchColumnarFormat {
@@ -75,6 +78,7 @@ impl BatchColumnarFormat {
             "row" => BatchColumnarFormat::Row,
             "both" => BatchColumnarFormat::Both(0),
             "both_v2" => BatchColumnarFormat::Both(2),
+            "structured" => BatchColumnarFormat::Structured,
             x => {
                 let default = BatchColumnarFormat::default();
                 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
@@ -100,6 +104,7 @@ impl BatchColumnarFormat {
             // The V0 format has been deprecated and we ignore its structured columns.
             BatchColumnarFormat::Both(0 | 1) => false,
             BatchColumnarFormat::Both(_) => true,
+            BatchColumnarFormat::Structured => true,
         }
     }
 }
@@ -406,29 +411,51 @@ impl BlobTraceUpdates {
         lgbytes: &ColumnarMetrics,
         proto: ProtoColumnarRecords,
     ) -> Result<Self, TryFromProtoError> {
-        let binary_array = |data: Bytes, offsets: Vec<i32>| match BinaryArray::try_new(
-            OffsetBuffer::new(offsets.into()),
-            ::arrow::buffer::Buffer::from_bytes(data.into()),
-            None,
-        ) {
-            Ok(data) => Ok(realloc_array(&data, lgbytes)),
-            Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
-                "Unable to decode binary array from repeated proto fields: {e:?}"
-            ))),
+        let binary_array = |data: Bytes, offsets: Vec<i32>| {
+            if offsets.is_empty() && proto.len > 0 {
+                return Ok(None);
+            };
+            match BinaryArray::try_new(
+                OffsetBuffer::new(offsets.into()),
+                ::arrow::buffer::Buffer::from_bytes(data.into()),
+                None,
+            ) {
+                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
+                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
+                    "Unable to decode binary array from repeated proto fields: {e:?}"
+                ))),
+            }
         };
 
-        let ret = ColumnarRecords::new(
-            binary_array(proto.key_data, proto.key_offsets)?,
-            binary_array(proto.val_data, proto.val_offsets)?,
-            realloc_array(&proto.timestamps.into(), lgbytes),
-            realloc_array(&proto.diffs.into(), lgbytes),
-        );
+        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
+        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
+
+        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
+        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
         let ext =
             ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
 
-        let updates = match ext {
-            None => Self::Row(ret),
-            Some(ext) => Self::Both(ret, ext),
+        let updates = match (codec_key, codec_val, ext) {
+            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
+                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
+                ext,
+            ),
+            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
+                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
+            ),
+            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
+                key_values: ext,
+                timestamps,
+                diffs,
+            },
+            (k, v, ext) => {
+                return Err(TryFromProtoError::InvalidPersistState(format!(
+                    "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
+                    k.is_some(),
+                    v.is_some(),
+                    ext.is_some(),
+                )))
+            }
         };
 
         Ok(updates)
@@ -488,6 +515,16 @@ impl BlobTraceUpdates {
                         .clone(),
                 )
             }
+            BatchColumnarFormat::Structured => {
+                let mut this = self.clone();
+                Self::Structured {
+                    key_values: this
+                        .get_or_make_structured::<K, V>(key_schema, val_schema)
+                        .clone(),
+                    timestamps: this.timestamps().clone(),
+                    diffs: this.diffs().clone(),
+                }
+            }
         }
     }
 }
@@ -686,9 +723,9 @@ pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPa
             let metadata = ProtoFormatMetadata::StructuredMigration(2);
             (ProtoBatchFormat::ParquetStructured, Some(metadata))
         }
-
         BlobTraceUpdates::Structured { .. } => {
-            unimplemented!("codec data should exist before reaching parquet encoding")
+            let metadata = ProtoFormatMetadata::StructuredMigration(3);
+            (ProtoBatchFormat::ParquetStructured, Some(metadata))
         }
     };