Skip to content

Commit dd66c88

Browse files
Merge pull request #32 from redis-performance/publisher
Optimize RTT payload generation to include configurable data size
2 parents 042bfdc + 24c646e commit dd66c88

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

subscriber.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const (
3131
redisTLSCert = "tls_cert"
3232
redisTLSKey = "tls_key"
3333
redisTLSInsecureSkipVerify = "tls_insecure_skip_verify"
34+
timestampSize = 13 // UnixMilli() produces 13-digit number
3435
)
3536

3637
const Inf = rate.Limit(math.MaxFloat64)
@@ -65,16 +66,24 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
6566
clientName, mode, len(channels),
6667
func() string {
6768
if measureRTT {
68-
return "RTT timestamp"
69+
return fmt.Sprintf("RTT timestamp + padding to %d bytes", dataSize)
6970
}
7071
return fmt.Sprintf("fixed size %d bytes", dataSize)
7172
}(),
7273
)
7374
}
7475

75-
var payload string
76-
if !measureRTT {
77-
payload = strings.Repeat("A", dataSize)
76+
// Pre-generate payload once per goroutine
77+
// For RTT mode: we'll use a template with padding that we'll prepend timestamp to
78+
// Timestamp format: 13 bytes (e.g., "1762249648882")
79+
// Format: "<timestamp> <padding>" to reach dataSize
80+
var paddingPayload string
81+
if measureRTT && dataSize > timestampSize+1 {
82+
// +1 for space separator
83+
paddingSize := dataSize - timestampSize - 1
84+
paddingPayload = strings.Repeat("A", paddingSize)
85+
} else if !measureRTT {
86+
paddingPayload = strings.Repeat("A", dataSize)
7887
}
7988

8089
for {
@@ -84,7 +93,7 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
8493
return
8594

8695
default:
87-
msg := payload
96+
var msg string
8897

8998
for _, ch := range channels {
9099
if useLimiter {
@@ -93,7 +102,15 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
93102
}
94103
if measureRTT {
95104
now := time.Now().UnixMilli()
96-
msg = strconv.FormatInt(int64(now), 10)
105+
if dataSize > timestampSize+1 {
106+
// Format: "<timestamp> <padding>"
107+
msg = strconv.FormatInt(int64(now), 10) + " " + paddingPayload
108+
} else {
109+
// Just timestamp if dataSize is too small
110+
msg = strconv.FormatInt(int64(now), 10)
111+
}
112+
} else {
113+
msg = paddingPayload
97114
}
98115
var err error
99116
switch mode {
@@ -198,15 +215,21 @@ func subscriberRoutine(clientName, mode string, channels []string, verbose bool,
198215
log.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Payload))
199216
}
200217
if measureRTT {
201-
if ts, err := strconv.ParseInt(msg.Payload, 10, 64); err == nil {
202-
now := time.Now().UnixMicro()
218+
now := time.Now().UnixMicro()
219+
// Extract timestamp from payload (format: "<timestamp> <padding>" or just "<timestamp>")
220+
// Timestamp is always 13 bytes (UnixMilli)
221+
timestampStr := msg.Payload
222+
if len(msg.Payload) > timestampSize {
223+
timestampStr = msg.Payload[:timestampSize]
224+
}
225+
if ts, err := strconv.ParseInt(timestampStr, 10, 64); err == nil {
203226
rtt := now - ts
204227
rttLatencyChannel <- rtt
205228
if verbose {
206229
log.Printf("RTT measured: %d ms\n", rtt/1000)
207230
}
208231
} else {
209-
log.Printf("Invalid timestamp in message: %s, err: %v\n", msg.Payload, err)
232+
log.Printf("Invalid timestamp in message: %s, err: %v\n", timestampStr, err)
210233
}
211234
}
212235
atomic.AddUint64(&totalMessages, 1)

0 commit comments

Comments
 (0)