API, AWS: Retry S3InputStream reads #10433
base: main
Conversation
Force-pushed 096a4f1 to f4788e7
public void testReadWithFuzzyStreamRetrySucceed(IOException exception) throws Exception {
  testRead(
      fuzzyStreamClient(new AtomicInteger(3), exception), new S3FileIOProperties(), DATA_SIZE);
}
This test takes way too long. That's primarily because it internally tests retrying on just read() (the non-buffered reads), which means every byte read will fail and be retried 2 times with a 500 ms delay in between. So essentially that's a second per byte.
I think what we can do is modularize further: keep the buffered read tests (which are pretty fast), and exercise the per-byte read code path with a much smaller data size.
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class TestFuzzyS3InputStream extends TestS3InputStream {
I copied a lot of the test logic from https://github.com/apache/iceberg/pull/4912/files, so I will mark @jackye1995 as co-author.
I saw @xiaoxuandev also had the same tests in #8221, so I'm marking her as co-author here as well.
-    return s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
+    stream =
+        RetryableInputStream.builderFor(
+            () -> s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream()))
Any failures that occur during the getObject request itself (not the stream read) should just be handled by the SDK. I don't think we need to add anything custom for that, since the S3 client is already pluggable.
I feel like there's an issue here. When the stream gets recreated, it will reset back to the original position, and we continue from there as if we're at the right place in the stream.
The pos won't reflect the new position of the stream, if I'm reading this correctly. I would think the following retry would need to start from next to reflect where the next read should start. There's a small problem with the single-byte read method because we increment the positions prior to the read, so that would likely need to be adjusted to happen after the read, like in the range read method.
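A minimal sketch of the adjustment described above (the class and field names here are hypothetical, not the PR's actual code): the tracked position is bumped only after a successful read, so a retry that reopens the stream at `next` resumes at the correct byte.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: the position is incremented only after the read
// succeeds, so a retry that reopens at `next` starts from the right byte.
class TrackingStream extends InputStream {
  private final InputStream delegate;
  long next; // position of the next byte to read

  TrackingStream(InputStream delegate, long start) {
    this.delegate = delegate;
    this.next = start;
  }

  @Override
  public int read() throws IOException {
    int b = delegate.read(); // may throw; `next` is untouched on failure
    if (b != -1) {
      next++; // increment only after a successful read
    }
    return b;
  }
}
```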
I'll double check this; I think you're right, although I'm not sure why the first test case wouldn't surface that (since the content would be different). We may need to seek properly on the input stream initialization during retry.
I revisited this PR and took a look with fresh eyes, and yes, the current logic is definitely incorrect for the case where a non-range read is performed (readFully and readTail perform range reads).
For the range reads we don't really care about the current position for the purpose of tracking in the retryable input stream. But for normal seek-based reads we definitely do!
I think the way to solve this is to pass a supplier of the current position to the retryable input stream. That supplier would have a reference to this, and the stream provider would be a function which accepts a position. Upon retries, the stream provider would open a new connection and open a stream that begins at the position returned by the position supplier (which is guaranteed to be the correct position to start the stream from).
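The shape being proposed could look roughly like this (a simplified, hypothetical sketch, not the PR's actual RetryableInputStream): the stream provider is a function of position, and the position supplier, backed by the owning stream's position pointer, tells the retry path where to reopen.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch (not the PR's actual RetryableInputStream): the provider
// opens a stream starting at a given position, and the position supplier
// reports where the next read should begin after a failure.
class PositionAwareRetryStream extends InputStream {
  private final Function<Long, InputStream> streamProvider;
  private final Supplier<Long> positionSupplier;
  private InputStream delegate;

  PositionAwareRetryStream(
      Function<Long, InputStream> streamProvider, Supplier<Long> positionSupplier) {
    this.streamProvider = streamProvider;
    this.positionSupplier = positionSupplier;
    this.delegate = streamProvider.apply(positionSupplier.get());
  }

  @Override
  public int read() throws IOException {
    try {
      return delegate.read();
    } catch (IOException e) {
      // Reopen at the position the supplier reports, then retry once.
      delegate = streamProvider.apply(positionSupplier.get());
      return delegate.read();
    }
  }
}
```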
I updated this. The RetryableInputStream offers two builder APIs: one for specifying just a new stream initialization, and one for a stream initialization plus a position supplier. The stream initialization function takes in a position (the position can be null to handle the range-based requests, since for range reads with explicit begin/end we don't care about the current position in the stream). cc @danielcweeks
Discussed with @danielcweeks: for the range read cases, since we're using IOUtil readFully/readRemaining, which read the range in a buffered manner, on retries we would read from the beginning position, but the internal stream tracking in readFully/readRemaining would not reset to the right position in the buffer, so that's still an issue.
What we can do is just not retry the RangeReadable methods for now, since they're not actually exercised anywhere. Down the line, we could just use Failsafe and retry the whole method.
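For illustration, here is a self-contained stand-in for what Failsafe would do here (the PR would use Failsafe's RetryPolicy; this hand-rolled sketch just shows the idea of retrying the whole method rather than individual stream reads):

```java
import java.util.concurrent.Callable;

// Self-contained stand-in for the Failsafe idea described above: retry the
// *entire* method call, so every attempt restarts from the original position
// and there is no mid-buffer state to repair.
class WholeMethodRetry {
  static <T> T withRetries(int maxRetries, Callable<T> call) throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        return call.call(); // e.g. a full readFully(position, buffer) pass
      } catch (Exception e) {
        last = e; // a failed attempt discards all partial progress
      }
    }
    throw last;
  }
}
```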
@@ -37,6 +37,7 @@ delta-standalone = "3.1.0"
 delta-spark = "3.2.0"
 esotericsoftware-kryo = "4.0.3"
 errorprone-annotations = "2.27.0"
+failsafe = "3.3.2"
This dependency is quite nice in that it has zero dependencies itself, is Apache-licensed, and I think there are more use cases in Iceberg that could leverage it. For example, I think a lot of the complex logic in Tasks can be simplified.
Furthermore, there's some custom retry logic in the JDBC connector which we couldn't use Tasks for, but now we could use Failsafe. Wondering what others think.
I like this a lot
private List<Class<? extends Exception>> retryableExceptions =
    ImmutableList.of(SSLException.class, SocketTimeoutException.class);
[QQ]
- Should we retry 5xx errors like 503?
- Would it make sense to make this configurable, by any chance?
- Also, any thoughts about the output stream?
@singhpk234 I believe the SDK already handles 503 errors and other errors that happen during the initiation of the request. I believe the output stream is different (I don't think we've heard of issues on that side), but I would handle that in a separate PR if necessary.
I believe the SDK already handles 503 errors and other errors that happen during the initiation of the request
Yup, they do, but we're on the SDK defaults at the moment, right? Wondering if we should either expose configs to override them or control it from here.
Also wondering if we could go with as exhaustive a retry policy and have it in one place, like this: https://github.com/apache/hadoop/blob/b108e9e2d814cc009c4b663799e056a65bf24766/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java#L193
In the past we have received tickets like this: #9124 (comment)
I believe output stream is different (I don't think we've heard of issues on that side), but I would handle that in a separate PR if necessary
+1
Really sorry, I raised this draft and never followed through.
@singhpk234 You're correct that Iceberg itself does not have a configuration to override the default SDK retry policy. I'd say that's separate from this and wouldn't club the two together, imo. But my general thought is that AwsProperties has gotten a bit complex with all the knobs that are exposed, and I think in Iceberg we'd want to avoid the complexity that Hadoop s3a gets into. It's a classic "knobs give users control" vs "simplicity/reducing maintenance burden" problem, and personally I think Iceberg should attempt to be in the latter category. At least for the 503 retry case, users can already provide an S3 client that's configured with their desired retry policy.
On retrying the output stream, I'd say we can address that separately when it becomes an issue. All the cases I've seen so far have really just been on the read side.
When will this be merged? I'm getting this issue while reading Iceberg tables in Glue.
When will this be merged? I'm getting this issue while reading Iceberg tables in Glue.
Sorry about that @SandeepSinghGahir, I drafted this PR and never followed through. I just took a pass and determined next steps so we can take this to completion, since I know a few folks are hitting this issue and I think it's reasonable that the Iceberg S3 input stream has some level of retries when reading from the stream.
I'll also mention that for changes like this, which are on the critical path, it's really important that they are well tested, so I'm also thinking through the test cases.
Force-pushed f4788e7 to deae6eb
Force-pushed 28513fc to 8ba83e3
Sorry for the late reply. Absolutely, @jackye1995! So it seems like the overlap between this and #11052 is primarily the retrying of the reading of the input stream. #11052 also includes some configurable properties and different configurations for the S3Client. For the retrying of the input stream, the key thing I was trying to achieve was adding the RetryableInputStream primitive so that other FileIOs like GCS/Azure can also use it. How do we feel about that, @jackye1995 @ookumuso? Then in the other PR we can talk about the configuration/default updates?
This is interesting. I didn't know that the AWS SDK already handles the output stream retries; if that's the case, then that's even more of a reason not to introduce additional retry logic.
No worries, and thanks for following up! I am okay with using the RetryableInputStream as long as we can close this out soonish, since there seem to be a lot of people hitting the same issue. I have 2 options here:
What do you think @jackye1995 @amogh-jahagirdar?
Sounds good, yeah, let me get this to closure soon since it has been open for a while. I'd propose option 1, and we rebase #11052 on top of that. My only take is that we really should try to minimize as much configuration as possible in Iceberg, since the S3 properties are already quite complex. But I think we can address that in the other PR, since not retrying the input stream is a concrete issue that quite a few folks are hitting.
Force-pushed 8ba83e3 to 9e07d2a
Sounds good. Yes, a lot of folks are hitting this issue, also mentioned here: #10340. Do we have an expected time to merge this PR?
Force-pushed 9707998 to 3b916fe
/**
 * RetryableInputStream wraps over an underlying InputStream and retries failures encountered when
 * reading through the stream. On retries, the underlying streams will be reinitialized.
 */
I think that we need to document that the newStreamSupplier must track the position within the stream and restart it at the appropriate location. This class will not track/reposition the stream upon recreation.
Force-pushed d77f22d to 56cef08
  }
}

static class FlakyInputStream extends InputStream {
I slimmed down this input stream implementation so that we only implement the read/close methods required for the tests.
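For reference, a minimal flaky stream along the lines described might look like this (a hypothetical sketch, not the PR's actual test class):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a "flaky" stream: fails the first N read() calls with
// a retryable exception, then delegates normally. Only read/close are
// implemented, matching the slimmed-down approach described above.
class FlakyStreamSketch extends InputStream {
  private final InputStream delegate;
  private final AtomicInteger failuresLeft;

  FlakyStreamSketch(InputStream delegate, int failures) {
    this.delegate = delegate;
    this.failuresLeft = new AtomicInteger(failures);
  }

  @Override
  public int read() throws IOException {
    if (failuresLeft.getAndDecrement() > 0) {
      throw new SocketTimeoutException("injected failure");
    }
    return delegate.read();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}
```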
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class TestFlakyS3InputStream extends TestS3InputStream {
I renamed from "fuzzy" to "flaky" since I think flaky is probably a better adjective to describe that it will periodically fail and then succeed. When I think "fuzzy" I think of fuzzy logic in math, which is a bit different.
Force-pushed 95fb440 to f65a26d
private int skipSize = 1024 * 1024;
private RetryPolicy<Object> retryPolicy =
    RetryPolicy.builder()
        .handle(
            ImmutableList.of(
                SSLException.class, SocketTimeoutException.class, SocketException.class))
        .onFailure(failure -> openStream(true))
        .withMaxRetries(3)
        .build();
I ended up removing RetryableInputStream for now; GCS is channel-based, and we should probably verify the Azure SDK doesn't already retry internally. We can always add the abstraction later if it ends up being needed, but there's arguably not a compelling case for adding it up front when inlining Failsafe is simple. cc @danielcweeks
if (closeQuietly) {
  stream = null;
  LOG.warn("An error occurred while closing the stream", e);
  return;
}
This is needed so that IOException failures on close during a retry don't count against the retry budget for the read. It can just be a best-effort close.
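An illustrative sketch of that best-effort close behavior (names are illustrative, not the PR's exact code): when closing as part of a retry, an IOException is swallowed so it does not consume one of the read's retry attempts.

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of the best-effort close described above: when closing
// as part of a retry (closeQuietly = true), an IOException is swallowed so it
// does not consume one of the read's retry attempts.
class QuietCloser {
  static void close(InputStream stream, boolean closeQuietly) throws IOException {
    if (stream == null) {
      return;
    }
    try {
      stream.close();
    } catch (IOException e) {
      if (!closeQuietly) {
        throw e; // a normal close still surfaces the failure
      }
      // best-effort: drop the exception during retry-driven reopens
    }
  }
}
```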
Force-pushed f65a26d to ffb1274
.handle(
    ImmutableList.of(
        SSLException.class, SocketTimeoutException.class, SocketException.class))
.onFailure(failure -> openStream(true))
.withMaxRetries(3)
We can make these configurable down the line if needed, but this covers all the cases that have been reported so far.
@amogh-jahagirdar Any tentative timeline on merging this PR?
Force-pushed 28cf3ec to 044b959
Co-authored-by: Jack Ye <[email protected]> Co-authored-by: Xiaoxuan Li <[email protected]>
Force-pushed 044b959 to 3dbdf1e
Fixes #10340

This is an alternative approach to https://github.com/apache/iceberg/pull/4912/files and https://github.com/apache/iceberg/pull/8221/files#diff-0b632866a3b10fac55c442b08178ec0ac72b3b600878243e15d788a8bd031054 for retrying failures encountered when reading input streams.

This approach defines a RetryableInputStream class which wraps the underlying input streams returned by object store APIs. Upon failures, a new stream is created. Custom exceptions can be passed in, but the default retries are on SocketTimeoutException and SSLException. This change integrates this input stream implementation with S3InputStream, but RetryableInputStream should be usable by the other input stream implementations provided by Iceberg.

This change relies on the Failsafe dependency.