Skip to content

Commit d12b7f8

Browse files
committed
add logical plan distributed optimizer to query frontend
Signed-off-by: rubywtl <[email protected]>
1 parent 52b9672 commit d12b7f8

File tree

9 files changed

+620
-4
lines changed

9 files changed

+620
-4
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import (
1616
"github.com/prometheus/prometheus/util/annotations"
1717
"github.com/prometheus/prometheus/util/httputil"
1818
v1 "github.com/prometheus/prometheus/web/api/v1"
19-
"github.com/thanos-io/promql-engine/logicalplan"
2019
"github.com/weaveworks/common/httpgrpc"
2120

21+
"github.com/cortexproject/cortex/pkg/distributed_execution"
2222
"github.com/cortexproject/cortex/pkg/engine"
2323
"github.com/cortexproject/cortex/pkg/querier"
2424
"github.com/cortexproject/cortex/pkg/util"
@@ -110,7 +110,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
110110

111111
byteLP := []byte(r.PostFormValue("plan"))
112112
if len(byteLP) != 0 {
113-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
113+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
114114
if err != nil {
115115
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
116116
}
@@ -183,7 +183,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
183183

184184
byteLP := []byte(r.PostFormValue("plan"))
185185
if len(byteLP) != 0 {
186-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
186+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
187187
if err != nil {
188188
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
189189
}

pkg/distributed_execution/codec.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package distributed_execution
2+
3+
import (
	"bytes"
	"encoding/json"
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/thanos-io/promql-engine/logicalplan"
)
11+
12+
type jsonNode struct {
13+
Type logicalplan.NodeType `json:"type"`
14+
Data json.RawMessage `json:"data"`
15+
Children []json.RawMessage `json:"children,omitempty"`
16+
}
17+
18+
// Raw JSON payloads that unmarshalNode recognises as non-finite number
// literals; they are compared byte-for-byte against a node's data field.
const (
	infVal    = `"+Inf"`
	negInfVal = `"-Inf"`
	nanVal    = `"NaN"`
)
23+
24+
func Unmarshal(data []byte) (logicalplan.Node, error) {
25+
return unmarshalNode(data)
26+
}
27+
28+
func unmarshalNode(data []byte) (logicalplan.Node, error) {
29+
t := jsonNode{}
30+
if err := json.Unmarshal(data, &t); err != nil {
31+
return nil, err
32+
}
33+
34+
switch t.Type {
35+
case logicalplan.VectorSelectorNode:
36+
v := &logicalplan.VectorSelector{}
37+
if err := json.Unmarshal(t.Data, v); err != nil {
38+
return nil, err
39+
}
40+
var err error
41+
for i, m := range v.LabelMatchers {
42+
v.LabelMatchers[i], err = labels.NewMatcher(m.Type, m.Name, m.Value)
43+
if err != nil {
44+
return nil, err
45+
}
46+
}
47+
return v, nil
48+
case logicalplan.MatrixSelectorNode:
49+
m := &logicalplan.MatrixSelector{}
50+
if err := json.Unmarshal(t.Data, m); err != nil {
51+
return nil, err
52+
}
53+
vs, err := unmarshalNode(t.Children[0])
54+
if err != nil {
55+
return nil, err
56+
}
57+
m.VectorSelector = vs.(*logicalplan.VectorSelector)
58+
return m, nil
59+
case logicalplan.AggregationNode:
60+
a := &logicalplan.Aggregation{}
61+
if err := json.Unmarshal(t.Data, a); err != nil {
62+
return nil, err
63+
}
64+
var err error
65+
a.Expr, err = unmarshalNode(t.Children[0])
66+
if err != nil {
67+
return nil, err
68+
}
69+
if len(t.Children) > 1 {
70+
a.Param, err = unmarshalNode(t.Children[1])
71+
if err != nil {
72+
return nil, err
73+
}
74+
}
75+
return a, nil
76+
case logicalplan.BinaryNode:
77+
b := &logicalplan.Binary{}
78+
if err := json.Unmarshal(t.Data, b); err != nil {
79+
return nil, err
80+
}
81+
var err error
82+
b.LHS, err = unmarshalNode(t.Children[0])
83+
if err != nil {
84+
return nil, err
85+
}
86+
b.RHS, err = unmarshalNode(t.Children[1])
87+
if err != nil {
88+
return nil, err
89+
}
90+
return b, nil
91+
case logicalplan.FunctionNode:
92+
f := &logicalplan.FunctionCall{}
93+
if err := json.Unmarshal(t.Data, f); err != nil {
94+
return nil, err
95+
}
96+
for _, c := range t.Children {
97+
child, err := unmarshalNode(c)
98+
if err != nil {
99+
return nil, err
100+
}
101+
f.Args = append(f.Args, child)
102+
}
103+
return f, nil
104+
case logicalplan.NumberLiteralNode:
105+
n := &logicalplan.NumberLiteral{}
106+
if bytes.Equal(t.Data, []byte(infVal)) {
107+
n.Val = math.Inf(1)
108+
} else if bytes.Equal(t.Data, []byte(negInfVal)) {
109+
n.Val = math.Inf(-1)
110+
} else if bytes.Equal(t.Data, []byte(nanVal)) {
111+
n.Val = math.NaN()
112+
} else {
113+
if err := json.Unmarshal(t.Data, n); err != nil {
114+
return nil, err
115+
}
116+
}
117+
return n, nil
118+
case logicalplan.StringLiteralNode:
119+
s := &logicalplan.StringLiteral{}
120+
if err := json.Unmarshal(t.Data, s); err != nil {
121+
return nil, err
122+
}
123+
return s, nil
124+
case logicalplan.SubqueryNode:
125+
s := &logicalplan.Subquery{}
126+
if err := json.Unmarshal(t.Data, s); err != nil {
127+
return nil, err
128+
}
129+
var err error
130+
s.Expr, err = unmarshalNode(t.Children[0])
131+
if err != nil {
132+
return nil, err
133+
}
134+
return s, nil
135+
case logicalplan.CheckDuplicateNode:
136+
c := &logicalplan.CheckDuplicateLabels{}
137+
if err := json.Unmarshal(t.Data, c); err != nil {
138+
return nil, err
139+
}
140+
var err error
141+
c.Expr, err = unmarshalNode(t.Children[0])
142+
if err != nil {
143+
return nil, err
144+
}
145+
return c, nil
146+
case logicalplan.StepInvariantNode:
147+
s := &logicalplan.StepInvariantExpr{}
148+
if err := json.Unmarshal(t.Data, s); err != nil {
149+
return nil, err
150+
}
151+
var err error
152+
s.Expr, err = unmarshalNode(t.Children[0])
153+
if err != nil {
154+
return nil, err
155+
}
156+
return s, nil
157+
case logicalplan.ParensNode:
158+
p := &logicalplan.Parens{}
159+
if err := json.Unmarshal(t.Data, p); err != nil {
160+
return nil, err
161+
}
162+
var err error
163+
p.Expr, err = unmarshalNode(t.Children[0])
164+
if err != nil {
165+
return nil, err
166+
}
167+
return p, nil
168+
case logicalplan.UnaryNode:
169+
u := &logicalplan.Unary{}
170+
if err := json.Unmarshal(t.Data, u); err != nil {
171+
return nil, err
172+
}
173+
var err error
174+
u.Expr, err = unmarshalNode(t.Children[0])
175+
if err != nil {
176+
return nil, err
177+
}
178+
return u, nil
179+
case RemoteNode:
180+
r := &Remote{}
181+
if err := json.Unmarshal(t.Data, r); err != nil {
182+
return nil, err
183+
}
184+
var err error
185+
r.Expr, err = unmarshalNode(t.Children[0])
186+
if err != nil {
187+
return nil, err
188+
}
189+
return r, nil
190+
}
191+
return nil, nil
192+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package distributed_execution
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
func TestUnmarshalWithLogicalPlan(t *testing.T) {
12+
t.Run("unmarshal complex query plan", func(t *testing.T) {
13+
start := time.Now()
14+
end := start.Add(1 * time.Hour)
15+
step := 15 * time.Second
16+
17+
testCases := []struct {
18+
name string
19+
query string
20+
}{
21+
{
22+
name: "binary operation",
23+
query: "http_requests_total + rate(node_cpu_seconds_total[5m])",
24+
},
25+
{
26+
name: "aggregation",
27+
query: "sum(rate(http_requests_total[5m])) by (job)",
28+
},
29+
{
30+
name: "complex query",
31+
query: "sum(rate(http_requests_total{job='prometheus'}[5m])) by (job) / sum(rate(node_cpu_seconds_total[5m])) by (job)",
32+
},
33+
}
34+
35+
for _, tc := range testCases {
36+
t.Run(tc.name, func(t *testing.T) {
37+
plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
38+
require.NoError(t, err)
39+
require.NotNil(t, plan)
40+
41+
data, err := logicalplan.Marshal((*plan).Root())
42+
require.NoError(t, err)
43+
44+
node, err := Unmarshal(data)
45+
require.NoError(t, err)
46+
require.NotNil(t, node)
47+
48+
// the logical plan node before and after marshal/unmarshal should be the same
49+
verifyNodeStructure(t, (*plan).Root(), node)
50+
})
51+
}
52+
})
53+
}
54+
55+
func verifyNodeStructure(t *testing.T, expected logicalplan.Node, actual logicalplan.Node) {
56+
require.Equal(t, expected.Type(), actual.Type())
57+
require.Equal(t, expected.String(), actual.String())
58+
require.Equal(t, expected.ReturnType(), actual.ReturnType())
59+
60+
expectedChildren := expected.Children()
61+
actualChildren := actual.Children()
62+
63+
require.Equal(t, len(expectedChildren), len(actualChildren))
64+
65+
for i := 0; i < len(expectedChildren); i++ {
66+
if expectedChildren[i] != nil && actualChildren[i] != nil {
67+
verifyNodeStructure(t, *expectedChildren[i], *actualChildren[i])
68+
}
69+
}
70+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package distributed_execution
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/prometheus/prometheus/util/annotations"
7+
"github.com/thanos-io/promql-engine/logicalplan"
8+
)
9+
10+
// DistributedOptimizer is a stateless logical plan optimizer; see Optimize
// for the rewrite it applies.
//
// This is a simplified implementation that only handles binary aggregation
// cases. Future versions of the distributed optimizer are expected to:
//   - support more complex query patterns
//   - incorporate diverse optimization strategies
//   - extend support to node types beyond binary operations
type DistributedOptimizer struct{}
17+
18+
func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node, annotations.Annotations, error) {
19+
warns := annotations.New()
20+
21+
if root == nil {
22+
return nil, *warns, fmt.Errorf("nil root node")
23+
}
24+
25+
var hasAggregation bool
26+
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
27+
28+
if (*current).Type() == logicalplan.AggregationNode {
29+
hasAggregation = true
30+
}
31+
32+
if (*current).Type() == logicalplan.BinaryNode && hasAggregation {
33+
ch := (*current).Children()
34+
35+
for _, child := range ch {
36+
temp := (*child).Clone()
37+
*child = NewRemoteNode()
38+
*(*child).Children()[0] = temp
39+
}
40+
41+
hasAggregation = false
42+
}
43+
44+
return false
45+
})
46+
return root, *warns, nil
47+
}

0 commit comments

Comments
 (0)