diff --git a/.go-version b/.go-version new file mode 100644 index 00000000..0aa2042b --- /dev/null +++ b/.go-version @@ -0,0 +1 @@ +go1.22.4 \ No newline at end of file diff --git a/Makefile b/Makefile index d0c58117..97a1996c 100644 --- a/Makefile +++ b/Makefile @@ -149,8 +149,12 @@ test-coverage: | plugins-coverage envtest gocovmerge gcov2lcov ## Run coverage t # Container image .PHONY: image -image: ; $(info Building Docker image...) ## Build conatiner image - $(IMAGE_BUILDER) build -t $(TAG) -f $(DOCKERFILE) $(CURDIR) $(IMAGE_BUILD_OPTS) +image: ; $(info Building Docker image...) ## Build container image + $(IMAGE_BUILDER) build --platform linux/amd64 -t $(TAG) -f $(DOCKERFILE) $(CURDIR) $(IMAGE_BUILD_OPTS) + +.PHONY: docker-push +docker-push: ## Push docker image with the manager. + $(IMAGE_BUILDER) push ${TAG} # Misc diff --git a/deployment/ib-kubernetes-configmap.yaml b/deployment/ib-kubernetes-configmap.yaml index cd95b9d3..de6b6bc6 100644 --- a/deployment/ib-kubernetes-configmap.yaml +++ b/deployment/ib-kubernetes-configmap.yaml @@ -8,3 +8,6 @@ data: DAEMON_PERIODIC_UPDATE: "5" GUID_POOL_RANGE_START: "02:00:00:00:00:00:00:00" GUID_POOL_RANGE_END: "02:FF:FF:FF:FF:FF:FF:FF" + # DEFAULT_LIMITED_PARTITION: "0x0001" # optional + ENABLE_IP_OVER_IB: "false" # default false + ENABLE_INDEX0_FOR_PRIMARY_PKEY: "true" # default true diff --git a/deployment/ib-kubernetes.yaml b/deployment/ib-kubernetes.yaml index 2669f9a8..0b6ae73b 100644 --- a/deployment/ib-kubernetes.yaml +++ b/deployment/ib-kubernetes.yaml @@ -151,3 +151,21 @@ spec: name: ib-kubernetes-ufm-secret key: UFM_CERTIFICATE optional: true + - name: ENABLE_IP_OVER_IB # add + valueFrom: + configMapKeyRef: + key: ENABLE_IP_OVER_IB + name: ib-kubernetes-config + optional: true + - name: DEFAULT_LIMITED_PARTITION # add + valueFrom: + configMapKeyRef: + key: DEFAULT_LIMITED_PARTITION + name: ib-kubernetes-config + optional: true + - name: ENABLE_INDEX0_FOR_PRIMARY_PKEY # add + valueFrom: + configMapKeyRef: + key: ENABLE_INDEX0_FOR_PRIMARY_PKEY + name: ib-kubernetes-config + optional: true diff --git a/example_yamls/nad_example.yaml b/example_yamls/nad_example.yaml new file mode 100644 index 00000000..c926aa5e --- /dev/null +++ b/example_yamls/nad_example.yaml @@ -0,0 +1,30 @@ +apiVersion: k8s.cni.cncf.io/v1 +kind: NetworkAttachmentDefinition +metadata: + annotations: + k8s.v1.cni.cncf.io/resourceName: openshift.io/mlnx_connectx_7_dp_ib + creationTimestamp: "2025-08-15T02:49:29Z" + finalizers: + - ufm.together.ai/guid-cleanup-protection + generation: 1 + name: test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-1 + namespace: test-yaron-with-partitions + resourceVersion: "120784040" + uid: bfc1fe50-2950-43b3-84d4-2e5a03b28dd0 +spec: + config: |- + { + "cniVersion": "1.0.0", + "name": "test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-1", + "type": "ib-sriov", + "capabilities": { + "infinibandGUID": true, + "ips": true + }, + "pkey": "0x0040", + "link_state": "enable", + "ipam": { + "type": "whereabouts", + "range": "172.20.128.0/17" + } + } diff --git a/example_yamls/pod_example.yaml b/example_yamls/pod_example.yaml new file mode 100644 index 00000000..6c0473f3 --- /dev/null +++ b/example_yamls/pod_example.yaml @@ -0,0 +1,640 @@ +apiVersion: v1 +kind: Pod +metadata: + annotations: + descheduler.alpha.kubernetes.io/request-evict-only: "" + hooks.kubevirt.io/hookSidecars: '[{"image": "public.ecr.aws/k6t4m3l7/kubevirt-sidecar:5616cac"}]' + k8s.v1.cni.cncf.io/network-status: |- + [{ + "name": "kube-ovn", + "interface": "eth0", + "ips": [ + 
"10.50.0.5" + ], + "mac": "f6:29:df:7d:d5:da", + "default": true, + "dns": {}, + "gateway": [ + "10.50.0.1" + ] + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-1", + "interface": "pod6dfd889bb4d", + "ips": [ + "172.20.128.9" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:e3:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-2", + "interface": "pod1b5e3d3aebd", + "ips": [ + "172.20.128.10" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:9c:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-3", + "interface": "pod17c48572395", + "ips": [ + "172.20.128.11" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:ad:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-4", + "interface": "poda8b12b8cd2b", + "ips": [ + "172.20.128.12" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:1a:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-5", + "interface": "podfbdfa9c6092", + "ips": [ + "172.20.128.13" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:2c:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-6", + "interface": "podeb140a2b94b", + "ips": [ + "172.20.128.14" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:40:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-7", + "interface": "pod2c81bc4badb", + "ips": [ + "172.20.128.15" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:65:00.1" + } + } + },{ + "name": "test-yaron-with-partitions/test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-8", + "interface": "pod652b63b70b1", + "ips": [ + "172.20.128.16" + ], + "dns": {}, + "device-info": { + "type": "pci", + "version": "1.1.0", + "pci": { + "pci-address": "0000:c0:00.1" + } + } + }] + k8s.v1.cni.cncf.io/networks: 
'[{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-1","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1f","interface":"pod6dfd889bb4d","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-2","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:18","interface":"pod1b5e3d3aebd","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-3","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:19","interface":"pod17c48572395","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-4","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1a","interface":"poda8b12b8cd2b","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-5","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1b","interface":"podfbdfa9c6092","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-6","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1c","interface":"podeb140a2b94b","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-7","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1d","interface":"pod2c81bc4badb","cni-args":{"mellanox.infiniband.app":"configured"}},{"name":"test-yaron-with-partitions-gpu-dp-mlnx-hca-ib-net-8","namespace":"test-yaron-with-partitions","infiniband-guid":"02:01:00:00:00:00:00:1e","interface":"pod652b63b70b1","cni-args":{"mellanox.infiniband.app":"configured"}}]' + kubectl.kubernetes.io/default-container: compute + kubevirt.io/domain: gpu-dp-zsjr4-dkznf + kubevirt.io/migrationTransportUnix: "true" + kubevirt.io/network-info: '{"interfaces":[{"network":"mlnx_connectx_7_dp_ib-1","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:e3:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-2","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:9c:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-3","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:ad:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-4","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:1a:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-5","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:2c:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-6","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:40:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-7","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:65:00.1"}}},{"network":"mlnx_connectx_7_dp_ib-8","deviceInfo":{"type":"pci","version":"1.1.0","pci":{"pci-address":"0000:c0:00.1"}}}]}' + kubevirt.io/vm-generation: "1" + ovn.kubernetes.io/allocated: "true" + ovn.kubernetes.io/cidr: 10.50.0.0/24 + ovn.kubernetes.io/gateway: 10.50.0.1 + ovn.kubernetes.io/ip_address: 10.50.0.5 + ovn.kubernetes.io/logical_router: ovn-cluster + ovn.kubernetes.io/logical_switch: test-yaron-with-partitions + ovn.kubernetes.io/mac_address: f6:29:df:7d:d5:da + ovn.kubernetes.io/pod_nic_type: veth-pair + ovn.kubernetes.io/routed: "true" + ovn.kubernetes.io/virtualmachine: gpu-dp-zsjr4-dkznf + post.hook.backup.velero.io/command: '["/usr/bin/virt-freezer", "--unfreeze", 
"--name", + "gpu-dp-zsjr4-dkznf", "--namespace", "test-yaron-with-partitions"]' + post.hook.backup.velero.io/container: compute + pre.hook.backup.velero.io/command: '["/usr/bin/virt-freezer", "--freeze", "--name", + "gpu-dp-zsjr4-dkznf", "--namespace", "test-yaron-with-partitions"]' + pre.hook.backup.velero.io/container: compute + qemuargs.vm.kubevirt.io/args: -fw_cfg name=opt/ovmf/X-PciMmio64Mb,string=2097152 + creationTimestamp: "2025-08-15T03:02:54Z" + finalizers: + - ufm.together.ai/pod-guid-cleanup-protection + generateName: virt-launcher-gpu-dp-zsjr4-dkznf- + generation: 1 + labels: + capk.cluster.x-k8s.io/kubevirt-machine-name: gpu-dp-zsjr4-dkznf + capk.cluster.x-k8s.io/kubevirt-machine-namespace: test-yaron-with-partitions + cluster.x-k8s.io/cluster-name: test-yaron-with-partitions + cluster.x-k8s.io/role: worker + kubevirt.io: virt-launcher + kubevirt.io/created-by: eec3ac39-1823-4d00-a98e-a1e97012fbb9 + kubevirt.io/nodeName: gpu007c.cloud.together.ai + kubevirt.io/vm: gpu-dp-zsjr4-dkznf + name: gpu-dp-zsjr4-dkznf + vm.kubevirt.io/name: gpu-dp-zsjr4-dkznf + name: virt-launcher-gpu-dp-zsjr4-dkznf-zktl2 + namespace: test-yaron-with-partitions + ownerReferences: + - apiVersion: kubevirt.io/v1 + blockOwnerDeletion: true + controller: true + kind: VirtualMachineInstance + name: gpu-dp-zsjr4-dkznf + uid: eec3ac39-1823-4d00-a98e-a1e97012fbb9 + resourceVersion: "120786005" + uid: 1a0c3fb4-6b7d-452b-8ee9-150ad55da19a +spec: + automountServiceAccountToken: false + containers: + - command: + - /usr/bin/virt-launcher-monitor + - --qemu-timeout + - 341s + - --name + - gpu-dp-zsjr4-dkznf + - --uid + - eec3ac39-1823-4d00-a98e-a1e97012fbb9 + - --namespace + - test-yaron-with-partitions + - --kubevirt-share-dir + - /var/run/kubevirt + - --ephemeral-disk-dir + - /var/run/kubevirt-ephemeral-disks + - --container-disk-dir + - /var/run/kubevirt/container-disks + - --grace-period-seconds + - "45" + - --hook-sidecars + - "1" + - --ovmf-path + - /usr/share/OVMF + - --run-as-nonroot + env: + - name: XDG_CACHE_HOME + value: /var/run/kubevirt-private + - name: XDG_CONFIG_HOME + value: /var/run/kubevirt-private + - name: XDG_RUNTIME_DIR + value: /var/run + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-2 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-3 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-4 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-5 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-6 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-7 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-8 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: KUBEVIRT_RESOURCE_NAME_mlnx_connectx_7_dp_ib-1 + value: openshift.io/mlnx_connectx_7_dp_ib + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + image: quay.io/kubevirt/virt-launcher:v1.4.0 + imagePullPolicy: IfNotPresent + name: compute + ports: + - containerPort: 22 + name: ssh + protocol: TCP + - containerPort: 16443 + name: telegraf + protocol: TCP + resources: + limits: + cpu: "65" + devices.kubevirt.io/kvm: "1" + devices.kubevirt.io/tun: "1" + devices.kubevirt.io/vhost-net: "1" + ephemeral-storage: "32262254720" + hugepages-1Gi: 512Gi + memory: 56246Mi + nvidia.com/GH100_H100_NVSWITCH: "4" + 
nvidia.com/GH100_H100_SXM5_80GB: "8" + openshift.io/mlnx_connectx_7_dp_ib: "8" + requests: + cpu: "65" + devices.kubevirt.io/kvm: "1" + devices.kubevirt.io/tun: "1" + devices.kubevirt.io/vhost-net: "1" + ephemeral-storage: 50M + hugepages-1Gi: 512Gi + memory: 56246Mi + nvidia.com/GH100_H100_NVSWITCH: "4" + nvidia.com/GH100_H100_SXM5_80GB: "8" + openshift.io/mlnx_connectx_7_dp_ib: "8" + securityContext: + allowPrivilegeEscalation: false + capabilities: + add: + - NET_BIND_SERVICE + drop: + - ALL + privileged: false + runAsGroup: 107 + runAsNonRoot: true + runAsUser: 107 + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /var/run/kubevirt-private + name: private + - mountPath: /var/run/kubevirt + name: public + - mountPath: /var/run/kubevirt-ephemeral-disks + name: ephemeral-disks + - mountPath: /var/run/kubevirt/container-disks + mountPropagation: HostToContainer + name: container-disks + - mountPath: /var/run/libvirt + name: libvirt-runtime + - mountPath: /var/run/kubevirt/sockets + name: sockets + - mountPath: /var/run/kubevirt-private/vmi-disks/containervolume + name: containervolume + - mountPath: /var/run/kubevirt-private/vmi-disks/scratch + name: scratch + - mountPath: /var/run/kubevirt-private/secret/cloudinitvolume/userdata + name: cloudinitvolume-udata + readOnly: true + subPath: userdata + - mountPath: /var/run/kubevirt-private/secret/cloudinitvolume/userData + name: cloudinitvolume-udata + readOnly: true + subPath: userData + - mountPath: /var/run/kubevirt-hooks + name: hook-sidecar-sockets + - mountPath: /dev/hugepages + name: hugepages + - mountPath: /dev/hugepages/libvirt/qemu + name: hugetblfs-dir + - mountPath: /var/run/kubevirt/hotplug-disks + mountPropagation: HostToContainer + name: hotplug-disks + - mountPath: /etc/podinfo + name: network-info-annotation + - args: + - --logfile + - /var/run/kubevirt-private/eec3ac39-1823-4d00-a98e-a1e97012fbb9/virt-serial0-log + command: + - /usr/bin/virt-tail + env: + - name: VIRT_LAUNCHER_LOG_VERBOSITY + value: "2" + image: quay.io/kubevirt/virt-launcher:v1.4.0 + imagePullPolicy: IfNotPresent + name: guest-console-log + resources: + limits: + cpu: 15m + memory: 60M + requests: + cpu: 15m + memory: 60M + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + runAsNonRoot: true + runAsUser: 107 + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /var/run/kubevirt-private + name: private + readOnly: true + - env: + - name: XDG_CACHE_HOME + value: /var/run/kubevirt-private + - name: XDG_CONFIG_HOME + value: /var/run/kubevirt-private + - name: XDG_RUNTIME_DIR + value: /var/run + image: public.ecr.aws/k6t4m3l7/kubevirt-sidecar:5616cac + imagePullPolicy: IfNotPresent + name: hook-sidecar-0 + resources: + limits: + cpu: 200m + memory: 64M + requests: + cpu: 200m + memory: 64M + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: + - ALL + privileged: false + runAsGroup: 107 + runAsNonRoot: true + runAsUser: 107 + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /var/run/kubevirt-hooks + name: hook-sidecar-sockets + dnsPolicy: ClusterFirst + enableServiceLinks: false + hostname: gpu-dp-zsjr4-dkznf + nodeName: gpu007c.cloud.together.ai + nodeSelector: + cpumanager: "true" + kubernetes.io/arch: amd64 + kubevirt.io/schedulable: "true" + preemptionPolicy: PreemptLowerPriority + priority: 0 + readinessGates: + - 
conditionType: kubevirt.io/virtual-machine-unpaused + restartPolicy: Never + schedulerName: default-scheduler + securityContext: + fsGroup: 107 + runAsGroup: 107 + runAsNonRoot: true + runAsUser: 107 + serviceAccount: default + serviceAccountName: default + terminationGracePeriodSeconds: 60 + tolerations: + - effect: NoExecute + key: node.kubernetes.io/not-ready + operator: Exists + tolerationSeconds: 300 + - effect: NoExecute + key: node.kubernetes.io/unreachable + operator: Exists + tolerationSeconds: 300 + volumes: + - emptyDir: {} + name: private + - emptyDir: {} + name: public + - emptyDir: {} + name: sockets + - emptyDir: {} + name: virt-bin-share-dir + - emptyDir: {} + name: libvirt-runtime + - emptyDir: {} + name: ephemeral-disks + - emptyDir: {} + name: container-disks + - name: containervolume + persistentVolumeClaim: + claimName: gpu-dp-zsjr4-dkznf-ubuntu-22.04-dp-dv + - name: scratch + persistentVolumeClaim: + claimName: gpu-dp-zsjr4-dkznf-scratch + - name: cloudinitvolume-udata + secret: + defaultMode: 420 + secretName: gpu-dp-zsjr4-dkznf-userdata + - emptyDir: {} + name: hook-sidecar-sockets + - emptyDir: + medium: HugePages + name: hugepages + - emptyDir: {} + name: hugetblfs-dir + - emptyDir: {} + name: hotplug-disks + - downwardAPI: + defaultMode: 420 + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.annotations['kubevirt.io/network-info'] + path: network-info + name: network-info-annotation +status: + conditions: + - lastProbeTime: "2025-08-15T03:02:55Z" + lastTransitionTime: "2025-08-15T03:02:55Z" + message: the virtual machine is not paused + reason: NotPaused + status: "True" + type: kubevirt.io/virtual-machine-unpaused + - lastProbeTime: null + lastTransitionTime: "2025-08-15T03:03:08Z" + status: "True" + type: PodReadyToStartContainers + - lastProbeTime: null + lastTransitionTime: "2025-08-15T03:02:55Z" + status: "True" + type: Initialized + - lastProbeTime: null + lastTransitionTime: "2025-08-15T03:03:08Z" + status: "True" + type: Ready + - lastProbeTime: null + lastTransitionTime: "2025-08-15T03:03:08Z" + status: "True" + type: ContainersReady + - lastProbeTime: null + lastTransitionTime: "2025-08-15T03:02:54Z" + status: "True" + type: PodScheduled + containerStatuses: + - allocatedResources: + cpu: "65" + devices.kubevirt.io/kvm: "1" + devices.kubevirt.io/tun: "1" + devices.kubevirt.io/vhost-net: "1" + ephemeral-storage: 50M + hugepages-1Gi: 512Gi + memory: 56246Mi + nvidia.com/GH100_H100_NVSWITCH: "4" + nvidia.com/GH100_H100_SXM5_80GB: "8" + openshift.io/mlnx_connectx_7_dp_ib: "8" + containerID: containerd://97e20e4b88fa3e58c57c3b645d29084d9a983a2e4d39cb7c20913c8a8bb5f447 + image: quay.io/kubevirt/virt-launcher:v1.4.0 + imageID: quay.io/kubevirt/virt-launcher@sha256:18508d8767f8530df4d6aca4ffe467c40f9891fbe052548b3b2316f8bfb1bc38 + lastState: {} + name: compute + ready: true + resources: + limits: + cpu: "65" + devices.kubevirt.io/kvm: "1" + devices.kubevirt.io/tun: "1" + devices.kubevirt.io/vhost-net: "1" + ephemeral-storage: "32262254720" + hugepages-1Gi: 512Gi + memory: 56246Mi + nvidia.com/GH100_H100_NVSWITCH: "4" + nvidia.com/GH100_H100_SXM5_80GB: "8" + openshift.io/mlnx_connectx_7_dp_ib: "8" + requests: + cpu: "65" + devices.kubevirt.io/kvm: "1" + devices.kubevirt.io/tun: "1" + devices.kubevirt.io/vhost-net: "1" + ephemeral-storage: 50M + hugepages-1Gi: 512Gi + memory: 56246Mi + nvidia.com/GH100_H100_NVSWITCH: "4" + nvidia.com/GH100_H100_SXM5_80GB: "8" + openshift.io/mlnx_connectx_7_dp_ib: "8" + restartCount: 0 + started: true + state: + 
running: + startedAt: "2025-08-15T03:03:07Z" + user: + linux: + gid: 107 + supplementalGroups: + - 107 + uid: 107 + volumeMounts: + - mountPath: /var/run/kubevirt-private + name: private + - mountPath: /var/run/kubevirt + name: public + - mountPath: /var/run/kubevirt-ephemeral-disks + name: ephemeral-disks + - mountPath: /var/run/kubevirt/container-disks + name: container-disks + - mountPath: /var/run/libvirt + name: libvirt-runtime + - mountPath: /var/run/kubevirt/sockets + name: sockets + - mountPath: /var/run/kubevirt-private/vmi-disks/containervolume + name: containervolume + - mountPath: /var/run/kubevirt-private/vmi-disks/scratch + name: scratch + - mountPath: /var/run/kubevirt-private/secret/cloudinitvolume/userdata + name: cloudinitvolume-udata + readOnly: true + recursiveReadOnly: Disabled + - mountPath: /var/run/kubevirt-private/secret/cloudinitvolume/userData + name: cloudinitvolume-udata + readOnly: true + recursiveReadOnly: Disabled + - mountPath: /var/run/kubevirt-hooks + name: hook-sidecar-sockets + - mountPath: /dev/hugepages + name: hugepages + - mountPath: /dev/hugepages/libvirt/qemu + name: hugetblfs-dir + - mountPath: /var/run/kubevirt/hotplug-disks + name: hotplug-disks + - mountPath: /etc/podinfo + name: network-info-annotation + - allocatedResources: + cpu: 15m + memory: 60M + containerID: containerd://e526ca887d152a084c08263c511dd18f1d02b71dfec1ad007f4821255401060b + image: quay.io/kubevirt/virt-launcher:v1.4.0 + imageID: quay.io/kubevirt/virt-launcher@sha256:18508d8767f8530df4d6aca4ffe467c40f9891fbe052548b3b2316f8bfb1bc38 + lastState: {} + name: guest-console-log + ready: true + resources: + limits: + cpu: 15m + memory: 60M + requests: + cpu: 15m + memory: 60M + restartCount: 0 + started: true + state: + running: + startedAt: "2025-08-15T03:03:08Z" + user: + linux: + gid: 107 + supplementalGroups: + - 107 + uid: 107 + volumeMounts: + - mountPath: /var/run/kubevirt-private + name: private + readOnly: true + recursiveReadOnly: Disabled + - allocatedResources: + cpu: 200m + memory: 64M + containerID: containerd://4b37959c9c547e03fb0de71d9933e1a2a2b93322a91f56a9c425bf0773039c77 + image: public.ecr.aws/k6t4m3l7/kubevirt-sidecar:5616cac + imageID: public.ecr.aws/k6t4m3l7/kubevirt-sidecar@sha256:fb145d3e83575de0ade1a26a8fd8979b86b8177c5862ecd2dba5aa6da1b927da + lastState: {} + name: hook-sidecar-0 + ready: true + resources: + limits: + cpu: 200m + memory: 62500Ki + requests: + cpu: 200m + memory: 64M + restartCount: 0 + started: true + state: + running: + startedAt: "2025-08-15T03:03:08Z" + user: + linux: + gid: 107 + supplementalGroups: + - 107 + uid: 107 + volumeMounts: + - mountPath: /var/run/kubevirt-hooks + name: hook-sidecar-sockets + hostIP: 10.49.5.7 + hostIPs: + - ip: 10.49.5.7 + phase: Running + podIP: 10.50.0.5 + podIPs: + - ip: 10.50.0.5 + qosClass: Guaranteed + startTime: "2025-08-15T03:02:55Z" diff --git a/pkg/config/config.go b/pkg/config/config.go index eb160cb2..ac9ce528 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,6 +15,12 @@ type DaemonConfig struct { Plugin string `env:"DAEMON_SM_PLUGIN"` // Subnet manager plugins path PluginPath string `env:"DAEMON_SM_PLUGIN_PATH" envDefault:"/plugins"` + // Default partition key for limited membership + DefaultLimitedPartition string `env:"DEFAULT_LIMITED_PARTITION"` + // Enable IP over IB functionality + EnableIPOverIB bool `env:"ENABLE_IP_OVER_IB" envDefault:"false"` + // Enable index0 for primary pkey GUID additions + EnableIndex0ForPrimaryPkey bool `env:"ENABLE_INDEX0_FOR_PRIMARY_PKEY" 
envDefault:"true"` } type GUIDPoolConfig struct { @@ -28,6 +34,27 @@ func (dc *DaemonConfig) ReadConfig() error { log.Debug().Msg("Reading configuration environment variables") err := env.Parse(dc) + // If IP over IB enabled - log at startup + if dc.EnableIPOverIB { + log.Warn().Msg("New partitions will be created with IP over IB enabled.") + } else { + log.Info().Msg("New partitions will be created with IP over IB disabled.") + } + + // If index0 for primary pkey enabled - log at startup + if dc.EnableIndex0ForPrimaryPkey { + log.Info().Msg("Primary pkey GUID additions will be created with index0 enabled.") + } else { + log.Info().Msg("Primary pkey GUID additions will be created with index0 disabled.") + } + + // If default limited partition is set - log at startup + if dc.DefaultLimitedPartition != "" { + log.Info().Msgf("Default limited partition is set to %s. New GUIDs will be added as limited members to this partition.", dc.DefaultLimitedPartition) + } else { + log.Info().Msg("Default limited partition is not set.") + } + return err } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index e3d12820..6f330d47 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -20,6 +20,8 @@ var _ = Describe("Configuration", func() { Expect(os.Setenv("GUID_POOL_RANGE_END", "02:00:00:00:00:00:00:FF")).ToNot(HaveOccurred()) Expect(os.Setenv("DAEMON_SM_PLUGIN", "ufm")).ToNot(HaveOccurred()) Expect(os.Setenv("DAEMON_SM_PLUGIN_PATH", "/custom/plugins/location")).ToNot(HaveOccurred()) + Expect(os.Setenv("DEFAULT_LIMITED_PARTITION", "0x2")).ToNot(HaveOccurred()) + Expect(os.Setenv("ENABLE_IP_OVER_IB", "true")).ToNot(HaveOccurred()) err := dc.ReadConfig() Expect(err).ToNot(HaveOccurred()) @@ -28,6 +30,8 @@ var _ = Describe("Configuration", func() { Expect(dc.GUIDPool.RangeEnd).To(Equal("02:00:00:00:00:00:00:FF")) Expect(dc.Plugin).To(Equal("ufm")) Expect(dc.PluginPath).To(Equal("/custom/plugins/location")) + Expect(dc.DefaultLimitedPartition).To(Equal("0x2")) + Expect(dc.EnableIPOverIB).To(BeTrue()) }) It("Read configuration with default values", func() { dc := &DaemonConfig{} @@ -40,6 +44,20 @@ var _ = Describe("Configuration", func() { Expect(dc.GUIDPool.RangeEnd).To(Equal("02:FF:FF:FF:FF:FF:FF:FF")) Expect(dc.Plugin).To(Equal("ufm")) Expect(dc.PluginPath).To(Equal("/plugins")) + Expect(dc.DefaultLimitedPartition).To(Equal("")) // Default should be empty + Expect(dc.EnableIPOverIB).To(BeFalse()) // Default should be false + }) + It("Read configuration with new environment variables", func() { + dc := &DaemonConfig{} + + Expect(os.Setenv("DAEMON_SM_PLUGIN", "ufm")).ToNot(HaveOccurred()) + Expect(os.Setenv("DEFAULT_LIMITED_PARTITION", "0x1")).ToNot(HaveOccurred()) + Expect(os.Setenv("ENABLE_IP_OVER_IB", "true")).ToNot(HaveOccurred()) + + err := dc.ReadConfig() + Expect(err).ToNot(HaveOccurred()) + Expect(dc.DefaultLimitedPartition).To(Equal("0x1")) + Expect(dc.EnableIPOverIB).To(BeTrue()) }) }) Context("ValidateConfig", func() { diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 01d6086c..7ef6d90d 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -30,6 +30,7 @@ import ( ) const GUIDInUFMFinalizer = "ufm.together.ai/guid-cleanup-protection" +const PodGUIDFinalizer = "ufm.together.ai/pod-guid-cleanup-protection" type Daemon interface { // Execute Daemon loop, returns when os.Interrupt signal is received @@ -109,6 +110,16 @@ func NewDaemon() (Daemon, error) { return nil, err } + // Pass configuration from daemon to the plugin + pluginConfig := 
map[string]interface{}{ + "ENABLE_IP_OVER_IB": daemonConfig.EnableIPOverIB, + "DEFAULT_LIMITED_PARTITION": daemonConfig.DefaultLimitedPartition, + "ENABLE_INDEX0_FOR_PRIMARY_PKEY": daemonConfig.EnableIndex0ForPrimaryPkey, + } + if err := smClient.SetConfig(pluginConfig); err != nil { + log.Warn().Msgf("Failed to set configuration on subnet manager plugin: %v", err) + } + // Try to validate if subnet manager is reachable in backoff loop var validateErr error if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { @@ -233,6 +244,144 @@ func getPodNetworkInfo(netName string, pod *kapi.Pod, netMap networksMap) (*podN }, nil } +// addPodFinalizer adds the GUID cleanup finalizer to a pod +func (d *daemon) addPodFinalizer(pod *kapi.Pod) error { + return wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.kubeClient.AddFinalizerToPod(pod, PodGUIDFinalizer); err != nil { + log.Warn().Msgf("failed to add finalizer to pod %s/%s: %v", + pod.Namespace, pod.Name, err) + return false, nil + } + return true, nil + }) +} + +// removePodFinalizer removes the GUID cleanup finalizer from a pod +func (d *daemon) removePodFinalizer(pod *kapi.Pod) error { + return wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.kubeClient.RemoveFinalizerFromPod(pod, PodGUIDFinalizer); err != nil { + log.Warn().Msgf("failed to remove finalizer from pod %s/%s: %v", + pod.Namespace, pod.Name, err) + return false, nil + } + return true, nil + }) +} + +// addNADFinalizer adds the GUID cleanup finalizer to a NetworkAttachmentDefinition +func (d *daemon) addNADFinalizer(networkNamespace, networkName string) error { + return wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.kubeClient.AddFinalizerToNetworkAttachmentDefinition( + networkNamespace, networkName, GUIDInUFMFinalizer); err != nil { + log.Warn().Msgf("failed to add finalizer to NetworkAttachmentDefinition %s/%s: %v", + networkNamespace, networkName, err) + return false, nil + } + return true, nil + }) +} + +// removeNADFinalizerIfSafe removes the finalizer from NAD only if no pods are using the network +func (d *daemon) removeNADFinalizerIfSafe(networkNamespace, networkName string) error { + podsUsingNetwork, err := d.checkIfAnyPodsUsingNetwork(networkNamespace, networkName) + if err != nil { + return fmt.Errorf("failed to check if pods are still using network %s/%s: %v", + networkNamespace, networkName, err) + } + + if podsUsingNetwork { + log.Info().Msgf("NAD finalizer not removed from %s/%s - other pods still using this network", + networkNamespace, networkName) + return nil + } + + return wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition( + networkNamespace, networkName, GUIDInUFMFinalizer); err != nil { + log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v", + networkNamespace, networkName, err) + return false, nil + } + return true, nil + }) +} + +// addGUIDsToPKeyWithLimitedPartition adds GUIDs to both main pkey and limited partition if configured +func (d *daemon) addGUIDsToPKeyWithLimitedPartition(pKey int, guidList []net.HardwareAddr) error { + // Add to main pKey + if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.smClient.AddGuidsToPKey(pKey, guidList); err != nil { + log.Warn().Msgf("failed to config pKey with subnet manager %s with error : %v", + d.smClient.Name(), err) + return false, nil + } + return true, nil + }); 
err != nil { + return fmt.Errorf("failed to config pKey with subnet manager %s", d.smClient.Name()) + } + + // Add to limited partition if configured + if d.config.DefaultLimitedPartition != "" { + limitedPKey, err := utils.ParsePKey(d.config.DefaultLimitedPartition) + if err != nil { + log.Error().Msgf("failed to parse DEFAULT_LIMITED_PARTITION %s: %v", d.config.DefaultLimitedPartition, err) + } else { + if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.smClient.AddGuidsToLimitedPKey(limitedPKey, guidList); err != nil { + log.Warn().Msgf("failed to add GUIDs to limited partition 0x%04X with subnet manager %s with error: %v", + limitedPKey, d.smClient.Name(), err) + return false, nil + } + return true, nil + }); err != nil { + log.Error().Msgf("failed to add GUIDs to limited partition 0x%04X with subnet manager %s", limitedPKey, d.smClient.Name()) + } else { + log.Info().Msgf("successfully added GUIDs %v to limited partition 0x%04X", guidList, limitedPKey) + } + } + } + + return nil +} + +// removeGUIDsFromPKeyWithLimitedPartition removes GUIDs from both main pkey and limited partition if configured +func (d *daemon) removeGUIDsFromPKeyWithLimitedPartition(pKey int, guidList []net.HardwareAddr) error { + // Remove from main pKey + if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.smClient.RemoveGuidsFromPKey(pKey, guidList); err != nil { + log.Warn().Msgf("failed to remove guids from pKey 0x%04X with subnet manager %s with error: %v", + pKey, d.smClient.Name(), err) + return false, nil + } + return true, nil + }); err != nil { + return fmt.Errorf("failed to remove guids from pKey 0x%04X with subnet manager %s", pKey, d.smClient.Name()) + } + + // Remove from limited partition if configured + if d.config.DefaultLimitedPartition != "" { + limitedPKey, err := utils.ParsePKey(d.config.DefaultLimitedPartition) + if err != nil { + log.Error().Msgf("failed to parse DEFAULT_LIMITED_PARTITION %s: %v", d.config.DefaultLimitedPartition, err) + } else { + if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { + if err := d.smClient.RemoveGuidsFromPKey(limitedPKey, guidList); err != nil { + log.Warn().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s with error: %v", + limitedPKey, d.smClient.Name(), err) + return false, nil + } + return true, nil + }); err != nil { + log.Error().Msgf("failed to remove GUIDs from limited partition 0x%04X with subnet manager %s", limitedPKey, d.smClient.Name()) + } else { + log.Info().Msgf("successfully removed GUIDs %v from limited partition 0x%04X", guidList, limitedPKey) + } + } + } + + return nil +} + // Verify if GUID already exist for given network ID and allocates new one if not func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID) error { if mappedID, exist := d.guidPodNetworkMap[allocatedGUID]; exist { @@ -407,6 +556,15 @@ func (d *daemon) AddPeriodicUpdate() { continue } + // Add finalizer to pod since it now has a GUID that needs cleanup + if err = d.addPodFinalizer(pi.pod); err != nil { + log.Error().Msgf("failed to add finalizer to pod %s/%s: %v", pi.pod.Namespace, pi.pod.Name, err) + continue + } else { + log.Info().Msgf("added finalizer %s to pod %s/%s", + PodGUIDFinalizer, pi.pod.Namespace, pi.pod.Name) + } + guidList = append(guidList, pi.addr) passedPods = append(passedPods, pi) } @@ -420,35 +578,20 @@ func (d *daemon) AddPeriodicUpdate() { continue } - // Try to add pKeys via subnet manager in 
backoff loop - if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err = d.smClient.AddGuidsToPKey(pKey, guidList); err != nil { - log.Warn().Msgf("failed to config pKey with subnet manager %s with error : %v", - d.smClient.Name(), err) - return false, nil - } - return true, nil - }); err != nil { - log.Error().Msgf("failed to config pKey with subnet manager %s", d.smClient.Name()) + // Add GUIDs to pKeys (main and limited partition) + if err = d.addGUIDsToPKeyWithLimitedPartition(pKey, guidList); err != nil { + log.Error().Msgf("%v", err) continue + } + + // Add finalizer to NetworkAttachmentDefinition + networkNamespace, networkName, _ := utils.ParseNetworkID(networkID) + if err := d.addNADFinalizer(networkNamespace, networkName); err != nil { + log.Error().Msgf("failed to add finalizer to NetworkAttachmentDefinition %s/%s: %v", + networkNamespace, networkName, err) } else { - // AddGuidsToPKey successful, add finalizer to NetworkAttachmentDefinition - networkNamespace, networkName, _ := utils.ParseNetworkID(networkID) - if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err := d.kubeClient.AddFinalizerToNetworkAttachmentDefinition( - networkNamespace, networkName, GUIDInUFMFinalizer); err != nil { - log.Warn().Msgf("failed to add finalizer to NetworkAttachmentDefinition %s/%s: %v", - networkNamespace, networkName, err) - return false, nil - } - return true, nil - }); err != nil { - log.Error().Msgf("failed to add finalizer to NetworkAttachmentDefinition %s/%s", - networkNamespace, networkName) - } else { - log.Info().Msgf("added finalizer %s to NetworkAttachmentDefinition %s/%s", - GUIDInUFMFinalizer, networkNamespace, networkName) - } + log.Info().Msgf("added finalizer %s to NetworkAttachmentDefinition %s/%s", + GUIDInUFMFinalizer, networkNamespace, networkName) } } @@ -466,38 +609,14 @@ func (d *daemon) AddPeriodicUpdate() { // Already check the parse above pKey, _ := utils.ParsePKey(ibCniSpec.PKey) - // Try to remove pKeys via subnet manager in backoff loop - if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err = d.smClient.RemoveGuidsFromPKey(pKey, removedGUIDList); err != nil { - log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+ - " with subnet manager %s with error: %v", ibCniSpec.PKey, - d.smClient.Name(), err) - return false, nil - } - return true, nil - }); err != nil { - log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+ - " with subnet manager %s", ibCniSpec.PKey, d.smClient.Name()) + // Remove GUIDs from pKeys (main and limited partition) + if err = d.removeGUIDsFromPKeyWithLimitedPartition(pKey, removedGUIDList); err != nil { + log.Warn().Msgf("%v", err) continue - } else { - // RemoveGuidsFromPKey successful, remove finalizer from NetworkAttachmentDefinition - networkNamespace, networkName, _ := utils.ParseNetworkID(networkID) - if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition( - networkNamespace, networkName, GUIDInUFMFinalizer); err != nil { - log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v", - networkNamespace, networkName, err) - return false, nil - } - return true, nil - }); err != nil { - log.Error().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s", - networkNamespace, networkName) - } else { - log.Info().Msgf("removed finalizer %s from NetworkAttachmentDefinition %s/%s", - GUIDInUFMFinalizer, 
networkNamespace, networkName) - } } + + // Note: NAD finalizer is not removed here during pod addition + // It will only be removed during pod deletion when all pods using this NAD are cleaned up } addMap.UnSafeRemove(networkID) @@ -562,6 +681,7 @@ func (d *daemon) DeletePeriodicUpdate() { } var guidList []net.HardwareAddr + var podGUIDMap = make(map[string]*kapi.Pod) // maps GUID string to pod var guidAddr net.HardwareAddr for _, pod := range pods { log.Debug().Msgf("pod namespace %s name %s", pod.Namespace, pod.Name) @@ -572,6 +692,7 @@ func (d *daemon) DeletePeriodicUpdate() { } guidList = append(guidList, guidAddr) + podGUIDMap[guidAddr.String()] = pod } if ibCniSpec.PKey != "" && len(guidList) != 0 { @@ -581,37 +702,19 @@ func (d *daemon) DeletePeriodicUpdate() { continue } - // Try to remove pKeys via subnet manager on backoff loop - if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err = d.smClient.RemoveGuidsFromPKey(pKey, guidList); err != nil { - log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+ - " with subnet manager %s with error: %v", ibCniSpec.PKey, - d.smClient.Name(), err) - return false, nil - } - return true, nil - }); err != nil { - log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+ - " with subnet manager %s", ibCniSpec.PKey, d.smClient.Name()) + // Remove GUIDs from pKeys (main and limited partition) + if err = d.removeGUIDsFromPKeyWithLimitedPartition(pKey, guidList); err != nil { + log.Warn().Msgf("%v", err) continue + } + + // Check if NAD finalizer can be safely removed + networkNamespace, networkName, _ := utils.ParseNetworkID(networkID) + if err := d.removeNADFinalizerIfSafe(networkNamespace, networkName); err != nil { + log.Error().Msgf("failed to remove NAD finalizer for %s/%s: %v", networkNamespace, networkName, err) } else { - // RemoveGuidsFromPKey successful, remove finalizer from NetworkAttachmentDefinition - networkNamespace, networkName, _ := utils.ParseNetworkID(networkID) - if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) { - if err := d.kubeClient.RemoveFinalizerFromNetworkAttachmentDefinition( - networkNamespace, networkName, GUIDInUFMFinalizer); err != nil { - log.Warn().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s: %v", - networkNamespace, networkName, err) - return false, nil - } - return true, nil - }); err != nil { - log.Error().Msgf("failed to remove finalizer from NetworkAttachmentDefinition %s/%s", - networkNamespace, networkName) - } else { - log.Info().Msgf("removed finalizer %s from NetworkAttachmentDefinition %s/%s", - GUIDInUFMFinalizer, networkNamespace, networkName) - } + log.Info().Msgf("checked and potentially removed finalizer %s from NetworkAttachmentDefinition %s/%s", + GUIDInUFMFinalizer, networkNamespace, networkName) } } @@ -622,6 +725,16 @@ func (d *daemon) DeletePeriodicUpdate() { } delete(d.guidPodNetworkMap, guidAddr.String()) + + // Remove finalizer from pod after successfully cleaning up GUID + if pod, exists := podGUIDMap[guidAddr.String()]; exists { + if err = d.removePodFinalizer(pod); err != nil { + log.Error().Msgf("failed to remove finalizer from pod %s/%s: %v", pod.Namespace, pod.Name, err) + } else { + log.Info().Msgf("removed finalizer %s from pod %s/%s", + PodGUIDFinalizer, pod.Namespace, pod.Name) + } + } } deleteMap.UnSafeRemove(networkID) } @@ -686,3 +799,43 @@ func (d *daemon) initPool() error { return nil } + +// checkIfAnyPodsUsingNetwork checks if there are any pods still using the given 
network +func (d *daemon) checkIfAnyPodsUsingNetwork(networkNamespace, networkName string) (bool, error) { + pods, err := d.kubeClient.GetPods(kapi.NamespaceAll) + if err != nil { + return false, fmt.Errorf("failed to get pods: %v", err) + } + + for i := range pods.Items { + pod := &pods.Items[i] + + // Skip pods that are being deleted (have deletion timestamp) + if pod.DeletionTimestamp != nil { + continue + } + + if !utils.HasNetworkAttachmentAnnot(pod) { + continue + } + + networks, err := netAttUtils.ParsePodNetworkAnnotation(pod) + if err != nil { + continue + } + + for _, network := range networks { + // Check if this pod uses the network we're checking + if network.Namespace == networkNamespace && network.Name == networkName { + // Check if this network is configured with InfiniBand and has a GUID + if utils.IsPodNetworkConfiguredWithInfiniBand(network) && utils.PodNetworkHasGUID(network) { + log.Debug().Msgf("Found pod %s/%s still using network %s/%s", + pod.Namespace, pod.Name, networkNamespace, networkName) + return true, nil + } + } + } + } + + return false, nil +} diff --git a/pkg/guid/guid_pool.go b/pkg/guid/guid_pool.go index a59b159f..cfa486bf 100644 --- a/pkg/guid/guid_pool.go +++ b/pkg/guid/guid_pool.go @@ -75,6 +75,18 @@ func (p *guidPool) Reset(guids []string) error { // Out of range GUID may be expected and shouldn't be allocated in the pool continue } + + guidAddr, err := ParseGUID(guid) + if err != nil { + log.Debug().Msgf("error parsing GUID: %s: %v", guid, err) + return err + } + + // Check if GUID is already allocated in the pool, if so skip it + if _, exist := p.guidPoolMap[guidAddr]; exist { + continue + } + err = p.AllocateGUID(guid) if err != nil { log.Debug().Msgf("error resetting the pool with value: %s: %v", guid, err) diff --git a/pkg/k8s-client/client.go b/pkg/k8s-client/client.go index f744a230..6b3452ec 100644 --- a/pkg/k8s-client/client.go +++ b/pkg/k8s-client/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" netapi "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1" @@ -11,6 +12,7 @@ import ( kapi "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -24,6 +26,8 @@ type Client interface { GetRestClient() rest.Interface AddFinalizerToNetworkAttachmentDefinition(namespace, name, finalizer string) error RemoveFinalizerFromNetworkAttachmentDefinition(namespace, name, finalizer string) error + AddFinalizerToPod(pod *kapi.Pod, finalizer string) error + RemoveFinalizerFromPod(pod *kapi.Pod, finalizer string) error } type client struct { @@ -31,6 +35,8 @@ type client struct { netClient netclient.K8sCniCncfIoV1Interface } +var backoffValues = wait.Backoff{Duration: 1 * time.Second, Factor: 1.6, Jitter: 0.1, Steps: 6} + // NewK8sClient returns a kubernetes client func NewK8sClient() (Client, error) { // Get a config to talk to the api server @@ -154,3 +160,68 @@ func (c *client) RemoveFinalizerFromNetworkAttachmentDefinition(namespace, name, context.Background(), netAttDef, metav1.UpdateOptions{}) return err } + +// AddFinalizerToPod adds a finalizer to a Pod +func (c *client) AddFinalizerToPod(pod *kapi.Pod, finalizer string) error { + // Get the latest version of the 
pod + currentPod, err := c.clientset.CoreV1().Pods(pod.Namespace).Get( + context.Background(), pod.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + // Check if finalizer already exists + for _, existingFinalizer := range currentPod.Finalizers { + if existingFinalizer == finalizer { + return nil // Finalizer already exists, nothing to do + } + } + + // Add the finalizer + currentPod.Finalizers = append(currentPod.Finalizers, finalizer) + + // Update the Pod with retry and backoff + err = wait.ExponentialBackoff(backoffValues, func() (bool, error) { + _, err = c.clientset.CoreV1().Pods(pod.Namespace).Update( + context.Background(), currentPod, metav1.UpdateOptions{}) + return err == nil, nil + }) + return err +} + +// RemoveFinalizerFromPod removes a finalizer from a Pod +func (c *client) RemoveFinalizerFromPod(pod *kapi.Pod, finalizer string) error { + // Get the latest version of the pod + currentPod, err := c.clientset.CoreV1().Pods(pod.Namespace).Get( + context.Background(), pod.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + // Check if finalizer exists and remove it + var found bool + var updatedFinalizers []string + for _, existingFinalizer := range currentPod.Finalizers { + if existingFinalizer != finalizer { + updatedFinalizers = append(updatedFinalizers, existingFinalizer) + } else { + found = true + } + } + + // If finalizer wasn't found, nothing to do + if !found { + return nil + } + + // Update finalizers + currentPod.Finalizers = updatedFinalizers + + // Update the Pod with retry and backoff + err = wait.ExponentialBackoff(backoffValues, func() (bool, error) { + _, err = c.clientset.CoreV1().Pods(pod.Namespace).Update( + context.Background(), currentPod, metav1.UpdateOptions{}) + return err == nil, nil + }) + return err +} diff --git a/pkg/k8s-client/mocks/Client.go b/pkg/k8s-client/mocks/Client.go index 771273d5..e78ef67f 100644 --- a/pkg/k8s-client/mocks/Client.go +++ b/pkg/k8s-client/mocks/Client.go @@ -90,6 +90,34 @@ func (_m *Client) PatchPod(pod *corev1.Pod, patchType types.PatchType, patchData return r0 } +// AddFinalizerToNetworkAttachmentDefinition provides a mock function with given fields: namespace, name, finalizer +func (_m *Client) AddFinalizerToNetworkAttachmentDefinition(namespace string, name string, finalizer string) error { + ret := _m.Called(namespace, name, finalizer) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, string) error); ok { + r0 = rf(namespace, name, finalizer) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RemoveFinalizerFromNetworkAttachmentDefinition provides a mock function with given fields: namespace, name, finalizer +func (_m *Client) RemoveFinalizerFromNetworkAttachmentDefinition(namespace string, name string, finalizer string) error { + ret := _m.Called(namespace, name, finalizer) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string, string) error); ok { + r0 = rf(namespace, name, finalizer) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // SetAnnotationsOnPod provides a mock function with given fields: pod, annotations func (_m *Client) SetAnnotationsOnPod(pod *corev1.Pod, annotations map[string]string) error { ret := _m.Called(pod, annotations) @@ -103,3 +131,31 @@ func (_m *Client) SetAnnotationsOnPod(pod *corev1.Pod, annotations map[string]st return r0 } + +// AddFinalizerToPod provides a mock function with given fields: pod, finalizer +func (_m *Client) AddFinalizerToPod(pod *corev1.Pod, finalizer string) error { + ret := _m.Called(pod, finalizer) + + 
var r0 error + if rf, ok := ret.Get(0).(func(*corev1.Pod, string) error); ok { + r0 = rf(pod, finalizer) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RemoveFinalizerFromPod provides a mock function with given fields: pod, finalizer +func (_m *Client) RemoveFinalizerFromPod(pod *corev1.Pod, finalizer string) error { + ret := _m.Called(pod, finalizer) + + var r0 error + if rf, ok := ret.Get(0).(func(*corev1.Pod, string) error); ok { + r0 = rf(pod, finalizer) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/sm/plugin_loader_test.go b/pkg/sm/plugin_loader_test.go index 97e5286e..5c319660 100644 --- a/pkg/sm/plugin_loader_test.go +++ b/pkg/sm/plugin_loader_test.go @@ -3,12 +3,23 @@ package sm import ( "os" "path/filepath" + "runtime" "strings" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) +// isPluginSupported checks if Go plugins are supported on the current platform +func isPluginSupported() bool { + // Go plugins are not supported on macOS ARM64 (Apple Silicon) + if runtime.GOOS == "darwin" && runtime.GOARCH == "arm64" { + return false + } + // Add other unsupported platforms as needed + return true +} + var _ = Describe("Subnet Manager Plugin", func() { Context("NewPluginLoader", func() { It("Create new plugin loader", func() { @@ -24,12 +35,18 @@ var _ = Describe("Subnet Manager Plugin", func() { testPlugin = filepath.Join(curDir, "../../build/plugins/noop.so") }) It("Load valid subnet manager client plugin", func() { + if !isPluginSupported() { + Skip("Go plugins are not supported on this platform") + } pl := NewPluginLoader() smClient, err := pl.LoadPlugin(testPlugin, InitializePluginFunc) Expect(err).ToNot(HaveOccurred()) Expect(smClient).ToNot(BeNil()) }) It("Load non existing plugin", func() { + if !isPluginSupported() { + Skip("Go plugins are not supported on this platform") + } pl := NewPluginLoader() plugin, err := pl.LoadPlugin("not existing", InitializePluginFunc) Expect(err).To(HaveOccurred()) @@ -38,6 +55,9 @@ var _ = Describe("Subnet Manager Plugin", func() { Expect(isTextInError).To(BeTrue()) }) It("Load plugin with no Plugin object", func() { + if !isPluginSupported() { + Skip("Go plugins are not supported on this platform") + } pl := NewPluginLoader() plugin, err := pl.LoadPlugin(testPlugin, "NotExits") Expect(err).To(HaveOccurred()) @@ -47,6 +67,9 @@ var _ = Describe("Subnet Manager Plugin", func() { Expect(isTextInError).To(BeTrue()) }) It("Load plugin with not valid Plugin object", func() { + if !isPluginSupported() { + Skip("Go plugins are not supported on this platform") + } pl := NewPluginLoader() plugin, err := pl.LoadPlugin(testPlugin, "InvalidPlugin") Expect(err).To(HaveOccurred()) diff --git a/pkg/sm/plugins/noop/noop.go b/pkg/sm/plugins/noop/noop.go index bfe5427f..109d1437 100644 --- a/pkg/sm/plugins/noop/noop.go +++ b/pkg/sm/plugins/noop/noop.go @@ -43,6 +43,11 @@ func (p *plugin) AddGuidsToPKey(pkey int, guids []net.HardwareAddr) error { return nil } +func (p *plugin) AddGuidsToLimitedPKey(pkey int, guids []net.HardwareAddr) error { + log.Info().Msg("noop Plugin AddGuidsToLimitedPKey()") + return nil +} + func (p *plugin) RemoveGuidsFromPKey(pkey int, guids []net.HardwareAddr) error { log.Info().Msg("noop Plugin RemovePKey()") return nil @@ -53,6 +58,12 @@ func (p *plugin) ListGuidsInUse() ([]string, error) { return nil, nil } +// SetConfig allows the daemon to pass configuration to the plugin +func (p *plugin) SetConfig(config map[string]interface{}) error { + log.Info().Msg("noop Plugin SetConfig()") + return nil 
+} + // Initialize applies configs to plugin and return a subnet manager client func Initialize() (plugins.SubnetManagerClient, error) { log.Info().Msg("Initializing noop plugin") diff --git a/pkg/sm/plugins/noop/noop_test.go b/pkg/sm/plugins/noop/noop_test.go index 2b19c0b6..a20e8330 100644 --- a/pkg/sm/plugins/noop/noop_test.go +++ b/pkg/sm/plugins/noop/noop_test.go @@ -21,6 +21,9 @@ var _ = Describe("noop plugin", func() { err = plugin.AddGuidsToPKey(0, nil) Expect(err).ToNot(HaveOccurred()) + err = plugin.AddGuidsToLimitedPKey(0, nil) + Expect(err).ToNot(HaveOccurred()) + err = plugin.RemoveGuidsFromPKey(0, nil) Expect(err).ToNot(HaveOccurred()) }) diff --git a/pkg/sm/plugins/plugin.go b/pkg/sm/plugins/plugin.go index bedb5088..b990ea8b 100644 --- a/pkg/sm/plugins/plugin.go +++ b/pkg/sm/plugins/plugin.go @@ -16,10 +16,17 @@ type SubnetManagerClient interface { // It return error if failed. AddGuidsToPKey(pkey int, guids []net.HardwareAddr) error + // AddGuidsToLimitedPKey add guids as limited members to pkey. + // It return error if failed. + AddGuidsToLimitedPKey(pkey int, guids []net.HardwareAddr) error + // RemoveGuidsFromPKey remove guids for given pkey. // It return error if failed. RemoveGuidsFromPKey(pkey int, guids []net.HardwareAddr) error // ListGuidsInUse returns a list of all GUIDS associated with PKeys ListGuidsInUse() ([]string, error) + + // SetConfig allows the daemon to pass configuration to the plugin + SetConfig(config map[string]interface{}) error } diff --git a/pkg/sm/plugins/ufm/ufm.go b/pkg/sm/plugins/ufm/ufm.go index 125efc37..759c6784 100644 --- a/pkg/sm/plugins/ufm/ufm.go +++ b/pkg/sm/plugins/ufm/ufm.go @@ -29,12 +29,15 @@ const ( ) type UFMConfig struct { - Username string `env:"UFM_USERNAME"` // Username of ufm - Password string `env:"UFM_PASSWORD"` // Password of ufm - Address string `env:"UFM_ADDRESS"` // IP address or hostname of ufm server - Port int `env:"UFM_PORT"` // REST API port of ufm - HTTPSchema string `env:"UFM_HTTP_SCHEMA"` // http or https - Certificate string `env:"UFM_CERTIFICATE"` // Certificate of ufm + Username string `env:"UFM_USERNAME"` // Username of ufm + Password string `env:"UFM_PASSWORD"` // Password of ufm + Address string `env:"UFM_ADDRESS"` // IP address or hostname of ufm server + Port int `env:"UFM_PORT"` // REST API port of ufm + HTTPSchema string `env:"UFM_HTTP_SCHEMA"` // http or https + Certificate string `env:"UFM_CERTIFICATE"` // Certificate of ufm + EnableIPOverIB bool `env:"ENABLE_IP_OVER_IB" envDefault:"false"` // Enable IP over IB functionality + DefaultLimitedPartition string `env:"DEFAULT_LIMITED_PARTITION"` // Default partition key for limited membership + EnableIndex0ForPrimaryPkey bool `env:"ENABLE_INDEX0_FOR_PRIMARY_PKEY" envDefault:"true"` // Enable index0 for primary pkey GUID additions } func newUfmPlugin() (*ufmPlugin, error) { @@ -43,6 +46,11 @@ func newUfmPlugin() (*ufmPlugin, error) { return nil, err } + // Debug logging for environment variable parsing + log.Info().Msgf("UFM plugin: Environment variable ENABLE_IP_OVER_IB parsed as: %t", ufmConf.EnableIPOverIB) + log.Info().Msgf("UFM plugin: Environment variable DEFAULT_LIMITED_PARTITION parsed as: '%s'", ufmConf.DefaultLimitedPartition) + log.Info().Msgf("UFM plugin: Environment variable ENABLE_INDEX0_FOR_PRIMARY_PKEY parsed as: %t", ufmConf.EnableIndex0ForPrimaryPkey) + if ufmConf.Username == "" || ufmConf.Password == "" || ufmConf.Address == "" { return nil, fmt.Errorf("missing one or more required fileds for ufm [\"username\", \"password\", 
\"address\"]") } @@ -98,6 +106,18 @@ func (u *ufmPlugin) AddGuidsToPKey(pKey int, guids []net.HardwareAddr) error { return fmt.Errorf("invalid pkey 0x%04X, out of range 0x0001 - 0xFFFE", pKey) } + // Check if PKEY exists, create it if it doesn't + exists, err := u.pKeyExists(pKey) + if err != nil { + return fmt.Errorf("failed to check if pkey exists: %v", err) + } + + if !exists { + if err := u.createEmptyPKey(pKey); err != nil { + return fmt.Errorf("failed to create pkey: %v", err) + } + } + guidsString := make([]string, 0, len(guids)) for _, guid := range guids { guidAddr := ibUtils.GUIDToString(guid) @@ -105,8 +125,8 @@ func (u *ufmPlugin) AddGuidsToPKey(pKey int, guids []net.HardwareAddr) error { } data := []byte(fmt.Sprintf( - `{"pkey": "0x%04X", "index0": true, "ip_over_ib": false, "mtu_limit": 4, "service_level": 0, "rate_limit": 300, "guids": [%v], "membership": "full"}`, - pKey, strings.Join(guidsString, ","))) + `{"pkey": "0x%04X", "guids": [%v], "membership": "full", "index0": %t}`, + pKey, strings.Join(guidsString, ","), u.conf.EnableIndex0ForPrimaryPkey)) log.Info().Msgf("/ufmRest/resources/pkeys: Sending data %s", data) if _, err := u.client.Post(u.buildURL("/ufmRest/resources/pkeys"), http.StatusOK, data); err != nil { return fmt.Errorf("failed to add guids %v to PKey 0x%04X with error: %v", guids, pKey, err) @@ -115,6 +135,40 @@ func (u *ufmPlugin) AddGuidsToPKey(pKey int, guids []net.HardwareAddr) error { return nil } +func (u *ufmPlugin) AddGuidsToLimitedPKey(pKey int, guids []net.HardwareAddr) error { + log.Info().Msgf("adding guids %v as limited members to pKey 0x%04X", guids, pKey) + + if !ibUtils.IsPKeyValid(pKey) { + return fmt.Errorf("invalid pkey 0x%04X, out of range 0x0001 - 0xFFFE", pKey) + } + + // Check if PKEY exists, do not create it if it doesn't exist + exists, err := u.pKeyExists(pKey) + if err != nil { + return fmt.Errorf("failed to check if pkey exists: %v", err) + } + + if !exists { + return fmt.Errorf("limited pkey 0x%04X does not exist, will not create it", pKey) + } + + guidsString := make([]string, 0, len(guids)) + for _, guid := range guids { + guidAddr := ibUtils.GUIDToString(guid) + guidsString = append(guidsString, fmt.Sprintf("%q", guidAddr)) + } + + data := []byte(fmt.Sprintf( + `{"pkey": "0x%04X", "guids": [%v], "membership": "limited", "index0": false}`, + pKey, strings.Join(guidsString, ","))) + log.Info().Msgf("/ufmRest/resources/pkeys: Sending data %s", data) + if _, err := u.client.Post(u.buildURL("/ufmRest/resources/pkeys"), http.StatusOK, data); err != nil { + return fmt.Errorf("failed to add guids %v as limited members to PKey 0x%04X with error: %v", guids, pKey, err) + } + + return nil +} + func (u *ufmPlugin) RemoveGuidsFromPKey(pKey int, guids []net.HardwareAddr) error { log.Debug().Msgf("removing guids %v pkey 0x%04X", guids, pKey) @@ -178,10 +232,83 @@ func (u *ufmPlugin) ListGuidsInUse() ([]string, error) { return guids, nil } +func (u *ufmPlugin) pKeyExists(pKey int) (bool, error) { + response, err := u.client.Get(u.buildURL(fmt.Sprintf("/ufmRest/resources/pkeys/0x%04X", pKey)), http.StatusOK) + if err != nil { + if strings.Contains(err.Error(), "404") { + return false, nil + } + return false, fmt.Errorf("failed to check if pkey 0x%04X exists: %v", pKey, err) + } + + // Parse the JSON response to check if it contains actual data + var pkeyData map[string]interface{} + if err := json.Unmarshal(response, &pkeyData); err != nil { + return false, fmt.Errorf("failed to parse pkey response: %v", err) + } + + log.Info().Msgf("Pkey 
0x%04X exists: %t", pKey, len(pkeyData) > 0) + // If the response is empty (like {}) or doesn't contain expected fields, the pkey doesn't exist + return len(pkeyData) > 0, nil +} + +func (u *ufmPlugin) createEmptyPKey(pKey int) error { + // WARNING - this breaks UFM and causes it to reboot back to factory settings!!! + // TODO: fix this by finding the right API call to create a pkey + // ipOverIBStatus := "disabled" + // if u.conf.EnableIPOverIB { + // ipOverIBStatus = "enabled" + // } + // log.Info().Msgf("creating empty pKey 0x%04X with MTU 4k and IP over IB %s", pKey, ipOverIBStatus) + // data := []byte(fmt.Sprintf( + // `{"pkey": "0x%04X", "index0": true, "ip_over_ib": %t, "mtu_limit": 4, "service_level": 0, "rate_limit": 300}`, + // pKey, u.conf.EnableIPOverIB)) + + // if _, err := u.client.Post(u.buildURL("/ufmRest/resources/pkeys/add"), http.StatusCreated, data); err != nil { + // return fmt.Errorf("failed to create empty PKey 0x%04X: %v", pKey, err) + // } + + return nil +} + func (u *ufmPlugin) buildURL(path string) string { return fmt.Sprintf("%s://%s:%d%s", u.conf.HTTPSchema, u.conf.Address, u.conf.Port, path) } +// SetConfig allows the daemon to pass configuration to the plugin +func (u *ufmPlugin) SetConfig(config map[string]interface{}) error { + if enableIPOverIB, exists := config["ENABLE_IP_OVER_IB"]; exists { + if boolVal, ok := enableIPOverIB.(bool); ok { + u.conf.EnableIPOverIB = boolVal + log.Info().Msgf("UFM plugin: EnableIPOverIB set to %t via SetConfig", boolVal) + } else if strVal, ok := enableIPOverIB.(string); ok { + // Handle string values like "true", "false" + u.conf.EnableIPOverIB = strVal == "true" + log.Info().Msgf("UFM plugin: EnableIPOverIB set to %t via SetConfig (from string %s)", u.conf.EnableIPOverIB, strVal) + } + } + + if defaultLimitedPartition, exists := config["DEFAULT_LIMITED_PARTITION"]; exists { + if strVal, ok := defaultLimitedPartition.(string); ok { + u.conf.DefaultLimitedPartition = strVal + log.Info().Msgf("UFM plugin: DefaultLimitedPartition set to %s via SetConfig", strVal) + } + } + + if enableIndex0ForPrimaryPkey, exists := config["ENABLE_INDEX0_FOR_PRIMARY_PKEY"]; exists { + if boolVal, ok := enableIndex0ForPrimaryPkey.(bool); ok { + u.conf.EnableIndex0ForPrimaryPkey = boolVal + log.Info().Msgf("UFM plugin: EnableIndex0ForPrimaryPkey set to %t via SetConfig", boolVal) + } else if strVal, ok := enableIndex0ForPrimaryPkey.(string); ok { + // Handle string values like "true", "false" + u.conf.EnableIndex0ForPrimaryPkey = strVal == "true" + log.Info().Msgf("UFM plugin: EnableIndex0ForPrimaryPkey set to %t via SetConfig (from string %s)", u.conf.EnableIndex0ForPrimaryPkey, strVal) + } + } + + return nil +} + // Initialize applies configs to plugin and return a subnet manager client func Initialize() (plugins.SubnetManagerClient, error) { log.Info().Msg("Initializing ufm plugin") diff --git a/pkg/sm/plugins/ufm/ufm_test.go b/pkg/sm/plugins/ufm/ufm_test.go index e3091f04..5a859350 100644 --- a/pkg/sm/plugins/ufm/ufm_test.go +++ b/pkg/sm/plugins/ufm/ufm_test.go @@ -79,6 +79,7 @@ var _ = Describe("Ufm Subnet Manager Client plugin", func() { Context("AddGuidsToPKey", func() { It("Add guid to valid pkey", func() { client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) plugin := &ufmPlugin{client: client, conf: UFMConfig{}} @@ -88,6 +89,40 @@ var _ = Describe("Ufm Subnet Manager Client plugin", func() { err =
plugin.AddGuidsToPKey(0x1234, []net.HardwareAddr{guid}) Expect(err).ToNot(HaveOccurred()) }) + It("Add guid to valid pkey with index0 true", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) + client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{EnableIndex0ForPrimaryPkey: true}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToPKey(0x1234, []net.HardwareAddr{guid}) + Expect(err).ToNot(HaveOccurred()) + + // Verify the Post call was made with index0: true + client.AssertCalled(GinkgoT(), "Post", mock.Anything, mock.Anything, mock.MatchedBy(func(data []byte) bool { + return string(data) == `{"pkey": "0x1234", "guids": ["1122334455667788"], "membership": "full", "index0": true}` + })) + }) + It("Add guid to valid pkey with index0 false", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) + client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{EnableIndex0ForPrimaryPkey: false}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToPKey(0x1234, []net.HardwareAddr{guid}) + Expect(err).ToNot(HaveOccurred()) + + // Verify the Post call was made with index0: false + client.AssertCalled(GinkgoT(), "Post", mock.Anything, mock.Anything, mock.MatchedBy(func(data []byte) bool { + return string(data) == `{"pkey": "0x1234", "guids": ["1122334455667788"], "membership": "full", "index0": false}` + })) + }) It("Add guid to invalid pkey", func() { plugin := &ufmPlugin{conf: UFMConfig{}} guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") @@ -99,6 +134,7 @@ var _ = Describe("Ufm Subnet Manager Client plugin", func() { }) It("Add guid to pkey failed from ufm", func() { client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("failed")) plugin := &ufmPlugin{client: client, conf: UFMConfig{}} @@ -152,6 +188,155 @@ var _ = Describe("Ufm Subnet Manager Client plugin", func() { Expect(&errMsg).To(Equal(&errMessage)) }) }) + Context("AddGuidsToLimitedPKey", func() { + It("Add guid to valid limited pkey that exists", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) + client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToLimitedPKey(0x1234, []net.HardwareAddr{guid}) + Expect(err).ToNot(HaveOccurred()) + }) + It("Add guid to valid limited pkey with index0 false", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) + client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToLimitedPKey(0x1234, []net.HardwareAddr{guid}) + Expect(err).ToNot(HaveOccurred()) + + // Verify the Post 
call was made with index0: false + client.AssertCalled(GinkgoT(), "Post", mock.Anything, mock.Anything, mock.MatchedBy(func(data []byte) bool { + return string(data) == `{"pkey": "0x1234", "guids": ["1122334455667788"], "membership": "limited", "index0": false}` + })) + }) + It("Add guid to limited pkey that does not exist", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return(nil, errors.New("404")) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToLimitedPKey(0x1234, []net.HardwareAddr{guid}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("limited pkey 0x1234 does not exist, will not create it")) + }) + It("Add guid to invalid limited pkey", func() { + plugin := &ufmPlugin{conf: UFMConfig{}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + err = plugin.AddGuidsToLimitedPKey(0xFFFF, []net.HardwareAddr{guid}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("invalid pkey 0xFFFF, out of range 0x0001 - 0xFFFE")) + }) + It("Add guid to limited pkey failed from ufm", func() { + client := &mocks.Client{} + client.On("Get", mock.Anything, mock.Anything).Return([]byte(`{"pkey": "0x1234"}`), nil) + client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("failed")) + + plugin := &ufmPlugin{client: client, conf: UFMConfig{}} + guid, err := net.ParseMAC("11:22:33:44:55:66:77:88") + Expect(err).ToNot(HaveOccurred()) + + guids := []net.HardwareAddr{guid} + pKey := 0x1234 + err = plugin.AddGuidsToLimitedPKey(pKey, guids) + Expect(err).To(HaveOccurred()) + errMessage := fmt.Sprintf("failed to add guids %v as limited members to PKey 0x%04X with error: failed", guids, pKey) + Expect(err.Error()).To(Equal(errMessage)) + }) + }) + // Context("createEmptyPKey with EnableIPOverIB", func() { + // It("Create pkey with IP over IB enabled", func() { + // client := &mocks.Client{} + // client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + // plugin := &ufmPlugin{client: client, conf: UFMConfig{EnableIPOverIB: true}} + // err := plugin.createEmptyPKey(0x1234) + // Expect(err).ToNot(HaveOccurred()) + + // // Verify the Post call was made with ip_over_ib: true + // client.AssertCalled(GinkgoT(), "Post", mock.Anything, mock.Anything, mock.MatchedBy(func(data []byte) bool { + // return string(data) == `{"pkey": "0x1234", "index0": true, "ip_over_ib": true, "mtu_limit": 4, "service_level": 0, "rate_limit": 300, "guids": [], "membership": "full"}` + // })) + // }) + // It("Create pkey with IP over IB disabled", func() { + // client := &mocks.Client{} + // client.On("Post", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + // plugin := &ufmPlugin{client: client, conf: UFMConfig{EnableIPOverIB: false}} + // err := plugin.createEmptyPKey(0x1234) + // Expect(err).ToNot(HaveOccurred()) + + // // Verify the Post call was made with ip_over_ib: false + // client.AssertCalled(GinkgoT(), "Post", mock.Anything, mock.Anything, mock.MatchedBy(func(data []byte) bool { + // return string(data) == `{"pkey": "0x1234", "index0": true, "ip_over_ib": false, "mtu_limit": 4, "service_level": 0, "rate_limit": 300, "guids": [], "membership": "full"}` + // })) + // }) + // }) + Context("UFMConfig with new environment variables", func() { + AfterEach(func() { + os.Clearenv() + }) + It("newUfmPlugin with EnableIPOverIB and 
DefaultLimitedPartition config", func() { + Expect(os.Setenv("UFM_USERNAME", "admin")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_PASSWORD", "123456")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_ADDRESS", "1.1.1.1")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_HTTP_SCHEMA", "http")).ToNot(HaveOccurred()) + Expect(os.Setenv("ENABLE_IP_OVER_IB", "true")).ToNot(HaveOccurred()) + Expect(os.Setenv("DEFAULT_LIMITED_PARTITION", "0x1")).ToNot(HaveOccurred()) + plugin, err := newUfmPlugin() + Expect(err).ToNot(HaveOccurred()) + Expect(plugin).ToNot(BeNil()) + Expect(plugin.conf.EnableIPOverIB).To(BeTrue()) + Expect(plugin.conf.DefaultLimitedPartition).To(Equal("0x1")) + }) + It("newUfmPlugin with EnableIndex0ForPrimaryPkey config", func() { + Expect(os.Setenv("UFM_USERNAME", "admin")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_PASSWORD", "123456")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_ADDRESS", "1.1.1.1")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_HTTP_SCHEMA", "http")).ToNot(HaveOccurred()) + Expect(os.Setenv("ENABLE_INDEX0_FOR_PRIMARY_PKEY", "false")).ToNot(HaveOccurred()) + plugin, err := newUfmPlugin() + Expect(err).ToNot(HaveOccurred()) + Expect(plugin).ToNot(BeNil()) + Expect(plugin.conf.EnableIndex0ForPrimaryPkey).To(BeFalse()) + }) + It("newUfmPlugin with default EnableIPOverIB config", func() { + Expect(os.Setenv("UFM_USERNAME", "admin")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_PASSWORD", "123456")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_ADDRESS", "1.1.1.1")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_HTTP_SCHEMA", "http")).ToNot(HaveOccurred()) + plugin, err := newUfmPlugin() + Expect(err).ToNot(HaveOccurred()) + Expect(plugin).ToNot(BeNil()) + Expect(plugin.conf.EnableIPOverIB).To(BeFalse()) // Default should be false + Expect(plugin.conf.DefaultLimitedPartition).To(Equal("")) // Default should be empty + Expect(plugin.conf.EnableIndex0ForPrimaryPkey).To(BeTrue()) // Default should be true + }) + It("newUfmPlugin with explicit false EnableIPOverIB config", func() { + Expect(os.Setenv("UFM_USERNAME", "admin")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_PASSWORD", "123456")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_ADDRESS", "1.1.1.1")).ToNot(HaveOccurred()) + Expect(os.Setenv("UFM_HTTP_SCHEMA", "http")).ToNot(HaveOccurred()) + Expect(os.Setenv("ENABLE_IP_OVER_IB", "false")).ToNot(HaveOccurred()) + plugin, err := newUfmPlugin() + Expect(err).ToNot(HaveOccurred()) + Expect(plugin).ToNot(BeNil()) + Expect(plugin.conf.EnableIPOverIB).To(BeFalse()) + Expect(plugin.conf.DefaultLimitedPartition).To(Equal("")) + }) + }) Context("ListGuidsInUse", func() { It("Remove guid from valid pkey", func() { testResponse := `{ diff --git a/pkg/watcher/handler/pod.go b/pkg/watcher/handler/pod.go index 9141fb9e..a1e23985 100644 --- a/pkg/watcher/handler/pod.go +++ b/pkg/watcher/handler/pod.go @@ -15,16 +15,18 @@ import ( ) type podEventHandler struct { - retryPods sync.Map - addedPods *utils.SynchronizedMap - deletedPods *utils.SynchronizedMap + retryPods sync.Map + addedPods *utils.SynchronizedMap + deletedPods *utils.SynchronizedMap + terminatingPods sync.Map // Track pods that are already being handled for termination } func NewPodEventHandler() ResourceEventHandler { eventHandler := &podEventHandler{ - retryPods: sync.Map{}, - addedPods: utils.NewSynchronizedMap(), - deletedPods: utils.NewSynchronizedMap(), + retryPods: sync.Map{}, + addedPods: utils.NewSynchronizedMap(), + deletedPods: utils.NewSynchronizedMap(), + terminatingPods: sync.Map{}, } return 
eventHandler @@ -73,6 +75,16 @@ func (p *podEventHandler) OnUpdate(oldObj, newObj interface{}) { log.Debug().Msgf("pod update event: namespace %s name %s", pod.Namespace, pod.Name) log.Info().Msgf("pod update event - podName: %s", pod.Name) + // Check if pod is entering terminating state (DeletionTimestamp was just set) + if oldObj != nil { + oldPod := oldObj.(*kapi.Pod) + if oldPod.DeletionTimestamp == nil && pod.DeletionTimestamp != nil { + log.Info().Msgf("pod entering terminating state: namespace %s name %s", pod.Namespace, pod.Name) + p.handleTerminatingPod(pod) + return + } + } + if !utils.PodWantsNetwork(pod) { log.Info().Msg("pod doesn't require network") return @@ -108,8 +120,9 @@ func (p *podEventHandler) OnDelete(obj interface{}) { pod := obj.(*kapi.Pod) log.Info().Msgf("pod delete event: namespace %s name %s", pod.Namespace, pod.Name) - // make sure this pod won't be in the retry pods + // Clean up tracking maps p.retryPods.Delete(pod.UID) + p.terminatingPods.Delete(pod.UID) if !utils.PodWantsNetwork(pod) { log.Debug().Msg("pod doesn't require network") @@ -152,6 +165,61 @@ func (p *podEventHandler) OnDelete(obj interface{}) { log.Info().Msgf("successfully deleted namespace %s name %s", pod.Namespace, pod.Name) } +// handleTerminatingPod processes pods that are entering terminating state (DeletionTimestamp set) +// This allows cleanup to start immediately when a pod starts terminating, rather than waiting +// for the actual delete event which only fires after finalizers are removed +func (p *podEventHandler) handleTerminatingPod(pod *kapi.Pod) { + // Check if we've already started handling this terminating pod to avoid duplicates + if _, alreadyHandling := p.terminatingPods.Load(pod.UID); alreadyHandling { + log.Debug().Msgf("terminating pod %s/%s already being handled", pod.Namespace, pod.Name) + return + } + p.terminatingPods.Store(pod.UID, true) + + // make sure this pod won't be in the retry pods + p.retryPods.Delete(pod.UID) + + if !utils.PodWantsNetwork(pod) { + log.Debug().Msg("terminating pod doesn't require network") + return + } + + if !utils.HasNetworkAttachmentAnnot(pod) { + log.Debug().Msgf("terminating pod doesn't have network annotation \"%v\"", v1.NetworkAttachmentAnnot) + return + } + + networks, err := netAttUtils.ParsePodNetworkAnnotation(pod) + if err != nil { + log.Error().Msgf("failed to parse network annotations for terminating pod with error: %v", err) + return + } + + for _, network := range networks { + if !utils.IsPodNetworkConfiguredWithInfiniBand(network) { + continue + } + + // check if pod network has guid + if !utils.PodNetworkHasGUID(network) { + log.Error().Msgf("terminating pod %s has network %s marked as configured with InfiniBand without having guid", + pod.Name, network.Name) + continue + } + + networkID := utils.GenerateNetworkID(network) + pods, ok := p.deletedPods.Get(networkID) + if !ok { + pods = []*kapi.Pod{pod} + } else { + pods = append(pods.([]*kapi.Pod), pod) + } + p.deletedPods.Set(networkID, pods) + } + + log.Info().Msgf("successfully handled terminating pod namespace %s name %s", pod.Namespace, pod.Name) +} + func (p *podEventHandler) GetResults() (*utils.SynchronizedMap, *utils.SynchronizedMap) { return p.addedPods, p.deletedPods } diff --git a/tag_build_and_push.sh b/tag_build_and_push.sh new file mode 100644 index 00000000..55f3fceb --- /dev/null +++ b/tag_build_and_push.sh @@ -0,0 +1,34 @@ +#! 
/bin/bash +# get short git commit hash +#TAG=$(git rev-parse --short HEAD) +TAG=v1.2.1 +echo "Tag: $TAG" + +set -eou pipefail + + +read -p "Have you logged in to aws and ecr? ([y]/n): " confirm +if [ "$confirm" == "n" ]; then + echo "Logging in to aws and ecr" + aws sso login + aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws +fi + +echo "Building and deploying $TAG" +make build +# echo "Running tests" +# make test > /dev/null 2>&1 +# if [ $? -ne 0 ]; then +# echo "Tests failed" +# exit 1 +# fi +make image TAG=public.ecr.aws/k6t4m3l7/ib-kubernetes:$TAG +# ask to confirm +read -p "Are you sure you want to push and deploy $TAG? (y/[n]): " confirm +if [ "$confirm" != "y" ]; then + echo "Deployment cancelled" + exit 1 +fi +make docker-push TAG=public.ecr.aws/k6t4m3l7/ib-kubernetes:$TAG + +echo "To Deploy, install/upgrade via helm!" \ No newline at end of file
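
The two UFM code paths in this patch differ only in the JSON body they POST to /ufmRest/resources/pkeys: AddGuidsToPKey takes index0 from ENABLE_INDEX0_FOR_PRIMARY_PKEY, while AddGuidsToLimitedPKey always sends index0 false with limited membership. A minimal, self-contained sketch of that payload construction (buildPkeyPayload and the standalone main are illustrative, not part of the plugin):

```go
package main

import (
	"fmt"
	"strings"
)

// buildPkeyPayload sketches the JSON bodies AddGuidsToPKey and AddGuidsToLimitedPKey POST
// to /ufmRest/resources/pkeys. Full members take index0 from configuration; limited members
// are always sent with index0 disabled.
func buildPkeyPayload(pKey int, guids []string, limited, index0 bool) []byte {
	quoted := make([]string, 0, len(guids))
	for _, g := range guids {
		quoted = append(quoted, fmt.Sprintf("%q", g))
	}
	membership := "full"
	if limited {
		membership = "limited"
		index0 = false
	}
	return []byte(fmt.Sprintf(
		`{"pkey": "0x%04X", "guids": [%v], "membership": "%s", "index0": %t}`,
		pKey, strings.Join(quoted, ","), membership, index0))
}

func main() {
	guids := []string{"1122334455667788"}
	fmt.Println(string(buildPkeyPayload(0x0040, guids, false, true))) // full member, index0 enabled
	fmt.Println(string(buildPkeyPayload(0x0001, guids, true, false))) // limited member, index0 forced off
}
```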
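pKeyExists distinguishes three UFM responses: a 404 error means the partition is absent, an empty JSON object ({}) is also treated as absent, and any populated object counts as existing. A small sketch of that interpretation, decoupled from the HTTP client (interpretPkeyLookup is a hypothetical helper name, not the plugin's):

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// interpretPkeyLookup sketches how pKeyExists reads the result of
// GET /ufmRest/resources/pkeys/0x%04X: a 404 error means the partition is absent, an empty
// JSON object is also treated as absent, and any populated object counts as existing.
func interpretPkeyLookup(body []byte, err error) (bool, error) {
	if err != nil {
		if strings.Contains(err.Error(), "404") {
			return false, nil
		}
		return false, err
	}
	var pkeyData map[string]interface{}
	if jsonErr := json.Unmarshal(body, &pkeyData); jsonErr != nil {
		return false, fmt.Errorf("failed to parse pkey response: %v", jsonErr)
	}
	return len(pkeyData) > 0, nil
}

func main() {
	exists, _ := interpretPkeyLookup([]byte(`{"pkey": "0x1234"}`), nil)
	empty, _ := interpretPkeyLookup([]byte(`{}`), nil)
	notFound, _ := interpretPkeyLookup(nil, errors.New("404 not found"))
	fmt.Println(exists, empty, notFound) // true false false
}
```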
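On the daemon side (not shown in this diff), the new SetConfig hook, DEFAULT_LIMITED_PARTITION, and AddGuidsToLimitedPKey are presumably wired together roughly as follows. This is a hedged sketch under assumptions: subnetManagerClient, assignGuids, and fakeSM are invented names used only to keep the example self-contained, not the project's actual daemon code.

```go
package main

import (
	"log"
	"net"
	"os"
	"strconv"
)

// subnetManagerClient is a local copy of the methods this change adds to
// plugins.SubnetManagerClient, kept here only so the sketch compiles on its own.
type subnetManagerClient interface {
	SetConfig(config map[string]interface{}) error
	AddGuidsToPKey(pkey int, guids []net.HardwareAddr) error
	AddGuidsToLimitedPKey(pkey int, guids []net.HardwareAddr) error
}

// assignGuids is a hypothetical daemon-side flow: forward the new config keys to the plugin,
// add the GUIDs as full members of the network's pkey, and, when DEFAULT_LIMITED_PARTITION
// is set, also add them as limited members of that pre-existing partition.
func assignGuids(sm subnetManagerClient, networkPkey int, guids []net.HardwareAddr) error {
	enableIPOverIB, _ := strconv.ParseBool(os.Getenv("ENABLE_IP_OVER_IB")) // unset -> false
	limitedPartition := os.Getenv("DEFAULT_LIMITED_PARTITION")

	if err := sm.SetConfig(map[string]interface{}{
		"ENABLE_IP_OVER_IB":         enableIPOverIB,
		"DEFAULT_LIMITED_PARTITION": limitedPartition,
	}); err != nil {
		return err
	}
	if err := sm.AddGuidsToPKey(networkPkey, guids); err != nil {
		return err
	}
	if limitedPartition == "" {
		return nil
	}
	pkey, err := strconv.ParseInt(limitedPartition, 0, 32) // base 0 accepts "0x0001"-style values
	if err != nil {
		return err
	}
	return sm.AddGuidsToLimitedPKey(int(pkey), guids)
}

// fakeSM lets the sketch run without a UFM endpoint.
type fakeSM struct{}

func (fakeSM) SetConfig(config map[string]interface{}) error {
	log.Printf("SetConfig: %v", config)
	return nil
}

func (fakeSM) AddGuidsToPKey(pkey int, guids []net.HardwareAddr) error {
	log.Printf("full members -> pkey 0x%04X: %v", pkey, guids)
	return nil
}

func (fakeSM) AddGuidsToLimitedPKey(pkey int, guids []net.HardwareAddr) error {
	log.Printf("limited members -> pkey 0x%04X: %v", pkey, guids)
	return nil
}

func main() {
	_ = os.Setenv("DEFAULT_LIMITED_PARTITION", "0x0001")
	guid, _ := net.ParseMAC("02:00:00:00:00:00:00:01")
	if err := assignGuids(fakeSM{}, 0x0040, []net.HardwareAddr{guid}); err != nil {
		log.Fatal(err)
	}
}
```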
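The pod handler change keys off a single transition: the old object has no DeletionTimestamp and the new one does, so GUID cleanup can start before finalizers are removed and the delete event fires. A self-contained sketch of that predicate (enteringTermination is an illustrative name, not the handler's):

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// enteringTermination reports the transition OnUpdate now looks for: the pod had no
// DeletionTimestamp in the old object and has one in the new object, i.e. deletion has
// started but finalizers (and the eventual delete event) are still pending.
func enteringTermination(oldPod, newPod *corev1.Pod) bool {
	return oldPod != nil && oldPod.DeletionTimestamp == nil && newPod.DeletionTimestamp != nil
}

func main() {
	now := metav1.NewTime(time.Now())
	running := &corev1.Pod{}
	terminating := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now}}
	fmt.Println(enteringTermination(running, terminating))     // true: cleanup can start early
	fmt.Println(enteringTermination(terminating, terminating)) // false: already terminating
}
```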