28
28
import io .cdap .cdap .api .dataset .lib .KeyValue ;
29
29
import io .cdap .cdap .api .plugin .PluginConfig ;
30
30
import io .cdap .cdap .etl .api .Emitter ;
31
+ import io .cdap .cdap .etl .api .FailureCollector ;
31
32
import io .cdap .cdap .etl .api .PipelineConfigurer ;
33
+ import io .cdap .cdap .etl .api .StageConfigurer ;
32
34
import io .cdap .cdap .etl .api .batch .BatchRuntimeContext ;
33
35
import io .cdap .cdap .etl .api .batch .BatchSinkContext ;
34
36
import io .cdap .cdap .etl .api .validation .InvalidStageException ;
64
66
import java .util .Objects ;
65
67
import java .util .Optional ;
66
68
import java .util .Properties ;
69
+ import java .util .Set ;
67
70
import java .util .stream .Collectors ;
68
71
69
72
/**
@@ -91,12 +94,16 @@ private String getJDBCPluginId() {
91
94
@ Override
92
95
public void configurePipeline (PipelineConfigurer pipelineConfigurer ) {
93
96
super .configurePipeline (pipelineConfigurer );
97
+ StageConfigurer configurer = pipelineConfigurer .getStageConfigurer ();
94
98
DBUtils .validateJDBCPluginPipeline (pipelineConfigurer , dbSinkConfig , getJDBCPluginId ());
95
- Schema inputSchema = pipelineConfigurer . getStageConfigurer () .getInputSchema ();
99
+ Schema inputSchema = configurer .getInputSchema ();
96
100
if (Objects .nonNull (inputSchema )) {
97
101
Class <? extends Driver > driverClass = DBUtils .getDriverClass (
98
102
pipelineConfigurer , dbSinkConfig , ConnectionConfig .JDBC_PLUGIN_TYPE );
99
- validateSchema (driverClass , dbSinkConfig .tableName , inputSchema );
103
+ if (driverClass != null ) {
104
+ FailureCollector collector = configurer .getFailureCollector ();
105
+ validateSchema (collector , driverClass , dbSinkConfig .tableName , inputSchema );
106
+ }
100
107
}
101
108
}
102
109
@@ -117,7 +124,9 @@ public void prepareRun(BatchSinkContext context) {
117
124
// make sure that the destination table exists and column types are correct
118
125
try {
119
126
if (Objects .nonNull (outputSchema )) {
120
- validateSchema (driverClass , dbSinkConfig .tableName , outputSchema );
127
+ FailureCollector collector = context .getFailureCollector ();
128
+ validateSchema (collector , driverClass , dbSinkConfig .tableName , outputSchema );
129
+ collector .getOrThrowException ();
121
130
} else {
122
131
outputSchema = inferSchema (driverClass );
123
132
}
@@ -269,15 +278,17 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
269
278
return columnTypes ;
270
279
}
271
280
272
- private void validateSchema (Class <? extends Driver > jdbcDriverClass , String tableName , Schema inputSchema ) {
281
+ private void validateSchema (FailureCollector collector , Class <? extends Driver > jdbcDriverClass , String tableName ,
282
+ Schema inputSchema ) {
273
283
String connectionString = dbSinkConfig .getConnectionString ();
274
284
275
285
try {
276
286
DBUtils .ensureJDBCDriverIsAvailable (jdbcDriverClass , connectionString , dbSinkConfig .jdbcPluginName );
277
287
} catch (IllegalAccessException | InstantiationException | SQLException e ) {
278
- throw new InvalidStageException (String .format ("Unable to load or register JDBC driver '%s' while checking for " +
279
- "the existence of the database table '%s'." ,
280
- jdbcDriverClass , tableName ), e );
288
+ collector .addFailure (String .format ("Unable to load or register JDBC driver '%s' while checking for " +
289
+ "the existence of the database table '%s'." ,
290
+ jdbcDriverClass , tableName ), null ).withStacktrace (e .getStackTrace ());
291
+ throw collector .getOrThrowException ();
281
292
}
282
293
283
294
Properties connectionProperties = new Properties ();
@@ -286,23 +297,26 @@ private void validateSchema(Class<? extends Driver> jdbcDriverClass, String tabl
286
297
executeInitQueries (connection , dbSinkConfig .getInitQueries ());
287
298
try (ResultSet tables = connection .getMetaData ().getTables (null , null , tableName , null )) {
288
299
if (!tables .next ()) {
289
- throw new InvalidStageException ("Table " + tableName + " does not exist. " +
290
- "Please check that the 'tableName' property has been set correctly, " +
291
- "and that the connection string " + connectionString +
292
- "points to a valid database." );
300
+ collector .addFailure (
301
+ String .format ("Table '%s' does not exist." , tableName ),
302
+ String .format ("Ensure table '%s' is set correctly and that the connection string '%s' points " +
303
+ "to a valid database." , tableName , connectionString ))
304
+ .withConfigProperty (DBSinkConfig .TABLE_NAME );
305
+ return ;
293
306
}
294
307
}
295
308
296
309
try (PreparedStatement pStmt = connection .prepareStatement ("SELECT * FROM " + dbSinkConfig .getEscapedTableName ()
297
310
+ " WHERE 1 = 0" );
298
311
ResultSet rs = pStmt .executeQuery ()) {
299
- getFieldsValidator ().validateFields (inputSchema , rs );
312
+ getFieldsValidator ().validateFields (inputSchema , rs , collector );
300
313
}
301
-
302
314
} catch (SQLException e ) {
303
315
LOG .error ("Exception while trying to validate schema of database table {} for connection {}." ,
304
316
tableName , connectionString , e );
305
- throw Throwables .propagate (e );
317
+ collector .addFailure (
318
+ String .format ("Exception while trying to validate schema of database table '%s' for connection '%s'." ,
319
+ tableName , connectionString ), null ).withStacktrace (e .getStackTrace ());
306
320
}
307
321
}
308
322
0 commit comments