Skip to content

Commit 00215ac

Browse files
authored
Merge pull request #58 from mlozbin-cybervisiontech/feature/PLUGIN-8_memsql-plugin
PLUGIN-8 MemSQL database Source and Sink plugins
2 parents b492572 + 499af71 commit 00215ac

36 files changed

+3359
-31
lines changed

README.md

+31-2
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,43 @@ mvn clean test \
1313
```
1414
Notice that you must change properties for Aurora MySQL and Aurora Postgresql to real before running tests.
1515
## Setup Local Environment
16-
MySQL, Postgresql, MSSQL, DB2 are using prebuild images.
16+
MySQL, Postgresql, MSSQL, DB2, MemSQL are using prebuild images.
1717

1818
Oracle DB image should be build separately.
1919

20+
MemSQL image should be configure after start.
21+
2022
Netezza requires VMware Player for running Netezza emulator.
2123

2224
* [Install Docker Compose](https://docs.docker.com/compose/install/)
2325
* Build local docker images
2426
* [Build Oracle DB docker image version 12.1.0.2-ee](https://github.com/oracle/docker-images/tree/master/OracleDatabase/SingleInstance)
25-
* Start docker environment by running commands:
27+
* Enter the folder with docker-compose file:
2628
```bash
2729
cd docker-compose/db-plugins-env/
30+
```
31+
* Export your license key for MemSQL to environment variable:
32+
```bash
33+
export MEMSQL_LICENSE_KEY=YOUR_LICENSE_KEY
34+
```
35+
* Initialize Memsql container:
36+
```bash
37+
docker-compose up memsql
38+
```
39+
* Start docker environment by running commands:
40+
```bash
2841
docker-compose up -d
2942
```
43+
* Connect to MemSQL Studio at [http://localhost:8888](http://localhost:8888)
44+
The default Username is root and Password should be left blank.
45+
* Create `mydb` database in MemSQL Studio
46+
```sql
47+
create database mydb
48+
```
49+
* Set password for `root` user in MemSQL Studio
50+
```sql
51+
grant all on *.* to 'root'@'%' identified by 'root' with grant option;
52+
```
3053
* [Install and start Netezza emulator](http://dwgeek.com/install-vmware-player-netezza-emulator.html/)
3154
* Create database `mydb` in Netezza emulator
3255

@@ -68,6 +91,12 @@ docker-compose up -d
6891
* **netezza.database** - Server namespace for test databases. Default: mydb.
6992
* **netezza.username** - Server username. Default: admin.
7093
* **netezza.password** - Server password. Default: password.
94+
#### MemSQL
95+
* **memsql.host** - Server host. Default: localhost.
96+
* **memsql.port** - Server port. Default: 3308.
97+
* **memsql.database** - Server namespace for test databases. Default: mydb.
98+
* **memsql.username** - Server username. Default: root.
99+
* **memsql.password** - Server password. Default: root.
71100
#### Aurora MySQL
72101
* **auroraMysql.clusterEndpoint** - Cluster endpoint.
73102
* **auroraMysql.port** - Server port.

database-commons/src/main/java/io/cdap/plugin/db/batch/source/AbstractDBSource.java

+18-13
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import io.cdap.cdap.etl.api.StageConfigurer;
3333
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
3434
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
35-
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
36-
import io.cdap.cdap.etl.api.validation.InvalidStageException;
3735
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
3836
import io.cdap.plugin.common.LineageRecorder;
3937
import io.cdap.plugin.common.ReferenceBatchSource;
@@ -259,7 +257,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
259257

260258
Schema schemaFromDB = loadSchemaFromDB(driverClass);
261259
if (sourceConfig.schema != null) {
262-
sourceConfig.validateSchema(schemaFromDB);
260+
sourceConfig.validateSchema(schemaFromDB, collector);
261+
collector.getOrThrowException();
263262
connectionConfigAccessor.setSchema(sourceConfig.schema);
264263
} else {
265264
String schemaStr = SCHEMA_TYPE_ADAPTER.toJson(schemaFromDB);
@@ -394,37 +393,43 @@ private void validate(FailureCollector collector) {
394393
}
395394
}
396395

397-
private void validateSchema(Schema actualSchema) {
398-
validateSchema(actualSchema, getSchema());
396+
protected void validateSchema(Schema actualSchema, FailureCollector collector) {
397+
validateSchema(actualSchema, getSchema(), collector);
399398
}
400399

401400
@VisibleForTesting
402-
static void validateSchema(Schema actualSchema, Schema configSchema) {
401+
static void validateSchema(Schema actualSchema, Schema configSchema, FailureCollector collector) {
403402
if (configSchema == null) {
404-
405-
throw new InvalidConfigPropertyException("Schema should not be null or empty", SCHEMA);
403+
collector.addFailure("Schema should not be null or empty.", null)
404+
.withConfigProperty(SCHEMA);
405+
return;
406406
}
407+
407408
for (Schema.Field field : configSchema.getFields()) {
408409
Schema.Field actualField = actualSchema.getField(field.getName());
409410
if (actualField == null) {
410-
throw new InvalidConfigPropertyException(String.format("Schema field '%s' is not present in actual record",
411-
field.getName()), SCHEMA);
411+
collector.addFailure(
412+
String.format("Schema field '%s' is not present in actual record", field.getName()), null)
413+
.withOutputSchemaField(field.getName());
414+
continue;
412415
}
416+
413417
Schema actualFieldSchema = actualField.getSchema().isNullable() ?
414418
actualField.getSchema().getNonNullable() : actualField.getSchema();
415419
Schema expectedFieldSchema = field.getSchema().isNullable() ?
416420
field.getSchema().getNonNullable() : field.getSchema();
417421

418422
if (!actualFieldSchema.equals(expectedFieldSchema)) {
419-
throw new IllegalArgumentException(
423+
collector.addFailure(
420424
String.format("Schema field '%s' has type '%s' but found '%s' in input record",
421-
field.getName(), expectedFieldSchema.getType(), actualFieldSchema.getType()));
425+
field.getName(), expectedFieldSchema.getType(), actualFieldSchema.getType()), null)
426+
.withOutputSchemaField(field.getName());
422427
}
423428
}
424429
}
425430

426431
@Nullable
427-
private Schema getSchema() {
432+
protected Schema getSchema() {
428433
try {
429434
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
430435
} catch (IOException e) {

database-commons/src/test/java/io/cdap/plugin/db/batch/source/AbstractDBSourceTest.java

+35-15
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717
package io.cdap.plugin.db.batch.source;
1818

1919
import io.cdap.cdap.api.data.schema.Schema;
20-
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
20+
import io.cdap.cdap.etl.api.validation.CauseAttributes;
21+
import io.cdap.cdap.etl.api.validation.ValidationFailure;
22+
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
2123
import org.junit.Assert;
2224
import org.junit.Test;
2325

26+
import java.util.List;
27+
import java.util.stream.Collectors;
28+
import javax.annotation.Nonnull;
29+
2430
/**
2531
* Test class for source schema validation.
2632
*/
2733
public class AbstractDBSourceTest {
34+
private static final String MOCK_STAGE = "mockStage";
2835
private static final Schema SCHEMA = Schema.recordOf(
2936
"schema",
3037
Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))),
@@ -39,7 +46,9 @@ public class AbstractDBSourceTest {
3946

4047
@Test
4148
public void testValidateSourceSchemaCorrectSchema() {
42-
AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA);
49+
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
50+
AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector);
51+
Assert.assertEquals(0, collector.getValidationFailures().size());
4352
}
4453

4554
@Test
@@ -55,12 +64,9 @@ public void testValidateSourceSchemaMismatchFields() {
5564
Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE)))
5665
);
5766

58-
try {
59-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA);
60-
Assert.fail(String.format("Expected to throw %s", InvalidConfigPropertyException.class.getName()));
61-
} catch (InvalidConfigPropertyException e) {
62-
Assert.assertEquals(AbstractDBSource.DBSourceConfig.SCHEMA, e.getProperty());
63-
}
67+
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
68+
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
69+
assertPropertyValidationFailed(collector, "boolean_column");
6470
}
6571

6672
@Test
@@ -77,12 +83,26 @@ public void testValidateSourceSchemaInvalidFieldType() {
7783
Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.INT)))
7884
);
7985

80-
try {
81-
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA);
82-
Assert.fail(String.format("Expected to throw %s", IllegalArgumentException.class.getName()));
83-
} catch (IllegalArgumentException e) {
84-
String errorMessage = "Schema field 'boolean_column' has type 'BOOLEAN' but found 'INT' in input record";
85-
Assert.assertEquals(errorMessage, e.getMessage());
86-
}
86+
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
87+
AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
88+
assertPropertyValidationFailed(collector, "boolean_column");
89+
}
90+
91+
private static void assertPropertyValidationFailed(MockFailureCollector failureCollector, String paramName) {
92+
List<ValidationFailure> failureList = failureCollector.getValidationFailures();
93+
Assert.assertEquals(1, failureList.size());
94+
ValidationFailure failure = failureList.get(0);
95+
List<ValidationFailure.Cause> causeList = getCauses(failure, CauseAttributes.OUTPUT_SCHEMA_FIELD);
96+
Assert.assertEquals(1, causeList.size());
97+
ValidationFailure.Cause cause = causeList.get(0);
98+
Assert.assertEquals(paramName, cause.getAttribute(CauseAttributes.OUTPUT_SCHEMA_FIELD));
99+
}
100+
101+
@Nonnull
102+
private static List<ValidationFailure.Cause> getCauses(ValidationFailure failure, String stacktrace) {
103+
return failure.getCauses()
104+
.stream()
105+
.filter(cause -> cause.getAttribute(stacktrace) != null)
106+
.collect(Collectors.toList());
87107
}
88108
}

docker-compose/db-plugins-env/docker-compose.yml

+8-1
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,11 @@ services:
5656
environment:
5757
- ORACLE_SID=cdap
5858
- ORACLE_PDB=mydb
59-
- ORACLE_PWD=123Qwe123
59+
- ORACLE_PWD=123Qwe123
60+
memsql:
61+
image: memsql/cluster-in-a-box:centos-6.8.10-a53e479edc-1.9.0-1.3.0
62+
ports:
63+
- 3308:3306
64+
- 8888:8080
65+
environment:
66+
- LICENSE_KEY=${MEMSQL_LICENSE_KEY}

memsql-plugin/docs/Memsql-action.md

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# MemSQL Action
2+
3+
4+
Description
5+
-----------
6+
Action that runs a MemSQL command.
7+
8+
9+
Use Case
10+
--------
11+
The action can be used whenever you want to run a MemSQL command before or after a data pipeline.
12+
For example, you may want to run a sql update command on a database before the pipeline source pulls data from tables.
13+
14+
15+
Properties
16+
----------
17+
**Driver Name:** Name of the JDBC driver to use.
18+
19+
**Database Command:** Database command to execute.
20+
21+
**Host:** Host that MemSQL is running on.
22+
23+
**Port:** Port that MemSQL is running on.
24+
25+
**Database:** MemSQL database name.
26+
27+
**Username:** User identity for connecting to the specified database.
28+
29+
**Password:** Password to use to connect to the specified database.
30+
31+
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
32+
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
33+
34+
**Auto Reconnect:** Should the driver try to re-establish stale and/or dead connections.
35+
36+
**Use SSL:** Turns on SSL encryption. The connection will fail if SSL is not available.
37+
38+
**Keystore URL:** URL to the client certificate KeyStore (if not specified, use defaults). Must be accessible at the
39+
same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN daemon
40+
role is running.
41+
42+
**Keystore Password:** Password for the client certificates KeyStore.
43+
44+
**Truststore URL:** URL to the trusted root certificate KeyStore (if not specified, use defaults). Must be accessible at
45+
the same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN
46+
daemon role is running.
47+
48+
**Truststore Password:** Password for the trusted root certificates KeyStore
49+
50+
**Use Compression:** Use zlib compression when communicating with the server. Select this option for WAN
51+
connections.
52+
53+
**Use ANSI Quotes:** Treats " as an identifier quote character and not as a string quote character.
54+
55+
56+
Example
57+
-------
58+
Suppose you want to execute a query against a MemSQL database named "prod" that is running on "localhost"
59+
port 3306, then configure the plugin with:
60+
61+
```
62+
Driver Name: "mariadb"
63+
Database Command: "UPDATE table_name SET price = 20 WHERE ID = 6"
64+
Host: "localhost"
65+
Port: 3306
66+
Database: "prod"
67+
```

0 commit comments

Comments
 (0)