Skip to content

Commit

Permalink
Auto healing (#66)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonino Fugazzotto <[email protected]>
  • Loading branch information
afugazzotto authored Jun 20, 2024
1 parent 58d4a53 commit 60c01a9
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 58 deletions.
33 changes: 30 additions & 3 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -167,21 +168,38 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR
Spec: isbServiceRollout.Spec.InterStepBufferService,
}

specHash, err := util.CalculateSpecHash(isbServiceRollout.Spec.InterStepBufferService)
previousRolloutSpecHash := isbServiceRollout.Annotations[apiv1.KeyHash]
currentRolloutSpecHash, err := util.CalculateSpecHash(isbServiceRollout.Spec.InterStepBufferService)
if err != nil {
numaLogger.Errorf(err, "failed to calculate spec hash from rollout CR: %v", err)
isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error())
return err
}

err = kubernetes.ApplyCRSpec(ctx, r.restConfig, &obj, "interstepbufferservices", specHash, isbServiceRollout.Annotations[apiv1.KeyHash])
// Only calculate the child resource spec hash when the rollout spec itself is unchanged; if the rollout spec changed, an update is already required
isbsvcSpecHash := ""
if currentRolloutSpecHash == previousRolloutSpecHash {
isbsvcSpecHash, err = r.calculateChildSpecHash(ctx, &obj)
if err != nil {
numaLogger.Errorf(err, "failed to calculate spec hash from ISBService CR: %v", err)
isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error())
return err
}
}

shouldUpdateCR := true
if currentRolloutSpecHash == previousRolloutSpecHash && currentRolloutSpecHash == isbsvcSpecHash {
shouldUpdateCR = false
}

err = kubernetes.ApplyCRSpec(ctx, r.restConfig, &obj, "interstepbufferservices", shouldUpdateCR)
if err != nil {
numaLogger.Errorf(err, "failed to apply CR: %v", err)
isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error())
return err
}

kubernetes.SetAnnotation(isbServiceRollout, apiv1.KeyHash, specHash)
kubernetes.SetAnnotation(isbServiceRollout, apiv1.KeyHash, currentRolloutSpecHash)

// after the Apply, Get the ISBService so that we can propagate its health into our Status
isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "interstepbufferservices")
Expand All @@ -197,6 +215,15 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR
return nil
}

// calculateChildSpecHash retrieves the InterStepBufferService CR via
// kubernetes.GetCR (using obj as the lookup object) and returns the hash of
// its spec as computed by util.CalculateSpecHash.
//
// It returns an empty string and a wrapped error when the CR cannot be
// retrieved. The retrieval error is kept in the error chain so callers can
// still inspect it with errors.Is / errors.As.
func (r *ISBServiceRolloutReconciler) calculateChildSpecHash(ctx context.Context, obj *kubernetes.GenericObject) (string, error) {
	isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, obj, "interstepbufferservices")
	if err != nil {
		// %w (not %v) preserves the underlying error instead of flattening it to text.
		return "", fmt.Errorf("error retrieving InterStepBufferService CR: %w", err)
	}

	return util.CalculateSpecHash(isbsvc.Spec)
}

func processISBServiceStatus(ctx context.Context, isbsvc *kubernetes.GenericObject, rollout *apiv1.ISBServiceRollout) {
numaLogger := logger.FromContext(ctx)
isbsvcStatus, err := kubernetes.ParseStatus(isbsvc)
Expand Down
27 changes: 27 additions & 0 deletions internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var _ = Describe("ISBServiceRollout Controller", func() {
namespace = "default"
isbServiceRolloutName = "isbservicerollout-test"
timeout = 10 * time.Second
duration = 10 * time.Second
interval = 250 * time.Millisecond
)

Expand Down Expand Up @@ -255,6 +256,32 @@ var _ = Describe("ISBServiceRollout Controller", func() {

})

It("Should auto heal the InterStepBufferService with the ISBServiceRollout interstepbufferservice spec when the InterStepBufferService spec is changed", func() {
	By("updating the InterStepBufferService")
	currentISBService := &numaflowv1.InterStepBufferService{}
	Expect(k8sClient.Get(ctx, resourceLookupKey, currentISBService)).To(Succeed())

	originalJetstreamVersion := currentISBService.Spec.JetStream.Version
	newJetstreamVersion := "1.2.3"
	// Guard: the spec below only proves anything if the update really changes the field.
	Expect(originalJetstreamVersion).ToNot(Equal(newJetstreamVersion))
	currentISBService.Spec.JetStream.Version = newJetstreamVersion

	Expect(k8sClient.Update(ctx, currentISBService)).ToNot(HaveOccurred())

	// getJetstreamVersion reads the JetStream version currently stored for the InterStepBufferService.
	getJetstreamVersion := func() (string, error) {
		updatedResource := &numaflowv1.InterStepBufferService{}
		if err := k8sClient.Get(ctx, resourceLookupKey, updatedResource); err != nil {
			return "", err
		}

		return updatedResource.Spec.JetStream.Version, nil
	}

	By("Verifying the changed field of the InterStepBufferService is the same as the original and not the modified version")
	// The manual update is visible until the controller reconciles, so first wait
	// for the healing to happen instead of asserting on the very first poll...
	Eventually(getJetstreamVersion, timeout, interval).Should(Equal(originalJetstreamVersion))
	// ...then make sure the healed value sticks. Since original != new (checked
	// above), this also covers "not the modified version" without a second
	// full-duration polling pass.
	Consistently(getJetstreamVersion, duration, interval).Should(Equal(originalJetstreamVersion))
})

It("Should delete the ISBServiceRollout and InterStepBufferService", func() {
Expect(k8sClient.Delete(ctx, &apiv1.ISBServiceRollout{
ObjectMeta: isbServiceRollout.ObjectMeta,
Expand Down
142 changes: 91 additions & 51 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -179,15 +181,15 @@ func (r *PipelineRolloutReconciler) reconcile(
Spec: pipelineRollout.Spec.Pipeline,
}

specHash, err := util.CalculateSpecHash(pipelineRollout.Spec.Pipeline)
currentRolloutSpecHash, err := util.CalculateSpecHash(pipelineRollout.Spec.Pipeline)
if err != nil {
numaLogger.Errorf(err, "failed to calculate spec hash from rollout CR: %v", err)
pipelineRollout.Status.MarkFailed("ApplyPipelineFailure", err.Error())
return false, err
}

// Get the object to see if it exists
_, err = kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines")
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
// create object as it doesn't exist
if apierrors.IsNotFound(err) {
Expand All @@ -196,76 +198,106 @@ func (r *PipelineRolloutReconciler) reconcile(
return false, err
}

kubernetes.SetAnnotation(pipelineRollout, apiv1.KeyHash, specHash)
kubernetes.SetAnnotation(pipelineRollout, apiv1.KeyHash, currentRolloutSpecHash)

pipelineRollout.Status.MarkRunning()

return false, nil
}
} else {
// If the pipeline already exists, first check if the pipeline status
// is pausing. If so, re-enqueue immediately.
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines")

return false, fmt.Errorf("error getting Pipeline: %v", err)
}

// Only calculate the child resource spec hash when the rollout spec itself is unchanged; if the rollout spec changed, an update is already required
pipelineSpecHash := ""
if currentRolloutSpecHash == pipelineRollout.Annotations[apiv1.KeyHash] {
pipelineSpecHash, err = calculateChildSpecHash(pipeline)
if err != nil {
numaLogger.Errorf(err, "failed to get Pipeline: %v", err)
numaLogger.Errorf(err, "failed to calculate spec hash from Pipeline CR: %v", err)
return false, err
}
pausing := isPipelinePausing(ctx, pipeline)
if pausing {
// re-enqueue
return true, nil
}

// Check if the pipeline status is paused. If so, apply the change and
// resume.
paused := isPipelinePaused(ctx, pipeline)
if paused {
// Apply the new spec and resume the pipeline
// TODO: in the future, need to take into account whether Numaflow Controller
// or ISBService is being installed to determine whether it's safe to unpause
newObj, err := setPipelineDesiredStatus(&obj, "Running")
if err != nil {
return false, err
}
obj = *newObj
}

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, specHash)
if err != nil {
return false, err
}
// If the pipeline already exists, first check if the pipeline status
// is pausing. If so, re-enqueue immediately.
pausing := isPipelinePausing(ctx, pipeline)
if pausing {
// re-enqueue
return true, nil
}

return false, nil
// Check if the pipeline status is paused. If so, apply the change and
// resume.
paused := isPipelinePaused(ctx, pipeline)
if paused {
// Apply the new spec and resume the pipeline
// TODO: in the future, need to take into account whether Numaflow Controller
// or ISBService is being installed to determine whether it's safe to unpause
newObj, err := setPipelineDesiredStatus(&obj, "Running")
if err != nil {
return false, err
}
obj = *newObj

// If pipeline status is not above, detect if pausing is required.
shouldPause, err := needsPausing(pipeline, &obj)
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, currentRolloutSpecHash, pipelineSpecHash)
if err != nil {
return false, err
}
if shouldPause {
// Use the existing spec, then pause and re-enqueue
obj.Spec = pipeline.Spec
newObj, err := setPipelineDesiredStatus(&obj, "Paused")
if err != nil {
return false, err
}
obj = *newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, specHash)
if err != nil {
return false, err
}
return true, err
return false, nil
}

// If pipeline status is not above, detect if pausing is required.
shouldPause, err := needsPausing(pipeline, &obj)
if err != nil {
return false, err
}
if shouldPause {
// Use the existing spec, then pause and re-enqueue
obj.Spec = pipeline.Spec
newObj, err := setPipelineDesiredStatus(&obj, "Paused")
if err != nil {
return false, err
}
obj = *newObj

// If no need to pause, just apply the spec
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, specHash)
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, currentRolloutSpecHash, pipelineSpecHash)
if err != nil {
return false, err
}
return true, err
}

// If no need to pause, just apply the spec
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout, currentRolloutSpecHash, pipelineSpecHash)
if err != nil {
return false, err
}

pipelineRollout.Status.MarkRunning()

return false, nil
}

// calculateChildSpecHash returns the hash of the given Pipeline's spec with the
// top-level "lifecycle" field removed, so the resulting hash does not depend on
// that field.
//
// The raw spec is decoded into a generic map, the "lifecycle" key is deleted,
// and the re-encoded JSON is passed to util.CalculateSpecHash. An empty string
// and a wrapped error are returned if the spec cannot be decoded or re-encoded;
// the underlying JSON error is kept in the chain for errors.Is / errors.As.
func calculateChildSpecHash(pipeline *kubernetes.GenericObject) (string, error) {
	// Decode into a map so a single top-level key can be dropped without
	// needing the typed Pipeline spec.
	specAsMap := map[string]any{}
	if err := json.Unmarshal(pipeline.Spec.Raw, &specAsMap); err != nil {
		return "", fmt.Errorf("unable to unmarshal Pipeline object spec to map: %w", err)
	}

	// Exclude the lifecycle field from the hash input.
	delete(specAsMap, "lifecycle")

	rawSpec, err := json.Marshal(specAsMap)
	if err != nil {
		return "", fmt.Errorf("unable to marshal map to Pipeline object spec: %w", err)
	}

	return util.CalculateSpecHash(runtime.RawExtension{Raw: rawSpec})
}

// Set the Condition in the Status for child resource health
func processPipelineStatus(ctx context.Context, pipeline *kubernetes.GenericObject, pipelineRollout *apiv1.PipelineRollout) {
numaLogger := logger.FromContext(ctx)
Expand Down Expand Up @@ -370,19 +402,27 @@ func applyPipelineSpec(
restConfig *rest.Config,
obj *kubernetes.GenericObject,
pipelineRollout *apiv1.PipelineRollout,
specHash string,
currentRolloutSpecHash string,
pipelineSpecHash string,
) error {
numaLogger := logger.FromContext(ctx)

// TODO: compare the in-cluster Pipeline lifecycle state when implementing dedicated logic managing desired pipeline lifecycle.
// Ideally, create a function to perform various checks and return if the CR should be updated or not.
shouldUpdateCR := true
if currentRolloutSpecHash == pipelineRollout.Annotations[apiv1.KeyHash] && currentRolloutSpecHash == pipelineSpecHash {
shouldUpdateCR = false
}

// TODO: use UpdateSpec instead
err := kubernetes.ApplyCRSpec(ctx, restConfig, obj, "pipelines", specHash, pipelineRollout.Annotations[apiv1.KeyHash])
err := kubernetes.ApplyCRSpec(ctx, restConfig, obj, "pipelines", shouldUpdateCR)
if err != nil {
numaLogger.Errorf(err, "failed to apply Pipeline: %v", err)
pipelineRollout.Status.MarkFailed("ApplyPipelineFailure", err.Error())
return err
}

kubernetes.SetAnnotation(pipelineRollout, apiv1.KeyHash, specHash)
kubernetes.SetAnnotation(pipelineRollout, apiv1.KeyHash, currentRolloutSpecHash)

// after the Apply, Get the Pipeline so that we can propagate its health into our Status
pipeline, err := kubernetes.GetCR(ctx, restConfig, obj, "pipelines")
Expand Down
Loading

0 comments on commit 60c01a9

Please sign in to comment.