forked from getlantern/marionette
-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
207 lines (177 loc) · 4.9 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package marionette
import (
"io"
"net"
"strings"
"sync"
)
// BufferedConn wraps a net.Conn and accumulates incoming bytes in an
// in-memory buffer that is filled by a background goroutine.
//
// Callers inspect the buffered bytes with Peek and discard consumed bytes
// with Seek, which lets them wait until a complete cell is available before
// decoding it. The buffer has a fixed capacity chosen at construction time,
// so cells larger than that capacity are not supported.
type BufferedConn struct {
	net.Conn

	// mu guards buf and err, which are shared between the caller and the
	// background monitor goroutine.
	mu  sync.RWMutex
	buf []byte // bytes read from Conn that have not been seeked past yet
	err error  // last non-temporary read error recorded by the monitor

	// closing is closed (exactly once, via once) when Close is called.
	closing chan struct{}
	once    sync.Once

	// Signal channels used to wake goroutines waiting on a buffer change.
	seekNotify  chan struct{} // signaled after Seek shifts the buffer
	writeNotify chan struct{} // signaled after data or an error is stored
}
// NewBufferedConn returns a new BufferedConn wrapping conn.
//
// The internal buffer is allocated with a capacity of twice bufferSize. A
// background goroutine is started immediately to read from conn into that
// buffer; it runs until the connection is closed or a non-temporary read
// error occurs.
func NewBufferedConn(conn net.Conn, bufferSize int) *BufferedConn {
	bc := new(BufferedConn)
	bc.Conn = conn
	bc.buf = make([]byte, 0, 2*bufferSize)
	bc.closing = make(chan struct{})

	// Both notification channels hold a single pending signal so that
	// senders never block.
	bc.seekNotify = make(chan struct{}, 1)
	bc.writeNotify = make(chan struct{}, 1)

	go bc.monitor()
	return bc
}
// Close marks the connection as closing and closes the underlying net.Conn.
//
// The closing channel is closed at most once, so Close may be called
// repeatedly; every call still forwards to the wrapped connection's Close
// and returns its result.
func (conn *BufferedConn) Close() error {
	signalClosing := func() { close(conn.closing) }
	conn.once.Do(signalClosing)
	return conn.Conn.Close()
}
// Append copies b onto the end of the buffer while holding the write lock.
//
// The buffer is never grown: b must fit in the buffer's remaining capacity,
// otherwise the reslice panics.
func (conn *BufferedConn) Append(b []byte) {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	start := len(conn.buf)
	end := start + len(b)

	// Extend the length within the existing capacity, then fill the new tail.
	conn.buf = conn.buf[:end]
	copy(conn.buf[start:], b)
}
// Read is not supported on BufferedConn and always panics. Buffered data is
// accessed with Peek and consumed with Seek instead.
func (conn *BufferedConn) Read(p []byte) (int, error) {
	const msg = "BufferedConn.Read(): unavailable, use Peek/Seek"
	panic(msg)
}
// Peek returns buffered bytes without consuming them.
//
// If n is -1, Peek returns everything currently buffered. When the buffer is
// empty it returns (nil, err) if the monitor has recorded a read error.
//
// Otherwise Peek returns the first n bytes once at least n bytes are
// buffered. If fewer are buffered and a read error has been recorded, the
// partial buffer is returned together with that error; a "connection reset
// by peer" error is reported as io.EOF.
//
// When the request cannot be satisfied yet and blocking is false, Peek
// returns the current buffer and a nil error immediately. When blocking is
// true, Peek waits for the monitor to store more data or an error. If the
// connection is closed while waiting, Peek re-checks the buffer one final
// time and then returns io.ErrClosedPipe rather than waiting forever.
//
// The returned slice shares memory with the internal buffer, so its contents
// are only stable until the next call to Seek.
func (conn *BufferedConn) Peek(n int, blocking bool) ([]byte, error) {
	// closed is set once the closing channel has been observed; the loop then
	// runs one more time so data or an error stored just before the close is
	// still delivered.
	closed := false

	for {
		// Snapshot the buffer and the monitor's last error under the read lock.
		conn.mu.RLock()
		buf, err := conn.buf, conn.err
		conn.mu.RUnlock()

		// Return whatever already satisfies the request.
		switch {
		case n == -1:
			if len(buf) > 0 {
				return buf, nil
			}
			if err != nil {
				return nil, err
			}
		case n <= len(buf):
			return buf[:n], nil
		case isEOFError(err):
			return buf, io.EOF
		case err != nil:
			return buf, err
		}

		// Exit immediately if we are not blocking.
		if !blocking {
			return buf, err
		}

		// The connection was closed and the final re-check found nothing
		// usable: the monitor may have exited without recording an error, so
		// no further notification is guaranteed to arrive.
		if closed {
			return buf, io.ErrClosedPipe
		}

		// Wait for new data or an error from the monitor, or for Close.
		select {
		case <-conn.writeNotify:
		case <-conn.closing:
			closed = true
		}
	}
}
// Seek discards the first offset bytes of the buffer, shifting the remaining
// bytes to the front, and then signals seekNotify so a waiting monitor can
// resume reading.
//
// Only whence == io.SeekCurrent is accepted, and offset must not exceed the
// number of buffered bytes; both conditions are checked with assert. The
// returned position is always 0 and the error is always nil.
func (conn *BufferedConn) Seek(offset int64, whence int) (int64, error) {
	assert(whence == io.SeekCurrent)

	conn.mu.Lock()
	defer conn.mu.Unlock()

	assert(offset <= int64(len(conn.buf)))

	// Slide the unread tail down to the start of the backing array and
	// shrink the length to match.
	remaining := len(conn.buf) - int(offset)
	copy(conn.buf, conn.buf[offset:])
	conn.buf = conn.buf[:remaining]

	conn.notifySeek()
	return 0, nil
}
// monitor is the background read loop started by NewBufferedConn. It reads
// from the wrapped connection into the shared buffer until the connection is
// closed or a non-temporary read error occurs.
func (conn *BufferedConn) monitor() {
	// Scratch space for reads, as large as the shared buffer's capacity.
	conn.mu.RLock()
	scratch := make([]byte, cap(conn.buf))
	conn.mu.RUnlock()

	for {
		// Stop as soon as Close has been called.
		select {
		case <-conn.closing:
			return
		default:
		}

		// Work out how many more bytes the shared buffer can accept.
		conn.mu.RLock()
		free := cap(conn.buf) - len(conn.buf)
		conn.mu.RUnlock()

		// Buffer is full: wait for a Seek to free space, or for Close.
		if free == 0 {
			select {
			case <-conn.closing:
				return
			case <-conn.seekNotify:
			}
			continue
		}

		// Read at most as many bytes as the shared buffer has room for.
		n, err := conn.Conn.Read(scratch[:free])

		// Publish any bytes received and wake a waiting Peek.
		if n > 0 {
			conn.Append(scratch[:n])
			conn.notifyWrite()
		}

		// No error, or a temporary one: keep reading.
		if err == nil || isTemporaryError(err) {
			continue
		}

		// Any other error ends the loop; record it and wake a waiting Peek.
		conn.mu.Lock()
		conn.err = err
		conn.mu.Unlock()
		conn.notifyWrite()
		return
	}
}
// notifySeek signals the seekNotify channel without blocking. If the send
// cannot proceed immediately, the signal is dropped.
func (conn *BufferedConn) notifySeek() {
	var signal struct{}
	select {
	case conn.seekNotify <- signal:
	default:
		// Send would block; skip it.
	}
}
// notifyWrite signals the writeNotify channel without blocking. If the send
// cannot proceed immediately, the signal is dropped.
func (conn *BufferedConn) notifyWrite() {
	var signal struct{}
	select {
	case conn.writeNotify <- signal:
	default:
		// Send would block; skip it.
	}
}
// isTimeoutError reports whether err exposes a Timeout() bool method that
// returns true. A nil error, or one without that method, yields false.
func isTimeoutError(err error) bool {
	type timeouter interface {
		Timeout() bool
	}

	// The assertion fails for a nil interface value, so nil needs no
	// separate check.
	te, ok := err.(timeouter)
	return ok && te.Timeout()
}
// isTemporaryError reports whether err exposes a Temporary() bool method
// that returns true. A nil error, or one without that method, yields false.
func isTemporaryError(err error) bool {
	type temporary interface {
		Temporary() bool
	}

	// The assertion fails for a nil interface value, so nil needs no
	// separate check.
	te, ok := err.(temporary)
	return ok && te.Temporary()
}
// isEOFError reports whether err is non-nil and its message contains
// "connection reset by peer". Detection is by message text only.
func isEOFError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "connection reset by peer")
}