Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: testing my mvtx pr #4

Closed
wants to merge 12 commits into from
10 changes: 5 additions & 5 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Setup Golang
uses: actions/[email protected]
with:
go-version: '1.22'
go-version: '1.22.7'

- name: Add bins to PATH
run: |
Expand Down Expand Up @@ -57,12 +57,12 @@ jobs:
- name: Setup Golang
uses: actions/setup-go@v5
with:
go-version: '1.22'
go-version: '1.22.7'

- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.58
version: v1.61
args: --timeout=10m

unit-tests:
Expand All @@ -76,7 +76,7 @@ jobs:
- name: Setup Golang
uses: actions/[email protected]
with:
go-version: '1.22'
go-version: '1.22.7'
id: go

- name: Install MockGen
Expand Down Expand Up @@ -128,7 +128,7 @@ jobs:
- name: Setup Golang
uses: actions/[email protected]
with:
go-version: '1.22'
go-version: '1.22.7'

- name: Add bins to PATH
run: |
Expand Down
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/numaproj/numaplane

go 1.22
go 1.22.7

toolchain go1.22.4
toolchain go1.23.2

require (
github.com/argoproj/argo-cd/v2 v2.11.7
Expand All @@ -25,7 +25,7 @@ require (

require (
github.com/argoproj/pkg v0.13.7-0.20230626144333-d56162821bd1
github.com/numaproj/numaflow v0.0.0-20240920210944-669dc186a0d8
github.com/numaproj/numaflow v1.3.3
github.com/prometheus/client_golang v1.18.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
Expand Down Expand Up @@ -115,12 +115,12 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/numaproj/numaflow v0.0.0-20240920210944-669dc186a0d8 h1:DzM8300bBxSy32Rsb2SSv6Rbmk7olZYvO6oYQtpZXJA=
github.com/numaproj/numaflow v0.0.0-20240920210944-669dc186a0d8/go.mod h1:koYtNRsPfx4pi6Ti7PTOqYawz7o49ZPAxa8EVluNr5M=
github.com/numaproj/numaflow v1.3.3 h1:+OvB1ryj/pBb+Ny/BOMy5A7Kf7rzJz1gm5x2IyVdcis=
github.com/numaproj/numaflow v1.3.3/go.mod h1:/qvtND1Fkvtw+xM4zpRaglCZ++eE0kqbKcPOgFD87Uw=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
Expand Down Expand Up @@ -1232,8 +1232,8 @@ golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1306,8 +1306,8 @@ golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1339,8 +1339,8 @@ golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -1459,8 +1459,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1480,8 +1480,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1503,8 +1503,8 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
29 changes: 29 additions & 0 deletions internal/controller/monovertexrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -313,12 +314,40 @@ func (r *MonoVertexRolloutReconciler) processMonoVertexStatus(ctx context.Contex
rollout.Status.MarkChildResourcesUnhealthy("Progressing", "MonoVertex Progressing", rollout.Generation)
} else if monoVertexPhase == numaflowv1.MonoVertexPhaseFailed || monoVertexChildResourceStatus == "False" {
rollout.Status.MarkChildResourcesUnhealthy("MonoVertexFailed", "MonoVertex Failed", rollout.Generation)
} else if monoVertexPhase == numaflowv1.MonoVertexPhasePaused {
rollout.Status.MarkChildResourcesHealthUnknown("MonoVertexUnknown", "MonoVertex Pausing - health unknown", rollout.Generation)
} else if monoVertexPhase == numaflowv1.MonoVertexPhaseUnknown || monoVertexChildResourceStatus == "Unknown" {
rollout.Status.MarkChildResourcesHealthUnknown("MonoVertexUnkown", "MonoVertex Phase Unknown", rollout.Generation)
} else {
rollout.Status.MarkChildResourcesHealthy(rollout.Generation)
}

r.setChildResourcesPauseCondition(rollout, monoVertexPhase)

}

func (r *MonoVertexRolloutReconciler) setChildResourcesPauseCondition(rollout *apiv1.MonoVertexRollout, mvtxPhase numaflowv1.MonoVertexPhase) {

if mvtxPhase == numaflowv1.MonoVertexPhasePaused {
// TODO: METRICS
// if BeginTime hasn't been set yet, we must have just started pausing - set it
// if rollout.Status.PauseStatus.LastPauseBeginTime == metav1.NewTime(initTime) || !rollout.Status.PauseStatus.LastPauseBeginTime.After(rollout.Status.PauseStatus.LastPauseEndTime.Time) {
// rollout.Status.PauseStatus.LastPauseBeginTime = metav1.NewTime(time.Now())
// }
reason := fmt.Sprintf("MonoVertex%s", string(mvtxPhase))
msg := fmt.Sprintf("MonoVertex %s", strings.ToLower(string(mvtxPhase)))
// r.updatePauseMetric(rollout)
rollout.Status.MarkMonoVertexPaused(reason, msg, rollout.Generation)
} else {
// only set EndTime if BeginTime has been previously set AND EndTime is before/equal to BeginTime
// EndTime is either just initialized or the end of a previous pause which is why it will be before the new BeginTime
// if (rollout.Status.PauseStatus.LastPauseBeginTime != metav1.NewTime(initTime)) && !rollout.Status.PauseStatus.LastPauseEndTime.After(rollout.Status.PauseStatus.LastPauseBeginTime.Time) {
// rollout.Status.PauseStatus.LastPauseEndTime = metav1.NewTime(time.Now())
// r.updatePauseMetric(rollout)
// }
rollout.Status.MarkMonoVertexUnpaused(rollout.Generation)
}

}

func (r *MonoVertexRolloutReconciler) needsUpdate(old, new *apiv1.MonoVertexRollout) bool {
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/numaplane/v1alpha1/monovertexrollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)

const (
// ConditionMonoVertexPausingOrPaused indicates that the MonoVertex is either pausing or paused.
ConditionMonoVertexPausingOrPaused ConditionType = "MonoVertexPausingOrPaused"
)

// MonoVertexRolloutSpec defines the desired state of MonoVertexRollout
type MonoVertexRolloutSpec struct {
MonoVertex MonoVertex `json:"monoVertex"`
Expand Down Expand Up @@ -63,6 +68,14 @@ func init() {
SchemeBuilder.Register(&MonoVertexRollout{}, &MonoVertexRolloutList{})
}

func (status *MonoVertexRolloutStatus) MarkMonoVertexPaused(reason, message string, generation int64) {
status.MarkTrueWithReason(ConditionMonoVertexPausingOrPaused, reason, message, generation)
}

func (status *MonoVertexRolloutStatus) MarkMonoVertexUnpaused(generation int64) {
status.MarkFalse(ConditionMonoVertexPausingOrPaused, "Unpaused", "MonoVertex unpaused", generation)
}

// IsHealthy indicates whether the MonoVertexRollout is healthy.
func (mv *MonoVertexRolloutStatus) IsHealthy() bool {
return mv.Phase == PhaseDeployed || mv.Phase == PhasePending
Expand Down
64 changes: 63 additions & 1 deletion tests/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ var (
},
}

monoVertexSpec = numaflowv1.MonoVertexSpec{
currentMonoVertexSpec numaflowv1.MonoVertexSpec
monoVertexSpec = numaflowv1.MonoVertexSpec{
Replicas: ptr.To(int32(1)),
Source: &numaflowv1.Source{
UDSource: &numaflowv1.UDSource{
Expand Down Expand Up @@ -317,6 +318,8 @@ var _ = Describe("Functional e2e", Serial, func() {

})

currentMonoVertexSpec = monoVertexSpec

time.Sleep(2 * time.Second)

It("Should automatically heal a Pipeline if it is updated directly", func() {
Expand Down Expand Up @@ -460,6 +463,65 @@ var _ = Describe("Functional e2e", Serial, func() {
verifyPipelineRunning(Namespace, pipelineName, 3)
})

It("Should pause the MonoVertex if user requests it", func() {

currentMonoVertexSpec.Lifecycle.DesiredPhase = numaflowv1.MonoVertexPhasePaused

document("setting desiredPhase=Paused")

rawSpec, err := json.Marshal(currentMonoVertexSpec)
Expect(err).ShouldNot(HaveOccurred())

// update the MonoVertexRollout
updateMonoVertexRolloutInK8S(monoVertexRolloutName, func(rollout apiv1.MonoVertexRollout) (apiv1.MonoVertexRollout, error) {
rollout.Spec.MonoVertex.Spec.Raw = rawSpec
return rollout, nil
})
document("verifying MonoVertexRollout spec deployed")
verifyMonoVertexRolloutDeployed(monoVertexRolloutName)

// Give it a little while to get to Paused and then verify that it stays in Paused
verifyMonoVertexPaused(Namespace, monoVertexRolloutName, monoVertexRolloutName)
document("verifying MonoVertex stays in paused or otherwise pausing")
Consistently(func() bool {
rollout, _ := monoVertexRolloutClient.Get(ctx, monoVertexRolloutName, metav1.GetOptions{})
_, _, retrievedMonoVertexStatus, err := getMonoVertexFromK8S(Namespace, monoVertexRolloutName)
if err != nil {
return false
}
return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionMonoVertexPausingOrPaused) == metav1.ConditionTrue &&
(retrievedMonoVertexStatus.Phase == numaflowv1.MonoVertexPhasePaused)
}, 1*time.Minute, testPollingInterval).Should(BeTrue())

verifyInProgressStrategy(monoVertexRolloutName, apiv1.UpgradeStrategyNoOp)

verifyPodsRunning(Namespace, 0, getVertexLabelSelector(monoVertexRolloutName))
})

time.Sleep(2 * time.Second)

It("Should resume the MonoVertex if user requests it", func() {

currentMonoVertexSpec.Lifecycle.DesiredPhase = numaflowv1.MonoVertexPhaseRunning

document("setting desiredPhase=Running")

rawSpec, err := json.Marshal(currentMonoVertexSpec)
Expect(err).ShouldNot(HaveOccurred())

// update the MonoVertexRollout
updateMonoVertexRolloutInK8S(monoVertexRolloutName, func(rollout apiv1.MonoVertexRollout) (apiv1.MonoVertexRollout, error) {
rollout.Spec.MonoVertex.Spec.Raw = rawSpec
return rollout, nil
})

document("verifying MonoVertexRollout spec deployed")
verifyMonoVertexRolloutDeployed(monoVertexRolloutName)
verifyMonoVertexRolloutHealthy(monoVertexRolloutName)

verifyInProgressStrategy(monoVertexRolloutName, apiv1.UpgradeStrategyNoOp)
})

time.Sleep(2 * time.Second)

It("Should update the child NumaflowController if the NumaflowControllerRollout is updated", func() {
Expand Down
72 changes: 72 additions & 0 deletions tests/e2e/monovertex.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,75 @@ func watchMonoVertex() {
}

}

func verifyMonoVertexPaused(namespace string, monoVertexRolloutName string, monoVertexName string) {

document("Verify that MonoVertex Rollout condition is Pausing/Paused")
Eventually(func() metav1.ConditionStatus {
rollout, _ := monoVertexRolloutClient.Get(ctx, monoVertexRolloutName, metav1.GetOptions{})
return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionMonoVertexPausingOrPaused)
}, testTimeout).Should(Equal(metav1.ConditionTrue))

document("Verify that MonoVertex is paused")
verifyMonoVertexStatusEventually(namespace, monoVertexName,
func(retrievedMonoVertexSpec numaflowv1.MonoVertexSpec, retrievedMonoVertexStatus numaflowv1.MonoVertexStatus) bool {
return retrievedMonoVertexStatus.Phase == numaflowv1.MonoVertexPhasePaused
})
}

func verifyMonoVertexStatusEventually(namespace string, monoVertexName string, f func(numaflowv1.MonoVertexSpec, numaflowv1.MonoVertexStatus) bool) {
Eventually(func() bool {
_, retrievedMonoVertexSpec, retrievedMonoVertexStatus, err := getMonoVertexFromK8S(namespace, monoVertexName)
return err == nil && f(retrievedMonoVertexSpec, retrievedMonoVertexStatus)
}, testTimeout).Should(BeTrue())
}

func getMonoVertexFromK8S(namespace string, monoVertexName string) (*unstructured.Unstructured, numaflowv1.MonoVertexSpec, numaflowv1.MonoVertexStatus, error) {
var retrievedMonoVertexSpec numaflowv1.MonoVertexSpec
var retrievedMonoVertexStatus numaflowv1.MonoVertexStatus

unstruct, err := dynamicClient.Resource(getGVRForMonoVertex()).Namespace(namespace).Get(ctx, monoVertexName, metav1.GetOptions{})
if err != nil {
return nil, retrievedMonoVertexSpec, retrievedMonoVertexStatus, err
}
retrievedMonoVertexSpec, err = getMonoVertexSpec(unstruct)
if err != nil {
return unstruct, retrievedMonoVertexSpec, retrievedMonoVertexStatus, err
}

retrievedMonoVertexStatus, err = getMonoVertexStatus(unstruct)

if err != nil {
return unstruct, retrievedMonoVertexSpec, retrievedMonoVertexStatus, err
}
return unstruct, retrievedMonoVertexSpec, retrievedMonoVertexStatus, nil
}

func getMonoVertexStatus(u *unstructured.Unstructured) (numaflowv1.MonoVertexStatus, error) {
statusMap := u.Object["status"]
var status numaflowv1.MonoVertexStatus
err := util.StructToStruct(&statusMap, &status)
return status, err
}

func verifyMonoVertexRolloutHealthy(monoVertexRolloutName string) {
document("Verifying that the MonoVertexRollout Child Condition is Healthy")
Eventually(func() metav1.ConditionStatus {
rollout, _ := monoVertexRolloutClient.Get(ctx, monoVertexRolloutName, metav1.GetOptions{})
return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionChildResourceHealthy)
}, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionTrue))
}

func verifyMonoVertexRolloutDeployed(monoVertexRolloutName string) {
document("Verifying that the MonoVertexRollout is Deployed")
Eventually(func() bool {
rollout, _ := monoVertexRolloutClient.Get(ctx, monoVertexRolloutName, metav1.GetOptions{})
return rollout.Status.Phase == apiv1.PhaseDeployed
}, testTimeout, testPollingInterval).Should(BeTrue())

Eventually(func() metav1.ConditionStatus {
rollout, _ := monoVertexRolloutClient.Get(ctx, monoVertexRolloutName, metav1.GetOptions{})
return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionChildResourceDeployed)
}, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionTrue))

}
Loading