Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ In Jetty 12.1.x, `RetainableByteBuffer` has broader usage within the xref:migrat
This class is useful to retain or copy other buffers such as ``Content.Chunk``s.
Dynamic capacity buffers are particularly effective to read and accumulate data efficiently (for example for later processing), when processing server-side request content, client-side response content, or WebSocket binary messages.

[[api-changes-memoryendpointpipe]]
=== `MemoryEndPointPipe`

`MemoryEndPointPipe` is an internal class that provides an in-memory implementation of `EndPoint.Pipe` for testing and internal use.

In Jetty 12.0.x, `MemoryEndPointPipe` allocated `ByteBuffer` instances directly for buffering data between endpoints.

In Jetty 12.1.x, `MemoryEndPointPipe` uses `RetainableByteBuffer.DynamicCapacity` for more efficient buffer management and integration with buffer pooling.

[[api-changes-httpconfig-maxresponseheadersize]]
=== `HttpConfiguration.maxResponseHeaderSize`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

/**
* <p>Memory-based implementation of {@link EndPoint.Pipe}.</p>
* <p>This implementation uses {@link RetainableByteBuffer.DynamicCapacity} for efficient
* buffer management in {@link #flush(ByteBuffer...)}, avoiding per-write ByteBuffer allocations.</p>
*/
public class MemoryEndPointPipe implements EndPoint.Pipe
{
Expand Down Expand Up @@ -65,19 +67,28 @@ public void setLocalEndPointMaxCapacity(int maxCapacity)
{
localEndPoint.setMaxCapacity(maxCapacity);
}

public void setRemoteEndPointMaxCapacity(int maxCapacity)
{
remoteEndPoint.setMaxCapacity(maxCapacity);
}

/**
* <p>Memory-based {@link EndPoint} that uses {@link RetainableByteBuffer.DynamicCapacity}
* for efficient buffer management.</p>
* <p>Data written via {@link #flush(ByteBuffer...)} is stored in RetainableByteBuffers in a queue,
* and read via {@link #fill(ByteBuffer)} from the peer's queue. EOF is tracked using a sentinel
* in the queue to ensure proper ordering of data and EOF signals.</p>
*/
private class MemoryEndPoint extends AbstractEndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(MemoryEndPoint.class);
private static final ByteBuffer EOF = ByteBuffer.allocate(0);

// Sentinel to mark EOF in the queue - ensures EOF is delivered after all data
private static final RetainableByteBuffer EOF = RetainableByteBuffer.EMPTY;

private final AutoLock lock = new AutoLock();
private final Deque<ByteBuffer> byteBuffers = new ArrayDeque<>();
private final Deque<RetainableByteBuffer> byteBuffers = new ArrayDeque<>();
private final SocketAddress localAddress;
private MemoryEndPoint peerEndPoint;
private Invocable.Task fillableTask;
Expand Down Expand Up @@ -165,24 +176,51 @@ else if (filled < 0)
private int fillInto(ByteBuffer buffer)
{
int filled = 0;
try (AutoLock ignored = lock.lock())
int pos = BufferUtil.flipToFill(buffer);
try
{
while (true)
try (AutoLock ignored = lock.lock())
{
ByteBuffer data = byteBuffers.peek();
if (data == null)
return filled;
if (data == EOF)
return filled > 0 ? filled : -1;
int length = data.remaining();
int copied = BufferUtil.append(buffer, data);
capacity -= copied;
filled += copied;
if (copied < length)
return filled;
byteBuffers.poll();
while (true)
{
RetainableByteBuffer data = byteBuffers.peek();
if (data == null)
return filled;
if (data == EOF)
return filled > 0 ? filled : -1;

int space = buffer.remaining();
if (space == 0)
return filled;

int available = data.remaining();
int toCopy = Math.min(space, available);

if (toCopy == available)
{
// Copy all and consume
data.putTo(buffer);
data.release();
byteBuffers.poll();
}
else
{
// Partial copy using slice
RetainableByteBuffer slice = data.slice(toCopy);
slice.putTo(buffer);
slice.release();
data.skip(toCopy);
}

capacity -= toCopy;
filled += toCopy;
}
}
}
finally
{
BufferUtil.flipToFlush(buffer, pos);
}
}

private void onFilled()
Expand Down Expand Up @@ -247,7 +285,7 @@ public boolean flush(ByteBuffer... buffers) throws IOException
// The buffer must be copied, otherwise a write() would complete
// and return it to the buffer pool where its backing store would
// be overwritten before it is read by the peer EndPoint.
ByteBuffer copy = lockedCopy(buffer);
RetainableByteBuffer copy = lockedCopy(buffer);
if (copy == null)
{
result = false;
Expand Down Expand Up @@ -277,7 +315,14 @@ public boolean flush(ByteBuffer... buffers) throws IOException
return result;
}

private ByteBuffer lockedCopy(ByteBuffer buffer)
/**
* Creates a copy of data from the source buffer, respecting maxCapacity.
* Uses {@link RetainableByteBuffer.DynamicCapacity} for efficient buffer management.
*
* @param buffer the source buffer to copy from
* @return a RetainableByteBuffer containing the copied data, or null if at capacity
*/
private RetainableByteBuffer lockedCopy(ByteBuffer buffer)
{
int length = buffer.remaining();
long maxCapacity = getMaxCapacity();
Expand All @@ -288,10 +333,22 @@ private ByteBuffer lockedCopy(ByteBuffer buffer)
return null;
length = (int)Math.min(length, space);
}
// TODO: Use RetainableByteBuffer.DynamicCapacity in Jetty 12.1.x.
ByteBuffer copy = buffer.isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
copy.put(0, buffer, buffer.position(), length);
buffer.position(buffer.position() + length);

// Use DynamicCapacity to efficiently copy the data
RetainableByteBuffer.DynamicCapacity copy = new RetainableByteBuffer.DynamicCapacity();
if (length < buffer.remaining())
{
// Partial copy: temporarily reduce limit to copy only 'length' bytes,
// then restore the original limit so caller sees remaining data.
int limit = buffer.limit();
buffer.limit(buffer.position() + length);
copy.append(buffer);
buffer.limit(limit);
}
else
{
copy.append(buffer);
}
return copy;
}

Expand All @@ -312,7 +369,7 @@ protected void doClose()
super.doClose();
try (AutoLock ignored = lock.lock())
{
ByteBuffer last = byteBuffers.peekLast();
RetainableByteBuffer last = byteBuffers.peekLast();
if (last != EOF)
byteBuffers.offer(EOF);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +28,8 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MemoryEndPointPipeTest
Expand Down Expand Up @@ -147,4 +150,189 @@ else if (filled == 0)

assertThat(totalFilled, equalTo(totalWritten));
}

@Test
public void testEofAfterAllDataConsumed() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();
EndPoint remoteEndPoint = pipe.getRemoteEndPoint();

// Register fill interest before writing
Callback.Completable remoteFillCallback = new Callback.Completable();
remoteEndPoint.fillInterested(remoteFillCallback);

// Write some data
byte[] data = new byte[100];
Arrays.fill(data, (byte)42);
Blocker.Shared blocker = new Blocker.Shared();
try (Blocker.Callback callback = blocker.callback())
{
localEndPoint.write(callback, ByteBuffer.wrap(data));
callback.block();
}

// Shutdown output to signal EOF
localEndPoint.shutdownOutput();

// Wait for data to be available
remoteFillCallback.get(5, TimeUnit.SECONDS);

// Read all data first
RetainableByteBuffer buffer = buffers.acquire(200, false);
ByteBuffer readBuffer = buffer.getByteBuffer();
int totalFilled = 0;
int filled = remoteEndPoint.fill(readBuffer);
while (filled > 0)
{
readBuffer.position(readBuffer.position() + filled);
totalFilled += filled;
filled = remoteEndPoint.fill(readBuffer);
}

// Verify all data was read
assertThat(totalFilled, equalTo(data.length));

// Verify EOF is returned after all data consumed
assertThat(filled, equalTo(-1));
}

@Test
public void testShutdownOutput() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();
EndPoint remoteEndPoint = pipe.getRemoteEndPoint();

// Shutdown output without writing any data
localEndPoint.shutdownOutput();

// Remote endpoint should get EOF immediately
ByteBuffer readBuffer = ByteBuffer.allocate(100);
int filled = remoteEndPoint.fill(readBuffer);
assertThat(filled, equalTo(-1));

// Verify local endpoint is output shutdown
assertTrue(localEndPoint.isOutputShutdown());
}

@Test
public void testCloseEndpoint() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();
EndPoint remoteEndPoint = pipe.getRemoteEndPoint();

// Register fill interest before writing
Callback.Completable remoteFillCallback = new Callback.Completable();
remoteEndPoint.fillInterested(remoteFillCallback);

// Write some data
byte[] data = new byte[50];
Blocker.Shared blocker = new Blocker.Shared();
try (Blocker.Callback callback = blocker.callback())
{
localEndPoint.write(callback, ByteBuffer.wrap(data));
callback.block();
}

// Close local endpoint
localEndPoint.close();

// Verify local endpoint is closed
assertFalse(localEndPoint.isOpen());

// Wait for data to be available
remoteFillCallback.get(5, TimeUnit.SECONDS);

// Remote endpoint should still be able to read existing data
RetainableByteBuffer buffer = buffers.acquire(100, false);
ByteBuffer readBuffer = buffer.getByteBuffer();
int filled = remoteEndPoint.fill(readBuffer);
assertThat(filled, equalTo(data.length));

// Advance buffer position after reading
readBuffer.position(readBuffer.position() + filled);

// After reading all data, should get EOF
filled = remoteEndPoint.fill(readBuffer);
assertThat(filled, equalTo(-1));
}

@Test
public void testFillOnClosedEndpoint() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();
EndPoint remoteEndPoint = pipe.getRemoteEndPoint();

// Close remote endpoint
remoteEndPoint.close();

// fill() on closed endpoint should throw IOException
ByteBuffer readBuffer = ByteBuffer.allocate(100);
assertThrows(IOException.class, () -> remoteEndPoint.fill(readBuffer));
}

@Test
public void testFlushOnClosedEndpoint() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();

// Close local endpoint
localEndPoint.close();

// flush() on closed endpoint should throw IOException
ByteBuffer writeBuffer = ByteBuffer.wrap(new byte[50]);
assertThrows(IOException.class, () -> localEndPoint.flush(writeBuffer));
}

@Test
public void testSmallCapacityPartialFlush() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
// Set a small capacity that is less than the data we want to write
pipe.setLocalEndPointMaxCapacity(20);

EndPoint localEndPoint = pipe.getLocalEndPoint();

// Try to flush more data than capacity allows
ByteBuffer writeBuffer = ByteBuffer.wrap(new byte[50]);
boolean flushed = localEndPoint.flush(writeBuffer);
assertFalse(flushed);

// Only 20 bytes should have been flushed, 30 remaining
assertThat(writeBuffer.remaining(), equalTo(30));
}

@Test
public void testEmptyBufferFlush() throws Exception
{
MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);

EndPoint localEndPoint = pipe.getLocalEndPoint();
EndPoint remoteEndPoint = pipe.getRemoteEndPoint();

// Flush empty buffers
ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
boolean flushed = localEndPoint.flush(emptyBuffer);
assertTrue(flushed);

// Flush with position == limit (consumed buffer)
ByteBuffer consumedBuffer = ByteBuffer.allocate(50);
consumedBuffer.position(consumedBuffer.limit());
flushed = localEndPoint.flush(consumedBuffer);
assertTrue(flushed);

// Remote should have no data to read
ByteBuffer readBuffer = ByteBuffer.allocate(100);
int filled = remoteEndPoint.fill(readBuffer);
assertThat(filled, equalTo(0));
}
}