Skip to content

Commit fde6534

Browse files
authored
Merge pull request #329 from txn2/fix/pod-reconciliation
Pod Reconciliation Fixes & Production Resilience
2 parents e8718b0 + 63aa169 commit fde6534

7 files changed

Lines changed: 397 additions & 54 deletions

File tree

pkg/fwdmetrics/stream.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package fwdmetrics
22

33
import (
4+
"fmt"
45
"net/http"
56

7+
log "github.com/sirupsen/logrus"
68
"k8s.io/apimachinery/pkg/util/httpstream"
79
)
810

@@ -30,6 +32,17 @@ func NewMetricsStream(stream httpstream.Stream, metrics *PortForwardMetrics) *Me
3032

3133
// Read reads data from the stream and tracks bytes received
3234
func (ms *MetricsStream) Read(p []byte) (n int, err error) {
35+
// Panic recovery to prevent metrics/sniffing issues from crashing the stream
36+
defer func() {
37+
if r := recover(); r != nil {
38+
log.Errorf("MetricsStream Read panic recovered: %v", r)
39+
// Only synthesize an error when nothing was read; if bytes were already read, return them with whatever error Read reported
40+
if n == 0 {
41+
err = fmt.Errorf("metrics stream panic: %v", r)
42+
}
43+
}
44+
}()
45+
3346
n, err = ms.stream.Read(p)
3447
if n > 0 && ms.metrics != nil {
3548
ms.metrics.AddBytesIn(uint64(n))
@@ -43,6 +56,17 @@ func (ms *MetricsStream) Read(p []byte) (n int, err error) {
4356

4457
// Write writes data to the stream and tracks bytes sent
4558
func (ms *MetricsStream) Write(p []byte) (n int, err error) {
59+
// Panic recovery to prevent metrics/sniffing issues from crashing the stream
60+
defer func() {
61+
if r := recover(); r != nil {
62+
log.Errorf("MetricsStream Write panic recovered: %v", r)
63+
// Only synthesize an error when nothing was written; if bytes were already written, return the count with whatever error Write reported
64+
if n == 0 {
65+
err = fmt.Errorf("metrics stream panic: %v", r)
66+
}
67+
}
68+
}()
69+
4670
n, err = ms.stream.Write(p)
4771
if n > 0 && ms.metrics != nil {
4872
ms.metrics.AddBytesOut(uint64(n))

pkg/fwdport/fwdport.go

Lines changed: 79 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
// GlobalPodInformer manages a single informer for all pods across all services
3737
type GlobalPodInformer struct {
3838
mu sync.RWMutex
39-
activePods map[types.UID]*PortForwardOpts
39+
activePods map[types.UID][]*PortForwardOpts // Multiple pfos per pod UID (same pod can be forwarded by multiple services)
4040
clientSet kubernetes.Interface
4141
informers map[string]informers.SharedInformerFactory
4242
stopChannel chan struct{}
@@ -49,7 +49,7 @@ var globalPodInformerOnce sync.Once
4949
func GetGlobalPodInformer(clientSet kubernetes.Interface, namespace string) *GlobalPodInformer {
5050
globalPodInformerOnce.Do(func() {
5151
globalPodInformer = &GlobalPodInformer{
52-
activePods: make(map[types.UID]*PortForwardOpts),
52+
activePods: make(map[types.UID][]*PortForwardOpts),
5353
clientSet: clientSet,
5454
informers: make(map[string]informers.SharedInformerFactory),
5555
stopChannel: make(chan struct{}),
@@ -93,15 +93,26 @@ func (gpi *GlobalPodInformer) startInformer(namespace string) cache.InformerSync
9393
return
9494
}
9595

96-
pfo, exists := gpi.getPod(oldPod.UID)
96+
pfos, exists := gpi.getPods(oldPod.UID)
9797
if !exists {
9898
return
9999
}
100100

101101
if newPod.DeletionTimestamp != nil {
102-
log.Warnf("Pod %s marked for deletion, resyncing the %s service pods.", oldPod.Name, pfo.ServiceFwd)
103-
pfo.Stop()
104-
pfo.ServiceFwd.SyncPodForwards(false)
102+
// Track which services we've already synced to avoid duplicate syncs
103+
syncedServices := make(map[string]bool)
104+
for _, pfo := range pfos {
105+
svcKey := pfo.ServiceFwd.String()
106+
if !syncedServices[svcKey] {
107+
log.Warnf("Pod %s marked for deletion, resyncing the %s service pods.", oldPod.Name, pfo.ServiceFwd)
108+
pfo.Stop()
109+
pfo.ServiceFwd.SyncPodForwards(false)
110+
syncedServices[svcKey] = true
111+
} else {
112+
// Still need to stop this pfo even if service already synced
113+
pfo.Stop()
114+
}
115+
}
105116
gpi.removePod(oldPod.UID)
106117
}
107118
},
@@ -113,16 +124,27 @@ func (gpi *GlobalPodInformer) startInformer(namespace string) cache.InformerSync
113124
return
114125
}
115126

116-
pfo, exists := gpi.getPod(deletedPod.UID)
127+
pfos, exists := gpi.getPods(deletedPod.UID)
117128
if !exists {
118129
return
119130
}
120131

121-
log.Warnf("Pod %s deleted, resyncing the %s service pods.", deletedPod.Name, pfo.ServiceFwd)
122-
pfo.Stop()
123-
pfo.ServiceFwd.SyncPodForwards(false)
132+
// Track which services we've already synced to avoid duplicate syncs
133+
syncedServices := make(map[string]bool)
134+
for _, pfo := range pfos {
135+
svcKey := pfo.ServiceFwd.String()
136+
if !syncedServices[svcKey] {
137+
log.Warnf("Pod %s deleted, resyncing the %s service pods.", deletedPod.Name, pfo.ServiceFwd)
138+
pfo.Stop()
139+
pfo.ServiceFwd.SyncPodForwards(false)
140+
syncedServices[svcKey] = true
141+
log.Debugf("After pod %s was deleted, the %s service pods have been resynced.", deletedPod.Name, pfo.ServiceFwd)
142+
} else {
143+
// Still need to stop this pfo even if service already synced
144+
pfo.Stop()
145+
}
146+
}
124147
gpi.removePod(deletedPod.UID)
125-
log.Debugf("After pod %s was deleted, the %s service pods have been resynced.", deletedPod.Name, pfo.ServiceFwd)
126148
},
127149
},
128150
)
@@ -135,35 +157,64 @@ func (gpi *GlobalPodInformer) startInformer(namespace string) cache.InformerSync
135157
return podInformer.Informer().HasSynced
136158
}
137159

138-
// addPod adds a pod to the active pods map
160+
// addPod adds a pod forward to the active pods map
161+
// Multiple port forwards can exist for the same pod UID (e.g., headless + non-headless services)
139162
func (gpi *GlobalPodInformer) addPod(pod *v1.Pod, pfo *PortForwardOpts) {
140163
gpi.mu.Lock()
141164
defer gpi.mu.Unlock()
142-
gpi.activePods[pod.UID] = pfo
143-
log.Debugf("Added pod %s (UID: %s) to global informer", pod.Name, pod.UID)
165+
gpi.activePods[pod.UID] = append(gpi.activePods[pod.UID], pfo)
166+
log.Debugf("Added pod %s (UID: %s) to global informer (total forwards for this pod: %d)", pod.Name, pod.UID, len(gpi.activePods[pod.UID]))
144167
}
145168

146-
// getPod retrieves a pod from the active pods map
147-
func (gpi *GlobalPodInformer) getPod(podUID types.UID) (*PortForwardOpts, bool) {
169+
// getPods retrieves all port forwards for a pod UID
170+
func (gpi *GlobalPodInformer) getPods(podUID types.UID) ([]*PortForwardOpts, bool) {
148171
gpi.mu.RLock()
149172
defer gpi.mu.RUnlock()
150-
pfo, exists := gpi.activePods[podUID]
151-
return pfo, exists
173+
pfos, exists := gpi.activePods[podUID]
174+
return pfos, exists && len(pfos) > 0
152175
}
153176

154-
// removePod removes a pod from the active pods map
177+
// removePod removes all port forwards for a pod UID from the active pods map
155178
func (gpi *GlobalPodInformer) removePod(podUID types.UID) {
156179
gpi.mu.Lock()
157180
defer gpi.mu.Unlock()
181+
count := len(gpi.activePods[podUID])
158182
delete(gpi.activePods, podUID)
159-
log.Debugf("Removed pod (UID: %s) from global informer", podUID)
183+
log.Debugf("Removed all %d forwards for pod (UID: %s) from global informer", count, podUID)
160184
}
161185

162-
// RemovePodByUID removes a pod from the global informer by UID
186+
// removePfo removes a specific port forward from the active pods map
187+
func (gpi *GlobalPodInformer) removePfo(pfo *PortForwardOpts) {
188+
if pfo.PodUID == "" {
189+
return
190+
}
191+
gpi.mu.Lock()
192+
defer gpi.mu.Unlock()
193+
pfos := gpi.activePods[pfo.PodUID]
194+
for i, p := range pfos {
195+
if p == pfo {
196+
// Remove this pfo from the slice
197+
gpi.activePods[pfo.PodUID] = append(pfos[:i], pfos[i+1:]...)
198+
break
199+
}
200+
}
201+
// If no more forwards for this pod, remove the entry entirely
202+
if len(gpi.activePods[pfo.PodUID]) == 0 {
203+
delete(gpi.activePods, pfo.PodUID)
204+
}
205+
log.Debugf("Removed specific forward for pod (UID: %s) from global informer", pfo.PodUID)
206+
}
207+
208+
// RemovePodByUID removes a pod from the global informer by UID (removes all forwards)
// It is the exported wrapper around removePod, which deletes the whole
// activePods entry for podUID while holding the write lock.
163209
func (gpi *GlobalPodInformer) RemovePodByUID(podUID types.UID) {
164210
gpi.removePod(podUID)
165211
}
166212

213+
// RemovePfo removes a specific port forward from the global informer
// It is the exported wrapper around removePfo: only the given forward is
// removed, and other forwards registered under the same pod UID stay in place.
214+
func (gpi *GlobalPodInformer) RemovePfo(pfo *PortForwardOpts) {
215+
gpi.removePfo(pfo)
216+
}
217+
167218
// Stop stops the global pod informer
168219
func (gpi *GlobalPodInformer) Stop() {
169220
select {
@@ -263,7 +314,13 @@ type pingingDialer struct {
263314
}
264315

265316
func (p pingingDialer) stopPing() {
266-
p.pingStopChan <- struct{}{}
317+
select {
318+
case p.pingStopChan <- struct{}{}:
319+
// Signal sent successfully
320+
case <-time.After(100 * time.Millisecond):
321+
// Timeout - ping goroutine is blocked or dead, continue cleanup anyway
322+
log.Debugf("Ping stop signal timed out for %s, continuing cleanup", p.pingTargetPodName)
323+
}
267324
}
268325

269326
func (p pingingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {

0 commit comments

Comments
 (0)