Skip to content

Commit 0354524

Browse files
committed
Split felix server into watcher/handler
This patch splits the felix server in two pieces: - a felix watcher placed under `agent/watchers/felix` - a felix server placed under `agent/felix` The former will have only the responsibility of watching and submitting events into a single event queue. The latter will receive the event in a single goroutine and proceed to program VPP as a single thred. The intent is to move away from a model with multiple servers replicating state and communicating over a pubsub. This being prone to race conditions, deadlocks, and not providing many benefits as scale & asynchronicity will not be a constraint on nodes with relatively small number of pods (~100) as is k8s default. Signed-off-by: Nathan Skrzypczak <[email protected]>
1 parent b400da8 commit 0354524

33 files changed

+2567
-2275
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,9 @@ func main() {
147147
serviceServer := services.NewServiceServer(vpp, k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
148148
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
149149
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
150-
felixServer, err := felix.NewFelixServer(vpp, log.WithFields(logrus.Fields{"component": "policy"}))
151-
if err != nil {
152-
log.Fatalf("Failed to create policy server %s", err)
153-
}
154-
err = felix.InstallFelixPlugin()
150+
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
151+
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
152+
err = watchers.InstallFelixPlugin()
155153
if err != nil {
156154
log.Fatalf("could not install felix plugin: %s", err)
157155
}
@@ -168,10 +166,12 @@ func main() {
168166
peerWatcher.SetBGPConf(bgpConf)
169167
routingServer.SetBGPConf(bgpConf)
170168
serviceServer.SetBGPConf(bgpConf)
169+
felixServer.SetBGPConf(bgpConf)
171170

172171
watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
173172
Go(felixServer.ServeFelix)
174173
felixConfig := watchDog.Wait(felixServer.FelixConfigChan, "Waiting for FelixConfig to be provided by the calico pod")
174+
Go(felixWatcher.WatchFelix)
175175
ourBGPSpec := watchDog.Wait(felixServer.GotOurNodeBGPchan, "Waiting for bgp spec to be provided on node add")
176176
// check if the watchDog timer has issued the t.Kill() which would mean we are dead
177177
if !t.Alive() {

calico-vpp-agent/cni/cni_pod_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
3636
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/tests/mocks"
3737
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/testutils"
38-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
3938
"github.com/projectcalico/vpp-dataplane/v3/config"
4039
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
4140
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
@@ -323,7 +322,7 @@ var _ = Describe("Pod-related functionality of CNI", func() {
323322

324323
Context("With MultiNet configuration (and multinet VRF and loopback already configured)", func() {
325324
var (
326-
networkDefinition *watchers.NetworkDefinition
325+
networkDefinition *common.NetworkDefinition
327326
pubSubHandlerMock *mocks.PubSubHandlerMock
328327
)
329328

@@ -355,9 +354,9 @@ var _ = Describe("Pod-related functionality of CNI", func() {
355354
}
356355
// NetworkDefinition CRD information caught by NetWatcher and send with additional information
357356
// (VRF and loopback created by watcher) to the cni server as common.NetAdded CalicoVPPEvent
358-
networkDefinition = &watchers.NetworkDefinition{
359-
VRF: watchers.VRF{Tables: tables},
360-
PodVRF: watchers.VRF{Tables: podTables},
357+
networkDefinition = &common.NetworkDefinition{
358+
VRF: common.VRF{Tables: tables},
359+
PodVRF: common.VRF{Tables: podTables},
361360
Vni: uint32(0), // important only for VXLAN tunnel going out of node
362361
Name: networkName,
363362
Range: "10.1.1.0/24", // IP range for secondary network defined by multinet

calico-vpp-agent/cni/cni_server.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
3737
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/podinterface"
3838
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
39-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
4039
"github.com/projectcalico/vpp-dataplane/v3/config"
4140
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
4241
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
@@ -53,7 +52,7 @@ type Server struct {
5352

5453
podInterfaceMap map[string]model.LocalPodSpec
5554
lock sync.Mutex /* protects Add/DelVppInterace/RescanState */
56-
cniEventChan chan common.CalicoVppEvent
55+
cniEventChan chan any
5756

5857
memifDriver *podinterface.MemifPodInterfaceDriver
5958
tuntapDriver *podinterface.TunTapPodInterfaceDriver
@@ -65,7 +64,7 @@ type Server struct {
6564
RedirectToHostClassifyTableIndex uint32
6665

6766
networkDefinitions sync.Map
68-
cniMultinetEventChan chan common.CalicoVppEvent
67+
cniMultinetEventChan chan any
6968
nodeBGPSpec *common.LocalNodeSpec
7069
}
7170

@@ -96,9 +95,9 @@ func (s *Server) Add(ctx context.Context, request *cniproto.AddRequest) (*cnipro
9695
if !ok {
9796
return nil, fmt.Errorf("trying to create a pod in an unexisting network %s", podSpec.NetworkName)
9897
} else {
99-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
98+
networkDefinition, ok := value.(*common.NetworkDefinition)
10099
if !ok || networkDefinition == nil {
101-
panic("Value is not of type *watchers.NetworkDefinition")
100+
panic("Value is not of type *common.NetworkDefinition")
102101
}
103102
_, route, err := net.ParseCIDR(networkDefinition.Range)
104103
if err == nil {
@@ -283,7 +282,7 @@ func NewCNIServer(vpp *vpplink.VppLink, felixServerIpam common.FelixServerIpam,
283282
log: log,
284283

285284
felixServerIpam: felixServerIpam,
286-
cniEventChan: make(chan common.CalicoVppEvent, common.ChanSize),
285+
cniEventChan: make(chan any, common.ChanSize),
287286

288287
grpcServer: grpc.NewServer(),
289288
podInterfaceMap: make(map[string]model.LocalPodSpec),
@@ -292,7 +291,7 @@ func NewCNIServer(vpp *vpplink.VppLink, felixServerIpam common.FelixServerIpam,
292291
vclDriver: podinterface.NewVclPodInterfaceDriver(vpp, log),
293292
loopbackDriver: podinterface.NewLoopbackPodInterfaceDriver(vpp, log),
294293

295-
cniMultinetEventChan: make(chan common.CalicoVppEvent, common.ChanSize),
294+
cniMultinetEventChan: make(chan any, common.ChanSize),
296295
}
297296
reg := common.RegisterHandler(server.cniEventChan, "CNI server events")
298297
reg.ExpectEvents(
@@ -313,7 +312,11 @@ forloop:
313312
select {
314313
case <-t.Dying():
315314
break forloop
316-
case evt := <-s.cniEventChan:
315+
case msg := <-s.cniEventChan:
316+
evt, ok := msg.(common.CalicoVppEvent)
317+
if !ok {
318+
continue
319+
}
317320
switch evt.Type {
318321
case common.FelixConfChanged:
319322
if new, _ := evt.New.(*felixConfig.Config); new != nil {
@@ -434,21 +437,25 @@ func (s *Server) ServeCNI(t *tomb.Tomb) error {
434437
case <-t.Dying():
435438
s.log.Warn("Cni server asked to exit")
436439
return
437-
case event := <-s.cniMultinetEventChan:
440+
case msg := <-s.cniMultinetEventChan:
441+
event, ok := msg.(common.CalicoVppEvent)
442+
if !ok {
443+
continue
444+
}
438445
switch event.Type {
439446
case common.NetsSynced:
440447
netsSynced <- true
441448
case common.NetAddedOrUpdated:
442-
netDef, ok := event.New.(*watchers.NetworkDefinition)
449+
netDef, ok := event.New.(*common.NetworkDefinition)
443450
if !ok {
444-
s.log.Errorf("event.New is not a *watchers.NetworkDefinition %v", event.New)
451+
s.log.Errorf("event.New is not a *common.NetworkDefinition %v", event.New)
445452
continue
446453
}
447454
s.networkDefinitions.Store(netDef.Name, netDef)
448455
case common.NetDeleted:
449-
netDef, ok := event.Old.(*watchers.NetworkDefinition)
456+
netDef, ok := event.Old.(*common.NetworkDefinition)
450457
if !ok {
451-
s.log.Errorf("event.Old is not a *watchers.NetworkDefinition %v", event.Old)
458+
s.log.Errorf("event.Old is not a *common.NetworkDefinition %v", event.Old)
452459
continue
453460
}
454461
s.networkDefinitions.Delete(netDef.Name)
@@ -488,6 +495,6 @@ func (s *Server) ServeCNI(t *tomb.Tomb) error {
488495

489496
// ForceAddingNetworkDefinition will add another NetworkDefinition to this CNI server.
490497
// The usage is mainly for testing purposes.
491-
func (s *Server) ForceAddingNetworkDefinition(networkDefinition *watchers.NetworkDefinition) {
498+
func (s *Server) ForceAddingNetworkDefinition(networkDefinition *common.NetworkDefinition) {
492499
s.networkDefinitions.Store(networkDefinition.Name, networkDefinition)
493500
}

calico-vpp-agent/cni/network_vpp.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424

2525
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
2626
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
27-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
2827
"github.com/projectcalico/vpp-dataplane/v3/config"
2928
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
3029
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
@@ -252,9 +251,9 @@ func (s *Server) AddVppInterface(podSpec *model.LocalPodSpec, doHostSideConf boo
252251
if !ok {
253252
s.log.Errorf("network not found %s", podSpec.NetworkName)
254253
} else {
255-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
254+
networkDefinition, ok := value.(*common.NetworkDefinition)
256255
if !ok || networkDefinition == nil {
257-
panic("networkDefinition not of type *watchers.NetworkDefinition")
256+
panic("networkDefinition not of type *common.NetworkDefinition")
258257
}
259258
vni = networkDefinition.Vni
260259
}
@@ -324,9 +323,9 @@ func (s *Server) DelVppInterface(podSpec *model.LocalPodSpec) {
324323
if !ok {
325324
deleteLocalPodAddress = false
326325
} else {
327-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
326+
networkDefinition, ok := value.(*common.NetworkDefinition)
328327
if !ok || networkDefinition == nil {
329-
panic("networkDefinition not of type *watchers.NetworkDefinition")
328+
panic("networkDefinition not of type *common.NetworkDefinition")
330329
}
331330
vni = networkDefinition.Vni
332331
}

calico-vpp-agent/cni/network_vpp_routes.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020

2121
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model"
2222
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
23-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers"
2423
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
2524
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
2625
)
@@ -37,9 +36,9 @@ func (s *Server) RoutePodInterface(podSpec *model.LocalPodSpec, stack *vpplink.C
3736
if !ok {
3837
s.log.Errorf("network not found %s", podSpec.NetworkName)
3938
} else {
40-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
39+
networkDefinition, ok := value.(*common.NetworkDefinition)
4140
if !ok || networkDefinition == nil {
42-
panic("networkDefinition not of type *watchers.NetworkDefinition")
41+
panic("networkDefinition not of type *common.NetworkDefinition")
4342
}
4443
table = networkDefinition.VRF.Tables[idx]
4544
}
@@ -88,9 +87,9 @@ func (s *Server) UnroutePodInterface(podSpec *model.LocalPodSpec, swIfIndex uint
8887
if !ok {
8988
s.log.Errorf("network not found %s", podSpec.NetworkName)
9089
} else {
91-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
90+
networkDefinition, ok := value.(*common.NetworkDefinition)
9291
if !ok || networkDefinition == nil {
93-
panic("networkDefinition not of type *watchers.NetworkDefinition")
92+
panic("networkDefinition not of type *common.NetworkDefinition")
9493
}
9594
table = networkDefinition.VRF.Tables[idx]
9695
}
@@ -215,9 +214,9 @@ func (s *Server) CreatePodVRF(podSpec *model.LocalPodSpec, stack *vpplink.Cleanu
215214
if !ok {
216215
return errors.Errorf("network not found %s", podSpec.NetworkName)
217216
}
218-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
217+
networkDefinition, ok := value.(*common.NetworkDefinition)
219218
if !ok || networkDefinition == nil {
220-
panic("networkDefinition not of type *watchers.NetworkDefinition")
219+
panic("networkDefinition not of type *common.NetworkDefinition")
221220
}
222221
vrfIndex = networkDefinition.PodVRF.Tables[idx]
223222
}
@@ -375,9 +374,9 @@ func (s *Server) DeletePodVRF(podSpec *model.LocalPodSpec) {
375374
if !ok {
376375
s.log.Errorf("network not found %s", podSpec.NetworkName)
377376
} else {
378-
networkDefinition, ok := value.(*watchers.NetworkDefinition)
377+
networkDefinition, ok := value.(*common.NetworkDefinition)
379378
if !ok || networkDefinition == nil {
380-
panic("networkDefinition not of type *watchers.NetworkDefinition")
379+
panic("networkDefinition not of type *common.NetworkDefinition")
381380
}
382381
vrfIndex = networkDefinition.PodVRF.Tables[idx]
383382
}

calico-vpp-agent/common/pubsub.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,31 +85,27 @@ type PubSubHandlerRegistration struct {
8585
/* Name for the registration, for logging & debugging */
8686
name string
8787
/* Channel where to send events */
88-
channel chan CalicoVppEvent
88+
channel chan any
8989
/* Receive only these events. If empty we'll receive all */
9090
expectedEvents map[CalicoVppEventType]bool
91-
/* Receive all events */
92-
expectAllEvents bool
9391
}
9492

9593
func (reg *PubSubHandlerRegistration) ExpectEvents(eventTypes ...CalicoVppEventType) {
9694
for _, eventType := range eventTypes {
9795
reg.expectedEvents[eventType] = true
9896
}
99-
reg.expectAllEvents = false
10097
}
10198

10299
type PubSub struct {
103100
log *log.Entry
104101
pubSubHandlerRegistrations []*PubSubHandlerRegistration
105102
}
106103

107-
func RegisterHandler(channel chan CalicoVppEvent, name string) *PubSubHandlerRegistration {
104+
func RegisterHandler(channel chan any, name string) *PubSubHandlerRegistration {
108105
reg := &PubSubHandlerRegistration{
109-
channel: channel,
110-
name: name,
111-
expectedEvents: make(map[CalicoVppEventType]bool),
112-
expectAllEvents: true, /* By default receive everything, unless we ask for a filter */
106+
channel: channel,
107+
name: name,
108+
expectedEvents: make(map[CalicoVppEventType]bool),
113109
}
114110
ThePubSub.pubSubHandlerRegistrations = append(ThePubSub.pubSubHandlerRegistrations, reg)
115111
return reg
@@ -128,7 +124,7 @@ func redactPassword(event CalicoVppEvent) string {
128124
func SendEvent(event CalicoVppEvent) {
129125
ThePubSub.log.Debugf("Broadcasting event %s", redactPassword(event))
130126
for _, reg := range ThePubSub.pubSubHandlerRegistrations {
131-
if reg.expectAllEvents || reg.expectedEvents[event.Type] {
127+
if reg.expectedEvents[event.Type] {
132128
reg.channel <- event
133129
}
134130
}

calico-vpp-agent/common/types.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (C) 2025 Cisco Systems Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package common
17+
18+
import (
19+
v1 "k8s.io/api/core/v1"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
)
22+
23+
type VRF struct {
24+
Tables [2]uint32 // one for ipv4, one for ipv6
25+
}
26+
27+
type NetworkDefinition struct {
28+
// VRF is the main table used for the corresponding physical network
29+
VRF VRF
30+
// PodVRF is the table used for the pods in the corresponding physical network
31+
PodVRF VRF
32+
Vni uint32
33+
PhysicalNetworkName string
34+
Name string
35+
Range string
36+
NetAttachDefs string
37+
}
38+
39+
// FelixSocketSyncState describes the status of the
40+
// felix socket connection. It applies mostly to policies
41+
type FelixSocketSyncState int
42+
43+
const (
44+
StateDisconnected FelixSocketSyncState = iota
45+
StateConnected
46+
StateSyncing
47+
StateInSync
48+
)
49+
50+
func (state FelixSocketSyncState) IsPending() bool {
51+
return state != StateInSync
52+
}
53+
54+
// FelixSocketStateChanged is emitted when the state
55+
// of the socket changed. Typically connection and disconnection.
56+
type FelixSocketStateChanged struct {
57+
NewState FelixSocketSyncState
58+
}
59+
60+
type ServiceAndEndpoints struct {
61+
Service *v1.Service
62+
Endpoints *v1.Endpoints
63+
}
64+
65+
type ServiceEndpointsUpdate struct {
66+
New *ServiceAndEndpoints
67+
Old *ServiceAndEndpoints
68+
}
69+
70+
type ServiceEndpointsDelete struct {
71+
Meta *metav1.ObjectMeta
72+
}

0 commit comments

Comments
 (0)