Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
"github.com/projectcalico/vpp-dataplane/v3/config"

watchdog "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watch_dog"
Expand Down Expand Up @@ -139,10 +138,10 @@ func main() {
peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"}))
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"}))
routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"}))
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
felixServer.SetBGPServer(bgpServer)
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
Expand All @@ -159,10 +158,11 @@ func main() {
log.Fatalf("cannot get default BGP config %s", err)
}

routingServer.SetBGPConf(bgpConf)
felixServer.SetBGPConf(bgpConf)

routingServer.SetPeerHandler(felixServer.GetPeerHandler())
bgpWatcher := watchers.NewBGPWatcher(felixServer.GetCache(), log.WithFields(logrus.Fields{"component": "bgp-watcher"}))
bgpWatcher.SetBGPServer(bgpServer)
bgpWatcher.SetBGPHandler(felixServer.GetBGPHandler())

watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
Go(felixServer.ServeFelix)
Expand All @@ -179,7 +179,8 @@ func main() {
panic("ourBGPSpec is not *common.LocalNodeSpec")
}
prefixWatcher.SetOurBGPSpec(bgpSpec)
routingServer.SetOurBGPSpec(bgpSpec)
bgpWatcher.SetOurBGPSpec(bgpSpec)
felixServer.GetRoutingHandler().SetOurBGPSpec(bgpSpec)
localSIDWatcher.SetOurBGPSpec(bgpSpec)
netWatcher.SetOurBGPSpec(bgpSpec)
}
Expand All @@ -195,7 +196,8 @@ func main() {
Go(prefixWatcher.WatchPrefix)
Go(peerWatcher.WatchBGPPeers)
Go(bgpFilterWatcher.WatchBGPFilters)
Go(routingServer.ServeRouting)
Go(bgpWatcher.WatchBGPPath)
Go(felixServer.GetRoutingHandler().ServeRoutingHandler)
Go(serviceServer.ServeService)
Go(cniServer.ServeCNI)
Go(prometheusServer.ServePrometheus)
Expand Down
129 changes: 108 additions & 21 deletions calico-vpp-agent/felix/felix_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"net"

bgpapi "github.com/osrg/gobgp/v3/api"
bgpserver "github.com/osrg/gobgp/v3/pkg/server"
"github.com/pkg/errors"
calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
"github.com/sirupsen/logrus"
Expand All @@ -33,8 +35,8 @@ import (
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/cni/model"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/connectivity"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/policies"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/routing"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/services"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
"github.com/projectcalico/vpp-dataplane/v3/config"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
Expand All @@ -61,6 +63,8 @@ type Server struct {
connectivityHandler *connectivity.ConnectivityHandler
serviceHandler *services.ServiceHandler
peerHandler *routing.PeerHandler
routingHandler *routing.RoutingHandler
bgpHandler *routing.BGPHandler
}

// NewFelixServer creates a felix server
Expand All @@ -80,6 +84,8 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
serviceHandler: services.NewServiceHandler(vpp, cache, log),
peerHandler: routing.NewPeerHandler(cache, log),
routingHandler: routing.NewRoutingHandler(vpp, cache, log),
bgpHandler: routing.NewBGPHandler(log),
}

reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
Expand All @@ -101,6 +107,17 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
common.SecretAdded,
common.SecretChanged,
common.SecretDeleted,
common.BGPPathAdded,
common.BGPPathDeleted,
common.BGPPeerAdded,
common.BGPPeerUpdated,
common.BGPPeerDeleted,
common.BGPFilterAddedOrUpdated,
common.BGPFilterDeleted,
common.BGPDefinedSetAdded,
common.BGPDefinedSetDeleted,
common.LocalPodAddressAdded,
common.LocalPodAddressDeleted,
)

return server
Expand All @@ -118,10 +135,22 @@ func (s *Server) GetPeerHandler() *routing.PeerHandler {
return s.peerHandler
}

func (s *Server) GetRoutingHandler() *routing.RoutingHandler {
return s.routingHandler
}

func (s *Server) GetBGPHandler() *routing.BGPHandler {
return s.bgpHandler
}

func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) {
s.cache.BGPConf = bgpConf
}

func (s *Server) SetBGPServer(bgpServer *bgpserver.BgpServer) {
s.bgpHandler.SetBGPServer(bgpServer)
}

func (s *Server) getMainInterface() *config.UplinkStatus {
for _, i := range common.VppManagerInfo.UplinkStatuses {
if i.IsMain {
Expand Down Expand Up @@ -385,39 +414,27 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
case common.ConnectivityAdded:
new, ok := evt.New.(*common.NodeConnectivity)
if !ok {
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
}
err := s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
if err != nil {
s.log.Errorf("Error while adding connectivity %s", err)
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
}
err = s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
case common.ConnectivityDeleted:
old, ok := evt.Old.(*common.NodeConnectivity)
if !ok {
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
}
err := s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
if err != nil {
s.log.Errorf("Error while deleting connectivity %s", err)
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
}
err = s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
case common.SRv6PolicyAdded:
new, ok := evt.New.(*common.NodeConnectivity)
if !ok {
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
}
err := s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
if err != nil {
s.log.Errorf("Error while adding SRv6 Policy %s", err)
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
}
err = s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
case common.SRv6PolicyDeleted:
old, ok := evt.Old.(*common.NodeConnectivity)
if !ok {
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
}
err := s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
if err != nil {
s.log.Errorf("Error while deleting SRv6 Policy %s", err)
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
}
err = s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
case common.PeersChanged:
peersEvent, ok := evt.New.(*common.PeersChangedEvent)
if !ok {
Expand Down Expand Up @@ -472,6 +489,76 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
return fmt.Errorf("evt.New is not a (*common.SecretDeletedEvent) %v", evt.New)
}
s.peerHandler.OnSecretDeleted(secretEvent.SecretName)
case common.BGPPathAdded:
path, ok := evt.New.(*bgpapi.Path)
if !ok {
return fmt.Errorf("evt.New is not a (*bgpapi.Path) %v", evt.New)
}
err = s.bgpHandler.HandleBGPPathAdded(path)
case common.BGPPathDeleted:
path, ok := evt.Old.(*bgpapi.Path)
if !ok {
return fmt.Errorf("evt.Old is not a (*bgpapi.Path) %v", evt.Old)
}
err = s.bgpHandler.HandleBGPPathDeleted(path)
case common.BGPPeerAdded:
peer, ok := evt.New.(*routing.LocalBGPPeer)
if !ok {
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
}
err = s.bgpHandler.HandleBGPPeerAdded(peer)
case common.BGPPeerUpdated:
newPeer, ok := evt.New.(*routing.LocalBGPPeer)
if !ok {
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
}
oldPeer, ok := evt.Old.(*routing.LocalBGPPeer)
if !ok {
return fmt.Errorf("evt.Old is not a (*routing.LocalBGPPeer) %v", evt.Old)
}
err = s.bgpHandler.HandleBGPPeerUpdated(newPeer, oldPeer)
case common.BGPPeerDeleted:
peerIP, ok := evt.Old.(string)
if !ok {
return fmt.Errorf("evt.Old is not a string %v", evt.Old)
}
err = s.bgpHandler.HandleBGPPeerDeleted(peerIP)
case common.BGPFilterAddedOrUpdated:
filter, ok := evt.New.(calicov3.BGPFilter)
if !ok {
return fmt.Errorf("evt.New is not a (calicov3.BGPFilter) %v", evt.New)
}
err = s.bgpHandler.HandleBGPFilterAddedOrUpdated(filter)
case common.BGPFilterDeleted:
filter, ok := evt.Old.(calicov3.BGPFilter)
if !ok {
return fmt.Errorf("evt.Old is not a (calicov3.BGPFilter) %v", evt.Old)
}
err = s.bgpHandler.HandleBGPFilterDeleted(filter)
case common.BGPDefinedSetAdded:
definedSet, ok := evt.New.(*bgpapi.DefinedSet)
if !ok {
return fmt.Errorf("evt.New is not a (*bgpapi.DefinedSet) %v", evt.New)
}
err = s.bgpHandler.HandleBGPDefinedSetAdded(definedSet)
case common.BGPDefinedSetDeleted:
definedSet, ok := evt.Old.(*bgpapi.DefinedSet)
if !ok {
return fmt.Errorf("evt.Old is not a (*bgpapi.DefinedSet) %v", evt.Old)
}
err = s.bgpHandler.HandleBGPDefinedSetDeleted(definedSet)
case common.LocalPodAddressAdded:
networkPod, ok := evt.New.(cni.NetworkPod)
if !ok {
return fmt.Errorf("evt.New is not a (cni.NetworkPod) %v", evt.New)
}
err = s.routingHandler.AnnounceLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
case common.LocalPodAddressDeleted:
networkPod, ok := evt.Old.(cni.NetworkPod)
if !ok {
return fmt.Errorf("evt.Old is not a (cni.NetworkPod) %v", evt.Old)
}
err = s.routingHandler.WithdrawLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
default:
s.log.Warnf("Unhandled CalicoVppEvent.Type: %s", evt.Type)
}
Expand Down
Loading