
Commit 5884c4c

Merge pull request #519 from AdaptiveScale/cherrypick-release/0.14/PLUGIN-498
Cherrypick for release/0.14 - PLUGIN-498
2 parents 37556b2 + 4f0bca2 commit 5884c4c

File tree

10 files changed: +460 additions, -142 deletions


docs/GCS-batchsink.md

Lines changed: 17 additions & 0 deletions
@@ -43,6 +43,23 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
 
 **Location:** The location where the gcs bucket will get created. This value is ignored if the bucket already exists.
 
+**Content Type:** The Content Type property is used to indicate the media type of the resource.
+Defaults to 'application/octet-stream'. The following table shows valid content types for each format.
+
+| Format type | Content type                                                                                |
+|-------------|---------------------------------------------------------------------------------------------|
+| avro        | application/avro, application/octet-stream                                                  |
+| csv         | text/csv, application/csv, text/plain, application/octet-stream                             |
+| delimited   | text/csv, application/csv, text/tab-separated-values, text/plain, application/octet-stream  |
+| json        | application/json, text/plain, application/octet-stream                                      |
+| orc         | application/octet-stream                                                                    |
+| parquet     | application/octet-stream                                                                    |
+| tsv         | text/tab-separated-values, text/plain, application/octet-stream                             |
+
+**Custom Content Type:** The Custom Content Type is used when the value of Content Type is set to 'other'.
+Users can provide a specific Content-Type different from the options in the dropdown.
+More information about the Content-Type can be found at https://cloud.google.com/storage/docs/metadata
+
 **Service Account** - service account key used for authorization
 
 * **File Path**: Path on the local file system of the service account key used for
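To make the doc concrete, here is a hedged sketch of how these sink properties might be supplied together. The property names match the config fields introduced in this change; the bucket path and values are purely illustrative and not taken from the plugin.

```java
// Hedged example: the sink properties described above, assembled as a plain map
// the way a pipeline configuration would supply them. Values are illustrative.
import java.util.HashMap;
import java.util.Map;

public class GCSSinkPropertiesExample {
  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("path", "gs://my-example-bucket/output");  // hypothetical bucket
    properties.put("format", "json");
    // Must be one of the valid content types for 'json' per the table above.
    properties.put("contentType", "application/json");
    // Only consulted when contentType is set to 'other'.
    properties.put("customContentType", "");
    properties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```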

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 128 additions & 1 deletion
@@ -67,6 +67,7 @@ public class GCSBatchSink extends AbstractFileSink<GCSBatchSink.GCSBatchSinkConf
   private static final String RECORDS_UPDATED_METRIC = "records.updated";
   public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
   public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
+  public static final String CONTENT_TYPE = "io.cdap.gcs.batch.sink.content.type";
 
   private final GCSBatchSinkConfig config;
   private String outputPath;
@@ -125,6 +126,7 @@ public void prepareRun(BatchSinkContext context) throws Exception {
   @Override
   protected Map<String, String> getFileSystemProperties(BatchSinkContext context) {
     Map<String, String> properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+    properties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     properties.putAll(config.getFileSystemProperties());
     String outputFileBaseName = config.getOutputFileNameBase();
     if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
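The new CONTENT_TYPE key simply rides along in the Hadoop job configuration: the sink writes it in getFileSystemProperties(), and downstream components (the output committer added later in this commit) read it back. A minimal, self-contained sketch of that round trip, not the plugin's actual wiring:

```java
// Sketch only: shows how a property set by the sink travels through the Hadoop
// Configuration to whoever reads it at commit time.
import org.apache.hadoop.conf.Configuration;

public class ContentTypePropagationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Writing side: what getFileSystemProperties() effectively contributes.
    conf.set("io.cdap.gcs.batch.sink.content.type", "application/json");
    // Reading side: what the committer later does when updating the blob.
    String contentType = conf.get("io.cdap.gcs.batch.sink.content.type");
    System.out.println("Content type applied to GCS objects: " + contentType);
  }
}
```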
@@ -242,6 +244,23 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
     private static final String NAME_LOCATION = "location";
     private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
     private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
+    private static final String NAME_CONTENT_TYPE = "contentType";
+    private static final String NAME_CUSTOM_CONTENT_TYPE = "customContentType";
+    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
+    private static final String CONTENT_TYPE_OTHER = "other";
+    private static final String CONTENT_TYPE_APPLICATION_JSON = "application/json";
+    private static final String CONTENT_TYPE_APPLICATION_AVRO = "application/avro";
+    private static final String CONTENT_TYPE_APPLICATION_CSV = "application/csv";
+    private static final String CONTENT_TYPE_TEXT_PLAIN = "text/plain";
+    private static final String CONTENT_TYPE_TEXT_CSV = "text/csv";
+    private static final String CONTENT_TYPE_TEXT_TSV = "text/tab-separated-values";
+    private static final String FORMAT_AVRO = "avro";
+    private static final String FORMAT_CSV = "csv";
+    private static final String FORMAT_JSON = "json";
+    private static final String FORMAT_TSV = "tsv";
+    private static final String FORMAT_DELIMITED = "delimited";
+    private static final String FORMAT_ORC = "orc";
+    private static final String FORMAT_PARQUET = "parquet";
 
     private static final String SCHEME = "gs://";
     @Name(NAME_PATH)
@@ -279,6 +298,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
       "This value is ignored if the bucket already exists")
     protected String location;
 
+    @Macro
+    @Description("The Content Type property is used to indicate the media type of the resource. " +
+      "Defaults to 'application/octet-stream'.")
+    @Nullable
+    protected String contentType;
+
+    @Macro
+    @Description("The Custom Content Type is used when the value of Content-Type is set to 'other'. " +
+      "Users can provide a specific Content-Type different from the options in the dropdown.")
+    @Nullable
+    protected String customContentType;
+
     @Name(NAME_FS_PROPERTIES)
     @Macro
     @Nullable
@@ -321,10 +352,19 @@ public void validate(FailureCollector collector) {
       collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_FORMAT).withStacktrace(e.getStackTrace());
     }
 
+    if (!containsMacro(NAME_CONTENT_TYPE) && !containsMacro(NAME_CUSTOM_CONTENT_TYPE)
+      && !Strings.isNullOrEmpty(contentType) && !contentType.equalsIgnoreCase(CONTENT_TYPE_OTHER)
+      && !containsMacro(NAME_FORMAT)) {
+      if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+        validateContentType(collector);
+      }
+    }
+
     try {
       getSchema();
     } catch (IllegalArgumentException e) {
-      collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
+      collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA)
+        .withStacktrace(e.getStackTrace());
     }
 
     try {
@@ -335,6 +375,69 @@ public void validate(FailureCollector collector) {
       }
     }
 
+    // This method validates the specified content type for the used format.
+    public void validateContentType(FailureCollector failureCollector) {
+      switch (format) {
+        case FORMAT_AVRO:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_AVRO)) {
+            failureCollector.addFailure(String.format("Valid content types for avro are %s, %s.",
+              CONTENT_TYPE_APPLICATION_AVRO, DEFAULT_CONTENT_TYPE), null)
+              .withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_JSON:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_JSON)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for json are %s, %s, %s.", CONTENT_TYPE_APPLICATION_JSON,
+              CONTENT_TYPE_TEXT_PLAIN, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_CSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for csv are %s, %s, %s, %s.", CONTENT_TYPE_APPLICATION_CSV,
+              CONTENT_TYPE_TEXT_PLAIN, CONTENT_TYPE_TEXT_CSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_DELIMITED:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_APPLICATION_CSV)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for delimited are %s, %s, %s, %s, %s.", CONTENT_TYPE_TEXT_PLAIN,
+              CONTENT_TYPE_TEXT_CSV, CONTENT_TYPE_APPLICATION_CSV, CONTENT_TYPE_TEXT_TSV, DEFAULT_CONTENT_TYPE), null
+            ).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_PARQUET:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for parquet is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_ORC:
+          if (!contentType.equalsIgnoreCase(DEFAULT_CONTENT_TYPE)) {
+            failureCollector.addFailure(String.format("Valid content type for orc is %s.", DEFAULT_CONTENT_TYPE),
+              null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+        case FORMAT_TSV:
+          if (!contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_PLAIN)
+            && !contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT_TSV)) {
+            failureCollector.addFailure(String.format(
+              "Valid content types for tsv are %s, %s, %s.", CONTENT_TYPE_TEXT_TSV, CONTENT_TYPE_TEXT_PLAIN,
+              DEFAULT_CONTENT_TYPE), null).withConfigProperty(NAME_CONTENT_TYPE);
+          }
+          break;
+      }
+    }
+
     public String getBucket() {
       return GCSPath.from(path).getBucket();
     }
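The switch above encodes a format-to-content-type compatibility table, and validate() skips the check entirely for the default 'application/octet-stream', so the default is always accepted. The standalone sketch below mirrors that behavior as a lookup map; it is illustrative code for reasoning about the pairs, not part of the plugin.

```java
// Standalone sketch of the compatibility rules enforced by validateContentType(),
// restated as a lookup table. application/octet-stream is handled as a blanket
// case because validate() never reaches the switch for the default type.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ContentTypeCompatibilitySketch {
  private static final Map<String, List<String>> VALID = new HashMap<>();
  static {
    VALID.put("avro", Arrays.asList("application/avro"));
    VALID.put("json", Arrays.asList("application/json", "text/plain"));
    VALID.put("csv", Arrays.asList("application/csv", "text/csv", "text/plain"));
    VALID.put("delimited",
      Arrays.asList("text/plain", "text/csv", "application/csv", "text/tab-separated-values"));
    VALID.put("tsv", Arrays.asList("text/tab-separated-values", "text/plain"));
    VALID.put("orc", Arrays.asList());      // octet-stream only
    VALID.put("parquet", Arrays.asList());  // octet-stream only
  }

  static boolean isValid(String format, String contentType) {
    if ("application/octet-stream".equalsIgnoreCase(contentType)) {
      return true;  // the default type is accepted for every format
    }
    List<String> allowed = VALID.get(format);
    return allowed != null && allowed.stream().anyMatch(contentType::equalsIgnoreCase);
  }

  public static void main(String[] args) {
    System.out.println(isValid("json", "application/json"));  // true
    System.out.println(isValid("parquet", "text/plain"));     // false
  }
}
```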
@@ -378,6 +481,30 @@ public String getLocation() {
       return location;
     }
 
+    /* This method gets the value of content type. Valid content types for each format are:
+     *
+     * avro -> application/avro, application/octet-stream
+     * json -> application/json, text/plain, application/octet-stream
+     * csv -> application/csv, text/csv, text/plain, application/octet-stream
+     * delimited -> application/csv, text/csv, text/plain, text/tab-separated-values, application/octet-stream
+     * orc -> application/octet-stream
+     * parquet -> application/octet-stream
+     * tsv -> text/tab-separated-values, text/plain, application/octet-stream
+     */
+    @Nullable
+    public String getContentType() {
+      if (!Strings.isNullOrEmpty(contentType)) {
+        if (contentType.equals(CONTENT_TYPE_OTHER)) {
+          if (Strings.isNullOrEmpty(customContentType)) {
+            return DEFAULT_CONTENT_TYPE;
+          }
+          return customContentType;
+        }
+        return contentType;
+      }
+      return DEFAULT_CONTENT_TYPE;
+    }
+
     public Map<String, String> getFileSystemProperties() {
       if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
         return Collections.emptyMap();
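The fallback chain in getContentType() resolves in order: the explicit content type, then the custom type when 'other' is selected, then the default. A quick standalone illustration of that logic, restated outside the plugin:

```java
// Illustrative restatement (not plugin code) of the fallback chain in getContentType().
public class GetContentTypeSketch {
  static final String DEFAULT = "application/octet-stream";

  static String resolve(String contentType, String customContentType) {
    if (contentType == null || contentType.isEmpty()) {
      return DEFAULT;                                  // nothing set: default
    }
    if ("other".equals(contentType)) {
      return (customContentType == null || customContentType.isEmpty())
        ? DEFAULT                                      // 'other' without a custom value: default
        : customContentType;                           // 'other' with a custom value
    }
    return contentType;                                // a concrete type wins as-is
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, null));             // application/octet-stream
    System.out.println(resolve("other", "text/x-foo"));  // text/x-foo
    System.out.println(resolve("text/csv", null));       // text/csv
  }
}
```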

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,6 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
     collector.getOrThrowException();
 
     Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
-
     Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());
 
     String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
@@ -139,6 +138,7 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
     outputProperties.put(FileOutputFormat.OUTDIR, config.getOutputDir(context.getLogicalStartTime(), name));
     outputProperties.put("mapreduce.fileoutputcommitter.algorithm.version", "2");
 
+    outputProperties.put(GCSBatchSink.CONTENT_TYPE, config.getContentType());
     context.addOutput(Output.of(
       config.getReferenceName() + "_" + name,
       new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), outputProperties)));
src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputCommitter.java

Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
+/*
+ * Copyright © 2015-2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.sink;
+
+import com.google.cloud.storage.Blob;
+import com.google.common.annotations.VisibleForTesting;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.gcs.StorageClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * OutputCommitter for GCS.
+ */
+public class GCSOutputCommitter extends OutputCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class);
+  public static final String RECORD_COUNT_FORMAT = "recordcount.%s";
+
+  private final OutputCommitter delegate;
+
+  public GCSOutputCommitter(OutputCommitter delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    delegate.setupJob(jobContext);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    delegate.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    delegate.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+    delegate.abortJob(jobContext, state);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.setupTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+    return delegate.needsTaskCommit(taskAttemptContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    /* On commit task, there seems to be some inconsistency across different hadoop implementations regarding the
+       path where the output file is stored. For some implementations it appears in the path returned by
+       FileOutputCommitter getCommittedTaskPath and for some it does not. Before commit, the files appear to be
+       consistently present in the path returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output
+       file from taskAttemptPath and add metadata before commit happens. After commit, the file would have been
+       moved out of the taskAttemptPath. */
+    try {
+      updateMetricMetaData(taskAttemptContext);
+    } catch (Exception exception) {
+      LOG.warn("Unable to record metric for task. Metric emitted for the number of affected rows may be incorrect.",
+               exception);
+    }
+
+    delegate.commitTask(taskAttemptContext);
+  }
+
+  private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (!(delegate instanceof FileOutputCommitter)) {
+      return;
+    }
+
+    FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate;
+    Configuration configuration = taskAttemptContext.getConfiguration();
+    // Task is not yet committed, so should be available in attempt path
+    Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext);
+    if (configuration == null || taskAttemptPath == null) {
+      return;
+    }
+
+    // read the count from configuration
+    String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID());
+    Map<String, String> metaData = new HashMap<>();
+    metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L)));
+    StorageClient storageClient = getStorageClient(configuration);
+    // update metadata on the output file present in the directory for this task
+    Blob blob = storageClient.pickABlob(taskAttemptPath.toString());
+    if (blob == null) {
+      LOG.info("Could not find a file in path {} to apply count metadata.", taskAttemptPath.toString());
+      return;
+    }
+    blob.toBuilder().setContentType(configuration.get(GCSBatchSink.CONTENT_TYPE)).setMetadata(metaData).build()
+      .update();
+  }
+
+  @VisibleForTesting
+  StorageClient getStorageClient(Configuration configuration) throws IOException {
+    String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID);
+    String serviceAccount = null;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(configuration.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = configuration.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                       GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    return StorageClient.create(project, serviceAccount, isServiceAccountFile);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    delegate.abortTask(taskAttemptContext);
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException {
+    return delegate.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return delegate.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return delegate.isRecoverySupported();
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    delegate.recoverTask(taskContext);
+  }
+}
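For context, a delegating committer like this is typically returned from an OutputFormat's getOutputCommitter(). This commit references GCSOutputFormatProvider (in the logger above) but does not show it in this diff, so the wiring below is a hedged sketch under that assumption, using a stock Hadoop format as the delegate.

```java
// Hedged sketch of the likely wiring: wrap the delegate format's committer so
// commitTask() can stamp content type and record-count metadata on the GCS
// object before the file is moved out of the task attempt path.
import io.cdap.plugin.gcp.gcs.sink.GCSOutputCommitter;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class DelegatingOutputFormatSketch extends TextOutputFormat<Object, Object> {
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    // The underlying FileOutputCommitter still performs the actual commit;
    // GCSOutputCommitter only adds the metadata update before delegating.
    return new GCSOutputCommitter(super.getOutputCommitter(context));
  }
}
```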
