264 changes: 244 additions & 20 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -70,6 +70,7 @@ use datafusion_common::{
JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, plan_err,
project_schema,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_expr::Accumulator;
@@ -83,7 +84,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use ahash::RandomState;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::TryStreamExt;
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use parking_lot::Mutex;

use super::partitioned_hash_eval::SeededRandomState;
@@ -110,7 +111,7 @@ pub(super) struct JoinLeftData {
/// Without holding onto this reservation, the recorded memory usage would become inconsistent with actual usage.
/// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
/// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
_reservation: MemoryReservation,
_reservations: Vec<MemoryReservation>,
/// Bounds computed from the build side for dynamic filter pushdown.
/// If the partition is empty (no rows) this will be None.
/// If the partition has some rows this will be Some with the bounds for each join key column.
@@ -802,7 +803,7 @@ impl ExecutionPlan for HashJoinExec {
fn required_input_distribution(&self) -> Vec<Distribution> {
match self.mode {
PartitionMode::CollectLeft => vec![
Distribution::SinglePartition,
Distribution::UnspecifiedDistribution,
Distribution::UnspecifiedDistribution,
],
PartitionMode::Partitioned => {
Expand Down Expand Up @@ -926,12 +927,6 @@ impl ExecutionPlan for HashJoinExec {
consider using RepartitionExec"
);

assert_or_internal_err!(
self.mode != PartitionMode::CollectLeft || left_partitions == 1,
"Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);

// Only enable dynamic filter pushdown if:
// - The session config enables dynamic filter pushdown
// - A dynamic filter exists
@@ -951,17 +946,12 @@ impl ExecutionPlan for HashJoinExec {
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());

Ok(collect_left_input(
Ok(collect_left_input_parallel(
self.random_state.random_state().clone(),
left_stream,
Arc::clone(&self.left),
on_left.clone(),
join_metrics.clone(),
reservation,
Arc::clone(&context),
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
enable_dynamic_filter_pushdown,
@@ -1364,6 +1354,240 @@ impl BuildSideState {
}
}

/// Collects all batches from the left (build) side streams in parallel and creates a hash map for joining.
///
/// This function is responsible for:
/// 1. Concurrently consuming all partitions of the build side and collecting their batches into memory
/// 2. Building, in parallel, a hash map from the join key columns for efficient probe operations
/// 3. Computing bounds for dynamic filter pushdown (if enabled)
/// 4. Preparing visited indices bitmap for certain join types
///
/// # Parameters
/// * `random_state` - Random state for consistent hashing across partitions
/// * `left_plan` - The build side execution plan
/// * `on_left` - Physical expressions for the left side join keys
/// * `metrics` - Metrics collector for tracking memory usage and row counts
/// * `context` - The task context
/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins)
/// * `probe_threads_count` - Number of threads that will probe this hash table
/// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering
/// * `max_inlist_size` - Maximum estimated byte size of the join key arrays for which an `InList` pushdown is attempted
/// * `max_inlist_distinct_values` - Maximum number of distinct build-side keys for which an `InList` pushdown is attempted
///
/// # Returns
/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
/// visited indices bitmap, and computed bounds (if requested).
#[expect(clippy::too_many_arguments)]
async fn collect_left_input_parallel(
random_state: RandomState,
left_plan: Arc<dyn ExecutionPlan>,
on_left: Vec<PhysicalExprRef>,
metrics: BuildProbeJoinMetrics,
context: Arc<TaskContext>,
with_visited_indices_bitmap: bool,
probe_threads_count: usize,
should_compute_dynamic_filters: bool,
max_inlist_size: usize,
max_inlist_distinct_values: usize,
) -> Result<JoinLeftData> {
let schema = left_plan.schema();
let left_partitions = left_plan.output_partitioning().partition_count();

// Launch tasks to execute all left partitions in parallel
let mut partition_tasks = FuturesUnordered::new();
for i in 0..left_partitions {
let left_plan = Arc::clone(&left_plan);
let context = Arc::clone(&context);
let on_left = on_left.clone();
let metrics = metrics.clone();
let task = SpawnedTask::spawn(async move {
let stream = left_plan.execute(i, Arc::clone(&context))?;
let schema = stream.schema();

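// Register a dedicated memory reservation for this partition so build-side
// memory usage is tracked per task.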
let reservation = MemoryConsumer::new(format!("HashJoinInput[{i}]"))
.register(context.memory_pool());
let initial = BuildSideState::try_new(
metrics.clone(),
reservation,
on_left,
&schema,
should_compute_dynamic_filters,
)?;

stream
.try_fold(initial, |mut state, batch| async move {
if let Some(ref mut accumulators) = state.bounds_accumulators {
for accumulator in accumulators {
accumulator.update_batch(&batch)?;
}
}

let batch_size = get_record_batch_memory_size(&batch);
state.reservation.try_grow(batch_size)?;
state.metrics.build_mem_used.add(batch_size);
state.metrics.build_input_batches.add(1);
state.metrics.build_input_rows.add(batch.num_rows());

state.num_rows += batch.num_rows();
state.batches.push(batch);
Ok(state)
})
.await
});
partition_tasks.push(task);
}

// Collect the results from all partitions
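// `FuturesUnordered` yields tasks in completion order, so `partition_results`
// is not necessarily ordered by partition index; the row offsets computed
// below follow this collection order.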
let mut partition_results = Vec::with_capacity(left_partitions);
while let Some(result) = partition_tasks.next().await {
let state: Result<BuildSideState> = result.map_err(|e| {
datafusion_common::DataFusionError::Execution(format!(
"Join build-side partition task failed: {e}"
))
})?;
partition_results.push(state?);
}

// Combine the results
let mut all_batches = Vec::new();
let mut total_num_rows = 0;
// Record whether any bounds accumulators were created (i.e. dynamic filter
// pushdown was requested); their values are not merged across partitions yet
// (see the TODO below).
let bounds_accumulators = partition_results
.first()
.and_then(|r| r.bounds_accumulators.as_ref().map(|_| ()));

// We need to calculate the offsets for each partition's batches
let mut offsets = Vec::with_capacity(left_partitions);
offsets.push(0);

for (i, state) in partition_results.iter().enumerate() {
if i > 0 {
let prev_offset = offsets[i - 1];
let prev_rows = partition_results[i - 1].num_rows;
offsets.push(prev_offset + prev_rows);
}
total_num_rows += state.num_rows;
}

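// Pick the hash map index width up front: a u32-indexed map is cheaper but
// can only address up to u32::MAX rows, so fall back to u64 indices beyond that.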
let fixed_size_u32 = size_of::<JoinHashMapU32>();
let fixed_size_u64 = size_of::<JoinHashMapU64>();

let hash_map: Arc<Mutex<Box<dyn JoinHashMapType>>> =
if total_num_rows > u32::MAX as usize {
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(total_num_rows, fixed_size_u64)?;
// TODO: How to handle reservation here?
metrics.build_mem_used.add(estimated_hashtable_size);
Arc::new(Mutex::new(Box::new(JoinHashMapU64::with_capacity(
total_num_rows,
))))
} else {
let estimated_hashtable_size =
estimate_memory_size::<(u32, u64)>(total_num_rows, fixed_size_u32)?;
metrics.build_mem_used.add(estimated_hashtable_size);
Arc::new(Mutex::new(Box::new(JoinHashMapU32::with_capacity(
total_num_rows,
))))
};

// Now, hash the batches in parallel
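// Each task locks the shared map only for a single `update_hash` call at a
// time, so partitions interleave their inserts rather than serializing
// whole partitions.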
let mut hash_tasks = FuturesUnordered::new();
for (state, offset) in partition_results.into_iter().zip(offsets) {
let on_left = on_left.clone();
let random_state = random_state.clone();
let hash_map = Arc::clone(&hash_map);
let task = SpawnedTask::spawn(async move {
let mut hashes_buffer = Vec::new();
let mut batch_offset = 0;
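// Hash batches starting from the last one, matching the reverse-order
// insertion used by the single-stream `collect_left_input` path.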
for batch in state.batches.iter().rev() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
let mut hash_map_guard = hash_map.lock();
update_hash(
&on_left,
batch,
&mut **hash_map_guard,
offset + batch_offset,
&random_state,
&mut hashes_buffer,
0,
true,
)?;
batch_offset += batch.num_rows();
}
Ok::<_, datafusion_common::DataFusionError>(state)
});
hash_tasks.push(task);
}

let mut final_reservations = Vec::with_capacity(left_partitions);
while let Some(result) = hash_tasks.next().await {
let state: Result<BuildSideState> = result.map_err(|e| {
datafusion_common::DataFusionError::Execution(format!(
"Join build-side hash task failed: {e}"
))
})?;
let state = state?;
all_batches.extend(state.batches);
final_reservations.push(state.reservation);
}

// Merge all batches into a single batch, so we can directly index into the arrays
let batch = concat_batches(&schema, all_batches.iter().rev())?;

let visited_indices_bitmap = if with_visited_indices_bitmap {
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
// TODO: Reservation
metrics.build_mem_used.add(bitmap_size);
let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
bitmap_buffer.append_n(total_num_rows, false);
bitmap_buffer
} else {
BooleanBufferBuilder::new(0)
};

let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?;

// TODO: dynamic filter bounds are not yet merged across partitions in the
// parallel build path, so no bounds are reported for now.
let bounds = match bounds_accumulators {
Some(_) if total_num_rows > 0 => None,
_ => None,
};

let hash_map: Arc<dyn JoinHashMapType> = Arc::into_inner(hash_map)
.expect("all hash tasks have completed, so this is the last Arc")
.into_inner()
.into();

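// Decide how the build side is exposed for dynamic filter pushdown: an empty
// build side is flagged directly, sufficiently small key sets become an
// IN-list, and anything larger falls back to sharing the hash table itself.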
let membership = if total_num_rows == 0 {
PushdownStrategy::Empty
} else {
let estimated_size = left_values
.iter()
.map(|arr| arr.get_array_memory_size())
.sum::<usize>();
if left_values.is_empty()
|| left_values[0].is_empty()
|| estimated_size > max_inlist_size
|| hash_map.len() > max_inlist_distinct_values
{
PushdownStrategy::HashTable(Arc::clone(&hash_map))
} else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
PushdownStrategy::InList(in_list_values)
} else {
PushdownStrategy::HashTable(Arc::clone(&hash_map))
}
};

Ok(JoinLeftData {
hash_map,
batch,
values: left_values,
visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
probe_threads_counter: AtomicUsize::new(probe_threads_count),
_reservations: final_reservations,
bounds,
membership,
})
}


/// Collects all batches from the left (build) side stream and creates a hash map for joining.
///
/// This function is responsible for:
@@ -1556,7 +1780,7 @@ async fn collect_left_input(
values: left_values,
visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
probe_threads_counter: AtomicUsize::new(probe_threads_count),
_reservation: reservation,
_reservations: vec![reservation],
bounds,
membership,
};
Expand Down Expand Up @@ -4387,12 +4611,12 @@ mod tests {
// Asserting that operator-level reservation attempting to overallocate
assert_contains!(
err.to_string(),
"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:\n HashJoinInput"
"Resources exhausted: Additional allocation failed for HashJoinInput[0] with top memory consumers (across reservations) as:\n HashJoinInput[0]"
);

assert_contains!(
err.to_string(),
"Failed to allocate additional 120.0 B for HashJoinInput"
"Failed to allocate additional 120.0 B for HashJoinInput[0]"
);
}
