Skip to content

Commit

Permalink
perf(transport): udp server handle supports coroutine pools
Browse files Browse the repository at this point in the history
  • Loading branch information
lbbniu committed Jan 12, 2024
1 parent f841bf1 commit 0277e02
Showing 1 changed file with 17 additions and 2 deletions.
19 changes: 17 additions & 2 deletions tars/transport/udphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/TarsCloud/TarsGo/tars/util/gpool"

"github.com/TarsCloud/TarsGo/tars/protocol/res/basef"
"github.com/TarsCloud/TarsGo/tars/util/current"
"github.com/TarsCloud/TarsGo/tars/util/grace"
Expand All @@ -17,6 +19,7 @@ type udpHandler struct {
server *TarsServer

conn *net.UDPConn
pool *gpool.Pool
}

func (u *udpHandler) Listen() (err error) {
Expand All @@ -26,6 +29,11 @@ func (u *udpHandler) Listen() (err error) {
return err
}
TLOG.Info("UDP listen", u.conn.LocalAddr())

// init goroutine pool
if cfg.MaxInvoke > 0 {
u.pool = gpool.NewPool(int(cfg.MaxInvoke), cfg.QueueCap)
}
return nil
}

Expand All @@ -41,7 +49,7 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context {
func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) {
ctx := u.getConnContext(udpAddr)
atomic.AddInt32(&u.server.numInvoke, 1)
go func() {
handler := func() {
defer atomic.AddInt32(&u.server.numInvoke, -1)
rsp := u.server.invoke(ctx, pkg) // no need to check package

Expand All @@ -57,7 +65,14 @@ func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) {
if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil {
TLOG.Errorf("send pkg to %v failed %v", udpAddr, err)
}
}()
}

cfg := u.config
if cfg.MaxInvoke > 0 { // use goroutine pool
u.pool.JobQueue <- handler
} else {
go handler()
}
}

func (u *udpHandler) Handle() error {
Expand Down

0 comments on commit 0277e02

Please sign in to comment.