Skip to content

Commit 5e3f98c

Browse files
committed
Address PR feedback. MemoryStream base change. Spillover buffer for Convert edge case. Flush strategy update.
1 parent bd5cf98 commit 5e3f98c

11 files changed

Lines changed: 319 additions & 105 deletions

File tree

src/libraries/Common/src/System/IO/ReadOnlyMemoryStream.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
// On net11.0+, the public ReadOnlyMemoryStream in System.Runtime (CoreLib) supersedes this internal copy.
5+
#if !NET11_0_OR_GREATER
6+
47
using System.Threading;
58
using System.Threading.Tasks;
69

@@ -213,3 +216,5 @@ private static void ValidateBufferArguments(byte[] buffer, int offset, int count
213216
#endif
214217
}
215218
}
219+
220+
#endif // !NET11_0_OR_GREATER

src/libraries/System.Memory/ref/System.Memory.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,17 @@ public ReadOnlySequenceStream(System.Buffers.ReadOnlySequence<byte> sequence) {
171171
public override long Length { get { throw null; } }
172172
public override long Position { get { throw null; } set { } }
173173
public override void Flush() { }
174+
public override System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken) { throw null; }
174175
public override int Read(byte[] buffer, int offset, int count) { throw null; }
175176
public override int Read(System.Span<byte> buffer) { throw null; }
177+
public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
178+
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
176179
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
177180
public override void SetLength(long value) { }
178181
public override void Write(byte[] buffer, int offset, int count) { }
182+
public override void Write(System.ReadOnlySpan<byte> buffer) { }
183+
public override System.Threading.Tasks.Task WriteAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
184+
public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
179185
}
180186
}
181187
namespace System.Runtime.InteropServices

src/libraries/System.Memory/src/System/Buffers/ReadOnlySequenceStream.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
116116
{
117117
ValidateBufferArguments(buffer, offset, count);
118118

119-
// If cancellation was requested, bail early
120119
if (cancellationToken.IsCancellationRequested)
120+
{
121121
return Task.FromCanceled<int>(cancellationToken);
122+
}
122123

123124
int n = Read(buffer, offset, count);
124125
return Task.FromResult(n);
@@ -190,6 +191,10 @@ public override long Seek(long offset, SeekOrigin origin)
190191
/// <inheritdoc />
191192
public override void Flush() { }
192193

194+
/// <inheritdoc />
195+
public override Task FlushAsync(CancellationToken cancellationToken) =>
196+
cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : Task.CompletedTask;
197+
193198
/// <inheritdoc />
194199
public override void SetLength(long value) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
195200

src/libraries/System.Private.CoreLib/src/Resources/Strings.resx

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3068,9 +3068,6 @@
30683068
<data name="InvalidOperation_SpanOverlappedOperation" xml:space="preserve">
30693069
<value>This operation is invalid on overlapping buffers.</value>
30703070
</data>
3071-
<data name="InvalidOperation_StreamResyncExceededMaxIterations" xml:space="preserve">
3072-
<value>Stream resynchronization exceeded maximum iterations.</value>
3073-
</data>
30743071
<data name="InvalidOperation_TimeProviderNullLocalTimeZone" xml:space="preserve">
30753072
<value>The operation cannot be performed when TimeProvider.LocalTimeZone is null.</value>
30763073
</data>

src/libraries/System.Private.CoreLib/src/System/IO/ReadOnlyMemoryStream.cs

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
namespace System.IO;
88

99
/// <summary>
10-
/// Provides a seekable, read-only <see cref="Stream"/> over a <see cref="ReadOnlyMemory{Byte}"/>.
10+
/// Provides a seekable, read-only <see cref="MemoryStream"/> over a <see cref="ReadOnlyMemory{Byte}"/>.
1111
/// </summary>
1212
/// <remarks>
1313
/// <para>This type is not thread-safe. Synchronize access if the stream is used concurrently.</para>
1414
/// <para>The stream cannot be written to. <see cref="CanWrite"/> always returns <see langword="false"/>.</para>
15+
/// <para><see cref="GetBuffer"/> throws and <see cref="TryGetBuffer"/> returns <see langword="false"/>.</para>
1516
/// </remarks>
16-
public sealed class ReadOnlyMemoryStream : Stream
17+
public sealed class ReadOnlyMemoryStream : MemoryStream
1718
{
1819
private ReadOnlyMemory<byte> _buffer;
1920
private int _position;
@@ -23,7 +24,7 @@ public sealed class ReadOnlyMemoryStream : Stream
2324
/// Initializes a new instance of the <see cref="ReadOnlyMemoryStream"/> class over the specified <see cref="ReadOnlyMemory{Byte}"/>.
2425
/// </summary>
2526
/// <param name="source">The <see cref="ReadOnlyMemory{Byte}"/> to wrap.</param>
26-
public ReadOnlyMemoryStream(ReadOnlyMemory<byte> source)
27+
public ReadOnlyMemoryStream(ReadOnlyMemory<byte> source) : base()
2728
{
2829
_buffer = source;
2930
_isOpen = true;
@@ -38,6 +39,17 @@ public ReadOnlyMemoryStream(ReadOnlyMemory<byte> source)
3839
/// <inheritdoc/>
3940
public override bool CanWrite => false;
4041

42+
/// <inheritdoc/>
43+
public override int Capacity
44+
{
45+
get
46+
{
47+
EnsureNotClosed();
48+
return _buffer.Length;
49+
}
50+
set => throw new NotSupportedException(SR.NotSupported_MemStreamNotExpandable);
51+
}
52+
4153
/// <inheritdoc/>
4254
public override long Length
4355
{
@@ -72,10 +84,16 @@ public override int ReadByte()
7284
{
7385
EnsureNotClosed();
7486

75-
if (_position >= _buffer.Length)
76-
return -1;
87+
ReadOnlySpan<byte> span = _buffer.Span;
88+
int position = _position;
89+
90+
if ((uint)position < (uint)span.Length)
91+
{
92+
_position++;
93+
return span[position];
94+
}
7795

78-
return _buffer.Span[_position++];
96+
return -1;
7997
}
8098

8199
/// <inheritdoc/>
@@ -93,7 +111,9 @@ public override int Read(Span<byte> buffer)
93111

94112
int remaining = _buffer.Length - _position;
95113
if (remaining <= 0 || buffer.Length == 0)
114+
{
96115
return 0;
116+
}
97117

98118
int bytesToRead = Math.Min(remaining, buffer.Length);
99119
_buffer.Span.Slice(_position, bytesToRead).CopyTo(buffer);
@@ -109,7 +129,9 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
109129
EnsureNotClosed();
110130

111131
if (cancellationToken.IsCancellationRequested)
132+
{
112133
return Task.FromCanceled<int>(cancellationToken);
134+
}
113135

114136
return Task.FromResult(Read(buffer, offset, count));
115137
}
@@ -120,7 +142,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
120142
EnsureNotClosed();
121143

122144
if (cancellationToken.IsCancellationRequested)
145+
{
123146
return ValueTask.FromCanceled<int>(cancellationToken);
147+
}
124148

125149
return new ValueTask<int>(Read(buffer.Span));
126150
}
@@ -169,7 +193,9 @@ public override long Seek(long offset, SeekOrigin origin)
169193
};
170194

171195
if (newPosition < 0)
196+
{
172197
throw new IOException(SR.IO_SeekBeforeBegin);
198+
}
173199

174200
ArgumentOutOfRangeException.ThrowIfGreaterThan(newPosition, int.MaxValue, nameof(offset));
175201

@@ -187,6 +213,49 @@ public override long Seek(long offset, SeekOrigin origin)
187213
/// <inheritdoc/>
188214
public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
189215

216+
/// <inheritdoc/>
217+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
218+
219+
/// <inheritdoc/>
220+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
221+
222+
/// <inheritdoc/>
223+
public override byte[] GetBuffer() =>
224+
throw new UnauthorizedAccessException(SR.UnauthorizedAccess_MemStreamBuffer);
225+
226+
/// <inheritdoc/>
227+
public override bool TryGetBuffer(out ArraySegment<byte> buffer)
228+
{
229+
buffer = default;
230+
return false;
231+
}
232+
233+
/// <inheritdoc/>
234+
public override byte[] ToArray()
235+
{
236+
EnsureNotClosed();
237+
if (_buffer.Length == 0)
238+
{
239+
return Array.Empty<byte>();
240+
}
241+
242+
byte[] copy = GC.AllocateUninitializedArray<byte>(_buffer.Length);
243+
_buffer.Span.CopyTo(copy);
244+
return copy;
245+
}
246+
247+
/// <inheritdoc/>
248+
public override void WriteTo(Stream stream)
249+
{
250+
ArgumentNullException.ThrowIfNull(stream);
251+
EnsureNotClosed();
252+
253+
if (_buffer.Length > 0)
254+
{
255+
stream.Write(_buffer.Span);
256+
}
257+
}
258+
190259
/// <inheritdoc/>
191260
public override void Flush() { }
192261

src/libraries/System.Private.CoreLib/src/System/IO/StringStream.cs

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ public sealed class StringStream : Stream
2323
private int _charPosition;
2424
private bool _disposed;
2525

26+
// Spillover buffer for multibyte encodings: when the caller's buffer is too small
27+
// to hold even one encoded scalar (e.g., ReadByte with UTF-16), we encode into
28+
// this buffer and serve bytes from it across subsequent Read/ReadByte calls.
29+
private byte[]? _pendingBytes;
30+
private int _pendingOffset;
31+
private int _pendingCount;
32+
2633
/// <summary>
2734
/// Initializes a new instance of the <see cref="StringStream"/> class with the specified string and encoding.
2835
/// </summary>
@@ -91,18 +98,76 @@ public override int Read(Span<byte> buffer)
9198
{
9299
ObjectDisposedException.ThrowIf(_disposed, this);
93100

94-
if (buffer.Length == 0 || _charPosition >= _text.Length)
101+
if (buffer.Length == 0 || (_charPosition >= _text.Length && _pendingCount == 0))
95102
{
96103
return 0;
97104
}
98105

99-
ReadOnlySpan<char> remaining = _text.Span.Slice(_charPosition);
100-
bool flush = true;
106+
int totalBytesWritten = 0;
101107

102-
_encoder.Convert(remaining, buffer, flush, out int charsUsed, out int bytesUsed, out _);
103-
_charPosition += charsUsed;
108+
// Drain any pending bytes from a previous partial read.
109+
if (_pendingCount > 0)
110+
{
111+
int toCopy = Math.Min(_pendingCount, buffer.Length);
112+
_pendingBytes.AsSpan(_pendingOffset, toCopy).CopyTo(buffer);
113+
_pendingOffset += toCopy;
114+
_pendingCount -= toCopy;
115+
totalBytesWritten += toCopy;
116+
117+
if (totalBytesWritten == buffer.Length)
118+
{
119+
return totalBytesWritten;
120+
}
121+
122+
buffer = buffer.Slice(totalBytesWritten);
123+
}
104124

105-
return bytesUsed;
125+
if (_charPosition < _text.Length)
126+
{
127+
ReadOnlySpan<char> remaining = _text.Span.Slice(_charPosition);
128+
129+
// If the caller's buffer may be too small for even one encoded scalar,
130+
// encode into the spillover buffer first, then copy what fits.
131+
// Encoder.Convert throws ArgumentException when the output buffer
132+
// cannot hold a single complete encoded character.
133+
if (buffer.Length < _encoding.GetMaxByteCount(1))
134+
{
135+
_pendingBytes ??= new byte[_encoding.GetMaxByteCount(2)];
136+
int charsToEncode = Math.Min(2, remaining.Length);
137+
bool flush = _charPosition + charsToEncode >= _text.Length;
138+
_encoder.Convert(remaining.Slice(0, charsToEncode), _pendingBytes, flush, out int charsUsed, out int bytesUsed, out _);
139+
_charPosition += charsUsed;
140+
141+
int toCopy = Math.Min(bytesUsed, buffer.Length);
142+
_pendingBytes.AsSpan(0, toCopy).CopyTo(buffer);
143+
totalBytesWritten += toCopy;
144+
145+
_pendingOffset = toCopy;
146+
_pendingCount = bytesUsed - toCopy;
147+
}
148+
else
149+
{
150+
// Encode directly into the caller's buffer.
151+
// Only flush on the final block to preserve encoder state
152+
// for stateful encodings.
153+
_encoder.Convert(remaining, buffer, flush: false, out int charsUsed, out int bytesUsed, out _);
154+
_charPosition += charsUsed;
155+
totalBytesWritten += bytesUsed;
156+
157+
// If we consumed all remaining input, flush encoder state.
158+
if (_charPosition >= _text.Length && bytesUsed > 0)
159+
{
160+
Span<byte> flushBuf = buffer.Slice(bytesUsed);
161+
if (flushBuf.Length > 0)
162+
{
163+
_encoder.Convert(ReadOnlySpan<char>.Empty, flushBuf, flush: true, out _, out int flushBytes, out _);
164+
totalBytesWritten += flushBytes;
165+
}
166+
}
167+
}
168+
}
169+
170+
return totalBytesWritten;
106171
}
107172

108173
/// <inheritdoc/>
@@ -120,7 +185,9 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
120185
ValidateBufferArguments(buffer, offset, count);
121186

122187
if (cancellationToken.IsCancellationRequested)
188+
{
123189
return Task.FromCanceled<int>(cancellationToken);
190+
}
124191

125192
return Task.FromResult(Read(buffer, offset, count));
126193
}
@@ -129,7 +196,9 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
129196
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
130197
{
131198
if (cancellationToken.IsCancellationRequested)
199+
{
132200
return ValueTask.FromCanceled<int>(cancellationToken);
201+
}
133202

134203
return new ValueTask<int>(Read(buffer.Span));
135204
}
@@ -146,6 +215,12 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
146215
/// <inheritdoc/>
147216
public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
148217

218+
/// <inheritdoc/>
219+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
220+
221+
/// <inheritdoc/>
222+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => throw new NotSupportedException(SR.NotSupported_UnwritableStream);
223+
149224
/// <inheritdoc/>
150225
public override void Flush() { }
151226

0 commit comments

Comments
 (0)