Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion client/clienttest/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"

Expand All @@ -45,12 +46,16 @@ func NewRuntime(followWaitTime time.Duration) client.Runtime {
}
}

func (r *runtime) Platform() platforms.Matcher { return platforms.All }
func (r *runtime) Platform() platforms.MatchComparer { return platforms.All }
func (r *runtime) ContainerdClient() *containerd.Client { return &containerd.Client{} }
func (r *runtime) Namespace() string { return "unknown" }
func (r *runtime) ConfigRuntime(context.Context) error { return nil }
func (r *runtime) RemoveNamespace(context.Context) error { return nil }

func (r *runtime) RecommendedRuntimeInfo(context.Context, *model.Container) *containers.RuntimeInfo {
return &containers.RuntimeInfo{}
}

func (r *runtime) NodeExecute(ctx context.Context, name string, commands ...string) error {
if len(commands) == 0 {
return nil
Expand Down Expand Up @@ -97,6 +102,14 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe
return nil
}

func (r *runtime) UpdateContainer(_ context.Context, container *model.Container, _ *client.ContainerUpdateOptions) error {
if _, ok := r.containers[container.Name]; !ok {
return fmt.Errorf("container with name %s not exist", container.Name)
}
r.containers[container.Name] = container
return nil
}

func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error {
delete(r.containers, containerID)
return nil
Expand Down
13 changes: 12 additions & 1 deletion client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Runtime interface {
io.Closer

// Platform supported by the runtime
Platform() platforms.Matcher
Platform() platforms.MatchComparer
// Namespace of the current runtime in
Namespace() string
// NodeExecute execute commands on the runtime node
Expand Down Expand Up @@ -72,15 +72,26 @@ type ImageManager interface {

type ContainerStatus struct {
containerd.Status
containerd.Task
containers.Container
}

type ContainerUpdateOptions struct {
UpdateSnapshot bool
}

// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
// RecommendedRuntimeInfo of the container.
RecommendedRuntimeInfo(ctx context.Context, container *model.Container) *containers.RuntimeInfo

// CreateContainer creates a new container.
CreateContainer(ctx context.Context, container *model.Container, follow bool) error

// UpdateContainer update the container.
UpdateContainer(ctx context.Context, container *model.Container, opts *ContainerUpdateOptions) error

// RemoveContainer removes the container.
RemoveContainer(ctx context.Context, containerID string) error

Expand Down
68 changes: 67 additions & 1 deletion client/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func newTCPClient(ctx context.Context, endpoint string, tlsConfig *tls.Config, t
return containerd.NewWithConn(conn, containerd.WithTimeout(timeout))
}

func (r *runtime) Platform() platforms.Matcher { return r.platform }
func (r *runtime) Platform() platforms.MatchComparer { return r.platform }
func (r *runtime) ContainerdClient() *containerd.Client { return r.client }
func (r *runtime) Namespace() string { return r.namespace }

Expand Down Expand Up @@ -225,6 +225,12 @@ func (r *runtime) getImage(ctx context.Context, ref string) (containerd.Image, e
return containerd.NewImageWithPlatform(r.client, i, r.platform), nil
}

func (r *runtime) RecommendedRuntimeInfo(ctx context.Context, container *model.Container) *containers.RuntimeInfo {
cc := &containers.Container{}
lo.Must0(withRuntime(r.runcPath, container)(ctx, r.client, cc))
return &cc.Runtime
}

func (r *runtime) CreateContainer(ctx context.Context, container *model.Container, following bool) error {
ctx = namespaces.WithNamespace(ctx, r.namespace)

Expand Down Expand Up @@ -273,6 +279,51 @@ func (r *runtime) CreateContainer(ctx context.Context, container *model.Containe
return nil
}

func (r *runtime) UpdateContainer(ctx context.Context, container *model.Container, opts *ContainerUpdateOptions) error {
ctx = namespaces.WithNamespace(ctx, r.namespace)

image, err := r.getImage(ctx, container.Image)
if err != nil {
return fmt.Errorf("get image %s: %w", container.Image, err)
}

updateOptions := []containerd.UpdateContainerOpts{
containerd.UpdateContainerOpts(containerd.WithImageName(container.Image)),
containerd.UpdateContainerOpts(withLogPath(container.Process.LogPath)),
// containerd.UpdateContainerOpts(withRuntime(r.runcPath, container)), fixme: runtime donot support update
containerd.UpdateContainerOpts(containerd.WithNewSpec(containerSpecOpts(r.namespace, image, container)...)),
}
if container.Process.RestartPolicy == model.RestartPolicyAlways {
updateOptions = append(updateOptions, restart.WithStatus(containerd.Running))
}
if opts.UpdateSnapshot {
updateOptions = append(updateOptions, containerd.UpdateContainerOpts(withNewSnapshotAndConfig(image, container.ConfigContent)))
}

c, err := r.client.LoadContainer(ctx, container.Name)
if err != nil {
return fmt.Errorf("load container: %w", err)
}
err = c.Update(ctx, updateOptions...)
if err != nil {
return fmt.Errorf("update container: %w", err)
}

task, err := c.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
return nil
}
return fmt.Errorf("load task: %w", err)
}

spec, err := c.Spec(ctx)
if err != nil {
return fmt.Errorf("get container spec: %w", err)
}
return task.Update(ctx, containerd.WithResources(spec.Linux.Resources))
}

func (r *runtime) RemoveContainer(ctx context.Context, containerID string) error {
ctx = namespaces.WithNamespace(ctx, r.namespace)

Expand Down Expand Up @@ -354,6 +405,7 @@ func (r *runtime) GetContainerStatus(ctx context.Context, containerID string) (C

return ContainerStatus{
Status: status,
Task: task,
Container: lo.Must(c.Info(ctx, containerd.WithoutRefreshedMetadata)),
}, nil
}
Expand Down Expand Up @@ -399,6 +451,10 @@ func (r *runtime) doConfig(ctx context.Context) error {
return nil
}

func ContainerSpecOpts(namespace string, img containerd.Image, container *model.Container) []oci.SpecOpts {
return containerSpecOpts(namespace, img, container)
}

func containerSpecOpts(namespace string, img containerd.Image, container *model.Container) []oci.SpecOpts {
var specOpts []oci.SpecOpts
specOpts = append(specOpts, oci.WithProcessCwd(container.Process.WorkingDir))
Expand Down Expand Up @@ -576,6 +632,16 @@ func withLogPath(logPath string) func(ctx context.Context, client *containerd.Cl
}
}

func GetLogPath(c *containers.Container) string {
if c.Labels[restart.LogURILabel] != "" {
return strings.TrimPrefix(c.Labels[restart.LogURILabel], "file://")
}
if c.Labels[restart.LogPathLabel] != "" {
return c.Labels[restart.LogPathLabel]
}
return ""
}

func withRlimits(rlimits []specs.POSIXRlimit) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, spec *oci.Spec) error {
if spec.Process == nil {
Expand Down
34 changes: 23 additions & 11 deletions model/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ type PluginInstanceDefinition struct {
}

type ContainerDefinition struct {
Name string `yaml:"name"`
Image string `yaml:"image"`
Mounts []MountDefinition `yaml:"mounts,omitempty"`
Process ProcessDefinition `yaml:"process"`
Logging *LoggingDefinition `yaml:"logging,omitempty"`
Metrics *MetricsDefinition `yaml:"metrics,omitempty"`
Resources *ResourceDefinition `yaml:"resources,omitempty"`
Runtime *RuntimeDefinition `yaml:"runtime,omitempty"`
StartupProbe *ContainerProbe `yaml:"startup_probe,omitempty"`
LivenessProbe *ContainerProbe `yaml:"liveness_probe,omitempty"`
SpecPatches []string `yaml:"spec_patches,omitempty"`
Name string `yaml:"name"`
Image string `yaml:"image"`
Mounts []MountDefinition `yaml:"mounts,omitempty"`
Process ProcessDefinition `yaml:"process"`
Logging *LoggingDefinition `yaml:"logging,omitempty"`
Metrics *MetricsDefinition `yaml:"metrics,omitempty"`
Resources *ResourceDefinition `yaml:"resources,omitempty"`
Runtime *RuntimeDefinition `yaml:"runtime,omitempty"`
UpdatePolicy *UpdatePolicyDefinition `yaml:"update_policy,omitempty"`
StartupProbe *ContainerProbe `yaml:"startup_probe,omitempty"`
LivenessProbe *ContainerProbe `yaml:"liveness_probe,omitempty"`
SpecPatches []string `yaml:"spec_patches,omitempty"`
}

type MountDefinition struct {
Expand Down Expand Up @@ -93,6 +94,17 @@ type RuntimeDefinition struct {
SystemdCgroup bool `yaml:"systemd_cgroup,omitempty"`
}

type UpdatePolicyDefinition struct {
OnNoChange UpdatePolicyMode `yaml:"on_no_change,omitempty"`
}

type UpdatePolicyMode string

const (
UpdatePolicyModeSkip UpdatePolicyMode = "skip"
UpdatePolicyModeRestart UpdatePolicyMode = "restart"
)

type POSIXRlimit struct {
Type string `yaml:"type"`
Hard uint64 `yaml:"hard"`
Expand Down
15 changes: 15 additions & 0 deletions plugin/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ContextKey string

const (
ContextSkipNoChange ContextKey = "skip-no-change"
ContextForceUpdate ContextKey = "force-update"
)

func SetSkipNoChange(ctx context.Context, skip bool) context.Context {
Expand All @@ -41,3 +42,17 @@ func GetSkipNoChange(ctx context.Context) bool {
func WithSkipNoChange(ctx context.Context) context.Context {
return SetSkipNoChange(ctx, true)
}

func SetForceUpdate(ctx context.Context, force bool) context.Context {
return context.WithValue(ctx, ContextForceUpdate, force)
}

func GetForceUpdate(ctx context.Context) bool {
v := ctx.Value(ContextForceUpdate)
force, ok := v.(bool)
return lo.If(ok, force).Else(false)
}

func WithForceUpdate(ctx context.Context) context.Context {
return SetForceUpdate(ctx, true)
}
11 changes: 11 additions & 0 deletions plugin/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ func TestContextSkipNoChange(t *testing.T) {
ctx = plugin.SetSkipNoChange(ctx, false)
Expect(plugin.GetSkipNoChange(ctx)).Should(BeFalse())
}

func TestContextForceUpdate(t *testing.T) {
RegisterTestingT(t)

ctx := context.Background()
Expect(plugin.GetForceUpdate(ctx)).Should(BeFalse())
ctx = plugin.WithForceUpdate(ctx)
Expect(plugin.GetForceUpdate(ctx)).Should(BeTrue())
ctx = plugin.SetForceUpdate(ctx, false)
Expect(plugin.GetForceUpdate(ctx)).Should(BeFalse())
}
Loading
Loading