Skip to content

Commit

Permalink
Bug fix: corrected the state management in the async HTTP/1.1 protoco…
Browse files Browse the repository at this point in the history
…l handler when committing requests asynchronously
  • Loading branch information
ok2c committed Feb 17, 2025
1 parent b22dc74 commit 7274811
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
Expand Down Expand Up @@ -2037,4 +2038,99 @@ protected void writeHeadLine(final HttpRequest message, final CharArrayBuffer li
Assertions.assertEquals(505, response.getCode());
}

@Test
void testDelayedRequestSubmission() throws Exception {
final Http1TestServer server = resources.server();
final Http1TestClient client = resources.client();

server.register("/hello", () -> new SingleLineResponseHandler("All is well"));
final InetSocketAddress serverEndpoint = server.start();

client.start();

final Future<ClientSessionEndpoint> connectFuture = client.connect(
"localhost", serverEndpoint.getPort(), TIMEOUT);
final ClientSessionEndpoint streamEndpoint = connectFuture.get();

final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
for (int i = 0; i < 5; i++) {
final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
final AsyncEntityProducer entityProducer = AsyncEntityProducers.create("Some important message");
queue.add(streamEndpoint.execute(
new AsyncRequestProducer() {

private final Random random = new Random(System.currentTimeMillis());
private final ReentrantLock lock = new ReentrantLock();

@Override
public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
executorResource.getExecutorService().execute(() -> {
try {
Thread.sleep(random.nextInt(200));
lock.lock();
try {
channel.sendRequest(request, entityProducer, context);
} finally {
lock.unlock();
}
} catch (final Exception ignore) {
// ignore
}
});
}

@Override
public boolean isRepeatable() {
lock.lock();
try {
return entityProducer.isRepeatable();
} finally {
lock.unlock();
}
}

@Override
public int available() {
lock.lock();
try {
return entityProducer.available();
} finally {
lock.unlock();
}
}

@Override
public void produce(final DataStreamChannel channel) throws IOException {
lock.lock();
try {
entityProducer.produce(channel);
} finally {
lock.unlock();
}
}

@Override
public void failed(final Exception cause) {
entityProducer.failed(cause);
}

@Override
public void releaseResources() {
entityProducer.releaseResources();
}

},
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, String>> future = queue.remove();
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
Assertions.assertNotNull(result);
final HttpResponse response = result.getHead();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals("All is well", result.getBody());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ abstract ContentEncoder createContentEncoder(

abstract boolean isOutputReady();

abstract boolean isRequestInitiated();

abstract void produceOutput() throws HttpException, IOException;

abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
Expand Down Expand Up @@ -379,7 +381,7 @@ public final void onOutput() throws IOException, HttpException {
} else {
outputRequests.addAndGet(-pendingOutputRequests);
}
outputEnd = outgoingMessage == null && !outbuf.hasData();
outputEnd = outgoingMessage == null && !outbuf.hasData() && !isRequestInitiated();
} finally {
ioSession.getLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ protected ContentEncoder createContentEncoder(
}
}

@Override
boolean isRequestInitiated() {
return outgoing != null && !outgoing.isRequestFinal();
}

@Override
boolean inputIdle() {
return incoming == null && pipeline.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public void endStream() throws IOException {
this.responseState = MessageState.HEADERS;
}

boolean isRequestFinal() {
return requestState == MessageState.COMPLETE;
}

boolean isResponseFinal() {
return responseState == MessageState.COMPLETE;
}
Expand All @@ -137,8 +141,10 @@ String getRequestMethod() {
boolean isOutputReady() {
switch (requestState) {
case IDLE:
case ACK:
return true;
case HEADERS:
case ACK:
return false;
case BODY:
return exchangeHandler.available() > 0;
default:
Expand Down Expand Up @@ -186,6 +192,7 @@ void produceOutput() throws HttpException, IOException {
switch (requestState) {
case IDLE:
requestState = MessageState.HEADERS;
outputChannel.suspendOutput();
exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
break;
case ACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ void execute(final RequestExecutionCommand executionCommand) throws HttpExceptio
throw new HttpException("Illegal command: " + executionCommand.getClass());
}

@Override
boolean isRequestInitiated() {
return false;
}

@Override
boolean isOutputReady() {
return outgoing != null && outgoing.isOutputReady();
Expand Down

0 comments on commit 7274811

Please sign in to comment.