Skip to content

Commit 6707a3a

Browse files
committed
perf: reduce contention caused by the usage of runtime.GOMAXPROCS
1 parent 90e01ab commit 6707a3a

File tree

5 files changed

+23
-17
lines changed

5 files changed

+23
-17
lines changed

internal/util/parallel.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,40 @@
11
package util
22

33
import (
4-
"runtime"
54
"sync"
65
)
76

8-
func ParallelKeys[K comparable, V any](p map[K]V, fn func(k K)) {
7+
func ParallelKeys[K comparable, V any](maxp int, p map[K]V, fn func(k K)) {
98
ch := make(chan K, len(p))
109
for k := range p {
1110
ch <- k
1211
}
13-
closeThenParallel(ch, fn)
12+
closeThenParallel(maxp, ch, fn)
1413
}
1514

16-
func ParallelVals[K comparable, V any](p map[K]V, fn func(k V)) {
15+
func ParallelVals[K comparable, V any](maxp int, p map[K]V, fn func(k V)) {
1716
ch := make(chan V, len(p))
1817
for _, v := range p {
1918
ch <- v
2019
}
21-
closeThenParallel(ch, fn)
20+
closeThenParallel(maxp, ch, fn)
2221
}
2322

24-
func closeThenParallel[V any](ch chan V, fn func(k V)) {
23+
func closeThenParallel[V any](maxp int, ch chan V, fn func(k V)) {
2524
close(ch)
2625
concurrency := len(ch)
27-
if cpus := runtime.GOMAXPROCS(0); concurrency > cpus {
28-
concurrency = cpus
26+
if concurrency > maxp {
27+
concurrency = maxp
2928
}
3029
wg := sync.WaitGroup{}
3130
wg.Add(concurrency)
3231
for i := 1; i < concurrency; i++ {
33-
go func() {
32+
go func(wg *sync.WaitGroup) {
3433
for v := range ch {
3534
fn(v)
3635
}
3736
wg.Done()
38-
}()
37+
}(&wg)
3938
}
4039
for v := range ch {
4140
fn(v)

internal/util/parallel_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
func TestParallelKeys(t *testing.T) {
1010
var sum int64
1111
data, sk, _ := gen(int64(runtime.NumCPU() * 1000))
12-
ParallelKeys(data, func(i int64) {
12+
ParallelKeys(runtime.GOMAXPROCS(0), data, func(i int64) {
1313
atomic.AddInt64(&sum, i)
1414
})
1515
if atomic.LoadInt64(&sum) != sk {
@@ -20,7 +20,7 @@ func TestParallelKeys(t *testing.T) {
2020
func TestParallelVals(t *testing.T) {
2121
var sum int64
2222
data, _, sv := gen(int64(runtime.NumCPU() * 1000))
23-
ParallelVals(data, func(i int64) {
23+
ParallelVals(runtime.GOMAXPROCS(0), data, func(i int64) {
2424
atomic.AddInt64(&sum, i)
2525
})
2626
if atomic.LoadInt64(&sum) != sv {

lua.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha1"
66
"encoding/hex"
7+
"runtime"
78
"sync/atomic"
89

910
"github.com/redis/rueidis/internal/util"
@@ -12,7 +13,7 @@ import (
1213
// NewLuaScript creates a Lua instance whose Lua.Exec uses EVALSHA and EVAL.
1314
func NewLuaScript(script string) *Lua {
1415
sum := sha1.Sum([]byte(script))
15-
return &Lua{script: script, sha1: hex.EncodeToString(sum[:])}
16+
return &Lua{script: script, sha1: hex.EncodeToString(sum[:]), maxp: runtime.GOMAXPROCS(0)}
1617
}
1718

1819
// NewLuaScriptReadOnly creates a Lua instance whose Lua.Exec uses EVALSHA_RO and EVAL_RO.
@@ -26,6 +27,7 @@ func NewLuaScriptReadOnly(script string) *Lua {
2627
type Lua struct {
2728
script string
2829
sha1 string
30+
maxp int
2931
readonly bool
3032
}
3133

@@ -59,7 +61,7 @@ type LuaExec struct {
5961
// Cross slot keys within single LuaExec are prohibited if the Client is a cluster client.
6062
func (s *Lua) ExecMulti(ctx context.Context, c Client, multi ...LuaExec) (resp []RedisResult) {
6163
var e atomic.Value
62-
util.ParallelVals(c.Nodes(), func(n Client) {
64+
util.ParallelVals(s.maxp, c.Nodes(), func(n Client) {
6365
if err := n.Do(ctx, n.B().ScriptLoad().Script(s.script).Build()).Error(); err != nil {
6466
e.CompareAndSwap(nil, &errs{error: err})
6567
}

mux.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package rueidis
33
import (
44
"context"
55
"net"
6+
"runtime"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -48,6 +49,7 @@ type mux struct {
4849
wire []atomic.Value
4950
sc []*singleconnect
5051
mu []sync.Mutex
52+
maxp int
5153
}
5254

5355
func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
@@ -75,6 +77,7 @@ func newMux(dst string, option *ClientOption, init, dead wire, wireFn wireFn) *m
7577
wire: make([]atomic.Value, multiplex),
7678
mu: make([]sync.Mutex, multiplex),
7779
sc: make([]*singleconnect, multiplex),
80+
maxp: runtime.GOMAXPROCS(0),
7881
}
7982
for i := 0; i < len(m.wire); i++ {
8083
m.wire[i].Store(init)
@@ -260,7 +263,7 @@ func (m *mux) DoMultiCache(ctx context.Context, multi ...CacheableTTL) (results
260263
}
261264

262265
results = make([]RedisResult, len(multi))
263-
util.ParallelKeys(commands, func(slot uint16) {
266+
util.ParallelKeys(m.maxp, commands, func(slot uint16) {
264267
for i, r := range m.doMultiCache(ctx, slot, commands[slot]) {
265268
results[cIndexes[slot][i]] = r
266269
}

rueidiscompat/adapter.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"context"
3131
"encoding"
3232
"fmt"
33+
"runtime"
3334
"strconv"
3435
"strings"
3536
"sync"
@@ -389,6 +390,7 @@ type Cmdable interface {
389390

390391
type Compat struct {
391392
client rueidis.Client
393+
maxp int
392394
}
393395

394396
type CacheCompat struct {
@@ -397,7 +399,7 @@ type CacheCompat struct {
397399
}
398400

399401
func NewAdapter(client rueidis.Client) Cmdable {
400-
return &Compat{client: client}
402+
return &Compat{client: client, maxp: runtime.GOMAXPROCS(0)}
401403
}
402404

403405
func (c *Compat) Cache(ttl time.Duration) CacheCompat {
@@ -2799,7 +2801,7 @@ func (c *Compat) ACLDryRun(ctx context.Context, username string, command ...any)
27992801

28002802
func (c *Compat) doPrimaries(ctx context.Context, fn func(c rueidis.Client) error) error {
28012803
var firsterr atomic.Value
2802-
util.ParallelVals(c.client.Nodes(), func(client rueidis.Client) {
2804+
util.ParallelVals(c.maxp, c.client.Nodes(), func(client rueidis.Client) {
28032805
msgs, err := client.Do(ctx, client.B().Role().Build()).ToArray()
28042806
if err == nil {
28052807
if role, _ := msgs[0].ToString(); role == "master" {

0 commit comments

Comments
 (0)