diff --git a/Cargo.lock b/Cargo.lock index 190438f638e4c..e079834af59d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2424,8 +2424,7 @@ dependencies = [ [[package]] name = "differential-dataflow" version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "922c18a0f94e29defaef228ecd65589880c16f3f3462a33258f869119f039443" +source = "git+https://github.com/antiguru/differential-dataflow?branch=timely_update#256be27cf1c302c22c5f99ed052a5b12b46ac410" dependencies = [ "columnar", "columnation", @@ -10687,8 +10686,7 @@ dependencies = [ [[package]] name = "timely" version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a714a3fed9aeacf63d9c5c574523c18972b788fa0414011b590af73acad30b09" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#d0ea86fb6a8c540c9775908250ab34a62be7ee87" dependencies = [ "bincode", "byteorder", @@ -10707,14 +10705,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46e1275de95b4a2713f0850c458d3a550dc323fffda65ce3e075f62545e0484b" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#d0ea86fb6a8c540c9775908250ab34a62be7ee87" [[package]] name = "timely_communication" version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3cdbfc7739e6a8ed95cd591ec0e862f294c681796c6121e6b3fa1ab946473e1" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#d0ea86fb6a8c540c9775908250ab34a62be7ee87" dependencies = [ "byteorder", "columnar", @@ -10729,14 +10725,12 @@ dependencies = [ [[package]] name = "timely_container" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c951c468b95e2070be7f48a9d8350b6e8e5ecb23e0d13fd7f6155893bb1297d" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#d0ea86fb6a8c540c9775908250ab34a62be7ee87" [[package]] name = "timely_logging" version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d46d6e2fbf5831ff8345f92723e46f778ce157cf8b74448fdcaac9efb9b9a2" +source = "git+https://github.com/TimelyDataflow/timely-dataflow#d0ea86fb6a8c540c9775908250ab34a62be7ee87" dependencies = [ "timely_container", ] diff --git a/Cargo.toml b/Cargo.toml index e0fd70a20f5b3..53af97ca2a008 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -295,6 +295,8 @@ incremental = true # merged), after which point it becomes impossible to build that historical # version of Materialize. [patch.crates-io] +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow", branch = "timely_update" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs index 78bcd3d9846db..683218784a188 100644 --- a/src/compute/src/render/continual_task.rs +++ b/src/compute/src/render/continual_task.rs @@ -782,7 +782,7 @@ where let mut input = builder.new_input_connection( &self.inner, Pipeline, - vec![Antichain::from_elem(step_forward_summary)], + [(0, Antichain::from_elem(step_forward_summary))].into(), ); builder.set_notify(false); builder.build(move |_caps| { diff --git a/src/compute/src/sink/refresh.rs b/src/compute/src/sink/refresh.rs index 3cf29d5b3b932..47fdf6d52a3ec 100644 --- a/src/compute/src/sink/refresh.rs +++ b/src/compute/src/sink/refresh.rs @@ -15,7 +15,6 @@ use mz_repr::{Diff, Timestamp}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; use timely::dataflow::Scope; -use timely::progress::Antichain; /// This is for REFRESH options on materialized views. It adds an operator that rounds up the /// timestamps of data and frontiers to the time of the next refresh. See @@ -36,7 +35,7 @@ where // time, we'll round it up to the next refresh time. let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope()); let (mut output_buf, output_stream) = builder.new_output::>(); - let mut input = builder.new_input_connection(&coll.inner, Pipeline, vec![Antichain::new()]); + let mut input = builder.new_input_connection(&coll.inner, Pipeline, Default::default()); builder.build(move |capabilities| { // This capability directly controls this operator's output frontier (because we have // disconnected the input above). We wrap it in an Option so we can drop it to advance to diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 65a10c4f2710f..5a1d13146403c 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -2476,29 +2476,33 @@ where fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) { let mut collections = self.collections.lock().expect("lock poisoned"); - let user_capabilities = collections - .iter_mut() - .filter(|(id, _c)| id.is_user()) - .map(|(id, c)| { - let updates = c.read_capabilities.updates().cloned().collect_vec(); - (*id, c.implied_capability.clone(), updates) - }) - .collect_vec(); + if tracing::enabled!(tracing::Level::TRACE) { + let user_capabilities = collections + .iter_mut() + .filter(|(id, _c)| id.is_user()) + .map(|(id, c)| { + let updates = c.read_capabilities.updates().cloned().collect_vec(); + (*id, c.implied_capability.clone(), updates) + }) + .collect_vec(); - trace!(?policies, ?user_capabilities, "set_read_policies"); + trace!(?policies, ?user_capabilities, "set_read_policies"); + } self.set_read_policies_inner(&mut collections, policies); - let user_capabilities = collections - .iter_mut() - .filter(|(id, _c)| id.is_user()) - .map(|(id, c)| { - let updates = c.read_capabilities.updates().cloned().collect_vec(); - (*id, c.implied_capability.clone(), updates) - }) - .collect_vec(); + if tracing::enabled!(tracing::Level::TRACE) { + let user_capabilities = collections + .iter_mut() + .filter(|(id, _c)| id.is_user()) + .map(|(id, c)| { + let updates = c.read_capabilities.updates().cloned().collect_vec(); + (*id, c.implied_capability.clone(), updates) + }) + .collect_vec(); - trace!(?user_capabilities, "after! set_read_policies"); + trace!(?user_capabilities, "after! set_read_policies"); + } } fn acquire_read_holds( diff --git a/src/storage/src/source/source_reader_pipeline.rs b/src/storage/src/source/source_reader_pipeline.rs index fba3c478176a9..f639ed72a6cc0 100644 --- a/src/storage/src/source/source_reader_pipeline.rs +++ b/src/storage/src/source/source_reader_pipeline.rs @@ -335,15 +335,12 @@ where // `export_handles.len() + 1` outputs. for (id, export) in exports { // This output is not connected to any of the existing inputs. - let connection = vec![Antichain::new(); export_handles.len()]; let (export_output, new_export) = - builder.new_output_connection::>(connection); + builder.new_output_connection::>(Default::default()); // The input is not connected to any of the existing outputs. let outputs_count = export_handles.len() + 1; - let mut connection = vec![Antichain::new(); outputs_count]; - // Standard frontier implication for the corresponding output of this input. - connection.push(Antichain::from_elem(Default::default())); + let connection = [(outputs_count, Antichain::from_elem(Default::default()))].into(); let export_input = builder.new_input_connection(&export.inner, Pipeline, connection); export_handles.push((id, export_input, export_output)); let new_export: StackedCollection> = diff --git a/src/timely-util/src/builder_async.rs b/src/timely-util/src/builder_async.rs index 9d2d139858488..81bb4ab2dad4d 100644 --- a/src/timely-util/src/builder_async.rs +++ b/src/timely-util/src/builder_async.rs @@ -16,7 +16,7 @@ //! Types to build async operators with general shapes. use std::cell::{Cell, RefCell}; -use std::collections::VecDeque; +use std::collections::{BTreeMap, VecDeque}; use std::future::Future; use std::pin::Pin; use std::rc::Rc; @@ -340,7 +340,7 @@ pub trait InputConnection { type Capability; /// Generates a summary description of the connection behavior given the number of outputs. - fn describe(&self, outputs: usize) -> Vec>; + fn describe(&self, outputs: usize) -> BTreeMap>; /// Accepts an input capability. fn accept(&self, input_cap: InputCapability) -> Self::Capability; @@ -352,8 +352,8 @@ pub struct Disconnected; impl InputConnection for Disconnected { type Capability = T; - fn describe(&self, outputs: usize) -> Vec> { - vec![Antichain::new(); outputs] + fn describe(&self, _outputs: usize) -> BTreeMap> { + BTreeMap::default() } fn accept(&self, input_cap: InputCapability) -> Self::Capability { @@ -367,10 +367,8 @@ pub struct ConnectedToOne(usize); impl InputConnection for ConnectedToOne { type Capability = Capability; - fn describe(&self, outputs: usize) -> Vec> { - let mut summary = vec![Antichain::new(); outputs]; - summary[self.0] = Antichain::from_elem(T::Summary::default()); - summary + fn describe(&self, _outputs: usize) -> BTreeMap> { + [(self.0, Antichain::from_elem(T::Summary::default()))].into() } fn accept(&self, input_cap: InputCapability) -> Self::Capability { @@ -384,12 +382,11 @@ pub struct ConnectedToMany([usize; N]); impl InputConnection for ConnectedToMany { type Capability = [Capability; N]; - fn describe(&self, outputs: usize) -> Vec> { - let mut summary = vec![Antichain::new(); outputs]; - for output in self.0 { - summary[output] = Antichain::from_elem(T::Summary::default()); - } - summary + fn describe(&self, _outputs: usize) -> BTreeMap> { + self.0 + .iter() + .map(|output| (*output, Antichain::from_elem(T::Summary::default()))) + .collect() } fn accept(&self, input_cap: InputCapability) -> Self::Capability { @@ -533,8 +530,7 @@ impl OperatorBuilder { ) { let index = self.builder.shape().outputs(); - let connection = vec![Antichain::new(); self.builder.shape().inputs()]; - let (wrapper, stream) = self.builder.new_output_connection(connection); + let (wrapper, stream) = self.builder.new_output_connection(Default::default()); let handle = AsyncOutputHandle::new(wrapper, index);