
[dnm] allocation error debugging #32137


Draft: wants to merge 5 commits into main
58 changes: 57 additions & 1 deletion ci/release-qualification/pipeline.template.yml
@@ -176,7 +176,63 @@ steps:
composition: feature-benchmark
args: [--other-tag=common-ancestor, --scale=+1]

- id: long-parallel-benchmark
- id: long-parallel-benchmark-1
label: "Long Parallel Benchmark"
depends_on: build-x86_64
timeout_in_minutes: 1200
agents:
queue: hetzner-x86-64-dedi-8cpu-32gb
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-benchmark
args:
- --other-tag
- common-ancestor
- --load-phase-duration
- 1200
- id: long-parallel-benchmark-2
label: "Long Parallel Benchmark"
depends_on: build-x86_64
timeout_in_minutes: 1200
agents:
queue: hetzner-x86-64-dedi-8cpu-32gb
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-benchmark
args:
- --other-tag
- common-ancestor
- --load-phase-duration
- 1200
- id: long-parallel-benchmark-3
label: "Long Parallel Benchmark"
depends_on: build-x86_64
timeout_in_minutes: 1200
agents:
queue: hetzner-x86-64-dedi-8cpu-32gb
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-benchmark
args:
- --other-tag
- common-ancestor
- --load-phase-duration
- 1200
- id: long-parallel-benchmark-4
label: "Long Parallel Benchmark"
depends_on: build-x86_64
timeout_in_minutes: 1200
agents:
queue: hetzner-x86-64-dedi-8cpu-32gb
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-benchmark
args:
- --other-tag
- common-ancestor
- --load-phase-duration
- 1200
- id: long-parallel-benchmark-5
label: "Long Parallel Benchmark"
depends_on: build-x86_64
timeout_in_minutes: 1200
3 changes: 2 additions & 1 deletion misc/python/materialize/parallel_benchmark/framework.py
@@ -504,8 +504,9 @@ def teardown(self) -> None:
for i in range(len(self.thread_pool)):
# Indicate to every thread to stop working
self.jobs.put(None)

for thread in self.thread_pool:
thread.join()
thread.join(timeout=3600)
self.jobs.join()


1 change: 1 addition & 0 deletions misc/python/materialize/util.py
@@ -168,6 +168,7 @@ def connect(self) -> psycopg.Connection:
password=self.password,
dbname=self.database,
sslmode="require" if self.ssl else None,
connect_timeout=600,
)
if self.autocommit:
conn.autocommit = True
23 changes: 22 additions & 1 deletion src/alloc/src/lib.rs
@@ -13,7 +13,7 @@ use mz_ore::metrics::MetricsRegistry;

#[cfg(all(feature = "jemalloc", not(miri)))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
static ALLOC: custom_alloc::Allocator = custom_alloc::Allocator;

/// Registers metrics for the global allocator into the provided registry.
///
@@ -33,3 +33,24 @@ pub async fn register_metrics_into(_: &MetricsRegistry) {
pub async fn register_metrics_into(registry: &MetricsRegistry) {
mz_prof::jemalloc::JemallocMetrics::register_into(registry).await;
}

#[cfg(all(feature = "jemalloc", not(miri)))]
mod custom_alloc {
use std::alloc::{GlobalAlloc, Layout};
use tikv_jemallocator::Jemalloc;

pub struct Allocator;

unsafe impl GlobalAlloc for Allocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
if layout.size() > 1 << 40 {
panic!("attempt to allocate {} bytes", layout.size());
}
Jemalloc.alloc(layout)
}

unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
Jemalloc.dealloc(ptr, layout);
}
}
}
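
The hunk above swaps the global allocator for a thin wrapper that panics whenever a single allocation request exceeds 1 TiB, so a corrupted length is caught at the allocation site instead of deep inside jemalloc. Below is a minimal, self-contained sketch of the same wrapping-allocator pattern, built on the std `System` allocator so it runs without jemalloc; `CheckedAlloc`, `LIMIT`, and the `main` demo are illustrative names, not part of this PR:

```rust
use std::alloc::{GlobalAlloc, Layout, System};

/// Guard allocator: forwards to `System`, but panics on any single
/// allocation larger than `LIMIT`, mirroring the jemalloc wrapper above.
struct CheckedAlloc;

/// 1 TiB, the same threshold as in the diff above.
const LIMIT: usize = 1 << 40;

unsafe impl GlobalAlloc for CheckedAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        if layout.size() > LIMIT {
            // Debugging aid only: unwinding out of a global allocator is
            // undefined behavior, so this assumes panics abort the process.
            panic!("attempt to allocate {} bytes", layout.size());
        }
        unsafe { System.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) }
    }
}

#[global_allocator]
static ALLOC: CheckedAlloc = CheckedAlloc;

fn main() {
    // Ordinary allocations pass straight through to `System`.
    let v = vec![0u8; 4096];
    println!("allocated {} bytes", v.len());
}
```

The panic message itself allocates only a small format buffer, far below the threshold, so the guard cannot recurse into itself; but because unwinding out of a global allocator is undefined behavior, this pattern is only suitable for debugging builds where panics abort.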
137 changes: 126 additions & 11 deletions src/compute/src/compute_state.rs
@@ -12,14 +12,14 @@ use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::{mpsc, Arc};
use std::sync::{Arc, mpsc};
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::Hashable;
use differential_dataflow::IntoOwned;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
@@ -30,30 +30,30 @@ use mz_compute_client::protocol::response::{
StatusResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::row::RowCollection;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::Diagnostics;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_storage_types::StorageDiff;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
@@ -63,7 +63,7 @@ use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{debug, error, info, span, warn, Level};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
@@ -180,6 +180,8 @@ pub struct ComputeState {
/// replica can drop diffs associated with timestamps beyond the replica expiration.
/// The replica will panic if such dataflows are not dropped before the replica has expired.
pub replica_expiration: Antichain<Timestamp>,

last_command: Option<ComputeCommand>,
}

impl ComputeState {
@@ -224,6 +226,7 @@ impl ComputeState {
server_maintenance_interval: Duration::ZERO,
init_system_time: mz_ore::now::SYSTEM_TIME(),
replica_expiration: Antichain::default(),
last_command: None,
}
}

@@ -394,7 +397,118 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
use ComputeCommand::*;

self.compute_state.command_history.push(cmd.clone());
let cmd2 = mz_ore::panic::catch_unwind_str(|| cmd.clone()).unwrap_or_else(|panic| {
error!("allocation error cloning compute command: {panic}");

let bytes = unsafe {
let ptr = &cmd as *const _ as *const u8;
std::slice::from_raw_parts(ptr, std::mem::size_of_val(&cmd))
};
error!(" cmd[raw]={bytes:02x?}");

if let Some(last) = self.compute_state.last_command.take() {
error!(" last={last:?}");
let bytes = unsafe {
let ptr = &last as *const _ as *const u8;
std::slice::from_raw_parts(ptr, std::mem::size_of_val(&last))
};
error!(" last[raw]={bytes:02x?}");
}

match &cmd {
CreateTimely { config, epoch } => {
error!(" type=CreateTimely");
error!(" config={config:?}");
error!(" epoch={epoch:?}");
}
CreateInstance(instance_config) => {
error!(" type=CreateInstance");
error!(" instance_config={instance_config:?}");
}
InitializationComplete => {
error!(" type=InitializationComplete");
}
AllowWrites => {
error!(" type=AllowWrites");
}
UpdateConfiguration(compute_parameters) => {
error!(" type=UpdateConfiguration");
let ComputeParameters {
workload_class,
max_result_size,
tracing,
grpc_client,
dyncfg_updates,
} = compute_parameters;
error!(" workload_class={workload_class:?}");
error!(" max_result_size={max_result_size:?}");
error!(" tracing={tracing:?}");
error!(" grpc_client={grpc_client:?}");
error!(" dyncfg_updates={dyncfg_updates:?}");
}
CreateDataflow(dataflow_description) => {
error!(" type=CreateDataflow");
let DataflowDescription {
source_imports,
index_imports,
objects_to_build,
index_exports,
sink_exports,
as_of,
until,
initial_storage_as_of,
refresh_schedule,
debug_name,
time_dependence,
} = dataflow_description;
error!(" source_imports={source_imports:?}");
error!(" index_imports={index_imports:?}");
error!(" objects_to_build={objects_to_build:?}");
error!(" index_exports={index_exports:?}");
error!(" sink_exports={sink_exports:?}");
error!(" as_of={as_of:?}");
error!(" until={until:?}");
error!(" initial_storage_as_of={initial_storage_as_of:?}");
error!(" refresh_schedule={refresh_schedule:?}");
error!(" debug_name={debug_name:?}");
error!(" time_dependence={time_dependence:?}");
}
Schedule(global_id) => {
error!(" type=Schedule");
error!(" global_id={global_id:?}")
}
AllowCompaction { id, frontier } => {
error!(" type=AllowCompaction");
error!(" id={id:?}");
error!(" frontier={frontier:?}");
}
Peek(peek) => {
error!(" type=Peek");
let mz_compute_client::protocol::command::Peek {
target,
literal_constraints,
uuid,
timestamp,
finishing,
map_filter_project,
otel_ctx,
} = peek;
error!(" target={target:?}");
error!(" literal_constraints={literal_constraints:?}");
error!(" uuid={uuid:?}");
error!(" timestamp={timestamp:?}");
error!(" finishing={finishing:?}");
error!(" map_filter_project={map_filter_project:?}");
error!(" otel_ctx={otel_ctx:?}");
}
CancelPeek { uuid } => {
error!(" type=CancelPeek");
error!(" uuid={uuid:?}");
}
}
panic!("abort");
});
self.compute_state.command_history.push(cmd2);

// Record the command duration, per worker and command kind.
let timer = self
@@ -404,6 +518,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
.for_command(&cmd)
.start_timer();

self.compute_state.last_command = Some(cmd.clone());

match cmd {
CreateTimely { .. } => panic!("CreateTimely must be captured before"),
CreateInstance(instance_config) => self.handle_create_instance(instance_config),
@@ -1500,8 +1616,7 @@ impl IndexPeek {
let copies: usize = if copies.is_negative() {
return Err(format!(
"Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
-copies,
&*borrow,
-copies, &*borrow,
));
} else {
copies.into_inner().try_into().unwrap()
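
Two debugging techniques in the compute_state.rs hunk are worth spelling out. First, `mz_ore::panic::catch_unwind_str` traps a panic raised while cloning the incoming command (for example, the allocator guard firing) and hands back its message for logging. Below is a rough sketch of how such a helper can be written with plain `std::panic::catch_unwind`; the body is an assumption about its behavior, not Materialize's implementation:

```rust
use std::panic::{self, AssertUnwindSafe};

/// Run `f`, converting any panic into its message as a `String`.
/// `AssertUnwindSafe` is needed because the closure may capture references.
fn catch_unwind_str<T>(f: impl FnOnce() -> T) -> Result<T, String> {
    panic::catch_unwind(AssertUnwindSafe(f)).map_err(|payload| {
        // Panic payloads are almost always `&str` or `String`.
        if let Some(s) = payload.downcast_ref::<&str>() {
            (*s).to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "<unknown panic payload>".to_string()
        }
    })
}

fn main() {
    // A successful closure passes its result through untouched.
    assert_eq!(catch_unwind_str(|| 1 + 1), Ok(2));

    // A panicking closure yields its message instead of unwinding further.
    let err = catch_unwind_str(|| -> u32 { panic!("attempt to allocate 2199023255552 bytes") });
    assert!(err.unwrap_err().contains("attempt to allocate"));
}
```

Second, the `slice::from_raw_parts` dumps print the raw in-memory bytes of the command value. Note that this covers only the struct's inline representation (enum discriminant, pointers, lengths, capacities), not the heap data those pointers reference, which is usually enough to spot a corrupted length or pointer field.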