From 519029ceded0e6ec6fa39b6830d5596260dfc448 Mon Sep 17 00:00:00 2001 From: zwtop Date: Wed, 3 Sep 2025 11:42:23 +0800 Subject: [PATCH] feat: blob fetch download interface Signed-off-by: zwtop --- remotes/file.go | 81 +++++------------------- remotes/file_download.go | 99 ++++++++++++++++++++++++++++++ remotes/file_download_test.go | 62 +++++++++++++++++++ remotes/testdata/example-zstd.tar | Bin 0 -> 10240 bytes 4 files changed, 178 insertions(+), 64 deletions(-) create mode 100644 remotes/file_download.go create mode 100644 remotes/file_download_test.go create mode 100644 remotes/testdata/example-zstd.tar diff --git a/remotes/file.go b/remotes/file.go index c38fedd..72a69b5 100644 --- a/remotes/file.go +++ b/remotes/file.go @@ -18,13 +18,10 @@ package remotes import ( "archive/tar" - "compress/gzip" "context" "encoding/json" "fmt" "io" - "net/url" - "path/filepath" "sort" "github.com/containerd/containerd" @@ -33,7 +30,6 @@ import ( "github.com/docker/distribution/reference" ptypes "github.com/gogo/protobuf/types" "github.com/hashicorp/go-version" - "github.com/klauspost/compress/zstd" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -63,7 +59,16 @@ var ErrNotFound = errors.New("not found") // NewFileProvider create a new file provider func NewFileProvider(file File) StoreProvider { return &fileProvider{ - file: file, + file: file, + downloader: NewDownloadGZIPFromZSDT(file), + } +} + +// NewFileProviderWithDownloader create a new file provider use custom Downloader +func NewFileProviderWithDownloader(file File, downloader Downloader) StoreProvider { + return &fileProvider{ + file: file, + downloader: downloader, } } @@ -71,8 +76,9 @@ func NewFileProvider(file File) StoreProvider { // for now only supports oci image layout 1.0.0: // - https://github.com/opencontainers/image-spec/blob/main/image-layout.md type fileProvider struct { - file File - client *containerd.Client + file File + downloader Downloader + client *containerd.Client } func (p *fileProvider) Name() string { return "file provider" } @@ -96,40 +102,11 @@ func (p *fileProvider) Fetcher(ctx context.Context, ref string) (remotes.Fetcher func (p *fileProvider) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { fileLocation := fmt.Sprintf("blobs/%s/%s", desc.Digest.Algorithm(), desc.Digest.Encoded()) - reader, err := LookupFileInTARFile(p.file, fileLocation).Open() - if err == nil { - return reader, nil - } - if err != nil { - // NOTE: to reproducible generate gzip from zstd, gzip header should - // always be empty, and the default compress level should be used - if !errors.Is(err, ErrNotFound) || len(desc.URLs) == 0 || desc.MediaType != ocispec.MediaTypeImageLayerGzip { - return nil, err - } - } - - for _, downloadURL := range desc.URLs { - u, err := url.ParseRequestURI(downloadURL) - if err != nil { - continue - } - - switch u.Scheme { // //nolint: gocritic - case URISchemeZstd: - reader, err = LookupFileInTARFile(p.file, filepath.Join("blobs", u.Path)).Open() - if err != nil { - return nil, fmt.Errorf("read blobs %s: %w", u.Path, err) - } - greader, err := GzipReaderFromZstdUpstream(reader) - if err != nil { - _ = reader.Close() - return nil, fmt.Errorf("read gzip from zstd: %w", err) - } - return greader, nil - } + r, err := LookupFileInTARFile(p.file, fileLocation).Open() + if err == nil || !errors.Is(err, ErrNotFound) || len(desc.URLs) == 0 { + return r, err } - - return nil, fmt.Errorf("digest %s not found: %w", desc.Digest, ErrNotFound) + return DownloadFetch(ctx, p.downloader, desc) } func (p *fileProvider) Get(ctx context.Context, ref string) (images.Image, error) { @@ -298,27 +275,3 @@ func newFileInTARFile(r io.ReadCloser, h *tar.Header, tr *tar.Reader) (io.ReadCl SeekReaderAt: io.NewSectionReader(seekReaderAt, offset, h.Size), }, nil } - -func GzipReaderFromZstdUpstream(upstream io.ReadCloser) (io.ReadCloser, error) { - zreader, err := zstd.NewReader(upstream) - if err != nil { - return nil, fmt.Errorf("open zstd stream: %w", err) - } - - pr, pw := io.Pipe() - greader := gzip.NewWriter(pw) - go func() { - _, err = io.Copy(greader, zreader) - zreader.Close() - greader.Close() - _ = pw.CloseWithError(err) - }() - - return struct { - io.Reader - io.Closer - }{ - Reader: pr, - Closer: multiCloser(pr, upstream), - }, nil -} diff --git a/remotes/file_download.go b/remotes/file_download.go new file mode 100644 index 0000000..10772f5 --- /dev/null +++ b/remotes/file_download.go @@ -0,0 +1,99 @@ +/* +Copyright 2025 The Everoute Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotes + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "net/url" + "path/filepath" + + "github.com/klauspost/compress/zstd" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type Downloader interface { + Support(ctx context.Context, desc ocispec.Descriptor, downloadURL string) bool + Download(ctx context.Context, desc ocispec.Descriptor, downloadURL string) (io.ReadCloser, error) +} + +func DownloadFetch(ctx context.Context, d Downloader, desc ocispec.Descriptor) (io.ReadCloser, error) { + for _, downloadURL := range desc.URLs { + if d.Support(ctx, desc, downloadURL) { + return d.Download(ctx, desc, downloadURL) + } + } + return nil, fmt.Errorf("digest %s not found: %w", desc.Digest, ErrNotFound) +} + +func NewDownloadGZIPFromZSDT(f File) Downloader { + return &downloadGZIPFromZSDT{file: f} +} + +type downloadGZIPFromZSDT struct { + file File +} + +func (d *downloadGZIPFromZSDT) Support(_ context.Context, desc ocispec.Descriptor, downloadURL string) bool { + // NOTE: to reproducible generate gzip from zstd, gzip header should + // always be empty, and the default compress level should be used + u, err := url.ParseRequestURI(downloadURL) + return err == nil && u.Scheme == URISchemeZstd && desc.MediaType == ocispec.MediaTypeImageLayerGzip +} + +func (d *downloadGZIPFromZSDT) Download(_ context.Context, _ ocispec.Descriptor, downloadURL string) (io.ReadCloser, error) { + u, err := url.ParseRequestURI(downloadURL) + if err != nil { + return nil, fmt.Errorf("invalid url %s: %w", downloadURL, err) + } + r, err := LookupFileInTARFile(d.file, filepath.Join("blobs", u.Path)).Open() + if err != nil { + return nil, fmt.Errorf("read blobs %s: %w", u.Path, err) + } + gr, err := GzipReaderFromZstdUpstream(r) + if err != nil { + _ = r.Close() + return nil, fmt.Errorf("read gzip from zstd: %w", err) + } + return gr, nil +} + +func GzipReaderFromZstdUpstream(upstream io.ReadCloser) (io.ReadCloser, error) { + zr, err := zstd.NewReader(upstream) + if err != nil { + return nil, fmt.Errorf("open zstd stream: %w", err) + } + + pr, pw := io.Pipe() + gr := gzip.NewWriter(pw) + go func() { + _, err = io.Copy(gr, zr) + zr.Close() + _ = gr.Close() + _ = pw.CloseWithError(err) + }() + + return struct { + io.Reader + io.Closer + }{ + Reader: pr, + Closer: multiCloser(pr, upstream), + }, nil +} diff --git a/remotes/file_download_test.go b/remotes/file_download_test.go new file mode 100644 index 0000000..3a8d383 --- /dev/null +++ b/remotes/file_download_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2025 The Everoute Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remotes_test + +import ( + "context" + "io" + "os" + "testing" + + "github.com/klauspost/compress/gzip" + . "github.com/onsi/gomega" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/samber/lo" + + "github.com/everoute/container/remotes" +) + +func TestDownloadGZIPFromZSDT(t *testing.T) { + RegisterTestingT(t) + + ctx := context.Background() + f := remotes.OpenFunc(func() (io.ReadCloser, error) { return os.Open("testdata/example-zstd.tar") }) + d := remotes.NewDownloadGZIPFromZSDT(f) + + desc := ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayerGzip, + URLs: []string{ + "https://example.com/blob/b867a7a339c983abc0a33b8f7da12380ec7cec3f2a1e6ba80ec7cb", + "zstd:///sha256/d1b7ce66ba6cbfd27bf4adf2554e2e999689820ee37176138843f60f502bf47f", + }, + } + Expect(d.Support(ctx, desc, desc.URLs[1])).Should(BeTrue()) + Expect(d.Support(ctx, desc, desc.URLs[0])).Should(BeFalse()) + + cr, err := d.Download(ctx, desc, desc.URLs[1]) + Expect(err).ShouldNot(HaveOccurred()) + defer cr.Close() + gr, err := gzip.NewReader(cr) + Expect(err).ShouldNot(HaveOccurred()) + Expect(string(lo.Must(io.ReadAll(gr)))).Should(Equal("HELLO WORLD")) + Expect(gr.Close()).ShouldNot(HaveOccurred()) + Expect(cr.Close()).ShouldNot(HaveOccurred()) + + cr, err = remotes.DownloadFetch(ctx, d, desc) + Expect(err).ShouldNot(HaveOccurred()) + Expect(cr.Close()).ShouldNot(HaveOccurred()) +} diff --git a/remotes/testdata/example-zstd.tar b/remotes/testdata/example-zstd.tar new file mode 100644 index 0000000000000000000000000000000000000000..76efb169a7474720e6c56661902703bca2892e7c GIT binary patch literal 10240 zcmeIxze)r#5C-rR);`0Qc9(xMv*{Hq7EWx$GPzB-LKIv@@L_xeJ0DLlt0KyAoW;e) zFHIm5CgJxb>FOp;3)OV!Iz&X($q4E1^IV#TGNg%wou2E)kSEnq(_4Kuzn|{nZP%&C zb-aEs+mGB;|BJgeI3vC1KdqxBZuJc2fPb04tvhS}mWYvmgSaXncNmiLTMP8*c zT1mynvD7qtwmF2redw*u*@+Q0uJ64otP5#dcS_ml{;{w#fA{tOjd)htI&W+KdP~2( zF8}lYfAq3=Jvh5m>hxq7mfgkjd^mo7It+U?Ml%RN00Izz00bZa0SG_<0uX=z1Rwwb c2tWV=5P$##AOHafKmY;|fB*y_0D)!#Zw2#MLjV8( literal 0 HcmV?d00001