-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsession_oneway.go
156 lines (140 loc) · 3.17 KB
/
session_oneway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (C) 2018 Kun Zhong All rights reserved.
// Use of this source code is governed by the Apache License, Version 2.0 (the "License").
package grpcx
import (
xlog "github.com/pigogo/grpcx/grpclog"
"golang.org/x/net/context"
)
// oneState identifies the step that onewaySession.Run executes next.
type oneState byte

const (
	// oneRequst: the request has not been sent yet; Run calls SendMsg.
	oneRequst oneState = iota
	// oneResendReq: a send failed and a different connection was obtained;
	// Run calls resend.
	oneResendReq
	// oneFinish: the request was sent successfully; Run returns.
	oneFinish
)
// newOnewaySession creates a one-way session for method on cc.
//
// The call configuration starts from defaultCallInfo, is adjusted by the
// method config of cc (WaitForReady, MaxReqSize) and then by the connection's
// default call options followed by the per-call opts. A connection is picked
// before returning, so a non-nil error means no session was created.
//
// The returned session keeps the connection picker so that SendMsg and resend
// can switch to another connection after a failed send.
func newOnewaySession(ctx context.Context, cc *ClientConn, method string, opts ...CallOption) (_ *onewaySession, err error) {
	c := defaultCallInfo
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Merge into a freshly allocated slice. Appending directly onto
	// cc.dopts.callOptions could write into spare capacity of the shared
	// default-options slice, letting concurrent calls overwrite each other's
	// options.
	merged := make([]CallOption, 0, len(cc.dopts.callOptions)+len(opts))
	merged = append(merged, cc.dopts.callOptions...)
	merged = append(merged, opts...)
	for _, o := range merged {
		if err := o.before(&c); err != nil {
			return nil, err
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)

	hkey := uint32(0)
	if c.hbKey != nil {
		hkey = *c.hbKey
	}
	gopts := BalancerGetOptions{
		BlockingWait: !c.failFast,
		HashKey:      hkey,
	}

	// getConn picks a connection from cc. When the call is not fail-fast it
	// keeps retrying while the picker reports errConnClosing or
	// errConnUnavailable; any other error is returned immediately.
	getConn := func() (*connDial, func(), error) {
		for {
			conn, put, err := cc.getConn(ctx, gopts)
			if err == nil {
				return conn, put, nil
			}
			transient := err == errConnClosing || err == errConnUnavailable
			if !transient || c.failFast {
				return nil, nil, err
			}
		}
	}

	// NOTE(review): the second value returned by getConn is discarded here;
	// confirm whether the balancer expects it to be invoked when the call ends.
	conn, _, err := getConn()
	if err != nil {
		xlog.Warningf("grpcx: newOnewaySession getConn fail:%v", err)
		return nil, err
	}

	cs := &onewaySession{
		opts:      c,
		getConn:   getConn,
		conn:      conn,
		method:    method,
		state:     oneRequst,
		sessionid: cc.genStreamID(),
	}
	return cs, nil
}
// onewaySession holds the state of a single one-way request: the message and
// header being sent, the connection in use, and the bookkeeping that Run uses
// to decide whether to send, resend or stop.
type onewaySession struct {
// opts is the call configuration; SendMsg and resend read token and
// maxSendMessageSize from it.
opts callInfo
// conn is the connection the next send goes through; it is replaced when a
// send fails and getConn returns a different connection.
conn *connDial
// method is copied into the Methord field of the request header.
method string
// msg is the message last passed to SendMsg, kept so resend can send it again.
msg interface{}
// header is the request header built by SendMsg and reused by resend.
header *PackHeader
// sessionid is copied into the Sessionid field of the request header.
sessionid int64
// getConn picks a connection; its second result is ignored by SendMsg and
// resend.
getConn func() (*connDial, func(), error)
// err is the result of the most recent send attempt.
err error
// packet is not referenced by the methods visible in this part of the file.
packet *netPack
// state selects the step Run executes next.
state oneState
// resendTimes counts calls to resend; it is compared against maxResendTimes.
resendTimes int
}
// Run drives the session until it reaches oneFinish or a step fails.
//
// In state oneRequst it sends args through SendMsg; in state oneResendReq it
// calls resend. The first non-nil error from either step is returned; nil is
// returned once the state is oneFinish.
func (cs *onewaySession) Run(args interface{}) (err error) {
	for cs.state != oneFinish {
		switch cs.state {
		case oneRequst:
			err = cs.SendMsg(args)
		case oneResendReq:
			err = cs.resend()
		}
		if err != nil {
			return err
		}
	}
	return err
}
// SendMsg builds the request header for the session and sends m on the
// current connection.
//
// On success the state becomes oneFinish and nil is returned. When the send
// fails, a warning is logged and getConn is asked for a connection: if it
// yields a different connection, the session switches to it, moves to
// oneResendReq and returns nil so the caller can retry; otherwise the send
// error is returned. The header, message and send error are stored on the
// session in every case.
func (cs *onewaySession) SendMsg(m interface{}) (err error) {
	hdr := &PackHeader{
		Ptype:     PackType_REQ,
		Methord:   cs.method,
		Sessionid: cs.sessionid,
	}
	cs.header = hdr
	if tok := cs.opts.token; tok != nil {
		hdr.Metadata = withToken(hdr.Metadata, *tok)
	}
	cs.msg = m

	cs.err = cs.conn.send(hdr, m, *cs.opts.maxSendMessageSize)
	if cs.err == nil {
		cs.state = oneFinish
		return nil
	}

	xlog.Warningf("grpcx: SendMsg fail:%v", cs.err)
	// NOTE(review): the second value returned by getConn is dropped; confirm
	// that nothing needs to be released.
	next, _, pickErr := cs.getConn()
	if pickErr == nil && next != cs.conn {
		cs.conn = next
		cs.state = oneResendReq
		return nil
	}
	return cs.err
}
// resend makes one more attempt to send the stored header and message.
//
// Each call increments resendTimes; once the counter reaches maxResendTimes
// the stored send error is returned without sending. A successful send sets
// the state to oneFinish. After a failed send a warning is logged and getConn
// is consulted: a different connection is adopted and nil is returned (the
// state is left unchanged, so the caller retries), otherwise the send error is
// returned.
func (cs *onewaySession) resend() error {
	cs.resendTimes++
	if cs.resendTimes >= maxResendTimes {
		return cs.err
	}

	if cs.err = cs.conn.send(cs.header, cs.msg, *cs.opts.maxSendMessageSize); cs.err == nil {
		cs.state = oneFinish
		return nil
	}

	xlog.Warningf("grpcx: resend fail:%v", cs.err)
	// NOTE(review): the second value returned by getConn is dropped; confirm
	// that nothing needs to be released.
	next, _, pickErr := cs.getConn()
	if pickErr != nil || next == cs.conn {
		return cs.err
	}
	cs.conn = next
	return nil
}