Skip to content

Commit 9763990

Browse files
author
Aritra Basu
committed
remove route_watcher dependency on pubsub
Signed-off-by: Aritra Basu <[email protected]>
1 parent 05e1278 commit 9763990

File tree

7 files changed

+161
-114
lines changed

7 files changed

+161
-114
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ func main() {
147147
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
148148
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
149149

150+
// Register route watcher to receive network and IPAM events for VPP route management
151+
netWatcher.RegisterRouteWatcher(routeWatcher)
152+
felixServer.RegisterRouteWatcher(routeWatcher)
153+
150154
err = watchers.InstallFelixPlugin()
151155
if err != nil {
152156
log.Fatalf("could not install felix plugin: %s", err)

calico-vpp-agent/common/common.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ type FelixServerIpam interface {
5757
GetPrefixIPPool(prefix *net.IPNet) *proto.IPAMPool
5858
}
5959

60+
// RouteWatcherEventHandler defines the interface for components that need to handle
61+
// network and IPAM events by updating VPP routing configuration
62+
type RouteWatcherEventHandler interface {
63+
OnNetDeleted(netDef *NetworkDefinition) error
64+
OnNetAddedOrUpdated(netDef *NetworkDefinition) error
65+
OnIpamConfChanged(oldPool, newPool *proto.IPAMPool) error
66+
}
67+
6068
type LocalNodeSpec struct {
6169
ASNumber *numorstring.ASNumber
6270
Labels map[string]string

calico-vpp-agent/felix/felix_server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Server struct {
5959
cniHandler *cni.CNIHandler
6060
connectivityHandler *connectivity.ConnectivityHandler
6161
serviceHandler *services.ServiceHandler
62+
routeWatcher common.RouteWatcherEventHandler
6263
}
6364

6465
// NewFelixServer creates a felix server
@@ -77,6 +78,7 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
7778
cniHandler: cni.NewCNIHandler(vpp, cache, log),
7879
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
7980
serviceHandler: services.NewServiceHandler(vpp, cache, log),
81+
routeWatcher: nil,
8082
}
8183

8284
reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
@@ -96,6 +98,10 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
9698
return server
9799
}
98100

101+
func (s *Server) RegisterRouteWatcher(routeWatcher common.RouteWatcherEventHandler) {
102+
s.routeWatcher = routeWatcher
103+
}
104+
99105
func (s *Server) GetFelixServerEventChan() chan any {
100106
return s.felixServerEventChan
101107
}

calico-vpp-agent/felix/ipam.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"github.com/pkg/errors"
2222

2323
"github.com/projectcalico/calico/felix/proto"
24-
25-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
2624
)
2725

2826
func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
@@ -49,11 +47,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
4947
}
5048
s.connectivityHandler.OnIpamConfChanged(oldIpamPool, newIpamPool)
5149
s.cniHandler.OnIpamConfChanged(oldIpamPool, newIpamPool)
52-
common.SendEvent(common.CalicoVppEvent{
53-
Type: common.IpamConfChanged,
54-
Old: ipamPoolCopy(oldIpamPool),
55-
New: ipamPoolCopy(newIpamPool),
56-
})
50+
if s.routeWatcher != nil {
51+
err := s.routeWatcher.OnIpamConfChanged(oldIpamPool, newIpamPool)
52+
if err != nil {
53+
s.log.Errorf("Failed to handle IPAM update in RouteWatcher: %v", err)
54+
}
55+
}
5756
}
5857
} else {
5958
s.log.Infof("Adding pool: %s, nat:%t", msg.GetId(), newIpamPool.GetMasquerade())
@@ -65,10 +64,12 @@ func (s *Server) handleIpamPoolUpdate(msg *proto.IPAMPoolUpdate) (err error) {
6564
}
6665
s.connectivityHandler.OnIpamConfChanged(nil /*old*/, newIpamPool)
6766
s.cniHandler.OnIpamConfChanged(nil /*old*/, newIpamPool)
68-
common.SendEvent(common.CalicoVppEvent{
69-
Type: common.IpamConfChanged,
70-
New: ipamPoolCopy(newIpamPool),
71-
})
67+
if s.routeWatcher != nil {
68+
err := s.routeWatcher.OnIpamConfChanged(nil, newIpamPool)
69+
if err != nil {
70+
s.log.Errorf("Failed to handle IPAM addition in RouteWatcher: %v", err)
71+
}
72+
}
7273
}
7374
return nil
7475
}
@@ -88,13 +89,14 @@ func (s *Server) handleIpamPoolRemove(msg *proto.IPAMPoolRemove) (err error) {
8889
if err != nil {
8990
return errors.Wrap(err, "error handling ipam deletion")
9091
}
91-
common.SendEvent(common.CalicoVppEvent{
92-
Type: common.IpamConfChanged,
93-
Old: ipamPoolCopy(oldIpamPool),
94-
New: nil,
95-
})
9692
s.connectivityHandler.OnIpamConfChanged(oldIpamPool, nil /* new */)
9793
s.cniHandler.OnIpamConfChanged(oldIpamPool, nil /* new */)
94+
if s.routeWatcher != nil {
95+
err := s.routeWatcher.OnIpamConfChanged(oldIpamPool, nil)
96+
if err != nil {
97+
s.log.Errorf("Failed to handle IPAM deletion in RouteWatcher: %v", err)
98+
}
99+
}
98100
} else {
99101
s.log.Warnf("Deleting unknown ippool")
100102
return nil

calico-vpp-agent/felix/node_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ import (
4545
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
4646
)
4747

48+
// mockRouteWatcher implements the RouteWatcherEventHandler interface for testing
49+
type mockRouteWatcher struct{}
50+
51+
func (m *mockRouteWatcher) OnNetDeleted(netDef *common.NetworkDefinition) error {
52+
return nil
53+
}
54+
55+
func (m *mockRouteWatcher) OnNetAddedOrUpdated(netDef *common.NetworkDefinition) error {
56+
return nil
57+
}
58+
59+
func (m *mockRouteWatcher) OnIpamConfChanged(oldPool, newPool *proto.IPAMPool) error {
60+
return nil
61+
}
62+
4863
// Names of integration tests arguments
4964
const (
5065
IntegrationTestEnableArgName = "INTEGRATION_TEST"
@@ -132,11 +147,16 @@ var _ = Describe("Node-related functionality of CNI", func() {
132147
if ipamStub == nil {
133148
ipamStub = mocks.NewIpamCacheStub()
134149
}
150+
151+
// Create a mock RouteWatcher
152+
mockRouteWatcher := &mockRouteWatcher{}
153+
135154
felixServer = NewFelixServer(
136155
vpp,
137156
client,
138157
log.WithFields(logrus.Fields{"subcomponent": "connectivity"}),
139158
)
159+
felixServer.RegisterRouteWatcher(mockRouteWatcher)
140160
if felixConfig == nil {
141161
felixConfig = &config.Config{}
142162
}

calico-vpp-agent/watchers/net_watcher.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type NetWatcher struct {
4545
nads map[string]string
4646
InSync chan interface{}
4747
nodeBGPSpec *common.LocalNodeSpec
48+
routeWatcher common.RouteWatcherEventHandler
4849

4950
currentWatchRevisionNet string
5051
currentWatchRevisionNad string
@@ -65,10 +66,15 @@ func NewNetWatcher(vpp *vpplink.VppLink, log *logrus.Entry) *NetWatcher {
6566
networkDefinitions: make(map[string]*common.NetworkDefinition),
6667
nads: make(map[string]string),
6768
InSync: make(chan interface{}),
69+
routeWatcher: nil,
6870
}
6971
return &w
7072
}
7173

74+
func (w *NetWatcher) RegisterRouteWatcher(routeWatcher common.RouteWatcherEventHandler) {
75+
w.routeWatcher = routeWatcher
76+
}
77+
7278
func (w *NetWatcher) SetOurBGPSpec(nodeBGPSpec *common.LocalNodeSpec) {
7379
w.nodeBGPSpec = nodeBGPSpec
7480
}
@@ -106,7 +112,7 @@ func (w *NetWatcher) resyncAndCreateWatchers() error {
106112
return errors.Wrapf(err, "Listing NetworkAttachmentDefinitions failed")
107113
}
108114
for _, nad := range nadList.Items {
109-
err = w.onNadAdded(&nad)
115+
err = w.OnNadAdded(&nad)
110116
if err != nil {
111117
return errors.Wrapf(err, "OnNadAdded failed for %v", nad)
112118
}
@@ -200,7 +206,7 @@ func (w *NetWatcher) WatchNetworks(t *tomb.Tomb) error {
200206
w.log.Errorf("update.Object is not *NetworkAttachmentDefinition, %v", update.Object)
201207
continue
202208
}
203-
err := w.onNadAdded(nad)
209+
err := w.OnNadAdded(nad)
204210
if err != nil {
205211
w.log.Error(err)
206212
}
@@ -210,7 +216,7 @@ func (w *NetWatcher) WatchNetworks(t *tomb.Tomb) error {
210216
w.log.Errorf("update.Object is not *NetworkAttachmentDefinition, %v", update.Object)
211217
continue
212218
}
213-
err := w.onNadDeleted(nad)
219+
err := w.OnNadDeleted(nad)
214220
if err != nil {
215221
w.log.Error(err)
216222
}
@@ -230,7 +236,7 @@ func (w *NetWatcher) Stop() {
230236
close(w.stop)
231237
}
232238

233-
func (w *NetWatcher) onNadDeleted(nad *netv1.NetworkAttachmentDefinition) error {
239+
func (w *NetWatcher) OnNadDeleted(nad *netv1.NetworkAttachmentDefinition) error {
234240
delete(w.nads, nad.Namespace+"/"+nad.Name)
235241
for key, net := range w.networkDefinitions {
236242
if net.NetAttachDefs == nad.Namespace+"/"+nad.Name {
@@ -239,12 +245,18 @@ func (w *NetWatcher) onNadDeleted(nad *netv1.NetworkAttachmentDefinition) error
239245
Type: common.NetAddedOrUpdated,
240246
New: w.networkDefinitions[key],
241247
})
248+
if w.routeWatcher != nil {
249+
err := w.routeWatcher.OnNetAddedOrUpdated(w.networkDefinitions[key])
250+
if err != nil {
251+
w.log.Errorf("Failed to handle network update in RouteWatcher: %v", err)
252+
}
253+
}
242254
}
243255
}
244256
return nil
245257
}
246258

247-
func (w *NetWatcher) onNadAdded(nad *netv1.NetworkAttachmentDefinition) error {
259+
func (w *NetWatcher) OnNadAdded(nad *netv1.NetworkAttachmentDefinition) error {
248260
var nadConfig nadv1.NetConfList
249261
err := json.Unmarshal([]byte(nad.Spec.Config), &nadConfig)
250262
if err != nil {
@@ -260,6 +272,12 @@ func (w *NetWatcher) onNadAdded(nad *netv1.NetworkAttachmentDefinition) error {
260272
Type: common.NetAddedOrUpdated,
261273
New: w.networkDefinitions[key],
262274
})
275+
if w.routeWatcher != nil {
276+
err := w.routeWatcher.OnNetAddedOrUpdated(w.networkDefinitions[key])
277+
if err != nil {
278+
w.log.Errorf("Failed to handle network update in RouteWatcher: %v", err)
279+
}
280+
}
263281
}
264282
}
265283
}
@@ -283,6 +301,12 @@ func (w *NetWatcher) OnNetAdded(net *networkv3.Network) error {
283301
Type: common.NetAddedOrUpdated,
284302
New: netDef,
285303
})
304+
if w.routeWatcher != nil {
305+
err := w.routeWatcher.OnNetAddedOrUpdated(netDef)
306+
if err != nil {
307+
w.log.Errorf("Failed to handle network addition in RouteWatcher: %v", err)
308+
}
309+
}
286310
return nil
287311
}
288312

@@ -300,6 +324,12 @@ func (w *NetWatcher) OnNetDeleted(netName string) error {
300324
Type: common.NetDeleted,
301325
Old: netDef,
302326
})
327+
if w.routeWatcher != nil {
328+
err := w.routeWatcher.OnNetDeleted(netDef)
329+
if err != nil {
330+
w.log.Errorf("Failed to handle network deletion in RouteWatcher: %v", err)
331+
}
332+
}
303333
return nil
304334
}
305335

0 commit comments

Comments
 (0)