Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client] WGwatcher error handling #3254

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 15 additions & 22 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,11 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
semaphore: semaphore,
}

rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnDisconnected: conn.onWorkerRelayStateDisconnected,
}

wFns := WorkerICECallbacks{
OnConnReady: conn.iCEConnectionIsReady,
OnStatusChanged: conn.onWorkerICEStateDisconnected,
}

ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)

relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,7 +294,7 @@ func (conn *Conn) GetKey() string {
}

// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand Down Expand Up @@ -376,15 +366,15 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
}

// todo: review whether it makes sense to also handle the connecting and disconnected statuses
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
func (conn *Conn) onICEStateDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()

if conn.ctx.Err() != nil {
return
}

conn.log.Tracef("ICE connection state changed to %s", newState)
conn.log.Tracef("ICE connection state changed to disconnected")

if conn.wgProxyICE != nil {
if err := conn.wgProxyICE.CloseConn(); err != nil {
Expand All @@ -404,10 +394,11 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
conn.currentConnPriority = connPriorityRelay
}

changed := conn.statusICE.Get() != newState && newState != StatusConnecting
conn.statusICE.Set(newState)

conn.guard.SetICEConnDisconnected(changed)
changed := conn.statusICE.Get() != StatusDisconnected
if changed {
conn.guard.SetICEConnDisconnected()
}
conn.statusICE.Set(StatusDisconnected)

peerState := State{
PubKey: conn.config.Key,
Expand All @@ -422,7 +413,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
}
}

func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand Down Expand Up @@ -474,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
}

func (conn *Conn) onWorkerRelayStateDisconnected() {
func (conn *Conn) onRelayDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand All @@ -497,8 +488,10 @@ func (conn *Conn) onWorkerRelayStateDisconnected() {
}

changed := conn.statusRelay.Get() != StatusDisconnected
if changed {
conn.guard.SetRelayedConnDisconnected()
}
conn.statusRelay.Set(StatusDisconnected)
conn.guard.SetRelayedConnDisconnected(changed)

peerState := State{
PubKey: conn.config.Key,
Expand Down
36 changes: 12 additions & 24 deletions client/internal/peer/guard/guard.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Guard struct {
isConnectedOnAllWay isConnectedFunc
timeout time.Duration
srWatcher *SRWatcher
relayedConnDisconnected chan bool
iCEConnDisconnected chan bool
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
}

func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
Expand All @@ -41,8 +41,8 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
isConnectedOnAllWay: isConnectedFn,
timeout: timeout,
srWatcher: srWatcher,
relayedConnDisconnected: make(chan bool, 1),
iCEConnDisconnected: make(chan bool, 1),
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
}
}

Expand All @@ -54,16 +54,16 @@ func (g *Guard) Start(ctx context.Context) {
}
}

func (g *Guard) SetRelayedConnDisconnected(changed bool) {
func (g *Guard) SetRelayedConnDisconnected() {
select {
case g.relayedConnDisconnected <- changed:
case g.relayedConnDisconnected <- struct{}{}:
default:
}
}

func (g *Guard) SetICEConnDisconnected(changed bool) {
func (g *Guard) SetICEConnDisconnected() {
select {
case g.iCEConnDisconnected <- changed:
case g.iCEConnDisconnected <- struct{}{}:
default:
}
}
Expand Down Expand Up @@ -96,19 +96,13 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
g.triggerOfferSending()
}

case changed := <-g.relayedConnDisconnected:
if !changed {
continue
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C

case changed := <-g.iCEConnDisconnected:
if !changed {
continue
}
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
Expand Down Expand Up @@ -138,16 +132,10 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
g.log.Infof("start listen for reconnect events...")
for {
select {
case changed := <-g.relayedConnDisconnected:
if !changed {
continue
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, triggering reconnect")
g.triggerOfferSending()
case changed := <-g.iCEConnDisconnected:
if !changed {
continue
}
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE state changed, try to send new offer")
g.triggerOfferSending()
case <-srReconnectedChan:
Expand Down
134 changes: 134 additions & 0 deletions client/internal/peer/wg_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package peer

import (
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/client/iface/configurer"
)

const (
// wgHandshakePeriod is the base interval, counted from the last observed
// handshake, that is combined with wgHandshakeOvertime to form checkPeriod.
wgHandshakePeriod = 3 * time.Minute
)

var (
// wgHandshakeOvertime is the extra time added on top of wgHandshakePeriod.
// It is also the delay of the very first check after the watcher starts.
wgHandshakeOvertime = 30 * time.Second
// checkPeriod is the offset added to the last handshake time to schedule
// the next handshake check.
checkPeriod = wgHandshakePeriod + wgHandshakeOvertime
)

// WGInterfaceStater is the source of WireGuard statistics consumed by
// WGWatcher, which reads the LastHandshake value from the returned stats.
type WGInterfaceStater interface {
// GetStats returns the WireGuard statistics for the given key. WGWatcher
// calls it with the peer key it was constructed with.
GetStats(key string) (configurer.WGStats, error)
}

// WGWatcher periodically checks the WireGuard handshake time of a single peer
// and invokes a callback when the handshake stops being refreshed.
type WGWatcher struct {
	log           *log.Entry
	wgIfaceStater WGInterfaceStater
	peerKey       string

	// ctx is the context of the most recently started watcher goroutine. The
	// goroutine cancels it when it exits, so a nil ctx or a non-nil ctx.Err()
	// means that no watcher is currently running.
	ctx context.Context
	// ctxCancel stops the running watcher goroutine; it is nil after
	// DisableWgWatcher has been called.
	ctxCancel context.CancelFunc
	// ctxLock guards ctx and ctxCancel.
	ctxLock sync.Mutex
	// waitGroup tracks the watcher goroutine so that DisableWgWatcher can wait
	// for it to exit.
	waitGroup sync.WaitGroup
}

// NewWGWatcher creates a watcher for the peer identified by peerKey. The
// watcher is idle until EnableWgWatcher is called.
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher {
	return &WGWatcher{
		log:           log,
		wgIfaceStater: wgIfaceStater,
		peerKey:       peerKey,
	}
}

// EnableWgWatcher starts the WireGuard watcher. If a watcher is still running,
// it logs an error and returns without doing anything.
//
// A watcher that has stopped on its own (handshake timeout or cancellation of
// its parent context) is not considered running, so it can be enabled again
// without an intermediate DisableWgWatcher call.
//
// onDisconnectedFn is called from the watcher goroutine when the handshake
// check fails.
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
	w.log.Debugf("enable WireGuard watcher")
	w.ctxLock.Lock()
	defer w.ctxLock.Unlock()

	// The goroutine cancels its own context on exit, therefore a live context
	// is the only reliable sign that the watcher is still running. Checking
	// ctxCancel alone would keep reporting "already enabled" after the watcher
	// terminated by itself.
	if w.ctx != nil && w.ctx.Err() == nil {
		w.log.Errorf("WireGuard watcher already enabled")
		return
	}

	ctx, ctxCancel := context.WithCancel(parentCtx)
	w.ctx = ctx
	w.ctxCancel = ctxCancel

	// A failed read is not fatal: the watcher starts with the zero time as the
	// reference handshake.
	initialHandshake, err := w.wgState()
	if err != nil {
		w.log.Warnf("failed to read wg stats: %v", err)
	}

	w.waitGroup.Add(1)
	go w.periodicHandshakeCheck(ctx, ctxCancel, onDisconnectedFn, initialHandshake)
}

// DisableWgWatcher stops the WireGuard watcher and waits for the watcher
// goroutine to exit. It is a no-op when no cancel function is registered.
func (w *WGWatcher) DisableWgWatcher() {
	w.ctxLock.Lock()
	defer w.ctxLock.Unlock()

	cancel := w.ctxCancel
	if cancel == nil {
		return
	}

	w.log.Debugf("disable WireGuard watcher")

	// Forget the cancel function first so the watcher is marked as disabled,
	// then stop the goroutine and block until it has finished.
	w.ctxCancel = nil
	cancel()
	w.waitGroup.Wait()
}

// periodicHandshakeCheck is the body of the watcher goroutine. It waits
// wgHandshakeOvertime before the first check and afterwards schedules each check
// checkPeriod after the most recently observed handshake.
//
// When a check fails, onDisconnectedFn is invoked and the goroutine exits. The
// goroutine also exits when ctx is cancelled. On every exit path ctxCancel is
// called, the timer is stopped and the wait group is released.
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
	w.log.Debugf("WireGuard watcher started")
	defer w.waitGroup.Done()

	checkTimer := time.NewTimer(wgHandshakeOvertime)
	defer checkTimer.Stop()
	defer ctxCancel()

	previous := initialHandshake

	for {
		select {
		case <-ctx.Done():
			w.log.Debugf("WireGuard watcher stopped")
			return
		case <-checkTimer.C:
		}

		// The timer fired: compare the current handshake with the previous one.
		current, alive := w.handshakeCheck(previous)
		if !alive {
			onDisconnectedFn()
			return
		}

		previous = *current
		checkTimer.Reset(time.Until(previous.Add(checkPeriod)))
	}
}

// wgState reads the statistics of the watched peer and returns the time of its
// last handshake. On error the zero time is returned together with the error.
func (w *WGWatcher) wgState() (time.Time, error) {
	var zero time.Time

	stats, err := w.wgIfaceStater.GetStats(w.peerKey)
	if err != nil {
		return zero, err
	}

	return stats.LastHandshake, nil
}

// handshakeCheck reads the current handshake time and compares it with
// lastHandshake. It returns the new handshake time and true when the handshake
// has changed. It returns nil and false when the statistics cannot be read or
// when the handshake time is unchanged.
func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
	current, readErr := w.wgState()
	if readErr != nil {
		w.log.Errorf("failed to read wg stats: %v", readErr)
		return nil, false
	}

	w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, current)

	// A different timestamp means a new handshake happened since the last check.
	if !current.Equal(lastHandshake) {
		return &current, true
	}

	w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", current)
	return nil, false
}
Loading
Loading