[presto][rift] Add riftTier field to RPCNode for coordinator-to-worker tier injection (#27725)#27725
Open
abhinavmuk04 wants to merge 1 commit into
Open
[presto][rift] Add riftTier field to RPCNode for coordinator-to-worker tier injection (#27725)#27725abhinavmuk04 wants to merge 1 commit into
abhinavmuk04 wants to merge 1 commit into
Conversation
Contributor
Reviewer's GuideAdds a riftTier string field to RPCNode and threads it end-to-end from the Presto Java planner through the C++ protocol and Velox plan nodes to async RPC functions, allowing workers (notably FbLlmInference) to receive an explicit RIFT SMC tier chosen by the coordinator. Sequence diagram for riftTier injection from coordinator to FbLlmInferencesequenceDiagram
actor Coordinator
participant JavaPlanner
participant CppProtocol as PrestoCppProtocol
participant VeloxPlan
participant RPCOp as RPCOperator
participant Func as FbLlmInference
participant RIFT as RIFTService
Coordinator->>JavaPlanner: Build plan with RPCNode(riftTier)
JavaPlanner->>CppProtocol: Serialize RPCNode with riftTier
CppProtocol->>VeloxPlan: Convert to Velox RPCNode(riftTier)
VeloxPlan->>RPCOp: Create RPCOperator with plan node riftTier
RPCOp->>Func: setRiftTier(planNode.riftTier)
RPCOp->>Func: initialize()
Func->>Func: selectTier(optionsJson, planNode.riftTier, sessionProperty)
Func->>RIFT: Invoke RPC on selected tier
Class diagram for RPCNode riftTier propagation across layersclassDiagram
class Java_RPCNode {
+PlanNode source
+String functionName
+List~RowExpression~ arguments
+List~String~ argumentColumns
+VariableReferenceExpression outputVariable
+StreamingMode streamingMode
+int dispatchBatchSize
+String riftTier
+Java_RPCNode(sourceLocation, id, source, functionName, arguments, argumentColumns, outputVariable, streamingMode, dispatchBatchSize, riftTier)
+Java_RPCNode(sourceLocation, id, statsEquivalentPlanNode, source, functionName, arguments, argumentColumns, outputVariable, streamingMode, dispatchBatchSize, riftTier)
+String getRiftTier()
+PlanNode replaceChildren(newChildren)
+PlanNode assignStatsEquivalentPlanNode(statsEquivalentPlanNode)
}
class Cpp_RPCNode {
+PlanNode source
+String functionName
+vector~RowExpression~ arguments
+vector~String~ argumentColumns
+VariableReferenceExpression outputVariable
+RPCNodeStreamingMode streamingMode
+Integer dispatchBatchSize
+String riftTier
+RPCNode()
}
class Velox_RPCNode {
+core::PlanNodePtr source
+std::string functionName
+std::vector~RowExpressionPtr~ arguments
+std::vector~std::string~ argumentColumns
+core::TypedExprPtr outputVariable
+StreamingMode streamingMode
+int32_t dispatchBatchSize
+std::string riftTier
+Velox_RPCNode(id, source, functionName, arguments, argumentColumns, outputVariable, streamingMode, dispatchBatchSize, riftTier)
+const std::string& riftTier()
+serialize()
+deserialize()
}
class VeloxQueryPlanConverterBase {
+core::PlanNodePtr toVeloxQueryPlan(protocol::RPCNode node, TableWriteInfo tableWriteInfo, TaskId taskId)
}
class AsyncRPCFunction {
<<interface>>
+initialize()
+virtual void setRiftTier(const std::string& riftTier)
}
class FbLlmInference {
+std::string optionsTier
+std::string planNodeTier
+std::string sessionTier
+std::string effectiveTier
+void setRiftTier(const std::string& riftTier)
+void initialize()
+void selectTier()
}
class RPCOperator {
+std::unique_ptr~AsyncRPCFunction~ function_
+void addInput()
+void initialize()
+void getOutput()
+void prepareFunction(const std::string& riftTier)
}
Java_RPCNode <.. Cpp_RPCNode : json
Cpp_RPCNode <.. Velox_RPCNode : converted_by
VeloxQueryPlanConverterBase --> Velox_RPCNode : creates
VeloxQueryPlanConverterBase --> Cpp_RPCNode : reads riftTier
RPCOperator --> AsyncRPCFunction : uses
AsyncRPCFunction <|.. FbLlmInference
RPCOperator --> Velox_RPCNode : reads riftTier
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
Contributor
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- The Java
RPCNodeconstructor silently normalizesnullriftTiervalues to an empty string while the C++RPCNodejust exposes aString riftTierfield; consider aligning the default/empty semantics across layers (e.g., consistently treating missing vs empty tiers) to avoid subtle mismatches in behavior. - In
RpcFunctionOptimizerthe newriftTierargument is hardcoded as an empty string; if this is meant to represent an unset tier, consider using a named constant or helper to make the intent clear and to avoid scattering magic values if tier injection logic changes later.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The Java `RPCNode` constructor silently normalizes `null` `riftTier` values to an empty string while the C++ `RPCNode` just exposes a `String riftTier` field; consider aligning the default/empty semantics across layers (e.g., consistently treating missing vs empty tiers) to avoid subtle mismatches in behavior.
- In `RpcFunctionOptimizer` the new `riftTier` argument is hardcoded as an empty string; if this is meant to represent an unset tier, consider using a named constant or helper to make the intent clear and to avoid scattering magic values if tier injection logic changes later.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Contributor
|
meta-codesync Bot
pushed a commit
that referenced
this pull request
May 12, 2026
…r tier injection (#27725) Summary: Adds a riftTier field to the RPCNode plan node across all layers (Java → C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT SMC tier to workers through the query plan. This is the follow-up to D101101436 that completes the Mode 2 (auto-start) data path. The RIFT CLI generates random tier names (e.g., smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the tier through the plan node rather than deterministic naming. Changes across the full plan node chain: - Java RPCNode: add riftTier field, both constructors, getter, pass through in replaceChildren/assignStatsEquivalentPlanNode - Java RpcFunctionOptimizer: pass empty riftTier (coordinator injection TBD) - Java UnaliasSymbolReferences, PruneUnreferencedOutputs: pass through - C++ presto_protocol_core: add riftTier to RPCNode struct + serialization - C++ PrestoToVeloxQueryPlan: pass node->riftTier to Velox RPCNode - Velox PlanNode.h/cpp: add riftTier field, getter, serialize/deserialize - AsyncRPCFunction.h: add virtual setRiftTier() method (default no-op) - RPCOperator.cpp: call function_->setRiftTier() before initialize() - FbLlmInference: implement setRiftTier(), use plan node tier with highest priority (options JSON > plan node riftTier > session property) Differential Revision: D101105948
01796a9 to
5bec072
Compare
meta-codesync Bot
pushed a commit
that referenced
this pull request
May 12, 2026
…r tier injection (#27725) Summary: Adds a riftTier field to the RPCNode plan node across all layers (Java → C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT SMC tier to workers through the query plan. This is the follow-up to D101101436 that completes the Mode 2 (auto-start) data path. The RIFT CLI generates random tier names (e.g., smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the tier through the plan node rather than deterministic naming. Changes across the full plan node chain: - Java RPCNode: add riftTier field, both constructors, getter, pass through in replaceChildren/assignStatsEquivalentPlanNode - Java RpcFunctionOptimizer: pass empty riftTier (coordinator injection TBD) - Java UnaliasSymbolReferences, PruneUnreferencedOutputs: pass through - C++ presto_protocol_core: add riftTier to RPCNode struct + serialization - C++ PrestoToVeloxQueryPlan: pass node->riftTier to Velox RPCNode - Velox PlanNode.h/cpp: add riftTier field, getter, serialize/deserialize - AsyncRPCFunction.h: add virtual setRiftTier() method (default no-op) - RPCOperator.cpp: call function_->setRiftTier() before initialize() - FbLlmInference: implement setRiftTier(), use plan node tier with highest priority (options JSON > plan node riftTier > session property) Differential Revision: D101105948
5bec072 to
12f29d5
Compare
meta-codesync Bot
pushed a commit
that referenced
this pull request
May 12, 2026
…r tier injection (#27725) Summary: Adds a riftTier field to the RPCNode plan node across all layers (Java → C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT SMC tier to workers through the query plan. This is the follow-up to D101101436 that completes the Mode 2 (auto-start) data path. The RIFT CLI generates random tier names (e.g., smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the tier through the plan node rather than deterministic naming. Changes across the full plan node chain: - Java RPCNode: add riftTier field, both constructors, getter, pass through in replaceChildren/assignStatsEquivalentPlanNode - Java RpcFunctionOptimizer: pass empty riftTier (coordinator injection TBD) - Java UnaliasSymbolReferences, PruneUnreferencedOutputs: pass through - C++ presto_protocol_core: add riftTier to RPCNode struct + serialization - C++ PrestoToVeloxQueryPlan: pass node->riftTier to Velox RPCNode - Velox PlanNode.h/cpp: add riftTier field, getter, serialize/deserialize - AsyncRPCFunction.h: add virtual setRiftTier() method (default no-op) - RPCOperator.cpp: call function_->setRiftTier() before initialize() - FbLlmInference: implement setRiftTier(), use plan node tier with highest priority (options JSON > plan node riftTier > session property) Differential Revision: D101105948
12f29d5 to
ec3469c
Compare
meta-codesync Bot
pushed a commit
that referenced
this pull request
May 12, 2026
…r tier injection (#27725) Summary: Adds a riftTier field to the RPCNode plan node across all layers (Java → C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT SMC tier to workers through the query plan. This is the follow-up to D101101436 that completes the Mode 2 (auto-start) data path. The RIFT CLI generates random tier names (e.g., smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the tier through the plan node rather than deterministic naming. Changes across the full plan node chain: - Java RPCNode: add riftTier field, both constructors, getter, pass through in replaceChildren/assignStatsEquivalentPlanNode - Java RpcFunctionOptimizer: pass empty riftTier (coordinator injection TBD) - Java UnaliasSymbolReferences, PruneUnreferencedOutputs: pass through - C++ presto_protocol_core: add riftTier to RPCNode struct + serialization - C++ PrestoToVeloxQueryPlan: pass node->riftTier to Velox RPCNode - Velox PlanNode.h/cpp: add riftTier field, getter, serialize/deserialize - AsyncRPCFunction.h: add virtual setRiftTier() method (default no-op) - RPCOperator.cpp: call function_->setRiftTier() before initialize() - FbLlmInference: implement setRiftTier(), use plan node tier with highest priority (options JSON > plan node riftTier > session property) Differential Revision: D101105948
ec3469c to
9e72e7d
Compare
…r tier injection (#27725) Summary: Adds a riftTier field to the RPCNode plan node across all layers (Java → C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT SMC tier to workers through the query plan. This is the follow-up to D101101436 that completes the Mode 2 (auto-start) data path. The RIFT CLI generates random tier names (e.g., smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the tier through the plan node rather than deterministic naming. Changes across the full plan node chain: - Java RPCNode: add riftTier field, both constructors, getter, pass through in replaceChildren/assignStatsEquivalentPlanNode - Java RpcFunctionOptimizer: pass empty riftTier (coordinator injection TBD) - Java UnaliasSymbolReferences, PruneUnreferencedOutputs: pass through - C++ presto_protocol_core: add riftTier to RPCNode struct + serialization - C++ PrestoToVeloxQueryPlan: pass node->riftTier to Velox RPCNode - Velox PlanNode.h/cpp: add riftTier field, getter, serialize/deserialize - AsyncRPCFunction.h: add virtual setRiftTier() method (default no-op) - RPCOperator.cpp: call function_->setRiftTier() before initialize() - FbLlmInference: implement setRiftTier(), use plan node tier with highest priority (options JSON > plan node riftTier > session property) Differential Revision: D101105948
9e72e7d to
80f08f9
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary:
Adds a riftTier field to the RPCNode plan node across all layers (Java →
C++ protocol → Velox) so the coordinator can pass the auto-provisioned RIFT
SMC tier to workers through the query plan.
This is the follow-up to D101101436 that completes the Mode 2 (auto-start)
data path. The RIFT CLI generates random tier names (e.g.,
smc.rift_amukher1_llama3_a3f1b2m), so the coordinator must communicate the
tier through the plan node rather than deterministic naming.
Changes across the full plan node chain:
in replaceChildren/assignStatsEquivalentPlanNode
priority (options JSON > plan node riftTier > session property)
Differential Revision: D101105948