diff --git a/tars/transport/udphandler.go b/tars/transport/udphandler.go index b7a32668..0a2d6af3 100755 --- a/tars/transport/udphandler.go +++ b/tars/transport/udphandler.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + "github.com/TarsCloud/TarsGo/tars/util/gpool" + "github.com/TarsCloud/TarsGo/tars/protocol/res/basef" "github.com/TarsCloud/TarsGo/tars/util/current" "github.com/TarsCloud/TarsGo/tars/util/grace" @@ -17,6 +19,7 @@ type udpHandler struct { server *TarsServer conn *net.UDPConn + pool *gpool.Pool } func (u *udpHandler) Listen() (err error) { @@ -26,6 +29,11 @@ func (u *udpHandler) Listen() (err error) { return err } TLOG.Info("UDP listen", u.conn.LocalAddr()) + + // init goroutine pool + if cfg.MaxInvoke > 0 { + u.pool = gpool.NewPool(int(cfg.MaxInvoke), cfg.QueueCap) + } return nil } @@ -41,7 +49,7 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context { func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) { ctx := u.getConnContext(udpAddr) atomic.AddInt32(&u.server.numInvoke, 1) - go func() { + handler := func() { defer atomic.AddInt32(&u.server.numInvoke, -1) rsp := u.server.invoke(ctx, pkg) // no need to check package @@ -57,7 +65,14 @@ func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) { if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil { TLOG.Errorf("send pkg to %v failed %v", udpAddr, err) } - }() + } + + cfg := u.config + if cfg.MaxInvoke > 0 { // use goroutine pool + u.pool.JobQueue <- handler + } else { + go handler() + } } func (u *udpHandler) Handle() error {