Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timely connectivity changes #31947

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ incremental = true
# merged), after which point it becomes impossible to build that historical
# version of Materialize.
[patch.crates-io]
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow", branch = "timely_update" }
# Waiting on https://github.com/sfackler/rust-postgres/pull/752.
postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/render/continual_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ where
let mut input = builder.new_input_connection(
&self.inner,
Pipeline,
vec![Antichain::from_elem(step_forward_summary)],
[(0, Antichain::from_elem(step_forward_summary))].into(),
);
builder.set_notify(false);
builder.build(move |_caps| {
Expand Down
3 changes: 1 addition & 2 deletions src/compute/src/sink/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use mz_repr::{Diff, Timestamp};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;
use timely::progress::Antichain;

/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
Expand All @@ -36,7 +35,7 @@ where
// time, we'll round it up to the next refresh time.
let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
let (mut output_buf, output_stream) = builder.new_output::<ConsolidatingContainerBuilder<_>>();
let mut input = builder.new_input_connection(&coll.inner, Pipeline, vec![Antichain::new()]);
let mut input = builder.new_input_connection(&coll.inner, Pipeline, Default::default());
builder.build(move |capabilities| {
// This capability directly controls this operator's output frontier (because we have
// disconnected the input above). We wrap it in an Option so we can drop it to advance to
Expand Down
40 changes: 22 additions & 18 deletions src/storage-client/src/storage_collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2476,29 +2476,33 @@ where
fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy<Self::Timestamp>)>) {
let mut collections = self.collections.lock().expect("lock poisoned");

let user_capabilities = collections
.iter_mut()
.filter(|(id, _c)| id.is_user())
.map(|(id, c)| {
let updates = c.read_capabilities.updates().cloned().collect_vec();
(*id, c.implied_capability.clone(), updates)
})
.collect_vec();
if tracing::enabled!(tracing::Level::TRACE) {
let user_capabilities = collections
.iter_mut()
.filter(|(id, _c)| id.is_user())
.map(|(id, c)| {
let updates = c.read_capabilities.updates().cloned().collect_vec();
(*id, c.implied_capability.clone(), updates)
})
.collect_vec();

trace!(?policies, ?user_capabilities, "set_read_policies");
trace!(?policies, ?user_capabilities, "set_read_policies");
}

self.set_read_policies_inner(&mut collections, policies);

let user_capabilities = collections
.iter_mut()
.filter(|(id, _c)| id.is_user())
.map(|(id, c)| {
let updates = c.read_capabilities.updates().cloned().collect_vec();
(*id, c.implied_capability.clone(), updates)
})
.collect_vec();
if tracing::enabled!(tracing::Level::TRACE) {
let user_capabilities = collections
.iter_mut()
.filter(|(id, _c)| id.is_user())
.map(|(id, c)| {
let updates = c.read_capabilities.updates().cloned().collect_vec();
(*id, c.implied_capability.clone(), updates)
})
.collect_vec();

trace!(?user_capabilities, "after! set_read_policies");
trace!(?user_capabilities, "after! set_read_policies");
}
}

fn acquire_read_holds(
Expand Down
7 changes: 2 additions & 5 deletions src/storage/src/source/source_reader_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,12 @@ where
// `export_handles.len() + 1` outputs.
for (id, export) in exports {
// This output is not connected to any of the existing inputs.
let connection = vec![Antichain::new(); export_handles.len()];
let (export_output, new_export) =
builder.new_output_connection::<CapacityContainerBuilder<_>>(connection);
builder.new_output_connection::<CapacityContainerBuilder<_>>(Default::default());

// The input is not connected to any of the existing outputs.
let outputs_count = export_handles.len() + 1;
let mut connection = vec![Antichain::new(); outputs_count];
// Standard frontier implication for the corresponding output of this input.
connection.push(Antichain::from_elem(Default::default()));
let connection = [(outputs_count, Antichain::from_elem(Default::default()))].into();
let export_input = builder.new_input_connection(&export.inner, Pipeline, connection);
export_handles.push((id, export_input, export_output));
let new_export: StackedCollection<G, Result<SourceMessage, DataflowError>> =
Expand Down
28 changes: 12 additions & 16 deletions src/timely-util/src/builder_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! Types to build async operators with general shapes.

use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
Expand Down Expand Up @@ -340,7 +340,7 @@ pub trait InputConnection<T: Timestamp> {
type Capability;

/// Generates a summary description of the connection behavior given the number of outputs.
fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>>;
fn describe(&self, outputs: usize) -> BTreeMap<usize, Antichain<T::Summary>>;

/// Accepts an input capability.
fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability;
Expand All @@ -352,8 +352,8 @@ pub struct Disconnected;
impl<T: Timestamp> InputConnection<T> for Disconnected {
type Capability = T;

fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
vec![Antichain::new(); outputs]
fn describe(&self, _outputs: usize) -> BTreeMap<usize, Antichain<T::Summary>> {
BTreeMap::default()
}

fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
Expand All @@ -367,10 +367,8 @@ pub struct ConnectedToOne(usize);
impl<T: Timestamp> InputConnection<T> for ConnectedToOne {
type Capability = Capability<T>;

fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
let mut summary = vec![Antichain::new(); outputs];
summary[self.0] = Antichain::from_elem(T::Summary::default());
summary
fn describe(&self, _outputs: usize) -> BTreeMap<usize, Antichain<T::Summary>> {
[(self.0, Antichain::from_elem(T::Summary::default()))].into()
}

fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
Expand All @@ -384,12 +382,11 @@ pub struct ConnectedToMany<const N: usize>([usize; N]);
impl<const N: usize, T: Timestamp> InputConnection<T> for ConnectedToMany<N> {
type Capability = [Capability<T>; N];

fn describe(&self, outputs: usize) -> Vec<Antichain<T::Summary>> {
let mut summary = vec![Antichain::new(); outputs];
for output in self.0 {
summary[output] = Antichain::from_elem(T::Summary::default());
}
summary
fn describe(&self, _outputs: usize) -> BTreeMap<usize, Antichain<T::Summary>> {
self.0
.iter()
.map(|output| (*output, Antichain::from_elem(T::Summary::default())))
.collect()
}

fn accept(&self, input_cap: InputCapability<T>) -> Self::Capability {
Expand Down Expand Up @@ -533,8 +530,7 @@ impl<G: Scope> OperatorBuilder<G> {
) {
let index = self.builder.shape().outputs();

let connection = vec![Antichain::new(); self.builder.shape().inputs()];
let (wrapper, stream) = self.builder.new_output_connection(connection);
let (wrapper, stream) = self.builder.new_output_connection(Default::default());

let handle = AsyncOutputHandle::new(wrapper, index);

Expand Down