Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test: build
go test ./... -count=1 --race --timeout=5s
go test ./... -count=1 --race --timeout=20s

proto:
protoc --go_out=. --go-vtproto_out=. --go_opt=paths=source_relative --proto_path=. actor/actor.proto
Expand Down
18 changes: 11 additions & 7 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Context struct {
engine *Engine
receiver Receiver
message any
// function to get the current number of messages in the inbox
getInboxCount func() int
// the context of the parent if we are a child.
// we need this parentCtx, so we can remove the child from the parent Context
// when the child dies.
Expand All @@ -31,6 +33,9 @@ func newContext(ctx context.Context, e *Engine, pid *PID) *Context {
engine: e,
pid: pid,
children: safemap.New[string, *PID](),
getInboxCount: func() int {
return -1
},
}
}

Expand Down Expand Up @@ -145,13 +150,7 @@ func (c *Context) Child(id string) *PID {

// Children returns all child PIDs for the current process.
func (c *Context) Children() []*PID {
pids := make([]*PID, c.children.Len())
i := 0
c.children.ForEach(func(_ string, child *PID) {
pids[i] = child
i++
})
return pids
return c.children.Values()
}

// PID returns the PID of the process that belongs to the context.
Expand All @@ -174,3 +173,8 @@ func (c *Context) Engine() *Engine {
func (c *Context) Message() any {
return c.message
}

// GetInboxCount returns the number of messages in the inbox of the current process.
func (c *Context) GetInboxCount() int {
return c.getInboxCount()
}
15 changes: 12 additions & 3 deletions actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,23 @@ type Inboxer interface {
Send(Envelope)
Start(Processer)
Stop() error
Count() int
}

type Inbox struct {
rb *ringbuffer.RingBuffer[Envelope]
proc Processer
scheduler Scheduler
procStatus int32
batch []Envelope
}

func NewInbox(size int) *Inbox {
return &Inbox{
rb: ringbuffer.New[Envelope](int64(size)),
scheduler: NewScheduler(defaultThroughput),
procStatus: stopped,
batch: make([]Envelope, 0, messageBatchSize),
}
}

Expand Down Expand Up @@ -88,11 +91,13 @@ func (in *Inbox) run() {
}
i++

if msgs, ok := in.rb.PopN(messageBatchSize); ok && len(msgs) > 0 {
in.proc.Invoke(msgs)
} else {
msgs, ok := in.rb.PopNInto(in.batch, messageBatchSize)
if !ok || len(msgs) == 0 {
return
}

in.batch = msgs[:0]
in.proc.Invoke(msgs)
}
}

Expand All @@ -109,3 +114,7 @@ func (in *Inbox) Stop() error {
atomic.StoreInt32(&in.procStatus, stopped)
return nil
}

func (in *Inbox) Count() int {
return int(in.rb.Len())
}
2 changes: 1 addition & 1 deletion actor/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestInboxSendAndProcess(t *testing.T) {
inbox.Send(msg)
select {
case <-processedMessages: // Message processed
case <-time.After(time.Millisecond):
case <-time.After(time.Second):
t.Errorf("Message was not processed in time")
}

Expand Down
22 changes: 16 additions & 6 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"runtime/debug"
"sync/atomic"
"time"

"github.com/DataDog/gostackparse"
Expand Down Expand Up @@ -33,6 +34,7 @@ type process struct {
pid *PID
restarts int32
mbuffer []Envelope
mcount int32
}

func newProcess(e *Engine, opts Opts) *process {
Expand All @@ -45,6 +47,7 @@ func newProcess(e *Engine, opts Opts) *process {
context: ctx,
mbuffer: nil,
}
ctx.getInboxCount = p.Count
return p
}

Expand All @@ -66,23 +69,23 @@ func (p *process) Invoke(msgs []Envelope) {
// for bookkeeping.
processed = 0
)
atomic.StoreInt32(&p.mcount, int32(nmsg))
defer func() {
// If we recovered, we buffer up all the messages that we could not process
// so we can retry them on the next restart.
if v := recover(); v != nil {
p.context.message = Stopped{}
p.context.receiver.Receive(p.context)

p.mbuffer = make([]Envelope, nmsg-nproc)
for i := 0; i < nmsg-nproc; i++ {
p.mbuffer[i] = msgs[i+nproc]
}
atomic.StoreInt32(&p.mcount, int32(nmsg-nproc))
p.tryRestart(v)
}
}()

for i := 0; i < len(msgs); i++ {
nproc++
atomic.AddInt32(&p.mcount, -1)
msg := msgs[i]
if pill, ok := msg.Msg.(poisonPill); ok {
// If we need to gracefuly stop, we process all the messages
Expand Down Expand Up @@ -121,8 +124,6 @@ func (p *process) Start() {
p.context.receiver = recv
defer func() {
if v := recover(); v != nil {
p.context.message = Stopped{}
p.context.receiver.Receive(p.context)
p.tryRestart(v)
}
}()
Expand Down Expand Up @@ -166,6 +167,9 @@ func (p *process) tryRestart(v any) {
return
}

p.context.message = Stopped{}
p.context.receiver.Receive(p.context)

p.restarts++
// Restart the process after its restartDelay
p.context.engine.BroadcastEvent(ActorRestartedEvent{
Expand All @@ -180,7 +184,9 @@ func (p *process) tryRestart(v any) {
}

func (p *process) cleanup(cancel context.CancelFunc) {
defer cancel()
if cancel != nil {
defer cancel()
}

if p.context.parentCtx != nil {
p.context.parentCtx.children.Delete(p.pid.ID)
Expand Down Expand Up @@ -209,6 +215,10 @@ func (p *process) Shutdown() {
p.cleanup(nil)
}

func (p *process) Count() int {
return p.inbox.Count() + int(atomic.LoadInt32(&p.mcount))
}

func cleanTrace(stack []byte) []byte {
goros, err := gostackparse.Parse(bytes.NewReader(stack))
if err != nil {
Expand Down
Loading
Loading