diff --git a/client/clienttest/runtime.go b/client/clienttest/runtime.go index efbb5bf..1c9fc87 100644 --- a/client/clienttest/runtime.go +++ b/client/clienttest/runtime.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os/exec" + "sync" "time" "github.com/containerd/containerd" @@ -35,6 +36,7 @@ type runtime struct { followWaitTime time.Duration images map[string]images.Image containers map[string]*model.Container + mu sync.Mutex } // NewRuntime create a mock runtime, any images or containers record to memory only @@ -91,6 +93,9 @@ func (r *runtime) GetImage(ctx context.Context, ref string) (*images.Image, bool } func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, follow bool) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.containers[container.Name]; ok { return fmt.Errorf("container with name %s exist", container.Name) } @@ -103,6 +108,9 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe } func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.containers[container.Name]; !ok { return fmt.Errorf("container with name %s not exist", container.Name) } @@ -111,11 +119,17 @@ func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, } func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.containers, containerID) return nil } func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.Container, error) { + r.mu.Lock() + defer r.mu.Unlock() + container, ok := r.containers[containerID] if !ok { return nil, fmt.Errorf("container %s not found", containerID) @@ -124,6 +138,9 @@ func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model. } func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error) { + r.mu.Lock() + defer r.mu.Unlock() + containers := make([]*model.Container, 0, len(r.containers)) for _, container := range r.containers { containers = append(containers, container) @@ -132,6 +149,9 @@ func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error } func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (client.ContainerStatus, error) { + r.mu.Lock() + defer r.mu.Unlock() + _, ok := r.containers[containerID] if !ok { return client.ContainerStatus{}, fmt.Errorf("container %s not found", containerID) diff --git a/plugin/executor.go b/plugin/executor.go index b047a2d..33bb814 100644 --- a/plugin/executor.go +++ b/plugin/executor.go @@ -487,47 +487,44 @@ func (w *executor) loadContainerProbe(probe *model.ContainerProbe) *model.Contai } func (w *executor) runContainers(ctx context.Context, containers ...model.ContainerDefinition) error { + group := sync.NewGroup(0) + for item := range containers { // fix: Implicit memory aliasing in for loop c := containers[item] - updatePolicyMode := model.UpdatePolicyModeRestart - if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" { - updatePolicyMode = c.UpdatePolicy.OnNoChange - } - mc := toRuntimeContainer(&c, model.RestartPolicyAlways) - switch updatePolicyMode { - case model.UpdatePolicyModeSkip: - can, err := canSkipRestart(ctx, w.runtime, mc) - if err != nil { - return err + group.Go(func() error { + updatePolicyMode := model.UpdatePolicyModeRestart + if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" { + updatePolicyMode = c.UpdatePolicy.OnNoChange } - if can { - w.Infof("update container %s and skip restart", c.Name) - err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{}) + mc := toRuntimeContainer(&c, model.RestartPolicyAlways) + switch updatePolicyMode { + case model.UpdatePolicyModeSkip: + can, err := canSkipRestart(ctx, w.runtime, mc) if err != nil { return err } - continue - } - fallthrough - case model.UpdatePolicyModeRestart: - if err := w.doContainerPreRestart(ctx, c); err != nil { - return fmt.Errorf("pre-restart container %s: %w", c.Name, err) - } - if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) { - w.Infof("remove container %s from containerd", c.Name) - _ = w.runtime.RemoveContainer(ctx, c.Name) - } - w.Infof("start and run container %s", c.Name) - err := w.runtime.CreateContainer(ctx, mc, false) - if err != nil { - return err + if can { + w.Infof("update container %s and skip restart", c.Name) + return w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{}) + } + fallthrough + case model.UpdatePolicyModeRestart: + if err := w.doContainerPreRestart(ctx, c); err != nil { + return fmt.Errorf("pre-restart container %s: %w", c.Name, err) + } + if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) { + w.Infof("remove container %s from containerd", c.Name) + _ = w.runtime.RemoveContainer(ctx, c.Name) + } + w.Infof("start and run container %s", c.Name) + return w.runtime.CreateContainer(ctx, mc, false) + default: + return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode) } - default: - return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode) - } + }) } - return nil + return group.WaitResult() } func (w *executor) removeAllInNamespace(ctx context.Context) error {