7
7
"github.com/sirupsen/logrus"
8
8
"github.com/slackhq/nebula/config"
9
9
"github.com/slackhq/nebula/iputil"
10
+ "gvisor.dev/gvisor/pkg/buffer"
10
11
)
11
12
12
13
func NewUserDeviceFromConfig (c * config.C , l * logrus.Logger , tunCidr * net.IPNet , routines int ) (Device , error ) {
@@ -15,25 +16,18 @@ func NewUserDeviceFromConfig(c *config.C, l *logrus.Logger, tunCidr *net.IPNet,
15
16
16
17
func NewUserDevice (tunCidr * net.IPNet ) (Device , error ) {
17
18
// these pipes guarantee each write/read will match 1:1
18
- or , ow := io .Pipe ()
19
- ir , iw := io .Pipe ()
20
19
return & UserDevice {
21
- tunCidr : tunCidr ,
22
- outboundReader : or ,
23
- outboundWriter : ow ,
24
- inboundReader : ir ,
25
- inboundWriter : iw ,
20
+ tunCidr : tunCidr ,
21
+ outboundChannel : make (chan * buffer.View ),
22
+ inboundChannel : make (chan * buffer.View ),
26
23
}, nil
27
24
}
28
25
29
26
type UserDevice struct {
30
27
tunCidr * net.IPNet
31
28
32
- outboundReader * io.PipeReader
33
- outboundWriter * io.PipeWriter
34
-
35
- inboundReader * io.PipeReader
36
- inboundWriter * io.PipeWriter
29
+ outboundChannel chan * buffer.View
30
+ inboundChannel chan * buffer.View
37
31
}
38
32
39
33
func (d * UserDevice ) Activate () error {
@@ -46,18 +40,41 @@ func (d *UserDevice) NewMultiQueueReader() (io.ReadWriteCloser, error) {
46
40
return d , nil
47
41
}
48
42
49
- func (d * UserDevice ) Pipe () (* io. PipeReader , * io. PipeWriter ) {
50
- return d .inboundReader , d .outboundWriter
43
+ func (d * UserDevice ) Pipe () (<- chan * buffer. View , chan <- * buffer. View ) {
44
+ return d .inboundChannel , d .outboundChannel
51
45
}
52
46
53
47
func (d * UserDevice ) Read (p []byte ) (n int , err error ) {
54
- return d .outboundReader .Read (p )
48
+ view , ok := <- d .outboundChannel
49
+ if ! ok {
50
+ return 0 , io .EOF
51
+ }
52
+ return view .Read (p )
53
+ }
54
+ func (d * UserDevice ) WriteTo (w io.Writer ) (n int64 , err error ) {
55
+ view , ok := <- d .outboundChannel
56
+ if ! ok {
57
+ return 0 , io .EOF
58
+ }
59
+ return view .WriteTo (w )
55
60
}
61
+
56
62
func (d * UserDevice ) Write (p []byte ) (n int , err error ) {
57
- return d .inboundWriter .Write (p )
63
+ view := buffer .NewViewWithData (p )
64
+ d .inboundChannel <- view
65
+ return view .Size (), nil
58
66
}
67
+ func (d * UserDevice ) ReadFrom (r io.Reader ) (n int64 , err error ) {
68
+ view := buffer .NewViewSize (2048 )
69
+ n , err = view .ReadFrom (r )
70
+ if n > 0 {
71
+ d .inboundChannel <- view
72
+ }
73
+ return
74
+ }
75
+
59
76
func (d * UserDevice ) Close () error {
60
- d . inboundWriter . Close ( )
61
- d . outboundWriter . Close ( )
77
+ close ( d . inboundChannel )
78
+ close ( d . outboundChannel )
62
79
return nil
63
80
}
0 commit comments