diff --git a/documentation/jetty/modules/programming-guide/pages/migration/12.0-to-12.1.adoc b/documentation/jetty/modules/programming-guide/pages/migration/12.0-to-12.1.adoc
index c66de9907795..3665bbc9fd8d 100644
--- a/documentation/jetty/modules/programming-guide/pages/migration/12.0-to-12.1.adoc
+++ b/documentation/jetty/modules/programming-guide/pages/migration/12.0-to-12.1.adoc
@@ -120,6 +120,15 @@ In Jetty 12.1.x, `RetainableByteBuffer` has broader usage within the xref:migrat
This class is useful to retain or copy other buffers such as ``Content.Chunk``s.
Dynamic capacity buffers are particularly effective to read and accumulate data efficiently (for example for later processing), when processing server-side request content, client-side response content, or WebSocket binary messages.
+[[api-changes-memoryendpointpipe]]
+=== `MemoryEndPointPipe`
+
+`MemoryEndPointPipe` is an internal class that provides an in-memory implementation of `EndPoint.Pipe` for testing and internal use.
+
+In Jetty 12.0.x, `MemoryEndPointPipe` allocated `ByteBuffer` instances directly for buffering data between endpoints.
+
+In Jetty 12.1.x, `MemoryEndPointPipe` uses `RetainableByteBuffer.DynamicCapacity` for more efficient buffer management and integration with buffer pooling.
+
[[api-changes-httpconfig-maxresponseheadersize]]
=== `HttpConfiguration.maxResponseHeaderSize`
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/MemoryEndPointPipe.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/MemoryEndPointPipe.java
index 8f2a7aa616fa..706bc8c06e94 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/MemoryEndPointPipe.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/MemoryEndPointPipe.java
@@ -33,6 +33,8 @@
/**
*
Memory-based implementation of {@link EndPoint.Pipe}.
+ * This implementation uses {@link RetainableByteBuffer.DynamicCapacity} for efficient
+ * buffer management in {@link #flush(ByteBuffer...)}, avoiding per-write ByteBuffer allocations.
*/
public class MemoryEndPointPipe implements EndPoint.Pipe
{
@@ -65,19 +67,28 @@ public void setLocalEndPointMaxCapacity(int maxCapacity)
{
localEndPoint.setMaxCapacity(maxCapacity);
}
-
+
public void setRemoteEndPointMaxCapacity(int maxCapacity)
{
remoteEndPoint.setMaxCapacity(maxCapacity);
}
+ /**
+ * Memory-based {@link EndPoint} that uses {@link RetainableByteBuffer.DynamicCapacity}
+ * for efficient buffer management.
+ * Data written via {@link #flush(ByteBuffer...)} is stored in RetainableByteBuffers in a queue,
+ * and read via {@link #fill(ByteBuffer)} from the peer's queue. EOF is tracked using a sentinel
+ * in the queue to ensure proper ordering of data and EOF signals.
+ */
private class MemoryEndPoint extends AbstractEndPoint
{
private static final Logger LOG = LoggerFactory.getLogger(MemoryEndPoint.class);
- private static final ByteBuffer EOF = ByteBuffer.allocate(0);
+
+ // Sentinel to mark EOF in the queue - ensures EOF is delivered after all data
+ private static final RetainableByteBuffer EOF = RetainableByteBuffer.EMPTY;
private final AutoLock lock = new AutoLock();
- private final Deque byteBuffers = new ArrayDeque<>();
+ private final Deque byteBuffers = new ArrayDeque<>();
private final SocketAddress localAddress;
private MemoryEndPoint peerEndPoint;
private Invocable.Task fillableTask;
@@ -165,24 +176,51 @@ else if (filled < 0)
private int fillInto(ByteBuffer buffer)
{
int filled = 0;
- try (AutoLock ignored = lock.lock())
+ int pos = BufferUtil.flipToFill(buffer);
+ try
{
- while (true)
+ try (AutoLock ignored = lock.lock())
{
- ByteBuffer data = byteBuffers.peek();
- if (data == null)
- return filled;
- if (data == EOF)
- return filled > 0 ? filled : -1;
- int length = data.remaining();
- int copied = BufferUtil.append(buffer, data);
- capacity -= copied;
- filled += copied;
- if (copied < length)
- return filled;
- byteBuffers.poll();
+ while (true)
+ {
+ RetainableByteBuffer data = byteBuffers.peek();
+ if (data == null)
+ return filled;
+ if (data == EOF)
+ return filled > 0 ? filled : -1;
+
+ int space = buffer.remaining();
+ if (space == 0)
+ return filled;
+
+ int available = data.remaining();
+ int toCopy = Math.min(space, available);
+
+ if (toCopy == available)
+ {
+ // Copy all and consume
+ data.putTo(buffer);
+ data.release();
+ byteBuffers.poll();
+ }
+ else
+ {
+ // Partial copy using slice
+ RetainableByteBuffer slice = data.slice(toCopy);
+ slice.putTo(buffer);
+ slice.release();
+ data.skip(toCopy);
+ }
+
+ capacity -= toCopy;
+ filled += toCopy;
+ }
}
}
+ finally
+ {
+ BufferUtil.flipToFlush(buffer, pos);
+ }
}
private void onFilled()
@@ -247,7 +285,7 @@ public boolean flush(ByteBuffer... buffers) throws IOException
// The buffer must be copied, otherwise a write() would complete
// and return it to the buffer pool where its backing store would
// be overwritten before it is read by the peer EndPoint.
- ByteBuffer copy = lockedCopy(buffer);
+ RetainableByteBuffer copy = lockedCopy(buffer);
if (copy == null)
{
result = false;
@@ -277,7 +315,14 @@ public boolean flush(ByteBuffer... buffers) throws IOException
return result;
}
- private ByteBuffer lockedCopy(ByteBuffer buffer)
+ /**
+ * Creates a copy of data from the source buffer, respecting maxCapacity.
+ * Uses {@link RetainableByteBuffer.DynamicCapacity} for efficient buffer management.
+ *
+ * @param buffer the source buffer to copy from
+ * @return a RetainableByteBuffer containing the copied data, or null if at capacity
+ */
+ private RetainableByteBuffer lockedCopy(ByteBuffer buffer)
{
int length = buffer.remaining();
long maxCapacity = getMaxCapacity();
@@ -288,10 +333,22 @@ private ByteBuffer lockedCopy(ByteBuffer buffer)
return null;
length = (int)Math.min(length, space);
}
- // TODO: Use RetainableByteBuffer.DynamicCapacity in Jetty 12.1.x.
- ByteBuffer copy = buffer.isDirect() ? ByteBuffer.allocateDirect(length) : ByteBuffer.allocate(length);
- copy.put(0, buffer, buffer.position(), length);
- buffer.position(buffer.position() + length);
+
+ // Use DynamicCapacity to efficiently copy the data
+ RetainableByteBuffer.DynamicCapacity copy = new RetainableByteBuffer.DynamicCapacity();
+ if (length < buffer.remaining())
+ {
+ // Partial copy: temporarily reduce limit to copy only 'length' bytes,
+ // then restore the original limit so caller sees remaining data.
+ int limit = buffer.limit();
+ buffer.limit(buffer.position() + length);
+ copy.append(buffer);
+ buffer.limit(limit);
+ }
+ else
+ {
+ copy.append(buffer);
+ }
return copy;
}
@@ -312,7 +369,7 @@ protected void doClose()
super.doClose();
try (AutoLock ignored = lock.lock())
{
- ByteBuffer last = byteBuffers.peekLast();
+ RetainableByteBuffer last = byteBuffers.peekLast();
if (last != EOF)
byteBuffers.offer(EOF);
}
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/MemoryEndPointPipeTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/MemoryEndPointPipeTest.java
index cb872f576e67..4757d6043a8d 100644
--- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/MemoryEndPointPipeTest.java
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/MemoryEndPointPipeTest.java
@@ -13,6 +13,7 @@
package org.eclipse.jetty.io;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MemoryEndPointPipeTest
@@ -147,4 +150,189 @@ else if (filled == 0)
assertThat(totalFilled, equalTo(totalWritten));
}
+
+ @Test
+ public void testEofAfterAllDataConsumed() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+ EndPoint remoteEndPoint = pipe.getRemoteEndPoint();
+
+ // Register fill interest before writing
+ Callback.Completable remoteFillCallback = new Callback.Completable();
+ remoteEndPoint.fillInterested(remoteFillCallback);
+
+ // Write some data
+ byte[] data = new byte[100];
+ Arrays.fill(data, (byte)42);
+ Blocker.Shared blocker = new Blocker.Shared();
+ try (Blocker.Callback callback = blocker.callback())
+ {
+ localEndPoint.write(callback, ByteBuffer.wrap(data));
+ callback.block();
+ }
+
+ // Shutdown output to signal EOF
+ localEndPoint.shutdownOutput();
+
+ // Wait for data to be available
+ remoteFillCallback.get(5, TimeUnit.SECONDS);
+
+ // Read all data first
+ RetainableByteBuffer buffer = buffers.acquire(200, false);
+ ByteBuffer readBuffer = buffer.getByteBuffer();
+ int totalFilled = 0;
+ int filled = remoteEndPoint.fill(readBuffer);
+ while (filled > 0)
+ {
+ readBuffer.position(readBuffer.position() + filled);
+ totalFilled += filled;
+ filled = remoteEndPoint.fill(readBuffer);
+ }
+
+ // Verify all data was read
+ assertThat(totalFilled, equalTo(data.length));
+
+ // Verify EOF is returned after all data consumed
+ assertThat(filled, equalTo(-1));
+ }
+
+ @Test
+ public void testShutdownOutput() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+ EndPoint remoteEndPoint = pipe.getRemoteEndPoint();
+
+ // Shutdown output without writing any data
+ localEndPoint.shutdownOutput();
+
+ // Remote endpoint should get EOF immediately
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ int filled = remoteEndPoint.fill(readBuffer);
+ assertThat(filled, equalTo(-1));
+
+ // Verify local endpoint is output shutdown
+ assertTrue(localEndPoint.isOutputShutdown());
+ }
+
+ @Test
+ public void testCloseEndpoint() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+ EndPoint remoteEndPoint = pipe.getRemoteEndPoint();
+
+ // Register fill interest before writing
+ Callback.Completable remoteFillCallback = new Callback.Completable();
+ remoteEndPoint.fillInterested(remoteFillCallback);
+
+ // Write some data
+ byte[] data = new byte[50];
+ Blocker.Shared blocker = new Blocker.Shared();
+ try (Blocker.Callback callback = blocker.callback())
+ {
+ localEndPoint.write(callback, ByteBuffer.wrap(data));
+ callback.block();
+ }
+
+ // Close local endpoint
+ localEndPoint.close();
+
+ // Verify local endpoint is closed
+ assertFalse(localEndPoint.isOpen());
+
+ // Wait for data to be available
+ remoteFillCallback.get(5, TimeUnit.SECONDS);
+
+ // Remote endpoint should still be able to read existing data
+ RetainableByteBuffer buffer = buffers.acquire(100, false);
+ ByteBuffer readBuffer = buffer.getByteBuffer();
+ int filled = remoteEndPoint.fill(readBuffer);
+ assertThat(filled, equalTo(data.length));
+
+ // Advance buffer position after reading
+ readBuffer.position(readBuffer.position() + filled);
+
+ // After reading all data, should get EOF
+ filled = remoteEndPoint.fill(readBuffer);
+ assertThat(filled, equalTo(-1));
+ }
+
+ @Test
+ public void testFillOnClosedEndpoint() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+ EndPoint remoteEndPoint = pipe.getRemoteEndPoint();
+
+ // Close remote endpoint
+ remoteEndPoint.close();
+
+ // fill() on closed endpoint should throw IOException
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ assertThrows(IOException.class, () -> remoteEndPoint.fill(readBuffer));
+ }
+
+ @Test
+ public void testFlushOnClosedEndpoint() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+
+ // Close local endpoint
+ localEndPoint.close();
+
+ // flush() on closed endpoint should throw IOException
+ ByteBuffer writeBuffer = ByteBuffer.wrap(new byte[50]);
+ assertThrows(IOException.class, () -> localEndPoint.flush(writeBuffer));
+ }
+
+ @Test
+ public void testSmallCapacityPartialFlush() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+ // Set a small capacity that is less than the data we want to write
+ pipe.setLocalEndPointMaxCapacity(20);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+
+ // Try to flush more data than capacity allows
+ ByteBuffer writeBuffer = ByteBuffer.wrap(new byte[50]);
+ boolean flushed = localEndPoint.flush(writeBuffer);
+ assertFalse(flushed);
+
+ // Only 20 bytes should have been flushed, 30 remaining
+ assertThat(writeBuffer.remaining(), equalTo(30));
+ }
+
+ @Test
+ public void testEmptyBufferFlush() throws Exception
+ {
+ MemoryEndPointPipe pipe = new MemoryEndPointPipe(scheduler, executor::execute, null);
+
+ EndPoint localEndPoint = pipe.getLocalEndPoint();
+ EndPoint remoteEndPoint = pipe.getRemoteEndPoint();
+
+ // Flush empty buffers
+ ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
+ boolean flushed = localEndPoint.flush(emptyBuffer);
+ assertTrue(flushed);
+
+ // Flush with position == limit (consumed buffer)
+ ByteBuffer consumedBuffer = ByteBuffer.allocate(50);
+ consumedBuffer.position(consumedBuffer.limit());
+ flushed = localEndPoint.flush(consumedBuffer);
+ assertTrue(flushed);
+
+ // Remote should have no data to read
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ int filled = remoteEndPoint.fill(readBuffer);
+ assertThat(filled, equalTo(0));
+ }
}