Skip to content

Commit 23d3091

Browse files
authored
feat: add the codec support (#127)
Signed-off-by: chlins <[email protected]>
1 parent 6c0bb24 commit 23d3091

File tree

10 files changed

+239
-44
lines changed

10 files changed

+239
-44
lines changed

pkg/backend/build/builder.go

+33-10
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
"path/filepath"
2727
"time"
2828

29-
"github.com/CloudNativeAI/modctl/pkg/archiver"
3029
"github.com/CloudNativeAI/modctl/pkg/backend/build/hooks"
30+
"github.com/CloudNativeAI/modctl/pkg/codec"
3131
"github.com/CloudNativeAI/modctl/pkg/modelfile"
3232
"github.com/CloudNativeAI/modctl/pkg/storage"
3333

@@ -38,11 +38,6 @@ import (
3838
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3939
)
4040

41-
// tarHeaderSize is the size of a tar header.
42-
// TODO: the real size should be calculated based on the actual stream,
43-
// now we use a fixed size in order to avoid extra read costs.
44-
const tarHeaderSize = 512
45-
4641
// OutputType defines the type of output to generate.
4742
type OutputType string
4843

@@ -67,7 +62,7 @@ type Builder interface {
6762

6863
type OutputStrategy interface {
6964
// OutputLayer outputs the layer blob to the storage (local or remote).
70-
OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)
65+
OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)
7166

7267
// OutputConfig outputs the config blob to the storage (local or remote).
7368
OutputConfig(ctx context.Context, mediaType, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error)
@@ -141,12 +136,40 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p
141136
return ocispec.Descriptor{}, fmt.Errorf("failed to get relative path: %w", err)
142137
}
143138

144-
reader, err := archiver.Tar(path, workDirPath)
139+
codec, err := codec.New(codec.TypeFromMediaType(mediaType))
140+
if err != nil {
141+
return ocispec.Descriptor{}, fmt.Errorf("failed to create codec: %w", err)
142+
}
143+
144+
// Encode the content with the codec selected by the media type.
145+
reader, err := codec.Encode(path, workDirPath)
146+
if err != nil {
147+
return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err)
148+
}
149+
150+
// Calculate the digest of the encoded content.
151+
hash := sha256.New()
152+
size, err := io.Copy(hash, reader)
145153
if err != nil {
146-
return ocispec.Descriptor{}, fmt.Errorf("failed to tar file: %w", err)
154+
return ocispec.Descriptor{}, fmt.Errorf("failed to copy content to hash: %w", err)
155+
}
156+
157+
digest := fmt.Sprintf("sha256:%x", hash.Sum(nil))
158+
159+
// Seek the reader to the beginning if supported,
160+
// otherwise we need to re-encode the content.
161+
if seeker, ok := reader.(io.ReadSeeker); ok {
162+
if _, err := seeker.Seek(0, io.SeekStart); err != nil {
163+
return ocispec.Descriptor{}, fmt.Errorf("failed to seek reader: %w", err)
164+
}
165+
} else {
166+
reader, err = codec.Encode(path, workDirPath)
167+
if err != nil {
168+
return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err)
169+
}
147170
}
148171

149-
return ab.strategy.OutputLayer(ctx, mediaType, workDir, relPath, info.Size()+tarHeaderSize, reader, hooks)
172+
return ab.strategy.OutputLayer(ctx, mediaType, relPath, digest, size, reader, hooks)
150173
}
151174

152175
func (ab *abstractBuilder) BuildConfig(ctx context.Context, layers []ocispec.Descriptor, hooks hooks.Hooks) (ocispec.Descriptor, error) {

pkg/backend/build/builder_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -116,26 +116,26 @@ func (s *BuilderTestSuite) TestNewBuilder() {
116116
func (s *BuilderTestSuite) TestBuildLayer() {
117117
s.Run("successful build layer", func() {
118118
expectedDesc := ocispec.Descriptor{
119-
MediaType: "test/media-type",
119+
MediaType: "test/media-type.tar",
120120
Digest: "sha256:test",
121121
Size: 100,
122122
}
123123

124-
s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type", s.tempDir, "test-file.txt", mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything).
124+
s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type.tar", "test-file.txt", mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything).
125125
Return(expectedDesc, nil)
126126

127-
desc, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempFile, hooks.NewHooks())
127+
desc, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempFile, hooks.NewHooks())
128128
s.NoError(err)
129129
s.Equal(expectedDesc, desc)
130130
})
131131

132132
s.Run("file not found", func() {
133-
_, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks())
133+
_, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks())
134134
s.Error(err)
135135
})
136136

137137
s.Run("directory not supported", func() {
138-
_, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempDir, hooks.NewHooks())
138+
_, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempDir, hooks.NewHooks())
139139
s.Error(err)
140140
s.True(strings.Contains(err.Error(), "is a directory and not supported yet"))
141141
})

pkg/backend/build/local.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type localOutput struct {
4646
}
4747

4848
// OutputLayer outputs the layer blob to the local storage.
49-
func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
49+
func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
5050
reader = hooks.OnStart(relPath, size, reader)
5151
digest, size, err := lo.store.PushBlob(ctx, lo.repo, reader, ocispec.Descriptor{})
5252
if err != nil {

pkg/backend/build/local_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (s *LocalOutputTestSuite) TestOutputLayer() {
7272
s.mockStorage.On("PushBlob", s.ctx, "test-repo", mock.Anything, ocispec.Descriptor{}).
7373
Return(expectedDigest, expectedSize, nil).Once()
7474

75-
desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "/work", "test-file.txt", expectedSize, reader, hooks.NewHooks())
75+
desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "test-file.txt", expectedDigest, expectedSize, reader, hooks.NewHooks())
7676

7777
s.NoError(err)
7878
s.Equal("test/mediatype", desc.MediaType)

pkg/backend/build/remote.go

+2-16
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@ import (
2222
"fmt"
2323
"io"
2424
"net/http"
25-
"path/filepath"
2625

27-
"github.com/CloudNativeAI/modctl/pkg/archiver"
2826
"github.com/CloudNativeAI/modctl/pkg/backend/build/hooks"
2927

3028
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
31-
sha256 "github.com/minio/sha256-simd"
3229
godigest "github.com/opencontainers/go-digest"
3330
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3431
"oras.land/oras-go/v2/registry/remote"
@@ -80,27 +77,16 @@ type remoteOutput struct {
8077
}
8178

8279
// OutputLayer outputs the layer blob to the remote storage.
83-
func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
84-
hash := sha256.New()
85-
size, err := io.Copy(hash, reader)
86-
if err != nil {
87-
return ocispec.Descriptor{}, fmt.Errorf("failed to copy layer to hash: %w", err)
88-
}
89-
80+
func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) {
9081
desc := ocispec.Descriptor{
9182
MediaType: mediaType,
92-
Digest: godigest.Digest(fmt.Sprintf("sha256:%x", hash.Sum(nil))),
83+
Digest: godigest.Digest(digest),
9384
Size: size,
9485
Annotations: map[string]string{
9586
modelspec.AnnotationFilepath: relPath,
9687
},
9788
}
9889

99-
reader, err = archiver.Tar(filepath.Join(workDir, relPath), workDir)
100-
if err != nil {
101-
return ocispec.Descriptor{}, fmt.Errorf("failed to create tar archive: %w", err)
102-
}
103-
10490
reader = hooks.OnStart(relPath, size, reader)
10591
exist, err := ro.remote.Blobs().Exists(ctx, desc)
10692
if err != nil {

pkg/backend/extract.go

+25-6
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24+
"io"
2425

25-
"github.com/CloudNativeAI/modctl/pkg/archiver"
26+
"github.com/CloudNativeAI/modctl/pkg/codec"
2627
"github.com/CloudNativeAI/modctl/pkg/storage"
28+
modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1"
2729

2830
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2931
)
@@ -57,22 +59,39 @@ func (b *backend) Extract(ctx context.Context, target string, output string) err
5759
}
5860

5961
// exportModelArtifact exports the target model artifact to the output directory, which will open the artifact and extract to restore the original repo structure.
60-
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, output string) error {
62+
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, outputDir string) error {
6163
for _, layer := range manifest.Layers {
6264
// pull the blob from the storage.
6365
reader, err := store.PullBlob(ctx, repo, layer.Digest.String())
6466
if err != nil {
6567
return fmt.Errorf("failed to pull the blob from storage: %w", err)
6668
}
67-
6869
defer reader.Close()
6970

7071
bufferedReader := bufio.NewReaderSize(reader, defaultBufferSize)
71-
// untar the blob to the target directory.
72-
if err := archiver.Untar(bufferedReader, output); err != nil {
73-
return fmt.Errorf("failed to untar the blob to output directory: %w", err)
72+
if err := extractLayer(layer, outputDir, bufferedReader); err != nil {
73+
return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err)
7474
}
7575
}
7676

7777
return nil
7878
}
79+
80+
// extractLayer extracts the layer to the output directory.
81+
func extractLayer(desc ocispec.Descriptor, outputDir string, reader io.Reader) error {
82+
var filepath string
83+
if desc.Annotations != nil && desc.Annotations[modelspec.AnnotationFilepath] != "" {
84+
filepath = desc.Annotations[modelspec.AnnotationFilepath]
85+
}
86+
87+
codec, err := codec.New(codec.TypeFromMediaType(desc.MediaType))
88+
if err != nil {
89+
return fmt.Errorf("failed to create codec for media type %s: %w", desc.MediaType, err)
90+
}
91+
92+
if err := codec.Decode(reader, outputDir, filepath); err != nil {
93+
return fmt.Errorf("failed to decode the layer %s to output directory: %w", desc.Digest.String(), err)
94+
}
95+
96+
return nil
97+
}

pkg/backend/pull.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"net/url"
2727

2828
internalpb "github.com/CloudNativeAI/modctl/internal/pb"
29-
"github.com/CloudNativeAI/modctl/pkg/archiver"
3029
"github.com/CloudNativeAI/modctl/pkg/config"
3130
"github.com/CloudNativeAI/modctl/pkg/storage"
3231

@@ -219,19 +218,18 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri
219218

220219
// pullAndExtractFromRemote pulls the layer and extract it to the target output path directly,
221220
// and will not store the layer to the local storage.
222-
func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, output string, desc ocispec.Descriptor) error {
221+
func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, outputDir string, desc ocispec.Descriptor) error {
223222
// fetch the content from the source storage.
224223
content, err := src.Fetch(ctx, desc)
225224
if err != nil {
226225
return fmt.Errorf("failed to fetch the content from source: %w", err)
227226
}
228-
229227
defer content.Close()
230228

231229
reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content)
232-
if err := archiver.Untar(reader, output); err != nil {
230+
if err := extractLayer(desc, outputDir, reader); err != nil {
233231
pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to pull and extract blob %s from remote, err: %v", desc.Digest.String(), err))
234-
return fmt.Errorf("failed to untar the blob %s: %w", desc.Digest.String(), err)
232+
return fmt.Errorf("failed to extract the blob %s to output directory: %w", desc.Digest.String(), err)
235233
}
236234

237235
return nil

pkg/codec/codec.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2025 The CNAI Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package codec
18+
19+
import (
20+
"fmt"
21+
"io"
22+
"strings"
23+
)
24+
// Type names a codec implementation. It is an alias of string, so plain
// string values can be used wherever a Type is expected.
type Type = string

// Codec types accepted by New.
const (
	// Raw selects the raw codec.
	Raw Type = "raw"

	// Tar selects the tar codec.
	Tar Type = "tar"
)
34+
// Codec converts between a file on disk and the byte stream stored as a
// layer blob.
type Codec interface {
	// Encode produces the encoded stream for the file at targetFilePath.
	// workDirPath is the working directory the file was resolved against.
	Encode(targetFilePath, workDirPath string) (io.Reader, error)

	// Decode consumes reader and writes the decoded data into outputDir.
	// filePath is the destination path of the file within outputDir.
	Decode(reader io.Reader, outputDir, filePath string) error
}
43+
44+
func New(codecType Type) (Codec, error) {
45+
switch codecType {
46+
case Raw:
47+
return newRaw(), nil
48+
case Tar:
49+
return newTar(), nil
50+
default:
51+
return nil, fmt.Errorf("unsupported codec type: %s", codecType)
52+
}
53+
}
54+
55+
// TypeFromMediaType returns the codec type from the media type,
56+
// return empty string if not supported.
57+
func TypeFromMediaType(mediaType string) Type {
58+
// If the mediaType ends with ".tar", return Tar.
59+
if strings.HasSuffix(mediaType, ".tar") {
60+
return Tar
61+
}
62+
63+
// If the mediaType ends with ".raw", return Raw.
64+
if strings.HasSuffix(mediaType, ".raw") {
65+
return Raw
66+
}
67+
68+
return ""
69+
}

pkg/codec/raw.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025 The CNAI Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package codec
18+
19+
import (
20+
"io"
21+
"os"
22+
"path/filepath"
23+
)
24+
// raw is a codec for raw files: the file's bytes are the encoded stream,
// with no archive wrapping.
type raw struct{}

// newRaw creates a new raw codec instance.
func newRaw() *raw {
	return &raw{}
}

// Encode opens targetFilePath and returns the open file as the encoded stream.
//
// workDirPath is accepted to satisfy the codec method set and is not used.
// The returned reader is an *os.File, so a caller that is done with it can
// release the descriptor by asserting io.Closer and calling Close.
// On failure the returned reader is a nil interface value.
func (r *raw) Encode(targetFilePath, workDirPath string) (io.Reader, error) {
	file, err := os.Open(targetFilePath)
	if err != nil {
		// Return an untyped nil so callers never receive a non-nil
		// io.Reader that wraps a nil *os.File.
		return nil, err
	}

	return file, nil
}

// Decode copies reader into the file at filePath inside outputDir, creating
// parent directories as needed and truncating any existing file.
//
// filePath must name a location strictly inside outputDir. An empty path or
// one that resolves to outputDir itself or outside it is rejected with an
// *os.PathError wrapping os.ErrInvalid before anything is written. The check
// is lexical only; symbolic links under outputDir are not resolved.
//
// The error from closing the file is returned when the copy itself succeeded,
// so a write that fails at close time is not reported as success.
func (r *raw) Decode(reader io.Reader, outputDir, filePath string) (err error) {
	fullPath := filepath.Join(outputDir, filePath)

	// filePath can originate from a pulled layer annotation, so make sure
	// "../" segments cannot move the write outside outputDir.
	rel, err := filepath.Rel(outputDir, fullPath)
	if err != nil {
		return err
	}

	parentPrefix := ".." + string(filepath.Separator)
	escapes := rel == ".." || (len(rel) >= len(parentPrefix) && rel[:len(parentPrefix)] == parentPrefix)
	if rel == "." || escapes {
		return &os.PathError{Op: "decode", Path: filePath, Err: os.ErrInvalid}
	}

	if err = os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
		return err
	}

	file, err := os.Create(fullPath)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the copy error if there is one; otherwise surface Close's.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(file, reader)
	return err
}

0 commit comments

Comments
 (0)