Skip to content

Commit 5e36e54

Browse files
Eliminate unnecessary nextChunk logic during reading
nextChunk was a remnant of old logic that is no longer used. The current code does not try to read past the current frame's bytes, so there is no need for the logic that loads the next chunk. Renamed chunkedReader to limitedReader for clarity and added a unit test.
1 parent c1096ad commit 5e36e54

File tree

6 files changed

+209
-106
lines changed

6 files changed

+209
-106
lines changed

go/pkg/frame.go

Lines changed: 13 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,19 @@ type FrameDecoder struct {
9696
frameContentSrc ByteAndBlockReader
9797
decompressedContentReader *bufio.Reader
9898
decompressor *zstd.Decoder
99-
chunkReader chunkedReader
99+
limitedReader limitedReader
100100
flags FrameFlags
101101
frameLoaded bool
102102
notFirstFrame bool
103103
}
104104

105-
type chunkedReader struct {
106-
src ByteAndBlockReader
107-
limit int64
108-
nextChunk func() error
105+
// limitedReader wraps a ByteAndBlockReader and limits the number of bytes read.
106+
type limitedReader struct {
107+
src ByteAndBlockReader
108+
limit int64
109109
}
110110

111-
func (r *chunkedReader) readByte() (byte, error) {
111+
func (r *limitedReader) ReadByte() (byte, error) {
112112
if r.limit <= 0 {
113113
return 0, io.EOF
114114
}
@@ -117,25 +117,7 @@ func (r *chunkedReader) readByte() (byte, error) {
117117
return b, err
118118
}
119119

120-
func (r *chunkedReader) ReadByte() (byte, error) {
121-
loop:
122-
for {
123-
b, err := r.readByte()
124-
if err == nil {
125-
return b, err
126-
}
127-
if err == io.EOF {
128-
err = r.nextChunk()
129-
if err != nil {
130-
return 0, err
131-
}
132-
goto loop
133-
}
134-
return 0, err
135-
}
136-
}
137-
138-
func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
120+
func (r *limitedReader) Read(p []byte) (n int, err error) {
139121
if r.limit <= 0 {
140122
return 0, io.EOF
141123
}
@@ -147,25 +129,7 @@ func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
147129
return
148130
}
149131

150-
func (r *chunkedReader) Read(p []byte) (n int, err error) {
151-
loop:
152-
for {
153-
n, err := r.readBlock(p)
154-
if err == nil {
155-
return n, err
156-
}
157-
if err == io.EOF {
158-
err = r.nextChunk()
159-
if err != nil {
160-
return 0, err
161-
}
162-
goto loop
163-
}
164-
return 0, err
165-
}
166-
}
167-
168-
func (r *chunkedReader) Init(src ByteAndBlockReader) {
132+
func (r *limitedReader) Init(src ByteAndBlockReader) {
169133
r.src = src
170134
}
171135

@@ -176,12 +140,11 @@ const readBufSize = 64 * 1024
176140
func (d *FrameDecoder) Init(src ByteAndBlockReader, compression Compression) error {
177141
d.src = src
178142
d.compression = compression
179-
d.chunkReader.Init(src)
180-
d.chunkReader.nextChunk = d.nextFrame
143+
d.limitedReader.Init(src)
181144

182145
switch d.compression {
183146
case CompressionNone:
184-
d.frameContentSrc = &d.chunkReader
147+
d.frameContentSrc = &d.limitedReader
185148

186149
case CompressionZstd:
187150
var err error
@@ -222,19 +185,19 @@ func (d *FrameDecoder) nextFrame() error {
222185
if err != nil {
223186
return err
224187
}
225-
d.chunkReader.limit = int64(compressedSize)
188+
d.limitedReader.limit = int64(compressedSize)
226189

227190
if !d.notFirstFrame || d.flags&RestartCompression != 0 {
228191
d.notFirstFrame = true
229-
if err := d.decompressor.Reset(&d.chunkReader); err != nil {
192+
if err := d.decompressor.Reset(&d.limitedReader); err != nil {
230193
return err
231194
}
232195
}
233196

234197
d.decompressedContentReader.Reset(d.decompressor)
235198
} else {
236199
compressedSize = uncompressedSize
237-
d.chunkReader.limit = int64(uncompressedSize)
200+
d.limitedReader.limit = int64(uncompressedSize)
238201
}
239202

240203
d.frameLoaded = true

go/pkg/frame_test.go

Lines changed: 111 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,28 @@ package pkg
33
import (
44
"bytes"
55
"io"
6+
"strconv"
67
"strings"
78
"testing"
89

910
"github.com/stretchr/testify/require"
1011
)
1112

12-
type memChunkReaderWriter struct {
13+
// memReaderWriter is an in-memory implementation of the ChunkWriter and ByteAndBlockReader interfaces
14+
// that allows data to be first written to the buffer and then read back from it.
15+
type memReaderWriter struct {
1316
buf bytes.Buffer
1417
}
1518

16-
func (m *memChunkReaderWriter) ReadByte() (byte, error) {
19+
func (m *memReaderWriter) ReadByte() (byte, error) {
1720
return m.buf.ReadByte()
1821
}
1922

20-
func (m *memChunkReaderWriter) Read(p []byte) (n int, err error) {
23+
func (m *memReaderWriter) Read(p []byte) (n int, err error) {
2124
return m.buf.Read(p)
2225
}
2326

24-
func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
27+
func (m *memReaderWriter) WriteChunk(header []byte, content []byte) error {
2528
_, err := m.buf.Write(header)
2629
if err != nil {
2730
return err
@@ -30,11 +33,11 @@ func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
3033
return err
3134
}
3235

33-
func (m *memChunkReaderWriter) Bytes() []byte {
36+
func (m *memReaderWriter) Bytes() []byte {
3437
return m.buf.Bytes()
3538
}
3639

37-
func TestLastFrameAndContinue(t *testing.T) {
40+
func testLastFrameAndContinue(t *testing.T, compression Compression) {
3841
// This test verifies that it is possible to decode until the end of available
3942
// data, get a correct indication that it is the end of the frame and end
4043
// of all available data, then once new data becomes available the decoding
@@ -43,8 +46,8 @@ func TestLastFrameAndContinue(t *testing.T) {
4346

4447
// Encode one frame with some data.
4548
encoder := FrameEncoder{}
46-
buf := &memChunkReaderWriter{}
47-
err := encoder.Init(buf, CompressionZstd)
49+
buf := &memReaderWriter{}
50+
err := encoder.Init(buf, compression)
4851
require.NoError(t, err)
4952
writeStr := []byte(strings.Repeat("hello", 10))
5053
_, err = encoder.Write(writeStr)
@@ -55,7 +58,7 @@ func TestLastFrameAndContinue(t *testing.T) {
5558

5659
// Now decode that frame.
5760
decoder := FrameDecoder{}
58-
err = decoder.Init(buf, CompressionZstd)
61+
err = decoder.Init(buf, compression)
5962
require.NoError(t, err)
6063
_, err = decoder.Next()
6164
require.NoError(t, err)
@@ -73,40 +76,113 @@ func TestLastFrameAndContinue(t *testing.T) {
7376
require.ErrorIs(t, err, EndOfFrame)
7477
require.EqualValues(t, 0, n)
7578

76-
// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
77-
_, err = decoder.Next()
79+
for i := 1; i <= 10; i++ {
80+
// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
81+
_, err = decoder.Next()
82+
require.ErrorIs(t, err, io.EOF)
83+
84+
// Continue adding to the same source byte buffer using encoder.
85+
86+
// Open a new frame, write new data and close the frame.
87+
encoder.OpenFrame(0)
88+
writeStr = []byte(strings.Repeat("foo", i))
89+
_, err = encoder.Write(writeStr)
90+
require.NoError(t, err)
91+
92+
err = encoder.CloseFrame()
93+
require.NoError(t, err)
94+
95+
// Try reading again. We should get an EndOfFrame error.
96+
readStr = make([]byte, len(writeStr))
97+
n, err = decoder.Read(readStr)
98+
require.ErrorIs(t, err, EndOfFrame)
99+
require.EqualValues(t, 0, n)
100+
101+
// Now try decoding a new frame. This time it should succeed since we added a new frame.
102+
_, err = decoder.Next()
103+
require.NoError(t, err)
104+
105+
// Read the encoded data.
106+
n, err = decoder.Read(readStr)
107+
require.EqualValues(t, len(writeStr), n)
108+
require.EqualValues(t, writeStr, readStr)
109+
110+
// Try decoding more, past the end of second frame.
111+
n, err = decoder.Read(readStr)
112+
113+
// Make sure the error indicates end of the frame.
114+
require.ErrorIs(t, err, EndOfFrame)
115+
require.EqualValues(t, 0, n)
116+
}
117+
}
118+
119+
func TestLastFrameAndContinue(t *testing.T) {
120+
compressions := []Compression{
121+
CompressionNone,
122+
CompressionZstd,
123+
}
124+
125+
for _, compression := range compressions {
126+
t.Run(
127+
strconv.Itoa(int(compression)), func(t *testing.T) {
128+
testLastFrameAndContinue(t, compression)
129+
},
130+
)
131+
}
132+
}
133+
134+
func TestLimitedReader(t *testing.T) {
135+
data := []byte("abcdef")
136+
mem := &memReaderWriter{buf: *bytes.NewBuffer(data)}
137+
var lr limitedReader
138+
lr.Init(mem)
139+
140+
// Test reading with limit 0
141+
lr.limit = 0
142+
buf := make([]byte, 3)
143+
n, err := lr.Read(buf)
144+
require.Equal(t, 0, n)
78145
require.ErrorIs(t, err, io.EOF)
79146

80-
// Continue adding to the same source byte buffer using encoder.
147+
// Test ReadByte with limit 0
148+
lr.limit = 0
149+
_, err = lr.ReadByte()
150+
require.ErrorIs(t, err, io.EOF)
81151

82-
// Open a new frame, write new data and close the frame.
83-
encoder.OpenFrame(0)
84-
writeStr = []byte(strings.Repeat("foo", 10))
85-
_, err = encoder.Write(writeStr)
152+
// Reset and test reading less than limit
153+
mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
154+
lr.Init(mem)
155+
lr.limit = 3
156+
buf = make([]byte, 2)
157+
n, err = lr.Read(buf)
158+
require.Equal(t, 2, n)
86159
require.NoError(t, err)
160+
require.Equal(t, []byte("ab"), buf)
161+
require.Equal(t, int64(1), lr.limit)
87162

88-
err = encoder.CloseFrame()
163+
// Test ReadByte with remaining limit
164+
b, err := lr.ReadByte()
89165
require.NoError(t, err)
166+
require.Equal(t, byte('c'), b)
167+
require.Equal(t, int64(0), lr.limit)
90168

91-
// Try reading again. We should get an EndOfFrame error.
92-
readStr = make([]byte, len(writeStr))
93-
n, err = decoder.Read(readStr)
94-
require.ErrorIs(t, err, EndOfFrame)
95-
require.EqualValues(t, 0, n)
169+
// Test ReadByte at limit 0 after reading
170+
_, err = lr.ReadByte()
171+
require.ErrorIs(t, err, io.EOF)
96172

97-
// Now try decoding a new frame. This time it should succeed since we added a new frame.
98-
_, err = decoder.Next()
173+
// Test reading more than limit
174+
mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
175+
lr.Init(mem)
176+
lr.limit = 4
177+
buf = make([]byte, 10)
178+
n, err = lr.Read(buf)
179+
require.Equal(t, 4, n)
99180
require.NoError(t, err)
181+
require.Equal(t, []byte("abcd"), buf[:n])
182+
require.Equal(t, int64(0), lr.limit)
100183

101-
// Read the encoded data.
102-
n, err = decoder.Read(readStr)
103-
require.EqualValues(t, len(writeStr), n)
104-
require.EqualValues(t, writeStr, readStr)
105-
106-
// Try decoding more, past the end of second frame.
107-
n, err = decoder.Read(readStr)
108-
109-
// Make sure the error indicates end of the frame.
110-
require.ErrorIs(t, err, EndOfFrame)
111-
require.EqualValues(t, 0, n)
184+
// Test Read after limit exhausted
185+
n, err = lr.Read(buf)
186+
require.Equal(t, 0, n)
187+
require.ErrorIs(t, err, io.EOF)
112188
}

java/src/main/java/net/stef/FrameDecoder.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,22 @@ public class FrameDecoder extends InputStream {
1313
private int ofs;
1414
private InputStream frameContentSrc;
1515
private ZstdInputStream decompressor;
16-
private ChunkedReader chunkReader = new ChunkedReader();
16+
final private LimitedReader limitedReader = new LimitedReader();
1717
private int flags;
1818
private boolean frameLoaded;
1919
private boolean notFirstFrame;
2020

2121
public void init(InputStream src, Compression compression) throws IOException {
2222
this.src = src;
2323
this.compression = compression;
24-
chunkReader.init(src);
25-
chunkReader.setNextChunk(this::nextFrame);
24+
limitedReader.init(src);
2625

2726
switch (compression) {
2827
case None:
29-
this.frameContentSrc = chunkReader;
28+
this.frameContentSrc = limitedReader;
3029
break;
3130
case Zstd:
32-
this.decompressor = new ZstdInputStream(chunkReader);
31+
this.decompressor = new ZstdInputStream(limitedReader);
3332
this.frameContentSrc = decompressor;
3433
break;
3534
default:
@@ -53,16 +52,16 @@ private void nextFrame() throws IOException {
5352
if (compression != Compression.None) {
5453
long compressedSize = Serde.readUvarint(src);
5554

56-
chunkReader.setLimit(compressedSize);
55+
limitedReader.setLimit(compressedSize);
5756

5857
if (!notFirstFrame || (flags & FrameFlags.RestartCompression)!=0) {
5958
notFirstFrame = true;
6059
decompressor.close();
61-
decompressor = new ZstdInputStream(chunkReader);
60+
decompressor = new ZstdInputStream(limitedReader);
6261
frameContentSrc = decompressor;
6362
}
6463
} else {
65-
chunkReader.setLimit(uncompressedSize);
64+
limitedReader.setLimit(uncompressedSize);
6665
}
6766

6867
frameLoaded = true;

0 commit comments

Comments
 (0)