Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl StatementExecutor {
let options = task_ctx.session_config().options();

// Track memory usage for the query result if it's bounded
let mut reservation =
let reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());

if physical_plan.boundedness().is_unbounded() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,7 @@ mod tests {
// configure with same memory / disk manager
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
let reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();
Expand Down
10 changes: 5 additions & 5 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ impl FileSink for ParquetSink {
parquet_props.clone(),
)
.await?;
let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
.register(context.memory_pool());
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
Expand Down Expand Up @@ -1465,7 +1465,7 @@ impl DataSink for ParquetSink {
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
mut reservation: MemoryReservation,
reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
Expand Down Expand Up @@ -1550,7 +1550,7 @@ fn spawn_rg_join_and_finalize_task(
rg_rows: usize,
pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
let mut rg_reservation =
let rg_reservation =
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -1682,12 +1682,12 @@ async fn concatenate_parallel_row_groups(
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
let mut file_reservation =
let file_reservation =
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let (serialized_columns, mut rg_reservation, _cnt) =
let (serialized_columns, rg_reservation, _cnt) =
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

let mut rg_out = parquet_writer.next_row_group()?;
Expand Down
92 changes: 50 additions & 42 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_err};
use datafusion_common::{Result, internal_datafusion_err};
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};

Expand Down Expand Up @@ -322,7 +322,7 @@ impl MemoryConsumer {
pool: Arc::clone(pool),
consumer: self,
}),
size: 0,
size: atomic::AtomicUsize::new(0),
}
}
}
Expand Down Expand Up @@ -351,13 +351,13 @@ impl Drop for SharedRegistration {
#[derive(Debug)]
pub struct MemoryReservation {
registration: Arc<SharedRegistration>,
size: usize,
size: atomic::AtomicUsize,
}

impl MemoryReservation {
/// Returns the size of this reservation in bytes
pub fn size(&self) -> usize {
self.size
self.size.load(atomic::Ordering::Relaxed)
}

/// Returns [MemoryConsumer] for this [MemoryReservation]
Expand All @@ -367,8 +367,8 @@ impl MemoryReservation {

/// Frees all bytes from this reservation back to the underlying
/// pool, returning the number of bytes freed.
pub fn free(&mut self) -> usize {
let size = self.size;
pub fn free(&self) -> usize {
let size = self.size.load(atomic::Ordering::Relaxed);
if size != 0 {
self.shrink(size)
}
Expand All @@ -380,60 +380,62 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn shrink(&mut self, capacity: usize) {
let new_size = self.size.checked_sub(capacity).unwrap();
pub fn shrink(&self, capacity: usize) {
self.size.fetch_sub(capacity, atomic::Ordering::Relaxed);
self.registration.pool.shrink(self, capacity);
self.size = new_size
}

/// Tries to free `capacity` bytes from this reservation
/// if `capacity` does not exceed [`Self::size`]
/// Returns new reservation size
/// or error if shrinking capacity is more than allocated size
pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
if let Some(new_size) = self.size.checked_sub(capacity) {
self.registration.pool.shrink(self, capacity);
self.size = new_size;
Ok(new_size)
} else {
internal_err!(
"Cannot free the capacity {capacity} out of allocated size {}",
self.size
pub fn try_shrink(&self, capacity: usize) -> Result<usize> {
let updated = self.size.fetch_update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|prev| prev.checked_sub(capacity),
);
updated.map_err(|_| {
let prev = self.size.load(atomic::Ordering::Relaxed);
internal_datafusion_err!(
"Cannot free the capacity {capacity} out of allocated size {prev}"
)
}
})
}

/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
match capacity.cmp(&self.size) {
Ordering::Greater => self.grow(capacity - self.size),
Ordering::Less => self.shrink(self.size - capacity),
pub fn resize(&self, capacity: usize) {
let size = self.size.load(atomic::Ordering::Relaxed);
match capacity.cmp(&size) {
Ordering::Greater => self.grow(capacity - size),
Ordering::Less => self.shrink(size - capacity),
_ => {}
}
}

/// Try to set the size of this reservation to `capacity`
pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
match capacity.cmp(&self.size) {
Ordering::Greater => self.try_grow(capacity - self.size)?,
Ordering::Less => self.shrink(self.size - capacity),
pub fn try_resize(&self, capacity: usize) -> Result<()> {
let size = self.size.load(atomic::Ordering::Relaxed);
match capacity.cmp(&size) {
Ordering::Greater => self.try_grow(capacity - size)?,
Ordering::Less => self.shrink(size - capacity),
_ => {}
};
Ok(())
}

/// Increase the size of this reservation by `capacity` bytes
pub fn grow(&mut self, capacity: usize) {
pub fn grow(&self, capacity: usize) {
self.registration.pool.grow(self, capacity);
self.size += capacity;
self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
}

/// Try to increase the size of this reservation by `capacity`
/// bytes, returning error if there is insufficient capacity left
/// in the pool.
pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
pub fn try_grow(&self, capacity: usize) -> Result<()> {
self.registration.pool.try_grow(self, capacity)?;
self.size += capacity;
self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
Ok(())
}

Expand All @@ -447,26 +449,32 @@ impl MemoryReservation {
/// # Panics
///
/// Panics if `capacity` exceeds [`Self::size`]
pub fn split(&mut self, capacity: usize) -> MemoryReservation {
self.size = self.size.checked_sub(capacity).unwrap();
pub fn split(&self, capacity: usize) -> MemoryReservation {
self.size
.fetch_update(
atomic::Ordering::Relaxed,
atomic::Ordering::Relaxed,
|prev| prev.checked_sub(capacity),
)
.unwrap();
Self {
size: capacity,
size: atomic::AtomicUsize::new(capacity),
registration: Arc::clone(&self.registration),
}
}

/// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn new_empty(&self) -> Self {
Self {
size: 0,
size: atomic::AtomicUsize::new(0),
registration: Arc::clone(&self.registration),
}
}

/// Splits off all the bytes from this [`MemoryReservation`] into
/// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
pub fn take(&mut self) -> MemoryReservation {
self.split(self.size)
self.split(self.size.load(atomic::Ordering::Relaxed))
}
}

Expand All @@ -492,7 +500,7 @@ mod tests {
#[test]
fn test_memory_pool_underflow() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut a1 = MemoryConsumer::new("a1").register(&pool);
let a1 = MemoryConsumer::new("a1").register(&pool);
assert_eq!(pool.reserved(), 0);

a1.grow(100);
Expand All @@ -507,7 +515,7 @@ mod tests {
a1.try_grow(30).unwrap();
assert_eq!(pool.reserved(), 30);

let mut a2 = MemoryConsumer::new("a2").register(&pool);
let a2 = MemoryConsumer::new("a2").register(&pool);
a2.try_grow(25).unwrap_err();
assert_eq!(pool.reserved(), 30);

Expand All @@ -521,7 +529,7 @@ mod tests {
#[test]
fn test_split() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);
let r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
assert_eq!(r1.size(), 20);
Expand All @@ -542,10 +550,10 @@ mod tests {
#[test]
fn test_new_empty() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut r1 = MemoryConsumer::new("r1").register(&pool);
let r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.new_empty();
let r2 = r1.new_empty();
r2.try_grow(5).unwrap();

assert_eq!(r1.size(), 20);
Expand All @@ -559,7 +567,7 @@ mod tests {
let mut r1 = MemoryConsumer::new("r1").register(&pool);

r1.try_grow(20).unwrap();
let mut r2 = r1.take();
let r2 = r1.take();
r2.try_grow(5).unwrap();

assert_eq!(r1.size(), 0);
Expand Down
Loading