Skip to content

Commit 57df148

Browse files
authored
Merge pull request kubernetes-sigs#269 from cheftako/agent-log
Instrumenting connection issues on Proxy Server
2 parents daed7d0 + edf4d8f commit 57df148

File tree

5 files changed

+72
-37
lines changed

5 files changed

+72
-37
lines changed

cmd/server/app/options/options.go

+10
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ type ProxyRunOptions struct {
7070
// backend within the destCIDR. if it still can't find any backend,
7171
// it will use the default backend manager to choose a random backend.
7272
ProxyStrategies string
73+
74+
// This controls if we attempt to push onto a "full" transfer channel.
75+
// However checking that the transfer channel is full is not safe.
76+
// It violates our race condition checking. Adding locks around a potentially
77+
// blocking call has its own problems, so it cannot easily be made race condition safe.
78+
// The check is an "unlocked" read but is still use at your own peril.
79+
WarnOnChannelLimit bool
7380
}
7481

7582
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -100,6 +107,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
100107
flags.IntVar(&o.KubeconfigBurst, "kubeconfig-burst", o.KubeconfigBurst, "Maximum client burst (proxy server uses this client to authenticate agent tokens).")
101108
flags.StringVar(&o.AuthenticationAudience, "authentication-audience", o.AuthenticationAudience, "Expected agent's token authentication audience (used with agent-namespace, agent-service-account, kubeconfig).")
102109
flags.StringVar(&o.ProxyStrategies, "proxy-strategies", o.ProxyStrategies, "The list of proxy strategies used by the server to pick a backend/tunnel, available strategies are: default, destHost.")
110+
flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.")
103111
return flags
104112
}
105113

@@ -130,6 +138,7 @@ func (o *ProxyRunOptions) Print() {
130138
klog.V(1).Infof("KubeconfigQPS set to %f.\n", o.KubeconfigQPS)
131139
klog.V(1).Infof("KubeconfigBurst set to %d.\n", o.KubeconfigBurst)
132140
klog.V(1).Infof("ProxyStrategies set to %q.\n", o.ProxyStrategies)
141+
klog.V(1).Infof("WarnOnChannelLimit set to %t.\n", o.WarnOnChannelLimit)
133142
}
134143

135144
func (o *ProxyRunOptions) Validate() error {
@@ -290,6 +299,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
290299
KubeconfigBurst: 0,
291300
AuthenticationAudience: "",
292301
ProxyStrategies: "default",
302+
WarnOnChannelLimit: false,
293303
}
294304
return &o
295305
}

cmd/server/app/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (p *Proxy) run(o *options.ProxyRunOptions) error {
9292
if err != nil {
9393
return err
9494
}
95-
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt)
95+
server := server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.WarnOnChannelLimit)
9696

9797
frontendStop, err := p.runFrontendServer(ctx, o, server)
9898
if err != nil {

pkg/server/server.go

+54-31
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import (
4040
"sigs.k8s.io/apiserver-network-proxy/proto/header"
4141
)
4242

43+
const xfrChannelSize = 10
44+
4345
type key int
4446

4547
type ProxyClientConnection struct {
@@ -136,6 +138,9 @@ type ProxyServer struct {
136138
serverID string // unique ID of this server
137139
serverCount int // Number of proxy server instances, should be 1 unless it is a HA server.
138140

141+
// Allows a special debug flag which warns if we write to a full transfer channel
142+
warnOnChannelLimit bool
143+
139144
// agent authentication
140145
AgentAuthenticationOptions *AgentTokenAuthenticationOptions
141146

@@ -321,7 +326,7 @@ func (s *ProxyServer) getFrontendsForBackendConn(agentID string, backend Backend
321326
}
322327

323328
// NewProxyServer creates a new ProxyServer instance
324-
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer {
329+
func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, warnOnChannelLimit bool) *ProxyServer {
325330
var bms []BackendManager
326331
for _, ps := range proxyStrategies {
327332
switch ps {
@@ -343,9 +348,10 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun
343348
serverCount: serverCount,
344349
BackendManagers: bms,
345350
AgentAuthenticationOptions: agentAuthenticationOptions,
346-
// use the first backendmanager as the Readiness Manager
347-
Readiness: bms[0],
348-
proxyStrategies: proxyStrategies,
351+
// use the first backend-manager as the Readiness Manager
352+
Readiness: bms[0],
353+
proxyStrategies: proxyStrategies,
354+
warnOnChannelLimit: warnOnChannelLimit,
349355
}
350356
}
351357

@@ -361,12 +367,13 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
361367
userAgent := md.Get(header.UserAgent)
362368
klog.V(2).InfoS("proxy request from client", "userAgent", userAgent)
363369

364-
recvCh := make(chan *client.Packet, 10)
370+
recvCh := make(chan *client.Packet, xfrChannelSize)
365371
stopCh := make(chan error)
366372

367373
go s.serveRecvFrontend(stream, recvCh)
368374

369375
defer func() {
376+
klog.V(2).InfoS("Receive channel on Proxy is stopping", "userAgent", userAgent, "serverID", s.serverID)
370377
close(recvCh)
371378
}()
372379

@@ -375,6 +382,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
375382
for {
376383
in, err := stream.Recv()
377384
if err == io.EOF {
385+
klog.V(2).InfoS("Stream closed on Proxy", "userAgent", userAgent, "serverID", s.serverID)
378386
close(stopCh)
379387
return
380388
}
@@ -384,6 +392,9 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
384392
return
385393
}
386394

395+
if s.warnOnChannelLimit && len(recvCh) >= xfrChannelSize {
396+
klog.V(2).InfoS("Receive channel on Proxy is full", "userAgent", userAgent, "serverID", s.serverID)
397+
}
387398
recvCh <- in
388399
}
389400
}()
@@ -410,13 +421,15 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
410421
// a new connection to the address.
411422
backend, err = s.getBackend(pkt.GetDialRequest().Address)
412423
if err != nil {
413-
klog.ErrorS(err, "Failed to get a backend")
424+
klog.ErrorS(err, "Failed to get a backend", "serverID", s.serverID)
414425

415426
resp := &client.Packet{
416427
Type: client.PacketType_DIAL_RSP,
417428
Payload: &client.Packet_DialResponse{DialResponse: &client.DialResponse{Error: err.Error()}},
418429
}
419-
stream.Send(resp)
430+
if err := stream.Send(resp); err != nil {
431+
klog.V(5).Infoln("Failed to send DIAL_RSP for no backend", "error", err, "serverID", s.serverID)
432+
}
420433
// The Dial is failing; no reason to keep this goroutine.
421434
return
422435
}
@@ -430,29 +443,30 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
430443
backend: backend,
431444
})
432445
if err := backend.Send(pkt); err != nil {
433-
klog.ErrorS(err, "DIAL_REQ to Backend failed")
446+
klog.ErrorS(err, "DIAL_REQ to Backend failed", "serverID", s.serverID)
434447
}
435448
klog.V(5).Infoln("DIAL_REQ sent to backend") // got this. but backend didn't receive anything.
436449

437450
case client.PacketType_CLOSE_REQ:
438451
connID := pkt.GetCloseRequest().ConnectID
439452
klog.V(5).InfoS("Received CLOSE_REQ", "connectionID", connID)
440453
if backend == nil {
441-
klog.V(2).InfoS("Backend has not been initialized for requested connection. Client should send a Dial Request first", "connectionID", connID)
454+
klog.V(2).InfoS("Backend has not been initialized for requested connection. Client should send a Dial Request first",
455+
"serverID", s.serverID, "connectionID", connID)
442456
continue
443457
}
444458
if err := backend.Send(pkt); err != nil {
445459
// TODO: retry with other backends connecting to this agent.
446-
klog.ErrorS(err, "CLOSE_REQ to Backend failed")
460+
klog.ErrorS(err, "CLOSE_REQ to Backend failed", "serverID", s.serverID, "connectionID", connID)
447461
}
448-
klog.V(5).Infoln("CLOSE_REQ sent to backend")
462+
klog.V(5).Infoln("CLOSE_REQ sent to backend", "serverID", s.serverID, "connectionID", connID)
449463

450464
case client.PacketType_DIAL_CLS:
451465
random := pkt.GetCloseDial().Random
452-
klog.V(5).InfoS("Received DIAL_CLOSE", "random", random)
466+
klog.V(5).InfoS("Received DIAL_CLOSE", "serverID", s.serverID, "random", random)
453467
// Currently not worrying about backend as we do not have an established connection,
454468
s.PendingDial.Remove(random)
455-
klog.V(5).Infoln("Removing pending dial request", "random", random)
469+
klog.V(5).Infoln("Removing pending dial request", "serverID", s.serverID, "random", random)
456470

457471
case client.PacketType_DATA:
458472
connID := pkt.GetData().ConnectID
@@ -470,17 +484,18 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
470484
}
471485
if err := backend.Send(pkt); err != nil {
472486
// TODO: retry with other backends connecting to this agent.
473-
klog.ErrorS(err, "DATA to Backend failed")
487+
klog.ErrorS(err, "DATA to Backend failed", "serverID", s.serverID, "connectionID", connID)
474488
continue
475489
}
476490
klog.V(5).Infoln("DATA sent to Backend")
477491

478492
default:
479-
klog.V(5).InfoS("Ignore packet coming from frontend", "type", pkt.Type)
493+
klog.V(5).InfoS("Ignore packet coming from frontend",
494+
"type", pkt.Type, "serverID", s.serverID, "connectionID", firstConnID)
480495
}
481496
}
482497

483-
klog.V(5).InfoS("Close streaming", "connectionID", firstConnID)
498+
klog.V(5).InfoS("Close streaming", "serverID", s.serverID, "connectionID", firstConnID)
484499

485500
pkt := &client.Packet{
486501
Type: client.PacketType_CLOSE_REQ,
@@ -496,7 +511,7 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer,
496511
return
497512
}
498513
if err := backend.Send(pkt); err != nil {
499-
klog.ErrorS(err, "CLOSE_REQ to Backend failed")
514+
klog.ErrorS(err, "CLOSE_REQ to Backend failed", "serverID", s.serverID)
500515
}
501516
}
502517

@@ -617,24 +632,26 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
617632

618633
if s.AgentAuthenticationOptions.Enabled {
619634
if err := s.authenticateAgentViaToken(stream.Context()); err != nil {
620-
klog.ErrorS(err, "Client authentication failed")
635+
klog.ErrorS(err, "Client authentication failed", "agentID", agentID)
621636
return err
622637
}
623638
}
624639

625640
h := metadata.Pairs(header.ServerID, s.serverID, header.ServerCount, strconv.Itoa(s.serverCount))
626641
if err := stream.SendHeader(h); err != nil {
642+
klog.ErrorS(err, "Failed to send server count back to agent", "agentID", agentID)
627643
return err
628644
}
629645

630646
backend := s.addBackend(agentID, stream)
631647
defer s.removeBackend(agentID, stream)
632648

633-
recvCh := make(chan *client.Packet, 10)
649+
recvCh := make(chan *client.Packet, xfrChannelSize)
634650

635651
go s.serveRecvBackend(backend, stream, agentID, recvCh)
636652

637653
defer func() {
654+
klog.V(2).InfoS("Receive channel on Connect is stopping", "agentID", agentID, "serverID", s.serverID)
638655
close(recvCh)
639656
}()
640657

@@ -643,6 +660,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
643660
for {
644661
in, err := stream.Recv()
645662
if err == io.EOF {
663+
klog.V(2).InfoS("Stream closed on Connect", "agentID", agentID, "serverID", s.serverID)
646664
close(stopCh)
647665
return
648666
}
@@ -652,6 +670,9 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error {
652670
return
653671
}
654672

673+
if s.warnOnChannelLimit && len(recvCh) >= xfrChannelSize {
674+
klog.V(2).InfoS("Receive channel on Connect is full", "agentID", agentID, "serverID", s.serverID)
675+
}
655676
recvCh <- in
656677
}
657678
}()
@@ -666,7 +687,8 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, stream agent.AgentServic
666687
// TODO(#126): Frontends in PendingDial state that have not been added to the
667688
// list of frontends should also be closed.
668689
frontends, _ := s.getFrontendsForBackendConn(agentID, backend)
669-
klog.V(3).InfoS("Close frontends connected to agent", "count", len(frontends), "agentID", agentID)
690+
klog.V(3).InfoS("Close frontends connected to agent",
691+
"serverID", s.serverID, "count", len(frontends), "agentID", agentID)
670692

671693
for _, frontend := range frontends {
672694
s.removeFrontend(agentID, frontend.connectID)
@@ -678,7 +700,7 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, stream agent.AgentServic
678700
}
679701
pkt.GetCloseResponse().ConnectID = frontend.connectID
680702
if err := frontend.send(pkt); err != nil {
681-
klog.ErrorS(err, "CLOSE_RSP to frontend failed")
703+
klog.ErrorS(err, "CLOSE_RSP to frontend failed", "serverID", s.serverID, "agentID", agentID)
682704
}
683705
}
684706
}()
@@ -690,17 +712,18 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, stream agent.AgentServic
690712
klog.V(5).InfoS("Received DIAL_RSP", "random", resp.Random, "agentID", agentID, "connectionID", resp.ConnectID)
691713

692714
if frontend, ok := s.PendingDial.Get(resp.Random); !ok {
693-
klog.V(5).Infoln("DIAL_RSP not recognized; dropped")
715+
klog.V(2).Infoln("DIAL_RSP not recognized; dropped", "random", resp.Random, "agentID", agentID, "connectionID", resp.ConnectID)
694716
} else {
695717
dialErr := false
696718
if resp.Error != "" {
697-
klog.ErrorS(errors.New(resp.Error), "DIAL_RSP contains failure")
719+
klog.ErrorS(errors.New(resp.Error), "DIAL_RSP contains failure", "random", resp.Random, "agentID", agentID, "connectionID", resp.ConnectID)
698720
dialErr = true
699721
}
700722
err := frontend.send(pkt)
701723
s.PendingDial.Remove(resp.Random)
702724
if err != nil {
703-
klog.ErrorS(err, "DIAL_RSP send to frontend stream failure")
725+
klog.ErrorS(err, "DIAL_RSP send to frontend stream failure",
726+
"random", resp.Random, "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
704727
dialErr = true
705728
}
706729
// Avoid adding the frontend if there was an error dialing the destination
@@ -719,35 +742,35 @@ func (s *ProxyServer) serveRecvBackend(backend Backend, stream agent.AgentServic
719742
klog.V(5).InfoS("Received data from agent", "bytes", len(resp.Data), "agentID", agentID, "connectionID", resp.ConnectID)
720743
frontend, err := s.getFrontend(agentID, resp.ConnectID)
721744
if err != nil {
722-
klog.ErrorS(err, "could not get frontend client", "connectionID", resp.ConnectID)
745+
klog.ErrorS(err, "could not get frontend client", "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
723746
break
724747
}
725748
if err := frontend.send(pkt); err != nil {
726-
klog.ErrorS(err, "send to client stream failure")
749+
klog.ErrorS(err, "send to client stream failure", "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
727750
} else {
728751
klog.V(5).InfoS("DATA sent to frontend")
729752
}
730753

731754
case client.PacketType_CLOSE_RSP:
732755
resp := pkt.GetCloseResponse()
733-
klog.V(5).InfoS("Received CLOSE_RSP", "connectionID", resp.ConnectID)
756+
klog.V(5).InfoS("Received CLOSE_RSP", "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
734757
frontend, err := s.getFrontend(agentID, resp.ConnectID)
735758
if err != nil {
736-
klog.ErrorS(err, "could not get frontend client", "connectionID", resp.ConnectID)
759+
klog.ErrorS(err, "could not get frontend client", "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
737760
break
738761
}
739762
if err := frontend.send(pkt); err != nil {
740763
// Normal when frontend closes it.
741-
klog.ErrorS(err, "CLOSE_RSP send to client stream error", "connectionID", resp.ConnectID)
764+
klog.ErrorS(err, "CLOSE_RSP send to client stream error", "serverID", s.serverID, "agentID", agentID, "connectionID", resp.ConnectID)
742765
} else {
743766
klog.V(5).Infoln("CLOSE_RSP sent to frontend", "connectionID", resp.ConnectID)
744767
}
745768
s.removeFrontend(agentID, resp.ConnectID)
746769
klog.V(5).InfoS("Close streaming", "agentID", agentID, "connectionID", resp.ConnectID)
747770

748771
default:
749-
klog.V(2).InfoS("Unrecognized packet", "packet", pkt)
772+
klog.V(2).InfoS("Unrecognized packet", "packet", pkt, "serverID", s.serverID, "agentID", agentID)
750773
}
751774
}
752-
klog.V(5).InfoS("Close backend of agent", "backend", stream, "agentID", agentID)
775+
klog.V(5).InfoS("Close backend of agent", "backend", stream, "serverID", s.serverID, "agentID", agentID)
753776
}

pkg/server/server_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,9 @@ func TestAgentTokenAuthenticationErrorsToken(t *testing.T) {
164164
KubernetesClient: kcs,
165165
AgentNamespace: tc.wantNamespace,
166166
AgentServiceAccount: tc.wantServiceAccount,
167-
})
167+
},
168+
false,
169+
)
168170

169171
err := p.Connect(conn)
170172
if tc.wantError {
@@ -187,15 +189,15 @@ func TestAddRemoveFrontends(t *testing.T) {
187189
agent2ConnID2 := new(ProxyClientConnection)
188190
agent3ConnID1 := new(ProxyClientConnection)
189191

190-
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
192+
p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, false)
191193
p.addFrontend("agent1", int64(1), agent1ConnID1)
192194
p.removeFrontend("agent1", int64(1))
193195
expectedFrontends := make(map[string]map[int64]*ProxyClientConnection)
194196
if e, a := expectedFrontends, p.frontends; !reflect.DeepEqual(e, a) {
195197
t.Errorf("expected %v, got %v", e, a)
196198
}
197199

198-
p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil)
200+
p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, false)
199201
p.addFrontend("agent1", int64(1), agent1ConnID1)
200202
p.addFrontend("agent1", int64(2), agent1ConnID2)
201203
p.addFrontend("agent2", int64(1), agent2ConnID1)

tests/proxy_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer
356356
var err error
357357
var lis, lis2 net.Listener
358358

359-
server := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, serverCount, &server.AgentTokenAuthenticationOptions{})
359+
server := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, serverCount, &server.AgentTokenAuthenticationOptions{}, false)
360360
grpcServer := grpc.NewServer()
361361
agentServer := grpc.NewServer()
362362
cleanup := func() {
@@ -395,7 +395,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer
395395
func runHTTPConnProxyServer() (proxy, func(), error) {
396396
ctx := context.Background()
397397
var proxy proxy
398-
s := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, 0, &server.AgentTokenAuthenticationOptions{})
398+
s := server.NewProxyServer(uuid.New().String(), []server.ProxyStrategy{server.ProxyStrategyDefault}, 0, &server.AgentTokenAuthenticationOptions{}, false)
399399
agentServer := grpc.NewServer()
400400

401401
agentproto.RegisterAgentServiceServer(agentServer, s)

0 commit comments

Comments
 (0)