diff --git a/pkg/backend/build/builder.go b/pkg/backend/build/builder.go index 9978c88..ccbcf17 100644 --- a/pkg/backend/build/builder.go +++ b/pkg/backend/build/builder.go @@ -26,8 +26,8 @@ import ( "path/filepath" "time" - "github.com/CloudNativeAI/modctl/pkg/archiver" "github.com/CloudNativeAI/modctl/pkg/backend/build/hooks" + "github.com/CloudNativeAI/modctl/pkg/codec" "github.com/CloudNativeAI/modctl/pkg/modelfile" "github.com/CloudNativeAI/modctl/pkg/storage" @@ -38,11 +38,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) -// tarHeaderSize is the size of a tar header. -// TODO: the real size should be calculated based on the actual stream, -// now we use a fixed size in order to avoid extra read costs. -const tarHeaderSize = 512 - // OutputType defines the type of output to generate. type OutputType string @@ -67,7 +62,7 @@ type Builder interface { type OutputStrategy interface { // OutputLayer outputs the layer blob to the storage (local or remote). - OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) + OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) // OutputConfig outputs the config blob to the storage (local or remote). OutputConfig(ctx context.Context, mediaType, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) @@ -141,12 +136,40 @@ func (ab *abstractBuilder) BuildLayer(ctx context.Context, mediaType, workDir, p return ocispec.Descriptor{}, fmt.Errorf("failed to get relative path: %w", err) } - reader, err := archiver.Tar(path, workDirPath) + codec, err := codec.New(codec.TypeFromMediaType(mediaType)) + if err != nil { + return ocispec.Descriptor{}, fmt.Errorf("failed to create codec: %w", err) + } + + // Encode the content by codec depends on the media type. + reader, err := codec.Encode(path, workDirPath) + if err != nil { + return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err) + } + + // Calculate the digest of the encoded content. + hash := sha256.New() + size, err := io.Copy(hash, reader) if err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to tar file: %w", err) + return ocispec.Descriptor{}, fmt.Errorf("failed to copy content to hash: %w", err) + } + + digest := fmt.Sprintf("sha256:%x", hash.Sum(nil)) + + // Seek the reader to the beginning if supported, + // otherwise we needs to re-encode the content again. + if seeker, ok := reader.(io.ReadSeeker); ok { + if _, err := seeker.Seek(0, io.SeekStart); err != nil { + return ocispec.Descriptor{}, fmt.Errorf("failed to seek reader: %w", err) + } + } else { + reader, err = codec.Encode(path, workDirPath) + if err != nil { + return ocispec.Descriptor{}, fmt.Errorf("failed to encode file: %w", err) + } } - return ab.strategy.OutputLayer(ctx, mediaType, workDir, relPath, info.Size()+tarHeaderSize, reader, hooks) + return ab.strategy.OutputLayer(ctx, mediaType, relPath, digest, size, reader, hooks) } func (ab *abstractBuilder) BuildConfig(ctx context.Context, layers []ocispec.Descriptor, hooks hooks.Hooks) (ocispec.Descriptor, error) { diff --git a/pkg/backend/build/builder_test.go b/pkg/backend/build/builder_test.go index b5e4c64..52ce92e 100644 --- a/pkg/backend/build/builder_test.go +++ b/pkg/backend/build/builder_test.go @@ -116,26 +116,26 @@ func (s *BuilderTestSuite) TestNewBuilder() { func (s *BuilderTestSuite) TestBuildLayer() { s.Run("successful build layer", func() { expectedDesc := ocispec.Descriptor{ - MediaType: "test/media-type", + MediaType: "test/media-type.tar", Digest: "sha256:test", Size: 100, } - s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type", s.tempDir, "test-file.txt", mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything). + s.mockOutputStrategy.On("OutputLayer", mock.Anything, "test/media-type.tar", "test-file.txt", mock.AnythingOfType("string"), mock.AnythingOfType("int64"), mock.AnythingOfType("*io.PipeReader"), mock.Anything). Return(expectedDesc, nil) - desc, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempFile, hooks.NewHooks()) + desc, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempFile, hooks.NewHooks()) s.NoError(err) s.Equal(expectedDesc, desc) }) s.Run("file not found", func() { - _, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks()) + _, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, filepath.Join(s.tempDir, "non-existent.txt"), hooks.NewHooks()) s.Error(err) }) s.Run("directory not supported", func() { - _, err := s.builder.BuildLayer(context.Background(), "test/media-type", s.tempDir, s.tempDir, hooks.NewHooks()) + _, err := s.builder.BuildLayer(context.Background(), "test/media-type.tar", s.tempDir, s.tempDir, hooks.NewHooks()) s.Error(err) s.True(strings.Contains(err.Error(), "is a directory and not supported yet")) }) diff --git a/pkg/backend/build/local.go b/pkg/backend/build/local.go index 0a692af..71cf84d 100644 --- a/pkg/backend/build/local.go +++ b/pkg/backend/build/local.go @@ -46,7 +46,7 @@ type localOutput struct { } // OutputLayer outputs the layer blob to the local storage. -func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) { +func (lo *localOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) { reader = hooks.OnStart(relPath, size, reader) digest, size, err := lo.store.PushBlob(ctx, lo.repo, reader, ocispec.Descriptor{}) if err != nil { diff --git a/pkg/backend/build/local_test.go b/pkg/backend/build/local_test.go index 1233c91..2f2573b 100644 --- a/pkg/backend/build/local_test.go +++ b/pkg/backend/build/local_test.go @@ -72,7 +72,7 @@ func (s *LocalOutputTestSuite) TestOutputLayer() { s.mockStorage.On("PushBlob", s.ctx, "test-repo", mock.Anything, ocispec.Descriptor{}). Return(expectedDigest, expectedSize, nil).Once() - desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "/work", "test-file.txt", expectedSize, reader, hooks.NewHooks()) + desc, err := s.localOutput.OutputLayer(s.ctx, "test/mediatype", "test-file.txt", expectedDigest, expectedSize, reader, hooks.NewHooks()) s.NoError(err) s.Equal("test/mediatype", desc.MediaType) diff --git a/pkg/backend/build/remote.go b/pkg/backend/build/remote.go index dfff2d5..37ed7dd 100644 --- a/pkg/backend/build/remote.go +++ b/pkg/backend/build/remote.go @@ -22,13 +22,10 @@ import ( "fmt" "io" "net/http" - "path/filepath" - "github.com/CloudNativeAI/modctl/pkg/archiver" "github.com/CloudNativeAI/modctl/pkg/backend/build/hooks" modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1" - sha256 "github.com/minio/sha256-simd" godigest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/registry/remote" @@ -80,27 +77,16 @@ type remoteOutput struct { } // OutputLayer outputs the layer blob to the remote storage. -func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, workDir, relPath string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) { - hash := sha256.New() - size, err := io.Copy(hash, reader) - if err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to copy layer to hash: %w", err) - } - +func (ro *remoteOutput) OutputLayer(ctx context.Context, mediaType, relPath, digest string, size int64, reader io.Reader, hooks hooks.Hooks) (ocispec.Descriptor, error) { desc := ocispec.Descriptor{ MediaType: mediaType, - Digest: godigest.Digest(fmt.Sprintf("sha256:%x", hash.Sum(nil))), + Digest: godigest.Digest(digest), Size: size, Annotations: map[string]string{ modelspec.AnnotationFilepath: relPath, }, } - reader, err = archiver.Tar(filepath.Join(workDir, relPath), workDir) - if err != nil { - return ocispec.Descriptor{}, fmt.Errorf("failed to create tar archive: %w", err) - } - reader = hooks.OnStart(relPath, size, reader) exist, err := ro.remote.Blobs().Exists(ctx, desc) if err != nil { diff --git a/pkg/backend/extract.go b/pkg/backend/extract.go index c369461..6d5fbfd 100644 --- a/pkg/backend/extract.go +++ b/pkg/backend/extract.go @@ -21,9 +21,11 @@ import ( "context" "encoding/json" "fmt" + "io" - "github.com/CloudNativeAI/modctl/pkg/archiver" + "github.com/CloudNativeAI/modctl/pkg/codec" "github.com/CloudNativeAI/modctl/pkg/storage" + modelspec "github.com/CloudNativeAI/model-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -57,22 +59,39 @@ func (b *backend) Extract(ctx context.Context, target string, output string) err } // exportModelArtifact exports the target model artifact to the output directory, which will open the artifact and extract to restore the original repo structure. -func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, output string) error { +func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo, outputDir string) error { for _, layer := range manifest.Layers { // pull the blob from the storage. reader, err := store.PullBlob(ctx, repo, layer.Digest.String()) if err != nil { return fmt.Errorf("failed to pull the blob from storage: %w", err) } - defer reader.Close() bufferedReader := bufio.NewReaderSize(reader, defaultBufferSize) - // untar the blob to the target directory. - if err := archiver.Untar(bufferedReader, output); err != nil { - return fmt.Errorf("failed to untar the blob to output directory: %w", err) + if err := extractLayer(layer, outputDir, bufferedReader); err != nil { + return fmt.Errorf("failed to extract layer %s: %w", layer.Digest.String(), err) } } return nil } + +// extractLayer extracts the layer to the output directory. +func extractLayer(desc ocispec.Descriptor, outputDir string, reader io.Reader) error { + var filepath string + if desc.Annotations != nil && desc.Annotations[modelspec.AnnotationFilepath] != "" { + filepath = desc.Annotations[modelspec.AnnotationFilepath] + } + + codec, err := codec.New(codec.TypeFromMediaType(desc.MediaType)) + if err != nil { + return fmt.Errorf("failed to create codec for media type %s: %w", desc.MediaType, err) + } + + if err := codec.Decode(reader, outputDir, filepath); err != nil { + return fmt.Errorf("failed to decode the layer %s to output directory: %w", desc.Digest.String(), err) + } + + return nil +} diff --git a/pkg/backend/pull.go b/pkg/backend/pull.go index 5f58957..ff71f59 100644 --- a/pkg/backend/pull.go +++ b/pkg/backend/pull.go @@ -26,7 +26,6 @@ import ( "net/url" internalpb "github.com/CloudNativeAI/modctl/internal/pb" - "github.com/CloudNativeAI/modctl/pkg/archiver" "github.com/CloudNativeAI/modctl/pkg/config" "github.com/CloudNativeAI/modctl/pkg/storage" @@ -219,19 +218,18 @@ func pullIfNotExist(ctx context.Context, pb *internalpb.ProgressBar, prompt stri // pullAndExtractFromRemote pulls the layer and extract it to the target output path directly, // and will not store the layer to the local storage. -func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, output string, desc ocispec.Descriptor) error { +func pullAndExtractFromRemote(ctx context.Context, pb *internalpb.ProgressBar, prompt string, src *remote.Repository, outputDir string, desc ocispec.Descriptor) error { // fetch the content from the source storage. content, err := src.Fetch(ctx, desc) if err != nil { return fmt.Errorf("failed to fetch the content from source: %w", err) } - defer content.Close() reader := pb.Add(prompt, desc.Digest.String(), desc.Size, content) - if err := archiver.Untar(reader, output); err != nil { + if err := extractLayer(desc, outputDir, reader); err != nil { pb.Complete(desc.Digest.String(), fmt.Sprintf("Failed to pull and extract blob %s from remote, err: %v", desc.Digest.String(), err)) - return fmt.Errorf("failed to untar the blob %s: %w", desc.Digest.String(), err) + return fmt.Errorf("failed to extract the blob %s to output directory: %w", desc.Digest.String(), err) } return nil diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go new file mode 100644 index 0000000..afe1ae9 --- /dev/null +++ b/pkg/codec/codec.go @@ -0,0 +1,69 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "fmt" + "io" + "strings" +) + +type Type = string + +const ( + // Raw is the raw codec type. + Raw Type = "raw" + + // Tar is the tar codec type. + Tar Type = "tar" +) + +// Codec is an interface for encoding and decoding the data. +type Codec interface { + // Encode encodes the target file into a reader. + Encode(targetFilePath, workDirPath string) (io.Reader, error) + + // Decode reads the input reader and decodes the data into the output path. + Decode(reader io.Reader, outputDir, filePath string) error +} + +func New(codecType Type) (Codec, error) { + switch codecType { + case Raw: + return newRaw(), nil + case Tar: + return newTar(), nil + default: + return nil, fmt.Errorf("unsupported codec type: %s", codecType) + } +} + +// TypeFromMediaType returns the codec type from the media type, +// return empty string if not supported. +func TypeFromMediaType(mediaType string) Type { + // If the mediaType ends with ".tar", return Tar. + if strings.HasSuffix(mediaType, ".tar") { + return Tar + } + + // If the mediaType ends with ".raw", return Raw. + if strings.HasSuffix(mediaType, ".raw") { + return Raw + } + + return "" +} diff --git a/pkg/codec/raw.go b/pkg/codec/raw.go new file mode 100644 index 0000000..0618377 --- /dev/null +++ b/pkg/codec/raw.go @@ -0,0 +1,57 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "io" + "os" + "path/filepath" +) + +// raw is a codec that for raw files. +type raw struct{} + +// newRaw creates a new raw codec instance. +func newRaw() *raw { + return &raw{} +} + +// Encode reads the target file into a reader. +func (r *raw) Encode(targetFilePath, workDirPath string) (io.Reader, error) { + return os.Open(targetFilePath) +} + +// Decode reads the input reader and decodes the data into the output path. +func (r *raw) Decode(reader io.Reader, outputDir, filePath string) error { + fullPath := filepath.Join(outputDir, filePath) + dir := filepath.Dir(fullPath) + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + file, err := os.Create(fullPath) + if err != nil { + return err + } + defer file.Close() + + if _, err := io.Copy(file, reader); err != nil { + return err + } + + return nil +} diff --git a/pkg/codec/tar.go b/pkg/codec/tar.go new file mode 100644 index 0000000..2d1320a --- /dev/null +++ b/pkg/codec/tar.go @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The CNAI Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "io" + + "github.com/CloudNativeAI/modctl/pkg/archiver" +) + +// tar is a codec for tar files. +type tar struct{} + +// newTar creates a new tar codec instance. +func newTar() *tar { + return &tar{} +} + +// Encode tars the target file into a reader. +func (t *tar) Encode(targetFilePath, workDirPath string) (io.Reader, error) { + return archiver.Tar(targetFilePath, workDirPath) +} + +// Decode reads the input reader and decodes the data into the output path. +func (t *tar) Decode(reader io.Reader, outputDir, filePath string) error { + // As the file name has been provided in the tar header, + // so we do not care about the filePath. + return archiver.Untar(reader, outputDir) +}