From 469d6e84b26396f6f9031d18d2453b4ed9e167a4 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Mon, 30 Jan 2017 22:51:29 +1100 Subject: [PATCH 1/2] copy: move compression to separate pkg Signed-off-by: Aleksa Sarai --- copy/copy.go | 15 ++++---- copy/copy_test.go | 9 ++--- copy/fixtures/Hello.bz2 | Bin 43 -> 40 bytes copy/fixtures/Hello.gz | Bin 25 -> 39 bytes copy/fixtures/Hello.uncompressed | 2 +- copy/fixtures/Hello.xz | Bin 64 -> 39 bytes {copy => pkg/compression}/compression.go | 33 ++++++++++-------- {copy => pkg/compression}/compression_test.go | 12 +++---- pkg/compression/fixtures/Hello.bz2 | Bin 0 -> 43 bytes pkg/compression/fixtures/Hello.gz | Bin 0 -> 25 bytes pkg/compression/fixtures/Hello.uncompressed | 1 + pkg/compression/fixtures/Hello.xz | Bin 0 -> 64 bytes 12 files changed, 40 insertions(+), 32 deletions(-) mode change 100644 => 120000 copy/fixtures/Hello.bz2 mode change 100644 => 120000 copy/fixtures/Hello.gz mode change 100644 => 120000 copy/fixtures/Hello.uncompressed mode change 100644 => 120000 copy/fixtures/Hello.xz rename {copy => pkg/compression}/compression.go (59%) rename {copy => pkg/compression}/compression_test.go (88%) create mode 100644 pkg/compression/fixtures/Hello.bz2 create mode 100644 pkg/compression/fixtures/Hello.gz create mode 100644 pkg/compression/fixtures/Hello.uncompressed create mode 100644 pkg/compression/fixtures/Hello.xz diff --git a/copy/copy.go b/copy/copy.go index d27e634ce5..e03fd5c987 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -13,6 +13,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/containers/image/image" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/compression" "github.com/containers/image/signature" "github.com/containers/image/transports" "github.com/containers/image/types" @@ -370,7 +371,7 @@ func (ic *imageCopier) copyLayer(srcInfo types.BlobInfo) (types.BlobInfo, digest // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if 
diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.BlobInfo, diffIDIsNeeded bool) (types.BlobInfo, <-chan diffIDResult, error) { - var getDiffIDRecorder func(decompressorFunc) io.Writer // = nil + var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult err := errors.New("Internal error: unexpected panic in copyLayer") // For pipeWriter.CloseWithError below @@ -381,7 +382,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close() }() - getDiffIDRecorder = func(decompressor decompressorFunc) io.Writer { + getDiffIDRecorder = func(decompressor compression.DecompressorFunc) io.Writer { // If this fails, e.g. because we have exited and due to pipeWriter.CloseWithError() above further // reading from the pipe has failed, we don’t really care. // We only read from diffIDChan if the rest of the flow has succeeded, and when we do read from it, @@ -399,7 +400,7 @@ func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.Bl } // diffIDComputationGoroutine reads all input from layerStream, uncompresses using decompressor if necessary, and sends its digest, and status, if any, to dest. -func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor decompressorFunc) { +func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compression.DecompressorFunc) { result := diffIDResult{ digest: "", err: errors.New("Internal error: unexpected panic in diffIDComputationGoroutine"), @@ -411,7 +412,7 @@ func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadClo } // computeDiffID reads all input from layerStream, uncompresses it using decompressor if necessary, and returns its digest. 
-func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Digest, error) { +func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) (digest.Digest, error) { if decompressor != nil { s, err := decompressor(stream) if err != nil { @@ -428,7 +429,7 @@ func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Dige // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.BlobInfo, - getOriginalLayerCopyWriter func(decompressor decompressorFunc) io.Writer, + getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, canCompress bool) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. // === Input: srcStream @@ -446,8 +447,8 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo var destStream io.Reader = digestingReader // === Detect compression of the input stream. - // This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by detectCompression. - decompressor, destStream, err := detectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform + // This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression. 
+ decompressor, destStream, err := compression.DetectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform if err != nil { return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) } diff --git a/copy/copy_test.go b/copy/copy_test.go index feb580ddbf..b98133a88f 100644 --- a/copy/copy_test.go +++ b/copy/copy_test.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" + "github.com/containers/image/pkg/compression" "github.com/opencontainers/go-digest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -63,7 +64,7 @@ func TestDigestingReaderRead(t *testing.T) { } } -func goDiffIDComputationGoroutineWithTimeout(layerStream io.ReadCloser, decompressor decompressorFunc) *diffIDResult { +func goDiffIDComputationGoroutineWithTimeout(layerStream io.ReadCloser, decompressor compression.DecompressorFunc) *diffIDResult { ch := make(chan diffIDResult) go diffIDComputationGoroutine(ch, layerStream, nil) timeout := time.After(time.Second) @@ -94,12 +95,12 @@ func TestDiffIDComputationGoroutine(t *testing.T) { func TestComputeDiffID(t *testing.T) { for _, c := range []struct { filename string - decompressor decompressorFunc + decompressor compression.DecompressorFunc result digest.Digest }{ {"fixtures/Hello.uncompressed", nil, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"}, {"fixtures/Hello.gz", nil, "sha256:0bd4409dcd76476a263b8f3221b4ce04eb4686dec40bfdcc2e86a7403de13609"}, - {"fixtures/Hello.gz", gzipDecompressor, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"}, + {"fixtures/Hello.gz", compression.GzipDecompressor, "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969"}, } { stream, err := os.Open(c.filename) require.NoError(t, err, c.filename) @@ -111,7 +112,7 @@ func TestComputeDiffID(t *testing.T) { } // Error initializing decompression - _, err := computeDiffID(bytes.NewReader([]byte{}), gzipDecompressor) + 
_, err := computeDiffID(bytes.NewReader([]byte{}), compression.GzipDecompressor) assert.Error(t, err) // Error reading input diff --git a/copy/fixtures/Hello.bz2 b/copy/fixtures/Hello.bz2 deleted file mode 100644 index e822f5e5e9e170abc384cc72e161a5ed1548ca22..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 43 ycmZ>Y%CIzaj8qGblnP0i#K6G7%D~{j#Ik@vaaM-0uds3VPNg{-9=uvco(uru9tt-A diff --git a/copy/fixtures/Hello.bz2 b/copy/fixtures/Hello.bz2 new file mode 120000 index 0000000000..fc28d6c9ac --- /dev/null +++ b/copy/fixtures/Hello.bz2 @@ -0,0 +1 @@ +../../pkg/compression/fixtures/Hello.bz2 \ No newline at end of file diff --git a/copy/fixtures/Hello.gz b/copy/fixtures/Hello.gz deleted file mode 100644 index 22c895b7d178a03e0dc601f450ec5f2158f4866b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 25 hcmb2|=3pq;{y&_7`LoB_lPB33nmR9jXJueu004p>39JAB diff --git a/copy/fixtures/Hello.gz b/copy/fixtures/Hello.gz new file mode 120000 index 0000000000..08aa805fcc --- /dev/null +++ b/copy/fixtures/Hello.gz @@ -0,0 +1 @@ +../../pkg/compression/fixtures/Hello.gz \ No newline at end of file diff --git a/copy/fixtures/Hello.uncompressed b/copy/fixtures/Hello.uncompressed deleted file mode 100644 index 5ab2f8a432..0000000000 --- a/copy/fixtures/Hello.uncompressed +++ /dev/null @@ -1 +0,0 @@ -Hello \ No newline at end of file diff --git a/copy/fixtures/Hello.uncompressed b/copy/fixtures/Hello.uncompressed new file mode 120000 index 0000000000..49b46625d8 --- /dev/null +++ b/copy/fixtures/Hello.uncompressed @@ -0,0 +1 @@ +../../pkg/compression/fixtures/Hello.uncompressed \ No newline at end of file diff --git a/copy/fixtures/Hello.xz b/copy/fixtures/Hello.xz deleted file mode 100644 index 6e9b0b6648fbe8cc20d6dcb1921ddbed0069210b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 64 zcmexsUKJ6=z`*kC+7>q^21Q0O1_p)_{ill=8CX10b8_;5T!s^Cs!v$QoDXDRlx5wa 
R+pu1K+vi$FkOWI)6aakT6B_^k diff --git a/copy/fixtures/Hello.xz b/copy/fixtures/Hello.xz new file mode 120000 index 0000000000..77bcd85587 --- /dev/null +++ b/copy/fixtures/Hello.xz @@ -0,0 +1 @@ +../../pkg/compression/fixtures/Hello.xz \ No newline at end of file diff --git a/copy/compression.go b/pkg/compression/compression.go similarity index 59% rename from copy/compression.go rename to pkg/compression/compression.go index 03514b48e8..c114ded68e 100644 --- a/copy/compression.go +++ b/pkg/compression/compression.go @@ -1,4 +1,4 @@ -package copy +package compression import ( "bytes" @@ -11,32 +11,37 @@ import ( "github.com/Sirupsen/logrus" ) -// decompressorFunc, given a compressed stream, returns the decompressed stream. -type decompressorFunc func(io.Reader) (io.Reader, error) +// DecompressorFunc returns the decompressed stream, given a compressed stream. +type DecompressorFunc func(io.Reader) (io.Reader, error) -func gzipDecompressor(r io.Reader) (io.Reader, error) { +// GzipDecompressor is a DecompressorFunc for the gzip compression algorithm. +func GzipDecompressor(r io.Reader) (io.Reader, error) { return gzip.NewReader(r) } -func bzip2Decompressor(r io.Reader) (io.Reader, error) { + +// Bzip2Decompressor is a DecompressorFunc for the bzip2 compression algorithm. +func Bzip2Decompressor(r io.Reader) (io.Reader, error) { return bzip2.NewReader(r), nil } -func xzDecompressor(r io.Reader) (io.Reader, error) { + +// XzDecompressor is a DecompressorFunc for the xz compression algorithm. 
+func XzDecompressor(r io.Reader) (io.Reader, error) { return nil, errors.New("Decompressing xz streams is not supported") } -// compressionAlgos is an internal implementation detail of detectCompression +// compressionAlgos is an internal implementation detail of DetectCompression var compressionAlgos = map[string]struct { prefix []byte - decompressor decompressorFunc + decompressor DecompressorFunc }{ - "gzip": {[]byte{0x1F, 0x8B, 0x08}, gzipDecompressor}, // gzip (RFC 1952) - "bzip2": {[]byte{0x42, 0x5A, 0x68}, bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress) - "xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, xzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt) + "gzip": {[]byte{0x1F, 0x8B, 0x08}, GzipDecompressor}, // gzip (RFC 1952) + "bzip2": {[]byte{0x42, 0x5A, 0x68}, Bzip2Decompressor}, // bzip2 (decompress.c:BZ2_decompress) + "xz": {[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor}, // xz (/usr/share/doc/xz/xz-file-format.txt) } -// detectCompression returns a decompressorFunc if the input is recognized as a compressed format, nil otherwise. +// DetectCompression returns a DecompressorFunc if the input is recognized as a compressed format, nil otherwise. // Because it consumes the start of input, other consumers must use the returned io.Reader instead to also read from the beginning. 
-func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) { +func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) { buffer := [8]byte{} n, err := io.ReadAtLeast(input, buffer[:], len(buffer)) @@ -46,7 +51,7 @@ func detectCompression(input io.Reader) (decompressorFunc, io.Reader, error) { return nil, nil, err } - var decompressor decompressorFunc + var decompressor DecompressorFunc for name, algo := range compressionAlgos { if bytes.HasPrefix(buffer[:n], algo.prefix) { logrus.Debugf("Detected compression format %s", name) diff --git a/copy/compression_test.go b/pkg/compression/compression_test.go similarity index 88% rename from copy/compression_test.go rename to pkg/compression/compression_test.go index 9475a8b769..2dd429317a 100644 --- a/copy/compression_test.go +++ b/pkg/compression/compression_test.go @@ -1,4 +1,4 @@ -package copy +package compression import ( "bytes" @@ -33,7 +33,7 @@ func TestDetectCompression(t *testing.T) { require.NoError(t, err, c.filename) defer stream.Close() - _, updatedStream, err := detectCompression(stream) + _, updatedStream, err := DetectCompression(stream) require.NoError(t, err, c.filename) updatedContents, err := ioutil.ReadAll(updatedStream) @@ -47,7 +47,7 @@ func TestDetectCompression(t *testing.T) { require.NoError(t, err, c.filename) defer stream.Close() - decompressor, updatedStream, err := detectCompression(stream) + decompressor, updatedStream, err := DetectCompression(stream) require.NoError(t, err, c.filename) var uncompressedStream io.Reader @@ -70,7 +70,7 @@ func TestDetectCompression(t *testing.T) { } // Empty input is handled reasonably. 
- decompressor, updatedStream, err := detectCompression(bytes.NewReader([]byte{})) + decompressor, updatedStream, err := DetectCompression(bytes.NewReader([]byte{})) require.NoError(t, err) assert.Nil(t, decompressor) updatedContents, err := ioutil.ReadAll(updatedStream) @@ -80,7 +80,7 @@ func TestDetectCompression(t *testing.T) { // Error reading input reader, writer := io.Pipe() defer reader.Close() - writer.CloseWithError(errors.New("Expected error reading input in detectCompression")) - _, _, err = detectCompression(reader) + writer.CloseWithError(errors.New("Expected error reading input in DetectCompression")) + _, _, err = DetectCompression(reader) assert.Error(t, err) } diff --git a/pkg/compression/fixtures/Hello.bz2 b/pkg/compression/fixtures/Hello.bz2 new file mode 100644 index 0000000000000000000000000000000000000000..e822f5e5e9e170abc384cc72e161a5ed1548ca22 GIT binary patch literal 43 ycmZ>Y%CIzaj8qGblnP0i#K6G7%D~{j#Ik@vaaM-0uds3VPNg{-9=uvco(uru9tt-A literal 0 HcmV?d00001 diff --git a/pkg/compression/fixtures/Hello.gz b/pkg/compression/fixtures/Hello.gz new file mode 100644 index 0000000000000000000000000000000000000000..22c895b7d178a03e0dc601f450ec5f2158f4866b GIT binary patch literal 25 hcmb2|=3pq;{y&_7`LoB_lPB33nmR9jXJueu004p>39JAB literal 0 HcmV?d00001 diff --git a/pkg/compression/fixtures/Hello.uncompressed b/pkg/compression/fixtures/Hello.uncompressed new file mode 100644 index 0000000000..5ab2f8a432 --- /dev/null +++ b/pkg/compression/fixtures/Hello.uncompressed @@ -0,0 +1 @@ +Hello \ No newline at end of file diff --git a/pkg/compression/fixtures/Hello.xz b/pkg/compression/fixtures/Hello.xz new file mode 100644 index 0000000000000000000000000000000000000000..6e9b0b6648fbe8cc20d6dcb1921ddbed0069210b GIT binary patch literal 64 zcmexsUKJ6=z`*kC+7>q^21Q0O1_p)_{ill=8CX10b8_;5T!s^Cs!v$QoDXDRlx5wa R+pu1K+vi$FkOWI)6aakT6B_^k literal 0 HcmV?d00001 From 990658611305f9e319afdf5df2df03c996d5fa35 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Mon, 30 Jan 
2017 23:40:56 +1100 Subject: [PATCH 2/2] docker: daemon: decompress blobs transparently in GetBlob This is necessary because inside the docker-daemon (soon to be Docker archive) transport, the provided DigestInfo's digest is *not* the digest of the blob but is rather the DiffID of the layer. This results in issues when converting a docker-daemon image to an OCI image and then back again -- the compression copying code doesn't know what to do because the digest of the blob and the blob's "digest" in DigestInfo don't match. Fix it by always silently decompressing blobs that come from Docker archive transports. Signed-off-by: Aleksa Sarai --- docker/daemon/daemon_src.go | 45 ++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/docker/daemon/daemon_src.go b/docker/daemon/daemon_src.go index 6b4aec8b76..79042852ae 100644 --- a/docker/daemon/daemon_src.go +++ b/docker/daemon/daemon_src.go @@ -10,6 +10,7 @@ import ( "path" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/compression" "github.com/containers/image/types" "github.com/docker/docker/client" "github.com/opencontainers/go-digest" @@ -334,6 +335,18 @@ func (s *daemonImageSource) GetTargetManifest(digest digest.Digest) ([]byte, str return nil, "", errors.Errorf(`Manifest lists are not supported by "docker-daemon:"`) } +type readCloseWrapper struct { + io.Reader + closeFunc func() error +} + +func (r readCloseWrapper) Close() error { + if r.closeFunc != nil { + return r.closeFunc() + } + return nil +} + // GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown). 
func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, error) { if err := s.ensureCachedDataIsPresent(); err != nil { @@ -349,7 +362,37 @@ func (s *daemonImageSource) GetBlob(info types.BlobInfo) (io.ReadCloser, int64, if err != nil { return nil, 0, err } - return stream, li.size, nil + + // In order to handle the fact that digests != diffIDs (and thus that a + // caller which is trying to verify the blob will run into problems), + // we need to decompress blobs. This is a bit ugly, but it's a + // consequence of making everything addressable by their DiffID rather + // than by their digest... + // + // In particular, because the v2s2 manifest being generated uses + // DiffIDs, any caller of GetBlob is going to be asking for DiffIDs of + // layers not their _actual_ digest. The result is that copy/... will + // be verifying a "digest" which is not the actual layer's digest (but + // is instead the DiffID). + + decompressFunc, reader, err := compression.DetectCompression(stream) + if err != nil { + return nil, 0, errors.Wrapf(err, "Detecting compression in blob %s", info.Digest) + } + + if decompressFunc != nil { + reader, err = decompressFunc(reader) + if err != nil { + return nil, 0, errors.Wrapf(err, "Decompressing blob %s stream", info.Digest) + } + } + + newStream := readCloseWrapper{ + Reader: reader, + closeFunc: stream.Close, + } + + return newStream, li.size, nil } return nil, 0, errors.Errorf("Unknown blob %s", info.Digest)