Skip to content

Commit edf4d8f

Browse files
committed
Add a flag to "hide" the channel full check race condition.
The check now has to be explicitly turned on. Fixed tests to use warnOnChannelLimit false so they don't trigger the race violation.
1 parent b5e4ac1 commit edf4d8f

File tree

5 files changed

+32
-14
lines changed

5 files changed

+32
-14
lines changed

Diff for: cmd/server/app/options/options.go

+10
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ type ProxyRunOptions struct {
7070
// backend within the destCIDR. if it still can't find any backend,
7171
// it will use the default backend manager to choose a random backend.
7272
ProxyStrategies string
73+
74+
// This controls if we attempt to push onto a "full" transfer channel.
75+
// However checking that the transfer channel is full is not safe.
76+
// It violates our race condition checking. Adding locks around a potentially
77+
// blocking call has its own problems, so it cannot easily be made race condition safe.
78+
// The check is an "unlocked" read but is still use at your own peril.
79+
WarnOnChannelLimit bool
7380
}
7481

7582
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -100,6 +107,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
100107
flags.IntVar(&o.KubeconfigBurst, "kubeconfig-burst", o.KubeconfigBurst, "Maximum client burst (proxy server uses this client to authenticate agent tokens).")
101108
flags.StringVar(&o.AuthenticationAudience, "authentication-audience", o.AuthenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
102109
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick a backend/tunnel, available strategies are: default, destHost.")
110+
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
103111
return flags
104112
}
105113

@@ -130,6 +138,7 @@ func (o *ProxyRunOptions) Print() {
130138
klog.V(1).Infof("KubeconfigQPS set to %f.\n", o.KubeconfigQPS)
131139
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
132140
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
141+
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
133142
}
134143

135144
func (o *ProxyRunOptions) Validate() error {
@@ -290,6 +299,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
290299
KubeconfigBurst: 0,
291300
AuthenticationAudience: "",
292301
ProxyStrategies: "default",
302+
WarnOnChannelLimit: false,
293303
}
294304
return &o
295305
}

Diff for: cmd/server/app/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error {
9292
if err != nil {
9393
return err
9494
}
95-
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
95+
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.WarnOnChannelLimit)
9696

9797
frontendStop, err := p.runFrontendServer(ctx, o, server)
9898
if err != nil {

Diff for: pkg/server/server.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import (
4040
"sigs.k8s.io/apiserver-network-proxy/proto/header"
4141
)
4242

43+
const xfrChannelSize = 10
44+
4345
type key int
4446

4547
type ProxyClientConnection struct {
@@ -136,6 +138,9 @@ type ProxyServer struct {
136138
serverID string // unique ID of this server
137139
serverCount int // Number of proxy server instances, should be 1 unless it is a HA server.
138140

141+
// Allows a special debug flag which warns if we write to a full transfer channel
142+
warnOnChannelLimit bool
143+
139144
// agent authentication
140145
AgentAuthenticationOptions *AgentTokenAuthenticationOptions
141146

@@ -321,7 +326,7 @@ func (s *ProxyServer) getFrontendsForBackendConn(agentID string, backend Backend
321326
}
322327

323328
// NewProxyServer creates a new ProxyServer instance
324-
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
329+
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, warnOnChannelLimit bool) *ProxyServer {
325330
var bms []BackendManager
326331
for _, ps := range proxyStrategies {
327332
switch ps {
@@ -343,9 +348,10 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun
343348
serverCount: serverCount,
344349
BackendManagers: bms,
345350
AgentAuthenticationOptions: agentAuthenticationOptions,
346-
// use the first backendmanager as the Readiness Manager
347-
Readiness: bms[0],
348-
proxyStrategies: proxyStrategies,
351+
// use the first backend-manager as the Readiness Manager
352+
Readiness: bms[0],
353+
proxyStrategies: proxyStrategies,
354+
warnOnChannelLimit: warnOnChannelLimit,
349355
}
350356
}
351357

@@ -361,7 +367,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
361367
userAgent := md.Get(header.UserAgent)
362368
klog.V(2).InfoS("proxy request from client", "userAgent", userAgent)
363369

364-
recvCh := make(chan *client.Packet, 10)
370+
recvCh := make(chan *client.Packet, xfrChannelSize)
365371
stopCh := make(chan error)
366372

367373
go s.serveRecvFrontend(stream, recvCh)
@@ -386,7 +392,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
386392
return
387393
}
388394

389-
if len(recvCh) > 9 {
395+
if s.warnOnChannelLimit && len(recvCh) >= xfrChannelSize {
390396
klog.V(2).InfoS("Receive channel on Proxy is full", "userAgent", userAgent, "serverID", s.serverID)
391397
}
392398
recvCh <- in
@@ -640,7 +646,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
640646
backend := s.addBackend(agentID, stream)
641647
defer s.removeBackend(agentID, stream)
642648

643-
recvCh := make(chan *client.Packet, 10)
649+
recvCh := make(chan *client.Packet, xfrChannelSize)
644650

645651
go s.serveRecvBackend(backend, stream, agentID, recvCh)
646652

@@ -664,7 +670,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
664670
return
665671
}
666672

667-
if len(recvCh) > 9 {
673+
if s.warnOnChannelLimit && len(recvCh) >= xfrChannelSize {
668674
klog.V(2).InfoS("Receive channel on Connect is full", "agentID", agentID, "serverID", s.serverID)
669675
}
670676
recvCh <- in

Diff for: pkg/server/server_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ func TestAgentTokenAuthenticationErrorsToken(t *testing.T) {
164164
KubernetesClient: kcs,
165165
AgentNamespace: tc.wantNamespace,
166166
AgentServiceAccount: tc.wantServiceAccount,
167-
})
167+
},
168+
false,
169+
)
168170

169171
err := p.Connect(conn)
170172
if tc.wantError {
@@ -187,15 +189,15 @@ func TestAddRemoveFrontends(t *testing.T) {
187189
agent2ConnID2 := new(ProxyClientConnection)
188190
agent3ConnID1 := new(ProxyClientConnection)
189191

190-
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
192+
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, false)
191193
p.addFrontend("agent1", int64(1), agent1ConnID1)
192194
p.removeFrontend("agent1", int64(1))
193195
expectedFrontends := make(map[string]map[int64]*ProxyClientConnection)
194196
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
195197
t.Errorf("expected %v, got %v", e, a)
196198
}
197199

198-
p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
200+
p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, false)
199201
p.addFrontend("agent1", int64(1), agent1ConnID1)
200202
p.addFrontend("agent1", int64(2), agent1ConnID2)
201203
p.addFrontend("agent2", int64(1), agent2ConnID1)

Diff for: tests/proxy_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer
356356
var err error
357357
var lis, lis2 net.Listener
358358

359-
server := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, serverCount, &server.AgentTokenAuthenticationOptions{})
359+
server := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, serverCount, &server.AgentTokenAuthenticationOptions{}, false)
360360
grpcServer := grpc.NewServer()
361361
agentServer := grpc.NewServer()
362362
cleanup := func() {
@@ -395,7 +395,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer
395395
func runHTTPConnProxyServer() (proxy, func(), error) {
396396
ctx := context.Background()
397397
var proxy proxy
398-
s := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, 0, &server.AgentTokenAuthenticationOptions{})
398+
s := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, 0, &server.AgentTokenAuthenticationOptions{}, false)
399399
agentServer := grpc.NewServer()
400400

401401
agentproto.RegisterAgentServiceServer(agentServer, s)

0 commit comments

Comments
 (0)