From fbb3b957e4166bbb886fa411e6d5b6069be614a4 Mon Sep 17 00:00:00 2001 From: Christian Brunner Date: Wed, 4 Feb 2026 15:39:32 +0000 Subject: [PATCH 1/2] Initial DRBD implementation --- README.md | 217 +++++++++++- api/v1alpha1/groupversion_info.go | 23 ++ api/v1alpha1/types.go | 110 ++++++ api/v1alpha1/zz_generated.deepcopy.go | 86 +++++ .../csi-driver-lvm/templates/daemonset.yaml | 29 ++ charts/csi-driver-lvm/templates/drbd-crd.yaml | 89 +++++ .../templates/eviction-controller.yaml | 31 ++ .../templates/storageclasses.yaml | 23 ++ charts/csi-driver-lvm/values.yaml | 10 + cmd/controller/main.go | 19 ++ cmd/lvmplugin/Dockerfile | 2 +- cmd/lvmplugin/main.go | 41 ++- examples/csi-app-replicated.yaml | 16 + examples/csi-deployment-replicated.yaml | 27 ++ examples/csi-pod-replicated-raw.yaml | 20 ++ examples/csi-pvc-replicated-raw.yaml | 12 + examples/csi-pvc-replicated.yaml | 11 + examples/csi-statefulset-replicated.yaml | 51 +++ examples/csi-storageclass-replicated.yaml | 12 + pkg/controller/drbd_controller.go | 323 ++++++++++++++++++ pkg/controller/eviction_controller.go | 29 +- pkg/drbd/drbd.go | 280 +++++++++++++++ pkg/lvm/lvm.go | 99 ++++++ pkg/nodeagent/agent.go | 230 +++++++++++++ pkg/server/controller.go | 172 +++++++++- pkg/server/driver.go | 5 +- pkg/server/node.go | 66 ++++ 27 files changed, 2016 insertions(+), 17 deletions(-) create mode 100644 api/v1alpha1/groupversion_info.go create mode 100644 api/v1alpha1/types.go create mode 100644 api/v1alpha1/zz_generated.deepcopy.go create mode 100644 charts/csi-driver-lvm/templates/drbd-crd.yaml create mode 100644 examples/csi-app-replicated.yaml create mode 100644 examples/csi-deployment-replicated.yaml create mode 100644 examples/csi-pod-replicated-raw.yaml create mode 100644 examples/csi-pvc-replicated-raw.yaml create mode 100644 examples/csi-pvc-replicated.yaml create mode 100644 examples/csi-statefulset-replicated.yaml create mode 100644 examples/csi-storageclass-replicated.yaml create mode 100644 pkg/controller/drbd_controller.go create mode 100644 pkg/drbd/drbd.go create mode 100644 pkg/nodeagent/agent.go diff --git a/README.md b/README.md index 12896128..bd7ce146 100644 --- a/README.md +++ b/README.md @@ -9,12 +9,207 @@ Underneath it creates a LVM logical volume on the local disks. A comma-separated This CSI driver is derived from [csi-driver-host-path](https://github.com/kubernetes-csi/csi-driver-host-path) and [csi-lvm](https://github.com/metal-stack/csi-lvm) > [!WARNING] -> Note that there is always an inevitable risk of data loss when working with local volumes. For this reason, be sure to back up your data or implement proper data replication methods when using this CSI driver. +> Note that there is always an inevitable risk of data loss when working with non-replicated local volumes. For this reason, be sure to back up your data or enable DRBD replication when using this CSI driver. ## Currently it can create, delete, mount, unmount and resize block and filesystem volumes via lvm ## For the special case of block volumes, the filesystem-expansion has to be performed by the app using the block device +## DRBD Replication + +csi-driver-lvm supports optional synchronous replication of volumes to a second node in the cluster using [DRBD](https://linbit.com/drbd/). When enabled, each replicated volume maintains a real-time copy on a standby node. If the primary node fails, the pod and its PVC are automatically failed over to the standby node **without data loss**. + +### How it works + +``` + Node A (Primary) Node B (Secondary/Standby) + ┌─────────────────┐ ┌─────────────────┐ + │ LV: vol-abc │──── DRBD ───▶│ LV: vol-abc │ + │ /dev/vg/vol-abc │ (sync) │ /dev/vg/vol-abc │ + │ │ │ │ │ + │ /dev/drbdX │ │ /dev/drbdX │ + │ │ │ │ (Secondary, │ + │ mounted by pod │ │ not mounted) │ + └─────────────────┘ └─────────────────┘ +``` + +1. When a PVC is created with the `csi-driver-lvm-replicated` StorageClass, an LV is created on the primary node and a `DRBDVolume` custom resource is created. +2. The DRBD replication controller selects a secondary node using a **least-usage heuristic** (fewest existing replicas, most available capacity). +3. The node agents on both nodes create DRBD resource configs, initialize metadata, and establish replication. +4. The pod mounts the DRBD device (`/dev/drbdN`) instead of the raw LV. All writes are synchronously replicated to the secondary (DRBD protocol C). +5. On node failure, the eviction controller **promotes the secondary** and updates the PV node affinity instead of deleting the PVC. The pod reschedules to the standby node with its data intact. + +### Prerequisites + +- The DRBD kernel module (`drbd`) must be loaded on all nodes. Many distributions ship it by default. You can verify with `modprobe drbd`. +- At least two nodes with the same LVM volume group available. +- The eviction controller must be enabled for automatic failover. + +### Enabling DRBD replication + +Enable DRBD support and the replicated StorageClass in the Helm values: + +```yaml +drbd: + enabled: true + protocol: "C" # synchronous replication (recommended) + +storageClasses: + replicated: + enabled: true + reclaimPolicy: Delete + +evictionEnabled: true +``` + +Install or upgrade the Helm chart: + +```bash +helm upgrade --install csi-driver-lvm ./charts/csi-driver-lvm \ + --set lvm.devicePattern='/dev/nvme[0-9]n[0-9]' \ + --set drbd.enabled=true \ + --set storageClasses.replicated.enabled=true \ + --set evictionEnabled=true +``` + +This creates the `csi-driver-lvm-replicated` StorageClass and deploys the `DRBDVolume` CRD. + +### Using replicated volumes + +Create a PVC with the replicated StorageClass: + +```yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-pvc-replicated +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + storageClassName: csi-driver-lvm-replicated +``` + +Use it in a Pod: + +```yaml +kind: Pod +apiVersion: v1 +metadata: + name: my-csi-app-replicated +spec: + containers: + - name: my-frontend + image: busybox + volumeMounts: + - mountPath: "/data" + name: my-csi-volume + command: [ "sleep", "1000000" ] + volumes: + - name: my-csi-volume + persistentVolumeClaim: + claimName: csi-pvc-replicated +``` + +### Using replicated volumes with StatefulSets (recommended for failover) + +For automatic failover on node failure, use a StatefulSet with `volumeClaimTemplates` and the eviction annotation: + +```yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: nginx-replicated +spec: + serviceName: "nginx-replicated" + replicas: 1 + selector: + matchLabels: + app: nginx-replicated + template: + metadata: + labels: + app: nginx-replicated + annotations: + metal-stack.io/csi-driver-lvm.is-eviction-allowed: "true" + spec: + containers: + - name: nginx + image: nginx:1.14.2 + volumeMounts: + - mountPath: "/data" + name: data + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: csi-driver-lvm-replicated + resources: + requests: + storage: 1Gi +``` + +When the primary node goes down or becomes unschedulable, the eviction controller will: + +1. Detect the node failure +2. Promote the DRBD secondary to primary +3. Update the PV node affinity to point to the new primary +4. The StatefulSet controller reschedules the pod to the new node with data intact + +### Inspecting DRBD volume state + +`DRBDVolume` is a cluster-scoped custom resource. You can inspect replication state with: + +```bash +kubectl get drbdvolumes +``` + +``` +NAME PRIMARY SECONDARY PHASE CONNECTION +pvc-abc node-a node-b Established Connected +pvc-def node-c node-a Established Connected +``` + +For detailed status: + +```bash +kubectl get drbdvolume pvc-abc -o yaml +``` + +### Raw block replicated volumes + +DRBD replication also works with raw block volumes: + +```yaml +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: pvc-replicated-raw +spec: + accessModes: + - ReadWriteOnce + storageClassName: csi-driver-lvm-replicated + volumeMode: Block + resources: + requests: + storage: 1Gi +``` + +### Configuration reference + +| Helm value | Default | Description | +|------------|---------|-------------| +| `drbd.enabled` | `false` | Enable DRBD replication support | +| `drbd.protocol` | `"C"` | DRBD replication protocol. `C` = synchronous (recommended), `B` = memory-synchronous, `A` = asynchronous | +| `drbd.portRange` | `"7900-7999"` | TCP port range for DRBD replication traffic | +| `drbd.minorRange` | `"100-999"` | DRBD device minor number range | +| `storageClasses.replicated.enabled` | `false` | Create the `csi-driver-lvm-replicated` StorageClass | +| `storageClasses.replicated.reclaimPolicy` | `Delete` | Reclaim policy for replicated volumes | +| `evictionEnabled` | `false` | Enable the eviction controller (required for automatic failover) | + ## Automatic PVC Deletion on Pod Eviction The persistent volumes created by this CSI driver are strictly node-affine to the node on which the pod was scheduled. This is intentional and prevents pods from starting without the LV data, which resides only on the specific node in the Kubernetes cluster. @@ -43,6 +238,7 @@ Now you can use one of following storageClasses: * `csi-driver-lvm-linear` * `csi-driver-lvm-mirror` * `csi-driver-lvm-striped` +* `csi-driver-lvm-replicated` (requires `drbd.enabled=true`, see [DRBD Replication](#drbd-replication)) To get the previous old and now deprecated `csi-lvm-sc-linear`, ... storageclasses, set helm-chart value `compat03x=true`. @@ -58,10 +254,10 @@ If you want to migrate your existing PVC to / from csi-driver-lvm, you can use [ ### Test ### ```bash +# non-replicated volumes kubectl apply -f examples/csi-pvc-raw.yaml kubectl apply -f examples/csi-pod-raw.yaml - kubectl apply -f examples/csi-pvc.yaml kubectl apply -f examples/csi-app.yaml @@ -70,6 +266,23 @@ kubectl delete -f examples/csi-pvc-raw.yaml kubectl delete -f examples/csi-app.yaml kubectl delete -f examples/csi-pvc.yaml + +# replicated volumes (requires drbd.enabled=true) +kubectl apply -f examples/csi-pvc-replicated.yaml +kubectl apply -f examples/csi-app-replicated.yaml + +kubectl get drbdvolumes + +kubectl delete -f examples/csi-app-replicated.yaml +kubectl delete -f examples/csi-pvc-replicated.yaml + +# replicated statefulset with automatic failover +kubectl apply -f examples/csi-statefulset-replicated.yaml + +kubectl get drbdvolumes +kubectl get pods -o wide + +kubectl delete -f examples/csi-statefulset-replicated.yaml ``` ### Development ### diff --git a/api/v1alpha1/groupversion_info.go b/api/v1alpha1/groupversion_info.go new file mode 100644 index 00000000..23c34a5d --- /dev/null +++ b/api/v1alpha1/groupversion_info.go @@ -0,0 +1,23 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + GroupVersion = schema.GroupVersion{Group: "lvm.csi.metal-stack.io", Version: "v1alpha1"} + + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(GroupVersion, + &DRBDVolume{}, + &DRBDVolumeList{}, + ) + metav1.AddToGroupVersion(scheme, GroupVersion) + return nil +} diff --git a/api/v1alpha1/types.go b/api/v1alpha1/types.go new file mode 100644 index 00000000..fa083dee --- /dev/null +++ b/api/v1alpha1/types.go @@ -0,0 +1,110 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// DRBDVolumeSpec defines the desired state of a DRBD-replicated volume. +type DRBDVolumeSpec struct { + // VolumeName is the CSI volume ID (matches the LV name). + VolumeName string `json:"volumeName"` + // SizeBytes is the size of the volume in bytes. + SizeBytes int64 `json:"sizeBytes"` + // PrimaryNode is the node currently serving the volume. + PrimaryNode string `json:"primaryNode"` + // SecondaryNode is the replication target. Set by the replication controller. + SecondaryNode string `json:"secondaryNode,omitempty"` + // VGName is the LVM volume group name on both nodes. + VGName string `json:"vgName"` + // LVMType is the backing LV type (linear, striped). + LVMType string `json:"lvmType"` + // DRBDMinor is the DRBD minor device number. + DRBDMinor int `json:"drbdMinor"` + // DRBDPort is the TCP port used for replication. + DRBDPort int `json:"drbdPort"` + // DRBDProtocol is the DRBD replication protocol (A, B, or C). + DRBDProtocol string `json:"drbdProtocol"` +} + +// DRBDConnectionState represents the DRBD connection state. +type DRBDConnectionState string + +const ( + ConnectionStateConnected DRBDConnectionState = "Connected" + ConnectionStateConnecting DRBDConnectionState = "Connecting" + ConnectionStateStandAlone DRBDConnectionState = "StandAlone" + ConnectionStateUnknown DRBDConnectionState = "Unknown" + ConnectionStateDisconnected DRBDConnectionState = "" +) + +// DRBDDiskState represents the DRBD disk state. +type DRBDDiskState string + +const ( + DiskStateUpToDate DRBDDiskState = "UpToDate" + DiskStateInconsistent DRBDDiskState = "Inconsistent" + DiskStateDiskless DRBDDiskState = "Diskless" + DiskStateUnknown DRBDDiskState = "" +) + +// DRBDVolumePhase represents the overall phase of the DRBD volume. +type DRBDVolumePhase string + +const ( + // VolumePhasePending means the secondary node has not been assigned yet. + VolumePhasePending DRBDVolumePhase = "Pending" + // VolumePhaseSecondaryAssigned means the secondary node was selected. + VolumePhaseSecondaryAssigned DRBDVolumePhase = "SecondaryAssigned" + // VolumePhasePrimaryReady means the primary node has set up its DRBD resource. + VolumePhasePrimaryReady DRBDVolumePhase = "PrimaryReady" + // VolumePhaseSecondaryReady means the secondary node has set up its DRBD resource. + VolumePhaseSecondaryReady DRBDVolumePhase = "SecondaryReady" + // VolumePhaseEstablished means both sides are connected and UpToDate. + VolumePhaseEstablished DRBDVolumePhase = "Established" + // VolumePhaseDegraded means the replication link is broken. + VolumePhaseDegraded DRBDVolumePhase = "Degraded" + // VolumePhaseDeleting means the volume is being torn down. + VolumePhaseDeleting DRBDVolumePhase = "Deleting" +) + +// DRBDVolumeStatus defines the observed state of a DRBD-replicated volume. +type DRBDVolumeStatus struct { + // Phase is the current lifecycle phase of the DRBD volume. + Phase DRBDVolumePhase `json:"phase,omitempty"` + // ConnectionState is the DRBD connection state between primary and secondary. + ConnectionState DRBDConnectionState `json:"connectionState,omitempty"` + // PrimaryDiskState is the disk state on the primary node. + PrimaryDiskState DRBDDiskState `json:"primaryDiskState,omitempty"` + // SecondaryDiskState is the disk state on the secondary node. + SecondaryDiskState DRBDDiskState `json:"secondaryDiskState,omitempty"` + // PrimaryReady indicates the primary node has completed DRBD setup. + PrimaryReady bool `json:"primaryReady,omitempty"` + // SecondaryReady indicates the secondary node has completed DRBD setup. + SecondaryReady bool `json:"secondaryReady,omitempty"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:printcolumn:name="Primary",type=string,JSONPath=`.spec.primaryNode` +// +kubebuilder:printcolumn:name="Secondary",type=string,JSONPath=`.spec.secondaryNode` +// +kubebuilder:printcolumn:name="Phase",type=string,JSONPath=`.status.phase` +// +kubebuilder:printcolumn:name="Connection",type=string,JSONPath=`.status.connectionState` + +// DRBDVolume represents a DRBD-replicated logical volume spanning two nodes. +type DRBDVolume struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec DRBDVolumeSpec `json:"spec,omitempty"` + Status DRBDVolumeStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// DRBDVolumeList contains a list of DRBDVolume resources. +type DRBDVolumeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []DRBDVolume `json:"items"` +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000..6a6ac2c9 --- /dev/null +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,86 @@ +//go:build !ignore_autogenerated + +package v1alpha1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func (in *DRBDVolume) DeepCopyInto(out *DRBDVolume) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +func (in *DRBDVolume) DeepCopy() *DRBDVolume { + if in == nil { + return nil + } + out := new(DRBDVolume) + in.DeepCopyInto(out) + return out +} + +func (in *DRBDVolume) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +func (in *DRBDVolumeList) DeepCopyInto(out *DRBDVolumeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]DRBDVolume, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +func (in *DRBDVolumeList) DeepCopy() *DRBDVolumeList { + if in == nil { + return nil + } + out := new(DRBDVolumeList) + in.DeepCopyInto(out) + return out +} + +func (in *DRBDVolumeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +func (in *DRBDVolumeSpec) DeepCopyInto(out *DRBDVolumeSpec) { + *out = *in +} + +func (in *DRBDVolumeSpec) DeepCopy() *DRBDVolumeSpec { + if in == nil { + return nil + } + out := new(DRBDVolumeSpec) + in.DeepCopyInto(out) + return out +} + +func (in *DRBDVolumeStatus) DeepCopyInto(out *DRBDVolumeStatus) { + *out = *in +} + +func (in *DRBDVolumeStatus) DeepCopy() *DRBDVolumeStatus { + if in == nil { + return nil + } + out := new(DRBDVolumeStatus) + in.DeepCopyInto(out) + return out +} diff --git a/charts/csi-driver-lvm/templates/daemonset.yaml b/charts/csi-driver-lvm/templates/daemonset.yaml index fc5b310f..9b48513b 100644 --- a/charts/csi-driver-lvm/templates/daemonset.yaml +++ b/charts/csi-driver-lvm/templates/daemonset.yaml @@ -48,6 +48,14 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["csistoragecapacities"] verbs: ["get", "list", "watch", "update", "patch", "create", "delete"] +{{- if .Values.drbd.enabled }} + - apiGroups: ["lvm.csi.metal-stack.io"] + resources: ["drbdvolumes"] + verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] + - apiGroups: ["lvm.csi.metal-stack.io"] + resources: ["drbdvolumes/status"] + verbs: ["get", "update", "patch"] +{{- end }} --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 @@ -254,6 +262,9 @@ spec: - --nodeid=$(KUBE_NODE_NAME) - --vgname={{ .Values.lvm.vgName }} - --log-level={{ .Values.lvm.logLevel }} +{{- if .Values.drbd.enabled }} + - --enable-drbd +{{- end }} env: - name: KUBE_NODE_NAME valueFrom: @@ -308,6 +319,14 @@ spec: - mountPath: /run/lock/lvm name: lvmlock mountPropagation: Bidirectional +{{- if .Values.drbd.enabled }} + - mountPath: /etc/drbd.d + name: drbd-config + mountPropagation: Bidirectional + - mountPath: /var/lib/drbd + name: drbd-data + mountPropagation: Bidirectional +{{- end }} - name: liveness-probe args: - --csi-address=/csi/csi.sock @@ -367,4 +386,14 @@ spec: path: {{ .Values.lvm.hostWritePath }}/lock type: DirectoryOrCreate name: lvmlock +{{- if .Values.drbd.enabled }} + - hostPath: + path: /etc/drbd.d + type: DirectoryOrCreate + name: drbd-config + - hostPath: + path: /var/lib/drbd + type: DirectoryOrCreate + name: drbd-data +{{- end }} --- diff --git a/charts/csi-driver-lvm/templates/drbd-crd.yaml b/charts/csi-driver-lvm/templates/drbd-crd.yaml new file mode 100644 index 00000000..c520f331 --- /dev/null +++ b/charts/csi-driver-lvm/templates/drbd-crd.yaml @@ -0,0 +1,89 @@ +{{- if .Values.drbd.enabled }} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: drbdvolumes.lvm.csi.metal-stack.io + labels: + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} +spec: + group: lvm.csi.metal-stack.io + names: + kind: DRBDVolume + listKind: DRBDVolumeList + plural: drbdvolumes + singular: drbdvolume + shortNames: + - dv + scope: Cluster + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Primary + type: string + jsonPath: .spec.primaryNode + - name: Secondary + type: string + jsonPath: .spec.secondaryNode + - name: Phase + type: string + jsonPath: .status.phase + - name: Connection + type: string + jsonPath: .status.connectionState + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + volumeName: + type: string + sizeBytes: + type: integer + format: int64 + primaryNode: + type: string + secondaryNode: + type: string + vgName: + type: string + lvmType: + type: string + drbdMinor: + type: integer + drbdPort: + type: integer + drbdProtocol: + type: string + required: + - volumeName + - sizeBytes + - primaryNode + - vgName + - lvmType + - drbdMinor + - drbdPort + - drbdProtocol + status: + type: object + properties: + phase: + type: string + connectionState: + type: string + primaryDiskState: + type: string + secondaryDiskState: + type: string + primaryReady: + type: boolean + secondaryReady: + type: boolean +{{- end }} diff --git a/charts/csi-driver-lvm/templates/eviction-controller.yaml b/charts/csi-driver-lvm/templates/eviction-controller.yaml index 2df60cac..27f7d989 100644 --- a/charts/csi-driver-lvm/templates/eviction-controller.yaml +++ b/charts/csi-driver-lvm/templates/eviction-controller.yaml @@ -70,6 +70,34 @@ rules: - get - list - watch +{{- if $.Values.drbd.enabled }} + - apiGroups: + - lvm.csi.metal-stack.io + resources: + - drbdvolumes + verbs: + - get + - list + - watch + - update + - patch + - apiGroups: + - lvm.csi.metal-stack.io + resources: + - drbdvolumes/status + verbs: + - get + - update + - patch + - apiGroups: + - storage.k8s.io + resources: + - csistoragecapacities + verbs: + - get + - list + - watch +{{- end }} --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -115,6 +143,9 @@ spec: - --leader-elect - --health-probe-bind-address=:8081 - --provisioner-name={{ .Values.lvm.driverName }} +{{- if $.Values.drbd.enabled }} + - --enable-drbd +{{- end }} image: "{{ .Values.evictionControllerImage.repository }}:{{ .Values.evictionControllerImage.tag }}" imagePullPolicy: {{ .Values.evictionControllerImage.pullPolicy }} name: csi-driver-lvm-controller diff --git a/charts/csi-driver-lvm/templates/storageclasses.yaml b/charts/csi-driver-lvm/templates/storageclasses.yaml index 177c3919..434605a1 100644 --- a/charts/csi-driver-lvm/templates/storageclasses.yaml +++ b/charts/csi-driver-lvm/templates/storageclasses.yaml @@ -61,3 +61,26 @@ allowVolumeExpansion: true parameters: type: "striped" {{ end }} +--- +{{- if and .Values.drbd.enabled .Values.storageClasses.replicated.enabled -}} +{{- $storageClass := .Values.storageClasses.replicated -}} +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: {{ .Values.lvm.storageClassStub }}-replicated +{{- if not (empty $storageClass.additionalAnnotations) }} + annotations: + {{- $storageClass.additionalAnnotations | toYaml | nindent 4 -}} +{{ end }} + labels: + heritage: {{ .Release.Service }} + release: {{ .Release.Name }} +provisioner: {{ .Values.lvm.driverName }} +reclaimPolicy: {{ $storageClass.reclaimPolicy }} +volumeBindingMode: WaitForFirstConsumer +allowVolumeExpansion: true +parameters: + type: "linear" + replication: "drbd" + drbdProtocol: {{ .Values.drbd.protocol | quote }} +{{- end }} diff --git a/charts/csi-driver-lvm/values.yaml b/charts/csi-driver-lvm/values.yaml index 765dadb5..ac0486dc 100644 --- a/charts/csi-driver-lvm/values.yaml +++ b/charts/csi-driver-lvm/values.yaml @@ -21,6 +21,12 @@ compat03x: false evictionEnabled: false +drbd: + enabled: false + portRange: "7900-7999" + minorRange: "100-999" + protocol: "C" + pluginImage: repository: ghcr.io/metal-stack/csi-driver-lvm tag: latest @@ -56,6 +62,10 @@ storageClasses: enabled: true additionalAnnotations: [] reclaimPolicy: Delete + replicated: + enabled: false + additionalAnnotations: [] + reclaimPolicy: Delete nodeSelector: # The plugin daemonset will run on all nodes if it has a toleration, diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 1a9326c1..5c51b961 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -4,6 +4,7 @@ import ( "flag" "os" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" "github.com/metal-stack/csi-driver-lvm/pkg/controller" _ "k8s.io/client-go/plugin/pkg/client/auth" @@ -24,6 +25,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha1.AddToScheme(scheme)) // +kubebuilder:scaffold:scheme } @@ -35,9 +37,11 @@ func main() { metricsAddr string enableLeaderElection bool probeAddr string + enableDRBD bool ) flag.StringVar(&provisionerName, "provisioner-name", "lvm.csi.metal-stack.io", "The provisioner name of the csi-driver.") flag.StringVar(&logLevel, "log-level", "info", "The log level of the application.") + flag.BoolVar(&enableDRBD, "enable-drbd", false, "Enable the DRBD replication controller.") flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -80,6 +84,21 @@ func main() { reconciler := controller.New(mgr.GetClient(), mgr.GetScheme(), ctrl.Log.WithName("CsiDriverLvmReconciler"), controller.Config{ProvisionerName: provisionerName}) + if enableDRBD { + drbdReconciler := controller.NewDRBDReplicationReconciler( + mgr.GetClient(), + mgr.GetScheme(), + ctrl.Log.WithName("DRBDReplicationReconciler"), + provisionerName, + ) + if err := drbdReconciler.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "DRBDReplication") + os.Exit(1) + } + reconciler.SetDRBDReconciler(drbdReconciler) + setupLog.Info("drbd replication controller enabled") + } + if err := reconciler.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CsiDriverLvm") os.Exit(1) diff --git a/cmd/lvmplugin/Dockerfile b/cmd/lvmplugin/Dockerfile index cf227478..f32366b8 100644 --- a/cmd/lvmplugin/Dockerfile +++ b/cmd/lvmplugin/Dockerfile @@ -2,7 +2,7 @@ FROM alpine:3.22 ARG TARGETPLATFORM LABEL maintainer="metal-stack authors " -RUN apk add lvm2 lvm2-extra e2fsprogs e2fsprogs-extra smartmontools nvme-cli util-linux device-mapper xfsprogs xfsprogs-extra +RUN apk add lvm2 lvm2-extra e2fsprogs e2fsprogs-extra smartmontools nvme-cli util-linux device-mapper xfsprogs xfsprogs-extra drbd-utils COPY --chmod=755 bin/${TARGETPLATFORM}/lvmplugin /lvmplugin USER root ENTRYPOINT ["/lvmplugin"] diff --git a/cmd/lvmplugin/main.go b/cmd/lvmplugin/main.go index 80c4d632..ac3dec7e 100644 --- a/cmd/lvmplugin/main.go +++ b/cmd/lvmplugin/main.go @@ -9,7 +9,14 @@ import ( "os/signal" "path" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" + "github.com/metal-stack/csi-driver-lvm/pkg/nodeagent" "github.com/metal-stack/csi-driver-lvm/pkg/server" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" ) var ( @@ -23,6 +30,7 @@ var ( devicesPattern = flag.String("devices", "", "comma-separated grok patterns of the physical volumes to use.") vgName = flag.String("vgname", "csi-lvm", "name of volume group") logLevel = flag.String("log-level", "info", "log-level of the application") + enableDRBD = flag.Bool("enable-drbd", false, "enable DRBD replication support") // Set by the build process version = "" @@ -65,7 +73,19 @@ func handle() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() - driver, err := server.NewDriver(log, *driverName, *nodeID, *endpoint, *hostWritePath, *ephemeral, *maxVolumesPerNode, version, *devicesPattern, *vgName) + var k8sClient client.Client + if *enableDRBD { + k8sClient, err = createK8sClient(log) + if err != nil { + log.Error("failed to create k8s client for drbd support", "error", err) + os.Exit(1) + } + log.Info("drbd support enabled, starting node agent") + agent := nodeagent.New(log, k8sClient, *nodeID, *vgName) + go agent.Run(ctx) + } + + driver, err := server.NewDriver(log, *driverName, *nodeID, *endpoint, *hostWritePath, *ephemeral, *maxVolumesPerNode, version, *devicesPattern, *vgName, k8sClient) if err != nil { log.Error("failed to initialize driver", "error", err) os.Exit(1) @@ -73,3 +93,22 @@ func handle() { driver.Run(ctx) } + +func createK8sClient(log *slog.Logger) (client.Client, error) { + scheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(v1alpha1.AddToScheme(scheme)) + + cfg, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to get in-cluster config: %w", err) + } + + c, err := client.New(cfg, client.Options{Scheme: scheme}) + if err != nil { + return nil, fmt.Errorf("failed to create k8s client: %w", err) + } + + log.Info("k8s client created for drbd support") + return c, nil +} diff --git a/examples/csi-app-replicated.yaml b/examples/csi-app-replicated.yaml new file mode 100644 index 00000000..95a3b294 --- /dev/null +++ b/examples/csi-app-replicated.yaml @@ -0,0 +1,16 @@ +kind: Pod +apiVersion: v1 +metadata: + name: my-csi-app-replicated +spec: + containers: + - name: my-frontend + image: busybox + volumeMounts: + - mountPath: "/data" + name: my-csi-volume + command: [ "sleep", "1000000" ] + volumes: + - name: my-csi-volume + persistentVolumeClaim: + claimName: csi-pvc-replicated diff --git a/examples/csi-deployment-replicated.yaml b/examples/csi-deployment-replicated.yaml new file mode 100644 index 00000000..5d688f1f --- /dev/null +++ b/examples/csi-deployment-replicated.yaml @@ -0,0 +1,27 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: csi-deployment-replicated + labels: + app: csi-app-replicated +spec: + replicas: 1 + selector: + matchLabels: + app: csi-app-replicated + template: + metadata: + labels: + app: csi-app-replicated + spec: + containers: + - name: csi-app + image: busybox + volumeMounts: + - mountPath: "/data" + name: my-csi-volume + command: [ "sleep", "1000000" ] + volumes: + - name: my-csi-volume + persistentVolumeClaim: + claimName: csi-pvc-replicated diff --git a/examples/csi-pod-replicated-raw.yaml b/examples/csi-pod-replicated-raw.yaml new file mode 100644 index 00000000..ee3181c1 --- /dev/null +++ b/examples/csi-pod-replicated-raw.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Pod +metadata: + name: pod-replicated-raw + labels: + name: busybox-replicated-raw +spec: + restartPolicy: Always + containers: + - image: gcr.io/google_containers/busybox + command: ["/bin/sh", "-c"] + args: [ "tail -f /dev/null" ] + name: busybox + volumeDevices: + - name: vol + devicePath: /dev/xdva + volumes: + - name: vol + persistentVolumeClaim: + claimName: pvc-replicated-raw diff --git a/examples/csi-pvc-replicated-raw.yaml b/examples/csi-pvc-replicated-raw.yaml new file mode 100644 index 00000000..5ad3e92b --- /dev/null +++ b/examples/csi-pvc-replicated-raw.yaml @@ -0,0 +1,12 @@ +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: pvc-replicated-raw +spec: + accessModes: + - ReadWriteOnce + storageClassName: csi-driver-lvm-replicated + volumeMode: Block + resources: + requests: + storage: 1Gi diff --git a/examples/csi-pvc-replicated.yaml b/examples/csi-pvc-replicated.yaml new file mode 100644 index 00000000..666909a0 --- /dev/null +++ b/examples/csi-pvc-replicated.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-pvc-replicated +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + storageClassName: csi-driver-lvm-replicated diff --git a/examples/csi-statefulset-replicated.yaml b/examples/csi-statefulset-replicated.yaml new file mode 100644 index 00000000..c3ae234d --- /dev/null +++ b/examples/csi-statefulset-replicated.yaml @@ -0,0 +1,51 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: nginx-replicated + labels: + app: nginx-replicated +spec: + ports: + - port: 80 + name: web + clusterIP: None + selector: + app: nginx-replicated +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: nginx-replicated + labels: + app: nginx-replicated +spec: + serviceName: "nginx-replicated" + replicas: 1 + selector: + matchLabels: + app: nginx-replicated + template: + metadata: + labels: + app: nginx-replicated + annotations: + metal-stack.io/csi-driver-lvm.is-eviction-allowed: "true" + spec: + containers: + - name: nginx + image: nginx:1.14.2 + ports: + - containerPort: 80 + volumeMounts: + - mountPath: "/data" + name: data + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: csi-driver-lvm-replicated + resources: + requests: + storage: 1Gi diff --git a/examples/csi-storageclass-replicated.yaml b/examples/csi-storageclass-replicated.yaml new file mode 100644 index 00000000..b595a944 --- /dev/null +++ b/examples/csi-storageclass-replicated.yaml @@ -0,0 +1,12 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-lvm-sc-replicated +provisioner: lvm.csi.metal-stack.io +reclaimPolicy: Delete +volumeBindingMode: WaitForFirstConsumer +allowVolumeExpansion: true +parameters: + type: "linear" + replication: "drbd" + drbdProtocol: "C" diff --git a/pkg/controller/drbd_controller.go b/pkg/controller/drbd_controller.go new file mode 100644 index 00000000..a5481f22 --- /dev/null +++ b/pkg/controller/drbd_controller.go @@ -0,0 +1,323 @@ +package controller + +import ( + "context" + "fmt" + "sort" + + "github.com/go-logr/logr" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// +kubebuilder:rbac:groups=lvm.csi.metal-stack.io,resources=drbdvolumes,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=lvm.csi.metal-stack.io,resources=drbdvolumes/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch +// +kubebuilder:rbac:groups=storage.k8s.io,resources=csistoragecapacities,verbs=get;list;watch + +type DRBDReplicationReconciler struct { + client.Client + Scheme *runtime.Scheme + Log logr.Logger + + ProvisionerName string +} + +func NewDRBDReplicationReconciler(c client.Client, scheme *runtime.Scheme, log logr.Logger, provisionerName string) *DRBDReplicationReconciler { + return &DRBDReplicationReconciler{ + Client: c, + Scheme: scheme, + Log: log, + ProvisionerName: provisionerName, + } +} + +func (r *DRBDReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("drbdvolume", req.Name) + + var dv v1alpha1.DRBDVolume + if err := r.Get(ctx, req.NamespacedName, &dv); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Skip volumes being deleted + if dv.Status.Phase == v1alpha1.VolumePhaseDeleting { + return ctrl.Result{}, nil + } + + // Phase 1: Assign secondary node if not set + if dv.Spec.SecondaryNode == "" { + log.Info("selecting secondary node for drbd volume") + secondary, err := r.selectSecondaryNode(ctx, dv.Spec.PrimaryNode, dv.Spec.SizeBytes) + if err != nil { + log.Error(err, "failed to select secondary node") + return ctrl.Result{}, err + } + + if secondary == "" { + log.Info("no suitable secondary node found, will retry") + return ctrl.Result{Requeue: true}, nil + } + + dv.Spec.SecondaryNode = secondary + if err := r.Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume with secondary node: %w", err) + } + + dv.Status.Phase = v1alpha1.VolumePhaseSecondaryAssigned + if err := r.Status().Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume status: %w", err) + } + + log.Info("assigned secondary node", "secondary", secondary) + return ctrl.Result{}, nil + } + + // Phase 2: Update phase based on readiness + if dv.Status.PrimaryReady && dv.Status.SecondaryReady && dv.Status.Phase != v1alpha1.VolumePhaseEstablished { + dv.Status.Phase = v1alpha1.VolumePhaseEstablished + if err := r.Status().Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume status to established: %w", err) + } + log.Info("drbd volume is established") + } + + return ctrl.Result{}, nil +} + +// nodeCapacity holds capacity data for ranking nodes. +type nodeCapacity struct { + NodeName string + AvailableBytes int64 + ReplicaCount int +} + +// selectSecondaryNode selects the best secondary node using a least-usage heuristic. +// It examines CSIStorageCapacity objects for available space and counts existing +// DRBD secondary assignments to prefer nodes with fewer replicas. +func (r *DRBDReplicationReconciler) selectSecondaryNode(ctx context.Context, primaryNode string, requiredBytes int64) (string, error) { + // List all nodes + var nodeList corev1.NodeList + if err := r.List(ctx, &nodeList); err != nil { + return "", fmt.Errorf("failed to list nodes: %w", err) + } + + // Build set of schedulable nodes (excluding primary) + schedulableNodes := map[string]bool{} + for _, node := range nodeList.Items { + if node.Name == primaryNode { + continue + } + if node.Spec.Unschedulable { + continue + } + schedulableNodes[node.Name] = true + } + + if len(schedulableNodes) == 0 { + return "", nil + } + + // Get storage capacities for our provisioner + var capacities storagev1.CSIStorageCapacityList + if err := r.List(ctx, &capacities); err != nil { + return "", fmt.Errorf("failed to list csi storage capacities: %w", err) + } + + nodeCapacities := map[string]int64{} + for _, cap := range capacities.Items { + if cap.StorageClassName == "" { + continue + } + + // Extract node name from topology + if cap.NodeTopology == nil { + continue + } + + for _, expr := range cap.NodeTopology.MatchLabels { + _ = expr // matchLabels iteration + } + + // CSIStorageCapacity uses NodeTopology as a label selector + // The provisioner sets topology.lvm.csi/node= + nodeName := "" + if cap.NodeTopology != nil { + for k, v := range cap.NodeTopology.MatchLabels { + if k == "topology.lvm.csi/node" { + nodeName = v + break + } + } + } + + if nodeName == "" || !schedulableNodes[nodeName] { + continue + } + + if cap.Capacity != nil { + bytes := cap.Capacity.Value() + if bytes > nodeCapacities[nodeName] { + nodeCapacities[nodeName] = bytes + } + } + } + + // Count existing DRBD secondary assignments per node + var dvList v1alpha1.DRBDVolumeList + if err := r.List(ctx, &dvList); err != nil { + return "", fmt.Errorf("failed to list drbd volumes: %w", err) + } + + replicaCounts := map[string]int{} + for _, dv := range dvList.Items { + if dv.Spec.SecondaryNode != "" { + replicaCounts[dv.Spec.SecondaryNode]++ + } + } + + // Build candidates with scoring + var candidates []nodeCapacity + for nodeName := range schedulableNodes { + avail, hasCapacity := nodeCapacities[nodeName] + if !hasCapacity { + // If no capacity data, skip — we can't verify the node has enough space + continue + } + + if avail < requiredBytes { + continue + } + + candidates = append(candidates, nodeCapacity{ + NodeName: nodeName, + AvailableBytes: avail, + ReplicaCount: replicaCounts[nodeName], + }) + } + + if len(candidates) == 0 { + // Fallback: if no capacity data available, pick any schedulable node with fewest replicas + for nodeName := range schedulableNodes { + candidates = append(candidates, nodeCapacity{ + NodeName: nodeName, + AvailableBytes: 0, + ReplicaCount: replicaCounts[nodeName], + }) + } + } + + if len(candidates) == 0 { + return "", nil + } + + // Sort: least replicas first, then most available capacity + sort.Slice(candidates, func(i, j int) bool { + if candidates[i].ReplicaCount != candidates[j].ReplicaCount { + return candidates[i].ReplicaCount < candidates[j].ReplicaCount + } + return candidates[i].AvailableBytes > candidates[j].AvailableBytes + }) + + return candidates[0].NodeName, nil +} + +// HandleFailover performs the failover of a DRBD volume from its current primary +// to its secondary. It updates the PV node affinity and swaps primary/secondary. +func (r *DRBDReplicationReconciler) HandleFailover(ctx context.Context, dvName string) error { + log := r.Log.WithValues("drbdvolume", dvName) + + var dv v1alpha1.DRBDVolume + if err := r.Get(ctx, types.NamespacedName{Name: dvName}, &dv); err != nil { + return fmt.Errorf("failed to get drbdvolume %s: %w", dvName, err) + } + + if dv.Spec.SecondaryNode == "" { + return fmt.Errorf("drbdvolume %s has no secondary node for failover", dvName) + } + + oldPrimary := dv.Spec.PrimaryNode + newPrimary := dv.Spec.SecondaryNode + + log.Info("performing failover", "old-primary", oldPrimary, "new-primary", newPrimary) + + // Update the PV node affinity to point to the new primary + var pvList corev1.PersistentVolumeList + if err := r.List(ctx, &pvList); err != nil { + return fmt.Errorf("failed to list pvs: %w", err) + } + + for i := range pvList.Items { + pv := &pvList.Items[i] + if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.ProvisionerName { + continue + } + if pv.Spec.CSI.VolumeHandle != dv.Spec.VolumeName { + continue + } + + // Update node affinity + if pv.Spec.NodeAffinity != nil && pv.Spec.NodeAffinity.Required != nil { + for j := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { + for k := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[j].MatchExpressions { + expr := &pv.Spec.NodeAffinity.Required.NodeSelectorTerms[j].MatchExpressions[k] + if expr.Key == "topology.lvm.csi/node" { + expr.Values = []string{newPrimary} + } + } + } + } + + if err := r.Update(ctx, pv); err != nil { + return fmt.Errorf("failed to update pv %s node affinity: %w", pv.Name, err) + } + + log.Info("updated pv node affinity", "pv", pv.Name, "new-node", newPrimary) + } + + // Swap primary and secondary in the DRBDVolume + dv.Spec.PrimaryNode = newPrimary + dv.Spec.SecondaryNode = oldPrimary + if err := r.Update(ctx, &dv); err != nil { + return fmt.Errorf("failed to update drbdvolume spec: %w", err) + } + + dv.Status.Phase = v1alpha1.VolumePhaseDegraded + if err := r.Status().Update(ctx, &dv); err != nil { + return fmt.Errorf("failed to update drbdvolume status: %w", err) + } + + log.Info("failover complete", "new-primary", newPrimary) + return nil +} + +func (r *DRBDReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { + pred := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.DRBDVolume{}). + WithEventFilter(pred). + Complete(r) +} diff --git a/pkg/controller/eviction_controller.go b/pkg/controller/eviction_controller.go index d6f75c00..fca89458 100644 --- a/pkg/controller/eviction_controller.go +++ b/pkg/controller/eviction_controller.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/go-logr/logr" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -39,7 +40,8 @@ type CsiDriverLvmReconciler struct { Scheme *runtime.Scheme Log logr.Logger - cfg Config + cfg Config + drbdReconciler *DRBDReplicationReconciler } func New(client client.Client, scheme *runtime.Scheme, log logr.Logger, cfg Config) *CsiDriverLvmReconciler { @@ -51,6 +53,11 @@ func New(client client.Client, scheme *runtime.Scheme, log logr.Logger, cfg Conf } } +// SetDRBDReconciler sets the DRBD replication reconciler for failover support. +func (r *CsiDriverLvmReconciler) SetDRBDReconciler(drbd *DRBDReplicationReconciler) { + r.drbdReconciler = drbd +} + func parseBoolAnn(obj client.Object) (bool, error) { ann := obj.GetAnnotations() if v, ok := ann[isEvictionAllowedAnnotation]; ok { @@ -165,6 +172,26 @@ func (r *CsiDriverLvmReconciler) Reconcile(ctx context.Context, req ctrl.Request continue } + // Check if this volume has a DRBD replica — if so, failover instead of deleting PVC + if r.drbdReconciler != nil { + var dv v1alpha1.DRBDVolume + dvErr := r.Get(ctx, types.NamespacedName{Name: pv.Spec.CSI.VolumeHandle}, &dv) + if dvErr == nil && dv.Spec.SecondaryNode != "" { + r.Log.Info("performing drbd failover instead of pvc deletion", + "pvc", pvc.Name, "pod", pod.Name, "volume", dv.Name, + "old-primary", dv.Spec.PrimaryNode, "new-primary", dv.Spec.SecondaryNode, + ) + + if err := r.drbdReconciler.HandleFailover(ctx, dv.Name); err != nil { + return ctrl.Result{}, fmt.Errorf("unable to perform drbd failover for %q: %w", dv.Name, err) + } + r.Log.Info("drbd failover complete, pod will reschedule to new primary", + "pvc", pvc.Name, "pod", pod.Name, "new-primary", dv.Spec.SecondaryNode, + ) + continue + } + } + r.Log.Info("trying to delete pvc because of eviction", "pvc", pvc.Name, "pod", pod.Name, "namespace", pvc.Namespace, ) diff --git a/pkg/drbd/drbd.go b/pkg/drbd/drbd.go new file mode 100644 index 00000000..fe46208c --- /dev/null +++ b/pkg/drbd/drbd.go @@ -0,0 +1,280 @@ +package drbd + +import ( + "encoding/json" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" + "text/template" +) + +const ( + resourceConfigDir = "/etc/drbd.d" + resourceTemplate = `resource {{.Name}} { + protocol {{.Protocol}}; + device /dev/drbd{{.Minor}}; + disk /dev/{{.VGName}}/{{.Name}}; + meta-disk internal; + on {{.LocalNodeName}} { + address {{.LocalAddr}}:{{.Port}}; + node-id 0; + } + on {{.RemoteNodeName}} { + address {{.RemoteAddr}}:{{.Port}}; + node-id 1; + } + net { + after-sb-0pri discard-zero-changes; + after-sb-1pri discard-secondary; + after-sb-2pri disconnect; + } +} +` +) + +type ResourceConfig struct { + Name string + Protocol string + Minor int + Port int + VGName string + LocalNodeName string + LocalAddr string + RemoteNodeName string + RemoteAddr string +} + +type Status struct { + Name string + Role string // Primary or Secondary + ConnectionState string // Connected, Connecting, StandAlone, etc. + DiskState string // UpToDate, Inconsistent, Diskless, etc. + PeerDiskState string +} + +// drbdsetup status output JSON structures +type drbdStatusReport []drbdResource + +type drbdResource struct { + Name string `json:"name"` + Role string `json:"role"` + Devices []drbdDevice `json:"devices"` + Connections []drbdConnection `json:"connections"` +} + +type drbdDevice struct { + Volume int `json:"volume"` + Minor int `json:"minor"` + DiskState string `json:"disk-state"` +} + +type drbdConnection struct { + PeerNodeID int `json:"peer-node-id"` + Name string `json:"name"` + ConnState string `json:"connection-state"` + PeerRole string `json:"peer-role"` + PeerDevices []drbdPeerDevice `json:"peer_devices"` +} + +type drbdPeerDevice struct { + Volume int `json:"volume"` + PeerDiskState string `json:"peer-disk-state"` +} + +// WriteResourceConfig writes a DRBD resource configuration file. +func WriteResourceConfig(log *slog.Logger, cfg ResourceConfig) error { + tmpl, err := template.New("drbd-resource").Parse(resourceTemplate) + if err != nil { + return fmt.Errorf("failed to parse drbd resource template: %w", err) + } + + configPath := filepath.Join(resourceConfigDir, cfg.Name+".res") + log.Info("writing drbd resource config", "path", configPath) + + f, err := os.Create(configPath) + if err != nil { + return fmt.Errorf("failed to create drbd resource config %s: %w", configPath, err) + } + defer f.Close() + + if err := tmpl.Execute(f, cfg); err != nil { + return fmt.Errorf("failed to render drbd resource config: %w", err) + } + + return nil +} + +// RemoveResourceConfig deletes the DRBD resource configuration file. +func RemoveResourceConfig(log *slog.Logger, name string) error { + configPath := filepath.Join(resourceConfigDir, name+".res") + log.Info("removing drbd resource config", "path", configPath) + + if err := os.Remove(configPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove drbd resource config %s: %w", configPath, err) + } + return nil +} + +// CreateMetadata initializes DRBD metadata on the backing device. +func CreateMetadata(log *slog.Logger, name string) (string, error) { + log.Info("creating drbd metadata", "resource", name) + cmd := exec.Command("drbdadm", "create-md", "--force", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to create drbd metadata for %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// Up brings up the DRBD resource (connects and attaches). +func Up(log *slog.Logger, name string) (string, error) { + log.Info("bringing up drbd resource", "resource", name) + cmd := exec.Command("drbdadm", "up", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to bring up drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// Down tears down the DRBD resource (disconnects and detaches). +func Down(log *slog.Logger, name string) (string, error) { + log.Info("bringing down drbd resource", "resource", name) + cmd := exec.Command("drbdadm", "down", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to bring down drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// Promote makes this node the primary for the DRBD resource. +func Promote(log *slog.Logger, name string) (string, error) { + log.Info("promoting drbd resource to primary", "resource", name) + cmd := exec.Command("drbdadm", "primary", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to promote drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// ForcePromote makes this node the primary even without a peer. +func ForcePromote(log *slog.Logger, name string) (string, error) { + log.Info("force-promoting drbd resource to primary", "resource", name) + cmd := exec.Command("drbdadm", "primary", "--force", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to force-promote drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// Demote makes this node the secondary for the DRBD resource. +func Demote(log *slog.Logger, name string) (string, error) { + log.Info("demoting drbd resource to secondary", "resource", name) + cmd := exec.Command("drbdadm", "secondary", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to demote drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// InitialSync triggers initial full sync from this node (must be primary). +func InitialSync(log *slog.Logger, name string) (string, error) { + log.Info("starting initial drbd sync (new-current-uuid --clear-bitmap)", "resource", name) + cmd := exec.Command("drbdadm", "--", "--overwrite-data-of-peer", "primary", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to start initial sync for %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// Resize notifies DRBD that the backing device has changed size. +func Resize(log *slog.Logger, name string) (string, error) { + log.Info("resizing drbd resource", "resource", name) + cmd := exec.Command("drbdadm", "resize", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to resize drbd resource %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// WipeMetadata removes DRBD metadata from the backing device. +func WipeMetadata(log *slog.Logger, name string) (string, error) { + log.Info("wiping drbd metadata", "resource", name) + cmd := exec.Command("drbdadm", "wipe-md", "--force", name) + out, err := cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("failed to wipe drbd metadata for %s: %w (%s)", name, err, string(out)) + } + return string(out), nil +} + +// GetStatus returns the DRBD status for a resource. +func GetStatus(log *slog.Logger, name string) (*Status, error) { + cmd := exec.Command("drbdsetup", "status", name, "--json") + out, err := cmd.CombinedOutput() + if err != nil { + // Resource may not be up yet + if strings.Contains(string(out), "No such resource") { + return &Status{ + Name: name, + Role: "Unknown", + ConnectionState: "Unknown", + DiskState: "Unknown", + PeerDiskState: "Unknown", + }, nil + } + return nil, fmt.Errorf("failed to get drbd status for %s: %w (%s)", name, err, string(out)) + } + + var report drbdStatusReport + if err := json.Unmarshal(out, &report); err != nil { + return nil, fmt.Errorf("failed to parse drbd status output: %w", err) + } + + for _, res := range report { + if res.Name != name { + continue + } + + status := &Status{ + Name: name, + Role: res.Role, + } + + if len(res.Devices) > 0 { + status.DiskState = res.Devices[0].DiskState + } + + if len(res.Connections) > 0 { + status.ConnectionState = res.Connections[0].ConnState + if len(res.Connections[0].PeerDevices) > 0 { + status.PeerDiskState = res.Connections[0].PeerDevices[0].PeerDiskState + } + } + + return status, nil + } + + return nil, fmt.Errorf("resource %s not found in drbd status output", name) +} + +// ResourceExists checks if a DRBD resource config file exists. +func ResourceExists(name string) bool { + configPath := filepath.Join(resourceConfigDir, name+".res") + _, err := os.Stat(configPath) + return err == nil +} + +// DevicePath returns the DRBD device path for a given minor number. +func DevicePath(minor int) string { + return fmt.Sprintf("/dev/drbd%d", minor) +} diff --git a/pkg/lvm/lvm.go b/pkg/lvm/lvm.go index e64e396d..397f0168 100644 --- a/pkg/lvm/lvm.go +++ b/pkg/lvm/lvm.go @@ -111,6 +111,105 @@ func MountLV(log *slog.Logger, lvname, mountPath string, vgName string, fsType s return "", nil } +// MountLVByPath mounts a device at the given devicePath (e.g., /dev/drbd100) to mountPath. +func MountLVByPath(log *slog.Logger, devicePath, mountPath string, fsType string) (string, error) { + formatted := false + forceFormat := false + if fsType == "" { + fsType = "ext4" + } + + cmd := exec.Command("lsblk", "-J", "-f", devicePath) + out, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("unable to check if device %s is already formatted: %w (%s)", devicePath, err, string(out)) + } + + lsblkReport := lsblk{} + err = json.Unmarshal(out, &lsblkReport) + if err != nil { + return "", fmt.Errorf("failed to format lsblk output: %w", err) + } + + if len(lsblkReport.BlockDevices) != 1 { + return "", fmt.Errorf("unexpected amount of blockdevices found for lsblk (%d)", len(lsblkReport.BlockDevices)) + } + + switch f := lsblkReport.BlockDevices[0].FSType; f { + case nil: + formatted = false + log.Debug("device not yet formatted", "device-path", devicePath) + case ptr.To("xfs_external_log"): + formatted = false + forceFormat = true + default: + formatted = true + log.Debug("device already formatted", "device-path", devicePath, "format", *f) + } + + if !formatted { + formatArgs := []string{} + if forceFormat { + formatArgs = append(formatArgs, "-f") + } + formatArgs = append(formatArgs, devicePath) + + log.Debug("formatting with mkfs", "fs-type", fsType, "args", strings.Join(formatArgs, " ")) + cmd = exec.Command(fmt.Sprintf("mkfs.%s", fsType), formatArgs...) //nolint:gosec + out, err = cmd.CombinedOutput() + if err != nil { + return string(out), fmt.Errorf("unable to format device %q: %w (%s)", devicePath, err, string(out)) + } + } + + err = os.MkdirAll(mountPath, 0777|os.ModeSetgid) + if err != nil { + return string(out), fmt.Errorf("unable to create mount directory for device %s: %w", devicePath, err) + } + + mountArgs := []string{"--make-shared", "-t", fsType, devicePath, mountPath} + log.Debug("mounting with mount", "args", strings.Join(mountArgs, " ")) + cmd = exec.Command("mount", mountArgs...) + out, err = cmd.CombinedOutput() + if err != nil { + mountOutput := string(out) + if !strings.Contains(mountOutput, "already mounted") { + return string(out), fmt.Errorf("unable to mount %q to %q: %w (%s)", devicePath, mountPath, err, string(out)) + } + } + err = os.Chmod(mountPath, 0777|os.ModeSetgid) + if err != nil { + return "", fmt.Errorf("unable to change permissions of volume mount %s: %w", mountPath, err) + } + log.Debug("mountbypath output", "output", out) + return "", nil +} + +// BindMountLVByPath bind-mounts a device at devicePath to mountPath. +func BindMountLVByPath(log *slog.Logger, devicePath, mountPath string) (string, error) { + _, err := os.Create(mountPath) + if err != nil { + return "", fmt.Errorf("unable to create mount point for device %s: %w", devicePath, err) + } + + mountArgs := []string{"--make-shared", "--bind", devicePath, mountPath} + log.Debug("bindmountbypath command: mount", "args", strings.Join(mountArgs, " ")) + cmd := exec.Command("mount", mountArgs...) + out, err := cmd.CombinedOutput() + if err != nil { + mountOutput := string(out) + if !strings.Contains(mountOutput, "already mounted") { + return string(out), fmt.Errorf("unable to mount %q to %s: %w (%s)", devicePath, mountPath, err, string(out)) + } + } + err = os.Chmod(mountPath, 0777|os.ModeSetgid) + if err != nil { + return "", fmt.Errorf("unable to change permissions of volume mount %s: %w", mountPath, err) + } + log.Debug("bindmountbypath output", "output", out) + return "", nil +} + func BindMountLV(log *slog.Logger, lvname, mountPath string, vgName string) (string, error) { lvPath := fmt.Sprintf("/dev/%s/%s", vgName, lvname) _, err := os.Create(mountPath) diff --git a/pkg/nodeagent/agent.go b/pkg/nodeagent/agent.go new file mode 100644 index 00000000..f0899a61 --- /dev/null +++ b/pkg/nodeagent/agent.go @@ -0,0 +1,230 @@ +package nodeagent + +import ( + "context" + "fmt" + "log/slog" + "net" + "time" + + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" + "github.com/metal-stack/csi-driver-lvm/pkg/drbd" + "github.com/metal-stack/csi-driver-lvm/pkg/lvm" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Agent runs on each node and handles DRBD operations for DRBDVolume CRs +// that reference this node as either primary or secondary. +type Agent struct { + log *slog.Logger + client client.Client + nodeID string + vgName string + pollInterval time.Duration +} + +func New(log *slog.Logger, c client.Client, nodeID, vgName string) *Agent { + return &Agent{ + log: log, + client: c, + nodeID: nodeID, + vgName: vgName, + pollInterval: 10 * time.Second, + } +} + +// Run starts the polling loop that watches for DRBDVolume CRs assigned to this node. +func (a *Agent) Run(ctx context.Context) { + a.log.Info("starting drbd node agent", "node", a.nodeID) + + wait.UntilWithContext(ctx, a.reconcileAll, a.pollInterval) + + a.log.Info("drbd node agent stopped") +} + +func (a *Agent) reconcileAll(ctx context.Context) { + var dvList v1alpha1.DRBDVolumeList + if err := a.client.List(ctx, &dvList); err != nil { + a.log.Error("failed to list drbd volumes", "error", err) + return + } + + for i := range dvList.Items { + dv := &dvList.Items[i] + if err := a.reconcile(ctx, dv); err != nil { + a.log.Error("failed to reconcile drbd volume", "name", dv.Name, "error", err) + } + } +} + +func (a *Agent) reconcile(ctx context.Context, dv *v1alpha1.DRBDVolume) error { + isPrimary := dv.Spec.PrimaryNode == a.nodeID + isSecondary := dv.Spec.SecondaryNode == a.nodeID + + if !isPrimary && !isSecondary { + return nil + } + + // Skip if being deleted + if dv.Status.Phase == v1alpha1.VolumePhaseDeleting { + return nil + } + + // Skip if secondary not yet assigned + if dv.Spec.SecondaryNode == "" { + return nil + } + + // Check if we already completed our setup + if isPrimary && dv.Status.PrimaryReady { + return nil + } + if isSecondary && dv.Status.SecondaryReady { + return nil + } + + log := a.log.With("drbdvolume", dv.Name, "role", roleString(isPrimary)) + + // Get node IPs for DRBD config + primaryIP, err := a.getNodeIP(ctx, dv.Spec.PrimaryNode) + if err != nil { + return fmt.Errorf("failed to get primary node IP: %w", err) + } + secondaryIP, err := a.getNodeIP(ctx, dv.Spec.SecondaryNode) + if err != nil { + return fmt.Errorf("failed to get secondary node IP: %w", err) + } + + // Set up the local and remote config based on role + var localNodeName, remoteNodeName, localAddr, remoteAddr string + if isPrimary { + localNodeName = dv.Spec.PrimaryNode + remoteNodeName = dv.Spec.SecondaryNode + localAddr = primaryIP + remoteAddr = secondaryIP + } else { + localNodeName = dv.Spec.SecondaryNode + remoteNodeName = dv.Spec.PrimaryNode + localAddr = secondaryIP + remoteAddr = primaryIP + } + + // Secondary must create the LV first (primary already has it from CreateVolume) + if isSecondary { + if !lvm.LvExists(a.log, a.vgName, dv.Spec.VolumeName) { + log.Info("creating lv on secondary node") + output, err := lvm.CreateLV(a.log, a.vgName, dv.Spec.VolumeName, uint64(dv.Spec.SizeBytes), dv.Spec.LVMType, false) + if err != nil { + return fmt.Errorf("failed to create lv on secondary: %w, output: %s", err, output) + } + } + } + + // Write DRBD resource config if not present + if !drbd.ResourceExists(dv.Spec.VolumeName) { + log.Info("writing drbd resource config") + cfg := drbd.ResourceConfig{ + Name: dv.Spec.VolumeName, + Protocol: dv.Spec.DRBDProtocol, + Minor: dv.Spec.DRBDMinor, + Port: dv.Spec.DRBDPort, + VGName: a.vgName, + LocalNodeName: localNodeName, + LocalAddr: localAddr, + RemoteNodeName: remoteNodeName, + RemoteAddr: remoteAddr, + } + + if err := drbd.WriteResourceConfig(a.log, cfg); err != nil { + return fmt.Errorf("failed to write drbd resource config: %w", err) + } + + // Create DRBD metadata + if output, err := drbd.CreateMetadata(a.log, dv.Spec.VolumeName); err != nil { + return fmt.Errorf("failed to create drbd metadata: %w, output: %s", err, output) + } + + // Bring up DRBD resource + if output, err := drbd.Up(a.log, dv.Spec.VolumeName); err != nil { + return fmt.Errorf("failed to bring up drbd: %w, output: %s", err, output) + } + } + + // Mark this side as ready + // Re-fetch to avoid conflicts + var fresh v1alpha1.DRBDVolume + if err := a.client.Get(ctx, types.NamespacedName{Name: dv.Name}, &fresh); err != nil { + return fmt.Errorf("failed to re-fetch drbdvolume: %w", err) + } + + updated := false + if isPrimary && !fresh.Status.PrimaryReady { + fresh.Status.PrimaryReady = true + fresh.Status.Phase = v1alpha1.VolumePhasePrimaryReady + updated = true + } + if isSecondary && !fresh.Status.SecondaryReady { + fresh.Status.SecondaryReady = true + fresh.Status.Phase = v1alpha1.VolumePhaseSecondaryReady + updated = true + } + + if updated { + log.Info("marking node as ready for drbd volume") + if err := a.client.Status().Update(ctx, &fresh); err != nil { + return fmt.Errorf("failed to update drbdvolume status: %w", err) + } + } + + return nil +} + +// TeardownDRBD tears down a DRBD resource and removes the LV on this node. +// Called during volume deletion. +func (a *Agent) TeardownDRBD(dv *v1alpha1.DRBDVolume) error { + log := a.log.With("drbdvolume", dv.Name) + + if drbd.ResourceExists(dv.Spec.VolumeName) { + log.Info("tearing down drbd resource") + + if _, err := drbd.Down(a.log, dv.Spec.VolumeName); err != nil { + log.Error("failed to bring down drbd (may already be down)", "error", err) + } + + if err := drbd.RemoveResourceConfig(a.log, dv.Spec.VolumeName); err != nil { + log.Error("failed to remove drbd resource config", "error", err) + } + } + + return nil +} + +// getNodeIP looks up the InternalIP of a node. +func (a *Agent) getNodeIP(ctx context.Context, nodeName string) (string, error) { + var node corev1.Node + if err := a.client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil { + return "", fmt.Errorf("failed to get node %s: %w", nodeName, err) + } + + for _, addr := range node.Status.Addresses { + if addr.Type == corev1.NodeInternalIP { + ip := net.ParseIP(addr.Address) + if ip == nil { + continue + } + return addr.Address, nil + } + } + + return "", fmt.Errorf("no InternalIP found for node %s", nodeName) +} + +func roleString(isPrimary bool) string { + if isPrimary { + return "primary" + } + return "secondary" +} diff --git a/pkg/server/controller.go b/pkg/server/controller.go index 76266f50..880df3ce 100644 --- a/pkg/server/controller.go +++ b/pkg/server/controller.go @@ -4,12 +4,26 @@ import ( "context" "fmt" "strconv" + "time" - "github.com/container-storage-interface/spec/lib/go/csi" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" + "github.com/metal-stack/csi-driver-lvm/pkg/drbd" "github.com/metal-stack/csi-driver-lvm/pkg/lvm" + + "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/wrapperspb" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// drbdMinorCounter is a simple in-memory counter for assigning DRBD minor numbers. +// In production, the DRBDReplicationController should manage this via the CRD. +var ( + drbdMinorCounter = 100 + drbdPortCounter = 7900 ) func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -58,7 +72,9 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } } - d.log.Info("creating volume", "name", req.GetName()) + replication := req.GetParameters()["replication"] + + d.log.Info("creating volume", "name", req.GetName(), "replication", replication) requiredBytes := req.GetCapacityRange().GetRequiredBytes() @@ -72,31 +88,165 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) volumeContext := req.GetParameters() volumeContext["RequiredBytes"] = strconv.FormatInt(requiredBytes, 10) + topology := []*csi.Topology{{ + Segments: map[string]string{topologyKeyNode: d.nodeId}, + }} + + // Handle DRBD replication + if replication == "drbd" && d.k8sClient != nil { + dv, err := d.createDRBDVolume(ctx, req.GetName(), requiredBytes, lvmType, req.GetParameters()) + if err != nil { + return nil, fmt.Errorf("unable to create drbd volume: %w", err) + } + + // Wait for the secondary to be assigned and DRBD to be set up + if err := d.waitForDRBDReady(ctx, dv.Name); err != nil { + d.log.Warn("drbd not fully ready yet, volume will be available once replication establishes", "error", err) + } + + // If secondary was assigned, include it in the topology + var fresh v1alpha1.DRBDVolume + if err := d.k8sClient.Get(ctx, types.NamespacedName{Name: dv.Name}, &fresh); err == nil && fresh.Spec.SecondaryNode != "" { + topology = append(topology, &csi.Topology{ + Segments: map[string]string{topologyKeyNode: fresh.Spec.SecondaryNode}, + }) + } + + volumeContext["drbdVolume"] = dv.Name + volumeContext["drbdMinor"] = strconv.Itoa(dv.Spec.DRBDMinor) + } + return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: req.GetName(), - CapacityBytes: requiredBytes, - VolumeContext: volumeContext, - ContentSource: req.GetVolumeContentSource(), - AccessibleTopology: []*csi.Topology{{ - Segments: map[string]string{topologyKeyNode: d.nodeId}, - }}, + VolumeId: req.GetName(), + CapacityBytes: requiredBytes, + VolumeContext: volumeContext, + ContentSource: req.GetVolumeContentSource(), + AccessibleTopology: topology, }, }, nil } +func (d *Driver) createDRBDVolume(ctx context.Context, volumeName string, sizeBytes int64, lvmType string, params map[string]string) (*v1alpha1.DRBDVolume, error) { + // Check if already exists + var existing v1alpha1.DRBDVolume + err := d.k8sClient.Get(ctx, types.NamespacedName{Name: volumeName}, &existing) + if err == nil { + return &existing, nil + } + + protocol := params["drbdProtocol"] + if protocol == "" { + protocol = "C" + } + + d.Lock() + minor := drbdMinorCounter + drbdMinorCounter++ + port := drbdPortCounter + drbdPortCounter++ + d.Unlock() + + dv := &v1alpha1.DRBDVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeName, + }, + Spec: v1alpha1.DRBDVolumeSpec{ + VolumeName: volumeName, + SizeBytes: sizeBytes, + PrimaryNode: d.nodeId, + VGName: d.vgName, + LVMType: lvmType, + DRBDMinor: minor, + DRBDPort: port, + DRBDProtocol: protocol, + }, + } + + if err := d.k8sClient.Create(ctx, dv); err != nil { + return nil, fmt.Errorf("failed to create DRBDVolume CR: %w", err) + } + + dv.Status.Phase = v1alpha1.VolumePhasePending + if err := d.k8sClient.Status().Update(ctx, dv); err != nil { + d.log.Warn("failed to set initial drbd volume status", "error", err) + } + + d.log.Info("created DRBDVolume CR", "name", volumeName, "minor", minor, "port", port) + return dv, nil +} + +func (d *Driver) waitForDRBDReady(ctx context.Context, dvName string) error { + timeout := 60 * time.Second + pollInterval := 2 * time.Second + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + var dv v1alpha1.DRBDVolume + if err := d.k8sClient.Get(ctx, types.NamespacedName{Name: dvName}, &dv); err != nil { + return err + } + + if dv.Status.Phase == v1alpha1.VolumePhaseEstablished { + return nil + } + + if dv.Status.PrimaryReady && dv.Status.SecondaryReady { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollInterval): + } + } + + return fmt.Errorf("timed out waiting for drbd volume %s to become ready", dvName) +} + func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { if len(req.GetVolumeId()) == 0 { return nil, status.Error(codes.InvalidArgument, "volume id missing in request") } + d.log.Info("trying to delete volume", "volume-id", req.VolumeId) + + // Tear down DRBD if this is a replicated volume + if d.k8sClient != nil { + var dv v1alpha1.DRBDVolume + err := d.k8sClient.Get(ctx, types.NamespacedName{Name: req.VolumeId}, &dv) + if err == nil { + d.log.Info("tearing down drbd for volume", "volume-id", req.VolumeId) + + // Mark as deleting + dv.Status.Phase = v1alpha1.VolumePhaseDeleting + if err := d.k8sClient.Status().Update(ctx, &dv); err != nil { + d.log.Warn("failed to update drbd volume phase to deleting", "error", err) + } + + // Tear down local DRBD + if drbd.ResourceExists(req.VolumeId) { + if _, err := drbd.Down(d.log, req.VolumeId); err != nil { + d.log.Warn("failed to bring down drbd", "error", err) + } + if err := drbd.RemoveResourceConfig(d.log, req.VolumeId); err != nil { + d.log.Warn("failed to remove drbd config", "error", err) + } + } + + // Delete the DRBDVolume CR (the secondary node agent will clean up its side) + if err := d.k8sClient.Delete(ctx, &dv); client.IgnoreNotFound(err) != nil { + d.log.Warn("failed to delete DRBDVolume CR", "error", err) + } + } + } + existsVolume := lvm.LvExists(d.log, d.vgName, req.VolumeId) if !existsVolume { return &csi.DeleteVolumeResponse{}, nil } - d.log.Info("trying to delete volume", "volume-id", req.VolumeId) - _, err := lvm.RemoveLVS(d.log, d.vgName, req.VolumeId) if err != nil { return nil, fmt.Errorf("unable to delete volume with id %s: %w", req.VolumeId, err) diff --git a/pkg/server/driver.go b/pkg/server/driver.go index 534587e4..806fbc1d 100644 --- a/pkg/server/driver.go +++ b/pkg/server/driver.go @@ -13,6 +13,7 @@ import ( "github.com/metal-stack/csi-driver-lvm/pkg/lvm" "github.com/metal-stack/v" "google.golang.org/grpc" + "sigs.k8s.io/controller-runtime/pkg/client" ) var ( @@ -36,9 +37,10 @@ type Driver struct { maxVolumesPerNode int64 devicesPattern string vgName string + k8sClient client.Client } -func NewDriver(log *slog.Logger, driverName, nodeId, endpoint string, hostWritePath string, ephemeral bool, maxVolumesPerNode int64, version string, devicesPattern string, vgName string) (*Driver, error) { +func NewDriver(log *slog.Logger, driverName, nodeId, endpoint string, hostWritePath string, ephemeral bool, maxVolumesPerNode int64, version string, devicesPattern string, vgName string, k8sClient client.Client) (*Driver, error) { if driverName == "" { return nil, fmt.Errorf("no driver name provided") } @@ -82,6 +84,7 @@ func NewDriver(log *slog.Logger, driverName, nodeId, endpoint string, hostWriteP maxVolumesPerNode: maxVolumesPerNode, devicesPattern: devicesPattern, vgName: vgName, + k8sClient: k8sClient, }, nil } diff --git a/pkg/server/node.go b/pkg/server/node.go index 207951ed..59b549b6 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -9,6 +9,8 @@ import ( "context" "github.com/docker/go-units" + v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" + "github.com/metal-stack/csi-driver-lvm/pkg/drbd" "github.com/metal-stack/csi-driver-lvm/pkg/lvm" "golang.org/x/sys/unix" @@ -16,6 +18,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" ) const topologyKeyNode = "topology.lvm.csi/node" @@ -79,6 +82,58 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu d.log.Info("ephemeral mode: created volume", "volume", volID, "size", size) } + // Check if this is a DRBD-replicated volume + isDRBD := req.GetVolumeContext()["replication"] == "drbd" + drbdMinorStr := req.GetVolumeContext()["drbdMinor"] + + if isDRBD && drbdMinorStr != "" && d.k8sClient != nil { + minor, err := strconv.Atoi(drbdMinorStr) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid drbdMinor: %s", drbdMinorStr) + } + + // Promote this node to primary for the DRBD resource + d.log.Info("promoting drbd resource for volume publish", "volume-id", req.GetVolumeId()) + + // Check if we need to force-promote (failover scenario) + drbdStatus, err := drbd.GetStatus(d.log, req.GetVolumeId()) + if err != nil { + d.log.Warn("could not get drbd status, attempting force promote", "error", err) + if _, err := drbd.ForcePromote(d.log, req.GetVolumeId()); err != nil { + return nil, fmt.Errorf("unable to force-promote drbd resource: %w", err) + } + } else if drbdStatus.Role != "Primary" { + if drbdStatus.ConnectionState == "Connected" { + if _, err := drbd.Promote(d.log, req.GetVolumeId()); err != nil { + return nil, fmt.Errorf("unable to promote drbd resource: %w", err) + } + } else { + if _, err := drbd.ForcePromote(d.log, req.GetVolumeId()); err != nil { + return nil, fmt.Errorf("unable to force-promote drbd resource: %w", err) + } + } + } + + // Use the DRBD device path instead of the LV path + drbdDev := drbd.DevicePath(minor) + + if req.GetVolumeCapability().GetBlock() != nil { + output, err := lvm.BindMountLVByPath(d.log, drbdDev, targetPath) + if err != nil { + return nil, fmt.Errorf("unable to bind mount drbd device: %w output:%s", err, output) + } + d.log.Info("block drbd", "id", req.GetVolumeId(), "device", drbdDev, "created at", targetPath) + } else if req.GetVolumeCapability().GetMount() != nil { + output, err := lvm.MountLVByPath(d.log, drbdDev, targetPath, req.GetVolumeCapability().GetMount().GetFsType()) + if err != nil { + return nil, fmt.Errorf("unable to mount drbd device: %w output:%s", err, output) + } + d.log.Info("mounted drbd", "id", req.GetVolumeId(), "device", drbdDev, "created at", targetPath) + } + + return &csi.NodePublishVolumeResponse{}, nil + } + if req.GetVolumeCapability().GetBlock() != nil { output, err := lvm.BindMountLV(d.log, req.GetVolumeId(), targetPath, d.vgName) if err != nil { @@ -113,6 +168,17 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublish lvm.UmountLV(d.log, req.GetTargetPath()) + // Demote DRBD resource if this is a replicated volume + if d.k8sClient != nil { + var dv v1alpha1.DRBDVolume + if err := d.k8sClient.Get(ctx, types.NamespacedName{Name: volID}, &dv); err == nil { + d.log.Info("demoting drbd resource after unpublish", "volume-id", volID) + if _, err := drbd.Demote(d.log, volID); err != nil { + d.log.Warn("failed to demote drbd resource (may already be secondary)", "error", err) + } + } + } + // ephemeral volumes start with "csi-" if strings.HasPrefix(volID, "csi-") { // remove ephemeral volume here From 5453e83c4db553eacd91f2c28abf57d493cacb1e Mon Sep 17 00:00:00 2001 From: Christian Brunner Date: Wed, 4 Feb 2026 19:20:08 +0000 Subject: [PATCH 2/2] Handle node replacements with DRBD volumes --- README.md | 53 ++++++++ api/v1alpha1/types.go | 4 + api/v1alpha1/zz_generated.deepcopy.go | 6 +- charts/csi-driver-lvm/templates/drbd-crd.yaml | 3 + pkg/controller/drbd_controller.go | 126 +++++++++++++++++- pkg/nodeagent/agent.go | 64 ++++++++- 6 files changed, 251 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index bd7ce146..e14c37da 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,59 @@ For detailed status: kubectl get drbdvolume pvc-abc -o yaml ``` +### Re-establishing redundancy after node replacement + +After a failover, the `DRBDVolume` enters the `Degraded` phase. The old failed node is listed as the secondary. The DRBD replication controller periodically checks whether the secondary node is truly gone (Node object deleted, or both unschedulable and NotReady). + +**Automatic re-replication:** Once the controller confirms the secondary node is gone **and** the grace period (5 minutes) has elapsed, it automatically selects a new secondary using the same least-usage heuristic, resets the DRBD setup, and the node agents establish replication to the replacement node. The volume transitions back through `SecondaryAssigned` → `PrimaryReady` → `SecondaryReady` → `Established`. + +``` +Degraded (node-a gone) → SecondaryAssigned (node-c picked) → Established (synced to node-c) +``` + +If you replaced the physical machine but reused the same node name, the controller sees the node as healthy and waits for it to recover on its own (DRBD reconnects automatically). If the node name changed, the old Node object must be removed from the cluster for re-replication to trigger: + +```bash +# Remove the old node object so the controller knows it's gone +kubectl delete node old-node-name + +# The controller will automatically select a new secondary +kubectl get drbdvolumes -w +``` + +**What happens on the new secondary:** +1. The node agent creates a fresh LV +2. Writes the DRBD resource config pointing to the primary +3. Initializes DRBD metadata and brings up the resource +4. DRBD performs a full initial sync from the primary + +**What happens on the primary:** +1. The node agent tears down the old DRBD config (pointing to the dead node) +2. Writes a new config pointing to the new secondary +3. Reinitializes and reconnects +4. DRBD syncs all data to the new secondary + +The volume remains fully usable during re-replication. The pod continues running on the primary while the sync happens in the background. + +### Rolling Kubernetes cluster updates + +DRBD replication is designed to work with rolling cluster updates where nodes are rebooted or reimaged one at a time. Two mechanisms ensure stability: + +**Grace period:** When a volume enters the `Degraded` phase, the controller records a `degradedSince` timestamp and waits **5 minutes** before triggering re-replication. This prevents unnecessary data movement when a node is simply rebooting during a rolling update. If the node comes back within the grace period and DRBD reconnects, the volume transitions back to `Established` without any re-replication. + +**Reimaged node detection:** If a node is reimaged (OS reinstalled) but keeps the same name, the node agent detects that its local LV and DRBD config are missing even though the readiness flag in the `DRBDVolume` CR is still `true`. It automatically resets the readiness flag, which triggers a rebuild of the DRBD resource on that node. + +**Recommended rolling update procedure:** + +1. Update nodes one at a time, waiting for each node to become `Ready` before proceeding to the next. +2. After each node comes back, verify DRBD volumes are `Established`: + ```bash + kubectl get drbdvolumes + ``` +3. Only proceed to the next node once all volumes show `Established` and `Connected`. + +This ensures that at any point during the update, at most one side of each DRBD pair is down, and the 5-minute grace period prevents premature re-replication. + ### Raw block replicated volumes DRBD replication also works with raw block volumes: diff --git a/api/v1alpha1/types.go b/api/v1alpha1/types.go index fa083dee..ff80e9f9 100644 --- a/api/v1alpha1/types.go +++ b/api/v1alpha1/types.go @@ -81,6 +81,10 @@ type DRBDVolumeStatus struct { PrimaryReady bool `json:"primaryReady,omitempty"` // SecondaryReady indicates the secondary node has completed DRBD setup. SecondaryReady bool `json:"secondaryReady,omitempty"` + // DegradedSince records when the volume first entered the Degraded phase. + // Used to implement a grace period before triggering re-replication, + // allowing nodes to recover from temporary outages like rolling updates. + DegradedSince *metav1.Time `json:"degradedSince,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 6a6ac2c9..55557742 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -11,7 +11,7 @@ func (in *DRBDVolume) DeepCopyInto(out *DRBDVolume) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } func (in *DRBDVolume) DeepCopy() *DRBDVolume { @@ -74,6 +74,10 @@ func (in *DRBDVolumeSpec) DeepCopy() *DRBDVolumeSpec { func (in *DRBDVolumeStatus) DeepCopyInto(out *DRBDVolumeStatus) { *out = *in + if in.DegradedSince != nil { + in, out := &in.DegradedSince, &out.DegradedSince + *out = (*in).DeepCopy() + } } func (in *DRBDVolumeStatus) DeepCopy() *DRBDVolumeStatus { diff --git a/charts/csi-driver-lvm/templates/drbd-crd.yaml b/charts/csi-driver-lvm/templates/drbd-crd.yaml index c520f331..67cb5a60 100644 --- a/charts/csi-driver-lvm/templates/drbd-crd.yaml +++ b/charts/csi-driver-lvm/templates/drbd-crd.yaml @@ -86,4 +86,7 @@ spec: type: boolean secondaryReady: type: boolean + degradedSince: + type: string + format: date-time {{- end }} diff --git a/pkg/controller/drbd_controller.go b/pkg/controller/drbd_controller.go index a5481f22..ee593066 100644 --- a/pkg/controller/drbd_controller.go +++ b/pkg/controller/drbd_controller.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "sort" + "time" "github.com/go-logr/logr" v1alpha1 "github.com/metal-stack/csi-driver-lvm/api/v1alpha1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -24,6 +26,13 @@ import ( // +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch // +kubebuilder:rbac:groups=storage.k8s.io,resources=csistoragecapacities,verbs=get;list;watch +const ( + // DegradedGracePeriod is how long a volume must remain in Degraded phase + // before re-replication is triggered. This prevents premature re-replication + // during rolling updates where nodes are temporarily NotReady. + DegradedGracePeriod = 5 * time.Minute +) + type DRBDReplicationReconciler struct { client.Client Scheme *runtime.Scheme @@ -82,9 +91,88 @@ func (r *DRBDReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } - // Phase 2: Update phase based on readiness + // Phase 2: Handle degraded volumes — check if secondary node is gone and re-replicate + if dv.Status.Phase == v1alpha1.VolumePhaseDegraded { + // Set DegradedSince if not already set (first time entering this path) + if dv.Status.DegradedSince == nil { + now := metav1.Now() + dv.Status.DegradedSince = &now + if err := r.Status().Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to set degradedSince timestamp: %w", err) + } + log.Info("recorded degraded timestamp, will wait for grace period before re-replication", + "degradedSince", now.Time, "gracePeriod", DegradedGracePeriod) + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + secondaryGone, err := r.isNodeGone(ctx, dv.Spec.SecondaryNode) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to check secondary node status: %w", err) + } + + if secondaryGone { + // Enforce grace period: only re-replicate after DegradedGracePeriod has elapsed. + // This prevents premature re-replication during rolling updates where nodes + // are temporarily NotReady while rebooting. + elapsed := time.Since(dv.Status.DegradedSince.Time) + if elapsed < DegradedGracePeriod { + remaining := DegradedGracePeriod - elapsed + log.Info("secondary node appears gone but grace period has not elapsed, waiting", + "secondary", dv.Spec.SecondaryNode, + "degradedSince", dv.Status.DegradedSince.Time, + "remaining", remaining, + ) + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + log.Info("secondary node is gone and grace period elapsed, selecting replacement", + "old-secondary", dv.Spec.SecondaryNode, + "degradedSince", dv.Status.DegradedSince.Time, + ) + + newSecondary, err := r.selectSecondaryNode(ctx, dv.Spec.PrimaryNode, dv.Spec.SizeBytes) + if err != nil { + log.Error(err, "failed to select replacement secondary node") + return ctrl.Result{}, err + } + + if newSecondary == "" { + log.Info("no suitable replacement secondary node found, will retry") + return ctrl.Result{Requeue: true}, nil + } + + oldSecondary := dv.Spec.SecondaryNode + dv.Spec.SecondaryNode = newSecondary + if err := r.Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume with replacement secondary: %w", err) + } + + // Reset readiness flags so both sides re-establish DRBD + dv.Status.SecondaryReady = false + dv.Status.PrimaryReady = false + dv.Status.DegradedSince = nil + dv.Status.Phase = v1alpha1.VolumePhaseSecondaryAssigned + if err := r.Status().Update(ctx, &dv); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume status for re-replication: %w", err) + } + + log.Info("assigned replacement secondary node", + "old-secondary", oldSecondary, + "new-secondary", newSecondary, + ) + return ctrl.Result{}, nil + } + + // Secondary node still exists — it may recover on its own. + // If it recovers and DRBD reconnects, the volume should transition back to Established. + log.Info("secondary node still exists, waiting for it to recover", "secondary", dv.Spec.SecondaryNode) + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + // Phase 3: Update phase based on readiness if dv.Status.PrimaryReady && dv.Status.SecondaryReady && dv.Status.Phase != v1alpha1.VolumePhaseEstablished { dv.Status.Phase = v1alpha1.VolumePhaseEstablished + dv.Status.DegradedSince = nil if err := r.Status().Update(ctx, &dv); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update drbdvolume status to established: %w", err) } @@ -94,6 +182,40 @@ func (r *DRBDReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } +// isNodeGone returns true if the node does not exist or has been marked unschedulable +// and has no Ready condition (i.e. it's truly gone, not just cordoned for maintenance). +func (r *DRBDReplicationReconciler) isNodeGone(ctx context.Context, nodeName string) (bool, error) { + var node corev1.Node + err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node) + if err != nil { + // Node object doesn't exist — it's been removed from the cluster + if client.IgnoreNotFound(err) == nil { + return true, nil + } + return false, err + } + + // Node exists but check if it's both unschedulable and not Ready + // (a cordoned node that's still running should not trigger re-replication) + if !node.Spec.Unschedulable { + return false, nil + } + + for _, cond := range node.Status.Conditions { + if cond.Type == corev1.NodeReady { + if cond.Status == corev1.ConditionTrue { + // Node is cordoned but healthy — don't re-replicate yet + return false, nil + } + // Node is unschedulable and NotReady — consider it gone + return true, nil + } + } + + // No Ready condition found and unschedulable — consider it gone + return true, nil +} + // nodeCapacity holds capacity data for ranking nodes. type nodeCapacity struct { NodeName string @@ -291,7 +413,9 @@ func (r *DRBDReplicationReconciler) HandleFailover(ctx context.Context, dvName s return fmt.Errorf("failed to update drbdvolume spec: %w", err) } + now := metav1.Now() dv.Status.Phase = v1alpha1.VolumePhaseDegraded + dv.Status.DegradedSince = &now if err := r.Status().Update(ctx, &dv); err != nil { return fmt.Errorf("failed to update drbdvolume status: %w", err) } diff --git a/pkg/nodeagent/agent.go b/pkg/nodeagent/agent.go index f0899a61..e504f000 100644 --- a/pkg/nodeagent/agent.go +++ b/pkg/nodeagent/agent.go @@ -64,7 +64,13 @@ func (a *Agent) reconcile(ctx context.Context, dv *v1alpha1.DRBDVolume) error { isPrimary := dv.Spec.PrimaryNode == a.nodeID isSecondary := dv.Spec.SecondaryNode == a.nodeID + // If this node is no longer referenced, clean up any stale DRBD resources if !isPrimary && !isSecondary { + if drbd.ResourceExists(dv.Spec.VolumeName) { + a.log.Info("this node is no longer assigned to drbd volume, cleaning up", + "drbdvolume", dv.Name) + a.cleanupLocalDRBD(dv.Spec.VolumeName) + } return nil } @@ -78,12 +84,54 @@ func (a *Agent) reconcile(ctx context.Context, dv *v1alpha1.DRBDVolume) error { return nil } - // Check if we already completed our setup + // If the readiness flags were reset (re-replication), tear down old config so it's rebuilt + if isPrimary && !dv.Status.PrimaryReady && drbd.ResourceExists(dv.Spec.VolumeName) { + a.log.Info("primary readiness reset, tearing down old drbd config for rebuild", + "drbdvolume", dv.Name) + a.cleanupLocalDRBD(dv.Spec.VolumeName) + } + if isSecondary && !dv.Status.SecondaryReady && drbd.ResourceExists(dv.Spec.VolumeName) { + a.log.Info("secondary readiness reset, tearing down old drbd config for rebuild", + "drbdvolume", dv.Name) + a.cleanupLocalDRBD(dv.Spec.VolumeName) + } + + // Check if we already completed our setup. + // However, verify that the local state is still intact — a reimaged node + // will have lost its LV and DRBD config even though the readiness flag is still true. if isPrimary && dv.Status.PrimaryReady { - return nil + if lvm.LvExists(a.log, a.vgName, dv.Spec.VolumeName) && drbd.ResourceExists(dv.Spec.VolumeName) { + return nil + } + a.log.Warn("primary marked ready but local state is missing (node may have been reimaged), resetting readiness", + "drbdvolume", dv.Name) + var fresh v1alpha1.DRBDVolume + if err := a.client.Get(ctx, types.NamespacedName{Name: dv.Name}, &fresh); err != nil { + return fmt.Errorf("failed to re-fetch drbdvolume: %w", err) + } + fresh.Status.PrimaryReady = false + fresh.Status.Phase = v1alpha1.VolumePhaseSecondaryAssigned + if err := a.client.Status().Update(ctx, &fresh); err != nil { + return fmt.Errorf("failed to reset primary readiness: %w", err) + } + return nil // Will re-setup on next reconcile } if isSecondary && dv.Status.SecondaryReady { - return nil + if lvm.LvExists(a.log, a.vgName, dv.Spec.VolumeName) && drbd.ResourceExists(dv.Spec.VolumeName) { + return nil + } + a.log.Warn("secondary marked ready but local state is missing (node may have been reimaged), resetting readiness", + "drbdvolume", dv.Name) + var fresh v1alpha1.DRBDVolume + if err := a.client.Get(ctx, types.NamespacedName{Name: dv.Name}, &fresh); err != nil { + return fmt.Errorf("failed to re-fetch drbdvolume: %w", err) + } + fresh.Status.SecondaryReady = false + fresh.Status.Phase = v1alpha1.VolumePhasePrimaryReady + if err := a.client.Status().Update(ctx, &fresh); err != nil { + return fmt.Errorf("failed to reset secondary readiness: %w", err) + } + return nil // Will re-setup on next reconcile } log := a.log.With("drbdvolume", dv.Name, "role", roleString(isPrimary)) @@ -182,6 +230,16 @@ func (a *Agent) reconcile(ctx context.Context, dv *v1alpha1.DRBDVolume) error { return nil } +// cleanupLocalDRBD tears down the DRBD resource and removes the config on this node. +func (a *Agent) cleanupLocalDRBD(volumeName string) { + if _, err := drbd.Down(a.log, volumeName); err != nil { + a.log.Warn("failed to bring down drbd during cleanup (may already be down)", "error", err) + } + if err := drbd.RemoveResourceConfig(a.log, volumeName); err != nil { + a.log.Warn("failed to remove drbd config during cleanup", "error", err) + } +} + // TeardownDRBD tears down a DRBD resource and removes the LV on this node. // Called during volume deletion. func (a *Agent) TeardownDRBD(dv *v1alpha1.DRBDVolume) error {