Skip to content

Mysqlconnector #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
FROM mariadb:10.3

# Install OpenJDK-8
RUN apt-get update && \
apt-get install -y openjdk-8-jdk && \
apt-get clean;

# Fix certificate issues
RUN apt-get update && \
apt-get install ca-certificates-java && \
apt-get clean && \
update-ca-certificates -f;

# Setup JAVA_HOME -- useful for docker commandline
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME

ENV SBT_VERSION 1.2.6

RUN apt-get update && \
apt-get install -y curl && \
apt-get clean

RUN curl -L -o sbt-$SBT_VERSION.deb http://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb && \
dpkg -i sbt-$SBT_VERSION.deb && \
rm sbt-$SBT_VERSION.deb && \
apt-get update && \
apt-get install sbt && \
sbt sbtVersion

# Install Git
RUN apt-get update && \
apt-get install -y git && \
apt-get clean;

RUN git clone -b mysqlconnector https://github.com/pier485/darwin.git

WORKDIR /darwin

RUN chmod 700 /darwin/init-db-and-launch-tests.sh

#CMD sbt darwin-mysql-connector/test
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ lazy val hbaseConnector = Project("darwin-hbase-connector", file("hbase"))
.settings(crossScalaVersions := Versions.crossScalaVersions)
.enablePlugins(JavaAppPackaging)

lazy val mysqlConnector = Project("darwin-mysql-connector", file("mysql"))
.settings(Settings.commonSettings:_*)
.dependsOn(coreCommon)
.settings(pgpPassphrase := Settings.pgpPass)
.settings(libraryDependencies ++= Dependencies.mysql_conn_dep)
.settings(crossScalaVersions := Versions.crossScalaVersions)
.enablePlugins(JavaAppPackaging)

lazy val postgresConnector = Project("darwin-postgres-connector", file("postgres"))
.settings(Settings.commonSettings:_*)
.dependsOn(coreCommon)
Expand Down
18 changes: 18 additions & 0 deletions init-db-and-launch-tests.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

RET=1
while [[ RET -ne 0 ]]; do
echo "=> Waiting for confirmation of MariaDB service startup"
sleep 5
mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "status" > /dev/null 2>&1
RET=$?
done

echo "Create database and table";

mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "CREATE DATABASE IF NOT EXISTS mysqldb;"
mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "USE mysqldb; CREATE TABLE \`SCHEMA_REPOSITORY\` (\`id\` bigint(20) DEFAULT NULL, \`schema\` text NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;"

echo "Launching darwin-mysql-connector tests"

sbt darwin-mysql-connector/test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
it.agilelab.darwin.connector.mysql.MySQLConnectorCreator
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package it.agilelab.darwin.connector.mysql

object ConfigurationKeys {
val TABLE : String = "table"
val HOST : String = "host"
val DATABASE : String = "db"
val USER : String = "username"
val PASSWORD : String = "password"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package it.agilelab.darwin.connector.mysql

import java.sql.{Connection, DriverManager}

import com.typesafe.config.Config

trait MySQLConnection {

private var connectionUrl : String = ""
private val driverName : String = "org.mariadb.jdbc.Driver"

protected def setConnectionConfig(config : Config) = {
val db = config.getString(ConfigurationKeys.DATABASE)
val host = config.getString(ConfigurationKeys.HOST)
val user = config.getString(ConfigurationKeys.USER)
val password = config.getString(ConfigurationKeys.PASSWORD)
connectionUrl = s"jdbc:mysql://$host/$db?user=$user&password=$password"
}

protected def getConnection: Connection = {
Class.forName(driverName)
val connection: Connection = DriverManager.getConnection(connectionUrl)
connection
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package it.agilelab.darwin.connector.mysql

import java.sql.ResultSet

import com.typesafe.config.Config
import it.agilelab.darwin.common.Connector
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser

class MySQLConnector(config: Config) extends Connector(config) with MySQLConnection {

private def parser: Parser = new Parser()

private val DEFAULT_TABLENAME = "SCHEMA_REPOSITORY"

val TABLE_NAME: String = if (config.hasPath(ConfigurationKeys.TABLE)) {
config.getString(ConfigurationKeys.TABLE)
} else {
DEFAULT_TABLENAME
}

setConnectionConfig(config)

override def fullLoad(): Seq[(Long, Schema)] = {
val connection = getConnection
var schemas: Seq[(Long, Schema)] = Seq.empty[(Long, Schema)]
val statement = connection.createStatement()
val resultSet: ResultSet = statement.executeQuery(s"select * from $TABLE_NAME")

while (resultSet.next()) {
val id = resultSet.getLong("id")
val schema = parser.parse(resultSet.getString("schema"))
schemas = schemas :+ (id -> schema)
}
connection.close
schemas
}

override def insert(schemas: Seq[(Long, Schema)]): Unit = {
val connection = getConnection
try {
connection.setAutoCommit(false)
schemas.foreach { case (id, schema) =>
val insertSchemaPS = connection.prepareStatement(s"INSERT INTO $TABLE_NAME (`id`,`schema`) VALUES (?,?)")
insertSchemaPS.setLong(1, id)
insertSchemaPS.setString(2, schema.toString)
insertSchemaPS.executeUpdate()
insertSchemaPS.close()
}
connection.commit
} catch {
case e: Exception => {
connection.rollback
// e.printStackTrace
throw e // should re-throw?
}
} finally {
connection.close
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.agilelab.darwin.connector.mysql

import com.typesafe.config.Config
import it.agilelab.darwin.common.{Connector, ConnectorCreator}

class MySQLConnectorCreator extends ConnectorCreator {
override def create(config: Config): Connector = new MySQLConnector(config)
}
4 changes: 4 additions & 0 deletions mysql/src/test/resources/mysql.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
host = localhost:3306
db = mysqldb
username = root
password = mysqlpwd
28 changes: 28 additions & 0 deletions mysql/src/test/resources/mysqlmock.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"type" : "record",
"name" : "MySQLMock",
"namespace" : "it.agilelab.darwin.connector.mysql",
"fields" : [ {
"name" : "one",
"type" : "int"
}, {
"name" : "two",
"type" : "string"
}, {
"name" : "three",
"type" : "long"
}, {
"name" : "four",
"type" : {
"type" : "record",
"name" : "MySQL2Mock",
"fields" : [ {
"name" : "one",
"type" : "boolean"
}, {
"name" : "two",
"type" : "long"
} ]
}
} ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package it.agilelab.darwin.connector.mysql

case class MySQL2Mock(one: Boolean, two: Long)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package it.agilelab.darwin.connector.mysql

import com.typesafe.config.ConfigFactory
import it.agilelab.darwin.common.Connector
import org.apache.avro.{Schema, SchemaNormalization}
import org.scalatest.{FlatSpec, Matchers}
import org.apache.avro.reflect.ReflectData


class MySQLConnectorSuite extends FlatSpec with Matchers {
val connector: Connector = new MySQLConnectorCreator().create(ConfigFactory.load("mysql.properties"))

"MySQLConnector" should "load all existing schemas" in {
connector.fullLoad()
}

it should "insert and retrieve" in {
val outerSchema = new Schema.Parser().parse(getClass.getClassLoader.getResourceAsStream("mysqlmock.avsc"))
val innerSchema = outerSchema.getField("four").schema()

val schemas = Seq(innerSchema, outerSchema)
.map(s => SchemaNormalization.parsingFingerprint64(s) -> s)
connector.insert(schemas)
val loaded: Seq[(Long, Schema)] = connector.fullLoad()
assert(loaded.size == schemas.size)
assert(loaded.forall(schemas.contains))
}

it should "check schemas" in {
val outerSchema: Schema = ReflectData.get().getSchema(classOf[MySQLMock])
val innerSchema: Schema = ReflectData.get().getSchema(classOf[MySQL2Mock])

val loaded: Seq[Schema] = connector.fullLoad().map(s => s._2)

assert(loaded.contains(outerSchema))
assert(loaded.contains(innerSchema))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package it.agilelab.darwin.connector.mysql

case class MySQLMock(one: Int,
two: String,
three: Long,
four: MySQL2Mock )
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ object Dependencies {
val spark_core = "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
val spark_sql = "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided"
val postgres_conn = "org.postgresql" % "postgresql" % "9.3-1100-jdbc4"
val mysql_conn = "org.mariadb.jdbc" % "mariadb-java-client" % "2.3.0"

val core_deps = Seq(scalatest, avro, typesafe_config)
val mock_app_dep = core_deps ++ Seq(avro4s, reflections)
val hbase_conn_dep = core_deps ++ Seq(hbase_common, hbase_server, hadoop_common)
val postgres_conn_dep = core_deps :+ postgres_conn
val spark_app = mock_app_dep ++ Seq(spark_core, spark_sql, hbase_common)
val mysql_conn_dep = core_deps :+ mysql_conn
}