Skip to content

Commit 00fbb5f

Browse files
committed
Migrate virtual instances using virtctl (KubeVirt)
Signed-off-by: Mathieu Grzybek <[email protected]>
1 parent 96f8d2f commit 00fbb5f

File tree

5 files changed

+244
-1
lines changed

5 files changed

+244
-1
lines changed

Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ RUN set -ex \
2020
&& cp /dist/kured_${TARGETOS}_${TARGETARCH}${SUFFIX}/kured /dist/kured;
2121

2222
FROM alpine:3.21.3@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
23-
RUN apk update --no-cache && apk upgrade --no-cache && apk add --no-cache ca-certificates tzdata
23+
RUN echo testing https://dl-cdn.alpinelinux.org/alpine/edge/testing >> /etc/apk/repositories && \
24+
apk update --no-cache && apk upgrade --no-cache && apk add --no-cache ca-certificates tzdata virtctl
2425
COPY --from=bin /dist/kured /usr/bin/kured
2526
ENTRYPOINT ["/usr/bin/kured"]

cmd/kured/main.go

+38
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/kubereboot/kured/pkg/checkers"
2121
"github.com/kubereboot/kured/pkg/daemonsetlock"
2222
"github.com/kubereboot/kured/pkg/delaytick"
23+
"github.com/kubereboot/kured/pkg/evacuators"
2324
"github.com/kubereboot/kured/pkg/reboot"
2425
"github.com/kubereboot/kured/pkg/taints"
2526
"github.com/kubereboot/kured/pkg/timewindow"
@@ -404,6 +405,38 @@ func stripQuotes(str string) string {
404405
return str
405406
}
406407

408+
func evacuate(client *kubernetes.Clientset, node *v1.Node) error {
409+
var err error
410+
411+
nodename := node.GetName()
412+
413+
drainer := &kubectldrain.Helper{
414+
Client: client,
415+
Ctx: context.Background(),
416+
GracePeriodSeconds: drainGracePeriod,
417+
PodSelector: drainPodSelector,
418+
SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds,
419+
Force: true,
420+
DeleteEmptyDirData: true,
421+
IgnoreAllDaemonSets: true,
422+
ErrOut: os.Stderr,
423+
Out: os.Stdout,
424+
Timeout: drainTimeout,
425+
}
426+
427+
if err = kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
428+
log.Errorf("Error cordonning %s: %v", nodename, err)
429+
return err
430+
}
431+
432+
kubeVirtEvacuator, err := evacuators.NewKubeVirtEvacuator(nodename, client)
433+
if err != nil {
434+
return err
435+
}
436+
437+
return kubeVirtEvacuator.Evacuate()
438+
}
439+
407440
func drain(client *kubernetes.Clientset, node *v1.Node) error {
408441
nodename := node.GetName()
409442

@@ -676,6 +709,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
676709
continue
677710
}
678711
}
712+
// Evacuate VM
713+
err = evacuate(client, node)
714+
if err != nil {
715+
log.Errorf("Error trying to live migrate VMs: %v", err)
716+
}
679717

680718
err = drain(client, node)
681719
if err != nil {

kured-rbac.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ metadata:
66
rules:
77
# Allow kured to read spec.unschedulable
88
# Allow kubectl to drain/uncordon
9+
# Allow kubectl to migrate KubeVirt instances
910
#
1011
# NB: These permissions are tightly coupled to the bundled version of kubectl; the ones below
1112
# match https://github.com/kubernetes/kubernetes/blob/v1.19.4/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go
@@ -22,6 +23,9 @@ rules:
2223
- apiGroups: [""]
2324
resources: ["pods/eviction"]
2425
verbs: ["create"]
26+
- apiGroups: ["subresources.kubevirt.io"]
  resources: ["virtualmachines/migrate"]
  verbs: ["update"]
2529
---
2630
apiVersion: rbac.authorization.k8s.io/v1
2731
kind: ClusterRoleBinding

pkg/evacuators/evacuator.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package evacuators
2+
3+
// Evacuator is an interface used to implement the business logic some
// components need to run before a node is rebooted (e.g. moving workloads
// off the node).
type Evacuator interface {
	// Evacuate performs the evacuation and returns the first error
	// encountered, or nil on success.
	Evacuate() error
}

pkg/evacuators/kubevirt.go

+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package evacuators
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os/exec"
7+
"sync"
8+
"time"
9+
10+
v1 "k8s.io/api/core/v1"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/client-go/kubernetes"
13+
14+
log "github.com/sirupsen/logrus"
15+
)
16+
17+
// KubeVirtEvacuator implements the Evacuator interface for KubeVirt: it
// live-migrates the virtual machine instances hosted on a node before the
// node is rebooted.
type KubeVirtEvacuator struct {
	client *kubernetes.Clientset // Kubernetes client object inherited from the caller
	nodeID string                // Name of the Kubernetes node being evacuated
	errors []error               // Errors reported by the evacuation goroutines (guarded by mutex)

	sleepingTime   time.Duration // Bare count of seconds between VM state checks; multiplied by time.Second at use sites
	timeoutCounter int           // Number of remaining VM state checks before Evacuate gives up

	mutex sync.Mutex // Serializes non-threadsafe commands and guards the errors slice
}
28+
29+
// NewKubeVirtEvacuator is the constructor
30+
func NewKubeVirtEvacuator(nodeID string, client *kubernetes.Clientset) (*KubeVirtEvacuator, error) {
31+
var result KubeVirtEvacuator
32+
var err error
33+
34+
if client == nil {
35+
err = fmt.Errorf("NewKubeVirtEvacuator: the given clientset is nil")
36+
}
37+
38+
if len(nodeID) == 0 {
39+
err = fmt.Errorf("NewKubeVirtEvacuator: the given nodeID is empty")
40+
}
41+
42+
result.nodeID = nodeID
43+
result.client = client
44+
result.sleepingTime = 30
45+
result.timeoutCounter = 40
46+
47+
return &result, err
48+
}
49+
50+
// Evacuate start the live migration process of the hosted virtual instances
51+
func (k *KubeVirtEvacuator) Evacuate() (err error) {
52+
log.Infof("Evacuate: migration configuration is %v retries every %v", k.timeoutCounter, k.sleepingTime*time.Second)
53+
54+
vms, err := k.getVMRunningOnNode()
55+
56+
if err == nil {
57+
k.startAsyncEvacuation(vms)
58+
59+
for {
60+
if k.timeoutCounter == 0 {
61+
err = fmt.Errorf("Evacuate: timeout exceeded")
62+
break
63+
}
64+
65+
log.Infof("EvacuateVM: %v retries left. %v remaining instances on the node", k.timeoutCounter, vms.Size())
66+
67+
vms, err = k.getVMRunningOnNode()
68+
if err != nil {
69+
err = fmt.Errorf("%v errors occured", len(k.errors))
70+
break
71+
}
72+
73+
if vms.Size() == 0 {
74+
log.Info("Evacuate: Completed.")
75+
break
76+
}
77+
78+
k.countDown()
79+
}
80+
}
81+
82+
return err
83+
}
84+
85+
// startAsyncEvacuation starts one evacuateVM fonction per VM
86+
func (k *KubeVirtEvacuator) startAsyncEvacuation(vms *v1.PodList) {
87+
for _, vm := range vms.Items {
88+
go k.evacuateVM(&vm)
89+
}
90+
}
91+
92+
// countDown counts down the timer
93+
func (k *KubeVirtEvacuator) countDown() {
94+
time.Sleep(k.sleepingTime * time.Second)
95+
k.timeoutCounter = k.timeoutCounter - 1
96+
}
97+
98+
// getVMRunningOnNode gets the virt-launcher pods running on the node
99+
func (k *KubeVirtEvacuator) getVMRunningOnNode() (*v1.PodList, error) {
100+
labelSelector := "kubevirt.io=virt-launcher"
101+
fieldSelector := fmt.Sprintf("spec.nodeName=%s", k.nodeID)
102+
103+
return k.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
104+
LabelSelector: labelSelector,
105+
FieldSelector: fieldSelector,
106+
})
107+
}
108+
109+
// evacuateVM starts and monitors the migration of the given virtual instance
110+
func (k *KubeVirtEvacuator) evacuateVM(vm *v1.Pod) {
111+
var newNode string
112+
var err error
113+
114+
vmName := vm.Labels["kubevirt.io/vm"]
115+
116+
if len(vmName) > 2 {
117+
logPrefix := fmt.Sprintf("evacuateVM: %s (ns: %s, pod %s).", vmName, vm.Namespace, vm.Name)
118+
shellCommand := exec.Command("/usr/bin/virtctl", "migrate", vmName, "-n", vm.Namespace)
119+
120+
log.Infof("%s Evacuating from %s", logPrefix, k.nodeID)
121+
122+
k.execCommand(shellCommand)
123+
if err != nil {
124+
err = fmt.Errorf("%s %v", logPrefix, err)
125+
} else {
126+
for {
127+
newNode, err = k.getNodeOfVM(vmName)
128+
if err != nil {
129+
break
130+
}
131+
132+
if k.checkMigrationCompletion(logPrefix, newNode) {
133+
time.Sleep(k.sleepingTime * time.Second)
134+
} else {
135+
break
136+
}
137+
}
138+
}
139+
140+
k.appendError(err)
141+
} else {
142+
log.Infof("given pod %s (ns %s) has an empty VM name. Skipping", vm.Name, vm.Namespace)
143+
}
144+
}
145+
146+
// checkMigrationCompletion return true if the migration is completed
147+
func (k *KubeVirtEvacuator) checkMigrationCompletion(logPrefix, newNode string) (result bool) {
148+
if k.nodeID == newNode {
149+
log.Infof("%s Still on %v", logPrefix, newNode)
150+
} else {
151+
log.Infof("%s Completed.", logPrefix)
152+
result = true
153+
}
154+
155+
return result
156+
}
157+
158+
// appendError append the given error to the internal errors array in a threadsafe way
159+
func (k *KubeVirtEvacuator) appendError(err error) {
160+
if err != nil {
161+
k.mutex.Lock()
162+
k.errors = append(k.errors, err) // TODO: is append threadsafe?
163+
k.mutex.Unlock()
164+
}
165+
}
166+
167+
// execCommand starts the given command in a threadsafe way
168+
func (k *KubeVirtEvacuator) execCommand(command *exec.Cmd) (err error) {
169+
k.mutex.Lock()
170+
defer k.mutex.Unlock()
171+
172+
return command.Run()
173+
}
174+
175+
// getNodeOfVM provides the node ID hosting the given virtual instance
176+
func (k *KubeVirtEvacuator) getNodeOfVM(vmName string) (result string, err error) {
177+
var podList *v1.PodList
178+
179+
if len(vmName) == 0 {
180+
err = fmt.Errorf("getNodeOfVM: the given VM name is empty")
181+
} else {
182+
labelSelector := fmt.Sprintf("kubevirt.io/vm=%s", vmName)
183+
184+
podList, err = k.client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
185+
LabelSelector: labelSelector,
186+
})
187+
}
188+
189+
if err == nil && podList != nil {
190+
result = podList.Items[0].Spec.NodeName
191+
}
192+
193+
return result, err
194+
}

0 commit comments

Comments
 (0)