Skip to content

Commit 627d788

Browse files
committed
Implement read bench with refactoring
Signed-off-by: Shuo Wu <[email protected]>
1 parent 7833484 commit 627d788

File tree

1 file changed

+73
-38
lines changed

1 file changed

+73
-38
lines changed

pkg/util/util.go

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,13 @@ func RandStringRunes(n int) string {
331331
}
332332

333333
func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte, int64) (int, error)) (output string, err error) {
334-
lock := sync.Mutex{}
334+
benchTypeInList := strings.Split(benchType, "-")
335+
if len(benchTypeInList) != 3 ||
336+
(benchTypeInList[0] != "seq" && benchTypeInList[0] != "rand") ||
337+
(benchTypeInList[1] != "iops" && benchTypeInList[1] != "bandwidth" && benchTypeInList[1] != "latency") ||
338+
(benchTypeInList[2] != "read" && benchTypeInList[2] != "write") {
339+
return "", fmt.Errorf("invalid bench type %s", benchType)
340+
}
335341

336342
if thread != 1 && strings.Contains(benchType, "-latency-") {
337343
logrus.Warnf("Using single thread for latency related benchmark")
@@ -343,41 +349,92 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
343349
blockSize = 1 << 20 // 1MB
344350
}
345351

346-
blockBytes := []byte(RandStringRunes(blockSize))
352+
var duration time.Duration
353+
354+
// Prepare data before read
355+
if benchTypeInList[2] == "read" {
356+
// Typically 4-thread write is enough
357+
if _, err := dataIOWithMultipleThread(false, 4, 1<<20, size, writeAt); err != nil {
358+
return "", err
359+
}
360+
361+
if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, readAt); err != nil {
362+
return "", err
363+
}
364+
}
365+
366+
if benchTypeInList[2] == "write" {
367+
if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, writeAt); err != nil {
368+
return "", err
369+
}
370+
}
371+
372+
switch benchTypeInList[1] {
373+
case "iops":
374+
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
375+
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
376+
case "bandwidth":
377+
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
378+
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
379+
case "latency":
380+
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
381+
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
382+
}
383+
return output, nil
384+
}
385+
386+
func dataIOWithMultipleThread(isRandomIO bool, thread, blockSize int, size int64, ioAt func([]byte, int64) (int, error)) (duration time.Duration, err error) {
387+
lock := sync.Mutex{}
388+
347389
chunkSize := int(math.Ceil(float64(size) / float64(thread)))
348390
chunkBlocks := int(math.Ceil(float64(chunkSize) / float64(blockSize)))
391+
var sequenceList []int
392+
if isRandomIO {
393+
sequenceList = make([]int, chunkBlocks)
394+
for i := 0; i < chunkBlocks; i++ {
395+
sequenceList[i] = i
396+
}
397+
rand.Shuffle(chunkBlocks, func(i, j int) { sequenceList[i], sequenceList[j] = sequenceList[j], sequenceList[i] })
398+
}
399+
400+
if chunkSize < blockSize {
401+
return 0, fmt.Errorf("the io thread count is too much so that each thread cannot operate a single block")
402+
}
349403

350404
wg := sync.WaitGroup{}
351405
wg.Add(thread)
406+
352407
startTime := time.Now()
408+
defer func() {
409+
duration = time.Since(startTime)
410+
}()
411+
353412
for i := 0; i < thread; i++ {
354413
idx := i
355414
go func() {
356415
defer wg.Done()
357416

417+
// Ignore this randomly generate data if the ioAt is readAt
418+
blockBytes := []byte(RandStringRunes(blockSize))
419+
358420
start := int64(idx) * int64(chunkSize)
359421
end := int64(idx+1) * int64(chunkSize)
360422
offset := start
361423
for cnt := 0; cnt < chunkBlocks; cnt++ {
362-
if strings.HasPrefix(benchType, "seq-") {
363-
offset = start + int64(cnt*blockSize)
364-
if offset+int64(blockSize) > end {
365-
blockBytes = blockBytes[:end-offset]
366-
}
367-
} else if strings.HasPrefix(benchType, "rand-") {
368-
offset = start + int64(rand.Intn(cnt)*blockSize)
424+
if isRandomIO {
425+
offset = start + int64(sequenceList[cnt]*blockSize)
369426
if offset+int64(blockSize) > end {
370427
offset -= int64(blockSize)
371428
}
372429
} else {
373-
lock.Lock()
374-
err = fmt.Errorf("invalid bench type %s", benchType)
375-
lock.Unlock()
376-
return
430+
offset = start + int64(cnt*blockSize)
431+
if offset+int64(blockSize) > end {
432+
blockBytes = blockBytes[:end-offset]
433+
}
377434
}
378-
if _, writeErr := writeAt(blockBytes, offset); writeErr != nil {
435+
if _, ioErr := ioAt(blockBytes, offset); ioErr != nil {
379436
lock.Lock()
380-
err = writeErr
437+
err = ioErr
381438
lock.Unlock()
382439
return
383440
}
@@ -386,27 +443,5 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
386443
}
387444
wg.Wait()
388445

389-
if err != nil {
390-
return "", err
391-
}
392-
393-
duration := time.Since(startTime)
394-
switch benchType {
395-
case "seq-iops-write":
396-
fallthrough
397-
case "rand-iops-write":
398-
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
399-
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
400-
case "seq-bandwidth-write":
401-
fallthrough
402-
case "rand-bandwidth-write":
403-
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
404-
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
405-
case "seq-latency-write":
406-
fallthrough
407-
case "rand-latency-write":
408-
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
409-
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
410-
}
411-
return output, nil
446+
return
412447
}

0 commit comments

Comments
 (0)