Skip to content

Commit

Permalink
chore: create better isolation between isbsvc test cases by adding in…
Browse files Browse the repository at this point in the history
…itial UpgradeInProgress value (#359)

Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 authored Oct 25, 2024
1 parent 7da779d commit 76c78af
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 93 deletions.
161 changes: 87 additions & 74 deletions internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -273,48 +274,51 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
pipelineROReconciler = &PipelineRolloutReconciler{queue: util.NewWorkQueue("fake_queue")}

testCases := []struct {
name string
newISBSvcSpec numaflowv1.InterStepBufferServiceSpec
existingISBSvcDef *numaflowv1.InterStepBufferService
existingStatefulSetDef *appsv1.StatefulSet
existingPipelineRollout *apiv1.PipelineRollout
existingPipeline *numaflowv1.Pipeline
existingPauseRequest *bool // was ISBServiceRollout previously requesting pause?
expectedPauseRequest *bool // after reconcile(), should it be requesting pause?
expectedRolloutPhase apiv1.Phase
name string
newISBSvcSpec numaflowv1.InterStepBufferServiceSpec
existingISBSvcDef *numaflowv1.InterStepBufferService
existingStatefulSetDef *appsv1.StatefulSet
existingPipelineRollout *apiv1.PipelineRollout
existingPipeline *numaflowv1.Pipeline
existingPauseRequest *bool // was ISBServiceRollout previously requesting pause?
initialInProgressStrategy apiv1.UpgradeStrategy
expectedPauseRequest *bool // after reconcile(), should it be requesting pause?
expectedRolloutPhase apiv1.Phase
// require these Conditions to be set (note that in real life, previous reconciliations may have set other Conditions from before which are still present)
expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus
expectedISBSvcSpec numaflowv1.InterStepBufferServiceSpec
expectedInProgressStrategy apiv1.UpgradeStrategy
}{
{
name: "new ISBService",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: nil,
existingStatefulSetDef: nil,
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning),
existingPauseRequest: nil,
expectedPauseRequest: nil,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "new ISBService",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: nil,
existingStatefulSetDef: nil,
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning),
existingPauseRequest: nil,
initialInProgressStrategy: apiv1.UpgradeStrategyNoOp,
expectedPauseRequest: nil,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
},
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp,
},
{
name: "existing ISBService - no change",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning),
existingPauseRequest: &falseValue,
expectedPauseRequest: &falseValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
name: "existing ISBService - no change",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning),
existingPauseRequest: &falseValue,
initialInProgressStrategy: apiv1.UpgradeStrategyNoOp,
expectedPauseRequest: &falseValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
},
{
name: "existing ISBService - new spec - pipelines not paused",
Expand All @@ -324,22 +328,24 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning),
existingPauseRequest: &falseValue,
initialInProgressStrategy: apiv1.UpgradeStrategyNoOp,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhasePending,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue},
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
},
{
name: "existing ISBService - new spec - pipelines paused",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "existing ISBService - new spec - pipelines paused",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
initialInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
Expand All @@ -348,15 +354,16 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
},
{
name: "existing ISBService - new spec - pipelines failed",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed),
existingPauseRequest: &trueValue,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "existing ISBService - new spec - pipelines failed",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed),
existingPauseRequest: &trueValue,
initialInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
Expand All @@ -365,15 +372,16 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
},
{
name: "existing ISBService - new spec - pipelines set to allow data loss",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhasePending, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{common.LabelKeyAllowDataLoss: "true"}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing),
existingPauseRequest: &trueValue,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "existing ISBService - new spec - pipelines set to allow data loss",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhasePending, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{common.LabelKeyAllowDataLoss: "true"}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing),
existingPauseRequest: &trueValue,
initialInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
Expand All @@ -382,31 +390,33 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
},
{
name: "existing ISBService - spec already updated - isbsvc reconciling",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "existing ISBService - spec already updated - isbsvc reconciling",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
initialInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
},
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
},
{
name: "existing ISBService - spec already updated - isbsvc done reconciling",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
expectedPauseRequest: &falseValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
name: "existing ISBService - spec already updated - isbsvc done reconciling",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused),
existingPauseRequest: &trueValue,
initialInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedPauseRequest: &falseValue,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
},
Expand Down Expand Up @@ -457,6 +467,9 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
pm := GetPauseModule()
pm.pauseRequests[pm.getISBServiceKey(defaultNamespace, defaultISBSvcRolloutName)] = tc.existingPauseRequest

rollout.Status.UpgradeInProgress = tc.initialInProgressStrategy
r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultPipelineRolloutName}, tc.initialInProgressStrategy)

// call reconcile()
_, err = r.reconcile(ctx, rollout, time.Now())
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func Test_reconcile_numaflowcontrollerrollout_PPND(t *testing.T) {
newControllerVersion: "1.2.1",
existingController: createDeploymentDefinition("quay.io/numaproj/numaflow:v1.2.0", false),
existingPipelineRollout: createPipelineRollout(numaflowv1.PipelineSpec{InterStepBufferServiceName: defaultISBSvcRolloutName}, map[string]string{}, map[string]string{}),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing), // could be pausing for another reason such as Pipeline updating
existingPauseRequest: &falseValue,
expectedPauseRequest: &trueValue,
expectedRolloutPhase: apiv1.PhasePending,
Expand Down
Loading

0 comments on commit 76c78af

Please sign in to comment.