-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmeasurement.manual.go
More file actions
155 lines (125 loc) · 4.02 KB
/
measurement.manual.go
File metadata and controls
155 lines (125 loc) · 4.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package db
import (
"context"
"math"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
)
// TimeseriesMeasurementCollectionGetForRangeParams holds the inputs for
// Queries.TimeseriesMeasurementCollectionGetForRange.
type TimeseriesMeasurementCollectionGetForRangeParams struct {
// TimeseriesID is bound to $1 and matched against the timeseries_id column.
TimeseriesID uuid.UUID
// EncodedTimeWindow is bound to $2 and cast to tstzrange in the query; only
// rows whose time is contained in that range are selected.
EncodedTimeWindow string
// Threshold is passed to LTTB as the downsampling target; 0 leaves the
// queried rows as they are.
Threshold int
// Limit, when non-zero, is bound to $3 as the SQL row limit. The limit is
// applied by the query, i.e. before any downsampling.
Limit int32
// CursorTime is not read by TimeseriesMeasurementCollectionGetForRange.
// NOTE(review): confirm whether it is used elsewhere or can be removed.
CursorTime time.Time
// SortDesc selects "order by time desc" when true, "order by time asc" otherwise.
SortDesc bool
}
// TimeseriesMeasurementCollectionGetForRange loads the measurements of one
// timeseries whose time lies inside arg.EncodedTimeWindow, ordered by time
// (descending when arg.SortDesc is set, ascending otherwise), and runs the
// result through LTTB with arg.Threshold so it can be downsampled in Go.
//
// A non-zero arg.Limit is applied in SQL, before downsampling. On any query or
// scan error the zero MeasurementCollection is returned with that error.
func (q *Queries) TimeseriesMeasurementCollectionGetForRange(ctx context.Context, arg TimeseriesMeasurementCollectionGetForRangeParams) (MeasurementCollection, error) {
	query := `
select * from v_timeseries_measurement
where timeseries_id=$1
and time <@ $2::tstzrange
`
	// Ascending is the default direction; SortDesc flips it.
	orderBy := " order by time asc"
	if arg.SortDesc {
		orderBy = " order by time desc"
	}
	query += orderBy

	// $1 and $2 are always bound; $3 only exists when a limit was requested.
	args := []any{arg.TimeseriesID, arg.EncodedTimeWindow}
	if arg.Limit != 0 {
		query += " limit $3"
		args = append(args, arg.Limit)
	}

	rows, err := q.db.Query(ctx, query, args...)
	if err != nil {
		return MeasurementCollection{}, err
	}
	measurements, err := pgx.CollectRows[Measurement](rows, pgx.RowToStructByNameLax)
	if err != nil {
		return MeasurementCollection{}, err
	}

	return MeasurementCollection{
		TimeseriesID: arg.TimeseriesID,
		Items:        LTTB(measurements, arg.Threshold),
	}, nil
}
// MeasurementGetter is the type constraint used by LTTB: any value that can
// report the timestamp and the numeric value of a single sample.
type MeasurementGetter interface {
// getTime returns the sample's timestamp (the X coordinate for LTTB).
getTime() time.Time
// getValue returns the sample's value as a float64 (the Y coordinate for LTTB).
getValue() float64
}
// getTime returns the row's Time field; together with getValue it makes
// VTimeseriesMeasurement satisfy MeasurementGetter.
func (m VTimeseriesMeasurement) getTime() time.Time {
return m.Time
}
// getValue returns the row's Value field converted to float64.
func (m VTimeseriesMeasurement) getValue() float64 {
return float64(m.Value)
}
// getTime returns the measurement's Time field; together with getValue it
// makes Measurement satisfy MeasurementGetter.
func (m Measurement) getTime() time.Time {
return m.Time
}
// getValue returns the measurement's Value field converted to float64.
func (m Measurement) getValue() float64 {
return float64(m.Value)
}
// getTime returns the measurement's Time field; together with getValue it
// makes ProcessMeasurement satisfy MeasurementGetter.
func (m ProcessMeasurement) getTime() time.Time {
return m.Time
}
// getValue returns the measurement's Value field converted to float64.
func (m ProcessMeasurement) getValue() float64 {
return float64(m.Value)
}
// LTTB downsamples timeseries measurements with a slightly modified
// Largest-Triangle-Three-Buckets algorithm.
// https://godoc.org/github.com/dgryski/go-lttb
//
// data is assumed to be ordered by time (either direction). threshold is the
// desired number of output points:
//   - threshold == 0 disables downsampling and returns data unchanged;
//   - any other threshold below 3 (including negative values) is raised to 3,
//     because the algorithm always keeps the first point, the last point and
//     at least one bucket in between;
//   - if data has no more points than the (raised) threshold, data is
//     returned unchanged.
//
// Otherwise the result holds exactly threshold elements: the first point, one
// point chosen from each of threshold-2 buckets, and the last point. The
// returned elements keep the order they have in data.
func LTTB[T MeasurementGetter](data []T, threshold int) []T {
	if threshold == 0 {
		return data // Downsampling disabled
	}
	threshold = max(threshold, 3)
	if threshold >= len(data) {
		// Nothing to do. This is checked after raising the threshold so that
		// short or empty inputs are never indexed out of range and never come
		// back padded with duplicated points.
		return data
	}

	// X coordinate of a point: seconds elapsed since the first point. Every
	// term of the area formula has to use the same unit, and offsets from the
	// first sample keep more float64 precision than absolute Unix timestamps.
	origin := data[0].getTime()
	xAt := func(i int) float64 {
		return data[i].getTime().Sub(origin).Seconds()
	}

	sampled := make([]T, 0, threshold)

	// Bucket size. Leave room for start and end data points
	every := float64(len(data)-2) / float64(threshold-2)

	sampled = append(sampled, data[0]) // Always add the first point

	bucketStart := 1
	bucketCenter := int(math.Floor(every)) + 1

	var a int // Index of the most recently selected point

	for i := range threshold - 2 {
		bucketEnd := int(math.Floor(float64(i+2)*every)) + 1

		// Calculate point average for next bucket (containing c). The last
		// "next bucket" would run past the data, so clamp it.
		avgEnd := min(bucketEnd, len(data))
		avgLen := float64(avgEnd - bucketCenter)

		var avgX, avgY float64
		for j := bucketCenter; j < avgEnd; j++ {
			avgX += xAt(j)
			avgY += data[j].getValue()
		}
		avgX /= avgLen
		avgY /= avgLen

		// Point a
		pointAX := xAt(a)
		pointAY := data[a].getValue()

		maxArea := -1.0
		// Default to the first point of this bucket so that a bucket whose
		// areas are all NaN still contributes one of its own points.
		nextA := bucketStart

		for j := bucketStart; j < bucketCenter; j++ {
			// Calculate triangle area over three buckets
			area := (pointAX-avgX)*(data[j].getValue()-pointAY) - (pointAX-xAt(j))*(avgY-pointAY)
			// We only care about the relative area here.
			// Calling math.Abs() is slower than squaring
			area *= area
			if area > maxArea {
				maxArea = area
				nextA = j // Next a is this b
			}
		}

		sampled = append(sampled, data[nextA]) // Pick this point from the bucket
		a = nextA                              // This a is the next a (chosen b)

		bucketStart = bucketCenter
		bucketCenter = bucketEnd
	}

	sampled = append(sampled, data[len(data)-1]) // Always add last

	return sampled
}