@@ -2,11 +2,12 @@ package writer
2
2
3
3
import (
4
4
"fmt"
5
- "github.com/Netflix/spectator-go/v2/spectator/logger"
6
5
"runtime"
7
6
"sync"
8
7
"sync/atomic"
9
8
"time"
9
+
10
+ "github.com/Netflix/spectator-go/v2/spectator/logger"
10
11
)
11
12
12
13
// chunkSize is set to 60KB, to ensure each message fits in the socket buffer (64KB), with some room
@@ -26,28 +27,32 @@ const separator = "\n"
26
27
// is less than the impact of the front and back buffer design, but it is still important for
27
28
// throughput reasons.
28
29
type bufferShard struct {
29
- data [][]byte // Array of chunkSize chunks of spectatord protocol lines, stored as bytes
30
- chunkIndex int // Index of the chunk available for writes
31
- overflows int // Count the buffer overflows, which correspond to data drops, for reporting metrics
32
- mu sync.Mutex
30
+ data [][]byte // Array of chunkSize chunks of spectatord protocol lines, stored as bytes
31
+ chunkIndex int // Index of the chunk available for writes
32
+ overflows int // Count the buffer overflows, which correspond to data drops, for reporting metrics
33
+ overflowBytes int64 // Count the bytes that were dropped, for reporting metrics
34
+ mu sync.Mutex
33
35
}
34
36
35
37
// getChunkIndexForLine returns the chunkIndex that should be used for storing the line, or -1, if there is
36
38
// an overflow and the line cannot be stored in the bufferShard.
37
39
func (b * bufferShard ) getChunkIndexForLine (line []byte ) int {
40
+ totalWriteLength := len (line )
41
+
38
42
// All chunks are full for the shard, drop the data
39
43
if b .chunkIndex >= len (b .data ) {
40
44
b .overflows ++
45
+ b .overflowBytes += int64 (totalWriteLength )
41
46
return - 1
42
47
}
43
48
44
49
// This should not happen, drop the data. The maximum length of a well-formed protocol line is 3.8KB.
45
50
if len (line ) > chunkSize {
46
51
b .overflows ++
52
+ b .overflowBytes += int64 (totalWriteLength )
47
53
return - 1
48
54
}
49
55
50
- totalWriteLength := len (line )
51
56
if len (b .data [b .chunkIndex ]) > 0 {
52
57
// Chunk has data, so account for the separator character
53
58
totalWriteLength ++
@@ -61,6 +66,7 @@ func (b *bufferShard) getChunkIndexForLine(line []byte) int {
61
66
// Out of space in the shard, drop the data
62
67
if b .chunkIndex == len (b .data ) {
63
68
b .overflows ++
69
+ b .overflowBytes += int64 (totalWriteLength )
64
70
return - 1
65
71
}
66
72
@@ -190,6 +196,8 @@ func (llb *LowLatencyBuffer) flushLoop() {
190
196
191
197
// swapAndFlush swaps the front and back buffers and flushes the deactivated buffers
192
198
func (llb * LowLatencyBuffer ) swapAndFlush () {
199
+ start := time .Now ()
200
+
193
201
// Swap the buffer sets, so one can be drained, while the other accepts application writes
194
202
old := llb .useFrontBuffers .Load ()
195
203
llb .useFrontBuffers .CompareAndSwap (old , ! old )
@@ -219,6 +227,8 @@ func (llb *LowLatencyBuffer) swapAndFlush() {
219
227
if pctUsage > 0 {
220
228
llb .writer .WriteString (fmt .Sprintf ("g,1:spectator-go.lowLatencyBuffer.pctUsage,bufferSet=%s:%f" , bufferSet , pctUsage ))
221
229
}
230
+
231
+ llb .writer .WriteString (fmt .Sprintf ("t:spectator-go.lowLatencyBuffer.flushTime,bufferSet=%s:%f" , bufferSet , time .Since (start ).Seconds ()))
222
232
}
223
233
224
234
// flushBufferShard flushes a single bufferShard to the socket, iterating through all chunks
@@ -244,7 +254,9 @@ func (llb *LowLatencyBuffer) flushBufferShard(buffer *bufferShard, bufferSet str
244
254
// record status metrics and reset shard statistics
245
255
if buffer .overflows > 0 {
246
256
llb .writer .WriteString (fmt .Sprintf ("c:spectator-go.lowLatencyBuffer.overflows,bufferSet=%s:%d" , bufferSet , buffer .overflows ))
257
+ llb .writer .WriteString (fmt .Sprintf ("d:spectator-go.lowLatencyBuffer.overflowBytes,bufferSet=%s:%d" , bufferSet , buffer .overflowBytes ))
247
258
buffer .overflows = 0
259
+ buffer .overflowBytes = 0
248
260
}
249
261
buffer .chunkIndex = 0
250
262
return bytesWritten
0 commit comments