Skip to content

Commit

Permalink
Merge pull request #138 from cdapio/controller_skip_preupgrade
Browse files Browse the repository at this point in the history
Refactor upgrade logic to skip pre upgrade job in case of patch revisions
  • Loading branch information
anshumanks authored Feb 14, 2025
2 parents 6f4b3fa + 889480a commit f949c2b
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 86 deletions.
1 change: 1 addition & 0 deletions controllers/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
confTwillSecurityWorkerSecretDiskPath = "twill.security.worker.secret.disk.path"
confJMXServerPort = "jmx.metrics.collector.server.port"
confSecretMountDefaultMode = "secret.mount.default.mode"
confSkipPreUpgrade = "cdap-operator.preupgrade-job.skip"

// default values
defaultImage = "gcr.io/cdapio/cdap:latest"
Expand Down
165 changes: 90 additions & 75 deletions controllers/version_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ func init() {
/////////////////////////////////////////////////////////////

func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
versionComparison := compareVersion(curVersion, newVersion)
patchRevision := versionComparison == -4

// Let the current update complete if there is any
if isConditionTrue(master, updateStatus.Inprogress) {
log.Printf("Version update ingress. Continue... ")
return upgradeForBackend(master, labels, observed)
return upgradeForBackend(master, labels, observed, patchRevision)
}

if objs, versionUpdated, err := updateForUserInterface(master); err != nil {
Expand All @@ -45,23 +56,13 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
}

// Update backend service image version
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
if len(curVersion.rawString) == 0 {
setImageToUse(master)
return []reconciler.Object{}, nil
}

switch compareVersion(curVersion, newVersion) {
case -1:
// Upgrade case

if versionComparison < 0 {
// Upgrade case.
// Don't retry upgrade if it failed.
if isConditionTrue(master, updateStatus.UpgradeFailed) {
return []reconciler.Object{}, nil
Expand All @@ -73,15 +74,14 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
setCondition(master, updateStatus.Inprogress)
master.Status.UpgradeStartTimeMillis = getCurrentTimeMs()
log.Printf("Version update: start upgrading %s -> %s ", curVersion.rawString, newVersion.rawString)
return upgradeForBackend(master, labels, observed)
case 0:
return upgradeForBackend(master, labels, observed, patchRevision)
} else if versionComparison == 0 {
// No change.
// Reset all conditions so that a failed upgrade/downgrade can be retried later if needed.
// This is needed when the last upgrade failed and the user has reset the version in spec.
updateStatus.clearAllConditions(master)
break
case 1:
// Downgrade

} else {
// Downgrade case.
// At the moment, downgrade never fails, so no need to check if isConditionTrue(downgrade failed)
updateStatus.clearAllConditions(master)
setCondition(master, updateStatus.Inprogress)
Expand Down Expand Up @@ -120,7 +120,10 @@ func downgradeForBackend(master *v1alpha1.CDAPMaster) ([]reconciler.Object, erro
return []reconciler.Object{}, nil
}

func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object, patchRevision bool) ([]reconciler.Object, error) {
// Skip pre-upgrade and post-upgrade jobs for patch revisions, unless the confSkipPreUpgrade config entry is explicitly set to "false"
skipPreUpgrade := patchRevision && !(master.Spec.Config[confSkipPreUpgrade] == "false")

// Find either pre- or post- upgrade job
findJob := func(jobName string) *batchv1.Job {
var job *batchv1.Job = nil
Expand Down Expand Up @@ -154,46 +157,59 @@ func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, ob
return jobObj
}

// First, run pre-upgrade job
//
// Note that pre-upgrade job doesn't have an "activeDeadlineSeconds" set it on, so it will
// try as many as imageVersionUpgradeJobMaxRetryCount times before giving up. If we ever
// needed to set an overall deadline for the pre-upgrade job, the logic below needs to check
// deadline exceeded condition on job's status
if !isConditionTrue(master, updateStatus.PreUpgradeSucceeded) {
log.Printf("Version update: pre-upgrade job not completed")
preJobName := getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels)
job := findJob(preJobName)
if job == nil {
obj, err := createJob(preJobSpec)
if err != nil {
return nil, err
if !skipPreUpgrade {
// First, run pre-upgrade job
//
// Note that the pre-upgrade job doesn't have "activeDeadlineSeconds" set on it, so it will
// try as many as imageVersionUpgradeJobMaxRetryCount times before giving up. If we ever
// need to set an overall deadline for the pre-upgrade job, the logic below needs to check
// the deadline-exceeded condition on the job's status.
if !isConditionTrue(master, updateStatus.PreUpgradeSucceeded) {
log.Printf("Version update: pre-upgrade job not completed")
preJobName := getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels)
job := findJob(preJobName)
if job == nil {
obj, err := createJob(preJobSpec)
if err != nil {
return nil, err
}
log.Printf("Version update: creating pre-upgrade job")
return []reconciler.Object{*obj}, nil
} else if job.Status.Succeeded > 0 {
setCondition(master, updateStatus.PreUpgradeSucceeded)
log.Printf("Version update: pre-upgrade job succeeded")
// Return empty to delete preUpgrade jobObj
return []reconciler.Object{}, nil
} else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount {
setCondition(master, updateStatus.PreUpgradeFailed)
setCondition(master, updateStatus.UpgradeFailed)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: pre-upgrade job failed, exceeded max retries.")
return []reconciler.Object{}, nil
} else {
log.Printf("Version update: pre-upgrade job inprogress.")
return []reconciler.Object{*buildObject(job)}, nil
}
log.Printf("Version update: creating pre-upgrade job")
return []reconciler.Object{*obj}, nil
} else if job.Status.Succeeded > 0 {
setCondition(master, updateStatus.PreUpgradeSucceeded)
log.Printf("Version update: pre-upgrade job succeeded")
// Return empty to delete preUpgrade jobObj
return []reconciler.Object{}, nil
} else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount {
setCondition(master, updateStatus.PreUpgradeFailed)
setCondition(master, updateStatus.UpgradeFailed)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: pre-upgrade job failed, exceeded max retries.")
return []reconciler.Object{}, nil
} else {
log.Printf("Version update: pre-upgrade job inprogress.")
return []reconciler.Object{*buildObject(job)}, nil
}
}

// Then, actually update the image version
if !isConditionTrue(master, updateStatus.VersionUpdated) {
// If it's a patch revision, skip the pre and post upgrade jobs. Mark the update as succeeded.
if skipPreUpgrade {
log.Printf("Version update: patch revision detected, skipping pre-upgrade and post-upgrade jobs.")
}

setImageToUse(master)
setCondition(master, updateStatus.VersionUpdated)
log.Printf("Version update: set new version.")

if skipPreUpgrade {
setCondition(master, updateStatus.UpgradeSucceeded)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: upgrade succeeded.")
}
return []reconciler.Object{}, nil
}

Expand Down Expand Up @@ -406,9 +422,9 @@ func parseImageString(imageString string) (*Version, error) {
}

// compare two parsed versions
// -1: left < right
// n: left > right, nth component differs (1-indexed)
// 0: left = right
// 1: left > right
// -n: left < right, nth component differs (1-indexed)
func compareVersion(l, r *Version) int {
if l.latest && r.latest {
return 0
Expand All @@ -418,30 +434,29 @@ func compareVersion(l, r *Version) int {
return -1
}

i := 0
j := 0
for i < len(l.components) && j < len(r.components) {
if l.components[i] > r.components[j] {
return 1
} else if l.components[i] < r.components[j] {
return -1
}
i++
j++
lenL, lenR := len(l.components), len(r.components)
maxLen := lenL
if lenR > lenL {
maxLen = lenR
}
for i < len(l.components) {
if l.components[i] > 0 {
return 1

for i := 0; i < maxLen; i++ {
valL, valR := 0, 0
if i < lenL {
valL = l.components[i]
}
i++
}
for j < len(r.components) {
if r.components[j] > 0 {
return 1
if i < lenR {
valR = r.components[i]
}

if valL > valR {
return i + 1 // Return positive index (1-based) for left > right
} else if valL < valR {
return -(i + 1) // Return negative index (1-based) for left < right
}
j++
}
return 0

return 0 // Versions are equal
}

//////////////////////////////////
Expand Down Expand Up @@ -504,12 +519,12 @@ func getCurrentTimeMs() int64 {

// The returned name is just the suffix of actual k8s object name, as we prepend it with const string + CR name
func getPreUpgradeJobName(startTimeMs int64) string {
return fmt.Sprintf("pre-upgrade-job-%d", startTimeMs / 1000)
return fmt.Sprintf("pre-upgrade-job-%d", startTimeMs/1000)
}

// The returned name is just the suffix of actual k8s object name, as we prepend it with const string + CR name
func getPostUpgradeJobName(startTimeMs int64) string {
return fmt.Sprintf("post-upgrade-job-%d", startTimeMs / 1000)
return fmt.Sprintf("post-upgrade-job-%d", startTimeMs/1000)
}

// Return pre-upgrade job spec
Expand Down
67 changes: 56 additions & 11 deletions controllers/version_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,26 @@ var _ = Describe("Controller Suite", func() {
Expect(version.latest).To(BeFalse())
Expect(version.components).To(Equal([]int{6, 0, 0, 0}))
})
It("Compare image versions", func() {
It("Compare same image versions", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:latest", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6"},
}
for _, imagePair := range imagePairs {
first, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
second, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(first, second)).To(Equal(0))
}
})
It("Compare image versions for difference in 1st component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7.0.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7"},
}
for _, imagePair := range imagePairs {
Expand All @@ -55,19 +70,49 @@ var _ = Describe("Controller Suite", func() {
Expect(compareVersion(high, low)).To(Equal(1))
}
})
It("Compare same image versions", func() {
It("Compare image versions for difference in 2nd component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:latest", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.2"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1"},
}
for _, imagePair := range imagePairs {
first, err := parseImageString(imagePair.first.(string))
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
second, err := parseImageString(imagePair.second.(string))
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(first, second)).To(Equal(0))
Expect(compareVersion(low, high)).To(Equal(-2))
Expect(compareVersion(high, low)).To(Equal(2))
}
})
It("Compare image versions for difference in 3rd component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1.2"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1"},
}
for _, imagePair := range imagePairs {
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(low, high)).To(Equal(-3))
Expect(compareVersion(high, low)).To(Equal(3))
}
})
It("Compare image versions for difference in 4th component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.2"},
}
for _, imagePair := range imagePairs {
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(low, high)).To(Equal(-4))
Expect(compareVersion(high, low)).To(Equal(4))
}
})
It("Fail to parse invalid image string", func() {
Expand Down

0 comments on commit f949c2b

Please sign in to comment.