Skip to content

Commit 6581b1e

Browse files
committed
feat: make container creation concurrently
1 parent 0561ced commit 6581b1e

File tree

2 files changed

+83
-45
lines changed

2 files changed

+83
-45
lines changed

client/clienttest/runtime.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"os/exec"
23+
"sync"
2324
"time"
2425

2526
"github.com/containerd/containerd"
@@ -35,6 +36,7 @@ type runtime struct {
3536
followWaitTime time.Duration
3637
images map[string]images.Image
3738
containers map[string]*model.Container
39+
mu sync.Mutex
3840
}
3941

4042
// NewRuntime create a mock runtime, any images or containers record to memory only
@@ -91,6 +93,9 @@ func (r *runtime) GetImage(ctx context.Context, ref string) (*images.Image, bool
9193
}
9294

9395
func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, follow bool) error {
96+
r.mu.Lock()
97+
defer r.mu.Unlock()
98+
9499
if _, ok := r.containers[container.Name]; ok {
95100
return fmt.Errorf("container with name %s exist", container.Name)
96101
}
@@ -103,6 +108,9 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe
103108
}
104109

105110
func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error {
111+
r.mu.Lock()
112+
defer r.mu.Unlock()
113+
106114
if _, ok := r.containers[container.Name]; !ok {
107115
return fmt.Errorf("container with name %s not exist", container.Name)
108116
}
@@ -111,11 +119,17 @@ func (r *runtime) UpdateContainer(_ context.Context, container *model.Container,
111119
}
112120

113121
func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error {
122+
r.mu.Lock()
123+
defer r.mu.Unlock()
124+
114125
delete(r.containers, containerID)
115126
return nil
116127
}
117128

118129
func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.Container, error) {
130+
r.mu.Lock()
131+
defer r.mu.Unlock()
132+
119133
container, ok := r.containers[containerID]
120134
if !ok {
121135
return nil, fmt.Errorf("container %s not found", containerID)
@@ -124,6 +138,9 @@ func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.
124138
}
125139

126140
func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error) {
141+
r.mu.Lock()
142+
defer r.mu.Unlock()
143+
127144
containers := make([]*model.Container, 0, len(r.containers))
128145
for _, container := range r.containers {
129146
containers = append(containers, container)
@@ -132,6 +149,9 @@ func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error
132149
}
133150

134151
func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (client.ContainerStatus, error) {
152+
r.mu.Lock()
153+
defer r.mu.Unlock()
154+
135155
_, ok := r.containers[containerID]
136156
if !ok {
137157
return client.ContainerStatus{}, fmt.Errorf("container %s not found", containerID)

plugin/executor.go

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -420,20 +420,28 @@ func (w *executor) removeContainersInNamespace(ctx context.Context, includes, ex
420420
}
421421

422422
func (w *executor) runAndWaitContainers(ctx context.Context, containers ...model.ContainerDefinition) error {
423+
group := sync.NewGroup(0)
424+
423425
for item := range containers {
424426
// fix: Implicit memory aliasing in for loop
425427
c := containers[item]
426428
w.Infof("start and wait container %s", c.Name)
427-
err := w.runtime.CreateContainer(ctx, toRuntimeContainer(&c, model.RestartPolicyNever), true)
428-
if err != nil {
429-
return err
430-
}
431-
err = w.runtime.RemoveContainer(ctx, c.Name)
432-
if err != nil {
433-
return err
434-
}
429+
probe := w.loadContainerProbe(c.StartupProbe)
430+
group.Go(func() error {
431+
return wait.PollImmediate(startProbeCheckInterval, time.Duration(probe.ProbeTimeout)*time.Second, func() (bool, error) {
432+
err := w.runtime.CreateContainer(ctx, toRuntimeContainer(&c, model.RestartPolicyNever), true)
433+
if err != nil {
434+
return false, err
435+
}
436+
err = w.runtime.RemoveContainer(ctx, c.Name)
437+
if err != nil {
438+
return false, err
439+
}
440+
return true, nil
441+
})
442+
})
435443
}
436-
return nil
444+
return group.WaitResult()
437445
}
438446

439447
const (
@@ -487,47 +495,57 @@ func (w *executor) loadContainerProbe(probe *model.ContainerProbe) *model.Contai
487495
}
488496

489497
func (w *executor) runContainers(ctx context.Context, containers ...model.ContainerDefinition) error {
498+
group := sync.NewGroup(0)
499+
490500
for item := range containers {
491501
// fix: Implicit memory aliasing in for loop
492502
c := containers[item]
493-
updatePolicyMode := model.UpdatePolicyModeRestart
494-
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
495-
updatePolicyMode = c.UpdatePolicy.OnNoChange
496-
}
497-
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
498-
switch updatePolicyMode {
499-
case model.UpdatePolicyModeSkip:
500-
can, err := canSkipRestart(ctx, w.runtime, mc)
501-
if err != nil {
502-
return err
503-
}
504-
if can {
505-
w.Infof("update container %s and skip restart", c.Name)
506-
err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
507-
if err != nil {
508-
return err
503+
probe := w.loadContainerProbe(c.StartupProbe)
504+
group.Go(func() error {
505+
return wait.PollImmediate(startProbeCheckInterval, time.Duration(probe.ProbeTimeout)*time.Second, func() (bool, error) {
506+
w.Infof("try to handle container running process", c.Name)
507+
508+
updatePolicyMode := model.UpdatePolicyModeRestart
509+
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
510+
updatePolicyMode = c.UpdatePolicy.OnNoChange
509511
}
510-
continue
511-
}
512-
fallthrough
513-
case model.UpdatePolicyModeRestart:
514-
if err := w.doContainerPreRestart(ctx, c); err != nil {
515-
return fmt.Errorf("pre-restart container %s: %w", c.Name, err)
516-
}
517-
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
518-
w.Infof("remove container %s from containerd", c.Name)
519-
_ = w.runtime.RemoveContainer(ctx, c.Name)
520-
}
521-
w.Infof("start and run container %s", c.Name)
522-
err := w.runtime.CreateContainer(ctx, mc, false)
523-
if err != nil {
524-
return err
525-
}
526-
default:
527-
return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
528-
}
512+
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
513+
switch updatePolicyMode {
514+
case model.UpdatePolicyModeSkip:
515+
can, err := canSkipRestart(ctx, w.runtime, mc)
516+
if err != nil {
517+
return false, err
518+
}
519+
if can {
520+
w.Infof("update container %s and skip restart", c.Name)
521+
err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
522+
if err != nil {
523+
return false, err
524+
}
525+
return true, nil
526+
}
527+
fallthrough
528+
case model.UpdatePolicyModeRestart:
529+
if err := w.doContainerPreRestart(ctx, c); err != nil {
530+
return false, fmt.Errorf("pre-restart container %s: %w", c.Name, err)
531+
}
532+
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
533+
w.Infof("remove container %s from containerd", c.Name)
534+
_ = w.runtime.RemoveContainer(ctx, c.Name)
535+
}
536+
w.Infof("start and run container %s", c.Name)
537+
err := w.runtime.CreateContainer(ctx, mc, false)
538+
if err != nil {
539+
return false, err
540+
}
541+
default:
542+
return false, fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
543+
}
544+
return true, nil
545+
})
546+
})
529547
}
530-
return nil
548+
return group.WaitResult()
531549
}
532550

533551
func (w *executor) removeAllInNamespace(ctx context.Context) error {

0 commit comments

Comments
 (0)