diff --git a/Makefile b/Makefile index 379c09c2..d23afee9 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ test: build - go test ./... -count=1 --race --timeout=5s + go test ./... -count=1 --race --timeout=20s proto: protoc --go_out=. --go-vtproto_out=. --go_opt=paths=source_relative --proto_path=. actor/actor.proto diff --git a/actor/context.go b/actor/context.go index 4f6a760f..1c05c7da 100644 --- a/actor/context.go +++ b/actor/context.go @@ -17,6 +17,8 @@ type Context struct { engine *Engine receiver Receiver message any + // function to get the current number of messages in the inbox + getInboxCount func() int // the context of the parent if we are a child. // we need this parentCtx, so we can remove the child from the parent Context // when the child dies. @@ -31,6 +33,9 @@ func newContext(ctx context.Context, e *Engine, pid *PID) *Context { engine: e, pid: pid, children: safemap.New[string, *PID](), + getInboxCount: func() int { + return -1 + }, } } @@ -145,13 +150,7 @@ func (c *Context) Child(id string) *PID { // Children returns all child PIDs for the current process. func (c *Context) Children() []*PID { - pids := make([]*PID, c.children.Len()) - i := 0 - c.children.ForEach(func(_ string, child *PID) { - pids[i] = child - i++ - }) - return pids + return c.children.Values() } // PID returns the PID of the process that belongs to the context. @@ -174,3 +173,8 @@ func (c *Context) Engine() *Engine { func (c *Context) Message() any { return c.message } + +// GetInboxCount returns the number of messages in the inbox of the current process. +func (c *Context) GetInboxCount() int { + return c.getInboxCount() +} diff --git a/actor/inbox.go b/actor/inbox.go index e47c1134..b8f30d98 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -42,6 +42,7 @@ type Inboxer interface { Send(Envelope) Start(Processer) Stop() error + Count() int } type Inbox struct { @@ -49,6 +50,7 @@ type Inbox struct { proc Processer scheduler Scheduler procStatus int32 + batch []Envelope } func NewInbox(size int) *Inbox { @@ -56,6 +58,7 @@ func NewInbox(size int) *Inbox { rb: ringbuffer.New[Envelope](int64(size)), scheduler: NewScheduler(defaultThroughput), procStatus: stopped, + batch: make([]Envelope, 0, messageBatchSize), } } @@ -88,11 +91,13 @@ func (in *Inbox) run() { } i++ - if msgs, ok := in.rb.PopN(messageBatchSize); ok && len(msgs) > 0 { - in.proc.Invoke(msgs) - } else { + msgs, ok := in.rb.PopNInto(in.batch, messageBatchSize) + if !ok || len(msgs) == 0 { return } + + in.batch = msgs[:0] + in.proc.Invoke(msgs) } } @@ -109,3 +114,7 @@ func (in *Inbox) Stop() error { atomic.StoreInt32(&in.procStatus, stopped) return nil } + +func (in *Inbox) Count() int { + return int(in.rb.Len()) +} diff --git a/actor/inbox_test.go b/actor/inbox_test.go index 38e7dd8c..0b980ebb 100644 --- a/actor/inbox_test.go +++ b/actor/inbox_test.go @@ -23,7 +23,7 @@ func TestInboxSendAndProcess(t *testing.T) { inbox.Send(msg) select { case <-processedMessages: // Message processed - case <-time.After(time.Millisecond): + case <-time.After(time.Second): t.Errorf("Message was not processed in time") } diff --git a/actor/process.go b/actor/process.go index 41ee1921..6bff244d 100644 --- a/actor/process.go +++ b/actor/process.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "runtime/debug" + "sync/atomic" "time" "github.com/DataDog/gostackparse" @@ -33,6 +34,7 @@ type process struct { pid *PID restarts int32 mbuffer []Envelope + mcount int32 } func newProcess(e *Engine, opts Opts) *process { @@ -45,6 +47,7 @@ func newProcess(e *Engine, opts Opts) *process { context: ctx, mbuffer: nil, } + ctx.getInboxCount = p.Count return p } @@ -66,23 +69,23 @@ func (p *process) Invoke(msgs []Envelope) { // for bookkeeping. processed = 0 ) + atomic.StoreInt32(&p.mcount, int32(nmsg)) defer func() { // If we recovered, we buffer up all the messages that we could not process // so we can retry them on the next restart. if v := recover(); v != nil { - p.context.message = Stopped{} - p.context.receiver.Receive(p.context) - p.mbuffer = make([]Envelope, nmsg-nproc) for i := 0; i < nmsg-nproc; i++ { p.mbuffer[i] = msgs[i+nproc] } + atomic.StoreInt32(&p.mcount, int32(nmsg-nproc)) p.tryRestart(v) } }() for i := 0; i < len(msgs); i++ { nproc++ + atomic.AddInt32(&p.mcount, -1) msg := msgs[i] if pill, ok := msg.Msg.(poisonPill); ok { // If we need to gracefuly stop, we process all the messages @@ -121,8 +124,6 @@ func (p *process) Start() { p.context.receiver = recv defer func() { if v := recover(); v != nil { - p.context.message = Stopped{} - p.context.receiver.Receive(p.context) p.tryRestart(v) } }() @@ -166,6 +167,9 @@ func (p *process) tryRestart(v any) { return } + p.context.message = Stopped{} + p.context.receiver.Receive(p.context) + p.restarts++ // Restart the process after its restartDelay p.context.engine.BroadcastEvent(ActorRestartedEvent{ @@ -180,7 +184,9 @@ func (p *process) tryRestart(v any) { } func (p *process) cleanup(cancel context.CancelFunc) { - defer cancel() + if cancel != nil { + defer cancel() + } if p.context.parentCtx != nil { p.context.parentCtx.children.Delete(p.pid.ID) @@ -209,6 +215,10 @@ func (p *process) Shutdown() { p.cleanup(nil) } +func (p *process) Count() int { + return p.inbox.Count() + int(atomic.LoadInt32(&p.mcount)) +} + func cleanTrace(stack []byte) []byte { goros, err := gostackparse.Parse(bytes.NewReader(stack)) if err != nil { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 20a8c06d..64ea576c 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -32,21 +32,25 @@ func NewInventory() actor.Receiver { func (i Inventory) Receive(c *actor.Context) {} func TestClusterSelectMemberFunc(t *testing.T) { - c1, err := New(NewConfig().WithID("A")) + c1Addr := getRandomLocalhostAddr() + c1Config := NewConfig().WithID("A").WithListenAddr(c1Addr) + c1, err := New(c1Config) require.Nil(t, err) - c2, err := New(NewConfig().WithID("B")) + + selfManagedConfigB := NewSelfManagedConfig().WithBootstrapMember(MemberAddr{ListenAddr: c1Addr, ID: "A"}) + c2Config := NewConfig().WithID("B").WithProvider(NewSelfManagedProvider(selfManagedConfigB)) + c2, err := New(c2Config) require.Nil(t, err) - c3, err := New(NewConfig().WithID("C")) + + selfManagedConfigC := NewSelfManagedConfig().WithBootstrapMember(MemberAddr{ListenAddr: c1Addr, ID: "A"}) + c3Config := NewConfig().WithID("C").WithProvider(NewSelfManagedProvider(selfManagedConfigC)) + c3, err := New(c3Config) require.Nil(t, err) c1.RegisterKind("player", NewPlayer, NewKindConfig()) c2.RegisterKind("player", NewPlayer, NewKindConfig()) c3.RegisterKind("player", NewPlayer, NewKindConfig()) - c1.Start() - c2.Start() - c3.Start() - selectMember := func(details ActivationDetails) *Member { for _, member := range details.Members { if member.ID == "C" { @@ -56,7 +60,10 @@ func TestClusterSelectMemberFunc(t *testing.T) { return nil } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Subscribe BEFORE starting clusters to catch all MemberJoinEvents eventPID := c1.Engine().SpawnFunc(func(c *actor.Context) { switch msg := c.Message().(type) { case ActivationEvent: @@ -69,15 +76,19 @@ func TestClusterSelectMemberFunc(t *testing.T) { // Activate the actor from member A // Which should spawn the actor on member C config := NewActivationConfig().WithSelectMemberFunc(selectMember) - c1.Activate("cancel_receiver", config) + c1.Activate("player", config) } } }, "event") c1.Engine().Subscribe(eventPID) defer c1.Engine().Unsubscribe(eventPID) + c1.Start() + c2.Start() + c3.Start() + <-ctx.Done() - require.Equal(t, context.DeadlineExceeded, ctx.Err()) + require.Equal(t, context.Canceled, ctx.Err()) c1.Stop() c2.Stop() c3.Stop() @@ -103,7 +114,7 @@ func TestClusterSpawn(t *testing.T) { var ( c1Addr = getRandomLocalhostAddr() c1 = makeCluster(t, c1Addr, "A", "eu-west") - c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + c2 = makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ListenAddr: c1Addr, ID: "A"}) wg = sync.WaitGroup{} expectedPID = actor.NewPID(c1Addr, "player/1") ) @@ -140,8 +151,9 @@ func TestClusterSpawn(t *testing.T) { } func TestMemberJoin(t *testing.T) { - c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + c1Addr := getRandomLocalhostAddr() + c1 := makeCluster(t, c1Addr, "A", "eu-west") + c2 := makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ListenAddr: c1Addr, ID: "A"}) c2.RegisterKind("player", NewPlayer, NewKindConfig()) wg := sync.WaitGroup{} @@ -171,9 +183,9 @@ func TestMemberJoin(t *testing.T) { func TestActivate(t *testing.T) { var ( - addr = getRandomLocalhostAddr() - c1 = makeCluster(t, addr, "A", "eu-west") - c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + c1Addr = getRandomLocalhostAddr() + c1 = makeCluster(t, c1Addr, "A", "eu-west") + c2 = makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ListenAddr: c1Addr, ID: "A"}) ) c2.RegisterKind("player", NewPlayer, NewKindConfig()) @@ -208,9 +220,9 @@ func TestActivate(t *testing.T) { } func TestDeactivate(t *testing.T) { - addr := getRandomLocalhostAddr() - c1 := makeCluster(t, addr, "A", "eu-west") - c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") + c1Addr := getRandomLocalhostAddr() + c1 := makeCluster(t, c1Addr, "A", "eu-west") + c2 := makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu-west", MemberAddr{ListenAddr: c1Addr, ID: "A"}) c2.RegisterKind("player", NewPlayer, NewKindConfig()) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") @@ -252,10 +264,12 @@ func TestMemberLeave(t *testing.T) { if err != nil { log.Fatal(err) } + selfManagedConfig := NewSelfManagedConfig().WithBootstrapMember(MemberAddr{ListenAddr: c1Addr, ID: "A"}) config := NewConfig(). WithID("B"). WithRegion("eu-east"). - WithEngine(e) + WithEngine(e). + WithProvider(NewSelfManagedProvider(selfManagedConfig)) c2, err := New(config) assert.Nil(t, err) @@ -319,19 +333,21 @@ func TestMembersExcept(t *testing.T) { func TestGetActiveByID(t *testing.T) { c1Addr := getRandomLocalhostAddr() - c2Addr := getRandomLocalhostAddr() c1 := makeCluster(t, c1Addr, "A", "eu") c1.RegisterKind("player", NewPlayer, NewKindConfig()) c1.Start() - c2 := makeCluster(t, c2Addr, "B", "eu") + c2 := makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu", MemberAddr{ListenAddr: c1Addr, ID: "A"}) c2.RegisterKind("player", NewPlayer, NewKindConfig()) c2.Start() + // Wait for cluster formation to complete + time.Sleep(time.Millisecond * 50) + pid1 := c1.Activate("player", NewActivationConfig().WithID("1")) pid2 := c2.Activate("player", NewActivationConfig().WithID("2")) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 200) pid := c1.GetActiveByID("player/1") assert.NotNil(t, pid) @@ -350,21 +366,23 @@ func TestGetActiveByID(t *testing.T) { func TestGetActiveByKind(t *testing.T) { c1Addr := getRandomLocalhostAddr() - c2Addr := getRandomLocalhostAddr() c1 := makeCluster(t, c1Addr, "A", "eu") c1.RegisterKind("player", NewPlayer, NewKindConfig()) c1.Start() - c2 := makeCluster(t, c2Addr, "B", "eu") + c2 := makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu", MemberAddr{ListenAddr: c1Addr, ID: "A"}) c2.RegisterKind("player", NewPlayer, NewKindConfig()) c2.Start() + // Wait for cluster formation to complete + time.Sleep(time.Millisecond * 50) + pid1 := c1.Activate("player", NewActivationConfig().WithID("1")) pid2 := c2.Activate("player", NewActivationConfig().WithID("2")) c1.Activate("foo", NewActivationConfig().WithID("2")) c1.Activate("bar", NewActivationConfig().WithID("2")) - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 200) pids := c1.GetActiveByKind("player") assert.Len(t, pids, 2) @@ -380,16 +398,18 @@ func TestGetActiveByKind(t *testing.T) { func TestCannotDuplicateActor(t *testing.T) { c1Addr := getRandomLocalhostAddr() - c2Addr := getRandomLocalhostAddr() c1 := makeCluster(t, c1Addr, "A", "eu") c1.RegisterKind("player", NewPlayer, NewKindConfig()) c1.Start() - c2 := makeCluster(t, c2Addr, "B", "eu") + c2 := makeClusterWithBootstrap(t, getRandomLocalhostAddr(), "B", "eu", MemberAddr{ListenAddr: c1Addr, ID: "A"}) c2.RegisterKind("player", NewPlayer, NewKindConfig()) c2.Start() + // Wait for cluster formation to complete + time.Sleep(time.Millisecond * 50) + pid := c1.Activate("player", NewActivationConfig().WithID("1")) time.Sleep(10 * time.Millisecond) // Lets make sure we spawn the actor on "our" node. Why? @@ -420,6 +440,21 @@ func makeCluster(t *testing.T, addr, id, region string) *Cluster { return c } +func makeClusterWithBootstrap(t *testing.T, addr, id, region string, bootstrapMembers ...MemberAddr) *Cluster { + selfManagedConfig := NewSelfManagedConfig() + for _, member := range bootstrapMembers { + selfManagedConfig = selfManagedConfig.WithBootstrapMember(member) + } + config := NewConfig(). + WithID(id). + WithListenAddr(addr). + WithRegion(region). + WithProvider(NewSelfManagedProvider(selfManagedConfig)) + c, err := New(config) + assert.Nil(t, err) + return c +} + func getRandomLocalhostAddr() string { return fmt.Sprintf("127.0.0.1:%d", rand.Intn(50000)+10000) } diff --git a/ringbuffer/ringbuffer.go b/ringbuffer/ringbuffer.go index e3095594..938df117 100644 --- a/ringbuffer/ringbuffer.go +++ b/ringbuffer/ringbuffer.go @@ -96,3 +96,35 @@ func (rb *RingBuffer[T]) PopN(n int64) ([]T, bool) { rb.mu.Unlock() return items, true } + +func (rb *RingBuffer[T]) PopNInto(dst []T, n int64) ([]T, bool) { + rb.mu.Lock() + defer rb.mu.Unlock() + + if rb.len == 0 { + return dst[:0], false + } + + if n > rb.len { + n = rb.len + } + + if int64(cap(dst)) < n { + dst = make([]T, n) + } else { + dst = dst[:n] + } + + content := rb.content + for i := int64(0); i < n; i++ { + pos := (content.head + 1 + i) % content.mod + dst[i] = content.items[pos] + var z T + content.items[pos] = z + } + + content.head = (content.head + n) % content.mod + atomic.AddInt64(&rb.len, -n) + + return dst, true +} diff --git a/ringbuffer/ringbuffer_bench_test.go b/ringbuffer/ringbuffer_bench_test.go new file mode 100644 index 00000000..07a22f11 --- /dev/null +++ b/ringbuffer/ringbuffer_bench_test.go @@ -0,0 +1,131 @@ +package ringbuffer + +import ( + "testing" + "unsafe" +) + +var sinkByte byte + +// Payload sizes +type ( + P16 struct{ B [16]byte } + P256 struct{ B [256]byte } + P1K struct{ B [1024]byte } + P10K struct{ B [10 * 1024]byte } +) + +func fillPayload16(i int) P16 { + var p P16 + p.B[0] = byte(i) + p.B[15] = byte(i >> 8) + return p +} +func fillPayload256(i int) P256 { + var p P256 + p.B[0] = byte(i) + p.B[255] = byte(i >> 8) + return p +} +func fillPayload1K(i int) P1K { + var p P1K + p.B[0] = byte(i) + p.B[1023] = byte(i >> 8) + return p +} +func fillPayload10K(i int) P10K { + var p P10K + p.B[0] = byte(i) + p.B[len(p.B)-1] = byte(i >> 8) + return p +} + +func Benchmark_Compare_PopN_vs_PopNInto_Payload16B(b *testing.B) { + benchPopNvsPopNIntoPayload(b, int64(1<<20), int64(1024*4), fillPayload16) +} + +func Benchmark_Compare_PopN_vs_PopNInto_Payload256B(b *testing.B) { + benchPopNvsPopNIntoPayload(b, int64(1<<20), int64(1024*4), fillPayload256) +} + +func Benchmark_Compare_PopN_vs_PopNInto_Payload1KB(b *testing.B) { + benchPopNvsPopNIntoPayload(b, int64(1<<20), int64(1024*4), fillPayload1K) +} + +func Benchmark_Compare_PopN_vs_PopNInto_Payload10KB(b *testing.B) { + benchPopNvsPopNIntoPayload(b, int64(1<<20), int64(1024*4), fillPayload10K) +} + +// Generic benchmark helper. The "maker" function creates a payload that depends on i +// so the compiler can't constant-fold everything away. +func benchPopNvsPopNIntoPayload[T any](b *testing.B, rbSize, batchSize int64, maker func(int) T) { + b.Run("PopN", func(b *testing.B) { + b.ReportAllocs() + + rb := New[T](rbSize) + for i := int64(0); i < rbSize; i++ { + rb.Push(maker(int(i))) + } + + b.ResetTimer() + + var local byte + for i := 0; i < b.N; i++ { + msgs, ok := rb.PopN(batchSize) + if !ok || len(msgs) == 0 { + b.Fatal("unexpected empty pop") + } + + // Touch data so it can't be optimized away. + // We only read a byte-ish worth to keep benchmark focused on queueing. + for _, v := range msgs { + local ^= firstByte(&v) + } + + for _, v := range msgs { + rb.Push(v) + } + } + sinkByte = local + }) + + b.Run("PopNInto", func(b *testing.B) { + b.ReportAllocs() + + rb := New[T](rbSize) + for i := int64(0); i < rbSize; i++ { + rb.Push(maker(int(i))) + } + + dst := make([]T, 0, batchSize) + + b.ResetTimer() + + var local byte + for i := 0; i < b.N; i++ { + msgs, ok := rb.PopNInto(dst, batchSize) + if !ok || len(msgs) == 0 { + b.Fatal("unexpected empty pop") + } + + for i := range msgs { + local ^= firstByte(&msgs[i]) + } + + for _, v := range msgs { + rb.Push(v) + } + + dst = msgs[:0] + } + sinkByte = local + }) +} + +// firstByte returns a byte dependent on the value without knowing its shape. +// This keeps the benchmark generic and prevents dead-code elimination. +func firstByte[T any](p *T) byte { + // This is safe: it reads the first byte of the value's memory representation. + // We don't interpret it, just use it to keep the compiler honest. + return *(*byte)(unsafe.Pointer(p)) +} diff --git a/ringbuffer/ringbuffer_test.go b/ringbuffer/ringbuffer_test.go index 3e0ea552..076306f9 100644 --- a/ringbuffer/ringbuffer_test.go +++ b/ringbuffer/ringbuffer_test.go @@ -93,3 +93,87 @@ func TestPopThreadSafety(t *testing.T) { } }) } + +func TestPopNInto(t *testing.T) { + rb := New[Item](1024) + + // Push more than initial capacity to exercise growth path too. + n := 5000 + for i := 0; i < n; i++ { + rb.Push(Item{i}) + } + + // Reusable destination buffer (should not be replaced once large enough) + dst := make([]Item, 0, 64) + + // Pop in chunks + outAll := make([]Item, 0, n) + for len(outAll) < n { + want := int64(123) // arbitrary batch size + usedBefore := &dst[0:cap(dst)][0] // pointer into backing array (safe because cap>0) + + msgs, ok := rb.PopNInto(dst, want) + if !ok || len(msgs) == 0 { + t.Fatalf("expected more items, got ok=%v len=%d", ok, len(msgs)) + } + + // Ensure PopNInto reuses the backing array when cap(dst) is sufficient. + // It may grow dst during early iterations until cap >= want; after that it must be stable. + if cap(dst) >= int(want) { + usedAfter := &msgs[0:cap(msgs)][0] + if usedAfter != usedBefore { + t.Fatal("expected PopNInto to reuse dst backing array when capacity is sufficient") + } + } + + outAll = append(outAll, msgs...) + + // Reset dst for reuse (same backing array) + dst = msgs[:0] + } + + // Validate ordering + for i := 0; i < n; i++ { + if outAll[i].i != i { + t.Fatalf("invalid item popped at %d: got %d want %d", i, outAll[i].i, i) + } + } + + // Now buffer is empty + msgs, ok := rb.PopNInto(dst, 1) + if ok || len(msgs) != 0 { + t.Fatalf("expected empty pop: ok=%v len=%d", ok, len(msgs)) + } +} + +func TestPopNIntoThreadSafety(t *testing.T) { + t.Run("PopNInto should be thread-safe", func(t *testing.T) { + testCase := func() { + rb := New[int](1024) + rb.Push(1) + + counter := atomic.Int32{} + wg := sync.WaitGroup{} + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // Each goroutine uses its own dst to avoid sharing memory. + dst := make([]int, 0, 1) + _, ok := rb.PopNInto(dst, 1) + if ok { + counter.Add(1) + } + }() + } + wg.Wait() + if counter.Load() > 1 { + t.Fatal("false positive item removal") + } + } + + for i := 0; i < 100_000; i++ { + testCase() + } + }) +} diff --git a/safemap/safemap.go b/safemap/safemap.go index 2f77c69c..c41e92bc 100644 --- a/safemap/safemap.go +++ b/safemap/safemap.go @@ -45,3 +45,23 @@ func (s *SafeMap[K, V]) ForEach(f func(K, V)) { f(k, v) } } + +func (s *SafeMap[K, V]) Keys() []K { + s.mu.RLock() + defer s.mu.RUnlock() + keys := make([]K, 0, len(s.data)) + for k := range s.data { + keys = append(keys, k) + } + return keys +} + +func (s *SafeMap[K, V]) Values() []V { + s.mu.RLock() + defer s.mu.RUnlock() + values := make([]V, 0, len(s.data)) + for _, v := range s.data { + values = append(values, v) + } + return values +}