Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions client/clienttest/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os/exec"
"sync"
"time"

"github.com/containerd/containerd"
Expand All @@ -35,6 +36,7 @@ type runtime struct {
followWaitTime time.Duration
images map[string]images.Image
containers map[string]*model.Container
mu sync.Mutex
}

// NewRuntime create a mock runtime, any images or containers record to memory only
Expand Down Expand Up @@ -91,6 +93,9 @@ func (r *runtime) GetImage(ctx context.Context, ref string) (*images.Image, bool
}

func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, follow bool) error {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.containers[container.Name]; ok {
return fmt.Errorf("container with name %s exist", container.Name)
}
Expand All @@ -103,6 +108,9 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe
}

func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error {
r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.containers[container.Name]; !ok {
return fmt.Errorf("container with name %s not exist", container.Name)
}
Expand All @@ -111,11 +119,17 @@ func (r *runtime) UpdateContainer(_ context.Context, container *model.Container,
}

func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.containers, containerID)
return nil
}

func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.Container, error) {
r.mu.Lock()
defer r.mu.Unlock()

container, ok := r.containers[containerID]
if !ok {
return nil, fmt.Errorf("container %s not found", containerID)
Expand All @@ -124,6 +138,9 @@ func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.
}

func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error) {
r.mu.Lock()
defer r.mu.Unlock()

containers := make([]*model.Container, 0, len(r.containers))
for _, container := range r.containers {
containers = append(containers, container)
Expand All @@ -132,6 +149,9 @@ func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error
}

func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (client.ContainerStatus, error) {
r.mu.Lock()
defer r.mu.Unlock()

_, ok := r.containers[containerID]
if !ok {
return client.ContainerStatus{}, fmt.Errorf("container %s not found", containerID)
Expand Down
61 changes: 29 additions & 32 deletions plugin/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,47 +487,44 @@ func (w *executor) loadContainerProbe(probe *model.ContainerProbe) *model.Contai
}

func (w *executor) runContainers(ctx context.Context, containers ...model.ContainerDefinition) error {
group := sync.NewGroup(0)

for item := range containers {
// fix: Implicit memory aliasing in for loop
c := containers[item]
updatePolicyMode := model.UpdatePolicyModeRestart
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
updatePolicyMode = c.UpdatePolicy.OnNoChange
}
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
switch updatePolicyMode {
case model.UpdatePolicyModeSkip:
can, err := canSkipRestart(ctx, w.runtime, mc)
if err != nil {
return err
group.Go(func() error {
updatePolicyMode := model.UpdatePolicyModeRestart
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
updatePolicyMode = c.UpdatePolicy.OnNoChange
}
if can {
w.Infof("update container %s and skip restart", c.Name)
err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
switch updatePolicyMode {
case model.UpdatePolicyModeSkip:
can, err := canSkipRestart(ctx, w.runtime, mc)
if err != nil {
return err
}
continue
}
fallthrough
case model.UpdatePolicyModeRestart:
if err := w.doContainerPreRestart(ctx, c); err != nil {
return fmt.Errorf("pre-restart container %s: %w", c.Name, err)
}
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
w.Infof("remove container %s from containerd", c.Name)
_ = w.runtime.RemoveContainer(ctx, c.Name)
}
w.Infof("start and run container %s", c.Name)
err := w.runtime.CreateContainer(ctx, mc, false)
if err != nil {
return err
if can {
w.Infof("update container %s and skip restart", c.Name)
return w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
}
fallthrough
case model.UpdatePolicyModeRestart:
if err := w.doContainerPreRestart(ctx, c); err != nil {
return fmt.Errorf("pre-restart container %s: %w", c.Name, err)
}
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
w.Infof("remove container %s from containerd", c.Name)
_ = w.runtime.RemoveContainer(ctx, c.Name)
}
w.Infof("start and run container %s", c.Name)
return w.runtime.CreateContainer(ctx, mc, false)
default:
return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
}
default:
return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
}
})
}
return nil
return group.WaitResult()
}

func (w *executor) removeAllInNamespace(ctx context.Context) error {
Expand Down
Loading