Skip to content

Commit

Permalink
Go: fix channel passing from Go->Rust->Go by runtime.Pinner
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Feb 19, 2025
1 parent 65137d3 commit 038570d
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Go: Add `XINFO CONSUMERS` ([#3120](https://github.com/valkey-io/valkey-glide/pull/3120))
* Go: Add `XINFO GROUPS` ([#3106](https://github.com/valkey-io/valkey-glide/pull/3106))
* Go: Add `ZInterCard` ([#3078](https://github.com/valkey-io/valkey-glide/issues/3078))
* Go: Fix channel passing from Go to Rust by using `runtime.Pinner` or `cgo.Handle` ([#3208](https://github.com/valkey-io/valkey-glide/pull/3208))

#### Breaking Changes

Expand Down
3 changes: 3 additions & 0 deletions go/api/pinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"unsafe"
)

// pinner is a wrapper of a runtime.Pinner making the interface
// compatible to the cgo.Handle in the Go < 1.21.
// Note that this make a pinner can only hold one unsafe.Pointer.
type pinner struct {
r runtime.Pinner
}
Expand Down
3 changes: 3 additions & 0 deletions go/api/pinner_old.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"unsafe"
)

// pinner is a wrapper of a cgo.Handle making the interface
// compatible to the runtime.Pinner in the Go >= 1.21.
// Note that a pinner can only hold one unsafe.Pointer.
type pinner struct {
h cgo.Handle
}
Expand Down
18 changes: 18 additions & 0 deletions go/api/pinner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package api

import (
"testing"
"unsafe"
)

func TestPinner(t *testing.T) {
v := make(chan payload)

p := pinner{}
n := p.Pin(unsafe.Pointer(&v))
defer p.Unpin()

if *(*chan payload)(getPinnedPtr(n)) != v {
t.Fail()
}
}
25 changes: 25 additions & 0 deletions go/integTest/glide_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -227,6 +229,11 @@ func (suite *GlideTestSuite) runWithDefaultClients(test func(client api.BaseClie
suite.runWithClients(clients, test)
}

func (suite *GlideTestSuite) runParallelizedWithDefaultClients(parallelism int, count int64, test func(client api.BaseClient)) {
clients := suite.getDefaultClients()
suite.runParallelizedWithClients(clients, parallelism, count, test)
}

func (suite *GlideTestSuite) getDefaultClients() []api.BaseClient {
return []api.BaseClient{suite.defaultClient(), suite.defaultClusterClient()}
}
Expand Down Expand Up @@ -275,6 +282,24 @@ func (suite *GlideTestSuite) runWithClients(clients []api.BaseClient, test func(
}
}

func (suite *GlideTestSuite) runParallelizedWithClients(clients []api.BaseClient, parallelism int, count int64, test func(client api.BaseClient)) {
for _, client := range clients {
suite.T().Run(fmt.Sprintf("%T", client)[5:], func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func() {
defer wg.Done()
for !suite.T().Failed() && atomic.AddInt64(&count, -1) > 0 {
test(client)
}
}()
}
wg.Wait()
})
}
}

func (suite *GlideTestSuite) verifyOK(result string, err error) {
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), api.OK, result)
Expand Down
18 changes: 18 additions & 0 deletions go/integTest/parallelized_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package integTest

import (
"runtime"

"github.com/google/uuid"
"github.com/valkey-io/valkey-glide/go/api"
)

func (suite *GlideTestSuite) TestParallelizedSetWithGC() {
// The insane 640 parallelism is required to reproduce https://github.com/valkey-io/valkey-glide/issues/3207.
suite.runParallelizedWithDefaultClients(640, 640000, func(client api.BaseClient) {
runtime.GC()
key := uuid.New().String()
value := uuid.New().String()
suite.verifyOK(client.Set(key, value))
})
}

0 comments on commit 038570d

Please sign in to comment.