Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 31 additions & 111 deletions network/packet_listener_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,67 +15,64 @@
package network

import (
"context"
"errors"
"io"
"net"
"net/netip"
"sync"
"time"

"golang.getoutline.org/sdk/internal/slicepool"
"golang.getoutline.org/sdk/network/packetrelay"
"golang.getoutline.org/sdk/transport"
)

// this was the buffer size used before, we may consider update it in the future
const packetMaxSize = 2048

// packetBufferPool is used to create buffers to read UDP response packets
var packetBufferPool = slicepool.MakePool(packetMaxSize)

// Compilation guard against interface implementation
var _ PacketProxy = (*PacketListenerProxy)(nil)
var _ PacketRequestSender = (*packetListenerRequestSender)(nil)

// PacketListenerProxy creates a new [PacketProxy] that uses the existing [transport.PacketListener] to
// create connections to a proxy.
//
// Deprecated: Use [packetrelay.PacketListenerRelay] instead.
type PacketListenerProxy struct {
listener transport.PacketListener
writeIdleTimeout time.Duration
}

type packetListenerRequestSender struct {
mu sync.Mutex // Protects closed and timer function calls
closed bool

proxyConn net.PacketConn
relay packetrelay.PacketRelay
baseRelay *packetrelay.PacketListenerRelay
writeIdleTimeout time.Duration
writeIdleTimer *time.Timer
}

// NewPacketProxyFromPacketListener creates a new [PacketProxy] that uses the existing [transport.PacketListener] to
// create connections to a proxy. You can also specify additional options.
// This function is useful if you already have an implementation of [transport.PacketListener] and you want to use it
// with one of the network stacks (for example, network/lwip2transport) as a UDP traffic handler.
//
// Deprecated: Use [packetrelay.NewPacketRelayFromPacketListener] instead.
func NewPacketProxyFromPacketListener(pl transport.PacketListener, options ...func(*PacketListenerProxy) error) (*PacketListenerProxy, error) {
if pl == nil {
return nil, errors.New("pl must not be nil")
// Create the underlying base relay
baseRelay, err := packetrelay.NewPacketRelayFromPacketListener(pl)
if err != nil {
return nil, err
}

p := &PacketListenerProxy{
listener: pl,
writeIdleTimeout: 30 * time.Second,
baseRelay: baseRelay,
writeIdleTimeout: 30 * time.Second, // Default timeout
}

// Apply options
for _, opt := range options {
if err := opt(p); err != nil {
return nil, err
}
}

// Build the final relay chain: TimeoutPacketRelay(PacketListenerRelay)
timeoutRelay, err := packetrelay.NewTimeoutPacketRelay(p.baseRelay, p.writeIdleTimeout)
if err != nil {
return nil, err
}
p.relay = timeoutRelay

return p, nil
}

// WithPacketListenerWriteIdleTimeout sets the write idle timeout of the [PacketListenerProxy].
// This means that if there are no WriteTo operations on the UDP session created by NewSession for the specified amount
// of time, the proxy will end this session.
//
// This should be used together with the [NewPacketProxyFromPacketListenerWithOptions] function.
// Deprecated: Use [packetrelay.NewTimeoutPacketRelay] to decorate the underlying [packetrelay.PacketRelay] instead.
func WithPacketListenerWriteIdleTimeout(timeout time.Duration) func(*PacketListenerProxy) error {
return func(p *PacketListenerProxy) error {
if timeout <= 0 {
Expand All @@ -86,85 +83,8 @@ func WithPacketListenerWriteIdleTimeout(timeout time.Duration) func(*PacketListe
}
}

// NewSession implements [PacketProxy].NewSession function. It uses [transport.PacketListener].ListenPacket to create
// a [net.PacketConn], and constructs a new [PacketRequestSender] that is based on this [net.PacketConn].
func (proxy *PacketListenerProxy) NewSession(respWriter PacketResponseReceiver) (PacketRequestSender, error) {
if respWriter == nil {
return nil, errors.New("respWriter must not be nil")
}
proxyConn, err := proxy.listener.ListenPacket(context.Background())
if err != nil {
return nil, err
}
reqSender := &packetListenerRequestSender{
proxyConn: proxyConn,
writeIdleTimeout: proxy.writeIdleTimeout,
}

// Terminate the session after timeout with no outgoing writes (deadline is refreshed by WriteTo)
reqSender.writeIdleTimer = time.AfterFunc(reqSender.writeIdleTimeout, func() {
reqSender.Close()
})

// Relay incoming UDP responses from the proxy asynchronously until EOF, session expiration or error
go func() {
defer respWriter.Close()

// Allocate buffer from slicepool, because `go build -gcflags="-m"` shows a local array will escape to heap
slice := packetBufferPool.LazySlice()
buf := slice.Acquire()
defer slice.Release()

for {
n, srcAddr, err := proxyConn.ReadFrom(buf)
if err != nil {
// Ignore some specific recoverable errors
if errors.Is(err, io.ErrShortBuffer) {
continue
}
return
}
if _, err := respWriter.WriteFrom(buf[:n], srcAddr); err != nil {
return
}
}
}()

return reqSender, nil
}

// WriteTo implements [PacketRequestSender].WriteTo function. It simply forwards the packet to the underlying
// [net.PacketConn].WriteTo function.
func (s *packetListenerRequestSender) WriteTo(p []byte, destination netip.AddrPort) (int, error) {
if err := s.resetWriteIdleTimer(); err != nil {
return 0, err
}
return s.proxyConn.WriteTo(p, net.UDPAddrFromAddrPort(destination))
}

// Close implements [PacketRequestSender].Close function. It closes the underlying [net.PacketConn]. This will also
// terminate the goroutine created in NewSession because s.conn.ReadFrom will return [io.EOF].
func (s *packetListenerRequestSender) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return ErrClosed
}
s.closed = true
s.writeIdleTimer.Stop()
return s.proxyConn.Close()
}

// resetWriteIdleTimer extends the writeIdleTimer's timeout to now() + writeIdleTimeout. If `s` is closed, it will
// return ErrClosed.
func (s *packetListenerRequestSender) resetWriteIdleTimer() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return ErrClosed
}
s.writeIdleTimer.Reset(s.writeIdleTimeout)
return nil
// NewSession implements [PacketProxy].NewSession. It uses the adapter pattern to wrap the underlying relay.
func (p *PacketListenerProxy) NewSession(respReceiver PacketResponseReceiver) (PacketRequestSender, error) {
adapter := NewPacketProxyFromPacketRelay(p.relay)
return adapter.NewSession(respReceiver)
}