storage: round probe timestamps to the probe interval
This commit ensures that probe timestamps, and therefore the timestamps used
for minting new bindings, are rounded down to a multiple of the probe
interval (typically 1s). This reduces the number of distinct timestamps
flowing through a dataflow that joins multiple sources. Each distinct
timestamp induces some amount of overhead, so forcing the timestamps of
individual sources to the same values is more efficient.
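The rounding itself is plain integer truncation to the interval. A minimal sketch of the idea (function and parameter names here are illustrative, not part of this diff):

// Sketch: round an epoch-milliseconds timestamp down to a multiple of the
// probe interval (both parameter names are hypothetical).
fn round_to_interval(ts_ms: u64, interval_ms: u64) -> u64 {
    ts_ms - (ts_ms % interval_ms)
}

// With a 1s interval, probes taken at 1_700_000_123 ms and 1_700_000_789 ms
// both round down to 1_700_000_000 ms, so a dataflow joining several sources
// sees a single timestamp instead of two.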
teskje committed Jan 16, 2025
1 parent 503d77b commit 1f74863
Showing 6 changed files with 243 additions and 92 deletions.
1 change: 1 addition & 0 deletions src/storage/src/source.rs
@@ -31,6 +31,7 @@ pub mod generator;
mod kafka;
mod mysql;
mod postgres;
mod probe;
pub(crate) mod reclock;
mod source_reader_pipeline;
mod statistics;
146 changes: 70 additions & 76 deletions src/storage/src/source/kafka.rs
@@ -12,7 +12,6 @@ use std::collections::BTreeMap;
use std::convert::Infallible;
use std::str::{self};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
@@ -69,7 +68,7 @@ use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{
Probe, ProgressStatisticsUpdate, SignaledFuture, SourceRender, StackedCollection,
};
use crate::source::{RawSourceCreationConfig, SourceMessage};
use crate::source::{probe, RawSourceCreationConfig, SourceMessage};

#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct HealthStatus {
@@ -118,7 +117,7 @@ pub struct KafkaSourceReader {
/// Channel to receive Kafka statistics JSON blobs from the stats callback.
stats_rx: crossbeam_channel::Receiver<Jsonb>,
/// Progress statistics as collected from the `resume_uppers` stream and the partition metadata
/// thread.
/// task.
progress_statistics: Arc<Mutex<PartialProgressStatistics>>,
/// A handle to the partition specific metrics
partition_metrics: KafkaSourceMetrics,
@@ -127,7 +126,7 @@
}

/// A partially-filled version of `ProgressStatisticsUpdate`. This allows us to
/// only emit updates when `offset_known` is updated by the metadata thread.
/// only emit updates when `offset_known` is updated by the metadata task.
#[derive(Default)]
struct PartialProgressStatistics {
offset_known: Option<u64>,
@@ -890,7 +889,7 @@ fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
}
}

// If we have a new `offset_known` from the partition metadata thread, and
// If we have a new `offset_known` from the partition metadata task, and
// `committed` from reading the `resume_uppers` stream, we can emit a
// progress stats update.
let mut stats =
@@ -1617,7 +1616,7 @@ fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
);

let (tx, mut rx) = mpsc::unbounded_channel();
spawn_metadata_thread(config, consumer, topic, poll_interval, tx);
spawn_metadata_task(config, consumer, topic, poll_interval, tx);

let mut prev_upstream_frontier = resume_upper;

@@ -1661,92 +1660,87 @@
(metadata_stream, probe_stream, button.press_on_drop())
}

fn spawn_metadata_thread<C: ConsumerContext>(
fn spawn_metadata_task<C: ConsumerContext>(
config: RawSourceCreationConfig,
consumer: BaseConsumer<TunnelingClientContext<C>>,
topic: String,
poll_interval: Duration,
tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
thread::Builder::new()
.name(format!("kafka-metadata-{}", config.id))
.spawn(move || {
mz_ore::task::spawn(|| format!("kafka-metadata-{}", config.id), async move {
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
poll_interval =? poll_interval,
"kafka metadata task: starting..."
);

let mut ticker = probe::Ticker::new(poll_interval, config.now_fn);
loop {
let probe_ts = ticker.tick().await;
let result = fetch_partition_info(
&consumer,
&topic,
config
.config
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
);
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
poll_interval =? poll_interval,
"kafka metadata thread: starting..."
"kafka metadata task: metadata fetch result: {:?}",
result
);
loop {
let probe_ts = (config.now_fn)().into();
let result = fetch_partition_info(
&consumer,
&topic,
config
.config
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
);
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: metadata fetch result: {:?}",
result
);
let update = match result {
Ok(partitions) => {
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: fetched partition metadata info",
);
let update = match result {
Ok(partitions) => {
trace!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata task: fetched partition metadata info",
);

MetadataUpdate::Partitions(partitions)
}
Err(GetPartitionsError::TopicDoesNotExist) => {
let error = SourceError {
error: SourceErrorDetails::Other("topic was deleted".into()),
};
MetadataUpdate::DefiniteError(error)
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
format!("{}", e.display_with_causes()),
None,
));

let ssh_status = consumer.client().context().tunnel_status();
let ssh_status = match ssh_status {
SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
SshTunnelStatus::Errored(e) => {
Some(HealthStatusUpdate::stalled(e, None))
}
};
MetadataUpdate::Partitions(partitions)
}
Err(GetPartitionsError::TopicDoesNotExist) => {
let error = SourceError {
error: SourceErrorDetails::Other("topic was deleted".into()),
};
MetadataUpdate::DefiniteError(error)
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
format!("{}", e.display_with_causes()),
None,
));

MetadataUpdate::TransientError(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
})
}
};
let ssh_status = consumer.client().context().tunnel_status();
let ssh_status = match ssh_status {
SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
SshTunnelStatus::Errored(e) => Some(HealthStatusUpdate::stalled(e, None)),
};

if tx.send((probe_ts, update)).is_err() {
break;
MetadataUpdate::TransientError(HealthStatus {
kafka: kafka_status,
ssh: ssh_status,
})
}
};

thread::park_timeout(poll_interval);
if tx.send((probe_ts.into(), update)).is_err() {
break;
}
}

info!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata thread: receiver has gone away; shutting down."
)
})
.unwrap();
info!(
source_id = config.id.to_string(),
worker_id = config.worker_id,
num_workers = config.worker_count,
"kafka metadata task: receiver has gone away; shutting down."
)
});
}
15 changes: 6 additions & 9 deletions src/storage/src/source/mysql/statistics.rs
@@ -23,7 +23,7 @@ use mz_storage_types::sources::MySqlSourceConnection;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::source::types::{Probe, ProgressStatisticsUpdate};
use crate::source::RawSourceCreationConfig;
use crate::source::{probe, RawSourceCreationConfig};

use super::{ReplicationError, TransientError};

@@ -89,16 +89,13 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
let prev_offset_committed = Cell::new(None);
let stats_output = RefCell::new(stats_output);

let mut interval = tokio::time::interval(
mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL
.get(config.config.config_set()),
);
let probe_interval = mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL
.get(config.config.config_set());
let mut probe_ticker = probe::Ticker::new(probe_interval, config.now_fn);
let probe_loop = async {
loop {
interval.tick().await;
let probe_ts = probe_ticker.tick().await;

let probe_ts =
mz_repr::Timestamp::try_from((config.now_fn)()).expect("must fit");
let gtid_executed =
query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
// We don't translate this into a definite error like in snapshotting, but we
@@ -121,7 +118,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
probe_output.give(
&probe_cap[0],
Probe {
probe_ts,
probe_ts: probe_ts.into(),
upstream_frontier,
},
);
12 changes: 5 additions & 7 deletions src/storage/src/source/postgres/replication.rs
@@ -118,6 +118,7 @@ use tracing::{error, trace};
use crate::metrics::source::postgres::PgSourceMetrics;
use crate::source::postgres::verify_schema;
use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
use crate::source::probe;
use crate::source::types::{
Probe, ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
};
@@ -757,20 +758,17 @@ async fn raw_stream<'a>(
);

let (probe_tx, mut probe_rx) = watch::channel(None);
let offset_known_interval =
let probe_interval =
mz_storage_types::dyncfgs::PG_OFFSET_KNOWN_INTERVAL.get(config.config.config_set());
let now_fn = config.now_fn.clone();
let mut probe_ticker = probe::Ticker::new(probe_interval, config.now_fn.clone());
let max_lsn_task_handle =
mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
let mut interval = tokio::time::interval(offset_known_interval);

while !probe_tx.is_closed() {
interval.tick().await;
let probe_ts = mz_repr::Timestamp::try_from((now_fn)()).expect("must fit");
let probe_ts = probe_ticker.tick().await;
let probe_or_err = super::fetch_max_lsn(&*metadata_client)
.await
.map(|lsn| Probe {
probe_ts,
probe_ts: probe_ts.into(),
upstream_frontier: Antichain::from_elem(lsn),
});
let _ = probe_tx.send(Some(probe_or_err));
68 changes: 68 additions & 0 deletions src/storage/src/source/probe.rs
@@ -0,0 +1,68 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for sending frontier probes to upstream systems.

use std::time::Duration;

use mz_ore::now::{EpochMillis, NowFn};
use tracing::trace;

/// A ticker to drive source upstream probing.
///
/// This type works similar to [`tokio::time::Interval`] but returns `EpochMillis` timestamps from
/// its [`Ticker::tick`] method that can be used as probe timestamps. These timestamps are rounded
/// down to the nearest multiple of the tick interval, to reduce the amount of unique timestamps
/// emitted by sources, thereby reducing churn in downstream dataflows.
pub(super) struct Ticker {
interval: EpochMillis,
now: NowFn,
last_tick: Option<EpochMillis>,
}

impl Ticker {
pub fn new(interval: Duration, now: NowFn) -> Self {
let interval = interval.as_millis().try_into().unwrap();
Self {
interval,
now,
last_tick: None,
}
}

/// Wait until it is time for the next probe, returning a suitable probe timestamp.
///
/// This method tries to resolve as close as possible to the returned probe timestamp, though
/// it is not guaranteed to always succeed. If a tick is missed, it is skipped entirely.
pub async fn tick(&mut self) -> EpochMillis {
let mut now = (self.now)();

let target = match self.last_tick {
Some(ms) => ms + self.interval,
None => now,
};
let target = self.round_to_interval(target);

while now < target {
let wait = Duration::from_millis(target - now);
tokio::time::sleep(wait).await;
now = (self.now)();
}

trace!(target, now, "probe ticker skew: {}ms", now - target);

let this_tick = self.round_to_interval(now);
self.last_tick = Some(this_tick);
this_tick
}

fn round_to_interval(&self, ms: EpochMillis) -> EpochMillis {
ms - (ms % self.interval)
}
}
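A hypothetical usage sketch of the new `Ticker`, mirroring the probe loops in the Kafka, MySQL, and Postgres diffs above; the `probe_loop` function and its wiring are illustrative, and only `Ticker`, `NowFn`, and `EpochMillis` come from this commit:

use std::time::Duration;

use mz_ore::now::{EpochMillis, NowFn};

use crate::source::probe;

// Hypothetical driver (assumed to live in a sibling module under
// `crate::source`, since `Ticker` is `pub(super)`): each iteration waits for
// the next tick and receives a timestamp already rounded down to the 1s probe
// interval. If a tick deadline is missed, it is skipped entirely and the
// current time, rounded down, is returned instead.
async fn probe_loop(now: NowFn) {
    let mut ticker = probe::Ticker::new(Duration::from_secs(1), now);
    loop {
        let probe_ts: EpochMillis = ticker.tick().await;
        // ...probe the upstream system and emit a `Probe { probe_ts, .. }` here...
        let _ = probe_ts;
    }
}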