Skip to content

Commit 50f2cb6

Browse files
frouioui and systay
authored and committed
feat: include query timings in json log from keys
Signed-off-by: Florent Poinsard <[email protected]> Signed-off-by: Andres Taylor <[email protected]>
1 parent c9a7943 commit 50f2cb6

9 files changed

+551
-186
lines changed

go/cmd/keys.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func addInputTypeFlag(cmd *cobra.Command, s *string) {
6161
func configureLoader(inputType string, needsBindVars bool) (data.Loader, error) {
6262
switch inputType {
6363
case "sql":
64-
return data.SQLScriptLoader{}, nil
64+
return data.SlowQueryLogLoader{}, nil
6565
case "mysql-log":
6666
return data.MySQLLogLoader{}, nil
6767
case "vtgate-log":

go/data/query.go

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ type (
4040
Query string
4141
Line int
4242
Type CmdType
43+
44+
// These fields are only set if the log file is a slow query log
45+
QueryTime, LockTime float64
46+
RowsSent, RowsExamined int
47+
Timestamp int64
4348
}
4449

4550
errLoader struct {

go/data/slow_query_log_loader.go

+330
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
/*
2+
Copyright 2024 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package data
18+
19+
import (
20+
"bufio"
21+
"bytes"
22+
"errors"
23+
"fmt"
24+
"io"
25+
"net/http"
26+
"os"
27+
"strconv"
28+
"strings"
29+
)
30+
31+
// SlowQueryLogLoader is a stateless loader whose Load method returns an
// iterator over the queries found in a slow query log.
type SlowQueryLogLoader struct{}
32+
33+
// slowQueryLogReaderState is the iterator returned by SlowQueryLogLoader.Load.
// Its only field is the embedded logReaderState, which supplies the scanner,
// file handle, line counter, mutex and closed/err bookkeeping that Next and
// Load use.
type slowQueryLogReaderState struct {
34+
logReaderState
35+
}
36+
37+
type lineProcessorState struct {
38+
currentQuery Query
39+
newStmt bool
40+
hasQueryMetadata bool
41+
}
42+
43+
func (s *slowQueryLogReaderState) Next() (Query, bool) {
44+
s.mu.Lock()
45+
defer s.mu.Unlock()
46+
47+
if s.closed || s.err != nil {
48+
return Query{}, false
49+
}
50+
51+
state := &lineProcessorState{
52+
newStmt: true,
53+
}
54+
55+
for s.scanner.Scan() {
56+
s.lineNumber++
57+
line := strings.TrimSpace(s.scanner.Text())
58+
59+
result, done, err := s.processLine(line, state)
60+
if err != nil {
61+
s.err = err
62+
return Query{}, false
63+
}
64+
if done {
65+
return result, true
66+
}
67+
}
68+
69+
if err := s.scanner.Err(); err != nil {
70+
s.err = err
71+
return Query{}, false
72+
}
73+
74+
if !state.newStmt && state.currentQuery.Query != "" {
75+
s.err = errors.New("EOF: missing semicolon")
76+
}
77+
return Query{}, false
78+
}
79+
80+
func (s *slowQueryLogReaderState) processLine(line string, state *lineProcessorState) (Query, bool, error) {
81+
switch {
82+
case len(line) == 0:
83+
return Query{}, false, nil
84+
case strings.HasPrefix(line, "#"):
85+
hasMetadata, err := s.processCommentLine(line, state)
86+
if err != nil {
87+
return Query{}, false, fmt.Errorf("line %d: %w", s.lineNumber, err)
88+
}
89+
if hasMetadata {
90+
state.hasQueryMetadata = true
91+
}
92+
return Query{}, false, nil
93+
case strings.HasPrefix(line, "SET timestamp=") && state.hasQueryMetadata:
94+
err := s.processSetTimestampLine(line, state)
95+
if err != nil {
96+
return Query{}, false, fmt.Errorf("line %d: %w", s.lineNumber, err)
97+
}
98+
state.hasQueryMetadata = false
99+
return Query{}, false, nil
100+
case strings.HasPrefix(line, "--"):
101+
pq, err := s.processStatementLine(line, state)
102+
if err != nil {
103+
return Query{}, false, fmt.Errorf("line %d: %w", s.lineNumber, err)
104+
}
105+
if pq != nil {
106+
return *pq, true, nil
107+
}
108+
return Query{}, false, nil
109+
case state.newStmt:
110+
s.startNewQuery(line, state)
111+
default:
112+
s.appendToCurrentQuery(line, state)
113+
}
114+
115+
state.newStmt = strings.HasSuffix(line, ";")
116+
if state.newStmt {
117+
pq, err := s.processEndOfStatement(line, state)
118+
if err != nil {
119+
return Query{}, false, fmt.Errorf("line %d: %w", s.lineNumber, err)
120+
}
121+
if pq != nil {
122+
return *pq, true, nil
123+
}
124+
}
125+
126+
return Query{}, false, nil
127+
}
128+
129+
func (s *slowQueryLogReaderState) processCommentLine(line string, state *lineProcessorState) (bool, error) {
130+
if strings.HasPrefix(line, "# Query_time:") {
131+
if err := parseQueryMetrics(line, &state.currentQuery); err != nil {
132+
return false, err
133+
}
134+
return true, nil
135+
}
136+
return false, nil
137+
}
138+
139+
func (s *slowQueryLogReaderState) processSetTimestampLine(line string, state *lineProcessorState) error {
140+
tsStr := strings.TrimPrefix(line, "SET timestamp=")
141+
tsStr = strings.TrimSuffix(tsStr, ";")
142+
ts, err := strconv.ParseInt(tsStr, 10, 64)
143+
if err != nil {
144+
return fmt.Errorf("invalid timestamp '%s': %w", tsStr, err)
145+
}
146+
state.currentQuery.Timestamp = ts
147+
return nil
148+
}
149+
150+
func (s *slowQueryLogReaderState) processStatementLine(line string, state *lineProcessorState) (*Query, error) {
151+
state.newStmt = true
152+
q := Query{Query: line, Line: s.lineNumber}
153+
pq, err := parseQuery(q)
154+
if err != nil {
155+
return nil, err
156+
}
157+
return pq, nil
158+
}
159+
160+
func (s *slowQueryLogReaderState) processEndOfStatement(line string, state *lineProcessorState) (*Query, error) {
161+
if strings.HasPrefix(line, "SET timestamp=") && state.currentQuery.QueryTime > 0 {
162+
return nil, nil
163+
}
164+
pq, err := parseQuery(state.currentQuery)
165+
if err != nil {
166+
return nil, err
167+
}
168+
return pq, nil
169+
}
170+
171+
func (s *slowQueryLogReaderState) startNewQuery(line string, state *lineProcessorState) {
172+
state.currentQuery.Query = line
173+
state.currentQuery.Line = s.lineNumber
174+
}
175+
176+
func (s *slowQueryLogReaderState) appendToCurrentQuery(line string, state *lineProcessorState) {
177+
state.currentQuery.Query = fmt.Sprintf("%s\n%s", state.currentQuery.Query, line)
178+
}
179+
180+
// parseQueryMetrics parses the metrics from the comment line and assigns them to the Query struct.
181+
func parseQueryMetrics(line string, q *Query) error {
182+
line = strings.TrimPrefix(line, "#")
183+
line = strings.TrimSpace(line)
184+
185+
fields := strings.Fields(line)
186+
187+
i := 0
188+
for i < len(fields) {
189+
field := fields[i]
190+
if !strings.HasSuffix(field, ":") {
191+
return fmt.Errorf("unexpected field format '%s'", field)
192+
}
193+
194+
// Remove the trailing colon to get the key
195+
key := strings.TrimSuffix(field, ":")
196+
if i+1 >= len(fields) {
197+
return fmt.Errorf("missing value for key '%s'", key)
198+
}
199+
value := fields[i+1]
200+
201+
// Assign to Query struct based on key
202+
switch key {
203+
case "Query_time":
204+
fval, err := strconv.ParseFloat(value, 64)
205+
if err != nil {
206+
return fmt.Errorf("invalid Query_time value '%s'", value)
207+
}
208+
q.QueryTime = fval
209+
case "Lock_time":
210+
fval, err := strconv.ParseFloat(value, 64)
211+
if err != nil {
212+
return fmt.Errorf("invalid Lock_time value '%s'", value)
213+
}
214+
q.LockTime = fval
215+
case "Rows_sent":
216+
ival, err := strconv.Atoi(value)
217+
if err != nil {
218+
return fmt.Errorf("invalid Rows_sent value '%s'", value)
219+
}
220+
q.RowsSent = ival
221+
case "Rows_examined":
222+
ival, err := strconv.Atoi(value)
223+
if err != nil {
224+
return fmt.Errorf("invalid Rows_examined value '%s'", value)
225+
}
226+
q.RowsExamined = ival
227+
}
228+
i += 2 // Move to the next key-value pair
229+
}
230+
231+
return nil
232+
}
233+
234+
// readData fetches url with an HTTP GET and returns the full response body.
// It returns an error when the request fails or when the server answers with
// anything other than 200 OK.
func readData(url string) ([]byte, error) {
	client := http.Client{}
	res, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	// Register the close before inspecting the status so the body (and its
	// underlying connection) is released on the error path as well.
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to get data from %s, status code %d", url, res.StatusCode)
	}
	return io.ReadAll(res.Body)
}
246+
247+
func (SlowQueryLogLoader) Load(filename string) IteratorLoader {
248+
var scanner *bufio.Scanner
249+
var fd *os.File
250+
251+
if strings.HasPrefix(filename, "http") {
252+
data, err := readData(filename)
253+
if err != nil {
254+
return &errLoader{err: err}
255+
}
256+
scanner = bufio.NewScanner(bytes.NewReader(data))
257+
} else {
258+
var err error
259+
fd, err = os.OpenFile(filename, os.O_RDONLY, 0)
260+
if err != nil {
261+
return &errLoader{err: err}
262+
}
263+
scanner = bufio.NewScanner(fd)
264+
}
265+
266+
return &slowQueryLogReaderState{
267+
logReaderState: logReaderState{
268+
scanner: scanner,
269+
fd: fd,
270+
},
271+
}
272+
}
273+
274+
// Helper function to parse individual queries
275+
func parseQuery(rs Query) (*Query, error) {
276+
realS := rs.Query
277+
s := rs.Query
278+
q := Query{
279+
Line: rs.Line,
280+
Type: Unknown,
281+
QueryTime: rs.QueryTime,
282+
LockTime: rs.LockTime,
283+
RowsSent: rs.RowsSent,
284+
RowsExamined: rs.RowsExamined,
285+
Timestamp: rs.Timestamp,
286+
}
287+
288+
if len(s) < 3 {
289+
return nil, nil
290+
}
291+
292+
switch {
293+
case strings.HasPrefix(s, "#"):
294+
q.Type = Comment
295+
return &q, nil
296+
case strings.HasPrefix(s, "--"):
297+
q.Type = CommentWithCommand
298+
if len(s) > 2 && s[2] == ' ' {
299+
s = s[3:]
300+
} else {
301+
s = s[2:]
302+
}
303+
case s[0] == '\n':
304+
q.Type = EmptyLine
305+
return &q, nil
306+
}
307+
308+
i := findFirstWord(s)
309+
if i > 0 {
310+
q.FirstWord = s[:i]
311+
}
312+
q.Query = s[i:]
313+
314+
if q.Type == Unknown || q.Type == CommentWithCommand {
315+
if err := q.getQueryType(realS); err != nil {
316+
return nil, err
317+
}
318+
}
319+
320+
return &q, nil
321+
}
322+
323+
// findFirstWord returns the byte length of the leading word of s, i.e. the
// index of the first '(', ' ', ';' or newline. When s contains none of these
// delimiters the whole string is the word and len(s) is returned.
func findFirstWord(s string) int {
	for idx := 0; idx < len(s); idx++ {
		switch s[idx] {
		case '(', ' ', ';', '\n':
			return idx
		}
	}
	return len(s)
}

0 commit comments

Comments
 (0)