Skip to content

Commit 023f74c

Browse files
committed
Fixed an issue in AWS CRT-based S3 client where a GetObject request may hang if streaming failed mid request
1 parent ffc9ccc commit 023f74c

File tree

6 files changed

+134
-37
lines changed

6 files changed

+134
-37
lines changed

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java

+16
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ private S3MetaRequestWrapper s3MetaRequest() {
111111

112112
@Override
113113
public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
114+
log.debug(() -> "Received response header with status code " + statusCode);
114115
// Note, we cannot call responseHandler.onHeaders() here because the response status code and headers may not represent
115116
// whether the request has succeeded or not (e.g. if this is for a HeadObject call that CRT calls under the hood). We
116117
// need to rely on onResponseBody/onFinished being called to determine this.
@@ -150,6 +151,7 @@ public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long ob
150151
@Override
151152
public void onFinished(S3FinishedResponseContext context) {
152153
int crtCode = context.getErrorCode();
154+
log.debug(() -> "Request finished with code: " + crtCode);
153155
if (crtCode != CRT.AWS_CRT_SUCCESS) {
154156
handleError(context);
155157
} else {
@@ -192,6 +194,19 @@ private void handleIoError(S3FinishedResponseContext context, int crtCode) {
192194
SdkClientException.create("Failed to send the request: " +
193195
CRT.awsErrorString(crtCode), cause);
194196
failResponseHandlerAndFuture(sdkClientException);
197+
notifyResponsePublisherErrorIfNeeded(sdkClientException);
198+
}
199+
200+
private void notifyResponsePublisherErrorIfNeeded(Throwable error) {
201+
if (responseHandlingInitiated) {
202+
responsePublisher.error(error).handle((ignore, throwable) -> {
203+
if (throwable != null) {
204+
log.warn(() -> "Exception thrown in responsePublisher#error, ignoring", throwable);
205+
return null;
206+
}
207+
return null;
208+
});
209+
}
195210
}
196211

197212
private void handleServiceError(int responseStatus, HttpHeader[] headers, byte[] errorPayload) {
@@ -204,6 +219,7 @@ private void handleServiceError(int responseStatus, HttpHeader[] headers, byte[]
204219
SdkClientException.create("Request failed during the transfer due to an error returned from S3");
205220
s3Exception.addSuppressed(sdkClientException);
206221
failResponseHandlerAndFuture(s3Exception);
222+
notifyResponsePublisherErrorIfNeeded(s3Exception);
207223
} else {
208224
initiateResponseHandling(errorResponse.build());
209225
onErrorResponseComplete(errorPayload);

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtClientWiremockTest.java

+74
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER_ALTERNATE;
3131
import static software.amazon.awssdk.core.http.HttpResponseHandler.X_AMZ_ID_2_HEADER;
3232

33+
import com.github.tomakehurst.wiremock.http.Fault;
3334
import com.github.tomakehurst.wiremock.http.HttpHeader;
3435
import com.github.tomakehurst.wiremock.http.HttpHeaders;
3536
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
@@ -46,7 +47,9 @@
4647
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
4748
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
4849
import software.amazon.awssdk.core.ResponseBytes;
50+
import software.amazon.awssdk.core.ResponseInputStream;
4951
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
52+
import software.amazon.awssdk.core.exception.SdkClientException;
5053
import software.amazon.awssdk.crt.CrtResource;
5154
import software.amazon.awssdk.crt.Log;
5255
import software.amazon.awssdk.regions.Region;
@@ -149,6 +152,77 @@ public void requestFailedMidway_shouldThrowException() {
149152
});
150153
}
151154

155+
@Test
156+
public void toBlockingInputStream_requestFailedMidwayDueToServerError_shouldThrowException() {
157+
HttpHeaders httpHeaders = new HttpHeaders(new HttpHeader("content-length", "12345676"),
158+
new HttpHeader("etag", E_TAG));
159+
stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200)
160+
.withHeaders(httpHeaders)));
161+
162+
stubFor(get(anyUrl())
163+
.inScenario("SucceedThenFail")
164+
.whenScenarioStateIs(Scenario.STARTED)
165+
.willSetStateTo("first request")
166+
.willReturn(aResponse()
167+
.withStatus(200)
168+
.withBody("helloworld".getBytes(StandardCharsets.UTF_8))));
169+
170+
stubFor(get(anyUrl())
171+
.inScenario("SucceedThenFail")
172+
.whenScenarioStateIs("first request")
173+
.willSetStateTo("second request")
174+
.willReturn(aResponse()
175+
.withStatus(404)
176+
.withHeader(X_AMZ_ID_2_HEADER, "foo")
177+
.withHeader(X_AMZN_REQUEST_ID_HEADER_ALTERNATE, "bar")
178+
.withBody("".getBytes(StandardCharsets.UTF_8))));
179+
ResponseInputStream<GetObjectResponse> stream = s3AsyncClient.getObject(
180+
r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream())
181+
.join();
182+
byte[] buffer = new byte[1024 * 8];
183+
assertThatThrownBy(() -> stream.read(buffer, 0, buffer.length))
184+
.satisfies(throwable -> {
185+
assertThat(throwable).isInstanceOf(S3Exception.class);
186+
S3Exception s3Exception = (S3Exception) throwable;
187+
assertThat(s3Exception.statusCode()).isEqualTo(404);
188+
assertThat(s3Exception.extendedRequestId()).isEqualTo("foo");
189+
assertThat(s3Exception.requestId()).isEqualTo("bar");
190+
});
191+
}
192+
193+
@Test
194+
public void toBlockingInputStream_requestFailedMidwayDueToIoError_shouldThrowException() {
195+
HttpHeaders httpHeaders = new HttpHeaders(new HttpHeader("content-length", "12345676"),
196+
new HttpHeader("etag", E_TAG));
197+
stubFor(head(anyUrl()).willReturn(aResponse().withStatus(200)
198+
.withHeaders(httpHeaders)));
199+
200+
stubFor(get(anyUrl())
201+
.inScenario("SucceedThenFail")
202+
.whenScenarioStateIs(Scenario.STARTED)
203+
.willSetStateTo("first request")
204+
.willReturn(aResponse()
205+
.withStatus(200)
206+
.withBody("helloworld".getBytes(StandardCharsets.UTF_8))));
207+
208+
stubFor(get(anyUrl())
209+
.inScenario("SucceedThenFail")
210+
.whenScenarioStateIs("first request")
211+
.willSetStateTo("second request")
212+
.willReturn(aResponse()
213+
.withFault(Fault.RANDOM_DATA_THEN_CLOSE)));
214+
ResponseInputStream<GetObjectResponse> stream = s3AsyncClient.getObject(
215+
r -> r.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toBlockingInputStream())
216+
.join();
217+
byte[] buffer = new byte[1024 * 8];
218+
assertThatThrownBy(() -> stream.read(buffer, 0, buffer.length))
219+
.satisfies(throwable -> {
220+
assertThat(throwable).isInstanceOf(SdkClientException.class);
221+
SdkClientException exception = (SdkClientException) throwable;
222+
assertThat(exception.getMessage()).contains("Failed to send the request");
223+
});
224+
}
225+
152226
@Test
153227
void overrideResponseCompletionExecutor_shouldCompleteWithCustomExecutor(WireMockRuntimeInfo wiremock) {
154228

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java

+38-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.mockito.Mockito;
4141
import org.mockito.junit.MockitoJUnitRunner;
4242
import org.reactivestreams.Publisher;
43+
import org.reactivestreams.Subscriber;
4344
import software.amazon.awssdk.core.async.DrainingSubscriber;
4445
import software.amazon.awssdk.core.exception.SdkClientException;
4546
import software.amazon.awssdk.crt.http.HttpHeader;
@@ -173,14 +174,33 @@ public void requestFailedMidwayDueToServerError_shouldCompleteFutureWithS3Except
173174
when(errorContext.getErrorHeaders()).thenReturn(headers.toArray(new HttpHeader[0]));
174175

175176
responseHandlerAdapter.onFinished(errorContext);
176-
Throwable actualException = sdkResponseHandler.error;
177-
assertThat(actualException).isInstanceOf(S3Exception.class);
177+
Throwable exceptionFromResponseHandler = sdkResponseHandler.error;
178+
Throwable exceptionFromSubscriber = sdkResponseHandler.subscriber.error;
178179

179-
assertThat(((S3Exception) actualException).statusCode()).isEqualTo(404);
180-
assertThat(((S3Exception) actualException).requestId()).isEqualTo("1234");
181-
assertThat(((S3Exception) actualException).extendedRequestId()).isEqualTo("5678");
180+
assertThat(exceptionFromResponseHandler).isInstanceOf(S3Exception.class);
181+
assertThat(((S3Exception) exceptionFromResponseHandler).statusCode()).isEqualTo(404);
182+
assertThat(((S3Exception) exceptionFromResponseHandler).requestId()).isEqualTo("1234");
183+
assertThat(((S3Exception) exceptionFromResponseHandler).extendedRequestId()).isEqualTo("5678");
184+
assertThat(exceptionFromResponseHandler).isEqualTo(exceptionFromSubscriber);
182185

183-
assertThatThrownBy(() -> future.join()).hasRootCause(actualException);
186+
assertThatThrownBy(() -> future.join()).hasRootCause(exceptionFromResponseHandler);
187+
assertThat(future).isCompletedExceptionally();
188+
verify(s3MetaRequest).close();
189+
}
190+
191+
@Test
192+
public void requestFailedMidwayDueToIoError_shouldInvokeOnError() {
193+
responseHandlerAdapter.onResponseHeaders(200, new HttpHeader[0]);
194+
responseHandlerAdapter.onResponseBody(ByteBuffer.wrap("helloworld".getBytes(StandardCharsets.UTF_8)), 0, 0);
195+
196+
S3FinishedResponseContext errorContext = stubResponseContext(1079, 0, "".getBytes());
197+
responseHandlerAdapter.onFinished(errorContext);
198+
Throwable exceptionFromResponseHandler = sdkResponseHandler.error;
199+
Throwable exceptionFromSubscriber = sdkResponseHandler.subscriber.error;
200+
201+
assertThat(exceptionFromResponseHandler).isEqualTo(exceptionFromSubscriber);
202+
assertThat(exceptionFromResponseHandler).isInstanceOf(SdkClientException.class);
203+
assertThatThrownBy(() -> future.join()).hasRootCause(exceptionFromResponseHandler);
184204
assertThat(future).isCompletedExceptionally();
185205
verify(s3MetaRequest).close();
186206
}
@@ -217,19 +237,30 @@ private void stubOnResponseBody() {
217237
private static class TestResponseHandler implements SdkAsyncHttpResponseHandler {
218238
private SdkHttpResponse sdkHttpResponse;
219239
private Throwable error;
240+
private TestSubscriber subscriber = new TestSubscriber();
241+
220242
@Override
221243
public void onHeaders(SdkHttpResponse headers) {
222244
this.sdkHttpResponse = headers;
223245
}
224246

225247
@Override
226248
public void onStream(Publisher<ByteBuffer> stream) {
227-
stream.subscribe(new DrainingSubscriber<>());
249+
stream.subscribe(subscriber);
228250
}
229251

230252
@Override
231253
public void onError(Throwable error) {
232254
this.error = error;
233255
}
234256
}
257+
258+
private static class TestSubscriber extends DrainingSubscriber {
259+
private Throwable error;
260+
@Override
261+
public void onError(Throwable throwable) {
262+
error = throwable;
263+
super.onError(throwable);
264+
}
265+
}
235266
}

0 commit comments

Comments
 (0)