feat: Generic Importers (#1389)
* Add @type annotations when serializing SocialActivity data

In preparation for creating a generic transfer extension, set the
`@type` field when serializing `SocialActivity*` objects.
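
For illustration, a minimal Jackson sketch of emitting an `@type` field on a
serialized object (hypothetical class; the exact annotations on the
`SocialActivity*` models may differ):

// Hypothetical example, not the actual SocialActivity model: ask Jackson to
// write an "@type" property alongside the object's own fields.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type")
@JsonTypeName("SocialActivityPost")
class ExamplePost {
  @JsonProperty public String content;
}
// new ObjectMapper().writeValueAsString(post) then yields:
// {"@type":"SocialActivityPost","content":"..."}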

* Implement basic GenericImporter for SOCIAL_POSTS

Adds a generic `Importer` for SOCIAL_POSTS. The importer passes the
social data straight through, and is built to be extensible to other
data types.
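
A simplified, illustrative sketch of the pass-through step (not the exact
implementation; `getJsonData` mirrors its use in GenericFileImporter below):

// Serialize the wrapped payload to JSON and POST it to the configured endpoint
// with the job's bearer token.
boolean importSingleItem(OkHttpClient client, ObjectMapper om, URL endpoint,
    TokensAndUrlAuthData authData, ImportableData<?> dataItem) throws IOException {
  Request request =
      new Request.Builder()
          .url(endpoint)
          .addHeader("Authorization", format("Bearer %s", authData.getToken()))
          .post(RequestBody.create(MediaType.parse("application/json"),
              om.writeValueAsBytes(dataItem.getJsonData())))
          .build();
  try (Response response = client.newCall(request).execute()) {
    return response.isSuccessful(); // the real importer inspects the response more carefully
  }
}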

* Implement basic GenericFileImporter for BLOBS

Extends the `GenericImporter` from the previous commit to support data
with both JSON metadata and file data. As an example of usage, provide
an implementation for `BLOBS` data.

* Add README with draft docs for generic importers

* Configure GenericFileImporter for MEDIA

* Configure GenericImporter for CALENDAR

* Extract generic BLOBS serializer to separate module

Extracts the logic for serializing BLOBS data to a separate module.
Adds tests for the serializer.
Adds a generic wrapper around the payload, which will wrap all other
types after they undergo the same refactoring.
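
Conceptually the wrapper is just the serialized item plus a pointer to its
schema; a sketch with assumed field names (only the constructor shape is taken
from its use in the serializers below):

// Hypothetical sketch of GenericPayload; the real class may carry extra fields.
class GenericPayload<T> {
  @JsonProperty private final T payload; // vertical-specific export data
  @JsonProperty private final String schemaSource; // where the payload's schema is defined

  public GenericPayload(T payload, String schemaSource) {
    this.payload = payload;
    this.schemaSource = schemaSource;
  }
}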

* Extract generic SOCIAL_POSTS exporter to separate module

Extracts the SOCIAL_POSTS generic importer serialization logic to a
separate module.
Also adds tests, and wraps the payload in GenericPayload, as was done
for BLOBS in the previous commit.

* Extract generic CALENDAR exporter to separate module

Extracts the CALENDAR generic importer serialization logic to a separate
module.
Also adds tests, and wraps the payload in GenericPayload, as with
other data types in previous commits.

* Extract generic MEDIA exporter to separate module

Extracts the MEDIA generic importer serialization logic to a separate
module.
Also adds tests, and wraps the payload in GenericPayload, as with
other data types in previous commits.

* Move CachedDownloadableItem to BlobbySerializer

Moves `CachedDownloadableItem` to the `BlobbySerializer` file, since
that's the only place it's used.

* Serialize dates to strings in tests

* Preserve typing information of JSON payloads

In previous commits the type information of exported data was being lost
at serialization time. This change delays converting to the JSON
representation until the data is sent on the wire, preserving type
information as long as possible and avoiding double-serialization of
the data.

The exported data spans a union of possible types (the MediaSerializer,
for example, generates a list of MediaAlbums, VideoModels, and
PhotoModels), but unfortunately Java doesn't support proper union types.
The solution here is to create an empty `ExportData` interface for each
serializer, have the possible subtypes 'implement' the empty interface,
and then use the `ExportData` interface as a type bound.

This makes the Media and Calendar serializers more verbose, since we
have to duplicate the underlying models, but it gives us a layer of
separation between the model and the serialized data: the two can
evolve independently, and any change to the model requires a
corresponding change in the serializer, reducing the chance of the
serializer interface being broken.
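
The pattern, in brief (mirroring the serializers added in this change):

// Empty marker interface standing in for the union of exportable types.
public interface ExportData {}

// Each concrete shape wraps or extends the corresponding DTP model.
class CalendarExportData extends CalendarModel implements ExportData { /* ... */ }
class CalendarEventExportData extends CalendarEventModel implements ExportData { /* ... */ }

// The interface is then usable as a type bound, so payload typing survives
// until the data is written to the wire:
// Iterable<ImportableData<ExportData>> serialize(CalendarContainerResource container);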

* Use custom schema for media GenericImporter serialization

Since we've added a layer between generic importer serializers and the
underlying models to better preserve type information, make use of this
by using a custom schema for media items. This lets us:
a) use consistent schemas for Photos and Videos
b) use TZ-aware date-times (although we have to assume the underlying
   Date is UTC when copying the model)
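
For example, a sketch of the copying step (assuming the model exposes the
timestamp as a java.util.Date via an accessor like getUploadedTime(); the
actual media serializer may differ slightly):

// Treat the legacy Date as UTC and expose a timezone-aware ZonedDateTime.
Date uploaded = photo.getUploadedTime(); // may be null in the model
ZonedDateTime uploadedUtc =
    uploaded == null ? null : ZonedDateTime.ofInstant(uploaded.toInstant(), ZoneOffset.UTC);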

* Tidy up TypeNames of top-level schemas

Simplifies the `@type` annotations of exported data. Might be worth
doing this for nested types too, although some of them are directly
using Models defined by DTP.
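
Concretely, naming the top-level types keeps the `@type` marker short and
stable (this mirrors the annotations in BlobbySerializer below):

// With an explicit name, the serialized marker is "@type": "Folder" rather
// than a Jackson-derived class name.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonTypeName("Folder")
class FolderExportData implements BlobbySerializer.ExportData { /* ... */ }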

* Configure endpoint and supported verticals for generic importers

Expands the `TransferServiceConfig` object to allow arbitrary data in a
`serviceConfig` field, to be parsed later by `TransferExtension`s at
initialization time.
Extends `GenericTransferExtension` to read from `serviceConfig` to
configure itself for the import service being targeted.

Also allows `TransferExtension`s to declare support for services
through a `supportsService` method, turning Service->Extension mappings
from one-to-one to many-to-one, meaning `GenericTransferExtension` can
be configured to support multiple services.

For now the configuration is fairly barebones: a list of supported
verticals (wrapped in an object to allow future support for
vertical-level configuration), the base of the API endpoint, and the
service name.
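
A sketch of the service-matching side (the `supportsService` method comes from
this change; the configuration fields shown are assumptions based on the
description above):

// Hypothetical per-service configuration parsed from the `serviceConfig` block.
class GenericServiceConfig {
  String serviceId; // service name this extension should answer for
  URL endpointBase; // base of the import API endpoint
  Set<String> supportedVerticals; // wrapped in objects in the real config
}

// The extension claims any service it was configured for, making the
// Service -> Extension mapping many-to-one.
@Override
public boolean supportsService(String service) {
  return config.serviceId.equals(service); // `config` is the parsed GenericServiceConfig
}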

* Support OAuth Authorization Code-based auth for generic importers

* Tidy up MediaSerializer

Tidies up the MediaSerializer class, and reverts some now-unnecessary
changes to the core DTP models.

* Tidy up BlobbySerializer

Improves the serialization of Blobby data.

* Add tests for GenericImporter

* Simplify GenericFileImporter

Refactors GenericImporter to make GenericFileImporter simpler, and to
remove some duplication in the two classes.

* Set mimetype in GenericFileImporter

* Tidy up old test code

There was a manual test being used during development that was replaced
by unit tests.

* Flatten BlobbySerializer File data

File data had inherited some structure from when it was based on
`BlobbyStorageContainerResource`. Since refactoring to a standalone
schema, there's no need to have this structure, so simplify the schema
by flattening it.

* Add schemas to README docs

* Correct OAuth doc

* Add configuration docs

* Remove test generic extension config file

* Document GenericPayload wrapper

* Rename ContainerMapper to ContainerSerializer

This makes the naming more consistent with e.g. MediaSerializer

* Expand documentation

Based on review feedback, expand on the documentation.
- Include a high-level overview of job lifecycle
- Add details on data rates
- Clarify the ordering behaviour
- Add reference to OAuth RFC
- 201 -> 20x for success codes
- Remove implementation details for OAuth flow
  It's documented more thoroughly elsewhere and we should encourage use
  of a framework or third-party authorization server.

* Fix 'favourite' typo

* Remove extra import

* Use set for vertical config storage

Also use `Set.contains` to check for vertical support.
calumcalder authored Jan 6, 2025
1 parent 56e6d3b commit 60c992b
Showing 35 changed files with 3,343 additions and 35 deletions.
736 changes: 736 additions & 0 deletions extensions/data-transfer/portability-data-transfer-generic/README.md

Large diffs are not rendered by default.

extensions/data-transfer/portability-data-transfer-generic/build.gradle
@@ -0,0 +1,33 @@
/*
 * Copyright 2018 The Data Transfer Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
plugins {
    id 'maven'
    id 'signing'
    id 'application'
}

dependencies {
    compile project(':portability-spi-transfer')
    compile project(':portability-spi-cloud')
    compile project(':portability-types-common')
    compile "com.squareup.okhttp3:okhttp:${okHttpVersion}"
    compile "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
    compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${jacksonVersion}"
    testCompile "com.squareup.okhttp3:mockwebserver:${okHttpVersion}"
    testCompile "commons-fileupload:commons-fileupload:1.5"
}

configurePublication(project)
extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/BlobbySerializer.java
@@ -0,0 +1,177 @@
package org.datatransferproject.datatransfer.generic;

import static java.lang.String.format;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import org.datatransferproject.types.common.DownloadableItem;
import org.datatransferproject.types.common.models.blob.BlobbyStorageContainerResource;
import org.datatransferproject.types.common.models.blob.DigitalDocumentWrapper;
import org.datatransferproject.types.common.models.blob.DtpDigitalDocument;

/**
 * Wrapper to adapt items known to be in temp storage (e.g. BLOB data) into {@link DownloadableItem}
 *
 * <p>It's useful to wrap such items so upstream code can consume either known temp store'd items or
 * items the Importer has to download itself (some MEDIA items) from the same interface.
 */
class CachedDownloadableItem implements DownloadableItem {
  private String cachedId;
  private String name;

  public CachedDownloadableItem(String cachedId, String name) {
    this.cachedId = cachedId;
    this.name = name;
  }

  @Override
  public String getIdempotentId() {
    return cachedId;
  }

  @Override
  public String getFetchableUrl() {
    // 'url' is ID when cached
    return cachedId;
  }

  @Override
  public boolean isInTempStore() {
    return true;
  }

  @Override
  public String getName() {
    return name;
  }
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonTypeName("File")
class FileExportData implements BlobbySerializer.ExportData {
  @JsonProperty private final String folder;
  @JsonProperty private final String name;
  @JsonProperty private final Optional<ZonedDateTime> dateModified;

  private FileExportData(String folder, String name, Optional<ZonedDateTime> dateModified) {
    this.folder = folder;
    this.name = name;
    this.dateModified = dateModified;
  }

  public String getFolder() {
    return folder;
  }

  public String getName() {
    return name;
  }

  public Optional<ZonedDateTime> getDateModified() {
    return dateModified;
  }

  public static FileExportData fromDtpDigitalDocument(String path, DtpDigitalDocument model) {
    return new FileExportData(
        path,
        model.getName(),
        Optional.ofNullable(model.getDateModified())
            .filter(string -> !string.isEmpty())
            .map(dateString -> ZonedDateTime.parse(model.getDateModified())));
  }
}

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonTypeName("Folder")
class FolderExportData implements BlobbySerializer.ExportData {
  @JsonProperty private final String path;

  @JsonCreator
  public FolderExportData(@JsonProperty String path) {
    this.path = path;
  }

  public String getPath() {
    return path;
  }
}

public class BlobbySerializer {
  @JsonSubTypes({
    @JsonSubTypes.Type(FolderExportData.class),
    @JsonSubTypes.Type(FileExportData.class),
  })
  public interface ExportData {}

  static class BlobbyContainerPath {
    private BlobbyStorageContainerResource container;
    private String path;

    public BlobbyContainerPath(BlobbyStorageContainerResource container, String path) {
      this.container = container;
      this.path = path;
    }

    public BlobbyStorageContainerResource getContainer() {
      return container;
    }

    public String getPath() {
      return path;
    }
  }

  static final String SCHEMA_SOURCE =
      GenericTransferConstants.SCHEMA_SOURCE_BASE
          + "/extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/BlobbySerializer.java";

  public static Iterable<ImportableData<ExportData>> serialize(
      BlobbyStorageContainerResource root) {
    List<ImportableData<ExportData>> results = new ArrayList<>();
    // Search whole tree of container resource
    Queue<BlobbyContainerPath> horizon = new ArrayDeque<>();
    BlobbyContainerPath containerAndPath = new BlobbyContainerPath(root, "");
    do {
      BlobbyStorageContainerResource container = containerAndPath.getContainer();
      String parentPath = containerAndPath.getPath();
      String path = format("%s/%s", parentPath, container.getName());
      // Import the current folder
      results.add(
          new ImportableData<>(
              new GenericPayload<>(new FolderExportData(path), SCHEMA_SOURCE),
              container.getId(),
              path));

      // Add all sub-folders to the search tree
      for (BlobbyStorageContainerResource child : container.getFolders()) {
        horizon.add(new BlobbyContainerPath(child, path));
      }

      // Import all files in the current folder
      // Intentionally done after importing the current folder
      for (DigitalDocumentWrapper file : container.getFiles()) {
        results.add(
            new ImportableFileData<>(
                new CachedDownloadableItem(
                    file.getCachedContentId(), file.getDtpDigitalDocument().getName()),
                file.getDtpDigitalDocument().getEncodingFormat(),
                new GenericPayload<>(
                    FileExportData.fromDtpDigitalDocument(path, file.getDtpDigitalDocument()),
                    SCHEMA_SOURCE),
                file.getCachedContentId(),
                file.getDtpDigitalDocument().getName()));
      }
    } while ((containerAndPath = horizon.poll()) != null);

    return results;
  }
}
extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/CalendarSerializer.java
@@ -0,0 +1,86 @@
package org.datatransferproject.datatransfer.generic;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.datatransferproject.types.common.models.calendar.CalendarAttendeeModel;
import org.datatransferproject.types.common.models.calendar.CalendarContainerResource;
import org.datatransferproject.types.common.models.calendar.CalendarEventModel;
import org.datatransferproject.types.common.models.calendar.CalendarModel;
import org.datatransferproject.types.common.models.calendar.RecurrenceRule;

class CalendarExportData extends CalendarModel implements CalendarSerializer.ExportData {
  private CalendarExportData(String id, String name, String description) {
    super(id, name, description);
  }

  public static CalendarExportData fromModel(CalendarModel model) {
    return new CalendarExportData(model.getId(), model.getName(), model.getDescription());
  }
}

class CalendarEventExportData extends CalendarEventModel implements CalendarSerializer.ExportData {

  private CalendarEventExportData(
      String calendarId,
      String title,
      String notes,
      List<CalendarAttendeeModel> attendees,
      String location,
      CalendarEventTime startTime,
      CalendarEventTime endTime,
      RecurrenceRule recurrenceRule) {
    super(calendarId, title, notes, attendees, location, startTime, endTime, recurrenceRule);
  }

  public static CalendarEventExportData fromModel(CalendarEventModel model) {
    return new CalendarEventExportData(
        model.getCalendarId(),
        model.getTitle(),
        model.getNotes(),
        model.getAttendees(),
        model.getLocation(),
        model.getStartTime(),
        model.getEndTime(),
        model.getRecurrenceRule());
  }
}

public class CalendarSerializer {

  @JsonSubTypes({
    @JsonSubTypes.Type(value = CalendarExportData.class, name = "Calendar"),
    @JsonSubTypes.Type(value = CalendarEventExportData.class, name = "CalendarEvent"),
  })
  public interface ExportData {}

  static final String SCHEMA_SOURCE_CALENDAR =
      GenericTransferConstants.SCHEMA_SOURCE_BASE
          + "/portability-types-common/src/main/java/org/datatransferproject/types/common/models/calendar/CalendarModel.java";
  static final String SCHEMA_SOURCE_EVENT =
      GenericTransferConstants.SCHEMA_SOURCE_BASE
          + "/portability-types-common/src/main/java/org/datatransferproject/types/common/models/calendar/CalendarEventModel.java";

  public static Iterable<ImportableData<ExportData>> serialize(
      CalendarContainerResource container) {
    return Stream.concat(
            container.getCalendars().stream()
                .map(
                    calendar ->
                        new ImportableData<>(
                            new GenericPayload<ExportData>(
                                CalendarExportData.fromModel(calendar), SCHEMA_SOURCE_CALENDAR),
                            calendar.getId(),
                            calendar.getName())),
            container.getEvents().stream()
                .map(
                    event ->
                        new ImportableData<>(
                            new GenericPayload<ExportData>(
                                CalendarEventExportData.fromModel(event), SCHEMA_SOURCE_EVENT),
                            String.valueOf(event.hashCode()),
                            event.getTitle())))
        .collect(Collectors.toList());
  }
}
extensions/data-transfer/portability-data-transfer-generic/src/main/java/org/datatransferproject/datatransfer/generic/GenericFileImporter.java
@@ -0,0 +1,80 @@
package org.datatransferproject.datatransfer.generic;

import static java.lang.String.format;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Optional;
import java.util.UUID;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.datatransferproject.api.launcher.Monitor;
import org.datatransferproject.spi.cloud.connection.ConnectionProvider;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper;
import org.datatransferproject.spi.transfer.types.InvalidTokenException;
import org.datatransferproject.types.common.models.ContainerResource;
import org.datatransferproject.types.transfer.auth.AppCredentials;
import org.datatransferproject.types.transfer.auth.AuthData;
import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData;

public class GenericFileImporter<C extends ContainerResource, R> extends GenericImporter<C, R> {
  private TemporaryPerJobDataStore dataStore;
  private ConnectionProvider connectionProvider;

  static final MediaType MULTIPART_RELATED = MediaType.parse("multipart/related");
  static final MediaType OCTET_STREAM = MediaType.parse("application/octet-stream");

  public GenericFileImporter(
      ContainerSerializer<C, R> containerSerializer,
      AppCredentials appCredentials,
      URL endpoint,
      TemporaryPerJobDataStore dataStore,
      Monitor monitor) {
    super(containerSerializer, appCredentials, endpoint, monitor);
    this.dataStore = dataStore;
    this.connectionProvider = new ConnectionProvider(dataStore);
  }

  @Override
  public boolean importSingleItem(
      UUID jobId, TokensAndUrlAuthData authData, ImportableData<R> dataItem)
      throws IOException, InvalidTokenException {
    if (dataItem instanceof ImportableFileData) {
      return importSingleFileItem(jobId, authData, (ImportableFileData<R>) dataItem);
    } else {
      return super.importSingleItem(jobId, authData, dataItem);
    }
  }

  private <T> boolean importSingleFileItem(
      UUID jobId, AuthData authData, ImportableFileData<R> data)
      throws IOException, InvalidTokenException {
    InputStreamWrapper wrapper = connectionProvider.getInputStreamForItem(jobId, data.getFile());
    File tempFile =
        dataStore.getTempFileFromInputStream(wrapper.getStream(), data.getFile().getName(), null);
    MediaType mimeType =
        Optional.ofNullable(MediaType.parse(data.getFileMimeType())).orElse(OCTET_STREAM);
    Request request =
        new Request.Builder()
            .url(endpoint)
            .addHeader("Authorization", format("Bearer %s", authData.getToken()))
            .post(
                new MultipartBody.Builder()
                    .setType(MULTIPART_RELATED)
                    .addPart(RequestBody.create(JSON, om.writeValueAsBytes(data.getJsonData())))
                    .addPart(MultipartBody.create(mimeType, tempFile))
                    .build())
            .build();

    try (Response response = client.newCall(request).execute()) {
      return parseResponse(response);
    } finally {
      tempFile.delete();
    }
  }
}