From 2138a5d09f8b83b152765fce34d15387b1e722f0 Mon Sep 17 00:00:00 2001 From: zwtop Date: Mon, 25 Aug 2025 18:08:21 +0800 Subject: [PATCH] feat: support containers skip restart when apply plugin Signed-off-by: zwtop --- client/clienttest/runtime.go | 15 ++- client/interface.go | 13 ++- client/runtime.go | 68 +++++++++++++- model/plugin.go | 34 ++++--- plugin/context.go | 15 +++ plugin/context_test.go | 11 +++ plugin/executor.go | 176 +++++++++++++++++++++++++++++++---- 7 files changed, 301 insertions(+), 31 deletions(-) diff --git a/client/clienttest/runtime.go b/client/clienttest/runtime.go index 0e05838..efbb5bf 100644 --- a/client/clienttest/runtime.go +++ b/client/clienttest/runtime.go @@ -23,6 +23,7 @@ import ( "time" "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" @@ -45,12 +46,16 @@ func NewRuntime(followWaitTime time.Duration) client.Runtime { } } -func (r *runtime) Platform() platforms.Matcher { return platforms.All } +func (r *runtime) Platform() platforms.MatchComparer { return platforms.All } func (r *runtime) ContainerdClient() *containerd.Client { return &containerd.Client{} } func (r *runtime) Namespace() string { return "unknown" } func (r *runtime) ConfigRuntime(context.Context) error { return nil } func (r *runtime) RemoveNamespace(context.Context) error { return nil } +func (r *runtime) RecommendedRuntimeInfo(context.Context, *model.Container) *containers.RuntimeInfo { + return &containers.RuntimeInfo{} +} + func (r *runtime) NodeExecute(ctx context.Context, name string, commands ...string) error { if len(commands) == 0 { return nil @@ -97,6 +102,14 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe return nil } +func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error { + if _, ok := r.containers[container.Name]; !ok { + return fmt.Errorf("container with name %s not exist", container.Name) + } + r.containers[container.Name] = container + return nil +} + func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error { delete(r.containers, containerID) return nil diff --git a/client/interface.go b/client/interface.go index 4777469..c6ab88f 100644 --- a/client/interface.go +++ b/client/interface.go @@ -40,7 +40,7 @@ type Runtime interface { io.Closer // Platform supported by the runtime - Platform() platforms.Matcher + Platform() platforms.MatchComparer // Namespace of the current runtime in Namespace() string // NodeExecute execute commands on the runtime node @@ -72,15 +72,26 @@ type ImageManager interface { type ContainerStatus struct { containerd.Status + containerd.Task containers.Container } +type ContainerUpdateOptions struct { + UpdateSnapshot bool +} + // ContainerManager contains methods to manipulate containers managed by a // container runtime. The methods are thread-safe. type ContainerManager interface { + // RecommendedRuntimeInfo of the container. + RecommendedRuntimeInfo(ctx context.Context, container *model.Container) *containers.RuntimeInfo + // CreateContainer creates a new container. CreateContainer(ctx context.Context, container *model.Container, follow bool) error + // UpdateContainer update the container. + UpdateContainer(ctx context.Context, container *model.Container, opts *ContainerUpdateOptions) error + // RemoveContainer removes the container. RemoveContainer(ctx context.Context, containerID string) error diff --git a/client/runtime.go b/client/runtime.go index 1042cb9..04b2d1b 100644 --- a/client/runtime.go +++ b/client/runtime.go @@ -158,7 +158,7 @@ func newTCPClient(ctx context.Context, endpoint string, tlsConfig *tls.Config, t return containerd.NewWithConn(conn, containerd.WithTimeout(timeout)) } -func (r *runtime) Platform() platforms.Matcher { return r.platform } +func (r *runtime) Platform() platforms.MatchComparer { return r.platform } func (r *runtime) ContainerdClient() *containerd.Client { return r.client } func (r *runtime) Namespace() string { return r.namespace } @@ -225,6 +225,12 @@ func (r *runtime) getImage(ctx context.Context, ref string) (containerd.Image, e return containerd.NewImageWithPlatform(r.client, i, r.platform), nil } +func (r *runtime) RecommendedRuntimeInfo(ctx context.Context, container *model.Container) *containers.RuntimeInfo { + cc := &containers.Container{} + lo.Must0(withRuntime(r.runcPath, container)(ctx, r.client, cc)) + return &cc.Runtime +} + func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, following bool) error { ctx = namespaces.WithNamespace(ctx, r.namespace) @@ -273,6 +279,51 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe return nil } +func (r *runtime) UpdateContainer(ctx context.Context, container *model.Container, opts *ContainerUpdateOptions) error { + ctx = namespaces.WithNamespace(ctx, r.namespace) + + image, err := r.getImage(ctx, container.Image) + if err != nil { + return fmt.Errorf("get image %s: %w", container.Image, err) + } + + updateOptions := []containerd.UpdateContainerOpts{ + containerd.UpdateContainerOpts(containerd.WithImageName(container.Image)), + containerd.UpdateContainerOpts(withLogPath(container.Process.LogPath)), + // containerd.UpdateContainerOpts(withRuntime(r.runcPath, container)), fixme: runtime donot support update + containerd.UpdateContainerOpts(containerd.WithNewSpec(containerSpecOpts(r.namespace, image, container)...)), + } + if container.Process.RestartPolicy == model.RestartPolicyAlways { + updateOptions = append(updateOptions, restart.WithStatus(containerd.Running)) + } + if opts.UpdateSnapshot { + updateOptions = append(updateOptions, containerd.UpdateContainerOpts(withNewSnapshotAndConfig(image, container.ConfigContent))) + } + + c, err := r.client.LoadContainer(ctx, container.Name) + if err != nil { + return fmt.Errorf("load container: %w", err) + } + err = c.Update(ctx, updateOptions...) + if err != nil { + return fmt.Errorf("update container: %w", err) + } + + task, err := c.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + return nil + } + return fmt.Errorf("load task: %w", err) + } + + spec, err := c.Spec(ctx) + if err != nil { + return fmt.Errorf("get container spec: %w", err) + } + return task.Update(ctx, containerd.WithResources(spec.Linux.Resources)) +} + func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error { ctx = namespaces.WithNamespace(ctx, r.namespace) @@ -354,6 +405,7 @@ func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (C return ContainerStatus{ Status: status, + Task: task, Container: lo.Must(c.Info(ctx, containerd.WithoutRefreshedMetadata)), }, nil } @@ -399,6 +451,10 @@ func (r *runtime) doConfig(ctx context.Context) error { return nil } +func ContainerSpecOpts(namespace string, img containerd.Image, container *model.Container) []oci.SpecOpts { + return containerSpecOpts(namespace, img, container) +} + func containerSpecOpts(namespace string, img containerd.Image, container *model.Container) []oci.SpecOpts { var specOpts []oci.SpecOpts specOpts = append(specOpts, oci.WithProcessCwd(container.Process.WorkingDir)) @@ -576,6 +632,16 @@ func withLogPath(logPath string) func(ctx context.Context, client *containerd.Cl } } +func GetLogPath(c *containers.Container) string { + if c.Labels[restart.LogURILabel] != "" { + return strings.TrimPrefix(c.Labels[restart.LogURILabel], "file://") + } + if c.Labels[restart.LogPathLabel] != "" { + return c.Labels[restart.LogPathLabel] + } + return "" +} + func withRlimits(rlimits []specs.POSIXRlimit) oci.SpecOpts { return func(_ context.Context, _ oci.Client, _ *containers.Container, spec *oci.Spec) error { if spec.Process == nil { diff --git a/model/plugin.go b/model/plugin.go index 35af26d..2836525 100644 --- a/model/plugin.go +++ b/model/plugin.go @@ -29,17 +29,18 @@ type PluginInstanceDefinition struct { } type ContainerDefinition struct { - Name string `yaml:"name"` - Image string `yaml:"image"` - Mounts []MountDefinition `yaml:"mounts,omitempty"` - Process ProcessDefinition `yaml:"process"` - Logging *LoggingDefinition `yaml:"logging,omitempty"` - Metrics *MetricsDefinition `yaml:"metrics,omitempty"` - Resources *ResourceDefinition `yaml:"resources,omitempty"` - Runtime *RuntimeDefinition `yaml:"runtime,omitempty"` - StartupProbe *ContainerProbe `yaml:"startup_probe,omitempty"` - LivenessProbe *ContainerProbe `yaml:"liveness_probe,omitempty"` - SpecPatches []string `yaml:"spec_patches,omitempty"` + Name string `yaml:"name"` + Image string `yaml:"image"` + Mounts []MountDefinition `yaml:"mounts,omitempty"` + Process ProcessDefinition `yaml:"process"` + Logging *LoggingDefinition `yaml:"logging,omitempty"` + Metrics *MetricsDefinition `yaml:"metrics,omitempty"` + Resources *ResourceDefinition `yaml:"resources,omitempty"` + Runtime *RuntimeDefinition `yaml:"runtime,omitempty"` + UpdatePolicy *UpdatePolicyDefinition `yaml:"update_policy,omitempty"` + StartupProbe *ContainerProbe `yaml:"startup_probe,omitempty"` + LivenessProbe *ContainerProbe `yaml:"liveness_probe,omitempty"` + SpecPatches []string `yaml:"spec_patches,omitempty"` } type MountDefinition struct { @@ -93,6 +94,17 @@ type RuntimeDefinition struct { SystemdCgroup bool `yaml:"systemd_cgroup,omitempty"` } +type UpdatePolicyDefinition struct { + OnNoChange UpdatePolicyMode `yaml:"on_no_change,omitempty"` +} + +type UpdatePolicyMode string + +const ( + UpdatePolicyModeSkip UpdatePolicyMode = "skip" + UpdatePolicyModeRestart UpdatePolicyMode = "restart" +) + type POSIXRlimit struct { Type string `yaml:"type"` Hard uint64 `yaml:"hard"` diff --git a/plugin/context.go b/plugin/context.go index c9dc2ea..14d5fe0 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -26,6 +26,7 @@ type ContextKey string const ( ContextSkipNoChange ContextKey = "skip-no-change" + ContextForceUpdate ContextKey = "force-update" ) func SetSkipNoChange(ctx context.Context, skip bool) context.Context { @@ -41,3 +42,17 @@ func GetSkipNoChange(ctx context.Context) bool { func WithSkipNoChange(ctx context.Context) context.Context { return SetSkipNoChange(ctx, true) } + +func SetForceUpdate(ctx context.Context, force bool) context.Context { + return context.WithValue(ctx, ContextForceUpdate, force) +} + +func GetForceUpdate(ctx context.Context) bool { + v := ctx.Value(ContextForceUpdate) + force, ok := v.(bool) + return lo.If(ok, force).Else(false) +} + +func WithForceUpdate(ctx context.Context) context.Context { + return SetForceUpdate(ctx, true) +} diff --git a/plugin/context_test.go b/plugin/context_test.go index 16f5149..001c6a9 100644 --- a/plugin/context_test.go +++ b/plugin/context_test.go @@ -35,3 +35,14 @@ func TestContextSkipNoChange(t *testing.T) { ctx = plugin.SetSkipNoChange(ctx, false) Expect(plugin.GetSkipNoChange(ctx)).Should(BeFalse()) } + +func TestContextForceUpdate(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + Expect(plugin.GetForceUpdate(ctx)).Should(BeFalse()) + ctx = plugin.WithForceUpdate(ctx) + Expect(plugin.GetForceUpdate(ctx)).Should(BeTrue()) + ctx = plugin.SetForceUpdate(ctx, false) + Expect(plugin.GetForceUpdate(ctx)).Should(BeFalse()) +} diff --git a/plugin/executor.go b/plugin/executor.go index c74550a..1e286ed 100644 --- a/plugin/executor.go +++ b/plugin/executor.go @@ -20,9 +20,12 @@ import ( "context" "crypto/sha1" "crypto/tls" + "encoding/json" "fmt" "io" "net/http" + "reflect" + "sort" "strconv" "strings" gosync "sync" @@ -30,10 +33,14 @@ import ( "github.com/containerd/containerd" nsapi "github.com/containerd/containerd/api/services/namespaces/v1" + "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/oci" "github.com/gogo/protobuf/types" + "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "github.com/samber/lo" @@ -123,7 +130,7 @@ func (w *executor) Precheck(ctx context.Context) error { return nil } - err := w.removeContainersInNamespace(ctx, w.instance.PrecheckContainers...) + err := w.removeContainersInNamespaceIncludes(ctx, w.instance.PrecheckContainers...) if err != nil { return fmt.Errorf("remove precheck containers: %s", err) } @@ -152,9 +159,9 @@ const ( // 2. remove operation metadata labels from containerd namespace. // 3. config container runtime. // 4. upload required images to containerd. -// 5. remove all containers in the containerd namespace. +// 5. remove obsolete containers in the containerd namespace. // 6. start and wait init_containers, kill the container after timeout. -// 7. start and run containers. +// 7. start, run and update containers. // 8. wait for all containers ready. // 9. setup container logging config. // 10. setup container metrics config. @@ -182,7 +189,7 @@ func (w *executor) Apply(ctx context.Context) error { return fmt.Errorf("upload container images: %s", err) } - err = w.removeContainersInNamespace(ctx) + err = w.removeContainersInNamespaceExcludes(ctx, w.instance.Containers...) if err != nil { return fmt.Errorf("remove containers: %s", err) } @@ -192,7 +199,7 @@ func (w *executor) Apply(ctx context.Context) error { return fmt.Errorf("start init containers: %s", err) } - err = w.startContainers(ctx, w.instance.Containers...) + err = w.runContainers(ctx, w.instance.Containers...) if err != nil { return fmt.Errorf("start workload containers: %s", err) } @@ -262,7 +269,7 @@ func (w *executor) Remove(ctx context.Context) error { return fmt.Errorf("remove metrics: %s", err) } - err = w.removeContainersInNamespace(ctx) + err = w.removeContainersInNamespace(ctx, nil, nil) if err != nil { return fmt.Errorf("remove containers: %s", err) } @@ -326,7 +333,7 @@ func (w *executor) HealthProbe(ctx context.Context) *model.PluginInstanceHealthR } func (w *executor) needSkipApplyPlugin(ctx context.Context) (bool, error) { - if !GetSkipNoChange(ctx) { + if !GetSkipNoChange(ctx) || GetForceUpdate(ctx) { return false, nil } @@ -355,7 +362,7 @@ func (w *executor) needSkipApplyPlugin(ctx context.Context) (bool, error) { } } - w.Infof("skip plugin apply becauseof no changes since the last update") + w.Infof("skip plugin apply becauseof no changes since update at %s", timestamp.Format("2006-01-02T15:04:05Z")) return true, nil } @@ -373,10 +380,18 @@ func (w *executor) uploadContainerImages(ctx context.Context, containers ...mode return w.runtime.ImportImages(ctx, imageRefs.List()...) } -func (w *executor) removeContainersInNamespace(ctx context.Context, containers ...model.ContainerDefinition) error { +func (w *executor) removeContainersInNamespaceIncludes(ctx context.Context, containers ...model.ContainerDefinition) error { + return w.removeContainersInNamespace(ctx, containers, nil) +} + +func (w *executor) removeContainersInNamespaceExcludes(ctx context.Context, containers ...model.ContainerDefinition) error { + return w.removeContainersInNamespace(ctx, nil, containers) +} + +func (w *executor) removeContainersInNamespace(ctx context.Context, includes, excludes []model.ContainerDefinition) error { var containersToRemove []string - if len(containers) == 0 { // remove all containers on containerd + if len(includes) == 0 { // remove all containers on containerd cs, err := w.runtime.ListContainers(ctx) if err != nil { return err @@ -385,12 +400,16 @@ func (w *executor) removeContainersInNamespace(ctx context.Context, containers . containersToRemove = append(containersToRemove, c.Name) } } else { - for _, c := range containers { + for _, c := range includes { containersToRemove = append(containersToRemove, c.Name) } } + excludeNames := lo.Map(excludes, func(cd model.ContainerDefinition, _ int) string { return cd.Name }) for _, c := range containersToRemove { + if lo.Contains(excludeNames, c) { + continue + } w.Infof("remove container %s from containerd", c) if err := w.runtime.RemoveContainer(ctx, c); err != nil { return err @@ -465,21 +484,49 @@ func (w *executor) loadContainerProbe(probe *model.ContainerProbe) *model.Contai return probe } -func (w *executor) startContainers(ctx context.Context, containers ...model.ContainerDefinition) error { +func (w *executor) runContainers(ctx context.Context, containers ...model.ContainerDefinition) error { for item := range containers { // fix: Implicit memory aliasing in for loop c := containers[item] - w.Infof("start container %s", c.Name) - err := w.runtime.CreateContainer(ctx, toRuntimeContainer(&c, model.RestartPolicyAlways), false) - if err != nil { - return err + updatePolicyMode := model.UpdatePolicyModeRestart + if !GetForceUpdate(ctx) && c.UpdatePolicy != nil && c.UpdatePolicy.OnNoChange != "" { + updatePolicyMode = c.UpdatePolicy.OnNoChange + } + mc := toRuntimeContainer(&c, model.RestartPolicyAlways) + switch updatePolicyMode { + case model.UpdatePolicyModeSkip: + can, err := canSkipRestart(ctx, w.runtime, mc) + if err != nil { + return err + } + if can { + w.Infof("update container %s and skip restart", c.Name) + err = w.runtime.UpdateContainer(ctx, mc, &client.ContainerUpdateOptions{}) + if err != nil { + return err + } + continue + } + fallthrough + case model.UpdatePolicyModeRestart: + if _, err := w.runtime.GetContainer(ctx, c.Name); err == nil || !errdefs.IsNotFound(err) { + w.Infof("remove container %s from containerd", c.Name) + _ = w.runtime.RemoveContainer(ctx, c.Name) + } + w.Infof("start and run container %s", c.Name) + err := w.runtime.CreateContainer(ctx, mc, false) + if err != nil { + return err + } + default: + return fmt.Errorf("unknown update policy mode: %s", updatePolicyMode) } } return nil } func (w *executor) removeAllInNamespace(ctx context.Context) error { - err := w.removeContainersInNamespace(ctx) + err := w.removeContainersInNamespace(ctx, nil, nil) if err != nil { return err } @@ -768,3 +815,98 @@ func namespaceLabels(ctx context.Context, runtime client.Runtime) (map[string]st ctx = namespaces.WithNamespace(ctx, runtime.Namespace()) return p.ContainerdClient().NamespaceService().Labels(ctx, runtime.Namespace()) } + +// container can select skip restart when: +// 1. container state is running +// 2. logging path donot change +// 3. container options donot update +// 4. snapshot parent donot change +// 5. spec (except resource) donot change +func canSkipRestart(ctx context.Context, runtime client.Runtime, mc *model.Container) (bool, error) { + p, ok := runtime.(client.ContainerdClientProvider) + if !ok { + return false, nil + } + c := p.ContainerdClient() + ctx = namespaces.WithNamespace(ctx, runtime.Namespace()) + + status, err := runtime.GetContainerStatus(ctx, mc.Name) + if err != nil { + if errdefs.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("status of %s: %w", mc.Name, err) + } + + if status.Status.Status != containerd.Running { + return false, nil + } + + if client.GetLogPath(&status.Container) != mc.Process.LogPath { + return false, nil + } + + runtimeInfo := runtime.RecommendedRuntimeInfo(ctx, mc) + if !reflect.DeepEqual(status.Runtime, *runtimeInfo) { + return false, nil + } + + img, err := c.ImageService().Get(ctx, mc.Image) + if err != nil { + return false, fmt.Errorf("image %s: %w", mc.Image, err) + } + digests, err := img.RootFS(ctx, c.ContentStore(), runtime.Platform()) + if err != nil { + return false, fmt.Errorf("image %s: %w", mc.Image, err) + } + targetSnapshotID := identity.ChainID(digests).String() + + info, err := c.SnapshotService(status.Snapshotter).Stat(ctx, status.SnapshotKey) + if err != nil { + return false, fmt.Errorf("snapshot %s: %w", mc.Name, err) + } + if info.Parent != targetSnapshotID { + return false, nil + } + + newSpec, err := containerSpec(ctx, c, runtime.Namespace(), mc, img) + if err != nil { + return false, fmt.Errorf("generate %s spec: %w", mc.Name, err) + } + + oldSpec := &specs.Spec{} + err = json.Unmarshal(status.Container.Spec.Value, oldSpec) + if err != nil { + return false, fmt.Errorf("decode %s spec: %w", mc.Name, err) + } + + for _, spec := range []*specs.Spec{oldSpec, newSpec} { + if spec.Linux != nil { // donot need restart when update resource + spec.Linux.Resources = nil + } + if spec.Process != nil { + runtimeENV := []string{client.ENVRuntimeContainerName, client.ENVRuntimeContainerNamespace, client.ENVRuntimeContainerImage} + spec.Process.Env = lo.Filter(spec.Process.Env, func(env string, _ int) bool { + return !lo.Contains(runtimeENV, strings.Split(env, "=")[0]) + }) + if spec.Process.Capabilities != nil { + sort.Strings(spec.Process.Capabilities.Bounding) + sort.Strings(spec.Process.Capabilities.Effective) + sort.Strings(spec.Process.Capabilities.Inheritable) + sort.Strings(spec.Process.Capabilities.Permitted) + sort.Strings(spec.Process.Capabilities.Ambient) + } + } + } + + newSpecRaw := lo.Must(json.Marshal(newSpec)) + oldSpecRaw := lo.Must(json.Marshal(oldSpec)) + return string(newSpecRaw) == string(oldSpecRaw), nil +} + +func containerSpec(ctx context.Context, c *containerd.Client, namespace string, mc *model.Container, image images.Image) (*specs.Spec, error) { + ctx = namespaces.WithNamespace(ctx, namespace) + cc := &containers.Container{ID: mc.Name} + img := containerd.NewImage(c, image) + return oci.GenerateSpec(ctx, c, cc, client.ContainerSpecOpts(namespace, img, mc)...) +}