From b78b527538884417a1d21a0380c957b30bf463e1 Mon Sep 17 00:00:00 2001 From: Qiu Jian Date: Mon, 19 May 2025 07:21:30 +0800 Subject: [PATCH] fix: streamutils error for zero sized input --- util/fileutils/filesize.go | 4 ++++ util/streamutils/streamutils.go | 35 +++++++++++++++++++++------- util/streamutils/streamutils_test.go | 12 ++++++++-- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/util/fileutils/filesize.go b/util/fileutils/filesize.go index 01bddf3..4d368f9 100644 --- a/util/fileutils/filesize.go +++ b/util/fileutils/filesize.go @@ -63,3 +63,7 @@ func GetSizeKb(sizeStr string, defaultSize byte, base int) (int, error) { size, err := parseSizeStr(sizeStr, defaultSize, base) return size / base, err } + +func GetSizeBytes(sizeStr string, base int) (int, error) { + return parseSizeStr(sizeStr, 'B', base) +} diff --git a/util/streamutils/streamutils.go b/util/streamutils/streamutils.go index 7bee4d9..068fe41 100644 --- a/util/streamutils/streamutils.go +++ b/util/streamutils/streamutils.go @@ -33,21 +33,30 @@ type SStreamProperty struct { type sXZReadAheadReader struct { offset int64 header []byte + hdrEof bool upstream io.Reader } func newXZReadAheadReader(stream io.Reader) (*sXZReadAheadReader, error) { xzHdr := make([]byte, xz.HeaderLen) n, err := stream.Read(xzHdr) + hdrEof := false if err != nil { - return nil, errors.Wrap(err, "Read XZ hader") - } - if n != len(xzHdr) { - return nil, errors.Wrap(errors.ErrEOF, "too few header bytes") + if errors.Cause(err) == io.EOF { + // delay the EOF + hdrEof = true + xzHdr = xzHdr[:n] + } else { + return nil, errors.Wrap(err, "Read XZ header") + } + } else if n != len(xzHdr) { + hdrEof = true + xzHdr = xzHdr[:n] } return &sXZReadAheadReader{ offset: 0, header: xzHdr, + hdrEof: hdrEof, upstream: stream, }, nil } @@ -57,6 +66,7 @@ func (s *sXZReadAheadReader) IsXz() bool { } func (s *sXZReadAheadReader) Read(buf []byte) (int, error) { + bufOffset := 0 if s.offset < int64(len(s.header)) { // read from header rdSize := len(s.header) - int(s.offset) @@ -65,12 +75,19 @@ func (s *sXZReadAheadReader) Read(buf []byte) (int, error) { } n := copy(buf, s.header[s.offset:s.offset+int64(rdSize)]) s.offset += int64(n) - return n, nil - } else { - n, err := s.upstream.Read(buf) - s.offset += int64(n) - return n, err + bufOffset = n } + // read buffer is full + if bufOffset >= len(buf) { + return bufOffset, nil + } + if s.offset >= int64(len(s.header)) && s.hdrEof { + return bufOffset, io.EOF + } + + n, err := s.upstream.Read(buf[bufOffset:]) + s.offset += int64(n) + return n + bufOffset, err } func StreamPipe(upstream io.Reader, writer io.Writer, CalChecksum bool, callback func(savedTotal int64)) (*SStreamProperty, error) { diff --git a/util/streamutils/streamutils_test.go b/util/streamutils/streamutils_test.go index d984c01..70aed28 100644 --- a/util/streamutils/streamutils_test.go +++ b/util/streamutils/streamutils_test.go @@ -28,6 +28,8 @@ func TestStreamPipe(t *testing.T) { for _, bufSize := range []int{ 324553, 2312, + 5, + 0, } { seed := rand.New(rand.NewSource(int64(bufSize))) buf := make([]byte, bufSize) @@ -45,7 +47,9 @@ func TestStreamPipe(t *testing.T) { t.Errorf("Steampipe fail %s", err) } else { t.Logf("stat %#v", stat) - if !reflect.DeepEqual(buf, outBuf.Bytes()) { + if len(buf) != len(outBuf.Bytes()) { + t.Errorf("input len %d != output len %d", len(buf), len(outBuf.Bytes()) ) + } else if len(buf) > 0 && !reflect.DeepEqual(buf, outBuf.Bytes()) { t.Errorf("input != output") } } @@ -56,6 +60,8 @@ func TestStreamPipeXZ(t *testing.T) { for _, bufSize := range []int{ 324553, 2312, + 5, + 0, } { seed := rand.New(rand.NewSource(int64(bufSize))) buf := make([]byte, bufSize) @@ -97,7 +103,9 @@ func TestStreamPipeXZ(t *testing.T) { t.Errorf("Steampipe fail %s", err) } else { t.Logf("stat %#v", stat) - if !reflect.DeepEqual(buf, outBuf.Bytes()) { + if len(buf) != len(outBuf.Bytes()) { + t.Errorf("input len %d != output len %d", len(buf), len(outBuf.Bytes()) ) + } else if len(buf) > 0 && !reflect.DeepEqual(buf, outBuf.Bytes()) { t.Errorf("input != output") } }