Skip to content

Commit

Permalink
Merge pull request #75 from anuraaga/work
Browse files Browse the repository at this point in the history
Pass through error-code http responses for raw http clients. Allows c…
  • Loading branch information
trustin committed Dec 22, 2015
2 parents 85970bd + 3a59c79 commit 751df2c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
43 changes: 24 additions & 19 deletions src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;

final Promise<FullHttpResponse> promise = waitsHolder.poll(response);
final Invocation invocation = waitsHolder.poll(response);
final SerializationFormat serializationFormat =
invocation.invocationContext().scheme().serializationFormat();

if (promise != null) {
if (invocation != null) {
try {
if (HttpStatusClass.SUCCESS == response.status().codeClass()) {
promise.setSuccess(response.retain());
if (HttpStatusClass.SUCCESS == response.status().codeClass()
// No serialization indicates a raw HTTP protocol which should
// have error responses returned.
|| serializationFormat == SerializationFormat.NONE) {
invocation.resultPromise().setSuccess(response.retain());
} else {
promise.setFailure(new InvalidResponseException(
invocation.resultPromise().setFailure(new InvalidResponseException(
"HTTP Response code: " + response.status()));
}
} finally {
ReferenceCountUtil.release(msg);
}
} else {
//if promise not found, we just bypass message to next
//if invocation not found, we just bypass message to next
ctx.fireChannelRead(msg);
}

Expand Down Expand Up @@ -139,10 +144,10 @@ void deactivateSession() {

private void failPendingResponses(Throwable e) {
active = false;
final Collection<Promise<FullHttpResponse>> resultPromises = waitsHolder.getAll();
final Collection<Invocation> invocations = waitsHolder.getAll();
waitsHolder.clear();
if (!resultPromises.isEmpty()) {
resultPromises.forEach(promise -> promise.tryFailure(e));
if (!invocations.isEmpty()) {
invocations.forEach(invocation -> invocation.resultPromise().tryFailure(e));
}
}

Expand All @@ -159,11 +164,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}

private interface WaitsHolder {
Promise<FullHttpResponse> poll(FullHttpResponse response);
Invocation poll(FullHttpResponse response);

void put(Invocation invocation, FullHttpRequest request);

Collection<Promise<FullHttpResponse>> getAll();
Collection<Invocation> getAll();

void clear();

Expand All @@ -177,24 +182,24 @@ default boolean isEmpty() {
}

private static class SequentialWaitsHolder implements WaitsHolder {
private final Queue<Promise<FullHttpResponse>> requestExpectQueue;
private final Queue<Invocation> requestExpectQueue;

SequentialWaitsHolder() {
requestExpectQueue = new ArrayDeque<>();
}

@Override
public Promise<FullHttpResponse> poll(FullHttpResponse response) {
public Invocation poll(FullHttpResponse response) {
return requestExpectQueue.poll();
}

@Override
public void put(Invocation invocation, FullHttpRequest request) {
requestExpectQueue.add(invocation.resultPromise());
requestExpectQueue.add(invocation);
}

@Override
public Collection<Promise<FullHttpResponse>> getAll() {
public Collection<Invocation> getAll() {
return requestExpectQueue;
}

Expand All @@ -205,7 +210,7 @@ public void clear() {
}

private static class MultiplexWaitsHolder implements WaitsHolder {
private final IntObjectMap<Promise<FullHttpResponse>> resultExpectMap;
private final IntObjectMap<Invocation> resultExpectMap;
private int streamId;

MultiplexWaitsHolder() {
Expand All @@ -214,7 +219,7 @@ private static class MultiplexWaitsHolder implements WaitsHolder {
}

@Override
public Promise<FullHttpResponse> poll(FullHttpResponse response) {
public Invocation poll(FullHttpResponse response) {
int streamID = response.headers().getInt(ExtensionHeaderNames.STREAM_ID.text(), 0);
return resultExpectMap.remove(streamID);
}
Expand All @@ -223,11 +228,11 @@ public Promise<FullHttpResponse> poll(FullHttpResponse response) {
public void put(Invocation invocation, FullHttpRequest request) {
int streamId = nextStreamID();
request.headers().add(ExtensionHeaderNames.STREAM_ID.text(), streamIdToString(streamId));
resultExpectMap.put(streamId, invocation.resultPromise());
resultExpectMap.put(streamId, invocation);
}

@Override
public Collection<Promise<FullHttpResponse>> getAll() {
public Collection<Invocation> getAll() {
return resultExpectMap.values();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public class SimpleHttpClientIntegrationTest {
response.headers().set(HttpHeaderNames.CACHE_CONTROL, "alwayscache");
promise.setSuccess(response);
}));
sb.serviceAt("/not200", new HttpService((ctx, executor, promise) -> {
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
promise.setSuccess(response);
}));
} catch (Exception e) {
throw new Error(e);
}
Expand Down Expand Up @@ -131,4 +136,13 @@ public void testRequestWithBody() throws Exception {
assertEquals("METHOD: POST|ACCEPT: utf-8|BODY: requestbody日本語",
new String(response.content(), StandardCharsets.UTF_8));
}

@Test
public void testNot200() throws Exception {
SimpleHttpClient client = Clients.newClient(remoteInvokerFactory, "none+http://127.0.0.1:" + httpPort,
SimpleHttpClient.class);
SimpleHttpRequest request = SimpleHttpRequestBuilder.forGet("/not200").build();
SimpleHttpResponse response = client.execute(request).get();
assertEquals(HttpResponseStatus.NOT_FOUND, response.status());
}
}

0 comments on commit 751df2c

Please sign in to comment.