Skip to content

Commit aa5acbc

Browse files
committed
feat: make container creation concurrently
Signed-off-by: zwtop <[email protected]>
1 parent 0561ced commit aa5acbc

File tree

2 files changed

+49
-32
lines changed

2 files changed

+49
-32
lines changed

client/clienttest/runtime.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"os/exec"
23+
"sync"
2324
"time"
2425

2526
"github.com/containerd/containerd"
@@ -35,6 +36,7 @@ type runtime struct {
3536
followWaitTime time.Duration
3637
images map[string]images.Image
3738
containers map[string]*model.Container
39+
mu sync.Mutex
3840
}
3941

4042
// NewRuntime create a mock runtime, any images or containers record to memory only
@@ -91,6 +93,9 @@ func (r *runtime) GetImage(ctx context.Context, ref string) (*images.Image, bool
9193
}
9294

9395
func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, follow bool) error {
96+
r.mu.Lock()
97+
defer r.mu.Unlock()
98+
9499
if _, ok := r.containers[container.Name]; ok {
95100
return fmt.Errorf("container with name %s exist", container.Name)
96101
}
@@ -103,6 +108,9 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe
103108
}
104109

105110
func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error {
111+
r.mu.Lock()
112+
defer r.mu.Unlock()
113+
106114
if _, ok := r.containers[container.Name]; !ok {
107115
return fmt.Errorf("container with name %s not exist", container.Name)
108116
}
@@ -111,11 +119,17 @@ func (r *runtime) UpdateContainer(_ context.Context, container *model.Container,
111119
}
112120

113121
func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error {
122+
r.mu.Lock()
123+
defer r.mu.Unlock()
124+
114125
delete(r.containers, containerID)
115126
return nil
116127
}
117128

118129
func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.Container, error) {
130+
r.mu.Lock()
131+
defer r.mu.Unlock()
132+
119133
container, ok := r.containers[containerID]
120134
if !ok {
121135
return nil, fmt.Errorf("container %s not found", containerID)
@@ -124,6 +138,9 @@ func (r *runtime) GetContainer(ctx context.Context, containerID string) (*model.
124138
}
125139

126140
func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error) {
141+
r.mu.Lock()
142+
defer r.mu.Unlock()
143+
127144
containers := make([]*model.Container, 0, len(r.containers))
128145
for _, container := range r.containers {
129146
containers = append(containers, container)
@@ -132,6 +149,9 @@ func (r *runtime) ListContainers(ctx context.Context) ([]*model.Container, error
132149
}
133150

134151
func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (client.ContainerStatus, error) {
152+
r.mu.Lock()
153+
defer r.mu.Unlock()
154+
135155
_, ok := r.containers[containerID]
136156
if !ok {
137157
return client.ContainerStatus{}, fmt.Errorf("container %s not found", containerID)

plugin/executor.go

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -487,47 +487,44 @@ func (w *executor) loadContainerProbe(probe *model.ContainerProbe) *model.Contai
487487
}
488488

489489
func (w *executor) runContainers(ctx context.Context, containers ...model.ContainerDefinition) error {
490+
group := sync.NewGroup(0)
491+
490492
for item := range containers {
491493
// fix: Implicit memory aliasing in for loop
492494
c := containers[item]
493-
updatePolicyMode := model.UpdatePolicyModeRestart
494-
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
495-
updatePolicyMode = c.UpdatePolicy.OnNoChange
496-
}
497-
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
498-
switch updatePolicyMode {
499-
case model.UpdatePolicyModeSkip:
500-
can, err := canSkipRestart(ctx, w.runtime, mc)
501-
if err != nil {
502-
return err
495+
group.Go(func() error {
496+
updatePolicyMode := model.UpdatePolicyModeRestart
497+
if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" {
498+
updatePolicyMode = c.UpdatePolicy.OnNoChange
503499
}
504-
if can {
505-
w.Infof("update container %s and skip restart", c.Name)
506-
err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
500+
mc := toRuntimeContainer(&c, model.RestartPolicyAlways)
501+
switch updatePolicyMode {
502+
case model.UpdatePolicyModeSkip:
503+
can, err := canSkipRestart(ctx, w.runtime, mc)
507504
if err != nil {
508505
return err
509506
}
510-
continue
511-
}
512-
fallthrough
513-
case model.UpdatePolicyModeRestart:
514-
if err := w.doContainerPreRestart(ctx, c); err != nil {
515-
return fmt.Errorf("pre-restart container %s: %w", c.Name, err)
516-
}
517-
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
518-
w.Infof("remove container %s from containerd", c.Name)
519-
_ = w.runtime.RemoveContainer(ctx, c.Name)
520-
}
521-
w.Infof("start and run container %s", c.Name)
522-
err := w.runtime.CreateContainer(ctx, mc, false)
523-
if err != nil {
524-
return err
507+
if can {
508+
w.Infof("update container %s and skip restart", c.Name)
509+
return w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{})
510+
}
511+
fallthrough
512+
case model.UpdatePolicyModeRestart:
513+
if err := w.doContainerPreRestart(ctx, c); err != nil {
514+
return fmt.Errorf("pre-restart container %s: %w", c.Name, err)
515+
}
516+
if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) {
517+
w.Infof("remove container %s from containerd", c.Name)
518+
_ = w.runtime.RemoveContainer(ctx, c.Name)
519+
}
520+
w.Infof("start and run container %s", c.Name)
521+
return w.runtime.CreateContainer(ctx, mc, false)
522+
default:
523+
return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
525524
}
526-
default:
527-
return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode)
528-
}
525+
})
529526
}
530-
return nil
527+
return group.WaitResult()
531528
}
532529

533530
func (w *executor) removeAllInNamespace(ctx context.Context) error {

0 commit comments

Comments
 (0)