diff --git a/CMakeLists.txt b/CMakeLists.txt index 0080859eac5b..4bd083ff6d4d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -140,7 +140,7 @@ option(VELOX_ENABLE_HIVE_CONNECTOR "Build Hive connector." ON) option(VELOX_ENABLE_TPCH_CONNECTOR "Build TPC-H connector." ON) option(VELOX_ENABLE_TPCDS_CONNECTOR "Build TPC-DS connector." ON) option(VELOX_ENABLE_PRESTO_FUNCTIONS "Build Presto SQL functions." ON) -option(VELOX_ENABLE_SPARK_FUNCTIONS "Build Spark SQL functions." ON) +option(VELOX_ENABLE_SPARK_FUNCTIONS "Build Spark SQL functions." OFF) option(VELOX_ENABLE_ICEBERG_FUNCTIONS "Build Iceberg functions." ON) option(VELOX_ENABLE_EXPRESSION "Build expression." ON) option( diff --git a/velox/core/Expressions.h b/velox/core/Expressions.h index 4d406b347b74..3f71434933cd 100644 --- a/velox/core/Expressions.h +++ b/velox/core/Expressions.h @@ -273,6 +273,15 @@ class FieldAccessTypedExpr : public ITypedExpr { const std::string& name() const { return name_; } +// +// void setName(const std::string& newName) { +// name_ = newName; +// } +// +// void updateNewType(const std::string& newName, TypePtr newType) { +// name_ = newName; +// type_= newType; +// } TypedExprPtr rewriteInputNames( const std::unordered_map& mapping) diff --git a/velox/core/ITypedExpr.h b/velox/core/ITypedExpr.h index bcdf55f233f0..c9e1e6345040 100644 --- a/velox/core/ITypedExpr.h +++ b/velox/core/ITypedExpr.h @@ -52,10 +52,18 @@ class ITypedExpr : public ISerializable { ExprKind kind() const { return kind_; } + // + // virtual const std::string& name() const { + // VELOX_UNSUPPORTED("name() is not supported for this expression"); + // } const TypePtr& type() const { return type_; } + // + // void setType(const TypePtr& newType) { + // type_.reset(newType.get()); + // } const std::vector& inputs() const { return inputs_; diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 88b60dbb61f8..ae98277f12d7 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -497,6 +497,14 @@ PlanNodePtr AggregationNode::create(const folly::dynamic& obj, void* context) { deserializeSingleSource(obj, context)); } +PlanNodePtr AggregationNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { RowTypePtr getExpandOutputType( const std::vector>& projections, @@ -602,6 +610,14 @@ PlanNodePtr ExpandNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr ExpandNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { RowTypePtr getGroupIdOutputType( const std::vector& groupingKeyInfos, @@ -719,6 +735,14 @@ PlanNodePtr GroupIdNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr GroupIdNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + const std::vector& ValuesNode::sources() const { return kEmptySources; } @@ -811,6 +835,34 @@ void AbstractProjectNode::addDetails(std::stringstream& stream) const { << projection->toString() << ")"; } } +// +// void AbstractProjectNode::updateNewTypes( +// const std::map>& newTypes) { +// // Update output type first as it may contain new column names. +// PlanNode::updateNewTypes(newTypes); +// +// // Update projections +// for (auto& projection : projections_) { +// if (auto field = TypedExprs::asFieldAccess(projection)) { +// auto it = newTypes.find(field->name()); +// if (it != newTypes.end()) { +// auto mutableField = +// std::const_pointer_cast(field); +// mutableField->setName(it->second.first); +// mutableField->setType(it->second.second); +// } +// } +// } +// +// // Update names +// for (auto i = 0; i < names_.size(); i++) { +// auto& name = names_[i]; +// auto it = newTypes.find(name); +// if (it != newTypes.end()) { +// setName(i, it->second.first); +// } +// } +//} namespace { @@ -1132,6 +1184,14 @@ PlanNodePtr ProjectNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr ProjectNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { // makes a list of all names for use in the ProjectNode. std::vector allNames( @@ -1155,8 +1215,9 @@ std::vector flattenExprs( const auto& sourceType = input->outputType(); for (auto& name : moreNames) { - result.push_back(std::make_shared( - sourceType->findChild(name), name)); + result.push_back( + std::make_shared( + sourceType->findChild(name), name)); } return result; } @@ -1231,6 +1292,15 @@ PlanNodePtr ParallelProjectNode::create( std::move(source)); } +PlanNodePtr ParallelProjectNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1, "LazyDereferenceNode is unary"); + + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + // static PlanNodePtr LazyDereferenceNode::create( const folly::dynamic& obj, @@ -1247,6 +1317,15 @@ PlanNodePtr LazyDereferenceNode::create( std::move(source)); } +PlanNodePtr LazyDereferenceNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1, "LazyDereferenceNode is unary"); + + ProjectNode::Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + const std::vector& TableScanNode::sources() const { return kEmptySources; } @@ -1303,6 +1382,14 @@ PlanNodePtr TableScanNode::create(const folly::dynamic& obj, void* context) { planNodeId, outputType, tableHandle, assignments); } +// PlanNodePtr TableScanNode::copyWithNewSources( +// std::vector newSources) const { +// VELOX_CHECK_EQ(newSources.size(), 1, "LazyDereferenceNode is unary"); +// +// Builder builder(*this); +// return builder.build(); +// } + const std::vector& ArrowStreamNode::sources() const { return kEmptySources; } @@ -1454,6 +1541,14 @@ PlanNodePtr UnnestNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr UnnestNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + AbstractJoinNode::AbstractJoinNode( const PlanNodeId& id, JoinType joinType, @@ -1471,6 +1566,13 @@ AbstractJoinNode::AbstractJoinNode( sources_({std::move(left), std::move(right)}), outputType_(std::move(outputType)) {} +namespace { +bool isIntegral(const TypePtr& type) { + return type->isBigint() || type->isInteger() || type->isSmallint() || + type->isTinyint(); +} +} // namespace + void AbstractJoinNode::validate() const { VELOX_CHECK(!leftKeys_.empty(), "JoinNode requires at least one join key"); VELOX_CHECK_EQ( @@ -1492,9 +1594,15 @@ void AbstractJoinNode::validate() const { key->name()); } for (auto i = 0; i < leftKeys_.size(); ++i) { + auto leftType = leftKeys_[i]->type(); + auto rightType = rightKeys_[i]->type(); + if (isIntegral(leftType) && isIntegral(rightType)) { + continue; + } + VELOX_CHECK_EQ( - leftKeys_[i]->type()->kind(), - rightKeys_[i]->type()->kind(), + leftType, + rightType, "Join key types on the left and right sides must match"); } @@ -1540,6 +1648,28 @@ void AbstractJoinNode::validate() const { } } +// void AbstractJoinNode::updateNewTypes( +// const std::map>& newTypes) { +// // Update output type first as it may contain new column names. +// PlanNode::updateNewTypes(newTypes); +// +// // Update join keys. +// for (auto& key : leftKeys_) { +// auto iter = newTypes.find(key->name()); +// if (iter != newTypes.end()) { +// auto mutableKey = std::const_pointer_cast(key); +// mutableKey->updateNewType(iter->second.first, iter->second.second); +// } +// } +// for (auto& key : rightKeys_) { +// auto iter = newTypes.find(key->name()); +// if (iter != newTypes.end()) { +// auto mutableKey = std::const_pointer_cast(key); +// mutableKey->updateNewType(iter->second.first, iter->second.second); +// } +// } +// } + void AbstractJoinNode::addDetails(std::stringstream& stream) const { stream << JoinTypeName::toName(joinType_) << " "; @@ -1655,6 +1785,40 @@ PlanNodePtr HashJoinNode::create(const folly::dynamic& obj, void* context) { outputType); } +namespace { +std::vector getKeysFromSource( + const std::vector& keys, + const PlanNodePtr& source) { + std::vector result; + auto sourceType = source->outputType(); + for (const auto& key : keys) { + auto keyName = key->name(); + auto keyType = sourceType->findChild(keyName); + VELOX_CHECK_NOT_NULL( + keyType, "Join key not found in source output: {}", keyName); + result.push_back(std::make_shared(keyType, keyName)); + } + return result; +} +} // namespace + +PlanNodePtr HashJoinNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 2); + + std::vector leftKeys = + getKeysFromSource(leftKeys_, newSources[0]); + std::vector rightKeys = + getKeysFromSource(rightKeys_, newSources[1]); + + Builder builder(*this); + builder.left(std::move(newSources[0])) + .right(std::move(newSources[1])) + .leftKeys(std::move(leftKeys)) + .rightKeys(std::move(rightKeys)); + return builder.build(); +} + MergeJoinNode::MergeJoinNode( const PlanNodeId& id, JoinType joinType, @@ -1733,6 +1897,23 @@ PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) { outputType); } +PlanNodePtr MergeJoinNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 2); + + std::vector leftKeys = + getKeysFromSource(leftKeys_, newSources[0]); + std::vector rightKeys = + getKeysFromSource(rightKeys_, newSources[1]); + + Builder builder(*this); + builder.left(std::move(newSources[0])) + .right(std::move(newSources[1])) + .leftKeys(std::move(leftKeys)) + .rightKeys(std::move(rightKeys)); + return builder.build(); +} + IndexLookupJoinNode::IndexLookupJoinNode( const PlanNodeId& id, JoinType joinType, @@ -1857,6 +2038,23 @@ PlanNodePtr IndexLookupJoinNode::create( std::move(outputType)); } +PlanNodePtr IndexLookupJoinNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 2); + + std::vector leftKeys = + getKeysFromSource(leftKeys_, newSources[0]); + std::vector rightKeys = + getKeysFromSource(rightKeys_, newSources[1]); + + Builder builder(*this); + builder.left(std::move(newSources[0])) + .right(std::move(newSources[1])) + .leftKeys(std::move(leftKeys)) + .rightKeys(std::move(rightKeys)); + return builder.build(); +} + folly::dynamic IndexLookupJoinNode::serialize() const { auto obj = serializeBase(); if (!joinConditions_.empty()) { @@ -2028,6 +2226,15 @@ PlanNodePtr NestedLoopJoinNode::create( outputType); } +PlanNodePtr NestedLoopJoinNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 2); + + Builder builder(*this); + builder.left(std::move(newSources[0])).right(std::move(newSources[1])); + return builder.build(); +} + AssignUniqueIdNode::AssignUniqueIdNode( const PlanNodeId& id, const std::string& idName, @@ -2073,6 +2280,14 @@ PlanNodePtr AssignUniqueIdNode::create( std::move(source)); } +PlanNodePtr AssignUniqueIdNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { RowTypePtr getWindowOutputType( const RowTypePtr& inputType, @@ -2340,6 +2555,14 @@ PlanNodePtr WindowNode::create(const folly::dynamic& obj, void* context) { source); } +PlanNodePtr WindowNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + RowTypePtr getMarkDistinctOutputType( const RowTypePtr& inputType, const std::string& markerName) { @@ -2389,6 +2612,14 @@ PlanNodePtr MarkDistinctNode::create(const folly::dynamic& obj, void* context) { deserializePlanNodeId(obj), markerName, distinctKeys, source); } +PlanNodePtr MarkDistinctNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { RowTypePtr getRowNumberOutputType( const RowTypePtr& inputType, @@ -2484,6 +2715,14 @@ PlanNodePtr RowNumberNode::create(const folly::dynamic& obj, void* context) { source); } +PlanNodePtr RowNumberNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { std::unordered_map rankFunctionNames() { @@ -2625,6 +2864,14 @@ PlanNodePtr TopNRowNumberNode::create( source); } +PlanNodePtr TopNRowNumberNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + void LocalMergeNode::addDetails(std::stringstream& stream) const { addSortingKeys(sortingKeys_, sortingOrders_, stream); } @@ -2655,6 +2902,14 @@ PlanNodePtr LocalMergeNode::create(const folly::dynamic& obj, void* context) { std::move(sources)); } +PlanNodePtr LocalMergeNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.sources(std::move(newSources)); + return builder.build(); +} + void TableWriteNode::addDetails(std::stringstream& stream) const { stream << insertTableHandle_->connectorInsertTableHandle()->toString(); } @@ -2749,6 +3004,14 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) { deserializeSingleSource(obj, context)); } +PlanNodePtr TableWriteNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + void TableWriteMergeNode::addDetails(std::stringstream& /* stream */) const {} folly::dynamic TableWriteMergeNode::serialize() const { @@ -2785,6 +3048,14 @@ PlanNodePtr TableWriteMergeNode::create( deserializeSingleSource(obj, context)); } +PlanNodePtr TableWriteMergeNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + MergeExchangeNode::MergeExchangeNode( const PlanNodeId& id, const RowTypePtr& type, @@ -2832,6 +3103,16 @@ PlanNodePtr MergeExchangeNode::create( serdeKind); } +PlanNodePtr MergeExchangeNode::copyWithNewSources( + std::vector newSources) const { + // VELOX_CHECK_EQ(newSources.size(), 1); + + // TODO: sortingKeys_ and outputType_ need to be constructed from newSources. + Builder builder(*this); + // builder.source(newSources[0]); + return builder.build(); +} + void LocalPartitionNode::addDetails(std::stringstream& stream) const { stream << toName(type_); if (type_ != Type::kGather) { @@ -2869,6 +3150,14 @@ PlanNodePtr LocalPartitionNode::create( deserializeSources(obj, context)); } +PlanNodePtr LocalPartitionNode::copyWithNewSources( + std::vector newSources) const { + // VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.sources(std::move(newSources)); + return builder.build(); +} + namespace { const auto& localPartitionTypeNames() { static const folly::F14FastMap @@ -3002,6 +3291,14 @@ PlanNodePtr EnforceSingleRowNode::create( deserializePlanNodeId(obj), deserializeSingleSource(obj, context)); } +PlanNodePtr EnforceSingleRowNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + namespace { const auto& partitionKindNames() { static const folly::F14FastMap @@ -3075,6 +3372,14 @@ PlanNodePtr PartitionedOutputNode::create( deserializeSingleSource(obj, context)); } +PlanNodePtr PartitionedOutputNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + SpatialJoinNode::SpatialJoinNode( const PlanNodeId& id, JoinType joinType, @@ -3255,6 +3560,15 @@ PlanNodePtr SpatialJoinNode::create(const folly::dynamic& obj, void* context) { outputType); } +PlanNodePtr SpatialJoinNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 2); + + Builder builder(*this); + builder.left(std::move(newSources[0])).right(std::move(newSources[1])); + return builder.build(); +} + TopNNode::TopNNode( const PlanNodeId& id, const std::vector& sortingKeys, @@ -3324,6 +3638,14 @@ PlanNodePtr TopNNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr TopNNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + void LimitNode::addDetails(std::stringstream& stream) const { if (isPartial_) { stream << "PARTIAL "; @@ -3360,6 +3682,14 @@ PlanNodePtr LimitNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr LimitNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + void OrderByNode::addDetails(std::stringstream& stream) const { if (isPartial_) { stream << "PARTIAL "; @@ -3395,6 +3725,14 @@ PlanNodePtr OrderByNode::create(const folly::dynamic& obj, void* context) { std::move(source)); } +PlanNodePtr OrderByNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder(*this); + builder.source(newSources[0]); + return builder.build(); +} + void MarkDistinctNode::addDetails(std::stringstream& stream) const { addFields(stream, distinctKeys_); } @@ -3428,6 +3766,8 @@ void PlanNode::toString( source->toString(stream, detailed, true, indentationSize + 2, addContext); } } + +// stream << serialize(); } namespace { @@ -3623,6 +3963,34 @@ folly::dynamic PlanNode::serialize() const { return obj; } +PlanNodePtr PlanNode::copyWithNewSources( + std::vector newSources) const { + VELOX_UNSUPPORTED("copyWithNewSources is not implemented for {}", name()); +} +// +// void PlanNode::updateNewTypes( +// const std::map>& +// newOutputTypes) { +// RowTypePtr originalOutputType = outputType(); +// std::map> updatedOutputTypes; +// for (auto i = 0; i < originalOutputType->size(); i++) { +// auto originalName = originalOutputType->nameOf(i); +// auto newOutTypeIter = newOutputTypes.find(originalName); +// if (newOutTypeIter != newOutputTypes.end()) { +// // If the field was updated by upstream PlanNodes before, it must be a +// // FieldAccessTypedExpr that directly references the same field name in +// // this current ProjectionNode. Otherwise it would have a different +// symbol +// // name. In this case we only need to update the output type of this +// // node, because FieldAccessTypedExpr doesn't specify the input type. +// auto newOutputType = newOutTypeIter->second.second; +// auto newName = newOutTypeIter->second.first; +// updateOutputNameAndType(i, newName, newOutputType); +// } +// } +// originalOutputType->ensureNameToIndex(); +//} + const std::vector& TraceScanNode::sources() const { return kEmptySources; } @@ -3700,6 +4068,14 @@ PlanNodePtr FilterNode::create(const folly::dynamic& obj, void* context) { deserializePlanNodeId(obj), filter, std::move(source)); } +PlanNodePtr FilterNode::copyWithNewSources( + std::vector newSources) const { + VELOX_CHECK_EQ(newSources.size(), 1); + Builder builder; + builder.id(id()).filter(filter_).source(newSources[0]); + return builder.build(); +} + folly::dynamic IndexLookupCondition::serialize() const { folly::dynamic obj = folly::dynamic::object; obj["key"] = key->serialize(); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 3dafa65233ed..7d5c7ef4a53b 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -152,6 +152,9 @@ struct PlanSummaryOptions { AggregateOptions aggregate = {}; }; +class PlanNode; +using PlanNodePtr = std::shared_ptr; + class PlanNode : public ISerializable { public: explicit PlanNode(PlanNodeId id) : id_{std::move(id)} {} @@ -168,9 +171,24 @@ class PlanNode : public ISerializable { virtual const RowTypePtr& outputType() const = 0; +// void updateOutputNameAndType(int i, std::string& newName, TypePtr newType) +// const { +// auto rowType = std::const_pointer_cast(outputType()); +// rowType->updateChildAt(i, newName, newType); +// } +// +// // TODO: We are still marking const for the join key upcast elimination +// // optimization, but the PlanNode need to provide mutable interface if we +// // allow similar optimizations happen in other places. +// virtual void updateNewTypes( +// const std::map>& inputTypes); + virtual const std::vector>& sources() const = 0; + virtual PlanNodePtr copyWithNewSources( + std::vector> newSources) const; + /// Accepts a visitor to visit this plan node. /// Implementations of this class should implement it as /// visitor.visit(*this, context); @@ -321,7 +339,7 @@ class PlanNode : public ISerializable { const PlanNodeId id_; }; -using PlanNodePtr = std::shared_ptr; +// using PlanNodePtr = std::shared_ptr; class ValuesNode : public PlanNode { public: @@ -695,6 +713,9 @@ class FilterNode : public PlanNode { return sources_; } + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + void accept(const PlanNodeVisitor& visitor, PlanNodeVisitorContext& context) const override; @@ -796,6 +817,13 @@ class AbstractProjectNode : public PlanNode { return outputType_; } +// void updateNewTypes( +// const std::map>& newTypes) +// override; + +// PlanNodePtr copyWithNewSources( +// std::vector newSources) const override; + const std::vector& sources() const override { return sources_; } @@ -807,6 +835,14 @@ class AbstractProjectNode : public PlanNode { const std::vector& projections() const { return projections_; } +// +// void setProjection(int i, TypedExprPtr expr) { +// projections_[i] = expr; +// } +// +// void setName(int i, const std::string& projectionName) { +// names_[i] = projectionName; +// } // This function is virtual to allow customized projections to inherit from // this class without re-implementing the other functions. @@ -886,6 +922,9 @@ class ProjectNode : public AbstractProjectNode { const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); + + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; }; using ProjectNodePtr = std::shared_ptr; @@ -911,6 +950,47 @@ class ParallelProjectNode : public core::AbstractProjectNode { std::vector noLoadIdentities, core::PlanNodePtr input); + class Builder + : public AbstractProjectNode::Builder { + public: + Builder() : AbstractProjectNode::Builder() {} + + explicit Builder(const ParallelProjectNode& other) + : AbstractProjectNode::Builder(other) {} + + Builder& exprNames(std::vector exprNames) { + exprNames_ = std::move(exprNames); + return static_cast(*this); + } + + Builder& exprGroups( + std::vector> exprGroups) { + exprGroups_ = std::move(exprGroups); + return static_cast(*this); + } + + Builder& noLoadIdentities(std::vector noLoadIdentities) { + noLoadIdentities_ = std::move(noLoadIdentities); + return static_cast(*this); + } + + std::shared_ptr build() const { + VELOX_USER_CHECK(id_.has_value(), "ProjectNode id is not set"); + VELOX_USER_CHECK(names_.has_value(), "ProjectNode names is not set"); + VELOX_USER_CHECK( + projections_.has_value(), "ProjectNode projections is not set"); + VELOX_USER_CHECK(source_.has_value(), "ProjectNode source is not set"); + + return std::make_shared( + id_.value(), names_.value(), projections_.value(), source_.value()); + } + + private: + std::vector exprNames_; + std::vector> exprGroups_; + std::vector noLoadIdentities_; + }; + std::string_view name() const override { return "ParallelProject"; } @@ -934,6 +1014,9 @@ class ParallelProjectNode : public core::AbstractProjectNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -963,6 +1046,9 @@ class LazyDereferenceNode : public core::ProjectNode { } static PlanNodePtr create(const folly::dynamic& obj, void* context); + + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; }; using ParallelProjectNodePtr = std::shared_ptr; @@ -1066,6 +1152,9 @@ class TableScanNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); +// PlanNodePtr copyWithNewSources( +// std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -1348,6 +1437,12 @@ class AggregationNode : public PlanNode { folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); +// void updateNewTypes( +// const std::map>& newTypes) +// override; + + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; private: static const std::vector kDefaultGlobalGroupingSets; @@ -1643,6 +1738,9 @@ class TableWriteNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -1762,6 +1860,9 @@ class TableWriteMergeNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -1867,6 +1968,9 @@ class ExpandNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2029,6 +2133,9 @@ class GroupIdNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2126,6 +2233,9 @@ class ExchangeNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); +// PlanNodePtr copyWithNewSources( +// std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2228,6 +2338,9 @@ class MergeExchangeNode : public ExchangeNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2335,6 +2448,9 @@ class LocalMergeNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2557,6 +2673,9 @@ class LocalPartitionNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -2785,6 +2904,9 @@ class PartitionedOutputNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -3067,6 +3189,15 @@ class AbstractJoinNode : public PlanNode { return rightKeys_; } +// void updateNewTypes( +// const std::map>& +// newOutputTypes) override; + + // void updateJoinKeys(std::map updatedOutputTypes) { + // leftKeys_ = leftKeys; + // rightKeys_ = rightKeys; + // } + const TypedExprPtr& filter() const { return filter_; } @@ -3202,10 +3333,28 @@ class HashJoinNode : public AbstractJoinNode { return nullAware_; } + bool updateKeys(std::map updatedOutputTypes) { + bool updated = false; + for (auto& key : leftKeys_) { + auto it = updatedOutputTypes.find(key->name()); + if (it != updatedOutputTypes.end()) { + updated = true; + printf( + "Update left join key %s to type %s\n", + key->name().c_str(), + it->second->toString().c_str()); + } + } + return updated; + } + folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -3282,6 +3431,9 @@ class MergeJoinNode : public AbstractJoinNode { static bool isSupported(JoinType joinType); static PlanNodePtr create(const folly::dynamic& obj, void* context); + + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; }; using MergeJoinNodePtr = std::shared_ptr; @@ -3554,6 +3706,9 @@ class IndexLookupJoinNode : public AbstractJoinNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + /// Returns true if the lookup join supports this join type, otherwise false. static bool isSupported(JoinType joinType); @@ -3706,6 +3861,9 @@ class NestedLoopJoinNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: static const JoinType kDefaultJoinType; static const TypedExprPtr kDefaultJoinCondition; @@ -3852,6 +4010,9 @@ class OrderByNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4050,6 +4211,9 @@ class SpatialJoinNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: constexpr static JoinType kDefaultJoinType = JoinType::kInner; @@ -4183,6 +4347,9 @@ class TopNNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4314,6 +4481,9 @@ class LimitNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4491,6 +4661,9 @@ class UnnestNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4568,6 +4741,9 @@ class EnforceSingleRowNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4677,6 +4853,9 @@ class AssignUniqueIdNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -4913,6 +5092,9 @@ class WindowNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -5055,6 +5237,9 @@ class RowNumberNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -5167,6 +5352,9 @@ class MarkDistinctNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; @@ -5370,6 +5558,9 @@ class TopNRowNumberNode : public PlanNode { static PlanNodePtr create(const folly::dynamic& obj, void* context); + PlanNodePtr copyWithNewSources( + std::vector newSources) const override; + private: void addDetails(std::stringstream& stream) const override; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 0307ffdbf7b1..e7b4cb9a7798 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -716,6 +716,9 @@ class QueryConfig { static constexpr const char* kRowSizeTrackingEnabled = "row_size_tracking_enabled"; + static constexpr const char* kPushdownIntegerUpcastsToScan = + "pushdown_integer_upcasts_to_scan"; + bool selectiveNimbleReaderEnabled() const { return get(kSelectiveNimbleReaderEnabled, false); } @@ -1291,6 +1294,10 @@ class QueryConfig { return get(kClientTags, ""); } + bool pushdownIntegerUpcastsToScan() const { + return get(kPushdownIntegerUpcastsToScan, false); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index b78ca6ec8b96..91160d64ba71 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -13,7 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "velox/exec/LocalPlanner.h" +#include + #include "velox/core/PlanFragment.h" #include "velox/exec/ArrowStream.h" #include "velox/exec/AssignUniqueId.h" @@ -28,6 +29,7 @@ #include "velox/exec/HashProbe.h" #include "velox/exec/IndexLookupJoin.h" #include "velox/exec/Limit.h" +#include "velox/exec/LocalPlanner.h" #include "velox/exec/MarkDistinct.h" #include "velox/exec/Merge.h" #include "velox/exec/MergeJoin.h" @@ -74,10 +76,10 @@ bool eagerFlush(const core::PlanNode& node) { namespace detail { +using PlanNodePtr = std::shared_ptr; + /// Returns true if source nodes must run in a separate pipeline. -bool mustStartNewPipeline( - const std::shared_ptr& planNode, - int sourceId) { +bool mustStartNewPipeline(const PlanNodePtr& planNode, int sourceId) { if (auto localMerge = std::dynamic_pointer_cast(planNode)) { // LocalMerge's source runs on its own pipeline. @@ -118,8 +120,7 @@ OperatorSupplier makeOperatorSupplier(ConsumerSupplier consumerSupplier) { return nullptr; } -OperatorSupplier makeOperatorSupplier( - const std::shared_ptr& planNode) { +OperatorSupplier makeOperatorSupplier(const PlanNodePtr& planNode) { if (auto localMerge = std::dynamic_pointer_cast(planNode)) { return [localMerge](int32_t operatorId, DriverCtx* ctx) { @@ -210,6 +211,171 @@ OperatorSupplier makeOperatorSupplier( return Operator::operatorSupplierFromPlanNode(planNode); } +// void removeOutputFromHashKeys( +// const RowTypePtr& outputRowType, +// std::set& hashKeys) { +// for (const auto& outputFieldName : outputRowType->names()) { +// hashKeys.erase(outputFieldName); +// } +// +// if (type->isPrimitiveType()) { +// hashKeys.erase(std::string(name)); +// return; +// } +// +// if (type->isMap()) { +// const auto& mapType = type->as(); +// removeOutputFromHashKeys(mapType.keyType(), hashKeys); +// removeOutputFromHashKeys(mapType.valueType(), hashKeys); +// } else if (type->isArray()) { +// const auto& arrayType = type->as(); +// removeOutputFromHashKeys(arrayType.elementType(), hashKeys); +// } else if (type->isRow()) { +// const auto& rowType = type->as(); +// for (const auto& childType : rowType.children()) { +// removeOutputFromHashKeys(childType, hashKeys); +// } +// } else { +// VELOX_UNREACHABLE( +// "removeOutputFromHashKeys: Unsupported complex type: {}", +// type->toString()); +// } +// } + +bool isIntegral(const TypePtr& type) { + return type->isBigint() || type->isInteger() || type->isSmallint() || + type->isTinyint(); +} + +bool isWideningIntegralCast(const core::TypedExprPtr& expr) { + auto castExpr = std::dynamic_pointer_cast(expr); + if (!castExpr) { + return false; + } + + auto outputType = castExpr->type(); + auto inputType = castExpr->inputs()[0]->type(); + if (!isIntegral(outputType) || !isIntegral(inputType)) { + return false; + } + if (outputType->cppSizeInBytes() > inputType->cppSizeInBytes()) { + return true; + } + return false; +} +// +// std::map> plan( +// const PlanNodePtr& planNode, +// std::vector* currentPlanNodes, +// const PlanNodePtr& consumerNode, +// OperatorSupplier operatorSupplier, +// std::vector>* driverFactories, +// std::set& hashOnlyKeys) { +// if (!currentPlanNodes) { +// auto driverFactory = std::make_unique(); +// currentPlanNodes = &driverFactory->planNodes; +// driverFactory->operatorSupplier = std::move(operatorSupplier); +// driverFactory->consumerNode = consumerNode; +// driverFactories->push_back(std::move(driverFactory)); +// } +// +// std::map> updatedOutputTypes; +// if (auto joinNode = +// std::dynamic_pointer_cast(planNode)) { +// // Collect all hash keys from both sides of the join that are not output. +// std::set currentHashKeys; +// auto& leftKeys = joinNode->leftKeys(); +// for (int i = 0; i < leftKeys.size(); ++i) { +// currentHashKeys.insert(leftKeys[i]->name()); +// } +// auto& rightKeys = joinNode->rightKeys(); +// for (int i = 0; i < rightKeys.size(); ++i) { +// currentHashKeys.insert(rightKeys[i]->name()); +// } +// +// // Traverse the output type and remove the hash keys that are also output +// removeOutputFromHashKeys(planNode->outputType(), currentHashKeys); +// +// // Merge with the hash keys from the upper level. +// for (const auto& key : currentHashKeys) { +// hashOnlyKeys.insert(key); +// } +// } else if ( +// auto projectNode = std::const_pointer_cast( +// dynamic_pointer_cast(planNode))) { +// // Remove cast expressions on hash keys if they are widening integral +// casts. for (auto i = 0; i < projectNode->names().size(); i++) { +// auto name = projectNode->names()[i]; +// if (hashOnlyKeys.find(name) != hashOnlyKeys.end()) { +// auto projection = projectNode->projections()[i]; +// if (isWideningIntegralCast(projection)) { +// hashOnlyKeys.erase(name); +// VELOX_CHECK_EQ(projection->inputs().size(), 1); +// auto newProjection = projection->inputs()[0]; +// +// // This implementation only handles field access expressions. +// if (newProjection->isFieldAccessKind()) { +// auto fieldAccessExpr = +// std::dynamic_pointer_cast( +// newProjection); +// auto newName = fieldAccessExpr->name(); +// projectNode->setProjection(i, newProjection); +// updatedOutputTypes[name] = +// std::make_pair(newName, newProjection->type()); +// } +// } +// } +// } +// if (!updatedOutputTypes.empty()) { +// projectNode->updateNewTypes(updatedOutputTypes); +// } +// } else if ( +// auto tableScanNode = std::const_pointer_cast( +// dynamic_pointer_cast(planNode))) { +// // Remove cast expressions on hash keys if they are widening integral +// casts. for (auto& column : tableScanNode->assignments()) { +// printf("Column: %s", column.first.c_str()); +// } +// } +// +// const auto& sources = planNode->sources(); +// if (sources.empty()) { +// driverFactories->back()->inputDriver = true; +// } else { +// const auto numSourcesToPlan = +// isIndexLookupJoin(planNode.get()) ? 1 : sources.size(); +// for (int32_t i = 0; i < numSourcesToPlan; ++i) { +// auto updatedTypesBySource = plan( +// sources[i], +// mustStartNewPipeline(planNode, i) ? nullptr : currentPlanNodes, +// planNode, +// makeOperatorSupplier(planNode), +// driverFactories, +// hashOnlyKeys); +// +// // Backtrack and propagate the updated types to the current plan node. +// if (!updatedTypesBySource.empty()) { +// // auto updatedOutputTypesBySource = +// auto mutablePlanNode = +// std::const_pointer_cast(planNode); +// mutablePlanNode->updateNewTypes(updatedTypesBySource); +// updatedOutputTypes.insert( +// updatedTypesBySource.begin(), updatedTypesBySource.end()); +// // if (auto joinNode = +// // std::const_pointer_cast( +// // std::dynamic_pointer_cast(planNode))) { +// // joinNode->updateJoinKeys(updatedTypesBySource); +// // } +// } +// } +// } +// +// currentPlanNodes->push_back(planNode); +// return updatedOutputTypes; +//} +// + void plan( const std::shared_ptr& planNode, std::vector>* currentPlanNodes, @@ -243,9 +409,339 @@ void plan( currentPlanNodes->push_back(planNode); } +void updateHashKeysForHashJoinNode( + const std::shared_ptr& joinNode, + int8_t childIndex, + std::set& hashOnlyKeys) { + std::vector* keys; + + if (childIndex == 0) { + auto leftKeys = joinNode->leftKeys(); + for (int i = 0; i < leftKeys.size(); ++i) { + hashOnlyKeys.insert(leftKeys[i]->name()); + } + } else if (childIndex == 1) { + auto rightKeys = joinNode->rightKeys(); + for (int i = 0; i < rightKeys.size(); ++i) { + hashOnlyKeys.insert(rightKeys[i]->name()); + } + } else { + VELOX_UNREACHABLE("Invalid child index: {}", childIndex); + } + + // Traverse the output type and remove the hash keys that are also output + for (const auto& outputFieldName : joinNode->outputType()->names()) { + hashOnlyKeys.erase(outputFieldName); + } + // removeOutputFromHashKeys(joinNode->outputType(), hashOnlyKeys); +} + +PlanNodePtr plan2( + PlanNodePtr planNode, + std::vector* currentPlanNodes, + const PlanNodePtr& consumerNode, + OperatorSupplier operatorSupplier, + std::vector>* driverFactories, + std::set& hashOnlyKeys) { + LOG(INFO) << "Planning node: " << planNode->toString() << " from consumer: " + << (consumerNode.get() ? consumerNode->toString() : "null"); + // 0) Start a new pipeline if needed. + if (!currentPlanNodes) { + // Leaf node, start a new driver factory. + auto driverFactory = std::make_unique(); + currentPlanNodes = &driverFactory->planNodes; + driverFactory->operatorSupplier = std::move(operatorSupplier); + driverFactory->consumerNode = consumerNode; + driverFactories->push_back(std::move(driverFactory)); + } + + auto& sources = planNode->sources(); + if (sources.empty()) { + // Leaf: fall through and add this node below. + driverFactories->back()->inputDriver = true; + } else { + bool needNewPlanNode = false; + std::set projectionsNeedUpdate; + std::vector newSources; + + // 1) Plan children + const auto numSourcesToPlan = + isIndexLookupJoin(planNode.get()) ? 1 : sources.size(); + for (int32_t i = 0; i < numSourcesToPlan; ++i) { + // 1.1 Prepare for planning, update the hash keys. + // For each source, we may need to update the hash keys based on the + // current node type and whether it's a widening cast. + if (auto joinNode = + std::dynamic_pointer_cast(planNode)) { + // 1.1.1 For hash join node, we need to update the hash keys for each + // side of the join. + updateHashKeysForHashJoinNode(joinNode, i, hashOnlyKeys); + for (const auto& key : hashOnlyKeys) { + LOG(INFO) << "Hash Join key: " << key; + } + } else if ( + auto projectNode = + dynamic_pointer_cast(planNode)) { + // 1.1.2 For ProjectNode, remove cast expressions on hash keys if they + // are widening integral casts. + for (auto i = 0; i < projectNode->names().size(); i++) { + const auto& name = projectNode->names()[i]; + if (hashOnlyKeys.find(name) != hashOnlyKeys.end()) { + if (isWideningIntegralCast(projectNode->projections()[i])) { + hashOnlyKeys.erase(name); + projectionsNeedUpdate.insert(i); + needNewPlanNode = true; + LOG(INFO) << "Remove widening integral cast on hash key: " + << name; + } + } + } + } + + // 1.2 Recurse on the i-th source + auto source = plan2( + sources[i], + mustStartNewPipeline(planNode, i) ? nullptr : currentPlanNodes, + planNode, + makeOperatorSupplier(planNode), + driverFactories, + hashOnlyKeys); + + // Backtrack and propagate the updated types to the current plan node. + if (source != sources[i]) { + needNewPlanNode = true; + } + newSources.push_back(source); // TODO: Do not push into vector if not + } + + // 2. Create a new plan node with updated sources and/or projections. + if (needNewPlanNode) { + VELOX_CHECK_GT(newSources.size(), 0); + VELOX_CHECK_EQ(newSources.size(), sources.size()); + if (const auto& projectNode = + std::dynamic_pointer_cast(planNode)) { + // 2.1 Special handling for ProjectNode to remove widening integral + // casts. Create new projections with the updated expressions. + auto newProjections = projectNode->projections(); // copy + if (!projectionsNeedUpdate.empty()) { + for (auto index : projectionsNeedUpdate) { + const auto& projection = projectNode->projections()[index]; + VELOX_CHECK( + isWideningIntegralCast(projection), + "Expect widening integral cast, got {}", + projection->toString()); + VELOX_CHECK_EQ(projection->inputs().size(), 1); + // Replace CAST(int->bigint) by its input expr (identity on base + // type). + newProjections[index] = projection->inputs()[0]; + } + } + + // Update the projections from newSources[0] if needed. + for (auto i = 0; i < newProjections.size(); i++) { + auto newProjection = newProjections[i]; + if (newProjection->isFieldAccessKind()) { + auto fieldAccessExpr = + std::dynamic_pointer_cast( + newProjection); + auto fieldName = fieldAccessExpr->name(); + + // It's guranteed that the field name exists in the new source type. + auto newType = newSources[0]->outputType()->findChild(fieldName); + + // Create a new FieldAccessTypedExpr with the updated type from + // newSources[0]. + newProjections[i] = std::make_shared( + newType, fieldName); + } + } + + core::ProjectNode::Builder builder; + planNode = builder.id(planNode->id()) + .source(newSources[0]) + .projections(newProjections) + .names(projectNode->names()) + .build(); + LOG(INFO) << "Created new ProjectNode: " << planNode->toString(); + } else { + // 2.2 Generic path: same node with new children. + planNode = planNode->copyWithNewSources(newSources); + LOG(INFO) << "Created new generic PlanNode: " << planNode->toString(); + } + } + } + + // 3. Add the current plan node to the current pipeline. + currentPlanNodes->push_back(planNode); + return planNode; +} + +PlanNodePtr plan3( + PlanNodePtr planNode, + std::vector>* currentPlanNodes, + const std::shared_ptr& consumerNode, + OperatorSupplier operatorSupplier, + std::vector>* driverFactories, + std::map& desiredOutputTypes) { + if (!currentPlanNodes) { + auto driverFactory = std::make_unique(); + currentPlanNodes = &driverFactory->planNodes; + driverFactory->operatorSupplier = std::move(operatorSupplier); + driverFactory->consumerNode = consumerNode; + driverFactories->push_back(std::move(driverFactory)); + } + + auto& sources = planNode->sources(); + if (sources.empty()) { + // Leaf: new driver, and update TableScan output type if needed. + driverFactories->back()->inputDriver = true; + + if ( + // TableScan Node: update the output type to avoid upcasting in + // the upcoming FilterProject + const auto& tableScanNode = + std::dynamic_pointer_cast(planNode)) { + const auto& outputType = tableScanNode->outputType(); + std::vector names; + std::vector types; + names.reserve(outputType->size()); + types.reserve(outputType->size()); + for (int i = 0; i < outputType->size(); i++) { + std::string name = outputType->nameOf(i); + + auto iter = desiredOutputTypes.find(name); + if (iter != desiredOutputTypes.end()) { + types.push_back(iter->second); + LOG(INFO) << "Update TableScanNode output type for column: " + << names.back() << " from " << outputType->childAt(i) + << " to " << types.back(); + } else { + types.push_back(outputType->childAt(i)); + } + names.push_back(std::move(name)); + } + auto newOutputType = + std::make_shared(std::move(names), std::move(types)); + + core::TableScanNode::Builder builder(*tableScanNode); + planNode = builder.outputType(std::move(newOutputType)).build(); + LOG(INFO) << "Created new TableScanNode: " << planNode->toString(); + } + } else { + // Non-leaf node. + bool needNewPlanNode = false; + std::set projectionsNeedUpdate; + std::vector newSources; + + // For ProjectNode that is the immediate consumer of TableScan, remove + // cast expressions if they are widening integer casts. We want the + // TableScan to return the upcasted type by type coercion. + if (const auto& projectNode = + dynamic_pointer_cast(planNode)) { + VELOX_CHECK_EQ(projectNode->sources().size(), 1); + if (auto tableScanNode = + std::dynamic_pointer_cast( + projectNode->sources()[0])) { + // Only handle the case where ProjectNode is the immediate consumer of + // TableScan. + for (auto i = 0; i < projectNode->names().size(); i++) { + auto& projection = projectNode->projections()[i]; + // TODO: Do not copy if no change. + auto newProjection = projection; + + if (isWideningIntegralCast(projection)) { + // Find the new name from the input of the cast expression, which + // must be a FieldAccessTypedExpr. + VELOX_CHECK_EQ(projection->inputs().size(), 1); + const auto& inputExpr = projection->inputs()[0]; + VELOX_CHECK(inputExpr->isFieldAccessKind()); + auto childName = + std::dynamic_pointer_cast( + inputExpr) + ->name(); + + // Update the desired output type for its source to avoid upcasting. + desiredOutputTypes[childName] = + projectNode->outputType()->childAt(i); + projectionsNeedUpdate.insert(i); + needNewPlanNode = true; + LOG(INFO) << "Remove widening integral cast on key: " + << projectNode->names()[i]; + } + } + } + } + + const auto numSourcesToPlan = + isIndexLookupJoin(planNode.get()) ? 1 : sources.size(); + for (int32_t i = 0; i < numSourcesToPlan; ++i) { + auto source = plan3( + sources[i], + mustStartNewPipeline(planNode, i) ? nullptr : currentPlanNodes, + planNode, + makeOperatorSupplier(planNode), + driverFactories, + desiredOutputTypes); + + // Backtrack and propagate the updated types to the current plan node. + if (source != sources[i]) { + needNewPlanNode = true; + } + newSources.push_back(source); + } + + if (needNewPlanNode) { + if (const auto& projectNode = + std::dynamic_pointer_cast(planNode)) { + // Special handling for ProjectNode to remove widening integral + // casts. Create new projections with the updated expressions. + + // TODO: Do not copy if no change. + auto newProjections = projectNode->projections(); // copy + if (!projectionsNeedUpdate.empty()) { + for (auto index : projectionsNeedUpdate) { + const auto& projection = projectNode->projections()[index]; + VELOX_CHECK( + isWideningIntegralCast(projection), + "Expect widening integral cast, got {}", + projection->toString()); + VELOX_CHECK_EQ(projection->inputs().size(), 1); + + // Replace CAST(int->bigint) by its input expr (identity on base + // type). + const auto& inputExpr = projection->inputs()[0]; + VELOX_CHECK(inputExpr->isFieldAccessKind()); + auto fieldAccessExpr = + std::dynamic_pointer_cast( + inputExpr); + auto fieldName = fieldAccessExpr->name(); + TypePtr type = projectNode->outputType()->childAt(index); + newProjections[index] = + std::make_shared(type, fieldName); + } + } + + core::ProjectNode::Builder builder; + planNode = builder.id(planNode->id()) + .source(newSources[0]) + .projections(newProjections) + .names(projectNode->names()) + .build(); + LOG(INFO) << "Created new ProjectNode: " << planNode->toString(); + } else { + // 2.2 Generic path: same node with new children. + planNode = planNode->copyWithNewSources(newSources); + LOG(INFO) << "Created new generic PlanNode: " << planNode->toString(); + } + } + } + // 3. Add the current (updated) plan node to the current pipeline. + currentPlanNodes->push_back(planNode); + return planNode; +} + // Sometimes consumer limits the number of drivers its producer can run. -uint32_t maxDriversForConsumer( - const std::shared_ptr& node) { +uint32_t maxDriversForConsumer(const PlanNodePtr& node) { if (std::dynamic_pointer_cast(node)) { // MergeJoinNode must run single-threaded. return 1; @@ -361,12 +857,24 @@ void LocalPlanner::plan( adapter.inspect(planFragment); } } - detail::plan( - planFragment.planNode, - nullptr, - nullptr, - detail::makeOperatorSupplier(std::move(consumerSupplier)), - driverFactories); + + if (queryConfig.pushdownIntegerUpcastsToScan()) { + std::map desiredOutputTypes; + detail::plan3( + planFragment.planNode, + nullptr, + nullptr, + detail::makeOperatorSupplier(std::move(consumerSupplier)), + driverFactories, + desiredOutputTypes); + } else { + detail::plan( + planFragment.planNode, + nullptr, + nullptr, + detail::makeOperatorSupplier(std::move(consumerSupplier)), + driverFactories); + } (*driverFactories)[0]->outputDriver = true; @@ -501,8 +1009,9 @@ std::shared_ptr DriverFactory::createDriver( auto next = planNodes[i + 1]; if (auto projectNode = std::dynamic_pointer_cast(next)) { - operators.push_back(std::make_unique( - id, ctx.get(), filterNode, projectNode)); + operators.push_back( + std::make_unique( + id, ctx.get(), filterNode, projectNode)); i++; continue; } @@ -543,8 +1052,9 @@ std::shared_ptr DriverFactory::createDriver( auto tableWriteMergeNode = std::dynamic_pointer_cast( planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), tableWriteMergeNode)); + operators.push_back( + std::make_unique( + id, ctx.get(), tableWriteMergeNode)); } else if ( auto mergeExchangeNode = std::dynamic_pointer_cast( @@ -554,16 +1064,19 @@ std::shared_ptr DriverFactory::createDriver( } else if ( auto exchangeNode = std::dynamic_pointer_cast(planNode)) { - // NOTE: the exchange client can only be used by one operator in a driver. + // NOTE: the exchange client can only be used by one operator in a + // driver. VELOX_CHECK_NOT_NULL(exchangeClient); - operators.push_back(std::make_unique( - id, ctx.get(), exchangeNode, std::move(exchangeClient))); + operators.push_back( + std::make_unique( + id, ctx.get(), exchangeNode, std::move(exchangeClient))); } else if ( auto partitionedOutputNode = std::dynamic_pointer_cast( planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), partitionedOutputNode, eagerFlush(*planNode))); + operators.push_back( + std::make_unique( + id, ctx.get(), partitionedOutputNode, eagerFlush(*planNode))); } else if ( auto joinNode = std::dynamic_pointer_cast(planNode)) { @@ -589,8 +1102,9 @@ std::shared_ptr DriverFactory::createDriver( auto aggregationNode = std::dynamic_pointer_cast(planNode)) { if (aggregationNode->isPreGrouped()) { - operators.push_back(std::make_unique( - id, ctx.get(), aggregationNode)); + operators.push_back( + std::make_unique( + id, ctx.get(), aggregationNode)); } else { operators.push_back( std::make_unique(id, ctx.get(), aggregationNode)); @@ -653,12 +1167,13 @@ std::shared_ptr DriverFactory::createDriver( auto localPartitionNode = std::dynamic_pointer_cast( planNode)) { - operators.push_back(std::make_unique( - id, - ctx.get(), - localPartitionNode->outputType(), - localPartitionNode->id(), - ctx->partitionId)); + operators.push_back( + std::make_unique( + id, + ctx.get(), + localPartitionNode->outputType(), + localPartitionNode->id(), + ctx->partitionId)); } else if ( auto unnest = std::dynamic_pointer_cast(planNode)) { @@ -673,17 +1188,19 @@ std::shared_ptr DriverFactory::createDriver( auto assignUniqueIdNode = std::dynamic_pointer_cast( planNode)) { - operators.push_back(std::make_unique( - id, - ctx.get(), - assignUniqueIdNode, - assignUniqueIdNode->taskUniqueId(), - assignUniqueIdNode->uniqueIdCounter())); + operators.push_back( + std::make_unique( + id, + ctx.get(), + assignUniqueIdNode, + assignUniqueIdNode->taskUniqueId(), + assignUniqueIdNode->uniqueIdCounter())); } else if ( const auto traceScanNode = std::dynamic_pointer_cast(planNode)) { - operators.push_back(std::make_unique( - id, ctx.get(), traceScanNode)); + operators.push_back( + std::make_unique( + id, ctx.get(), traceScanNode)); } else { std::unique_ptr extended; if (planNode->requiresExchangeClient()) { diff --git a/velox/functions/CMakeLists.txt b/velox/functions/CMakeLists.txt index 3401db234e0a..1e86ac2b0ba3 100644 --- a/velox/functions/CMakeLists.txt +++ b/velox/functions/CMakeLists.txt @@ -21,6 +21,8 @@ if(${VELOX_ENABLE_PRESTO_FUNCTIONS}) add_subdirectory(prestosql) endif() +set(VELOX_ENABLE_SPARK_FUNCTIONS OFF) + if(${VELOX_ENABLE_SPARK_FUNCTIONS}) add_subdirectory(sparksql) endif() diff --git a/velox/type/Type.cpp b/velox/type/Type.cpp index 69a4a3210389..8ce3d8c529c4 100644 --- a/velox/type/Type.cpp +++ b/velox/type/Type.cpp @@ -510,8 +510,16 @@ bool RowType::containsChild(std::string_view name) const { } uint32_t RowType::getChildIdx(std::string_view name) const { +// LOG(INFO) << "\ngetChildIdx: \n" << std::string(name).c_str(); + auto index = getChildIdxIfExists(name); if (!index.has_value()) { + const auto& nameToIndex = this->nameToIndex(); + LOG(INFO) << name << " not found. nameToIndex: \n"; + for (auto entry : nameToIndex) { + LOG(INFO) << entry.data << " -> " << entry.index; + } + VELOX_USER_FAIL(makeFieldNotFoundErrorMessage(name, names_)); } return index.value(); @@ -519,7 +527,11 @@ uint32_t RowType::getChildIdx(std::string_view name) const { std::optional RowType::getChildIdxIfExists( std::string_view name) const { - const auto& nameToIndex = this->nameToIndex(); + const auto& nameToIndex = this->nameToIndex(); +// for (auto entry : nameToIndex) { +// LOG(INFO) << entry.data << " -> " << entry.index; +// } + auto it = nameToIndex.find(NameIndex{name, 0}); if (it != nameToIndex.end()) { return it->index; diff --git a/velox/type/Type.h b/velox/type/Type.h index dcbaf3600709..94574e66292a 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -1142,6 +1142,12 @@ class RowType : public TypeBase { return children_[idx]; } +// void updateChildAt(uint32_t idx, std::string& newName, TypePtr newType) { +// VELOX_CHECK_LT(idx, children_.size()); +// names_[idx] = newName; +// children_[idx] = newType; +// } + const std::vector& children() const { return children_; } diff --git a/velox/vector/ComplexVector.h b/velox/vector/ComplexVector.h index 7c773c46e030..ddf9b8fd1981 100644 --- a/velox/vector/ComplexVector.h +++ b/velox/vector/ComplexVector.h @@ -59,6 +59,14 @@ class RowVector : public BaseVector { for (size_t i = 0; i < children_.size(); i++) { const auto& child = children_[i]; if (child) { + if (!child->type()->kindEquals(type->childAt(i))) { + LOG(ERROR) << "Child type: " << child->type()->toString() + << ", expected type: " << type->childAt(i)->toString() + << ", for field: " << rowType->nameOf(i) + << ", at position: " << i + << ", in RowVector type: " << type_->toString() + << ", child vector: " << child->toString(); + } VELOX_DCHECK( child->type()->kindEquals(type->childAt(i)), "Got type {} for field `{}` at position {}, but expected {}.",