diff --git a/client/clienttest/runtime.go b/client/clienttest/runtime.go index 1c9fc87..ede91d6 100644 --- a/client/clienttest/runtime.go +++ b/client/clienttest/runtime.go @@ -48,11 +48,12 @@ func NewRuntime(followWaitTime time.Duration) client.Runtime { } } -func (r *runtime) Platform() platforms.MatchComparer { return platforms.All } -func (r *runtime) ContainerdClient() *containerd.Client { return &containerd.Client{} } -func (r *runtime) Namespace() string { return "unknown" } -func (r *runtime) ConfigRuntime(context.Context) error { return nil } -func (r *runtime) RemoveNamespace(context.Context) error { return nil } +func (r *runtime) Platform() platforms.MatchComparer { return platforms.All } +func (r *runtime) ContainerdClient() *containerd.Client { return &containerd.Client{} } +func (r *runtime) Namespace() string { return "unknown" } +func (r *runtime) ConfigRuntime(context.Context) error { return nil } +func (r *runtime) RemoveNamespace(context.Context) error { return nil } +func (r *runtime) UnpackImage(context.Context, string) error { return nil } func (r *runtime) RecommendedRuntimeInfo(context.Context, *model.Container) *containers.RuntimeInfo { return &containers.RuntimeInfo{} diff --git a/client/interface.go b/client/interface.go index c6ab88f..2706a86 100644 --- a/client/interface.go +++ b/client/interface.go @@ -66,6 +66,9 @@ type ImageManager interface { // RemoveImage remove an image from containerd RemoveImage(ctx context.Context, ref string) error + // UnpackImage unpack an image in containerd + UnpackImage(ctx context.Context, ref string) error + // GetImage return image details GetImage(ctx context.Context, ref string) (*images.Image, bool, error) } diff --git a/client/runtime.go b/client/runtime.go index de997d8..e60e17d 100644 --- a/client/runtime.go +++ b/client/runtime.go @@ -181,15 +181,10 @@ func (r *runtime) ImportImages(ctx context.Context, refs ...string) error { for _, ref := range refs { // fix: pull with unpack do not fetch missing contents - img, err := r.client.Fetch(ctx, ref, containerd.WithPlatformMatcher(r.platform), containerd.WithResolver(r.resolver)) + _, err := r.client.Fetch(ctx, ref, containerd.WithPlatformMatcher(r.platform), containerd.WithResolver(r.resolver)) if err != nil { return fmt.Errorf("import %s: %w", ref, err) } - - err = containerd.NewImageWithPlatform(r.client, img, r.platform).Unpack(ctx, containerd.DefaultSnapshotter) - if err != nil { - return fmt.Errorf("unpack %s: %w", ref, err) - } } return nil } @@ -205,6 +200,15 @@ func (r *runtime) RemoveImage(ctx context.Context, ref string) error { return ignoreNotFoundError(err) } +func (r *runtime) UnpackImage(ctx context.Context, ref string) error { + ctx = namespaces.WithNamespace(ctx, r.namespace) + img, err := r.getImage(ctx, ref) + if err != nil { + return err + } + return unpackImage(ctx, img, containerd.DefaultSnapshotter) +} + func (r *runtime) GetImage(ctx context.Context, ref string) (*images.Image, bool, error) { ctx = namespaces.WithNamespace(ctx, r.namespace) @@ -549,9 +553,10 @@ func withoutAnyMounts() oci.SpecOpts { func withNewSnapshotAndConfig(img containerd.Image, configContent []model.ConfigFile) containerd.NewContainerOpts { return func(ctx context.Context, client *containerd.Client, c *containers.Container) error { var ( - snapshotID = rand.String(10) - data = toRawConfig(configContent) - descriptor = v1.Descriptor{ + snapshotID = rand.String(10) + snapshotterName = containerd.DefaultSnapshotter + data = toRawConfig(configContent) + descriptor = v1.Descriptor{ MediaType: v1.MediaTypeImageLayer, Digest: digest.SHA256.FromBytes(data), Size: int64(len(data)), @@ -559,12 +564,16 @@ func withNewSnapshotAndConfig(img containerd.Image, configContent []model.Config ref = fmt.Sprintf("ingest-%s", descriptor.Digest) ) - diffIDs, err := img.RootFS(ctx) + err := unpackImage(ctx, img, snapshotterName) if err != nil { return err } - mounts, err := client.SnapshotService(containerd.DefaultSnapshotter).Prepare(ctx, snapshotID, identity.ChainID(diffIDs).String()) + diffIDs, err := img.RootFS(ctx) + if err != nil { + return err + } + mounts, err := client.SnapshotService(snapshotterName).Prepare(ctx, snapshotID, identity.ChainID(diffIDs).String()) if err != nil { return err } @@ -734,3 +743,16 @@ func withAllowAllDevices(_ context.Context, _ oci.Client, _ *containers.Containe } return nil } + +func unpackImage(ctx context.Context, img containerd.Image, snapshotterName string) error { + unpacked, err := img.IsUnpacked(ctx, snapshotterName) + if err != nil { + return err + } + if !unpacked { + if err := img.Unpack(ctx, snapshotterName); err != nil { + return err + } + } + return nil +} diff --git a/model/plugin.go b/model/plugin.go index 07c6e35..2d21c0d 100644 --- a/model/plugin.go +++ b/model/plugin.go @@ -21,6 +21,7 @@ import "time" // PluginInstanceDefinition contains container definitions about how // to install a plugin instance type PluginInstanceDefinition struct { + ExtraRequireImages []ExtraRequireImage `yaml:"extra_require_images,omitempty"` PrecheckContainers []ContainerDefinition `yaml:"precheck_containers,omitempty"` InitContainers []ContainerDefinition `yaml:"init_containers,omitempty"` Containers []ContainerDefinition `yaml:"containers"` @@ -28,6 +29,11 @@ type PluginInstanceDefinition struct { CleanContainers []ContainerDefinition `yaml:"clean_containers,omitempty"` } +type ExtraRequireImage struct { + Name string `yaml:"name"` + Unpack bool `yaml:"unpack,omitempty"` +} + type ContainerDefinition struct { Name string `yaml:"name"` Image string `yaml:"image"` diff --git a/plugin/executor.go b/plugin/executor.go index 33bb814..65ef853 100644 --- a/plugin/executor.go +++ b/plugin/executor.go @@ -45,6 +45,7 @@ import ( "github.com/pkg/errors" "github.com/samber/lo" "gopkg.in/yaml.v3" + apierr "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -135,7 +136,8 @@ func (w *executor) Precheck(ctx context.Context) error { return fmt.Errorf("remove precheck containers: %s", err) } - err = w.uploadContainerImages(ctx, w.instance.PrecheckContainers...) + expectImages := lo.Map(w.instance.PrecheckContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }) + err = w.uploadContainerImages(ctx, expectImages...) if err != nil { return fmt.Errorf("upload precheck images: %s", err) } @@ -159,15 +161,16 @@ const ( // 2. remove operation metadata labels from containerd namespace. // 3. config container runtime. // 4. upload required images to containerd. -// 5. remove obsolete containers in the containerd namespace. -// 6. start and wait init_containers, kill the container after timeout. -// 7. start, run and update containers. -// 8. wait for all containers ready. -// 9. setup container logging config. -// 10. setup container metrics config. -// 11. start and wait post_containers, kill the container after timeout. -// 12. remove unused images from containerd. -// 13. update operation metadata labels into containerd namespace. +// 5. unpack required images in containerd. +// 6. remove obsolete containers in the containerd namespace. +// 7. start and wait init_containers, kill the container after timeout. +// 8. start, run and update containers. +// 9. wait for all containers ready. +// 10. setup container logging config. +// 11. setup container metrics config. +// 12. start and wait post_containers, kill the container after timeout. +// 13. remove unused images from containerd. +// 14. update operation metadata labels into containerd namespace. func (w *executor) Apply(ctx context.Context) error { skip, err := w.needSkipApplyPlugin(ctx) if skip || err != nil { @@ -184,11 +187,23 @@ func (w *executor) Apply(ctx context.Context) error { return fmt.Errorf("config container runtime: %s", err) } - err = w.uploadContainerImages(ctx, append(append(w.instance.InitContainers, w.instance.Containers...), w.instance.PostContainers...)...) + expectImages := lo.Union( + lo.Map(w.instance.ExtraRequireImages, func(e model.ExtraRequireImage, _ int) string { return e.Name }), + lo.Map(w.instance.InitContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.Containers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.PostContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + ) + err = w.uploadContainerImages(ctx, expectImages...) if err != nil { return fmt.Errorf("upload container images: %s", err) } + unpackImages := lo.FilterMap(w.instance.ExtraRequireImages, func(e model.ExtraRequireImage, _ int) (string, bool) { return e.Name, e.Unpack }) + err = w.unpackContainerImages(ctx, unpackImages...) + if err != nil { + return fmt.Errorf("unpack container images: %s", err) + } + err = w.removeContainersInNamespaceExcludes(ctx, w.instance.Containers...) if err != nil { return fmt.Errorf("remove containers: %s", err) @@ -224,7 +239,15 @@ func (w *executor) Apply(ctx context.Context) error { return fmt.Errorf("start post containers: %s", err) } - err = w.removeUnusedImages(ctx, w.instance.Containers...) + inuseImages := lo.Union( + lo.Map(w.instance.ExtraRequireImages, func(e model.ExtraRequireImage, _ int) string { return e.Name }), + lo.Map(w.instance.PrecheckContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.InitContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.Containers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.PostContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + lo.Map(w.instance.CleanContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }), + ) + err = w.removeUnusedImages(ctx, inuseImages...) if err != nil { return fmt.Errorf("remove unused images: %s", err) } @@ -275,7 +298,8 @@ func (w *executor) Remove(ctx context.Context) error { } if len(w.instance.CleanContainers) != 0 { - err = w.uploadContainerImages(ctx, w.instance.CleanContainers...) + expectImages := lo.Map(w.instance.CleanContainers, func(c model.ContainerDefinition, _ int) string { return c.Image }) + err = w.uploadContainerImages(ctx, expectImages...) if err != nil { return fmt.Errorf("upload cleanup images: %s", err) } @@ -371,13 +395,24 @@ func (w *executor) configContainerRuntime(ctx context.Context) error { return w.runtime.ConfigRuntime(ctx) } -func (w *executor) uploadContainerImages(ctx context.Context, containers ...model.ContainerDefinition) error { - imageRefs := sets.NewString() - for _, c := range containers { - imageRefs.Insert(c.Image) +func (w *executor) uploadContainerImages(ctx context.Context, expectImages ...string) error { + if len(expectImages) == 0 { + return nil + } + expectImages = lo.Uniq(expectImages) + w.Infof("uploading images to containerd: %v", expectImages) + return w.runtime.ImportImages(ctx, expectImages...) +} + +func (w *executor) unpackContainerImages(ctx context.Context, expectImages ...string) error { + if len(expectImages) == 0 { + return nil } - w.Infof("uploading images to containerd: %v", imageRefs.List()) - return w.runtime.ImportImages(ctx, imageRefs.List()...) + expectImages = lo.Uniq(expectImages) + w.Infof("uppacking images in containerd: %v", expectImages) + return apierr.NewAggregate(lo.Map(expectImages, func(image string, _ int) error { + return w.runtime.UnpackImage(ctx, image) + })) } func (w *executor) removeContainersInNamespaceIncludes(ctx context.Context, containers ...model.ContainerDefinition) error { @@ -543,7 +578,7 @@ func (w *executor) removeAllInNamespace(ctx context.Context) error { return nil } -func (w *executor) removeUnusedImages(ctx context.Context, exceptImagesInContainer ...model.ContainerDefinition) error { +func (w *executor) removeUnusedImages(ctx context.Context, inuseImages ...string) error { images, err := w.runtime.ListImages(ctx) if err != nil { return err @@ -553,10 +588,7 @@ func (w *executor) removeUnusedImages(ctx context.Context, exceptImagesInContain for _, i := range images { imageSet.Insert(i.Name) } - - for _, c := range exceptImagesInContainer { - imageSet.Delete(c.Image) - } + imageSet.Delete(inuseImages...) for _, image := range imageSet.List() { w.Infof("remove image %s from containerd", image) diff --git a/plugin/executor_test.go b/plugin/executor_test.go index 89f208a..2a8cea7 100644 --- a/plugin/executor_test.go +++ b/plugin/executor_test.go @@ -108,7 +108,7 @@ func TestHostPluginExecutorApply(t *testing.T) { images, err := runtime.ListImages(ctx) Expect(err).ShouldNot(HaveOccurred()) - Expect(images).Should(HaveLen(2)) + Expect(images).Should(HaveLen(6)) }) }