diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 2b8385ac2d89..abf068db0cca 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -269,7 +269,7 @@ impl StatementExecutor {
         let options = task_ctx.session_config().options();
 
         // Track memory usage for the query result if it's bounded
-        let mut reservation =
+        let reservation =
             MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
 
         if physical_plan.boundedness().is_unbounded() {
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 712f4389f585..b6c606ff467f 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -2186,7 +2186,7 @@ mod tests {
         // configure with same memory / disk manager
         let memory_pool = ctx1.runtime_env().memory_pool.clone();
 
-        let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
+        let reservation = MemoryConsumer::new("test").register(&memory_pool);
         reservation.grow(100);
 
         let disk_manager = ctx1.runtime_env().disk_manager.clone();
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 6635c9072dd9..d59b42ed15d1 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1360,7 +1360,7 @@ impl FileSink for ParquetSink {
                     parquet_props.clone(),
                 )
                 .await?;
-            let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
+            let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                 .register(context.memory_pool());
             file_write_tasks.spawn(async move {
                 while let Some(batch) = rx.recv().await {
@@ -1465,7 +1465,7 @@ impl DataSink for ParquetSink {
 async fn column_serializer_task(
     mut rx: Receiver,
     mut writer: ArrowColumnWriter,
-    mut reservation: MemoryReservation,
+    reservation: MemoryReservation,
 ) -> Result<(ArrowColumnWriter, MemoryReservation)> {
     while let Some(col) = rx.recv().await {
         writer.write(&col)?;
@@ -1550,7 +1550,7 @@ fn spawn_rg_join_and_finalize_task(
     rg_rows: usize,
     pool: &Arc,
 ) -> SpawnedTask {
-    let mut rg_reservation =
+    let rg_reservation =
         MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
 
     SpawnedTask::spawn(async move {
@@ -1682,12 +1682,12 @@ async fn concatenate_parallel_row_groups(
     mut object_store_writer: Box,
     pool: Arc,
 ) -> Result {
-    let mut file_reservation =
+    let file_reservation =
         MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
 
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
-        let (serialized_columns, mut rg_reservation, _cnt) =
+        let (serialized_columns, rg_reservation, _cnt) =
             result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 
         let mut rg_out = parquet_writer.next_row_group()?;
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index fbf9ce41da8f..b23eede2a054 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -18,7 +18,7 @@
 //! [`MemoryPool`] for memory management during query execution, [`proxy`] for
 //! help with allocation accounting.
 
-use datafusion_common::{Result, internal_err};
+use datafusion_common::{Result, internal_datafusion_err};
 use std::hash::{Hash, Hasher};
 use std::{cmp::Ordering, sync::Arc, sync::atomic};
 
@@ -322,7 +322,7 @@ impl MemoryConsumer {
                 pool: Arc::clone(pool),
                 consumer: self,
             }),
-            size: 0,
+            size: atomic::AtomicUsize::new(0),
         }
     }
 }
@@ -351,13 +351,13 @@ impl Drop for SharedRegistration {
 #[derive(Debug)]
 pub struct MemoryReservation {
     registration: Arc,
-    size: usize,
+    size: atomic::AtomicUsize,
 }
 
 impl MemoryReservation {
     /// Returns the size of this reservation in bytes
     pub fn size(&self) -> usize {
-        self.size
+        self.size.load(atomic::Ordering::Relaxed)
     }
 
     /// Returns [MemoryConsumer] for this [MemoryReservation]
@@ -367,8 +367,8 @@ impl MemoryReservation {
 
     /// Frees all bytes from this reservation back to the underlying
     /// pool, returning the number of bytes freed.
-    pub fn free(&mut self) -> usize {
-        let size = self.size;
+    pub fn free(&self) -> usize {
+        let size = self.size.load(atomic::Ordering::Relaxed);
         if size != 0 {
             self.shrink(size)
         }
@@ -380,60 +380,62 @@ impl MemoryReservation {
     /// # Panics
     ///
     /// Panics if `capacity` exceeds [`Self::size`]
-    pub fn shrink(&mut self, capacity: usize) {
-        let new_size = self.size.checked_sub(capacity).unwrap();
+    pub fn shrink(&self, capacity: usize) {
+        self.size.fetch_sub(capacity, atomic::Ordering::Relaxed);
         self.registration.pool.shrink(self, capacity);
-        self.size = new_size
     }
 
     /// Tries to free `capacity` bytes from this reservation
     /// if `capacity` does not exceed [`Self::size`]
     /// Returns new reservation size
     /// or error if shrinking capacity is more than allocated size
-    pub fn try_shrink(&mut self, capacity: usize) -> Result {
-        if let Some(new_size) = self.size.checked_sub(capacity) {
-            self.registration.pool.shrink(self, capacity);
-            self.size = new_size;
-            Ok(new_size)
-        } else {
-            internal_err!(
-                "Cannot free the capacity {capacity} out of allocated size {}",
-                self.size
+    pub fn try_shrink(&self, capacity: usize) -> Result {
+        let updated = self.size.fetch_update(
+            atomic::Ordering::Relaxed,
+            atomic::Ordering::Relaxed,
+            |prev| prev.checked_sub(capacity),
+        );
+        updated.map_err(|_| {
+            let prev = self.size.load(atomic::Ordering::Relaxed);
+            internal_datafusion_err!(
+                "Cannot free the capacity {capacity} out of allocated size {prev}"
             )
-        }
+        })
     }
 
     /// Sets the size of this reservation to `capacity`
-    pub fn resize(&mut self, capacity: usize) {
-        match capacity.cmp(&self.size) {
-            Ordering::Greater => self.grow(capacity - self.size),
-            Ordering::Less => self.shrink(self.size - capacity),
+    pub fn resize(&self, capacity: usize) {
+        let size = self.size.load(atomic::Ordering::Relaxed);
+        match capacity.cmp(&size) {
+            Ordering::Greater => self.grow(capacity - size),
+            Ordering::Less => self.shrink(size - capacity),
             _ => {}
         }
     }
 
     /// Try to set the size of this reservation to `capacity`
-    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
-        match capacity.cmp(&self.size) {
-            Ordering::Greater => self.try_grow(capacity - self.size)?,
-            Ordering::Less => self.shrink(self.size - capacity),
+    pub fn try_resize(&self, capacity: usize) -> Result<()> {
+        let size = self.size.load(atomic::Ordering::Relaxed);
+        match capacity.cmp(&size) {
+            Ordering::Greater => self.try_grow(capacity - size)?,
+            Ordering::Less => self.shrink(size - capacity),
             _ => {}
         };
         Ok(())
     }
 
     /// Increase the size of this reservation by `capacity` bytes
-    pub fn grow(&mut self, capacity: usize) {
+    pub fn grow(&self, capacity: usize) {
         self.registration.pool.grow(self, capacity);
-        self.size += capacity;
+        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
     }
 
     /// Try to increase the size of this reservation by `capacity`
     /// bytes, returning error if there is insufficient capacity left
     /// in the pool.
-    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
+    pub fn try_grow(&self, capacity: usize) -> Result<()> {
         self.registration.pool.try_grow(self, capacity)?;
-        self.size += capacity;
+        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
         Ok(())
     }
 
@@ -447,10 +449,16 @@ impl MemoryReservation {
     /// # Panics
     ///
     /// Panics if `capacity` exceeds [`Self::size`]
-    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
-        self.size = self.size.checked_sub(capacity).unwrap();
+    pub fn split(&self, capacity: usize) -> MemoryReservation {
+        self.size
+            .fetch_update(
+                atomic::Ordering::Relaxed,
+                atomic::Ordering::Relaxed,
+                |prev| prev.checked_sub(capacity),
+            )
+            .unwrap();
         Self {
-            size: capacity,
+            size: atomic::AtomicUsize::new(capacity),
             registration: Arc::clone(&self.registration),
         }
     }
@@ -458,7 +466,7 @@ impl MemoryReservation {
     /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
     pub fn new_empty(&self) -> Self {
         Self {
-            size: 0,
+            size: atomic::AtomicUsize::new(0),
             registration: Arc::clone(&self.registration),
         }
     }
@@ -466,7 +474,7 @@ impl MemoryReservation {
     /// Splits off all the bytes from this [`MemoryReservation`] into
     /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
     pub fn take(&mut self) -> MemoryReservation {
-        self.split(self.size)
+        self.split(self.size.load(atomic::Ordering::Relaxed))
     }
 }
 
@@ -492,7 +500,7 @@ mod tests {
     #[test]
     fn test_memory_pool_underflow() {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
-        let mut a1 = MemoryConsumer::new("a1").register(&pool);
+        let a1 = MemoryConsumer::new("a1").register(&pool);
         assert_eq!(pool.reserved(), 0);
 
         a1.grow(100);
@@ -507,7 +515,7 @@ mod tests {
         a1.try_grow(30).unwrap();
         assert_eq!(pool.reserved(), 30);
 
-        let mut a2 = MemoryConsumer::new("a2").register(&pool);
+        let a2 = MemoryConsumer::new("a2").register(&pool);
         a2.try_grow(25).unwrap_err();
         assert_eq!(pool.reserved(), 30);
 
@@ -521,7 +529,7 @@ mod tests {
     #[test]
     fn test_split() {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
-        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        let r1 = MemoryConsumer::new("r1").register(&pool);
         r1.try_grow(20).unwrap();
         assert_eq!(r1.size(), 20);
 
@@ -542,10 +550,10 @@ mod tests {
     #[test]
     fn test_new_empty() {
         let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
-        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        let r1 = MemoryConsumer::new("r1").register(&pool);
         r1.try_grow(20).unwrap();
 
-        let mut r2 = r1.new_empty();
+        let r2 = r1.new_empty();
         r2.try_grow(5).unwrap();
 
         assert_eq!(r1.size(), 20);
@@ -559,7 +567,7 @@ mod tests {
         let mut r1 = MemoryConsumer::new("r1").register(&pool);
         r1.try_grow(20).unwrap();
 
-        let mut r2 = r1.take();
+        let r2 = r1.take();
         r2.try_grow(5).unwrap();
 
         assert_eq!(r1.size(), 0);
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index bf74b5f6f4c6..b10270851cc0 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -212,7 +212,7 @@ impl MemoryPool for FairSpillPool {
                     .checked_div(state.num_spill)
                     .unwrap_or(spill_available);
 
-                if reservation.size + additional > available {
+                if reservation.size() + additional > available {
                     return Err(insufficient_capacity_err(
                         reservation,
                         additional,
@@ -264,7 +264,7 @@ fn insufficient_capacity_err(
         "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
         human_readable_size(additional),
         reservation.registration.consumer.name,
-        human_readable_size(reservation.size),
+        human_readable_size(reservation.size()),
         human_readable_size(available)
     )
 }
@@ -526,12 +526,12 @@ mod tests {
     fn test_fair() {
         let pool = Arc::new(FairSpillPool::new(100)) as _;
 
-        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
+        let r1 = MemoryConsumer::new("unspillable").register(&pool);
         // Can grow beyond capacity of pool
         r1.grow(2000);
         assert_eq!(pool.reserved(), 2000);
 
-        let mut r2 = MemoryConsumer::new("r2")
+        let r2 = MemoryConsumer::new("r2")
             .with_can_spill(true)
             .register(&pool);
         // Can grow beyond capacity of pool
@@ -563,7 +563,7 @@ mod tests {
         assert_eq!(r2.size(), 10);
         assert_eq!(pool.reserved(), 30);
 
-        let mut r3 = MemoryConsumer::new("r3")
+        let r3 = MemoryConsumer::new("r3")
             .with_can_spill(true)
             .register(&pool);
 
@@ -584,7 +584,7 @@ mod tests {
         r1.free();
         assert_eq!(pool.reserved(), 80);
 
-        let mut r4 = MemoryConsumer::new("s4").register(&pool);
+        let r4 = MemoryConsumer::new("s4").register(&pool);
         let err = r4.try_grow(30).unwrap_err().strip_backtrace();
         assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
     }
@@ -601,18 +601,18 @@ mod tests {
         // Test: use all the different interfaces to change reservation size
 
         // set r1=50, using grow and shrink
-        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        let r1 = MemoryConsumer::new("r1").register(&pool);
         r1.grow(50);
         r1.grow(20);
         r1.shrink(20);
 
         // set r2=15 using try_grow
-        let mut r2 = MemoryConsumer::new("r2").register(&pool);
+        let r2 = MemoryConsumer::new("r2").register(&pool);
         r2.try_grow(15)
             .expect("should succeed in memory allotment for r2");
 
         // set r3=20 using try_resize
-        let mut r3 = MemoryConsumer::new("r3").register(&pool);
+        let r3 = MemoryConsumer::new("r3").register(&pool);
         r3.try_resize(25)
             .expect("should succeed in memory allotment for r3");
         r3.try_resize(20)
@@ -620,12 +620,12 @@ mod tests {
 
         // set r4=10
         // this should not be reported in top 3
-        let mut r4 = MemoryConsumer::new("r4").register(&pool);
+        let r4 = MemoryConsumer::new("r4").register(&pool);
         r4.grow(10);
 
         // Test: reports if new reservation causes error
         // using the previously set sizes for other consumers
-        let mut r5 = MemoryConsumer::new("r5").register(&pool);
+        let r5 = MemoryConsumer::new("r5").register(&pool);
         let res = r5.try_grow(150);
         assert!(res.is_err());
         let error = res.unwrap_err().strip_backtrace();
@@ -650,7 +650,7 @@ mod tests {
         let same_name = "foo";
 
         // Test: see error message when no consumers recorded yet
-        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
+        let r0 = MemoryConsumer::new(same_name).register(&pool);
         let res = r0.try_grow(150);
         assert!(res.is_err());
         let error = res.unwrap_err().strip_backtrace();
@@ -665,7 +665,7 @@ mod tests {
         r0.grow(10); // make r0=10, pool available=90
 
         let new_consumer_same_name = MemoryConsumer::new(same_name);
-        let mut r1 = new_consumer_same_name.register(&pool);
+        let r1 = new_consumer_same_name.register(&pool);
         // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
         // a followup PR will clarify this message "0 bytes already allocated for this reservation"
         let res = r1.try_grow(150);
@@ -695,7 +695,7 @@ mod tests {
         // will be recognized as different in the TrackConsumersPool
         let consumer_with_same_name_but_different_hash =
             MemoryConsumer::new(same_name).with_can_spill(true);
-        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
+        let r2 = consumer_with_same_name_but_different_hash.register(&pool);
         let res = r2.try_grow(150);
         assert!(res.is_err());
         let error = res.unwrap_err().strip_backtrace();
@@ -714,10 +714,10 @@ mod tests {
         // Baseline: see the 2 memory consumers
         let setting = make_settings();
         let _bound = setting.bind_to_scope();
-        let mut r0 = MemoryConsumer::new("r0").register(&pool);
+        let r0 = MemoryConsumer::new("r0").register(&pool);
         r0.grow(10);
         let r1_consumer = MemoryConsumer::new("r1");
-        let mut r1 = r1_consumer.register(&pool);
+        let r1 = r1_consumer.register(&pool);
         r1.grow(20);
 
         let res = r0.try_grow(150);
@@ -791,13 +791,13 @@ mod tests {
            .downcast::>()
            .unwrap();
         // set r1=20
-        let mut r1 = MemoryConsumer::new("r1").register(&pool);
+        let r1 = MemoryConsumer::new("r1").register(&pool);
         r1.grow(20);
         // set r2=15
-        let mut r2 = MemoryConsumer::new("r2").register(&pool);
+        let r2 = MemoryConsumer::new("r2").register(&pool);
         r2.grow(15);
         // set r3=45
-        let mut r3 = MemoryConsumer::new("r3").register(&pool);
+        let r3 = MemoryConsumer::new("r3").register(&pool);
         r3.grow(45);
 
         let downcasted = upcasted
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 4f32b6176ec3..7ada14be6654 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -206,7 +206,7 @@ async fn load_left_input(
     let (batches, _metrics, reservation) = stream
         .try_fold(
             (Vec::new(), metrics, reservation),
-            |(mut batches, metrics, mut reservation), batch| async {
+            |(mut batches, metrics, reservation), batch| async {
                 let batch_size = batch.get_array_memory_size();
                 // Reserve memory for incoming batch
                 reservation.try_grow(batch_size)?;
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 44637321a7e3..b57f9132253b 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -682,10 +682,10 @@ async fn collect_left_input(
     let schema = stream.schema();
 
     // Load all batches and count the rows
-    let (batches, metrics, mut reservation) = stream
+    let (batches, metrics, reservation) = stream
         .try_fold(
             (Vec::new(), join_metrics, reservation),
-            |(mut batches, metrics, mut reservation), batch| async {
+            |(mut batches, metrics, reservation), batch| async {
                 let batch_size = batch.get_array_memory_size();
                 // Reserve memory for incoming batch
                 reservation.try_grow(batch_size)?;
diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
index 508be2e3984f..d7ece845e943 100644
--- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
@@ -620,7 +620,7 @@ async fn build_buffered_data(
 
     // Combine batches and record number of rows
     let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = buffered
+    let (batches, num_rows, metrics, reservation) = buffered
         .try_fold(initial, |mut acc, batch| async {
             let batch_size = get_record_batch_memory_size(&batch);
             acc.3.try_grow(batch_size)?;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 3e8fdf1f3ed7..a8361f7b2941 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -709,7 +709,7 @@ impl ExternalSorter {
         &self,
         batch: RecordBatch,
         metrics: &BaselineMetrics,
-        mut reservation: MemoryReservation,
+        reservation: MemoryReservation,
     ) -> Result {
         assert_eq!(
             get_reserved_bytes_for_record_batch(&batch)?,
@@ -736,7 +736,7 @@ impl ExternalSorter {
             .then({
                 move |batches| async move {
                     match batches {
-                        Ok((schema, sorted_batches, mut reservation)) => {
+                        Ok((schema, sorted_batches, reservation)) => {
                             // Calculate the total size of sorted batches
                             let total_sorted_size: usize = sorted_batches
                                 .iter()
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index a510f44e4f4d..779511a865b6 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -180,7 +180,7 @@ impl RowCursorStream {
         self.rows.save(stream_idx, &rows);
 
         // track the memory in the newly created Rows.
-        let mut rows_reservation = self.reservation.new_empty();
+        let rows_reservation = self.reservation.new_empty();
         rows_reservation.try_grow(rows.size())?;
         Ok(RowValues::new(rows, rows_reservation))
     }
@@ -246,7 +246,7 @@ impl FieldCursorStream {
         let array = value.into_array(batch.num_rows())?;
         let size_in_mem = array.get_buffer_memory_size();
         let array = array.as_any().downcast_ref::().expect("field values");
-        let mut array_reservation = self.reservation.new_empty();
+        let array_reservation = self.reservation.new_empty();
         array_reservation.try_grow(size_in_mem)?;
         Ok(ArrayValues::new(
             self.sort.options,
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 80c2233d05db..4b7e707fcced 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -1005,7 +1005,7 @@ mod test {
             .build_arc()
             .unwrap();
 
-        let mut reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
+        let reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
 
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
@@ -1071,7 +1071,7 @@ mod test {
            .build_arc()
            .unwrap();
 
-        let mut reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
+        let reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
 
         let schema =
             Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index f1b9e3e88d12..1313909adbba 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -283,7 +283,7 @@ mod tests {
         assert!(work_table.take().is_err());
 
         let pool = Arc::new(UnboundedMemoryPool::default()) as _;
-        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);
+        let reservation = MemoryConsumer::new("test_work_table").register(&pool);
 
         // Update batch to work_table
         let array: ArrayRef = Arc::new((0..5).collect::());