10 changes: 5 additions & 5 deletions datafusion/common/src/stats.rs
@@ -391,8 +391,8 @@ impl Statistics {
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
let Some(projection) = projection else {
pub fn project(mut self, projection: Option<&impl AsRef<[usize]>>) -> Self {
let Some(projection) = projection.map(AsRef::as_ref) else {
return self;
};

@@ -410,7 +410,7 @@ impl Statistics {
.map(Slot::Present)
.collect();

for idx in projection {
for idx in projection.iter() {
let next_idx = self.column_statistics.len();
let slot = std::mem::replace(
columns.get_mut(*idx).expect("projection out of bounds"),
@@ -1066,8 +1066,8 @@ mod tests {

#[test]
fn test_project_none() {
let projection = None;
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&Vec<usize>> = None;
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![10, 20, 30]));
}

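Note: the generalized `Option<&impl AsRef<[usize]>>` signature keeps existing `&Vec<usize>` call sites compiling and additionally accepts arrays and slices; the only caller-visible change is that a bare `None` can no longer infer the projection type, which is why the test above now spells out `Option<&Vec<usize>>`. A minimal sketch using the `make_stats` test helper from this file (the projection indices are placeholders):

```rust
// Both a Vec reference and an array reference satisfy `&impl AsRef<[usize]>`.
let by_vec = make_stats(vec![10, 20, 30]).project(Some(&vec![2usize, 1]));
let by_arr = make_stats(vec![10, 20, 30]).project(Some(&[2usize, 1]));
// A bare `None` needs an explicit type annotation now:
let unchanged = make_stats(vec![10, 20, 30]).project(None::<&Vec<usize>>);
```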
4 changes: 2 additions & 2 deletions datafusion/common/src/utils/mod.rs
@@ -70,9 +70,9 @@ use std::thread::available_parallelism;
/// ```
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
projection: Option<&impl AsRef<[usize]>>,
) -> Result<SchemaRef> {
let schema = match projection {
let schema = match projection.map(AsRef::as_ref) {
Some(columns) => Arc::new(schema.project(columns)?),
None => Arc::clone(schema),
};
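`project_schema` gets the same generalization, so callers holding an array or slice no longer need to allocate a `Vec` first. A sketch (module paths are assumed from the file locations in this diff):

```rust
use arrow::datatypes::SchemaRef;
use datafusion_common::{utils::project_schema, Result};

/// Keep only the first and third columns of `schema`.
fn project_first_and_third(schema: &SchemaRef) -> Result<SchemaRef> {
    // A fixed-size array reference satisfies `&impl AsRef<[usize]>`.
    project_schema(schema, Some(&[0usize, 2]))
}
```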
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
@@ -984,7 +984,7 @@ impl DefaultPhysicalPlanner {
// project the output columns excluding the async functions
// The async functions are always appended to the end of the schema.
.apply_projection(Some(
(0..input.schema().fields().len()).collect(),
(0..input.schema().fields().len()).collect::<Vec<_>>(),
))?
.with_batch_size(session_state.config().batch_size())
.build()?
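The added turbofish is what keeps this call site compiling: once the projection parameter is generic over the collection type, `collect()` has no unique target type. A standalone sketch of the inference issue (the signature below is an assumption for illustration, not the actual `apply_projection` API):

```rust
fn apply_projection(projection: Option<impl AsRef<[usize]>>) {
    let _ = projection;
}

fn main() {
    // apply_projection(Some((0..3).collect()));        // error: type annotations needed
    apply_projection(Some((0..3).collect::<Vec<_>>())); // turbofish resolves to Vec<usize>
}
```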
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
);

assert_eq!(swapped_join.projection, Some(vec![0_usize]));
assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]);
assert_eq!(swapped.schema().fields.len(), 1);
assert_eq!(swapped.schema().fields[0].name(), "small_col");
Ok(())
45 changes: 22 additions & 23 deletions datafusion/physical-expr/src/projection.rs
@@ -125,7 +125,7 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
/// indices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionExprs {
exprs: Vec<ProjectionExpr>,
exprs: Arc<[ProjectionExpr]>,
}

impl std::fmt::Display for ProjectionExprs {
@@ -137,22 +137,24 @@ impl std::fmt::Display for ProjectionExprs {

impl From<Vec<ProjectionExpr>> for ProjectionExprs {
fn from(value: Vec<ProjectionExpr>) -> Self {
Self { exprs: value }
Self {
exprs: value.into(),
}
}
}

impl From<&[ProjectionExpr]> for ProjectionExprs {
fn from(value: &[ProjectionExpr]) -> Self {
Self {
exprs: value.to_vec(),
exprs: value.iter().cloned().collect(),
}
}
}

impl FromIterator<ProjectionExpr> for ProjectionExprs {
fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
Self {
exprs: exprs.into_iter().collect::<Vec<_>>(),
exprs: exprs.into_iter().collect(),
}
}
}
@@ -164,12 +166,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs {
}

impl ProjectionExprs {
pub fn new<I>(exprs: I) -> Self
where
I: IntoIterator<Item = ProjectionExpr>,
{
/// Make a new [`ProjectionExprs`] from expressions iterator.
pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
Self {
exprs: exprs.into_iter().collect(),
}
}

/// Make a new [`ProjectionExprs`] from expressions.
pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
Self {
exprs: exprs.into_iter().collect::<Vec<_>>(),
exprs: exprs.into(),
}
}

@@ -285,13 +292,14 @@ impl ProjectionExprs {
{
let exprs = self
.exprs
.into_iter()
.iter()
.cloned()
.map(|mut proj| {
proj.expr = f(proj.expr)?;
Ok(proj)
})
.collect::<Result<Vec<_>>>()?;
Ok(Self::new(exprs))
.collect::<Result<Arc<_>>>()?;
Ok(Self::from_expressions(exprs))
}

/// Apply another projection on top of this projection, returning the combined projection.
@@ -361,7 +369,7 @@ impl ProjectionExprs {
/// applied on top of this projection.
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
let mut new_exprs = Vec::with_capacity(other.exprs.len());
for proj_expr in &other.exprs {
for proj_expr in other.exprs.iter() {
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
.ok_or_else(|| {
internal_datafusion_err!(
@@ -607,7 +615,7 @@ impl ProjectionExprs {
) -> Result<datafusion_common::Statistics> {
let mut column_statistics = vec![];

for proj_expr in &self.exprs {
for proj_expr in self.exprs.iter() {
let expr = &proj_expr.expr;
let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
std::mem::take(&mut stats.column_statistics[col.index()])
@@ -754,15 +762,6 @@ impl Projector {
}
}

impl IntoIterator for ProjectionExprs {
type Item = ProjectionExpr;
type IntoIter = std::vec::IntoIter<ProjectionExpr>;

fn into_iter(self) -> Self::IntoIter {
self.exprs.into_iter()
}
}

/// The function operates in two modes:
///
/// 1) When `sync_with_child` is `true`:
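Backing `ProjectionExprs` with `Arc<[ProjectionExpr]>` turns clones of the container into a reference-count bump instead of a deep copy; the trade-off is the removal of the owned `IntoIterator` impl, since an `Arc`-backed slice cannot yield owned items without cloning. A rough caller-side sketch (`a` and `b` stand for previously built `ProjectionExpr` values):

```rust
let projection: ProjectionExprs = vec![a, b].into();
let _shared = projection.clone();          // cheap: clones the Arc, not each expression
// The owned `IntoIterator` impl is gone; iterate by reference instead:
for expr in projection.as_ref() {
    let _ = expr;
}
// ...or clone out to a Vec when ownership really is needed:
let _owned: Vec<ProjectionExpr> = projection.as_ref().to_vec();
```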
@@ -723,7 +723,7 @@ fn handle_hash_join(
.collect();

let column_indices = build_join_column_index(plan);
let projected_indices: Vec<_> = if let Some(projection) = &plan.projection {
let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() {
projection.iter().map(|&i| &column_indices[i]).collect()
} else {
column_indices.iter().collect()
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/projection_pushdown.rs
@@ -135,7 +135,7 @@ fn try_push_down_join_filter(
);

let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
let projections = match projections {
let projections = match projections.as_ref() {
None => match join.join_type() {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
// Build projections that ignore the newly projected columns.
40 changes: 22 additions & 18 deletions datafusion/physical-plan/src/aggregates/mod.rs
@@ -544,11 +544,11 @@ pub struct AggregateExec {
/// Aggregation mode (full, partial)
mode: AggregateMode,
/// Group by expressions
group_by: PhysicalGroupBy,
group_by: Arc<PhysicalGroupBy>,
/// Aggregate expressions
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
aggr_expr: Arc<[Arc<AggregateFunctionExpr>]>,
/// FILTER (WHERE clause) expression for each aggregate expression
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
filter_expr: Arc<[Option<Arc<dyn PhysicalExpr>>]>,
/// Configuration for limit-based optimizations
limit_options: Option<LimitOptions>,
/// Input plan, could be a partial aggregate or the input to the aggregate
@@ -582,18 +582,18 @@ impl AggregateExec {
/// Rewrites aggregate exec with new aggregate expressions.
pub fn with_new_aggr_exprs(
&self,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
aggr_expr: impl Into<Arc<[Arc<AggregateFunctionExpr>]>>,
) -> Self {
Self {
aggr_expr,
aggr_expr: aggr_expr.into(),
// clone the rest of the fields
required_input_ordering: self.required_input_ordering.clone(),
metrics: ExecutionPlanMetricsSet::new(),
input_order_mode: self.input_order_mode.clone(),
cache: self.cache.clone(),
mode: self.mode,
group_by: self.group_by.clone(),
filter_expr: self.filter_expr.clone(),
group_by: Arc::clone(&self.group_by),
filter_expr: Arc::clone(&self.filter_expr),
limit_options: self.limit_options,
input: Arc::clone(&self.input),
schema: Arc::clone(&self.schema),
@@ -612,9 +612,9 @@ impl AggregateExec {
input_order_mode: self.input_order_mode.clone(),
cache: self.cache.clone(),
mode: self.mode,
group_by: self.group_by.clone(),
aggr_expr: self.aggr_expr.clone(),
filter_expr: self.filter_expr.clone(),
group_by: Arc::clone(&self.group_by),
aggr_expr: Arc::clone(&self.aggr_expr),
filter_expr: Arc::clone(&self.filter_expr),
input: Arc::clone(&self.input),
schema: Arc::clone(&self.schema),
input_schema: Arc::clone(&self.input_schema),
@@ -629,12 +629,13 @@ impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
mode: AggregateMode,
group_by: PhysicalGroupBy,
group_by: impl Into<Arc<PhysicalGroupBy>>,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
let group_by = group_by.into();
let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?;

let schema = Arc::new(schema);
@@ -659,13 +660,16 @@ impl AggregateExec {
/// the schema in such cases.
fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
group_by: impl Into<Arc<PhysicalGroupBy>>,
mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
filter_expr: impl Into<Arc<[Option<Arc<dyn PhysicalExpr>>]>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
schema: SchemaRef,
) -> Result<Self> {
let group_by = group_by.into();
let filter_expr = filter_expr.into();

// Make sure arguments are consistent in size
assert_eq_or_internal_err!(
aggr_expr.len(),
@@ -732,13 +736,13 @@ impl AggregateExec {
&group_expr_mapping,
&mode,
&input_order_mode,
aggr_expr.as_slice(),
aggr_expr.as_ref(),
)?;

let mut exec = AggregateExec {
mode,
group_by,
aggr_expr,
aggr_expr: aggr_expr.into(),
filter_expr,
input,
schema,
@@ -1287,9 +1291,9 @@ impl ExecutionPlan for AggregateExec {
) -> Result<Arc<dyn ExecutionPlan>> {
let mut me = AggregateExec::try_new_with_schema(
self.mode,
self.group_by.clone(),
self.aggr_expr.clone(),
self.filter_expr.clone(),
Arc::clone(&self.group_by),
self.aggr_expr.to_vec(),
Arc::clone(&self.filter_expr),
Arc::clone(&children[0]),
Arc::clone(&self.input_schema),
Arc::clone(&self.schema),
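Since the stored fields are now `Arc`-backed and the constructor parameters use `impl Into<Arc<...>>`, existing call sites that pass owned values keep compiling, while the re-planning paths (`with_new_children`, `with_new_aggr_exprs`) clone by bumping reference counts rather than deep-copying. A hedged call-site sketch (all argument values are placeholders built elsewhere):

```rust
let agg = AggregateExec::try_new(
    AggregateMode::Partial,
    group_by,     // owned PhysicalGroupBy or Arc<PhysicalGroupBy>: both satisfy impl Into<Arc<_>>
    aggr_expr,    // still Vec<Arc<AggregateFunctionExpr>>; converted to Arc<[_]> internally
    filter_expr,  // still Vec<Option<Arc<dyn PhysicalExpr>>>
    input,        // Arc<dyn ExecutionPlan>
    input_schema, // SchemaRef
)?;
```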
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -61,7 +61,7 @@ struct AggregateStreamInner {
mode: AggregateMode,
input: SendableRecordBatchStream,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>,

// ==== Runtime States/Buffers ====
accumulators: Vec<AccumulatorItem>,
@@ -276,7 +276,7 @@ impl AggregateStream {
partition: usize,
) -> Result<Self> {
let agg_schema = Arc::clone(&agg.schema);
let agg_filter_expr = agg.filter_expr.clone();
let agg_filter_expr = Arc::clone(&agg.filter_expr);

let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(context))?;
@@ -287,7 +287,7 @@ impl AggregateStream {
| AggregateMode::Single
| AggregateMode::SinglePartitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
vec![None; agg.aggr_expr.len()].into()
}
};
let accumulators = create_accumulators(&agg.aggr_expr)?;
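The trailing `.into()` is what bridges the old and new field types: the standard library provides `From<Vec<T>> for Arc<[T]>`, so `vec![None; n].into()` builds the shared slice in one allocation (row_hash.rs below makes the same change). In isolation, with a stand-in element type:

```rust
use std::sync::Arc;

fn main() {
    // In the PR the element type is Option<Arc<dyn PhysicalExpr>>; String is a stand-in here.
    let n = 3;
    let filters: Arc<[Option<String>]> = vec![None; n].into();
    assert_eq!(filters.len(), n);
}
```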
12 changes: 6 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream {
///
/// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`,
/// the filter expression is `x > 100`.
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>,

/// GROUP BY expressions
group_by: PhysicalGroupBy,
group_by: Arc<PhysicalGroupBy>,

/// max rows in output RecordBatches
batch_size: usize,
@@ -465,8 +465,8 @@ impl GroupedHashAggregateStream {
) -> Result<Self> {
debug!("Creating GroupedHashAggregateStream");
let agg_schema = Arc::clone(&agg.schema);
let agg_group_by = agg.group_by.clone();
let agg_filter_expr = agg.filter_expr.clone();
let agg_group_by = Arc::clone(&agg.group_by);
let agg_filter_expr = Arc::clone(&agg.filter_expr);

let batch_size = context.session_config().batch_size();
let input = agg.input.execute(partition, Arc::clone(context))?;
@@ -475,7 +475,7 @@ impl GroupedHashAggregateStream {

let timer = baseline_metrics.elapsed_compute().timer();

let aggregate_exprs = agg.aggr_expr.clone();
let aggregate_exprs = Arc::clone(&agg.aggr_expr);

// arguments for each aggregate, one vec of expressions per
// aggregate
@@ -496,7 +496,7 @@ impl GroupedHashAggregateStream {
| AggregateMode::Single
| AggregateMode::SinglePartitioned => agg_filter_expr,
AggregateMode::Final | AggregateMode::FinalPartitioned => {
vec![None; agg.aggr_expr.len()]
vec![None; agg.aggr_expr.len()].into()
}
};

4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/topk_stream.rs
@@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream {
baseline_metrics: BaselineMetrics,
group_by_metrics: GroupByMetrics,
aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
group_by: PhysicalGroupBy,
group_by: Arc<PhysicalGroupBy>,
priority_map: PriorityMap,
}

@@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream {
limit: usize,
) -> Result<Self> {
let agg_schema = Arc::clone(&aggr.schema);
let group_by = aggr.group_by.clone();
let group_by = Arc::clone(&aggr.group_by);
let input = aggr.input.execute(partition, Arc::clone(context))?;
let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition);
let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition);