Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0deb2bb
added ipoverib and adding to limited pkey options
Yarosh Jul 24, 2025
dcf8713
added logs
Yarosh Jul 28, 2025
c33feb9
chagned config logs
Yarosh Jul 28, 2025
389cd57
WIP
Yarosh Jul 29, 2025
81ab658
fixed plugin tesrt
Yarosh Jul 29, 2025
7cb2dea
changed tag
Yarosh Jul 29, 2025
316aade
added exponential backoff to pod update and better handling of alloca…
Yarosh Jul 29, 2025
449c584
bumped to version v1.2.0
Yarosh Aug 4, 2025
a642587
fixed config passing
Yarosh Aug 14, 2025
71d1aaa
WIP
Yarosh Aug 15, 2025
5455d12
refactor and better finalizer logic
Yarosh Aug 15, 2025
808a979
Added example yamls
Yarosh Aug 15, 2025
739ba2e
updated build
Yarosh Aug 21, 2025
5523566
added index0 enabling to main partition
Yarosh Aug 26, 2025
cb69c21
better verify creation of new pkey with mtu 4k
Yarosh Aug 26, 2025
19bc028
changed createEmptyPKey to use the actual pkey creation endpoint
Yarosh Aug 26, 2025
d8e4cce
better checking if pkey exists
Yarosh Aug 26, 2025
cff4958
removed uneeded
Yarosh Aug 26, 2025
65dc81c
updated status check
Yarosh Aug 26, 2025
dc6d68d
only logging
Yarosh Aug 26, 2025
c4a35c9
reversed error handling logic
Yarosh Aug 26, 2025
502bb4a
actually try pkey creation
Yarosh Aug 26, 2025
cb4215d
reverting
Yarosh Aug 26, 2025
39d13f9
trying with index-0
Yarosh Aug 27, 2025
1c75df6
adding index0 and mtu t guids
Yarosh Aug 27, 2025
906a69d
added config for index0
Yarosh Aug 27, 2025
fcdc43b
added deployment yamls
Yarosh Aug 27, 2025
3b9e7f1
changing build to tag v1.2.1
Yarosh Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .go-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go1.22.4
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ test-coverage: | plugins-coverage envtest gocovmerge gcov2lcov ## Run coverage t

# Container image
.PHONY: image
image: ; $(info Building Docker image...) ## Build conatiner image
$(IMAGE_BUILDER) build -t $(TAG) -f $(DOCKERFILE) $(CURDIR) $(IMAGE_BUILD_OPTS)
image: ; $(info Building Docker image...) ## Build container image
$(IMAGE_BUILDER) build --platform linux/amd64 -t $(TAG) -f $(DOCKERFILE) $(CURDIR) $(IMAGE_BUILD_OPTS)

.PHONY: docker-push
docker-push: ## Push docker image with the manager.
$(IMAGE_BUILDER) push ${TAG}

# Misc

Expand Down
2 changes: 2 additions & 0 deletions deployment/ib-kubernetes-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ data:
DAEMON_PERIODIC_UPDATE: "5"
GUID_POOL_RANGE_START: "02:00:00:00:00:00:00:00"
GUID_POOL_RANGE_END: "02:FF:FF:FF:FF:FF:FF:FF"
# DEFAULT_LIMITED_PARTITION: "0x0001" # optional
ENABLE_IP_OVER_IB: "false" # default false
13 changes: 13 additions & 0 deletions deployment/ib-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,16 @@ spec:
name: ib-kubernetes-ufm-secret
key: UFM_CERTIFICATE
optional: true
- name: ENABLE_IP_OVER_IB # add
valueFrom:
configMapKeyRef:
key: ENABLE_IP_OVER_IB
name: ib-kubernetes-config
optional: true
- name: DEFAULT_LIMITED_PARTITION # add
valueFrom:
configMapKeyRef:
key: DEFAULT_LIMITED_PARTITION
name: ib-kubernetes-config
optional: true

18 changes: 18 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type DaemonConfig struct {
Plugin string `env:"DAEMON_SM_PLUGIN"`
// Subnet manager plugins path
PluginPath string `env:"DAEMON_SM_PLUGIN_PATH" envDefault:"/plugins"`
// Default partition key for limited membership
DefaultLimitedPartition string `env:"DEFAULT_LIMITED_PARTITION"`
// Enable IP over IB functionality
EnableIPOverIB bool `env:"ENABLE_IP_OVER_IB" envDefault:"false"`
}

type GUIDPoolConfig struct {
Expand All @@ -28,6 +32,20 @@ func (dc *DaemonConfig) ReadConfig() error {
log.Debug().Msg("Reading configuration environment variables")
err := env.Parse(dc)

// If IP over IB enabled - log at startup
if dc.EnableIPOverIB {
log.Warn().Msg("New partitions will be created with IP over IB enabled.")
} else {
log.Info().Msg("New partitions will be created with IP over IB disabled.")
}

// If default limited partition is set - log at startup
if dc.DefaultLimitedPartition != "" {
log.Info().Msgf("Default limited partition is set to %s. New GUIDs will be added as limited members to this partition.", dc.DefaultLimitedPartition)
} else {
log.Info().Msg("Default limited partition is not set.")
}

return err
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ var _ = Describe("Configuration", func() {
Expect(os.Setenv("GUID_POOL_RANGE_END", "02:00:00:00:00:00:00:FF")).ToNot(HaveOccurred())
Expect(os.Setenv("DAEMON_SM_PLUGIN", "ufm")).ToNot(HaveOccurred())
Expect(os.Setenv("DAEMON_SM_PLUGIN_PATH", "/custom/plugins/location")).ToNot(HaveOccurred())
Expect(os.Setenv("DEFAULT_LIMITED_PARTITION", "0x2")).ToNot(HaveOccurred())
Expect(os.Setenv("ENABLE_IP_OVER_IB", "true")).ToNot(HaveOccurred())

err := dc.ReadConfig()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -28,6 +30,8 @@ var _ = Describe("Configuration", func() {
Expect(dc.GUIDPool.RangeEnd).To(Equal("02:00:00:00:00:00:00:FF"))
Expect(dc.Plugin).To(Equal("ufm"))
Expect(dc.PluginPath).To(Equal("/custom/plugins/location"))
Expect(dc.DefaultLimitedPartition).To(Equal("0x2"))
Expect(dc.EnableIPOverIB).To(BeTrue())
})
It("Read configuration with default values", func() {
dc := &DaemonConfig{}
Expand All @@ -40,6 +44,20 @@ var _ = Describe("Configuration", func() {
Expect(dc.GUIDPool.RangeEnd).To(Equal("02:FF:FF:FF:FF:FF:FF:FF"))
Expect(dc.Plugin).To(Equal("ufm"))
Expect(dc.PluginPath).To(Equal("/plugins"))
Expect(dc.DefaultLimitedPartition).To(Equal("")) // Default should be empty
Expect(dc.EnableIPOverIB).To(BeFalse()) // Default should be false
})
It("Read configuration with new environment variables", func() {
dc := &DaemonConfig{}

Expect(os.Setenv("DAEMON_SM_PLUGIN", "ufm")).ToNot(HaveOccurred())
Expect(os.Setenv("DEFAULT_LIMITED_PARTITION", "0x1")).ToNot(HaveOccurred())
Expect(os.Setenv("ENABLE_IP_OVER_IB", "true")).ToNot(HaveOccurred())

err := dc.ReadConfig()
Expect(err).ToNot(HaveOccurred())
Expect(dc.DefaultLimitedPartition).To(Equal("0x1"))
Expect(dc.EnableIPOverIB).To(BeTrue())
})
})
Context("ValidateConfig", func() {
Expand Down
195 changes: 166 additions & 29 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

const GUIDInUFMFinalizer = "ufm.together.ai/guid-cleanup-protection"
const PodGUIDFinalizer = "ufm.together.ai/pod-guid-cleanup-protection"

type Daemon interface {
// Execute Daemon loop, returns when os.Interrupt signal is received
Expand Down Expand Up @@ -407,6 +408,22 @@ func (d *daemon) AddPeriodicUpdate() {
continue
}

// Add finalizer to pod since it now has a GUID that needs cleanup
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err = d.kubeClient.AddFinalizerToPod(pi.pod, PodGUIDFinalizer); err != nil {
log.Warn().Msgf("failed to add finalizer to pod %s/%s: %v",
pi.pod.Namespace, pi.pod.Name, err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to add finalizer to pod %s/%s", pi.pod.Namespace, pi.pod.Name)
continue
} else {
log.Info().Msgf("added finalizer %s to pod %s/%s",
PodGUIDFinalizer, pi.pod.Namespace, pi.pod.Name)
}

guidList = append(guidList, pi.addr)
passedPods = append(passedPods, pi)
}
Expand All @@ -432,7 +449,29 @@ func (d *daemon) AddPeriodicUpdate() {
log.Error().Msgf("failed to config pKey with subnet manager %s", d.smClient.Name())
continue
} else {
// AddGuidsToPKey successful, add finalizer to NetworkAttachmentDefinition
// AddGuidsToPKey successful, now add GUIDs to limited partition if configured
if d.config.DefaultLimitedPartition != "" {
limitedPKey, err := utils.ParsePKey(d.config.DefaultLimitedPartition)
if err != nil {
log.Error().Msgf("failed to parse DEFAULT_LIMITED_PARTITION %s: %v", d.config.DefaultLimitedPartition, err)
} else {
// Try to add GUIDs to limited partition in backoff loop
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err = d.smClient.AddGuidsToLimitedPKey(limitedPKey, guidList); err != nil {
log.Warn().Msgf("failed to add GUIDs to limited partition 0x%04X with subnet manager %s with error: %v",
limitedPKey, d.smClient.Name(), err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to add GUIDs to limited partition 0x%04X with subnet manager %s", limitedPKey, d.smClient.Name())
} else {
log.Info().Msgf("successfully added GUIDs %v to limited partition 0x%04X", guidList, limitedPKey)
}
}
}

// Add finalizer to NetworkAttachmentDefinition
networkNamespace, networkName, _ := utils.ParseNetworkID(networkID)
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err := d.kubeClient.AddFinalizerToNetworkAttachmentDefinition(
Expand Down Expand Up @@ -480,23 +519,30 @@ func (d *daemon) AddPeriodicUpdate() {
" with subnet manager %s", ibCniSpec.PKey, d.smClient.Name())
continue
} else {
// RemoveGuidsFromPKey successful, remove finalizer from NetworkAttachmentDefinition
networkNamespace, networkName, _ := utils.ParseNetworkID(networkID)
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition(
networkNamespace, networkName, GUIDInUFMFinalizer); err != nil {
log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v",
networkNamespace, networkName, err)
return false, nil
// RemoveGuidsFromPKey successful, now remove GUIDs from limited partition if configured
if d.config.DefaultLimitedPartition != "" {
limitedPKey, err := utils.ParsePKey(d.config.DefaultLimitedPartition)
if err != nil {
log.Error().Msgf("failed to parse DEFAULT_LIMITED_PARTITION %s: %v", d.config.DefaultLimitedPartition, err)
} else {
// Try to remove GUIDs from limited partition in backoff loop
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err = d.smClient.RemoveGuidsFromPKey(limitedPKey, removedGUIDList); err != nil {
log.Warn().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s with error: %v",
limitedPKey, d.smClient.Name(), err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s", limitedPKey, d.smClient.Name())
} else {
log.Info().Msgf("successfully removed GUIDs %v from limited partition 0x%04X", removedGUIDList, limitedPKey)
}
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s",
networkNamespace, networkName)
} else {
log.Info().Msgf("removed finalizer %s from NetworkAttachmentDefinition %s/%s",
GUIDInUFMFinalizer, networkNamespace, networkName)
}

// Note: NAD finalizer is not removed here during pod addition
// It will only be removed during pod deletion when all pods using this NAD are cleaned up
}
}

Expand Down Expand Up @@ -562,6 +608,7 @@ func (d *daemon) DeletePeriodicUpdate() {
}

var guidList []net.HardwareAddr
var podGUIDMap = make(map[string]*kapi.Pod) // maps GUID string to pod
var guidAddr net.HardwareAddr
for _, pod := range pods {
log.Debug().Msgf("pod namespace %s name %s", pod.Namespace, pod.Name)
Expand All @@ -572,6 +619,7 @@ func (d *daemon) DeletePeriodicUpdate() {
}

guidList = append(guidList, guidAddr)
podGUIDMap[guidAddr.String()] = pod
}

if ibCniSpec.PKey != "" && len(guidList) != 0 {
Expand All @@ -595,22 +643,54 @@ func (d *daemon) DeletePeriodicUpdate() {
" with subnet manager %s", ibCniSpec.PKey, d.smClient.Name())
continue
} else {
// RemoveGuidsFromPKey successful, remove finalizer from NetworkAttachmentDefinition
// RemoveGuidsFromPKey successful, now remove GUIDs from limited partition if configured
if d.config.DefaultLimitedPartition != "" {
limitedPKey, err := utils.ParsePKey(d.config.DefaultLimitedPartition)
if err != nil {
log.Error().Msgf("failed to parse DEFAULT_LIMITED_PARTITION %s: %v", d.config.DefaultLimitedPartition, err)
} else {
// Try to remove GUIDs from limited partition in backoff loop
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err = d.smClient.RemoveGuidsFromPKey(limitedPKey, guidList); err != nil {
log.Warn().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s with error: %v",
limitedPKey, d.smClient.Name(), err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s", limitedPKey, d.smClient.Name())
} else {
log.Info().Msgf("successfully removed GUIDs %v from limited partition 0x%04X", guidList, limitedPKey)
}
}
}

// Check if any pods are still using this network before removing NAD finalizer
networkNamespace, networkName, _ := utils.ParseNetworkID(networkID)
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition(
networkNamespace, networkName, GUIDInUFMFinalizer); err != nil {
log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v",
networkNamespace, networkName, err)
return false, nil
podsStillUsingNetwork, err := d.checkIfAnyPodsUsingNetwork(networkNamespace, networkName)
if err != nil {
log.Error().Msgf("failed to check if pods are still using network %s/%s: %v",
networkNamespace, networkName, err)
} else if !podsStillUsingNetwork {
// No pods are using this network anymore, safe to remove NAD finalizer
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition(
networkNamespace, networkName, GUIDInUFMFinalizer); err != nil {
log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v",
networkNamespace, networkName, err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s",
networkNamespace, networkName)
} else {
log.Info().Msgf("removed finalizer %s from NetworkAttachmentDefinition %s/%s",
GUIDInUFMFinalizer, networkNamespace, networkName)
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s",
networkNamespace, networkName)
} else {
log.Info().Msgf("removed finalizer %s from NetworkAttachmentDefinition %s/%s",
GUIDInUFMFinalizer, networkNamespace, networkName)
log.Info().Msgf("NAD finalizer not removed from %s/%s - other pods still using this network",
networkNamespace, networkName)
}
}
}
Expand All @@ -622,6 +702,23 @@ func (d *daemon) DeletePeriodicUpdate() {
}

delete(d.guidPodNetworkMap, guidAddr.String())

// Remove finalizer from pod after successfully cleaning up GUID
if pod, exists := podGUIDMap[guidAddr.String()]; exists {
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
if err = d.kubeClient.RemoveFinalizerFromPod(pod, PodGUIDFinalizer); err != nil {
log.Warn().Msgf("failed to remove finalizer from pod %s/%s: %v",
pod.Namespace, pod.Name, err)
return false, nil
}
return true, nil
}); err != nil {
log.Error().Msgf("failed to remove finalizer from pod %s/%s", pod.Namespace, pod.Name)
} else {
log.Info().Msgf("removed finalizer %s from pod %s/%s",
PodGUIDFinalizer, pod.Namespace, pod.Name)
}
}
}
deleteMap.UnSafeRemove(networkID)
}
Expand Down Expand Up @@ -686,3 +783,43 @@ func (d *daemon) initPool() error {

return nil
}

// checkIfAnyPodsUsingNetwork checks if there are any pods still using the given network
func (d *daemon) checkIfAnyPodsUsingNetwork(networkNamespace, networkName string) (bool, error) {
pods, err := d.kubeClient.GetPods(kapi.NamespaceAll)
if err != nil {
return false, fmt.Errorf("failed to get pods: %v", err)
}

for i := range pods.Items {
pod := &pods.Items[i]

// Skip pods that are being deleted (have deletion timestamp)
if pod.DeletionTimestamp != nil {
continue
}

if !utils.HasNetworkAttachmentAnnot(pod) {
continue
}

networks, err := netAttUtils.ParsePodNetworkAnnotation(pod)
if err != nil {
continue
}

for _, network := range networks {
// Check if this pod uses the network we're checking
if network.Namespace == networkNamespace && network.Name == networkName {
// Check if this network is configured with InfiniBand and has a GUID
if utils.IsPodNetworkConfiguredWithInfiniBand(network) && utils.PodNetworkHasGUID(network) {
log.Debug().Msgf("Found pod %s/%s still using network %s/%s",
pod.Namespace, pod.Name, networkNamespace, networkName)
return true, nil
}
}
}
}

return false, nil
}
12 changes: 12 additions & 0 deletions pkg/guid/guid_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (p *guidPool) Reset(guids []string) error {
// Out of range GUID may be expected and shouldn't be allocated in the pool
continue
}

guidAddr, err := ParseGUID(guid)
if err != nil {
log.Debug().Msgf("error parsing GUID: %s: %v", guid, err)
return err
}

// Check if GUID is already allocated in the pool, if so skip it
if _, exist := p.guidPoolMap[guidAddr]; exist {
continue
}

err = p.AllocateGUID(guid)
if err != nil {
log.Debug().Msgf("error resetting the pool with value: %s: %v", guid, err)
Expand Down
Loading