@@ -12,14 +12,14 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::num::NonZeroUsize;
 use std::ops::DerefMut;
 use std::rc::Rc;
-use std::sync::{mpsc, Arc};
+use std::sync::{Arc, mpsc};
 use std::time::{Duration, Instant};

 use bytesize::ByteSize;
-use differential_dataflow::lattice::Lattice;
-use differential_dataflow::trace::{Cursor, TraceReader};
 use differential_dataflow::Hashable;
 use differential_dataflow::IntoOwned;
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::trace::{Cursor, TraceReader};
 use mz_compute_client::logging::LoggingConfig;
 use mz_compute_client::protocol::command::{
     ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
@@ -30,30 +30,30 @@ use mz_compute_client::protocol::response::{
     StatusResponse, SubscribeResponse,
 };
 use mz_compute_types::dataflows::DataflowDescription;
-use mz_compute_types::plan::render_plan::RenderPlan;
 use mz_compute_types::plan::LirId;
+use mz_compute_types::plan::render_plan::RenderPlan;
 use mz_dyncfg::ConfigSet;
-use mz_expr::row::RowCollection;
 use mz_expr::SafeMfpPlan;
+use mz_expr::row::RowCollection;
 use mz_ore::cast::CastFrom;
 use mz_ore::collections::CollectionExt;
 use mz_ore::metrics::UIntGauge;
 use mz_ore::now::EpochMillis;
 use mz_ore::task::AbortOnDropHandle;
 use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
+use mz_persist_client::Diagnostics;
 use mz_persist_client::cache::PersistClientCache;
 use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
 use mz_persist_client::read::ReadHandle;
-use mz_persist_client::Diagnostics;
 use mz_persist_types::codec_impls::UnitSchema;
 use mz_repr::fixed_length::ToDatumIter;
 use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
 use mz_storage_operators::stats::StatsCursor;
+use mz_storage_types::StorageDiff;
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
 use mz_storage_types::sources::SourceData;
 use mz_storage_types::time_dependence::TimeDependence;
-use mz_storage_types::StorageDiff;
 use mz_txn_wal::operator::TxnsContext;
 use mz_txn_wal::txn_cache::TxnsCache;
 use timely::communication::Allocate;
@@ -63,7 +63,7 @@ use timely::progress::frontier::Antichain;
 use timely::scheduling::Scheduler;
 use timely::worker::Worker as TimelyWorker;
 use tokio::sync::{oneshot, watch};
-use tracing::{debug, error, info, span, warn, Level};
+use tracing::{Level, debug, error, info, span, warn};
 use uuid::Uuid;

 use crate::arrangement::manager::{TraceBundle, TraceManager};
@@ -394,7 +394,102 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
     pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
         use ComputeCommand::*;

-        self.compute_state.command_history.push(cmd.clone());
+        let cmd2 = mz_ore::panic::catch_unwind_str(|| cmd.clone()).unwrap_or_else(|panic| {
+            error!("allocation error cloning compute command: {panic}");
+            match cmd {
+                CreateTimely { config, epoch } => {
+                    error!(" type=CreateTimely");
+                    error!(" config={config:?}");
+                    error!(" epoch={epoch:?}");
+                }
+                CreateInstance(instance_config) => {
+                    error!(" type=CreateInstance");
+                    error!(" instance_config={instance_config:?}");
+                }
+                InitializationComplete => {
+                    error!(" type=InitializationComplete");
+                }
+                AllowWrites => {
+                    error!(" type=AllowWrites");
+                }
+                UpdateConfiguration(compute_parameters) => {
+                    error!(" type=UpdateConfiguration");
+                    let ComputeParameters {
+                        workload_class,
+                        max_result_size,
+                        tracing,
+                        grpc_client,
+                        dyncfg_updates,
+                    } = compute_parameters;
+                    error!(" workload_class={workload_class:?}");
+                    error!(" max_result_size={max_result_size:?}");
+                    error!(" tracing={tracing:?}");
+                    error!(" grpc_client={grpc_client:?}");
+                    error!(" dyncfg_updates={dyncfg_updates:?}");
+                }
+                CreateDataflow(dataflow_description) => {
+                    error!(" type=CreateDataflow");
+                    let DataflowDescription {
+                        source_imports,
+                        index_imports,
+                        objects_to_build,
+                        index_exports,
+                        sink_exports,
+                        as_of,
+                        until,
+                        initial_storage_as_of,
+                        refresh_schedule,
+                        debug_name,
+                        time_dependence,
+                    } = dataflow_description;
+                    error!(" source_imports={source_imports:?}");
+                    error!(" index_imports={index_imports:?}");
+                    error!(" objects_to_build={objects_to_build:?}");
+                    error!(" index_exports={index_exports:?}");
+                    error!(" sink_exports={sink_exports:?}");
+                    error!(" as_of={as_of:?}");
+                    error!(" until={until:?}");
+                    error!(" initial_storage_as_of={initial_storage_as_of:?}");
+                    error!(" refresh_schedule={refresh_schedule:?}");
+                    error!(" debug_name={debug_name:?}");
+                    error!(" time_dependence={time_dependence:?}");
+                }
+                Schedule(global_id) => {
+                    error!(" type=Schedule");
+                    error!(" global_id={global_id:?}")
+                }
+                AllowCompaction { id, frontier } => {
+                    error!(" type=AllowCompaction");
+                    error!(" id={id:?}");
+                    error!(" frontier={frontier:?}");
+                }
+                Peek(peek) => {
+                    error!(" type=Peek");
+                    let mz_compute_client::protocol::command::Peek {
+                        target,
+                        literal_constraints,
+                        uuid,
+                        timestamp,
+                        finishing,
+                        map_filter_project,
+                        otel_ctx,
+                    } = peek;
+                    error!(" target={target:?}");
+                    error!(" literal_constraints={literal_constraints:?}");
+                    error!(" uuid={uuid:?}");
+                    error!(" timestamp={timestamp:?}");
+                    error!(" finishing={finishing:?}");
+                    error!(" map_filter_project={map_filter_project:?}");
+                    error!(" otel_ctx={otel_ctx:?}");
+                }
+                CancelPeek { uuid } => {
+                    error!(" type=CancelPeek");
+                    error!(" uuid={uuid:?}");
+                }
+            }
+            panic!("abort");
+        });
+        self.compute_state.command_history.push(cmd2);

         // Record the command duration, per worker and command kind.
         let timer = self
@@ -1500,8 +1595,7 @@ impl IndexPeek {
         let copies: usize = if copies.is_negative() {
             return Err(format!(
                 "Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
-                -copies,
-                &*borrow,
+                -copies, &*borrow,
             ));
         } else {
             copies.into_inner().try_into().unwrap()
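The functional change above wraps the potentially allocating `cmd.clone()` in a panic guard so the worker can log the offending command field by field before aborting. Below is a minimal, self-contained sketch of the same pattern using `std::panic::catch_unwind` from the standard library; the diff's `mz_ore::panic::catch_unwind_str` appears, judging only from its usage here, to be a wrapper that additionally renders the panic payload as a string. The `Command` type and its deliberately panicking `Clone` impl are hypothetical stand-ins for `ComputeCommand`.

    use std::panic::{self, AssertUnwindSafe};

    // Hypothetical stand-in for ComputeCommand.
    #[derive(Debug)]
    struct Command(String);

    impl Clone for Command {
        fn clone(&self) -> Self {
            // Simulate the failure mode the guard exists for: a clone that
            // panics (e.g. on allocation failure) instead of returning.
            if self.0.len() > 1024 {
                panic!("allocation error");
            }
            Command(self.0.clone())
        }
    }

    fn clone_or_abort(cmd: Command) -> Command {
        // Catch a panic raised inside the clone, log diagnostics while the
        // original value is still alive, then abort -- mirroring the diff's
        // catch_unwind_str(|| cmd.clone()).unwrap_or_else(...) shape.
        panic::catch_unwind(AssertUnwindSafe(|| cmd.clone())).unwrap_or_else(|payload| {
            // Panic payloads are typically &str or String; render either.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".into());
            eprintln!("error cloning command: {msg}");
            // The real code destructures each ComputeCommand variant and logs
            // every field; a Debug dump of the whole value stands in for that.
            eprintln!(" cmd={cmd:?}");
            panic!("abort");
        })
    }

    fn main() {
        let cmd2 = clone_or_abort(Command("small".into()));
        println!("cloned: {cmd2:?}");
    }

The point of the shape is that the original `cmd` is only borrowed inside the guarded closure, so on failure it is still available to be destructured and logged before the final `panic!("abort")` takes the process down.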