Skip to content

Commit 54d1db5

Browse files
stheppistheppidavidsloan
authored
HTTP extractor (#198)
* HTTP extractor A bug was reported that referencing the entire record value yields an exception when the payload is a complex type. In this case a HashMap. The changes allow the return of the value when the extractor has no field to extract (ie. extract the full value) * Remove the comment * Improve code readability Use the Scala Map to Java map for code readability Co-authored-by: David Sloan <[email protected]> * Fix the unit test after committing the code change suggestion * Fix the unit test expecting error on unknown type and no path --------- Co-authored-by: stheppi <[email protected]> Co-authored-by: David Sloan <[email protected]>
1 parent 4857962 commit 54d1db5

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

kafka-connect-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractor.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ object KafkaConnectExtractor extends LazyLogging {
6363
case (mapVal: Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal.asJava, pnp, extractSchema.orNull)
6464
case (mapVal: util.Map[_, _], Some(pnp)) => MapExtractor.extractPathFromMap(mapVal, pnp, extractSchema.orNull)
6565
case (listVal: util.List[_], Some(pnp)) => ArrayExtractor.extractPathFromArray(listVal, pnp, extractSchema.orNull)
66+
case (left, None) => left.asRight
6667
case (leftVal, rightVal) => new ConnectException(
6768
s"Unknown value type: `${Try(leftVal.getClass.getName).getOrElse("undefined")}`, string representation: '${Try(
6869
leftVal.toString,

kafka-connect-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/extractors/KafkaConnectExtractorTest.scala

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ package io.lenses.streamreactor.connect.cloud.common.sink.extractors
1818
import cats.implicits.catsSyntaxOptionId
1919
import org.apache.kafka.connect.data.Schema
2020
import org.apache.kafka.connect.data.SchemaBuilder
21-
import org.scalatest.flatspec.AnyFlatSpec
22-
import org.scalatest.matchers.should.Matchers
2321
import org.apache.kafka.connect.data.Struct
2422
import org.apache.kafka.connect.sink.SinkRecord
2523
import org.mockito.MockitoSugar
2624
import org.scalatest.EitherValues
25+
import org.scalatest.flatspec.AnyFlatSpec
26+
import org.scalatest.matchers.should.Matchers
2727

28+
import java.util
2829
import scala.jdk.CollectionConverters.MapHasAsJava
30+
import scala.jdk.CollectionConverters.MapHasAsScala
2931
import scala.jdk.CollectionConverters.SeqHasAsJava
3032

3133
class KafkaConnectExtractorTest extends AnyFlatSpec with Matchers with MockitoSugar with EitherValues {
@@ -48,6 +50,31 @@ class KafkaConnectExtractorTest extends AnyFlatSpec with Matchers with MockitoSu
4850
result shouldBe Right("testValue")
4951
}
5052

53+
"extract" should "handle a java.util.HashMap" in {
54+
val dataMap: Map[String, Any] = Map(
55+
"gender" -> "male",
56+
"tenantId" -> "tenantid",
57+
"age" -> "25",
58+
)
59+
60+
val map: Map[String, Any] = Map(
61+
"data" -> dataMap,
62+
"specversion" -> "1.0",
63+
"source" -> "da915f_87b9_414a_9f70_5f794a1310de",
64+
"id" -> "4d8f4a0f-cba5-45e4-9f9d-8168bfda77",
65+
"time" -> "2024-11-14T06:58:47.472733Z",
66+
"type" -> "event.type.v1",
67+
"publishtime" -> "2024-11-14T06:58:48.235556Z",
68+
)
69+
70+
val sinkRecord = mock[SinkRecord]
71+
when(sinkRecord.value()).thenReturn(map.asJava)
72+
when(sinkRecord.valueSchema()).thenReturn(null)
73+
74+
val result = KafkaConnectExtractor.extractFromValue(sinkRecord, None)
75+
result.map(_.asInstanceOf[util.Map[_, _]].asScala.toMap) shouldBe Right(map)
76+
}
77+
5178
"extract" should "handle different types correctly" in {
5279
KafkaConnectExtractor.extract(123: java.lang.Integer, None, None) shouldBe Right(123)
5380
KafkaConnectExtractor.extract(123L: java.lang.Long, None, None) shouldBe Right(123L)
@@ -90,7 +117,7 @@ class KafkaConnectExtractorTest extends AnyFlatSpec with Matchers with MockitoSu
90117

91118
it should "return an error for unknown types" in {
92119
val unknownType = new Object()
93-
val result = KafkaConnectExtractor.extract(unknownType, None, None)
120+
val result = KafkaConnectExtractor.extract(unknownType, None, Some("Empty"))
94121
val message = result.left.value.getMessage
95122
message should startWith("Unknown value type: `java.lang.Object`")
96123
message should endWith("path: `Empty`")

0 commit comments

Comments
 (0)