Skip to content

Commit

Permalink
fix: --fail-fast deadlock (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
raffis authored Sep 11, 2024
1 parent 7374eb2 commit 5bfcd44
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 321 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ require (
github.com/alibabacloud-go/tea-utils v1.4.5 // indirect
github.com/alibabacloud-go/tea-utils/v2 v2.0.6 // indirect
github.com/alibabacloud-go/tea-xml v1.1.3 // indirect
github.com/alitto/pond v1.9.2 // indirect
github.com/aliyun/credentials-go v1.3.9 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ github.com/alibabacloud-go/tea-utils/v2 v2.0.6/go.mod h1:qxn986l+q33J5VkialKMqT/
github.com/alibabacloud-go/tea-xml v1.1.2/go.mod h1:Rq08vgCcCAjHyRi/M7xlHKUykZCEtyBy9+DPF6GgEu8=
github.com/alibabacloud-go/tea-xml v1.1.3 h1:7LYnm+JbOq2B+T/B0fHC4Ies4/FofC4zHzYtqw7dgt0=
github.com/alibabacloud-go/tea-xml v1.1.3/go.mod h1:Rq08vgCcCAjHyRi/M7xlHKUykZCEtyBy9+DPF6GgEu8=
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
github.com/aliyun/credentials-go v1.1.2/go.mod h1:ozcZaMR5kLM7pwtCMEpVmQ242suV6qTJya2bDq4X1Tw=
github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/aliyun/credentials-go v1.3.9 h1:xz4W+ebo2xlq5LXshm4YLz7P7ZfmQaNYGTx+Lm0HbQ4=
Expand Down
149 changes: 59 additions & 90 deletions internal/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"io"
"os"

"github.com/alitto/pond"
"github.com/doodlescheduling/flux-build/internal/build"
"github.com/doodlescheduling/flux-build/internal/cachemgr"
"github.com/doodlescheduling/flux-build/internal/worker"
helmv1 "github.com/fluxcd/helm-controller/api/v2beta1"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/chartutil"
Expand All @@ -31,33 +31,32 @@ func (a *Action) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

abort := func(err error) error {
if err == nil {
return nil
}
errs := make(chan error)
var lastErr error
helmResultPool := pond.New(1, 1, pond.Context(ctx))
kustomizePool := pond.New(len(a.Paths), len(a.Paths), pond.Context(ctx))
helmPool := pond.New(a.Workers, a.Workers, pond.Context(ctx))
resourcePool := pond.New(1, 1, pond.Context(ctx))

if a.FailFast {
cancel()
defer func() {
if lastErr != nil && !a.AllowFailure {
os.Exit(1)
}
}()

return err
}

helmResultPool := worker.New(ctx, worker.PoolOptions{
Workers: 1,
})

kustomizePool := worker.New(ctx, worker.PoolOptions{
Workers: len(a.Paths),
})
go func() {
for err := range errs {
if err == nil {
continue
}

helmPool := worker.New(ctx, worker.PoolOptions{
Workers: a.Workers,
})
lastErr = err

resourcePool := worker.New(ctx, worker.PoolOptions{
Workers: 1,
})
if a.FailFast {
cancel()
}
}
}()

resources := make(chan resmap.ResMap, len(a.Paths))
manifests := make(chan resmap.ResMap, a.Workers)
Expand All @@ -68,110 +67,80 @@ func (a *Action) Run(ctx context.Context) error {
Cache: a.Cache,
})

helmResultPool.Push(worker.Task(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case index, ok := <-manifests:
if !ok {
return nil
}

y, err := index.AsYaml()
if err != nil {
a.Logger.Error(err, "failed to encode as yaml")
return abort(err)
}

_, err = a.Output.Write(append([]byte("---\n"), y...))
if err != nil {

a.Logger.Error(err, "failed to write helm manifests to output")
return abort(err)
}
helmResultPool.Submit(func() {
for index := range manifests {
y, err := index.AsYaml()
if err != nil {
a.Logger.Error(err, "failed to encode as yaml")
errs <- err
continue
}

_, err = a.Output.Write(append([]byte("---\n"), y...))
if err != nil {
a.Logger.Error(err, "failed to write helm manifests to output")
errs <- err
continue
}
}
}))
})

for _, path := range a.Paths {
p := path
a.Logger.Info("build kustomize path", "path", p)

kustomizePool.Push(worker.Task(func(ctx context.Context) error {
kustomizePool.Submit(func() {
if index, err := build.Kustomize(ctx, p); err != nil {
a.Logger.Error(err, "failed build kustomization", "path", p)
return abort(err)
errs <- err
} else {
manifests <- index
resources <- index
}

return nil
}))
})
}

index := make(build.ResourceIndex)
resourcePool.Push(worker.Task(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case build, ok := <-resources:
if !ok {
return nil
}

if err := index.Push(build.Resources()); err != nil {
return abort(err)
}

if ctx.Err() != nil {
return nil
}
resourcePool.Submit(func() {
for build := range resources {
if err := index.Push(build.Resources()); err != nil {
errs <- err
continue
}
}
}))
})

a.exit(kustomizePool)
kustomizePool.StopAndWait()
close(resources)
a.exit(resourcePool)
resourcePool.StopAndWait()

for _, r := range index {
res := r
if r.GetKind() != helmv1.HelmReleaseKind {
continue
}

helmPool.Push(worker.Task(func(ctx context.Context) error {
if ctx.Err() != nil {
break
}

helmPool.Submit(func() {
a.Logger.Info("build helm release", "namespace", res.GetNamespace(), "name", res.GetName())
index, err := helmBuilder.Build(ctx, res, index)
if err != nil {
a.Logger.Error(err, "failed build helmrelease", "namespace", res.GetNamespace(), "name", res.GetName())
return abort(err)
}

if ctx.Err() != nil {
return nil
errs <- err
return
}

manifests <- index
return nil
}))
})
}

a.exit(helmPool)
helmPool.StopAndWait()
close(manifests)
a.exit(helmResultPool)
helmResultPool.StopAndWait()
close(errs)

return nil
}

func (a *Action) exit(waiters ...worker.Waiter) {
for _, w := range waiters {
err := w.Wait()
if err != nil && !a.AllowFailure {
os.Exit(1)
}
}
}
146 changes: 0 additions & 146 deletions internal/worker/pool.go

This file was deleted.

Loading

0 comments on commit 5bfcd44

Please sign in to comment.