Skip to content

Commit

Permalink
perf(transport): optimize server handle
Browse files Browse the repository at this point in the history
  • Loading branch information
lbbniu committed Jan 12, 2024
1 parent 0d66280 commit f841bf1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tars/transport/tcphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (t *tcpHandler) getConnContext(connSt *connInfo) context.Context {
func (t *tcpHandler) handleConn(connSt *connInfo, pkg []byte) {
// recvPkgTs are more accurate
ctx := t.getConnContext(connSt)
atomic.AddInt32(&connSt.numInvoke, 1)
handler := func() {
defer atomic.AddInt32(&connSt.numInvoke, -1)
rsp := t.server.invoke(ctx, pkg)
Expand Down Expand Up @@ -262,7 +263,6 @@ func (t *tcpHandler) recv(connSt *connInfo) {
break
}
if status == PackageFull {
atomic.AddInt32(&connSt.numInvoke, 1)
pkg := make([]byte, pkgLen)
copy(pkg, currBuffer[:pkgLen])
currBuffer = currBuffer[pkgLen:]
Expand Down
42 changes: 23 additions & 19 deletions tars/transport/udphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func (u *udpHandler) getConnContext(udpAddr *net.UDPAddr) context.Context {
return ctx
}

func (u *udpHandler) handleUDPAddr(udpAddr *net.UDPAddr, pkg []byte) {
ctx := u.getConnContext(udpAddr)
atomic.AddInt32(&u.server.numInvoke, 1)
go func() {
defer atomic.AddInt32(&u.server.numInvoke, -1)
rsp := u.server.invoke(ctx, pkg) // no need to check package

cPacketType, ok := current.GetPacketTypeFromContext(ctx)
if !ok {
TLOG.Error("Failed to GetPacketTypeFromContext")
}

if cPacketType == basef.TARSONEWAY {
return
}

if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil {
TLOG.Errorf("send pkg to %v failed %v", udpAddr, err)
}
}()
}

func (u *udpHandler) Handle() error {
atomic.AddInt32(&u.server.numConn, 1)
// wait invoke done
Expand Down Expand Up @@ -67,25 +89,7 @@ func (u *udpHandler) Handle() error {
}
pkg := make([]byte, n)
copy(pkg, buffer[0:n])
ctx := u.getConnContext(udpAddr)
go func() {
atomic.AddInt32(&u.server.numInvoke, 1)
defer atomic.AddInt32(&u.server.numInvoke, -1)
rsp := u.server.invoke(ctx, pkg) // no need to check package

cPacketType, ok := current.GetPacketTypeFromContext(ctx)
if !ok {
TLOG.Error("Failed to GetPacketTypeFromContext")
}

if cPacketType == basef.TARSONEWAY {
return
}

if _, err := u.conn.WriteToUDP(rsp, udpAddr); err != nil {
TLOG.Errorf("send pkg to %v failed %v", udpAddr, err)
}
}()
u.handleUDPAddr(udpAddr, pkg)
}
}

Expand Down

0 comments on commit f841bf1

Please sign in to comment.