-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflow_control.go
118 lines (99 loc) · 2.92 KB
/
flow_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package quic
import (
"fmt"
"sync"
qerror "github.com/ami-GS/gQUIC/error"
"github.com/ami-GS/gQUIC/qtype"
)
// baseFlowController carries the byte counters and the data limit that are
// common to the stream-level and connection-level flow controllers; both of
// those types embed it.
type baseFlowController struct {
	// Running byte totals. In this file they are only modified by
	// ConnectionFlowController (updateByteSent / updateByteReceived).
	bytesSent, bytesReceived qtype.QuicInt
	// Offsets recorded by StreamFlowController (updateLargestSent /
	// updateLargestReceived).
	largestSent, largestReceived qtype.QuicInt

	// limitMutex is held by maybeUpdateMaxDataLimit while it compares and
	// rewrites MaxDataLimit.
	limitMutex sync.Mutex
	// MaxDataLimit is the bound the SendableByOffset / ReceivableByOffset
	// checks compare against.
	MaxDataLimit qtype.QuicInt
}
// String returns a two-line, human-readable summary of the controller: the
// four counters on the first line and the current MaxDataLimit on the second.
// The fields are read without taking limitMutex.
func (f *baseFlowController) String() string {
	const layout = "sent:%d, recvd:%d, largestSent:%d, largestRcvd:%d\n" +
		"MaxDataLimit:%d"
	return fmt.Sprintf(
		layout,
		f.bytesSent,
		f.bytesReceived,
		f.largestSent,
		f.largestReceived,
		f.MaxDataLimit,
	)
}
// maybeUpdateMaxDataLimit raises MaxDataLimit to newLimit when newLimit is
// strictly greater than the current value; the limit is never lowered.
// It reports whether the limit was changed. The comparison and the write are
// both done while holding limitMutex.
func (f *baseFlowController) maybeUpdateMaxDataLimit(newLimit qtype.QuicInt) bool {
	f.limitMutex.Lock()
	defer f.limitMutex.Unlock()

	raised := newLimit > f.MaxDataLimit
	if raised {
		f.MaxDataLimit = newLimit
	}
	return raised
}
// StreamFlowController is the per-stream flow controller. It embeds
// baseFlowController for the counters and the limit, and keeps a pointer to
// the connection-level controller it was created with.
type StreamFlowController struct {
	// IsStreamZero is set from the isZero argument of
	// NewStreamFlowController; presumably it marks stream 0 — TODO confirm
	// against callers.
	IsStreamZero bool
	baseFlowController
	// connFC is the connection-level controller passed to the constructor.
	// None of the methods visible in this file consult it.
	connFC *ConnectionFlowController
}
// NewStreamFlowController allocates a stream-level flow controller.
//
// isZero is stored in IsStreamZero and connFC is stored as the associated
// connection-level controller. The initial MaxDataLimit is
// qtype.MaxPayloadSizeIPv4, which the original author marked as a placeholder.
func NewStreamFlowController(isZero bool, connFC *ConnectionFlowController) *StreamFlowController {
	fc := new(StreamFlowController)
	fc.IsStreamZero = isZero
	fc.connFC = connFC
	// TODO: set appropriately
	fc.MaxDataLimit = qtype.MaxPayloadSizeIPv4
	return fc
}
// FlowControlFlag is the result of a "may I send?" check: either sending is
// allowed, or it names the limit that blocks it.
type FlowControlFlag byte

// Values of FlowControlFlag.
const (
	// Sendable means no limit blocks the data.
	Sendable = FlowControlFlag(1)
	// StreamBlocked means the stream-level limit blocks the data.
	StreamBlocked = FlowControlFlag(2)
	// ConnectionBlocked means the connection-level limit blocks the data.
	ConnectionBlocked = FlowControlFlag(3)
	// BothBlocked is represented by StreamBlocked * ConnectionBlocked (6).
	BothBlocked = StreamBlocked * ConnectionBlocked
)
// SendableByOffset reports whether data at the given offset fits under this
// stream's MaxDataLimit. It returns StreamBlocked when offset is strictly
// greater than the limit and Sendable otherwise.
//
// NOTE(review): MaxDataLimit is read here without limitMutex, and the
// connection-level controller is not consulted — confirm that is intended.
func (s *StreamFlowController) SendableByOffset(offset qtype.QuicInt) FlowControlFlag {
	switch {
	case offset > s.MaxDataLimit:
		return StreamBlocked
	default:
		return Sendable
	}
}
// ReceivableByOffset checks an incoming offset against this stream's
// MaxDataLimit. It returns nil when offset is at or below the limit and
// qerror.FlowControlError when offset exceeds it.
func (s *StreamFlowController) ReceivableByOffset(offset qtype.QuicInt) error {
	if withinLimit := offset <= s.MaxDataLimit; withinLimit {
		return nil
	}
	return qerror.FlowControlError
}
// updateLargestReceived records offset as the largest offset received on this
// stream. The stored value only ever grows: an offset that is not greater
// than the current largestReceived (for example from a frame that arrives out
// of order) is ignored instead of lowering the recorded maximum.
func (s *StreamFlowController) updateLargestReceived(offset qtype.QuicInt) {
	if offset > s.largestReceived {
		s.largestReceived = offset
	}
}
// updateLargestSent records offset as the largest offset sent on this stream.
// The stored value only ever grows: an offset that is not greater than the
// current largestSent (for example from resending earlier data) is ignored
// instead of lowering the recorded maximum.
func (s *StreamFlowController) updateLargestSent(offset qtype.QuicInt) {
	if offset > s.largestSent {
		s.largestSent = offset
	}
}
// ConnectionFlowController is the connection-level flow controller. It embeds
// baseFlowController for the byte counters and the limit.
type ConnectionFlowController struct {
	baseFlowController
	// updateMutex is held by updateByteSent while it adds to bytesSent.
	// It is a pointer and is allocated by NewConnectionFlowController, so a
	// zero-value ConnectionFlowController has a nil mutex.
	updateMutex *sync.Mutex
}
// NewConnectionFlowController allocates a connection-level flow controller
// with a freshly allocated updateMutex. The initial MaxDataLimit is
// qtype.MaxPayloadSizeIPv4, which the original author marked as a placeholder.
func NewConnectionFlowController() *ConnectionFlowController {
	fc := &ConnectionFlowController{updateMutex: &sync.Mutex{}}
	//TODO: set appropriate
	fc.MaxDataLimit = qtype.MaxPayloadSizeIPv4
	return fc
}
// SendableByOffset reports whether largestOffset more bytes fit under the
// connection's MaxDataLimit. It returns Sendable when bytesSent +
// largestOffset is at or below the limit and ConnectionBlocked otherwise.
//
// NOTE(review): bytesSent and MaxDataLimit are read here without taking
// updateMutex or limitMutex — confirm callers serialize access.
func (c *ConnectionFlowController) SendableByOffset(largestOffset qtype.QuicInt) FlowControlFlag {
	if total := c.bytesSent + largestOffset; total > c.MaxDataLimit {
		return ConnectionBlocked
	}
	return Sendable
}
// ReceivableByOffset checks whether largestOffset more bytes may be received
// on the connection. It returns nil when bytesReceived + largestOffset is at
// or below MaxDataLimit and qerror.FlowControlError when the sum exceeds it.
func (c *ConnectionFlowController) ReceivableByOffset(largestOffset qtype.QuicInt) error {
	total := c.bytesReceived + largestOffset
	if total <= c.MaxDataLimit {
		return nil
	}
	return qerror.FlowControlError
}
// updateByteSent adds largestOffset to the connection's bytesSent counter.
// The addition is performed while holding updateMutex.
func (c *ConnectionFlowController) updateByteSent(largestOffset qtype.QuicInt) {
	c.updateMutex.Lock()
	c.bytesSent = c.bytesSent + largestOffset
	c.updateMutex.Unlock()
}
// updateByteReceived adds largestOffset to the connection's bytesReceived
// counter.
//
// The addition is performed while holding updateMutex, the same lock
// updateByteSent takes for the bytesSent counter, so concurrent updates to
// the received total cannot be lost to an unsynchronized read-modify-write.
func (c *ConnectionFlowController) updateByteReceived(largestOffset qtype.QuicInt) {
	c.updateMutex.Lock()
	defer c.updateMutex.Unlock()
	c.bytesReceived += largestOffset
}