Skip to content

Commit 34e9f17

Browse files
committed
* Splits Elastic module into elastic-common and elastic8
* Adding Opensearch module * Adding Opensearch unit tests * Opensearch SSL Test - needs completing * Replace Elastic6+7 with Elastic8
1 parent 9f96469 commit 34e9f17

File tree

124 files changed

+3019
-3977
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

124 files changed

+3019
-3977
lines changed

build.sbt

+39-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import Dependencies.globalExcludeDeps
22
import Dependencies.gson
3+
import Dependencies.bouncyCastle
4+
35
import Settings.*
46
import sbt.Keys.libraryDependencies
57
import sbt.*
@@ -18,8 +20,9 @@ lazy val subProjects: Seq[Project] = Seq(
1820
`azure-documentdb`,
1921
`azure-datalake`,
2022
cassandra,
21-
elastic6,
22-
elastic7,
23+
`elastic-common`,
24+
opensearch,
25+
elastic8,
2326
ftp,
2427
`gcp-storage`,
2528
http,
@@ -219,18 +222,17 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
219222
.configureFunctionalTests()
220223
.enablePlugins(PackPlugin)
221224

222-
lazy val elastic6 = (project in file("kafka-connect-elastic6"))
225+
lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
223226
.dependsOn(common)
224227
.dependsOn(`sql-common`)
225228
.dependsOn(`test-common` % "fun->compile")
226229
.settings(
227230
settings ++
228231
Seq(
229-
name := "kafka-connect-elastic6",
232+
name := "kafka-connect-elastic-common",
230233
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
231-
libraryDependencies ++= baseDeps ++ kafkaConnectElastic6Deps,
234+
libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
232235
publish / skip := true,
233-
FunctionalTest / baseDirectory := (LocalRootProject / baseDirectory).value,
234236
packExcludeJars := Seq(
235237
"scala-.*\\.jar",
236238
"zookeeper-.*\\.jar",
@@ -239,20 +241,20 @@ lazy val elastic6 = (project in file("kafka-connect-elastic6"))
239241
)
240242
.configureAssembly(true)
241243
.configureTests(baseTestDeps)
242-
.configureIntegrationTests(kafkaConnectElastic6TestDeps)
244+
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
243245
.configureFunctionalTests()
244-
.enablePlugins(PackPlugin)
246+
.disablePlugins(PackPlugin)
245247

246-
lazy val elastic7 = (project in file("kafka-connect-elastic7"))
248+
lazy val elastic8 = (project in file("kafka-connect-elastic8"))
247249
.dependsOn(common)
248-
.dependsOn(`sql-common`)
249-
.dependsOn(`test-common` % "fun->compile")
250+
.dependsOn(`elastic-common`)
251+
.dependsOn(`test-common` % "fun->compile;it->compile")
250252
.settings(
251253
settings ++
252254
Seq(
253-
name := "kafka-connect-elastic7",
255+
name := "kafka-connect-elastic8",
254256
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
255-
libraryDependencies ++= baseDeps ++ kafkaConnectElastic7Deps,
257+
libraryDependencies ++= baseDeps ++ kafkaConnectElastic8Deps,
256258
publish / skip := true,
257259
packExcludeJars := Seq(
258260
"scala-.*\\.jar",
@@ -262,10 +264,33 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
262264
)
263265
.configureAssembly(true)
264266
.configureTests(baseTestDeps)
265-
.configureIntegrationTests(kafkaConnectElastic7TestDeps)
267+
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
266268
.configureFunctionalTests()
267269
.enablePlugins(PackPlugin)
268270

271+
lazy val opensearch = (project in file("kafka-connect-opensearch"))
272+
.dependsOn(common)
273+
.dependsOn(`elastic-common`)
274+
.dependsOn(`test-common` % "fun->compile;it->compile")
275+
.settings(
276+
settings ++
277+
Seq(
278+
name := "kafka-connect-opensearch",
279+
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
280+
libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
281+
publish / skip := true,
282+
packExcludeJars := Seq(
283+
"scala-.*\\.jar",
284+
"zookeeper-.*\\.jar",
285+
),
286+
),
287+
)
288+
.configureAssembly(false)
289+
.configureTests(baseTestDeps)
290+
//.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
291+
.configureFunctionalTests(bouncyCastle)
292+
.enablePlugins(PackPlugin)
293+
269294
lazy val http = (project in file("kafka-connect-http"))
270295
.dependsOn(common)
271296
//.dependsOn(`test-common` % "fun->compile")

kafka-connect-azure-documentdb/src/main/scala/io/lenses/streamreactor/connect/azure/documentdb/sink/DocumentDbSinkConnector.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.lenses.streamreactor.connect.azure.documentdb.sink
1717

18+
import cats.implicits.toBifunctorOps
1819
import io.lenses.streamreactor.common.config.Helpers
1920
import io.lenses.streamreactor.common.utils.JarManifest
2021
import io.lenses.streamreactor.connect.azure.documentdb.DocumentClientProvider
@@ -100,7 +101,7 @@ class DocumentDbSinkConnector private[sink] (builder: DocumentDbSinkSettings =>
100101
configProps = props
101102

102103
//check input topics
103-
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap)
104+
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap).leftMap(throw _)
104105

105106
val settings = DocumentDbSinkSettings(config)
106107

kafka-connect-cassandra/src/main/scala/io/lenses/streamreactor/connect/cassandra/CassandraConnection.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package io.lenses.streamreactor.connect.cassandra
1717

18-
import io.lenses.streamreactor.common.config.SSLConfig
19-
import io.lenses.streamreactor.common.config.SSLConfigContext
2018
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
19+
import io.lenses.streamreactor.connect.cassandra.config.SSLConfig
20+
import io.lenses.streamreactor.connect.cassandra.config.SSLConfigContext
2121
import io.lenses.streamreactor.connect.cassandra.config.LoadBalancingPolicy
2222
import com.datastax.driver.core.Cluster.Builder
2323
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2017-2024 Lenses.io Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lenses.streamreactor.connect.cassandra.config
17+
18+
import java.io.FileInputStream
19+
import java.security.KeyStore
20+
import java.security.SecureRandom
21+
import javax.net.ssl._
22+
23+
/**
24+
* Created by [email protected] on 14/04/16.
25+
* stream-reactor
26+
*/
27+
object SSLConfigContext {
28+
def apply(config: SSLConfig): SSLContext =
29+
getSSLContext(config)
30+
31+
/**
32+
* Get a SSL Connect for a given set of credentials
33+
*
34+
* @param config An SSLConfig containing key and truststore credentials
35+
* @return a SSLContext
36+
*/
37+
def getSSLContext(config: SSLConfig): SSLContext = {
38+
val useClientCertAuth = config.useClientCert
39+
40+
//is client certification authentication set
41+
val keyManagers: Array[KeyManager] = if (useClientCertAuth) {
42+
getKeyManagers(config)
43+
} else {
44+
Array[KeyManager]()
45+
}
46+
47+
val ctx: SSLContext = SSLContext.getInstance("SSL")
48+
val trustManagers = getTrustManagers(config)
49+
ctx.init(keyManagers, trustManagers, new SecureRandom())
50+
ctx
51+
}
52+
53+
/**
54+
* Get an array of Trust Managers
55+
*
56+
* @param config An SSLConfig containing key and truststore credentials
57+
* @return An Array of TrustManagers
58+
*/
59+
def getTrustManagers(config: SSLConfig): Array[TrustManager] = {
60+
val tsf = new FileInputStream(config.trustStorePath)
61+
val ts = KeyStore.getInstance(config.trustStoreType)
62+
ts.load(tsf, config.trustStorePass.toCharArray)
63+
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
64+
tmf.init(ts)
65+
tmf.getTrustManagers
66+
}
67+
68+
/**
69+
* Get an array of Key Managers
70+
*
71+
* @param config An SSLConfig containing key and truststore credentials
72+
* @return An Array of KeyManagers
73+
*/
74+
def getKeyManagers(config: SSLConfig): Array[KeyManager] = {
75+
require(config.keyStorePath.nonEmpty, "Key store path is not set!")
76+
require(config.keyStorePass.nonEmpty, "Key store password is not set!")
77+
val ksf = new FileInputStream(config.keyStorePath.get)
78+
val ks = KeyStore.getInstance(config.keyStoreType)
79+
ks.load(ksf, config.keyStorePass.get.toCharArray)
80+
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
81+
kmf.init(ks, config.keyStorePass.get.toCharArray)
82+
kmf.getKeyManagers
83+
}
84+
85+
}
86+
87+
/**
88+
* Class for holding key and truststore settings
89+
*/
90+
case class SSLConfig(
91+
trustStorePath: String,
92+
trustStorePass: String,
93+
keyStorePath: Option[String],
94+
keyStorePass: Option[String],
95+
useClientCert: Boolean = false,
96+
keyStoreType: String = "JKS",
97+
trustStoreType: String = "JKS",
98+
)

kafka-connect-cassandra/src/main/scala/io/lenses/streamreactor/connect/cassandra/sink/CassandraSinkConnector.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
*/
1616
package io.lenses.streamreactor.connect.cassandra.sink
1717

18+
import cats.implicits.toBifunctorOps
19+
import com.typesafe.scalalogging.StrictLogging
1820
import io.lenses.streamreactor.common.config.Helpers
1921
import io.lenses.streamreactor.common.utils.JarManifest
20-
21-
import java.util
2222
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
2323
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigSink
24-
import com.typesafe.scalalogging.StrictLogging
2524
import org.apache.kafka.common.config.ConfigDef
2625
import org.apache.kafka.connect.connector.Task
2726
import org.apache.kafka.connect.errors.ConnectException
2827
import org.apache.kafka.connect.sink.SinkConnector
2928

29+
import java.util
3030
import scala.jdk.CollectionConverters.MapHasAsScala
3131
import scala.jdk.CollectionConverters.SeqHasAsJava
3232
import scala.util.Failure
@@ -66,7 +66,7 @@ class CassandraSinkConnector extends SinkConnector with StrictLogging {
6666
*/
6767
override def start(props: util.Map[String, String]): Unit = {
6868
//check input topics
69-
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap)
69+
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap).leftMap(throw _)
7070
configProps = props
7171
Try(new CassandraConfigSink(props.asScala.toMap)) match {
7272
case Failure(f) =>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2017-2024 Lenses.io Ltd
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.lenses.streamreactor.connect.cassandra.config
17+
18+
import org.scalatest.BeforeAndAfter
19+
import org.scalatest.matchers.should.Matchers
20+
import org.scalatest.wordspec.AnyWordSpec
21+
22+
import javax.net.ssl.KeyManager
23+
import javax.net.ssl.SSLContext
24+
import javax.net.ssl.TrustManager
25+
26+
/**
27+
* Created by [email protected] on 19/04/16.
28+
* stream-reactor
29+
*/
30+
class TestSSLConfigContext extends AnyWordSpec with Matchers with BeforeAndAfter {
31+
var sslConfig: SSLConfig = null
32+
var sslConfigNoClient: SSLConfig = null
33+
34+
before {
35+
val trustStorePath = getClass.getResource("/stc_truststore.jks").getPath
36+
val keystorePath = getClass.getResource("/stc_keystore.jks").getPath
37+
val trustStorePassword = "erZHDS9Eo0CcNo"
38+
val keystorePassword = "8yJQLUnGkwZxOw"
39+
sslConfig = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), true)
40+
sslConfigNoClient = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), false)
41+
}
42+
43+
"SSLConfigContext" should {
44+
"should return an Array of KeyManagers" in {
45+
val keyManagers = SSLConfigContext.getKeyManagers(sslConfig)
46+
keyManagers.length shouldBe 1
47+
val entry = keyManagers.head
48+
entry shouldBe a[KeyManager]
49+
}
50+
51+
"should return an Array of TrustManagers" in {
52+
val trustManager = SSLConfigContext.getTrustManagers(sslConfig)
53+
trustManager.length shouldBe 1
54+
val entry = trustManager.head
55+
entry shouldBe a[TrustManager]
56+
}
57+
58+
"should return a SSLContext" in {
59+
val context = SSLConfigContext(sslConfig)
60+
context.getProtocol shouldBe "SSL"
61+
context shouldBe a[SSLContext]
62+
}
63+
}
64+
}

kafka-connect-common/src/main/scala/io/lenses/streamreactor/common/config/Helpers.scala

+8-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.lenses.streamreactor.common.config
1717

18+
import cats.implicits.catsSyntaxEitherId
1819
import io.lenses.kcql.Kcql
1920
import com.typesafe.scalalogging.StrictLogging
2021
import org.apache.kafka.common.config.ConfigException
@@ -26,32 +27,32 @@ import org.apache.kafka.common.config.ConfigException
2627

2728
object Helpers extends StrictLogging {
2829

29-
def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Boolean = {
30+
def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Either[Throwable, Unit] = {
3031
val topics = props("topics").split(",").map(t => t.trim).toSet
3132
val raw = props(kcqlConstant)
3233
if (raw.isEmpty) {
33-
throw new ConfigException(s"Missing $kcqlConstant")
34+
return new ConfigException(s"Missing $kcqlConstant").asLeft
3435
}
3536
val kcql = raw.split(";").map(r => Kcql.parse(r)).toSet
3637
val sources = kcql.map(k => k.getSource)
3738
val res = topics.subsetOf(sources)
3839

3940
if (!res) {
4041
val missing = topics.diff(sources)
41-
throw new ConfigException(
42+
return new ConfigException(
4243
s"Mandatory `topics` configuration contains topics not set in $kcqlConstant: ${missing}, kcql contains $sources",
43-
)
44+
).asLeft
4445
}
4546

4647
val res1 = sources.subsetOf(topics)
4748

4849
if (!res1) {
4950
val missing = topics.diff(sources)
50-
throw new ConfigException(
51+
return new ConfigException(
5152
s"$kcqlConstant configuration contains topics not set in mandatory `topic` configuration: ${missing}, kcql contains $sources",
52-
)
53+
).asLeft
5354
}
5455

55-
true
56+
().asRight
5657
}
5758
}

0 commit comments

Comments
 (0)