
Commit 5e03c59

stheppi authored
Fix: Prevent ElasticSearch from Skipping Records After Tombstone (#172)
* Fix: Prevent ElasticSearch from Skipping Records After Tombstone

  Overview: This pull request addresses a critical bug in ElasticSearch versions 6 (ES6) and 7 (ES7) where records following a tombstone were inadvertently skipped during insertion. The issue stemmed from an erroneous return statement that halted the processing of subsequent records.

  Background: In the previous implementation, when a tombstone record was encountered within a sequence of records to be written to ElasticSearch, the insertion process exited prematurely because of a return instruction. All records following the tombstone were therefore ignored, leading to incomplete data ingestion and potential inconsistencies in the ElasticSearch indices.

  Changes made:
  - Refactored insert method: the original insert method has been decomposed into smaller, more focused functions, which improves readability and maintainability and makes testing easier.
  - Detailed log entries: log statements were added at key points in the insertion workflow.
  - ES errors no longer ignored: previously, failures reported in the ElasticSearch response were discarded. With this change, if any record in a batch fails, the sink raises an exception.

* Avoid sending empty requests

* Fix the unit tests

---------

Co-authored-by: stheppi <[email protected]>
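The "erroneous return statement" deserves a closer look, since it is a classic Scala pitfall. Below is a minimal, self-contained sketch (hypothetical names, not the connector's code) of why a single tombstone could silence every other record:

```scala
// Minimal sketch of the root cause (hypothetical names, not the connector code).
// In Scala 2, `return` inside a lambda is a non-local return: it throws a
// NonLocalReturnControl that exits the *enclosing method*, not just the lambda.
object NonLocalReturnDemo extends App {

  // Buggy shape: one tombstone (None) aborts the whole method,
  // so every record — before and after it — is dropped.
  def buildRequests(records: Seq[Option[String]]): Seq[String] =
    records.flatMap { r =>
      if (r.isEmpty) return Seq.empty // exits buildRequests itself
      Seq(r.get)
    }

  // Fixed shape: a tombstone simply contributes nothing and iteration continues.
  def buildRequestsFixed(records: Seq[Option[String]]): Seq[String] =
    records.flatMap(_.toList)

  println(buildRequests(Seq(Some("a"), None, Some("b"))))      // List()
  println(buildRequestsFixed(Seq(Some("a"), None, Some("b")))) // List(a, b)
}
```

This is exactly why the fix replaces `return None` in the `IGNORE` branch with a plain `None`: `flatMap` drops the tombstone and processing of the remaining records continues.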
1 parent c272cd5 commit 5e03c59

File tree

7 files changed (+233 −145 lines)

kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriter.scala

+111 −72

```diff
@@ -25,17 +25,21 @@ import io.lenses.streamreactor.common.errors.ErrorHandler
 import io.lenses.streamreactor.common.schemas.ConverterUtil
 import io.lenses.streamreactor.connect.elastic6.config.ElasticSettings
 import io.lenses.streamreactor.connect.elastic6.indexname.CreateIndex
-import com.fasterxml.jackson.databind.JsonNode
+import com.sksamuel.elastic4s.bulk.BulkCompatibleRequest
+import com.sksamuel.elastic4s.delete.DeleteByIdRequest
 import io.lenses.sql.Field
 import com.sksamuel.elastic4s.Index
-import com.sksamuel.elastic4s.Indexable
 import com.sksamuel.elastic4s.http.ElasticDsl._
+import com.sksamuel.elastic4s.http.Response
+import com.sksamuel.elastic4s.http.bulk.BulkResponse
 import com.typesafe.scalalogging.StrictLogging
+import io.lenses.json.sql.JacksonJson
 import io.lenses.streamreactor.connect.elastic6.NullValueBehavior.NullValueBehavior
 import io.lenses.streamreactor.connect.elastic6.config.ElasticConfigConstants.BEHAVIOR_ON_NULL_VALUES_PROPERTY
 import org.apache.kafka.connect.sink.SinkRecord

 import scala.annotation.nowarn
+import scala.collection.immutable
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.Await
@@ -117,88 +121,126 @@ class ElasticJsonWriter(client: KElasticClient, settings: ElasticSettings)
    * @param records A list of SinkRecords
    */
   def insert(records: Map[String, Vector[SinkRecord]]): Unit = {
+    logger.info(s"Inserting ${records.size} records")
     val fut = records.flatMap {
       case (topic, sinkRecords) =>
-        val kcqls = topicKcqlMap.getOrElse(
+        logger.debug(s"Inserting ${sinkRecords.size} records from $topic")
+        val kcqls: Seq[Kcql] = topicKcqlMap.getOrElse(
          topic,
          throw new IllegalArgumentException(
            s"$topic hasn't been configured in KCQL. Configured topics is ${topicKcqlMap.keys.mkString(",")}",
          ),
        )
-
-        //we might have multiple inserts from the same Kafka Message
-        kcqls.flatMap { kcql =>
-          val kcqlValue = kcqlMap.get(kcql)
+        kcqls.flatMap { kcql: Kcql =>
+          val kcqlValue: KcqlValues = kcqlMap.get(kcql)
           sinkRecords.grouped(settings.batchSize)
             .map { batch =>
-              val indexes = batch.flatMap { r =>
-                val i = CreateIndex.getIndexName(kcql, r).leftMap(throw _).merge
-                val documentType = Option(kcql.getDocType).getOrElse(i)
-                val (json, pks) = if (kcqlValue.primaryKeysPath.isEmpty) {
-                  (Transform(
-                    kcqlValue.fields,
-                    r.valueSchema(),
-                    r.value(),
-                    kcql.hasRetainStructure,
-                  ),
-                  Seq.empty,
-                  )
-                } else {
-                  TransformAndExtractPK(
-                    kcqlValue,
-                    r.valueSchema(),
-                    r.value(),
-                    kcql.hasRetainStructure,
-                    r.keySchema(),
-                    r.key(),
-                    r.headers(),
-                  )
-                }
-                val idFromPk = pks.mkString(settings.pkJoinerSeparator)
-
-                if (json.isEmpty || json.exists(_.isEmpty)) {
-                  (kcqlValue.behaviorOnNullValues) match {
-                    case NullValueBehavior.DELETE =>
-                      Some(deleteById(new Index(i), documentType, if (idFromPk.isEmpty) autoGenId(r) else idFromPk))
-
-                    case NullValueBehavior.FAIL =>
-                      throw new IllegalStateException(
-                        s"$topic KCQL mapping is configured to fail on null value, yet it occurred.",
-                      )
-
-                    case NullValueBehavior.IGNORE =>
-                      return None
-                  }
-
-                } else {
-                  kcql.getWriteMode match {
-                    case WriteModeEnum.INSERT =>
-                      Some(
-                        indexInto(i / documentType)
-                          .id(if (idFromPk.isEmpty) autoGenId(r) else idFromPk)
-                          .pipeline(kcql.getPipeline)
-                          .source(json.get.toString),
-                      )
-
-                    case WriteModeEnum.UPSERT =>
-                      require(pks.nonEmpty, "Error extracting primary keys")
-                      Some(update(idFromPk)
-                        .in(i / documentType)
-                        .docAsUpsert(json.get)(IndexableJsonNode))
-                  }
-
-                }
+              batch.flatMap { r =>
+                processRecord(topic, kcql, kcqlValue, r)
              }
-
-              client.execute(bulk(indexes).refreshImmediately)
+            }
+            .filter(_.nonEmpty)
+            .map { indexes =>
+              client.execute(bulk(indexes))
            }
        }
    }

+    handleResponse(fut)
+  }
+
+  private def handleTombstone(
+    topic: String,
+    kcqlValue: KcqlValues,
+    r: SinkRecord,
+    i: String,
+    idFromPk: String,
+    documentType: String,
+  ): Option[DeleteByIdRequest] =
+    kcqlValue.behaviorOnNullValues match {
+      case NullValueBehavior.DELETE =>
+        val identifier = if (idFromPk.isEmpty) autoGenId(r) else idFromPk
+        logger.debug(
+          s"Deleting tombstone record: ${r.topic()} ${r.kafkaPartition()} ${r.kafkaOffset()}. Index: $i, Identifier: $identifier",
+        )
+        Some(deleteById(new Index(i), documentType, identifier))
+
+      case NullValueBehavior.FAIL =>
+        logger.error(
+          s"Tombstone record received ${r.topic()} ${r.kafkaPartition()} ${r.kafkaOffset()}. $topic KCQL mapping is configured to fail on tombstone records.",
+        )
+        throw new IllegalStateException(
+          s"$topic KCQL mapping is configured to fail on tombstone records.",
+        )
+
+      case NullValueBehavior.IGNORE =>
+        logger.info(
+          s"Ignoring tombstone record received. for ${r.topic()} ${r.kafkaPartition()} ${r.kafkaOffset()}.",
+        )
+        None
+    }
+
+  private def processRecord(
+    topic: String,
+    kcql: Kcql,
+    kcqlValue: KcqlValues,
+    r: SinkRecord,
+  ): Option[BulkCompatibleRequest] = {
+    val i = CreateIndex.getIndexName(kcql, r).leftMap(throw _).merge
+    val documentType = Option(kcql.getDocType).getOrElse(i)
+    val (json, pks) = if (kcqlValue.primaryKeysPath.isEmpty) {
+      (Transform(kcqlValue.fields, r.valueSchema(), r.value(), kcql.hasRetainStructure), Seq.empty)
+    } else {
+      TransformAndExtractPK(kcqlValue,
+        r.valueSchema(),
+        r.value(),
+        kcql.hasRetainStructure,
+        r.keySchema(),
+        r.key(),
+        r.headers(),
+      )
+    }
+    val idFromPk = pks.mkString(settings.pkJoinerSeparator)
+
+    json.filterNot(_.isEmpty) match {
+      case Some(value) =>
+        kcql.getWriteMode match {
+          case WriteModeEnum.INSERT =>
+            Some(
+              indexInto(i / documentType)
+                .id(if (idFromPk.isEmpty) autoGenId(r) else idFromPk)
+                .pipeline(kcql.getPipeline)
+                .source(value.toString),
+            )
+
+          case WriteModeEnum.UPSERT =>
+            require(pks.nonEmpty, "Error extracting primary keys")
+            Some(update(idFromPk)
+              .in(i / documentType)
+              .docAsUpsert(value.toString))
+        }
+      case None =>
+        handleTombstone(topic, kcqlValue, r, i, idFromPk, documentType)
+    }
+  }
+
+  private def handleResponse(fut: immutable.Iterable[Future[Response[BulkResponse]]]): Unit = {
     handleTry(
-      Try(
-        Await.result(Future.sequence(fut), settings.writeTimeout.seconds),
-      ),
+      Try {
+        val result: immutable.Iterable[Response[BulkResponse]] =
+          Await.result(Future.sequence(fut), settings.writeTimeout.seconds)
+        val errors = result.filter(_.isError).map(_.error)
+        if (errors.nonEmpty) {
+          logger.error(s"Error writing to Elastic Search: ${JacksonJson.asJson(errors)}")
+          throw new RuntimeException(s"Error writing to Elastic Search: ${errors.map(_.reason)}")
+        }
+        logger.info(
+          s"Inserted ${result.size} records. ${result.map { r =>
+            s"Items: ${r.result.items.size} took ${r.result.took}ms."
+          }.mkString(",")}",
+        )
+        result
+      },
    )
    ()
  }
@@ -220,6 +262,3 @@ case class KcqlValues(
   primaryKeysPath: Seq[Vector[String]],
   behaviorOnNullValues: NullValueBehavior,
 )
-case object IndexableJsonNode extends Indexable[JsonNode] {
-  override def json(t: JsonNode): String = t.toString
-}
```
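The reshaped pipeline above also explains the "avoid sending empty requests" commit note: per-record requests are built with `flatMap`, and `.filter(_.nonEmpty)` drops any batch that resolved to nothing before a bulk call is issued. A simplified sketch of that shape, with plain strings standing in for bulk requests and a hypothetical `buildRequest` in place of `processRecord`:

```scala
// Simplified sketch of the new batching shape (hypothetical names):
// group records, build zero-or-one request per record, drop empty batches.
object BatchingShapeDemo extends App {
  val batchSize = 2

  // Stands in for processRecord: a tombstone (None) yields no request.
  def buildRequest(r: Option[String]): Option[String] = r.map(v => s"index:$v")

  val records = Vector(Some("a"), None, None, None, Some("b"))

  val bulkBodies: List[Vector[String]] =
    records.grouped(batchSize)
      .map(_.flatMap(buildRequest)) // per-batch requests, tombstones dropped
      .filter(_.nonEmpty)           // an all-tombstone batch sends no bulk call
      .toList

  println(bulkBodies) // List(Vector(index:a), Vector(index:b))
}
```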

kafka-connect-elastic6/src/main/scala/io/lenses/streamreactor/connect/elastic6/KElasticClient.scala

+1 −1

```diff
@@ -35,7 +35,7 @@ import scala.concurrent.Future
 trait KElasticClient extends AutoCloseable {
   def index(kcql: Kcql): Unit

-  def execute(definition: BulkRequest): Future[Any]
+  def execute(definition: BulkRequest): Future[Response[BulkResponse]]
 }

 object KElasticClient extends StrictLogging {
```
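Narrowing `execute` from `Future[Any]` to `Future[Response[BulkResponse]]` is what enables the new error handling: the caller can finally inspect the bulk result instead of discarding it. A minimal sketch of that check, using only the elastic4s `Response` accessors the diff itself relies on (`isError`, `error.reason`, `result`):

```scala
import com.sksamuel.elastic4s.http.Response
import com.sksamuel.elastic4s.http.bulk.BulkResponse

// With a typed response, a failed bulk call can be surfaced instead of
// silently ignored (mirrors the errors check in handleResponse above).
def failFast(resp: Response[BulkResponse]): BulkResponse =
  if (resp.isError) throw new RuntimeException(s"Bulk write failed: ${resp.error.reason}")
  else resp.result
```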

kafka-connect-elastic6/src/test/scala/io/lenses/streamreactor/connect/elastic6/ElasticJsonWriterTest.scala

+3 −2

```diff
@@ -86,7 +86,8 @@ class ElasticJsonWriterTest extends TestBase with MockitoSugar {

   val sourceTopic = "SOURCE"
   val targetShard = "SHARD"
-  val kcql = Kcql.parse(s"INSERT INTO $targetShard SELECT * FROM $sourceTopic ")
+  val kcql =
+    Kcql.parse(s"INSERT INTO $targetShard SELECT * FROM $sourceTopic PROPERTIES('behavior.on.null.values'='IGNORE')")

   val recordKey = "KEY"
   val tombstoneValue: Null = null
@@ -161,7 +162,7 @@
     val exception = intercept[Exception](target.write(Vector(sinkRecord)))

     exception shouldBe a[IllegalStateException]
-    exception.getMessage should be(s"$sourceTopic KCQL mapping is configured to fail on null value, yet it occurred.")
+    exception.getMessage should be(s"$sourceTopic KCQL mapping is configured to fail on tombstone records.")
   }

 }
```
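For reference, the tombstone behavior is chosen per mapping through the `behavior.on.null.values` KCQL property. Only `IGNORE` appears verbatim in the test above; assuming the other accepted values match the `NullValueBehavior` names handled in `handleTombstone`, the three variants would look like this (index and topic names are examples):

```scala
// KCQL variants for each NullValueBehavior (DELETE and FAIL are inferred
// from the enum cases in the diff; only IGNORE is shown in the tests).
val ignoreTombstones  = "INSERT INTO my_index SELECT * FROM my_topic PROPERTIES('behavior.on.null.values'='IGNORE')"
val deleteOnTombstone = "INSERT INTO my_index SELECT * FROM my_topic PROPERTIES('behavior.on.null.values'='DELETE')"
val failOnTombstone   = "INSERT INTO my_index SELECT * FROM my_topic PROPERTIES('behavior.on.null.values'='FAIL')"
```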
