Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the codec support #127

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions pkg/backend/build/builder.go
Original file line number Diff line number Diff line change
@@ -26,8 +26,8 @@ import (
"path/filepath"
"time"

"github.com/CloudNativeAI/modctl/pkg/archiver"
"github.com/CloudNativeAI/modctl/pkg/backend/build/hooks"
"github.com/CloudNativeAI/modctl/pkg/codec"
"github.com/CloudNativeAI/modctl/pkg/modelfile"
"github.com/CloudNativeAI/modctl/pkg/storage"

@@ -38,11 +38,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// tarHeaderSize is the size of a tar header.
// TODO: the real size should be calculated based on the actual stream,
// now we use a fixed size in order to avoid extra read costs.
const tarHeaderSize = 512

// OutputType defines the type of output to generate.
type OutputType string

@@ -67,7 +62,7 @@ type Builder interface {

type OutputStrategy interface {
// OutputLayer outputs the layer blob to the storage (local or remote).
OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)
OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)

// OutputConfig outputs the config blob to the storage (local or remote).
OutputConfig(ctx context.Context, mediaType, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)
@@ -141,12 +136,40 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p
return ocispec.Descriptor{}, fmt.Errorf("failed to get relative path: %w", err)
}

reader, err := archiver.Tar(path, workDirPath)
codec, err := codec.New(codec.TypeFromMediaType(mediaType))
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to create codec: %w", err)
}

// Encode the content by codec depends on the media type.
reader, err := codec.Encode(path, workDirPath)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err)
}

// Calculate the digest of the encoded content.
hash := sha256.New()
size, err := io.Copy(hash, reader)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to tar file: %w", err)
return ocispec.Descriptor{}, fmt.Errorf("failed to copy content to hash: %w", err)
}

digest := fmt.Sprintf("sha256:%x", hash.Sum(nil))

// Seek the reader to the beginning if supported,
// otherwise we needs to re-encode the content again.
if seeker, ok := reader.(io.ReadSeeker); ok {
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to seek reader: %w", err)
}
} else {
reader, err = codec.Encode(path, workDirPath)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err)
}
}

return ab.strategy.OutputLayer(ctx, mediaType, workDir, relPath, info.Size()+tarHeaderSize, reader, hooks)
return ab.strategy.OutputLayer(ctx, mediaType, relPath, digest, size, reader, hooks)
}

func (ab *abstractBuilder) BuildConfig(ctx context.Context, layers []ocispec.Descriptor, hooks hooks.Hooks) (ocispec.Descriptor, error) {
10 changes: 5 additions & 5 deletions pkg/backend/build/builder_test.go
Original file line number Diff line number Diff line change
@@ -116,26 +116,26 @@ func (s *BuilderTestSuite) TestNewBuilder() {
func (s *BuilderTestSuite) TestBuildLayer() {
s.Run("successful build layer", func() {
expectedDesc := ocispec.Descriptor{
MediaType: "test/media-type",
MediaType: "test/media-type.tar",
Digest: "sha256:test",
Size: 100,
}

s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type", s.tempDir, "test-file.txt", mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything).
s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type.tar", "test-file.txt", mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything).
Return(expectedDesc, nil)

desc, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempFile, hooks.NewHooks())
desc, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempFile, hooks.NewHooks())
s.NoError(err)
s.Equal(expectedDesc, desc)
})

s.Run("file not found", func() {
_, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks())
_, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks())
s.Error(err)
})

s.Run("directory not supported", func() {
_, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempDir, hooks.NewHooks())
_, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempDir, hooks.NewHooks())
s.Error(err)
s.True(strings.Contains(err.Error(), "is a directory and not supported yet"))
})
2 changes: 1 addition & 1 deletion pkg/backend/build/local.go
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ type localOutput struct {
}

// OutputLayer outputs the layer blob to the local storage.
func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
reader = hooks.OnStart(relPath, size, reader)
digest, size, err := lo.store.PushBlob(ctx, lo.repo, reader, ocispec.Descriptor{})
if err != nil {
2 changes: 1 addition & 1 deletion pkg/backend/build/local_test.go
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ func (s *LocalOutputTestSuite) TestOutputLayer() {
s.mockStorage.On("PushBlob", s.ctx, "test-repo", mock.Anything, ocispec.Descriptor{}).
Return(expectedDigest, expectedSize, nil).Once()

desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "/work", "test-file.txt", expectedSize, reader, hooks.NewHooks())
desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "test-file.txt", expectedDigest, expectedSize, reader, hooks.NewHooks())

s.NoError(err)
s.Equal("test/mediatype", desc.MediaType)
18 changes: 2 additions & 16 deletions pkg/backend/build/remote.go
Original file line number Diff line number Diff line change
@@ -22,13 +22,10 @@ import (
"fmt"
"io"
"net/http"
"path/filepath"

"github.com/CloudNativeAI/modctl/pkg/archiver"
"github.com/CloudNativeAI/modctl/pkg/backend/build/hooks"

modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
sha256 "github.com/minio/sha256-simd"
godigest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/registry/remote"
@@ -80,27 +77,16 @@ type remoteOutput struct {
}

// OutputLayer outputs the layer blob to the remote storage.
func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
hash := sha256.New()
size, err := io.Copy(hash, reader)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to copy layer to hash: %w", err)
}

func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
desc := ocispec.Descriptor{
MediaType: mediaType,
Digest: godigest.Digest(fmt.Sprintf("sha256:%x", hash.Sum(nil))),
Digest: godigest.Digest(digest),
Size: size,
Annotations: map[string]string{
modelspec.AnnotationFilepath: relPath,
},
}

reader, err = archiver.Tar(filepath.Join(workDir, relPath), workDir)
if err != nil {
return ocispec.Descriptor{}, fmt.Errorf("failed to create tar archive: %w", err)
}

reader = hooks.OnStart(relPath, size, reader)
exist, err := ro.remote.Blobs().Exists(ctx, desc)
if err != nil {
31 changes: 25 additions & 6 deletions pkg/backend/extract.go
Original file line number Diff line number Diff line change
@@ -21,9 +21,11 @@ import (
"context"
"encoding/json"
"fmt"
"io"

"github.com/CloudNativeAI/modctl/pkg/archiver"
"github.com/CloudNativeAI/modctl/pkg/codec"
"github.com/CloudNativeAI/modctl/pkg/storage"
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
@@ -57,22 +59,39 @@ func (b *backend) Extract(ctx context.Context, target string, output string) err
}

// exportModelArtifact exports the target model artifact to the output directory, which will open the artifact and extract to restore the original repo structure.
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, output string) error {
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, outputDir string) error {
for _, layer := range manifest.Layers {
// pull the blob from the storage.
reader, err := store.PullBlob(ctx, repo, layer.Digest.String())
if err != nil {
return fmt.Errorf("failed to pull the blob from storage: %w", err)
}

defer reader.Close()

bufferedReader := bufio.NewReaderSize(reader, defaultBufferSize)
// untar the blob to the target directory.
if err := archiver.Untar(bufferedReader, output); err != nil {
return fmt.Errorf("failed to untar the blob to output directory: %w", err)
if err := extractLayer(layer, outputDir, bufferedReader); err != nil {
return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err)
}
}

return nil
}

// extractLayer extracts the layer to the output directory.
func extractLayer(desc ocispec.Descriptor, outputDir string, reader io.Reader) error {
var filepath string
if desc.Annotations != nil && desc.Annotations[modelspec.AnnotationFilepath] != "" {
filepath = desc.Annotations[modelspec.AnnotationFilepath]
}

codec, err := codec.New(codec.TypeFromMediaType(desc.MediaType))
if err != nil {
return fmt.Errorf("failed to create codec for media type %s: %w", desc.MediaType, err)
}

if err := codec.Decode(reader, outputDir, filepath); err != nil {
return fmt.Errorf("failed to decode the layer %s to output directory: %w", desc.Digest.String(), err)
}

return nil
}
8 changes: 3 additions & 5 deletions pkg/backend/pull.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@ import (
"net/url"

internalpb "github.com/CloudNativeAI/modctl/internal/pb"
"github.com/CloudNativeAI/modctl/pkg/archiver"
"github.com/CloudNativeAI/modctl/pkg/config"
"github.com/CloudNativeAI/modctl/pkg/storage"

@@ -219,19 +218,18 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri

// pullAndExtractFromRemote pulls the layer and extract it to the target output path directly,
// and will not store the layer to the local storage.
func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, output string, desc ocispec.Descriptor) error {
func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, outputDir string, desc ocispec.Descriptor) error {
// fetch the content from the source storage.
content, err := src.Fetch(ctx, desc)
if err != nil {
return fmt.Errorf("failed to fetch the content from source: %w", err)
}

defer content.Close()

reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content)
if err := archiver.Untar(reader, output); err != nil {
if err := extractLayer(desc, outputDir, reader); err != nil {
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to pull and extract blob %s from remote, err: %v", desc.Digest.String(), err))
return fmt.Errorf("failed to untar the blob %s: %w", desc.Digest.String(), err)
return fmt.Errorf("failed to extract the blob %s to output directory: %w", desc.Digest.String(), err)
}

return nil
69 changes: 69 additions & 0 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2025 The CNAI Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"fmt"
"io"
"strings"
)

type Type = string

const (
// Raw is the raw codec type.
Raw Type = "raw"

// Tar is the tar codec type.
Tar Type = "tar"
)

// Codec is an interface for encoding and decoding the data.
type Codec interface {
// Encode encodes the target file into a reader.
Encode(targetFilePath, workDirPath string) (io.Reader, error)

// Decode reads the input reader and decodes the data into the output path.
Decode(reader io.Reader, outputDir, filePath string) error
}

func New(codecType Type) (Codec, error) {
switch codecType {
case Raw:
return newRaw(), nil
case Tar:
return newTar(), nil
default:
return nil, fmt.Errorf("unsupported codec type: %s", codecType)
}
}

// TypeFromMediaType returns the codec type from the media type,
// return empty string if not supported.
func TypeFromMediaType(mediaType string) Type {
// If the mediaType ends with ".tar", return Tar.
if strings.HasSuffix(mediaType, ".tar") {
return Tar
}

// If the mediaType ends with ".raw", return Raw.
if strings.HasSuffix(mediaType, ".raw") {
return Raw
}

return ""
}
57 changes: 57 additions & 0 deletions pkg/codec/raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 The CNAI Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"io"
"os"
"path/filepath"
)

// raw is a codec that for raw files.
type raw struct{}

// newRaw creates a new raw codec instance.
func newRaw() *raw {
return &raw{}
}

// Encode reads the target file into a reader.
func (r *raw) Encode(targetFilePath, workDirPath string) (io.Reader, error) {
return os.Open(targetFilePath)
}

// Decode reads the input reader and decodes the data into the output path.
func (r *raw) Decode(reader io.Reader, outputDir, filePath string) error {
fullPath := filepath.Join(outputDir, filePath)
dir := filepath.Dir(fullPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}

file, err := os.Create(fullPath)
if err != nil {
return err
}
defer file.Close()

if _, err := io.Copy(file, reader); err != nil {
return err
}

return nil
}
43 changes: 43 additions & 0 deletions pkg/codec/tar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 The CNAI Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"io"

"github.com/CloudNativeAI/modctl/pkg/archiver"
)

// tar is a codec for tar files.
type tar struct{}

// newTar creates a new tar codec instance.
func newTar() *tar {
return &tar{}
}

// Encode tars the target file into a reader.
func (t *tar) Encode(targetFilePath, workDirPath string) (io.Reader, error) {
return archiver.Tar(targetFilePath, workDirPath)
}

// Decode reads the input reader and decodes the data into the output path.
func (t *tar) Decode(reader io.Reader, outputDir, filePath string) error {
// As the file name has been provided in the tar header,
// so we do not care about the filePath.
return archiver.Untar(reader, outputDir)
}