Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pkg/kubelet/cm/dra/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkpoint

import (
"encoding/json"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
)

const (
CheckpointAPIGroup = "checkpoint.dra.kubelet.k8s.io"
CheckpointKind = "DRACheckpoint"
CheckpointAPIVersion = CheckpointAPIGroup + "/" + state.Version
)

// Checkpoint represents a structure to store DRA checkpoint data
type Checkpoint struct {
// Data is a JSON serialized checkpoint data
// See state.CheckpointData for the details
Data string
// Checksum is a checksum of Data
Checksum checksum.Checksum
}

type CheckpointData struct {
metav1.TypeMeta
Entries state.ClaimInfoStateList
}

// NewCheckpoint creates a new checkpoint from a list of claim info states
func NewCheckpoint(data state.ClaimInfoStateList) (*Checkpoint, error) {
cpData := &CheckpointData{
TypeMeta: metav1.TypeMeta{
Kind: CheckpointKind,
APIVersion: CheckpointAPIVersion,
},
Entries: data,
}

cpDataBytes, err := json.Marshal(cpData)
if err != nil {
return nil, err
}

return &Checkpoint{
Data: string(cpDataBytes),
Checksum: checksum.New(string(cpDataBytes)),
}, nil
}

// MarshalCheckpoint marshals checkpoint to JSON
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(cp)
}

// UnmarshalCheckpoint unmarshals checkpoint from JSON
// and verifies its data checksum
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
if err := json.Unmarshal(blob, cp); err != nil {
return err
}

// verify checksum
if err := cp.VerifyChecksum(); err != nil {
return err
}

return nil
}

// VerifyChecksum verifies that current checksum
// of checkpointed Data is valid
func (cp *Checkpoint) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}

// GetEntries returns list of claim info states from checkpoint
func (cp *Checkpoint) GetEntries() (state.ClaimInfoStateList, error) {
var cpData CheckpointData
if err := json.Unmarshal([]byte(cp.Data), &cpData); err != nil {
return nil, err
}

return cpData.Entries, nil
}
94 changes: 94 additions & 0 deletions pkg/kubelet/cm/dra/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkpoint

import (
"fmt"
"sync"

"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
state "k8s.io/kubernetes/pkg/kubelet/cm/dra/state/v1"
)

type Checkpointer interface {
GetOrCreate() (*Checkpoint, error)
Store(*Checkpoint) error
}

type checkpointer struct {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be called checkpointerV1

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it should always work with the latest API version.

sync.RWMutex
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
}

// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
if len(checkpointName) == 0 {
return nil, fmt.Errorf("received empty string instead of checkpointName")
}

checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
checkpointer := &checkpointer{
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}

return checkpointer, nil
}

// GetOrCreate gets list of claim info states from a checkpoint
// or creates empty list it checkpoint doesn't exist yet
func (sc *checkpointer) GetOrCreate() (*Checkpoint, error) {
sc.Lock()
defer sc.Unlock()

checkpoint, err := NewCheckpoint(nil)
if err != nil {
return nil, fmt.Errorf("failed to create new checkpoint: %w", err)
}

err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
if err == errors.ErrCheckpointNotFound {
sc.store(checkpoint)
return NewCheckpoint(state.ClaimInfoStateList{})
}
if err != nil {
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
}

return checkpoint, nil
}

// Store stores checkpoint to the file
func (sc *checkpointer) Store(checkpoint *Checkpoint) error {
sc.Lock()
defer sc.Unlock()

return sc.store(checkpoint)
}

// store saves state to a checkpoint, caller is responsible for locking
func (sc *checkpointer) store(checkpoint *Checkpoint) error {
if err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint); err != nil {
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
}
return nil
}
Loading