Skip to content

Commit d70bcc2

Browse files
committed
[MOD]优化
1 parent 35c1cdd commit d70bcc2

File tree

9 files changed

+89
-13
lines changed

9 files changed

+89
-13
lines changed

network/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Session interface {
1111
SendMsg(msg proto.Message)
1212
SendRawMsg(msgID uint32, data []byte)
1313
ID() int32
14+
Close()
1415
}
1516

1617
type Client struct {
@@ -84,3 +85,7 @@ func (p *Client) SendRawMsg(msgID uint32, data []byte) {
8485
func (p *Client) ID() int32 {
8586
return p.conn.ID()
8687
}
88+
89+
func (p *Client) Close() {
90+
p.conn.Close()
91+
}

network/conn.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ type Conn interface {
77
WriteMsg(args []byte) error
88
LocalAddr() net.Addr
99
ID() int32
10+
Close()
1011
}

rpc/client.go

-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ func (p *Client) ReadLoop() error {
150150
} else if err != nil {
151151
return err
152152
}
153-
util.PrintCurrNano("client data")
154153
rpcData := &rpcmsg.Data{}
155154
err = proto.Unmarshal(m.Data, rpcData)
156155
if err != nil {
@@ -179,7 +178,6 @@ func (p *Client) handle(rpcData *rpcmsg.Data) {
179178
return
180179
}
181180
case rpcmsg.Data_Response:
182-
util.PrintCurrNano("client Data_Response")
183181
err := p.processor.HandleResponse(seqID, data)
184182
if err != nil {
185183
logrus.WithError(err)

rpc/rpc.go

+4
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,7 @@ func (p *RPC) GetServerByType(serverType ServerType) Server {
105105
func (p *RPC) RegisterSend2Session(send2Session func(sesID int32, msgID uint32, data []byte)) {
106106
p.client.RegisterSend2Session(send2Session)
107107
}
108+
109+
func (p *RPC) Session(gateID, sesID int32) Session {
110+
return NewSession(p.client, gateID, sesID)
111+
}

rpc/server.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type Server interface {
2222

2323
//func(*msg.XXX, error))
2424
Request(proto.Message, interface{}) error
25+
26+
ID() int32
2527
}
2628

2729
type server struct {
@@ -42,6 +44,10 @@ func (p *server) Send(msg proto.Message) {
4244
p.rpcClient.SendMsg(p.serverTopic, msg)
4345
}
4446

47+
func (p *server) ID() int32 {
48+
return p.serverid
49+
}
50+
4551
//func (p *server) Request(msg proto.Message, f func(proto.Message, error)) error {
4652
// p.rpcClient.Request(p.serverTopic, msg, f)
4753
// return nil
@@ -92,31 +98,45 @@ func (p *requestserver) Answer(msg proto.Message) {
9298
p.server.rpcClient.Answer(p.serverTopic, p.seqid, msg)
9399
}
94100

101+
func (p *requestserver) ID() int32 {
102+
return p.serverid
103+
}
104+
95105
type Session interface {
96106
SendMsg(msg proto.Message)
97107
SendRawMsg(msgID uint16, data []byte)
108+
GateSessionID() GateSessionID
109+
}
110+
111+
type GateSessionID struct {
112+
GateID int32
113+
SesID int32
98114
}
99115

100116
type session struct {
101-
sid int32
117+
gsID GateSessionID
102118
rpcClient *Client
103-
gateID int32
104119
gateTopic string
105120
}
106121

107122
func NewSession(client *Client, gateID int32, sesID int32) Session {
123+
g := GateSessionID{GateID: gateID, SesID: sesID}
124+
108125
return &session{
109-
sid: sesID,
110-
gateID: gateID,
126+
gsID: g,
111127
rpcClient: client,
112128
gateTopic: fmt.Sprintf("%v", gateID),
113129
}
114130
}
115131

116132
func (p *session) SendMsg(msg proto.Message) {
117-
p.rpcClient.RouteGate(p.gateTopic, p.sid, msg)
133+
p.rpcClient.RouteGate(p.gateTopic, p.gsID.SesID, msg)
118134
}
119135

120136
func (p *session) SendRawMsg(msgID uint16, data []byte) {
121137

122138
}
139+
140+
func (p *session) GateSessionID() GateSessionID {
141+
return p.gsID
142+
}

server/gate.go

+4
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,7 @@ func (p *Gate) RouteSessionMsg(msg proto.Message, serverID int32) {
6666
p.GetServerById(serverID).RouteSession2Server(s.ID(), msg)
6767
})
6868
}
69+
70+
func (p *Gate) RegisterRawSessionMsgHandler(msg proto.Message, f func(s network.Session, message proto.Message)) {
71+
p.networkMgr.RegisterRawSessionMsgHandler(msg, f)
72+
}

server/server.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
)
77

88
type Server struct {
9-
worker service.Worker
10-
rpc *rpc.RPC
9+
worker service.Worker
10+
rpc *rpc.RPC
11+
serverID int32
1112
}
1213

1314
func NewServer(serverID int32) (*Server, error) {
@@ -18,6 +19,7 @@ func NewServer(serverID int32) (*Server, error) {
1819
return nil, err
1920
}
2021
p.rpc = rpc
22+
p.serverID = serverID
2123
return p, nil
2224
}
2325

@@ -27,6 +29,10 @@ func (p *Server) Run() {
2729
p.rpc.Run()
2830
}
2931

32+
func (p *Server) Worker() service.Worker {
33+
return p.worker
34+
}
35+
3036
func (p *Server) Post(f func()) {
3137
p.worker.Post(f)
3238
}
@@ -51,3 +57,11 @@ func (p *Server) RegisterSessionMsgHandler(cb interface{}) {
5157
func (p *Server) RegisterServerHandler(cb interface{}) {
5258
p.rpc.RegisterServerMsgHandler(cb)
5359
}
60+
61+
func (p *Server) ID() int32 {
62+
return p.serverID
63+
}
64+
65+
func (p *Server) RPCSession(s rpc.GateSessionID) rpc.Session {
66+
return p.rpc.Session(s.GateID, s.SesID)
67+
}

service/worker.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package service
22

33
import (
4+
"github.com/0990/goserver/util"
45
"runtime/debug"
56
"time"
67
)
@@ -10,6 +11,8 @@ type Worker interface {
1011
Run()
1112
Close()
1213
AfterPost(duration time.Duration, f func()) *time.Timer
14+
NewTicker(d time.Duration, f func()) *time.Ticker
15+
Len() int
1316
}
1417

1518
//TODO 这里的实现 chan如果塞满会阻塞进程,可对比参照github.com/davyxu/cellnet EventQueue实现,选方案
@@ -19,7 +22,7 @@ type Work struct {
1922

2023
func NewWorker() Worker {
2124
p := new(Work)
22-
p.funChan = make(chan func(), 1024)
25+
p.funChan = make(chan func(), 10240)
2326
return p
2427
}
2528

@@ -30,7 +33,7 @@ func (p *Work) Post(f func()) {
3033
func (p *Work) Run() {
3134
go func() {
3235
for f := range p.funChan {
33-
p.protectedFun(f)
36+
util.ProtectedFun(f)
3437
}
3538
}()
3639
}
@@ -39,6 +42,9 @@ func (p *Work) Close() {
3942
close(p.funChan)
4043
}
4144

45+
func (p *Work) Len() int {
46+
return len(p.funChan)
47+
}
4248
func (p *Work) protectedFun(callback func()) {
4349
//TODO 每个函数都包装了defer,性能怎样?
4450
defer func() {
@@ -49,8 +55,19 @@ func (p *Work) protectedFun(callback func()) {
4955
callback()
5056
}
5157

52-
func (p *Work) AfterPost(duration time.Duration, f func()) *time.Timer {
53-
return time.AfterFunc(duration, func() {
58+
func (p *Work) AfterPost(d time.Duration, f func()) *time.Timer {
59+
return time.AfterFunc(d, func() {
5460
p.Post(f)
5561
})
5662
}
63+
64+
func (p *Work) NewTicker(d time.Duration, f func()) *time.Ticker {
65+
ticker := time.NewTicker(d)
66+
go func() {
67+
for range ticker.C {
68+
p.Post(f)
69+
//util.ProtectedFun(f)
70+
}
71+
}()
72+
return ticker
73+
}

util/fun.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package util
2+
3+
import "runtime/debug"
4+
5+
func ProtectedFun(f func()) {
6+
//TODO 每个函数都包装了defer,性能怎样?
7+
defer func() {
8+
if err := recover(); err != nil {
9+
debug.PrintStack()
10+
}
11+
}()
12+
f()
13+
}

0 commit comments

Comments
 (0)