region.go
package main
import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"github.com/HunDunDM/key-visual/matrix"
"sync"
"time"
)
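// defaultRegionPath is the directory where the region store persists its data
// (opened via NewLeveldbStorage in init below).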
const defaultRegionPath = "../storage/region"
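// regionInfo describes a single region as returned by regionRequest; the keys
// appear to be hex-encoded strings (ScanRegions hex-decodes the last EndKey to
// page through the key space).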
type regionInfo struct {
ID uint64 `json:"id"`
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
WrittenBytes uint64 `json:"written_bytes,omitempty"`
ReadBytes uint64 `json:"read_bytes,omitempty"`
WrittenKeys uint64 `json:"written_keys,omitempty"`
ReadKeys uint64 `json:"read_keys,omitempty"`
}
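// ScanRegions pages through all regions via regionRequest, 1024 at a time,
// starting from the empty key and following each batch's last EndKey until an
// empty batch or an empty EndKey (the end of the key space) is reached.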
func ScanRegions() []*regionInfo {
var key []byte
var err error
regions := make([]*regionInfo, 0, 1024)
for {
info := regionRequest(key, 1024)
length := len(info.Regions)
if length == 0 {
break
}
regions = append(regions, info.Regions...)
lastEndKey := info.Regions[length-1].EndKey
if lastEndKey == "" {
break
}
key, err = hex.DecodeString(lastEndKey)
perr(err)
}
return regions
}
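// regionData holds the four per-region read/write statistics.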
type regionData struct {
WrittenBytes uint64 `json:"written_bytes"`
ReadBytes uint64 `json:"read_bytes"`
WrittenKeys uint64 `json:"written_keys"`
ReadKeys uint64 `json:"read_keys"`
}
// regionUnit is a storage unit of region information; it needs to implement the matrix.Value interface
type regionUnit struct {
// track the maximum and the average simultaneously
Max regionData `json:"max"`
Average regionData `json:"average"`
}
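// newRegionUnit wraps a single regionInfo's statistics; Max and Average start out identical.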
func newRegionUnit(r *regionInfo) *regionUnit {
rValue := regionData{
WrittenBytes: r.WrittenBytes,
ReadBytes: r.ReadBytes,
WrittenKeys: r.WrittenKeys,
ReadKeys: r.ReadKeys,
}
return &regionUnit{
Max: rValue,
Average: rValue,
}
}
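// Merge folds another regionUnit into r: Max keeps the element-wise maximum,
// while Average accumulates the sums.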
func (r *regionUnit) Merge(other *regionUnit) {
r.Max.WrittenBytes = Max(r.Max.WrittenBytes, other.Max.WrittenBytes)
r.Max.WrittenKeys = Max(r.Max.WrittenKeys, other.Max.WrittenKeys)
r.Max.ReadBytes = Max(r.Max.ReadBytes, other.Max.ReadBytes)
r.Max.ReadKeys = Max(r.Max.ReadKeys, other.Max.ReadKeys)
r.Average.WrittenBytes += other.Average.WrittenBytes
r.Average.WrittenKeys += other.Average.WrittenKeys
r.Average.ReadBytes += other.Average.ReadBytes
r.Average.ReadKeys += other.Average.ReadKeys
}
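// Useless reports whether the unit's peak written and read bytes are both below the threshold.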
func (r regionUnit) Useless(threshold uint64) bool {
return Max(r.Max.ReadBytes, r.Max.WrittenBytes) < threshold
}
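// BuildMultiValue repackages the unit's Max and Average statistics as a MultiUnit.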
func (r regionUnit) BuildMultiValue() *MultiUnit {
max := MultiValue{
r.Max.WrittenBytes,
r.Max.ReadBytes,
r.Max.WrittenKeys,
r.Max.ReadKeys,
}
average := MultiValue{
r.Average.WrittenBytes,
r.Average.ReadBytes,
r.Average.WrittenKeys,
r.Average.ReadKeys,
}
return &MultiUnit{
max,
average,
}
}
// here we define another Line structure, different from matrix.Line,
// because that one uses an interface and cannot be encoded to a JSON string
type Line struct {
EndKey string `json:"end_key"`
RegionUnit *regionUnit `json:"region_unit"`
}
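// DiscreteAxis is a key axis built from consecutive Lines; like Line, it is JSON-encodable.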
type DiscreteAxis struct {
StartKey string `json:"start_key"` // the first line's StartKey
Lines []*Line `json:"lines"`
EndTime time.Time `json:"end_time"` // the last line's EndTime
}
// DeNoise merges consecutive lines whose values are less than the threshold,
// which is like eliminating noise points in a map
func (axis *DiscreteAxis) DeNoise(threshold uint64) {
newAxis := make([]*Line, 0)
// a consecutive run of lines whose values are all less than the threshold can be merged
// a consecutive run of lines that carry the same value can also be merged
isLastLess := false // whether the previous line's value is less than the threshold
var lastIndex int64 = -1 // index of the previous line in axis.Lines
for _, line := range axis.Lines {
if line.RegionUnit.Useless(threshold) {
if isLastLess { // the previous line's value is also less than the threshold, so merge into it
newAxis[len(newAxis)-1].RegionUnit.Merge(line.RegionUnit)
newAxis[len(newAxis)-1].EndKey = line.EndKey
} else {
isLastLess = true
newAxis = append(newAxis, line)
}
} else { // this line's value is at or above the threshold
isLastLess = false
if lastIndex == -1 || line.RegionUnit != axis.Lines[lastIndex].RegionUnit {
newAxis = append(newAxis, line)
} else { // this value is the same as the previous one, so merge
newAxis[len(newAxis)-1].RegionUnit.Merge(line.RegionUnit)
newAxis[len(newAxis)-1].EndKey = line.EndKey
}
}
lastIndex++
}
axis.Lines = newAxis
}
// Append converts the regionInfo slice into a key axis and saves it into the RegionStore
func (r *RegionStore) Append(regions []*regionInfo) {
if len(regions) == 0 {
return
}
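// an empty EndKey on the last region means it extends to the end of the key space;
// "~" is used as the sentinel end key (see also init)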
if regions[len(regions)-1].EndKey == "" {
regions[len(regions)-1].EndKey = "~"
}
// find the first regionInfo that is not nil
firstIndex := 0
for firstIndex < len(regions) {
if regions[firstIndex] != nil {
break
} else {
firstIndex++
}
}
if firstIndex == len(regions) {
return
}
// generate the DiscreteAxis first
axis := &DiscreteAxis{
StartKey: regions[firstIndex].StartKey,
EndTime: time.Now(),
}
// generate lines
for _, info := range regions {
if info == nil {
continue
}
line := &Line{
EndKey: info.EndKey,
RegionUnit: newRegionUnit(info),
}
axis.Lines = append(axis.Lines, line)
}
// compress lines whose values are all zero
axis.DeNoise(1)
value, err := json.Marshal(axis)
perr(err)
nowTime := make([]byte, 8)
binary.BigEndian.PutUint64(nowTime, uint64(time.Now().Unix()))
r.Lock()
defer r.Unlock()
err = r.Save(nowTime, value)
perr(err)
}
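// Range loads every axis stored between startTime and endTime and assembles
// them into a matrix.DiscretePlane, converting each stored regionUnit to a
// matrix.Value through separateValue.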
func (r *RegionStore) Range(startTime time.Time, endTime time.Time, separateValue func(r *regionUnit) matrix.Value) *matrix.DiscretePlane {
// restrict the query to the requested range on the time axis
start := startTime.Unix()
end := endTime.Unix()
var startBuf = make([]byte, 8)
var endBuf = make([]byte, 8)
binary.BigEndian.PutUint64(startBuf, uint64(start))
binary.BigEndian.PutUint64(endBuf, uint64(end))
r.RLock()
_, rangeValues, _ := r.LoadRange(startBuf, endBuf)
r.RUnlock()
if len(rangeValues) == 0 {
return nil
}
var rangeTimePlane matrix.DiscretePlane
for _, value := range rangeValues {
axis := DiscreteAxis{}
err := json.Unmarshal([]byte(value), &axis)
perr(err)
lines := make([]*matrix.Line, len(axis.Lines))
for i, v := range axis.Lines {
lines[i] = &matrix.Line{
EndKey: v.EndKey,
Value: separateValue(v.RegionUnit),
}
}
newAxis := matrix.DiscreteAxis{
StartKey: axis.StartKey,
Lines: lines,
EndTime: axis.EndTime,
}
rangeTimePlane.Axes = append(rangeTimePlane.Axes, &newAxis)
}
rangeTimePlane.StartTime = rangeTimePlane.Axes[0].EndTime.Add(-*interval)
return &rangeTimePlane
}
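// A minimal usage sketch of Range (an illustration, not part of the original
// docs): it assumes *MultiUnit satisfies matrix.Value, which the separateValue
// callback requires.
//
//	plane := globalRegionStore.Range(
//		time.Now().Add(-time.Hour), time.Now(),
//		func(r *regionUnit) matrix.Value { return r.BuildMultiValue() },
//	)
//	_ = plane // e.g. feed the plane to the matrix package to build a heatmap

// RegionStore persists marshalled key axes into leveldb, keyed by timestamp,
// and guards access with an embedded RWMutex.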
type RegionStore struct {
sync.RWMutex
*LeveldbStorage
}
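// globalRegionStore is the package-level RegionStore, opened in init below.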
var globalRegionStore RegionStore
func init() {
globalRegionStore.LeveldbStorage, _ = NewLeveldbStorage(defaultRegionPath)
regions := []*regionInfo{
{
StartKey: "",
EndKey: "~",
},
}
// insert an empty axis, meaning the data is zero from the last server shutdown until now
globalRegionStore.Append(regions)
}