From b34c1ef8e22f5d4065b6eb98fc6d2bba07c561ce Mon Sep 17 00:00:00 2001
From: Dharan Aditya
Date: Mon, 24 Jun 2024 10:21:26 +0530
Subject: [PATCH 1/2] add eval_mode to proto

---
 core/src/execution/proto/expr.proto                        | 1 +
 .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +++++-
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/execution/proto/expr.proto b/core/src/execution/proto/expr.proto
index 6b66a307a..0e37d78be 100644
--- a/core/src/execution/proto/expr.proto
+++ b/core/src/execution/proto/expr.proto
@@ -219,6 +219,7 @@ message Subtract {
   Expr right = 2;
   bool fail_on_error = 3;
   DataType return_type = 4;
+  EvalMode eval_mode = 5;
 }
 
 message Multiply {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 1e61ef75e..bd503179d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -736,9 +736,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
 
           if (leftExpr.isDefined && rightExpr.isDefined) {
             val builder = ExprOuterClass.Subtract.newBuilder()
+            val failOnErr = getFailOnError(sub)
+            val evalMode =
+              if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
             builder.setLeft(leftExpr.get)
             builder.setRight(rightExpr.get)
-            builder.setFailOnError(getFailOnError(sub))
+            builder.setFailOnError(failOnErr)
+            builder.setEvalMode(evalMode)
             serializeDataType(sub.dataType).foreach { t =>
               builder.setReturnType(t)
             }

From b52af0d37d6f23c5915a92250662e6bc10440599 Mon Sep 17 00:00:00 2001
From: Dharan Aditya
Date: Mon, 24 Jun 2024 19:31:59 +0530
Subject: [PATCH 2/2] add comet binary expr

---
 .../datafusion/expressions/binary.rs          | 132 ++++++++++++++++++
 .../execution/datafusion/expressions/mod.rs   |   1 +
 core/src/execution/datafusion/planner.rs      |   9 +-
 3 files changed, 141 insertions(+), 1 deletion(-)
 create mode 100644 core/src/execution/datafusion/expressions/binary.rs

diff --git a/core/src/execution/datafusion/expressions/binary.rs b/core/src/execution/datafusion/expressions/binary.rs
new file mode 100644
index 000000000..7aa6330c0
--- /dev/null
+++ b/core/src/execution/datafusion/expressions/binary.rs
@@ -0,0 +1,132 @@
+use std::any::Any;
+use std::fmt::{Display, Formatter};
+use std::hash::Hasher;
+use std::sync::Arc;
+
+use arrow_array::{BooleanArray, RecordBatch};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, Operator};
+use datafusion_expr::interval_arithmetic::Interval;
+use datafusion_expr::sort_properties::ExprProperties;
+use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr_common::physical_expr::{down_cast_any_ref, PhysicalExpr};
+
+use crate::execution::datafusion::expressions::EvalMode;
+
+/// A binary expression that carries an [`EvalMode`] alongside the operands.
+///
+/// Evaluation is currently delegated unchanged to the wrapped DataFusion [`BinaryExpr`].
+#[derive(Debug, Hash, Clone)]
+pub struct CometBinaryExpr {
+    left: Arc<dyn PhysicalExpr>,
+    op: Operator,
+    right: Arc<dyn PhysicalExpr>,
+    eval_mode: EvalMode,
+    inner: Arc<BinaryExpr>,
+}
+
+impl CometBinaryExpr {
+    /// Builds the wrapper together with the `BinaryExpr` it delegates to.
+    pub fn new(
+        left: Arc<dyn PhysicalExpr>,
+        op: Operator,
+        right: Arc<dyn PhysicalExpr>,
+        eval_mode: EvalMode,
+    ) -> Self {
+        Self {
+            left: Arc::clone(&left),
+            op,
+            right: Arc::clone(&right),
+            eval_mode,
+            inner: Arc::new(BinaryExpr::new(left, op, right)),
+        }
+    }
+}
+
+impl Display for CometBinaryExpr {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.inner.fmt(f)
+    }
+}
+
+impl PhysicalExpr for CometBinaryExpr {
+    fn as_any(&self) -> &dyn Any {
+        // Must be `self`: `eq` below downcasts to `CometBinaryExpr`, which can never
+        // succeed if the inner `BinaryExpr` is exposed here instead.
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        self.inner.data_type(input_schema)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.inner.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        // TODO: apply `eval_mode`; for now this is plain `BinaryExpr` evaluation.
+        self.inner.evaluate(batch)
+    }
+
+    fn evaluate_selection(
+        &self,
+        batch: &RecordBatch,
+        selection: &BooleanArray,
+    ) -> Result<ColumnarValue> {
+        self.inner.evaluate_selection(batch, selection)
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        self.inner.children()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        // Rebuild the wrapper rather than delegating to `inner`, which would return a
+        // bare `BinaryExpr` and drop `eval_mode`.
+        Ok(Arc::new(CometBinaryExpr::new(
+            Arc::clone(&children[0]),
+            self.op,
+            Arc::clone(&children[1]),
+            self.eval_mode,
+        )))
+    }
+
+    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
+        self.inner.evaluate_bounds(children)
+    }
+
+    fn propagate_constraints(
+        &self,
+        interval: &Interval,
+        children: &[&Interval],
+    ) -> Result<Option<Vec<Interval>>> {
+        self.inner.propagate_constraints(interval, children)
+    }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        self.inner.dyn_hash(state)
+    }
+
+    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
+        self.inner.get_properties(children)
+    }
+}
+
+impl PartialEq<dyn Any> for CometBinaryExpr {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.left.eq(&x.left)
+                    && self.op == x.op
+                    && self.right.eq(&x.right)
+                    && self.eval_mode == x.eval_mode
+            })
+            .unwrap_or(false)
+    }
+}
diff --git a/core/src/execution/datafusion/expressions/mod.rs b/core/src/execution/datafusion/expressions/mod.rs
index 5d5f58e0c..0e2996944 100644
--- a/core/src/execution/datafusion/expressions/mod.rs
+++ b/core/src/execution/datafusion/expressions/mod.rs
@@ -30,6 +30,7 @@ use crate::{errors::CometError, execution::spark_expression};
 pub mod abs;
 pub mod avg;
 pub mod avg_decimal;
+pub mod binary;
 pub mod bloom_filter_might_contain;
 pub mod correlation;
 pub mod covariance;
diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index afdebce32..6a50abeb1 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -56,6 +56,7 @@ use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
 
+use crate::execution::datafusion::expressions::binary::CometBinaryExpr;
 use crate::{
     errors::ExpressionError,
     execution::{
@@ -166,6 +167,7 @@ impl PhysicalPlanner {
                 expr.return_type.as_ref(),
                 DataFusionOperator::Plus,
                 input_schema,
+                EvalMode::Legacy,
             ),
             ExprStruct::Subtract(expr) => self.create_binary_expr(
                 expr.left.as_ref().unwrap(),
@@ -173,6 +175,7 @@ impl PhysicalPlanner {
                 expr.return_type.as_ref(),
                 DataFusionOperator::Minus,
                 input_schema,
+                EvalMode::try_from(expr.eval_mode)?,
             ),
             ExprStruct::Multiply(expr) => self.create_binary_expr(
                 expr.left.as_ref().unwrap(),
@@ -180,6 +183,7 @@ impl PhysicalPlanner {
                 expr.return_type.as_ref(),
                 DataFusionOperator::Multiply,
                 input_schema,
+                EvalMode::Legacy,
             ),
             ExprStruct::Divide(expr) => self.create_binary_expr(
                 expr.left.as_ref().unwrap(),
@@ -187,6 +191,7 @@ impl PhysicalPlanner {
                 expr.return_type.as_ref(),
                 DataFusionOperator::Divide,
                 input_schema,
+                EvalMode::Legacy,
             ),
             ExprStruct::Remainder(expr) => self.create_binary_expr(
                 expr.left.as_ref().unwrap(),
@@ -194,6 +199,7 @@ impl PhysicalPlanner {
                 expr.return_type.as_ref(),
                 DataFusionOperator::Modulo,
                 input_schema,
+                EvalMode::Legacy,
             ),
             ExprStruct::Eq(expr) => {
                 let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
@@ -627,6 +633,7 @@ impl PhysicalPlanner {
         return_type: Option<&spark_expression::DataType>,
         op: DataFusionOperator,
         input_schema: SchemaRef,
+        eval_mode: EvalMode,
     ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
         let left = self.create_expr(left, input_schema.clone())?;
         let right = self.create_expr(right, input_schema.clone())?;
@@ -681,7 +688,7 @@ impl PhysicalPlanner {
                     data_type,
                 )))
             }
-            _ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
+            _ => Ok(Arc::new(CometBinaryExpr::new(left, op, right, eval_mode))),
         }
     }
 