Skip to content

Commit

Permalink
feat: Destination Full support for Generic Importers (#1430)
Browse files Browse the repository at this point in the history
* Test commit

* Add support for destination Full exception

* Add support for destination Full exception

* update documentation
  • Loading branch information
ameya9 authored Jan 31, 2025
1 parent 1081df7 commit 55fd706
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ If there is an error importing an item on any endpoint, the relevant HTTP respon
}
```

The combination of the HTTP response code and `error` field can be used to encode for specific failure modes; see [Token Refresh](#token-refresh) below.
The combination of the HTTP response code and `error` field can be used to encode for specific failure modes; see [Token Refresh](#token-refresh) & [Destination Full](#destination-full) below.

## Authentication and Authorization

Expand All @@ -734,3 +734,23 @@ Content-Type: application/json
```

The worker will request a token refresh through the standard OAuth refresh token flow.

### Destination Full

If the destination is unable to accept additional data for user due to capacity constrain or other limitations,
the POST APIs should throw `413 Payload Too Large` error to avoid unnecessary data transfer attempts.

`error` field should strictly be set to `destination_full` to indicate that the destination storage is full.

Example error response:

```
HTTP/1.1 413 Payload Too Large
Content-Type: application/json
{
"error": "destination_full",
"error_description": "The destination storage is full"
}
```
The worker will pause the job and won't retry in such scenario.
Some exporters also support the ability to retry transfers after user frees up space in the destination.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.types.common.models.ContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
Expand Down Expand Up @@ -43,7 +44,7 @@ public GenericFileImporter(
@Override
public boolean importSingleItem(
UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
throws IOException, InvalidTokenException {
throws IOException, InvalidTokenException, DestinationMemoryFullException {
if (dataItem instanceof ImportableFileData) {
return importSingleFileItem(jobId, authData, (ImportableFileData<R>) dataItem);
} else {
Expand All @@ -53,7 +54,7 @@ public boolean importSingleItem(

private <T> boolean importSingleFileItem(
UUID jobId, AuthData authData, ImportableFileData<R> data)
throws IOException, InvalidTokenException {
throws IOException, InvalidTokenException, DestinationMemoryFullException {
InputStreamWrapper wrapper = connectionProvider.getInputStreamForItem(jobId, data.getFile());
File tempFile =
dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.datatransferproject.spi.transfer.provider.ImportResult;
import org.datatransferproject.spi.transfer.provider.ImportResult.ResultType;
import org.datatransferproject.spi.transfer.provider.Importer;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.types.common.models.ContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
Expand Down Expand Up @@ -115,6 +116,7 @@ public ImportResult importItem(
TokensAndUrlAuthData initialAuthData,
C data)
throws Exception {

OAuthTokenManager tokenManager =
jobTokenManagerMap.computeIfAbsent(
jobId,
Expand All @@ -130,7 +132,7 @@ public ImportResult importItem(
return new ImportResult(ResultType.OK);
}

boolean parseResponse(Response response) throws IOException, InvalidTokenException {
boolean parseResponse(Response response) throws IOException, InvalidTokenException, DestinationMemoryFullException {
if (response.code() >= 400) {
byte[] body = response.body().bytes();
ErrorResponse error;
Expand All @@ -146,6 +148,10 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti

if (response.code() == 401 && error.getError().equals("invalid_token")) {
throw new InvalidTokenException(error.toString(), null);
} if (response.code() == 413 && error.getError().equals("destination_full")) {
throw new DestinationMemoryFullException(
String.format("Generic importer failed with code (%s)", response.code()),
new RuntimeException("destination_full"));
} else {
throw new IOException(format("Error (%d) %s", response.code(), error.toString()));
}
Expand All @@ -157,7 +163,7 @@ boolean parseResponse(Response response) throws IOException, InvalidTokenExcepti
}

boolean importSingleItem(UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
throws IOException, InvalidTokenException {
throws IOException, InvalidTokenException, DestinationMemoryFullException {
Request request =
new Request.Builder()
.url(endpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public Optional<String> getExpiresIn() {
*/
public class OAuthTokenManager {
@FunctionalInterface
public interface FunctionRequiringAuthData<T> {
public T execute(TokensAndUrlAuthData authData) throws IOException, InvalidTokenException;
public interface FunctionRequiringAuthData<T, Ex extends Exception> {
public T execute(TokensAndUrlAuthData authData) throws Ex, InvalidTokenException;
}

AppCredentials appCredentials;
Expand Down Expand Up @@ -151,8 +151,8 @@ private TokensAndUrlAuthData refreshToken() throws IOException {
* @throws InvalidTokenException if {@code f} throws an {@link InvalidTokenException} after the
* access token has been refreshed
*/
public <T> T withAuthData(FunctionRequiringAuthData<T> f)
throws IOException, InvalidTokenException {
public <T, Ex extends Exception> T withAuthData(FunctionRequiringAuthData<T, Ex> f)
throws Ex, InvalidTokenException, IOException {
try {
return f.execute(authData);
} catch (InvalidTokenException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
Expand All @@ -16,6 +18,7 @@
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.transfer.idempotentexecutor.InMemoryIdempotentImportExecutor;
import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException;
import org.datatransferproject.types.common.models.IdOnlyContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;
Expand All @@ -30,16 +33,20 @@

@RunWith(Parameterized.class)
public class GenericImporterTest {
private final Monitor monitor = new Monitor() {};
private final TemporaryPerJobDataStore dataStore = new TemporaryPerJobDataStore() {};
@Parameter public String importerClass;
private MockWebServer webServer;
private Monitor monitor = new Monitor() {};
private TemporaryPerJobDataStore dataStore = new TemporaryPerJobDataStore() {};

@Parameters(name = "{0}")
public static Collection<String> strings() {
return Arrays.asList(GenericImporter.class.getName(), GenericFileImporter.class.getName());
}

@Parameter public String importerClass;
static void assertContains(String expected, String actual) {
assertTrue(
format("Missing substring [%s] from [%s]", expected, actual), actual.contains(expected));
}

@Before
public void setup() throws IOException {
Expand All @@ -52,11 +59,6 @@ public void teardown() throws IOException {
webServer.shutdown();
}

static void assertContains(String expected, String actual) {
assertTrue(
format("Missing substring [%s] from [%s]", expected, actual), actual.contains(expected));
}

<C> GenericImporter<IdOnlyContainerResource, C> getImporter(
String cls, ContainerSerializer<IdOnlyContainerResource, C> containerSerializer) {
if (cls.equals(GenericFileImporter.class.getName())) {
Expand Down Expand Up @@ -98,17 +100,18 @@ public MockResponse dispatch(RecordedRequest request) {
};
}

@Test
public void testGenericImporter() throws Exception {
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
GenericImporter<IdOnlyContainerResource, String> importer =
getImporter(
importerClass,
container ->
Arrays.asList(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.setDispatcher(getDispatcher());

importer.importItem(
Expand Down Expand Up @@ -196,11 +199,11 @@ public void testGenericImporterTokenRefresh() throws Exception {
getImporter(
importerClass,
container ->
Arrays.asList(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.setDispatcher(getDispatcher());

importer.importItem(
Expand Down Expand Up @@ -240,11 +243,11 @@ public void testGenericImporterBadRequest() throws Exception {
getImporter(
importerClass,
container ->
Arrays.asList(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.enqueue(
new MockResponse().setResponseCode(400).setBody("{\"error\":\"bad_request\"}"));

Expand All @@ -269,11 +272,11 @@ public void testGenericImporterUnexpectedResponse() throws Exception {
getImporter(
importerClass,
container ->
Arrays.asList(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.enqueue(new MockResponse().setResponseCode(400).setBody("notjson"));

importer.importItem(
Expand All @@ -297,11 +300,11 @@ public void testGenericImporterUnexpectedResponseCode() throws Exception {
getImporter(
importerClass,
container ->
Arrays.asList(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.enqueue(new MockResponse().setResponseCode(111));

importer.importItem(
Expand All @@ -317,4 +320,35 @@ public void testGenericImporterUnexpectedResponseCode() throws Exception {
assertEquals("itemId", error.title());
assertContains("Unexpected response code (111)", error.exception());
}

@Test
public void testGenericImporterDestinationFull() throws Exception {
InMemoryIdempotentImportExecutor executor = new InMemoryIdempotentImportExecutor(monitor);
GenericImporter<IdOnlyContainerResource, String> importer =
getImporter(
importerClass,
container ->
List.of(
new ImportableData<>(
new GenericPayload<>(container.getId(), "schemasource"),
container.getId(),
container.getId())));
webServer.enqueue(new MockResponse().setResponseCode(413).setBody("{\"error\":\"destination_full\"}"));

assertThrows(DestinationMemoryFullException.class, () -> {
importer.importItem(
UUID.randomUUID(),
executor,
new TokensAndUrlAuthData(
"accessToken", "refreshToken", webServer.url("/refresh").toString()),
new IdOnlyContainerResource("itemId"));
});


Collection<ErrorDetail> errors = executor.getErrors();
assertEquals(1, errors.size());
ErrorDetail error = errors.iterator().next();
assertEquals("itemId", error.title());
assertContains("Generic importer failed with code (413)", error.exception());
}
}

0 comments on commit 55fd706

Please sign in to comment.