Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func main() {
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))

// Register prometheus server as pod event handler
felixServer.RegisterPodEventHandler(prometheusServer)

felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
Expand Down
25 changes: 25 additions & 0 deletions calico-vpp-agent/felix/felix_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (e NodeWatcherRestartError) Error() string {
return "node configuration changed, restarting"
}

// PodEventHandler defines the interface for handling pod events.
// Implementations registered on the felix Server are called directly from
// its event handling code whenever a local pod is added or deleted.
type PodEventHandler interface {
// OnPodAdded is called with the spec of the pod that was just added.
OnPodAdded(podSpec *model.LocalPodSpec)
// OnPodDeleted is called with the spec of the pod that was just deleted.
OnPodDeleted(podSpec *model.LocalPodSpec)
}

// Server holds all the data required to configure the policies defined by felix in VPP
type Server struct {
log *logrus.Entry
Expand All @@ -59,6 +65,9 @@ type Server struct {
cniHandler *cni.CNIHandler
connectivityHandler *connectivity.ConnectivityHandler
serviceHandler *services.ServiceHandler

// podEventHandlers are notified, in registration order, of pod add/delete events
podEventHandlers []PodEventHandler
}

// NewFelixServer creates a felix server
Expand All @@ -77,6 +86,7 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
cniHandler: cni.NewCNIHandler(vpp, cache, log),
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
serviceHandler: services.NewServiceHandler(vpp, cache, log),
podEventHandlers: make([]PodEventHandler, 0),
}

reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
Expand All @@ -96,6 +106,11 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
return server
}

// RegisterPodEventHandler registers a pod event handler by appending it to
// the server's list of handlers; handlers are kept in registration order.
// NOTE(review): the append is not guarded by a lock here — presumably all
// registrations happen before event processing starts; confirm with callers.
func (s *Server) RegisterPodEventHandler(handler PodEventHandler) {
s.podEventHandlers = append(s.podEventHandlers, handler)
}

// GetFelixServerEventChan returns the server's felixServerEventChan. The
// channel itself is returned (not a copy), so callers share it with the
// server.
func (s *Server) GetFelixServerEventChan() chan any {
return s.felixServerEventChan
}
Expand Down Expand Up @@ -343,6 +358,11 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
EndpointID: podSpec.EndpointID,
Network: podSpec.NetworkName,
}, swIfIndex, podSpec.InterfaceName, podSpec.GetContainerIPs())

// Notify all registered pod event handlers
for _, handler := range s.podEventHandlers {
handler.OnPodAdded(podSpec)
}
case common.PodDeleted:
podSpec, ok := evt.Old.(*model.LocalPodSpec)
if !ok {
Expand All @@ -355,6 +375,11 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
EndpointID: podSpec.EndpointID,
Network: podSpec.NetworkName,
}, podSpec.GetContainerIPs())

// Notify all registered pod event handlers
for _, handler := range s.podEventHandlers {
handler.OnPodDeleted(podSpec)
}
}
case common.TunnelAdded:
swIfIndex, ok := evt.New.(uint32)
Expand Down
121 changes: 55 additions & 66 deletions calico-vpp-agent/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.fd.io/govpp/adapter/statsclient"
"gopkg.in/tomb.v2"

"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/cni/model"
"github.com/projectcalico/vpp-dataplane/v3/config"
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
Expand All @@ -49,7 +48,6 @@ type PrometheusServer struct {
podInterfacesDetailsBySwifIndex map[uint32]podInterfaceDetails
podInterfacesByKey map[string]model.LocalPodSpec
statsclient *statsclient.StatsClient
channel chan any
lock sync.Mutex
httpServer *http.Server
exporter *prometheusExporter.Exporter
Expand All @@ -65,7 +63,6 @@ func NewPrometheusServer(vpp *vpplink.VppLink, log *logrus.Entry) *PrometheusSer
server := &PrometheusServer{
log: log,
vpp: vpp,
channel: make(chan any, 10),
podInterfacesByKey: make(map[string]model.LocalPodSpec),
podInterfacesDetailsBySwifIndex: make(map[uint32]podInterfaceDetails),
statsclient: statsclient.NewStatsClient("" /* default socket name */),
Expand All @@ -76,10 +73,6 @@ func NewPrometheusServer(vpp *vpplink.VppLink, log *logrus.Entry) *PrometheusSer
exporter: exporter,
}

if *config.GetCalicoVppFeatureGates().PrometheusEnabled {
reg := common.RegisterHandler(server.channel, "prometheus events")
reg.ExpectEvents(common.PodAdded, common.PodDeleted)
}
return server
}

Expand Down Expand Up @@ -407,70 +400,66 @@ func (p *PrometheusServer) exportSessionScalarStat(name string, value int64) {
}
}

// OnPodAdded records a newly added pod in the server's lookup tables.
//
// It is a no-op when the Prometheus feature gate is disabled, or when
// podSpec.WorkloadID is not of the form "<namespace>/<name>". Otherwise, under
// p.lock, it stores one podInterfaceDetails entry per valid interface index
// (memif and/or tun/tap) in podInterfacesDetailsBySwifIndex, and a copy of the
// spec in podInterfacesByKey under podSpec.Key().
func (p *PrometheusServer) OnPodAdded(podSpec *model.LocalPodSpec) {
	if enabled := *config.GetCalicoVppFeatureGates().PrometheusEnabled; !enabled {
		return
	}

	// Only the first "/" separates namespace from name; anything after it
	// (including further slashes) belongs to the pod name.
	namespace, name, hasSeparator := strings.Cut(podSpec.WorkloadID, "/")
	if !hasSeparator {
		return
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	if swIfIndex := podSpec.MemifSwIfIndex; swIfIndex != vpplink.InvalidSwIfIndex {
		// Memif interfaces on the default (unnamed) network get a prefixed name.
		ifName := podSpec.InterfaceName
		if podSpec.NetworkName == "" {
			ifName = "vpp/memif-" + ifName
		}
		p.podInterfacesDetailsBySwifIndex[swIfIndex] = podInterfaceDetails{
			podNamespace:  namespace,
			podName:       name,
			interfaceName: ifName,
		}
	}

	if swIfIndex := podSpec.TunTapSwIfIndex; swIfIndex != vpplink.InvalidSwIfIndex {
		p.podInterfacesDetailsBySwifIndex[swIfIndex] = podInterfaceDetails{
			podNamespace:  namespace,
			podName:       name,
			interfaceName: podSpec.InterfaceName,
		}
	}

	// Keep a copy of the spec as it was at add time, keyed by podSpec.Key().
	p.podInterfacesByKey[podSpec.Key()] = *podSpec
}

// OnPodDeleted removes a deleted pod from the server's lookup tables.
//
// It is a no-op when the Prometheus feature gate is disabled, or when no spec
// is stored under podSpec.Key() (for instance because OnPodAdded rejected the
// pod for a malformed WorkloadID). Otherwise, under p.lock, it removes the
// stored spec from podInterfacesByKey and the per-interface entries from
// podInterfacesDetailsBySwifIndex.
func (p *PrometheusServer) OnPodDeleted(podSpec *model.LocalPodSpec) {
	if !(*config.GetCalicoVppFeatureGates().PrometheusEnabled) {
		return
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	key := podSpec.Key()
	initialPod, tracked := p.podInterfacesByKey[key]
	if !tracked {
		// Without this guard the zero-value spec would be used below, which
		// could delete the unrelated entry registered for interface index 0.
		return
	}
	delete(p.podInterfacesByKey, key)

	// The entries were created from the spec stored at add time, so that
	// stored spec (not the incoming one) decides which indexes to remove.
	if initialPod.MemifSwIfIndex != vpplink.InvalidSwIfIndex {
		delete(p.podInterfacesDetailsBySwifIndex, initialPod.MemifSwIfIndex)
	}
	if initialPod.TunTapSwIfIndex != vpplink.InvalidSwIfIndex {
		delete(p.podInterfacesDetailsBySwifIndex, initialPod.TunTapSwIfIndex)
	}
}

func (p *PrometheusServer) ServePrometheus(t *tomb.Tomb) error {
if !(*config.GetCalicoVppFeatureGates().PrometheusEnabled) {
return nil
}
p.log.Infof("Serve() Prometheus exporter")
go func() {
for t.Alive() {
/* Note: we will only receive events we ask for when registering the chan */
msg := <-p.channel
evt, ok := msg.(common.CalicoVppEvent)
if !ok {
continue
}
switch evt.Type {
case common.PodAdded:
podSpec, ok := evt.New.(*model.LocalPodSpec)
if !ok {
p.log.Errorf("evt.New is not a *model.LocalPodSpec %v", evt.New)
continue
}
splittedWorkloadID := strings.SplitN(podSpec.WorkloadID, "/", 2)
if len(splittedWorkloadID) != 2 {
continue
}
p.lock.Lock()
if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex {
memifName := podSpec.InterfaceName
if podSpec.NetworkName == "" {
memifName = "vpp/memif-" + podSpec.InterfaceName
}
p.podInterfacesDetailsBySwifIndex[podSpec.MemifSwIfIndex] = podInterfaceDetails{
podNamespace: splittedWorkloadID[0],
podName: splittedWorkloadID[1],
interfaceName: memifName,
}
}
if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex {
p.podInterfacesDetailsBySwifIndex[podSpec.TunTapSwIfIndex] = podInterfaceDetails{
podNamespace: splittedWorkloadID[0],
podName: splittedWorkloadID[1],
interfaceName: podSpec.InterfaceName,
}
}
p.podInterfacesByKey[podSpec.Key()] = *podSpec
p.lock.Unlock()
case common.PodDeleted:
podSpec, ok := evt.Old.(*model.LocalPodSpec)
if !ok {
p.log.Errorf("evt.Old is not a *model.LocalPodSpec %v", evt.Old)
continue
}
p.lock.Lock()
initialPod := p.podInterfacesByKey[podSpec.Key()]
delete(p.podInterfacesByKey, initialPod.Key())
if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex {
delete(p.podInterfacesDetailsBySwifIndex, initialPod.MemifSwIfIndex)
}
if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex {
delete(p.podInterfacesDetailsBySwifIndex, initialPod.TunTapSwIfIndex)
}
p.lock.Unlock()
}
}
}()

err := p.statsclient.Connect()
if err != nil {
return errors.Wrap(err, "could not connect statsclient")
Expand Down