diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 8182e4fd47d4..60ff1f4b2ed4 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3246,6 +3246,8 @@ impl ScalarValue { /// Retrieve ScalarValue for each row in `array` /// + /// Elements in `array` may be NULL, in which case the corresponding element in the returned vector is None. + /// /// Example 1: Array (ScalarValue::Int32) /// ``` /// use datafusion_common::ScalarValue; @@ -3262,15 +3264,15 @@ impl ScalarValue { /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); /// /// let expected = vec![ - /// vec![ - /// ScalarValue::Int32(Some(1)), - /// ScalarValue::Int32(Some(2)), - /// ScalarValue::Int32(Some(3)), - /// ], - /// vec![ - /// ScalarValue::Int32(Some(4)), - /// ScalarValue::Int32(Some(5)), - /// ], + /// Some(vec![ + /// ScalarValue::Int32(Some(1)), + /// ScalarValue::Int32(Some(2)), + /// ScalarValue::Int32(Some(3)), + /// ]), + /// Some(vec![ + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ]), /// ]; /// /// assert_eq!(scalar_vec, expected); @@ -3303,28 +3305,62 @@ impl ScalarValue { /// ]); /// /// let expected = vec![ - /// vec![ + /// Some(vec![ /// ScalarValue::List(Arc::new(l1)), /// ScalarValue::List(Arc::new(l2)), - /// ], + /// ]), + /// ]; + /// + /// assert_eq!(scalar_vec, expected); + /// ``` + /// + /// Example 3: Nullable array + /// ``` + /// use datafusion_common::ScalarValue; + /// use arrow::array::ListArray; + /// use arrow::datatypes::{DataType, Int32Type}; + /// + /// let list_arr = ListArray::from_iter_primitive::(vec![ + /// Some(vec![Some(1), Some(2), Some(3)]), + /// None, + /// Some(vec![Some(4), Some(5)]) + /// ]); + /// + /// // Convert the array into Scalar Values for each row + /// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap(); + /// + /// let expected = vec![ + /// Some(vec![ + /// 
ScalarValue::Int32(Some(1)), + /// ScalarValue::Int32(Some(2)), + /// ScalarValue::Int32(Some(3)), + /// ]), + /// None, + /// Some(vec![ + /// ScalarValue::Int32(Some(4)), + /// ScalarValue::Int32(Some(5)), + /// ]), /// ]; /// /// assert_eq!(scalar_vec, expected); /// ``` - pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result>> { + pub fn convert_array_to_scalar_vec( + array: &dyn Array, + ) -> Result>>> { fn generic_collect( array: &dyn Array, - ) -> Result>> { + ) -> Result>>> { array .as_list::() .iter() - .map(|nested_array| match nested_array { - Some(nested_array) => (0..nested_array.len()) - .map(|i| ScalarValue::try_from_array(&nested_array, i)) - .collect::>>(), - // TODO: what can we put for null? - // https://github.com/apache/datafusion/issues/17749 - None => Ok(vec![]), + .map(|nested_array| { + nested_array + .map(|array| { + (0..array.len()) + .map(|i| ScalarValue::try_from_array(&array, i)) + .collect::>>() + }) + .transpose() }) .collect() } @@ -9021,7 +9057,7 @@ mod tests { #[test] fn test_convert_array_to_scalar_vec() { - // Regular ListArray + // 1: Regular ListArray let list = ListArray::from_iter_primitive::(vec![ Some(vec![Some(1), Some(2)]), None, @@ -9031,17 +9067,20 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ ScalarValue::Int64(Some(3)), ScalarValue::Int64(None), ScalarValue::Int64(Some(4)) - ], + ]), ] ); - // Regular LargeListArray + // 2: Regular LargeListArray let large_list = LargeListArray::from_iter_primitive::(vec![ Some(vec![Some(1), Some(2)]), None, @@ -9051,17 +9090,20 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ ScalarValue::Int64(Some(3)), 
ScalarValue::Int64(None), ScalarValue::Int64(Some(4)) - ], + ]), ] ); - // Funky (null slot has non-zero list offsets) + // 3: Funky (null slot has non-zero list offsets) // Offsets + Values looks like this: [[1, 2], [3, 4], [5]] // But with NullBuffer it's like this: [[1, 2], NULL, [5]] let funky = ListArray::new( @@ -9074,9 +9116,63 @@ mod tests { assert_eq!( converted, vec![ - vec![ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(2))], - vec![], - vec![ScalarValue::Int64(Some(5))], + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ScalarValue::Int64(Some(5))]), + ] + ); + + // 4: Offsets + Values looks like this: [[1, 2], [], [3, 4, 5]] + // But with NullBuffer it's like this: [[1, 2], NULL, [3, 4, 5]] + // The converted result is: [[1, 2], None, [3, 4, 5]] + let array4 = ListArray::new( + Field::new_list_field(DataType::Int64, true).into(), + OffsetBuffer::new(vec![0, 2, 2, 5].into()), + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])), + Some(NullBuffer::from(vec![true, false, true])), + ); + let converted = ScalarValue::convert_array_to_scalar_vec(&array4).unwrap(); + assert_eq!( + converted, + vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + None, + Some(vec![ + ScalarValue::Int64(Some(3)), + ScalarValue::Int64(Some(4)), + ScalarValue::Int64(Some(5)), + ]), + ] + ); + + // 5: Offsets + Values looks like this: [[1, 2], [], [3, 4, 5]] + // Same as 4, but the middle array is not null, so after conversion it's empty.
+ let array5 = ListArray::new( + Field::new_list_field(DataType::Int64, true).into(), + OffsetBuffer::new(vec![0, 2, 2, 5].into()), + Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5, 6])), + Some(NullBuffer::from(vec![true, true, true])), + ); + let converted = ScalarValue::convert_array_to_scalar_vec(&array5).unwrap(); + assert_eq!( + converted, + vec![ + Some(vec![ + ScalarValue::Int64(Some(1)), + ScalarValue::Int64(Some(2)) + ]), + Some(vec![]), + Some(vec![ + ScalarValue::Int64(Some(3)), + ScalarValue::Int64(Some(4)), + ScalarValue::Int64(Some(5)), + ]), ] ); } diff --git a/datafusion/core/tests/sql/aggregates/basic.rs b/datafusion/core/tests/sql/aggregates/basic.rs index c6ed260e714e..4b421b5294e0 100644 --- a/datafusion/core/tests/sql/aggregates/basic.rs +++ b/datafusion/core/tests/sql/aggregates/basic.rs @@ -48,7 +48,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> { let column = actual[0].column(0); assert_eq!(column.len(), 1); let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?; - let mut scalars = scalar_vec[0].clone(); + let mut scalars = scalar_vec[0].as_ref().unwrap().clone(); // workaround lack of Ord of ScalarValue let cmp = |a: &ScalarValue, b: &ScalarValue| { diff --git a/datafusion/functions-aggregate-common/src/merge_arrays.rs b/datafusion/functions-aggregate-common/src/merge_arrays.rs index c6989bc010bd..bdf1490417be 100644 --- a/datafusion/functions-aggregate-common/src/merge_arrays.rs +++ b/datafusion/functions-aggregate-common/src/merge_arrays.rs @@ -87,7 +87,7 @@ impl PartialOrd for CustomElement<'_> { /// This functions merges `values` array (`&[Vec]`) into single array `Vec` /// Merging done according to ordering values stored inside `ordering_values` (`&[Vec>]`) -/// Inner `Vec` in the `ordering_values` can be thought as ordering information for the +/// Inner `Vec` in the `ordering_values` can be thought as ordering information for /// each `ScalarValue` in the `values` array. 
/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec` /// of the `ordering_values` array). @@ -119,17 +119,27 @@ pub fn merge_ordered_arrays( // Defines according to which ordering comparisons should be done. sort_options: &[SortOptions], ) -> datafusion_common::Result<(Vec, Vec>)> { - // Keep track the most recent data of each branch, in binary heap data structure. + // Keep track of the most recent data of each branch, in a binary heap data structure. let mut heap = BinaryHeap::::new(); - if values.len() != ordering_values.len() - || values - .iter() - .zip(ordering_values.iter()) - .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len()) + if values.len() != ordering_values.len() { + return exec_err!( + "Expects values and ordering_values to have same size but got {} and {}", + values.len(), + ordering_values.len() + ); + } + if let Some((idx, (values, ordering_values))) = values + .iter() + .zip(ordering_values.iter()) + .enumerate() + .find(|(_, (vals, ordering_vals))| vals.len() != ordering_vals.len()) { return exec_err!( - "Expects values arguments and/or ordering_values arguments to have same size" + "Expects values elements and ordering_values elements to have same size but got {} and {} at index {}", + values.len(), + ordering_values.len(), + idx ); } let n_branch = values.len(); diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 268349ecf1b6..4d8676f24a28 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -687,13 +687,16 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { // Convert array to Scalars to sort them easily. Convert back to array at evaluation. 
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?; - for v in array_agg_res.into_iter() { - partition_values.push(v.into()); + for maybe_v in array_agg_res.into_iter() { + if let Some(v) = maybe_v { + partition_values.push(v.into()); + } else { + partition_values.push(vec![].into()); + } } let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; - - for partition_ordering_rows in orderings.into_iter() { + for partition_ordering_rows in orderings.into_iter().flatten() { // Extract value from struct to ordering_rows for each group/partition let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| { if let ScalarValue::Struct(s) = ordering_row { diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index c3f63134bea6..b9dc498ee746 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -267,7 +267,7 @@ impl Accumulator for TrivialNthValueAccumulator { // First entry in the state is the aggregation result. let n_required = self.n.unsigned_abs() as usize; let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for v in array_agg_res.into_iter() { + for v in array_agg_res.into_iter().flatten() { self.values.extend(v); if self.values.len() > n_required { // There is enough data collected, can stop merging: @@ -457,14 +457,14 @@ impl Accumulator for NthValueAccumulator { let mut partition_values = vec![self.values.clone()]; // First entry in the state is the aggregation result. 
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(&states[0])?; - for v in array_agg_res.into_iter() { + for v in array_agg_res.into_iter().flatten() { partition_values.push(v.into()); } // Stores ordering requirement expression results coming from each partition: let mut partition_ordering_values = vec![self.ordering_values.clone()]; let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?; // Extract value from struct to ordering_rows for each group/partition: - for partition_ordering_rows in orderings.into_iter() { + for partition_ordering_rows in orderings.into_iter().flatten() { let ordering_values = partition_ordering_rows.into_iter().map(|ordering_row| { let ScalarValue::Struct(s_array) = ordering_row else { return exec_err!( diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 43aa5f4ae60d..f34fea0c4ba0 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -144,8 +144,11 @@ impl ScalarUDFImpl for ArrayHas { assert_eq!(scalar_values.len(), 1); let list = scalar_values .into_iter() + // `scalar_values` has exactly one entry (asserted above). If that entry is None (a NULL list), the first flatten() below drops it and `list` ends up empty. + // It would be clearer to handle the None case explicitly, but this is more performant. .flatten() - .map(|v| Expr::Literal(v, None)) + .flatten() + .map(|v| Expr::Literal(v.clone(), None)) .collect(); return Ok(ExprSimplifyResult::Simplified(in_list(