Skip to content

Commit 36d31a7

Browse files
committed
Implement read bench with refactoring
Signed-off-by: Shuo Wu <[email protected]>
1 parent 1898328 commit 36d31a7

File tree

1 file changed

+73
-38
lines changed

1 file changed

+73
-38
lines changed

pkg/util/util.go

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,13 @@ func RandStringRunes(n int) string {
310310
}
311311

312312
func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte, int64) (int, error)) (output string, err error) {
313-
lock := sync.Mutex{}
313+
benchTypeInList := strings.Split(benchType, "-")
314+
if len(benchTypeInList) != 3 ||
315+
(benchTypeInList[0] != "seq" && benchTypeInList[0] != "rand") ||
316+
(benchTypeInList[1] != "iops" && benchTypeInList[1] != "bandwidth" && benchTypeInList[1] != "latency") ||
317+
(benchTypeInList[2] != "read" && benchTypeInList[2] != "write") {
318+
return "", fmt.Errorf("invalid bench type %s", benchType)
319+
}
314320

315321
if thread != 1 && strings.Contains(benchType, "-latency-") {
316322
logrus.Warnf("Using single thread for latency related benchmark")
@@ -322,41 +328,92 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
322328
blockSize = 1 << 20 // 1MB
323329
}
324330

325-
blockBytes := []byte(RandStringRunes(blockSize))
331+
var duration time.Duration
332+
333+
// Prepare data before read
334+
if benchTypeInList[2] == "read" {
335+
// Typically 4-thread write is enough
336+
if _, err := dataIOWithMultipleThread(false, 4, 1<<20, size, writeAt); err != nil {
337+
return "", err
338+
}
339+
340+
if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, readAt); err != nil {
341+
return "", err
342+
}
343+
}
344+
345+
if benchTypeInList[2] == "write" {
346+
if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, writeAt); err != nil {
347+
return "", err
348+
}
349+
}
350+
351+
switch benchTypeInList[1] {
352+
case "iops":
353+
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
354+
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
355+
case "bandwidth":
356+
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
357+
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
358+
case "latency":
359+
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
360+
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
361+
}
362+
return output, nil
363+
}
364+
365+
func dataIOWithMultipleThread(isRandomIO bool, thread, blockSize int, size int64, ioAt func([]byte, int64) (int, error)) (duration time.Duration, err error) {
366+
lock := sync.Mutex{}
367+
326368
chunkSize := int(math.Ceil(float64(size) / float64(thread)))
327369
chunkBlocks := int(math.Ceil(float64(chunkSize) / float64(blockSize)))
370+
var sequenceList []int
371+
if isRandomIO {
372+
sequenceList = make([]int, chunkBlocks)
373+
for i := 0; i < chunkBlocks; i++ {
374+
sequenceList[i] = i
375+
}
376+
rand.Shuffle(chunkBlocks, func(i, j int) { sequenceList[i], sequenceList[j] = sequenceList[j], sequenceList[i] })
377+
}
378+
379+
if chunkSize < blockSize {
380+
return 0, fmt.Errorf("the io thread count is too much so that each thread cannot operate a single block")
381+
}
328382

329383
wg := sync.WaitGroup{}
330384
wg.Add(thread)
385+
331386
startTime := time.Now()
387+
defer func() {
388+
duration = time.Since(startTime)
389+
}()
390+
332391
for i := 0; i < thread; i++ {
333392
idx := i
334393
go func() {
335394
defer wg.Done()
336395

396+
// Ignore this randomly generate data if the ioAt is readAt
397+
blockBytes := []byte(RandStringRunes(blockSize))
398+
337399
start := int64(idx) * int64(chunkSize)
338400
end := int64(idx+1) * int64(chunkSize)
339401
offset := start
340402
for cnt := 0; cnt < chunkBlocks; cnt++ {
341-
if strings.HasPrefix(benchType, "seq-") {
342-
offset = start + int64(cnt*blockSize)
343-
if offset+int64(blockSize) > end {
344-
blockBytes = blockBytes[:end-offset]
345-
}
346-
} else if strings.HasPrefix(benchType, "rand-") {
347-
offset = start + int64(rand.Intn(cnt)*blockSize)
403+
if isRandomIO {
404+
offset = start + int64(sequenceList[cnt]*blockSize)
348405
if offset+int64(blockSize) > end {
349406
offset -= int64(blockSize)
350407
}
351408
} else {
352-
lock.Lock()
353-
err = fmt.Errorf("invalid bench type %s", benchType)
354-
lock.Unlock()
355-
return
409+
offset = start + int64(cnt*blockSize)
410+
if offset+int64(blockSize) > end {
411+
blockBytes = blockBytes[:end-offset]
412+
}
356413
}
357-
if _, writeErr := writeAt(blockBytes, offset); writeErr != nil {
414+
if _, ioErr := ioAt(blockBytes, offset); ioErr != nil {
358415
lock.Lock()
359-
err = writeErr
416+
err = ioErr
360417
lock.Unlock()
361418
return
362419
}
@@ -365,27 +422,5 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte
365422
}
366423
wg.Wait()
367424

368-
if err != nil {
369-
return "", err
370-
}
371-
372-
duration := time.Since(startTime)
373-
switch benchType {
374-
case "seq-iops-write":
375-
fallthrough
376-
case "rand-iops-write":
377-
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
378-
output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
379-
case "seq-bandwidth-write":
380-
fallthrough
381-
case "rand-bandwidth-write":
382-
res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10))
383-
output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
384-
case "seq-latency-write":
385-
fallthrough
386-
case "rand-latency-write":
387-
res := float64(duration) / 1000 / (float64(size) / float64(blockSize))
388-
output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread)
389-
}
390-
return output, nil
425+
return
391426
}

0 commit comments

Comments
 (0)