Skip to content

Commit 5002f21

Browse files
feat(syncwaves): add DAG ordering for syncwaves
Signed-off-by: SebastienFelix <sebastien.felix3@gmail.com>
1 parent b74cf45 commit 5002f21

File tree

12 files changed

+472
-58
lines changed

12 files changed

+472
-58
lines changed

controller/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func hasSharedResourceCondition(app *v1alpha1.Application) (bool, string) {
571571
// Note, this is not foolproof, since a proper fix would require the CRD record
572572
// status.observedGeneration coupled with a health.lua that verifies
573573
// status.observedGeneration == metadata.generation
574-
func delayBetweenSyncWaves(_ common.SyncPhase, _ int, finalWave bool) error {
574+
func delayBetweenSyncWaves(_ []common.SyncIdentity, finalWave bool) error {
575575
if !finalWave {
576576
delaySec := 2
577577
if delaySecStr := os.Getenv(EnvVarSyncWaveDelay); delaySecStr != "" {

docs/user-guide/sync-waves.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ It repeats this process until all phases and waves are in-sync and healthy.
8080

8181
Because an application can have resources that are unhealthy in the first wave, it may be that the app can never get to healthy.
8282

83+
## How Do Sync Wave Groups Work?
84+
85+
On top of sync waves, Argo CD offers a way to group resources belonging to the same component (for example: Kafka, UI, Database, MyCustomComponent, ...). These are sync wave groups. They are defined by the argocd.argoproj.io/sync-wave-group annotation. The value is an integer that identifies the component to which the resource belongs. Sync wave groups behave like apps within the main app: resources within the same sync wave group are synced in the order given by their sync wave values.
86+
87+
It is possible to define dependencies between sync wave groups. These are sync wave group dependencies. They are defined at the resource level by the argocd.argoproj.io/sync-wave-group-dependencies annotation. The value is a comma-separated list of integers. These integers identify the sync wave groups that need to be synced before the resource on which this annotation is set.
88+
89+
Note that, to avoid circular dependencies, a value listed in argocd.argoproj.io/sync-wave-group-dependencies is only taken into account if it is strictly less than the resource's own sync wave group value.
90+
8391
## How Do I Configure Phases?
8492

8593
Pre-sync and post-sync can only contain hooks. Apply the hook annotation:
@@ -100,9 +108,11 @@ Specify the wave using the following annotation:
100108
metadata:
101109
annotations:
102110
argocd.argoproj.io/sync-wave: "5"
111+
argocd.argoproj.io/sync-wave-group: "2"
112+
argocd.argoproj.io/sync-wave-group-dependencies: "0,1"
103113
```
104114
105-
Hooks and resources are assigned to wave zero by default. The wave can be negative, so you can create a wave that runs before all other resources.
115+
Hooks and resources are assigned to wave zero and wave group zero by default. The wave can be negative, so you can create a wave that runs before all other resources.
106116
107117
## Examples
108118

gitops-engine/pkg/sync/common/types.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ const (
1212
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
1313
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
1414
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
15+
// AnnotationSyncWaveGroup indicates which sync wave group the resource or hook belongs to
16+
AnnotationSyncWaveGroup = "argocd.argoproj.io/sync-wave-group"
17+
// AnnotationSyncWaveGroupDependencies lists the sync wave groups (comma-separated integers) that must be synced before the resource or hook
18+
AnnotationSyncWaveGroupDependencies = "argocd.argoproj.io/sync-wave-group-dependencies"
1519
// AnnotationKeyHook contains the hook type of a resource
1620
AnnotationKeyHook = "argocd.argoproj.io/hook"
1721
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
@@ -59,10 +63,16 @@ type PermissionValidator func(un *unstructured.Unstructured, res *metav1.APIReso
5963

6064
type SyncPhase string
6165

66+
type SyncIdentity struct {
67+
Phase SyncPhase
68+
Wave int
69+
WaveGroup int
70+
}
71+
6272
// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
6373
// applied during a sync operation. The callback indicates which sync identities (phase, wave and wave group) it had just
6474
// executed, and whether or not that wave was the final one.
65-
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
75+
type SyncWaveHook func(t []SyncIdentity, final bool) error
6676

6777
const (
6878
SyncPhasePreSync = "PreSync"

gitops-engine/pkg/sync/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
7575
annotations:
7676
argocd.argoproj.io/sync-wave: "5"
7777
78+
# Sync Groups
79+
80+
Wave groups allow defining independent or dependent sync processes.
81+
7882
# Sync Options
7983
8084
The sync options allows customizing the synchronization of selected resources. The options are specified using the
@@ -89,6 +93,7 @@ How Does It Work Together?
8993
Syncing process orders the resources in the following precedence:
9094
9195
- The phase
96+
- The group with respect to group dependencies
9297
- The wave they are in (lower values first)
9398
- By kind (e.g. namespaces first)
9499
- By name

gitops-engine/pkg/sync/sync_context.go

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"slices"
78
"sort"
89
"strings"
910
"sync"
@@ -560,26 +561,29 @@ func (sc *syncContext) Sync() {
560561
return
561562
}
562563

563-
// remove any tasks not in this wave
564+
// remove any tasks which have unsynced dependencies
564565
phase := tasks.phase()
565-
wave := tasks.wave()
566-
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
566+
independantSyncIdentities := tasks.independantSyncIdentities()
567+
allSyncIdentities := tasks.syncIdentities()
567568

568569
// if it is the last phase/wave and the only remaining tasks are non-hooks, then we are successful
569570
// EVEN if those objects subsequently degraded
570571
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
571-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
572+
remainingTasks := tasks.Filter(func(t *syncTask) bool {
573+
return !slices.Contains(independantSyncIdentities, t.identity()) || t.isHook()
574+
})
572575

573-
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
574-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
576+
sc.log.WithValues("phase", phase, "independantSyncIdentities", independantSyncIdentities, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
577+
tasks = tasks.Filter(func(t *syncTask) bool { return slices.Contains(independantSyncIdentities, t.identity()) })
575578

576579
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
577580

578581
sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
579582
runState := sc.runTasks(tasks, false)
580583

581584
if sc.syncWaveHook != nil && runState != failed {
582-
err := sc.syncWaveHook(phase, wave, finalWave)
585+
finalWave := phase == tasks.lastPhase() && len(independantSyncIdentities) == len(allSyncIdentities)
586+
err := sc.syncWaveHook(independantSyncIdentities, finalWave)
583587
if err != nil {
584588
sc.deleteHooks(hooksPendingDeletionFailed)
585589
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
@@ -909,52 +913,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
909913
}
910914

911915
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
912-
pruneTasks := make(map[int][]*syncTask)
916+
917+
tasksByWaveGroup := make(map[int][]*syncTask)
913918
for _, task := range tasks {
914-
if task.isPrune() {
915-
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
916-
}
919+
tasksByWaveGroup[task.waveGroup()] = append(tasksByWaveGroup[task.waveGroup()], task)
917920
}
921+
for waveGroup := range tasksByWaveGroup {
922+
pruneTasks := make(map[int][]*syncTask)
923+
for _, task := range tasksByWaveGroup[waveGroup] {
924+
if task.isPrune() {
925+
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
926+
}
927+
}
918928

919-
var uniquePruneWaves []int
920-
for k := range pruneTasks {
921-
uniquePruneWaves = append(uniquePruneWaves, k)
922-
}
923-
sort.Ints(uniquePruneWaves)
929+
var uniquePruneWaves []int
930+
for k := range pruneTasks {
931+
uniquePruneWaves = append(uniquePruneWaves, k)
932+
}
933+
sort.Ints(uniquePruneWaves)
924934

925-
// reorder waves for pruning tasks using symmetric swap on prune waves
926-
n := len(uniquePruneWaves)
927-
for i := 0; i < n/2; i++ {
928-
// waves to swap
929-
startWave := uniquePruneWaves[i]
930-
endWave := uniquePruneWaves[n-1-i]
935+
// reorder waves for pruning tasks using symmetric swap on prune waves
936+
n := len(uniquePruneWaves)
937+
for j := 0; j < n/2; j++ {
938+
// waves to swap
939+
startWave := uniquePruneWaves[j]
940+
endWave := uniquePruneWaves[n-1-j]
931941

932-
for _, task := range pruneTasks[startWave] {
933-
task.waveOverride = &endWave
934-
}
942+
for _, task := range pruneTasks[startWave] {
943+
task.waveOverride = &endWave
944+
}
935945

936-
for _, task := range pruneTasks[endWave] {
937-
task.waveOverride = &startWave
946+
for _, task := range pruneTasks[endWave] {
947+
task.waveOverride = &startWave
948+
}
938949
}
939-
}
940950

941-
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
942-
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
943-
syncPhaseLastWave := 0
944-
for _, task := range tasks {
945-
if task.phase == common.SyncPhaseSync {
946-
if task.wave() > syncPhaseLastWave {
947-
syncPhaseLastWave = task.wave()
951+
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
952+
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
953+
954+
syncPhaseLastWave := 0
955+
for _, task := range tasksByWaveGroup[waveGroup] {
956+
if task.phase == common.SyncPhaseSync {
957+
if task.wave() > syncPhaseLastWave {
958+
syncPhaseLastWave = task.wave()
959+
}
948960
}
949961
}
950-
}
951-
syncPhaseLastWave = syncPhaseLastWave + 1
962+
syncPhaseLastWave = syncPhaseLastWave + 1
952963

953-
for _, task := range tasks {
954-
if task.isPrune() &&
955-
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
956-
task.waveOverride = &syncPhaseLastWave
964+
for _, task := range tasksByWaveGroup[waveGroup] {
965+
if task.isPrune() &&
966+
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
967+
task.waveOverride = &syncPhaseLastWave
968+
}
957969
}
970+
958971
}
959972

960973
tasks.Sort()

0 commit comments

Comments
 (0)