|
24 | 24 | import com.google.cloud.storage.Storage; |
25 | 25 | import com.google.cloud.storage.StorageOptions; |
26 | 26 | import com.google.common.base.Preconditions; |
| 27 | +import com.google.common.base.Strings; |
27 | 28 | import com.google.common.collect.ImmutableMap; |
28 | 29 | import com.google.common.io.Files; |
29 | 30 | import com.google.gson.Gson; |
@@ -120,7 +121,6 @@ public class GCSTest extends DataprocETLTestBase { |
120 | 121 |
|
121 | 122 | private static Storage storage; |
122 | 123 | private List<String> markedForDeleteBuckets; |
123 | | - private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; |
124 | 124 | private static final String CSV_CONTENT_TYPE = "text/csv"; |
125 | 125 | private static final String MULTISINK_RUNTIME_ARG = "multisink.%s"; |
126 | 126 |
|
@@ -741,7 +741,7 @@ public void testGcsSourceFormats() throws Exception { |
741 | 741 | ETLBatchConfig.Builder pipelineConfig = ETLBatchConfig.builder().addStage(source); |
742 | 742 | for (String format : formats) { |
743 | 743 | String path = String.format("%s/%s/%s", createPath(bucket, OUTPUT_BLOB_NAME), format, suffix); |
744 | | - ETLStage sink = new ETLStage(format, createSinkPlugin(format, path, schema, DEFAULT_CONTENT_TYPE)); |
| 744 | + ETLStage sink = new ETLStage(format, createSinkPlugin(format, path, schema)); |
745 | 745 | pipelineConfig.addStage(sink).addConnection(source.getName(), sink.getName()); |
746 | 746 | } |
747 | 747 |
|
@@ -873,16 +873,21 @@ private ETLStage createSourceStage(String format, String path, String regex, Sch |
873 | 873 | GOOGLE_CLOUD_ARTIFACT)); |
874 | 874 | } |
875 | 875 |
|
| 876 | + private ETLPlugin createSinkPlugin(String format, String path, Schema schema) { |
| 877 | + return createSinkPlugin(format, path, schema, null); |
| 878 | + } |
| 879 | + |
876 | 880 | private ETLPlugin createSinkPlugin(String format, String path, Schema schema, String contentType) { |
877 | | - return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, |
878 | | - new ImmutableMap.Builder<String, String>() |
879 | | - .put("path", path) |
880 | | - .put("format", format) |
881 | | - .put("project", getProjectId()) |
882 | | - .put("referenceName", format) |
883 | | - .put("schema", schema.toString()) |
884 | | - .put("contentType", contentType).build(), |
885 | | - GOOGLE_CLOUD_ARTIFACT); |
| 881 | + ImmutableMap.Builder<String, String> propertyBuilder = new ImmutableMap.Builder<String, String>() |
| 882 | + .put("path", path) |
| 883 | + .put("format", format) |
| 884 | + .put("project", getProjectId()) |
| 885 | + .put("referenceName", format) |
| 886 | + .put("schema", schema.toString()); |
| 887 | + if (!Strings.isNullOrEmpty(contentType)) { |
| 888 | + propertyBuilder.put("contentType", contentType); |
| 889 | + } |
| 890 | + return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, propertyBuilder.build(), GOOGLE_CLOUD_ARTIFACT); |
886 | 891 | } |
887 | 892 |
|
888 | 893 | private ETLPlugin createMultiSinkPlugin(String format, String path, Schema schema, String contentType, |
|
0 commit comments