From 86528799a86784199422594ad7c59f853794565e Mon Sep 17 00:00:00 2001
From: Stefan Bocutiu
Date: Tue, 12 Nov 2024 13:51:59 +0000
Subject: [PATCH] Improvements to the HTTP Sink (#163)

* Improvements to the HTTP Sink

This PR introduces the following enhancements to the HTTP Sink:

1. **Queue Limiting**: We've set a limit on the queue size per topic to reduce the chances of an Out-of-Memory (OOM) issue. Previously, the queue was unbounded; when the HTTP calls are slow and the sink receives more records than it clears, this would eventually lead to an OOM.

2. **Offering Timeout**: Offering records to the queue now has a timeout. If records remain to be offered once the timeout is exceeded, a retriable exception is thrown. Depending on the connector's retry settings, the operation will be attempted again. This helps avoid situations where the sink gets stuck on a slow or unresponsive batch.

3. **Duplicate Record Handling**: To prevent the same records from being added to the queue multiple times, we've introduced a `Map[TopicPartition, Offset]` that tracks the last processed offset for each topic-partition. This ensures that the sink does not attempt to process the same records repeatedly.

4. **Batch Failure Handling**: The changes also address a situation where an HTTP call fails for a specific input but the batch is not removed from the queue, which could have led to the batch being retried indefinitely. This is now prevented.

In the near future, there will be a new PR to further reduce the code complexity around the batching approach and the boilerplate code.

* fix the unit test

* Rename variable

* Removes the invalid functional tests. A failed batch request is not retried anymore.

* Remove unused functions

---------

Co-authored-by: stheppi
---
 .../connect/test/HttpSinkTest.scala           | 78 -----------------
 .../connect/http/sink/HttpWriter.scala        | 10 ++-
 .../connect/http/sink/HttpWriterManager.scala | 16 +++-
 .../connect/http/sink/RecordsQueue.scala      | 85 +++++++++++++++++--
 .../http/sink/config/HttpSinkConfig.scala     | 11 +++
 .../http/sink/config/HttpSinkConfigDef.scala  | 28 ++++++
 .../connect/http/sink/RecordsQueueTest.scala  | 85 +++++++++++++++++--
 7 files changed, 218 insertions(+), 95 deletions(-)
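At the heart of points 1 and 2 is a bounded offer loop: take whatever currently fits, back off briefly, and give up with a retriable error once the deadline passes. Below is a minimal, single-threaded sketch of that idea; `BoundedOffer` and `RetriableQueueFullException` are illustrative stand-ins, not the connector's API (the sink itself runs this loop in cats-effect `IO` and raises Kafka Connect's `org.apache.kafka.connect.errors.RetriableException`):

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical stand-in for org.apache.kafka.connect.errors.RetriableException.
final class RetriableQueueFullException(msg: String) extends RuntimeException(msg)

// Sketch of a bounded queue whose offer gives up after a deadline.
final class BoundedOffer[A](maxSize: Int, offerTimeout: FiniteDuration) {
  private val queue = mutable.Queue.empty[A]

  def offerAll(records: Seq[A]): Unit = {
    val deadline  = offerTimeout.fromNow
    var remaining = records
    while (remaining.nonEmpty) {
      if (deadline.isOverdue())
        throw new RetriableQueueFullException("Enqueue timed out and records remain")
      val space         = math.max(0, maxSize - queue.size)
      val (toAdd, rest) = remaining.splitAt(space)
      queue.enqueueAll(toAdd) // enqueue whatever currently fits
      remaining = rest
      if (remaining.nonEmpty) Thread.sleep(5) // back off briefly before retrying
    }
  }
}
```

Throwing a retriable error rather than blocking forever hands the back-pressure decision to the Connect framework, which re-delivers the batch according to the connector's retry settings.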
diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
index 3b1f65ea3..ff62f1e8c 100644
--- a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
+++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala
@@ -132,45 +132,6 @@ class HttpSinkTest
     }
   }
 
-  /**
-   * Retries occur by default and the failed HTTP post will be retried twice before succeeding.
-   */
-  test("failing scenario written to error reporter") {
-
-    setUpWiremockFailureResponse()
-
-    sendRecordsWithProducer(
-      stringProducer,
-      stringConverters,
-      randomTestId,
-      topic,
-      "My Static Content Template",
-      BatchSizeSingleRecord,
-      false,
-      1,
-      2,
-      "Record number 1",
-    ).asserting {
-      case (requests, successReporterRecords, failureReporterRecords) =>
-        requests.size should be(3)
-        requests.map(_.getBody).map(new String(_)).toSet should contain only "My Static Content Template"
-        requests.map(_.getMethod).toSet should be(Set(RequestMethod.POST))
-
-        failureReporterRecords.size should be(2)
-        failureReporterRecords.foreach {
-          rec =>
-            rec.topic() should be(failureTopicName)
-            rec.value() should be("My Static Content Template")
-        }
-
-        successReporterRecords.size should be(1)
-        val successRecord = successReporterRecords.head
-        successRecord.topic() should be(successTopicName)
-        successRecord.value() should be("My Static Content Template")
-
-    }
-  }
-
   test("dynamic string template containing message content should be sent to endpoint") {
 
     setUpWiremockResponse()
@@ -311,45 +272,6 @@ class HttpSinkTest
     ()
   }
 
-  private def setUpWiremockFailureResponse(): Unit = {
-    WireMock.configureFor(container.getHost, container.getFirstMappedPort)
-    WireMock.resetAllScenarios()
-    WireMock.resetAllRequests()
-    WireMock.resetToDefault()
-    WireMock.reset()
-
-    val url = s"/$randomTestId"
-
-    stubFor(
-      post(urlEqualTo(url))
-        .inScenario("failure")
-        .whenScenarioStateIs("STARTED")
-        .willSetStateTo("ONE ATTEMPT")
-        .willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")),
-    )
-
-    stubFor(
-      post(urlEqualTo(url))
-        .inScenario("failure")
-        .whenScenarioStateIs("ONE ATTEMPT")
-        .willSetStateTo("TWO ATTEMPTS")
-        .willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")),
-    )
-
-    stubFor(
-      post(urlEqualTo(url))
-        .inScenario("failure")
-        .whenScenarioStateIs("TWO ATTEMPTS")
-        .willReturn(aResponse.withHeader("Content-Type", "text/plain")
-          .withBody("Hello world!")),
-    )
-
-    WireMock.setScenarioState("failure", "STARTED")
-
-    ()
-
-  }
-
   def getBootstrapServers: String = s"PLAINTEXT://kafka:9092"
 
   private def sendRecordsWithProducer[K, V](
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala
index f5941ad44..8a4b2fd40 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala
@@ -80,19 +80,21 @@ class HttpWriter(
     nonEmptyBatchInfo: NonEmptyBatchInfo,
     batch: NonEmptySeq[RenderedRecord],
     totalQueueSize: Int,
-  ) =
+  ): IO[Unit] =
     for {
       _ <- IO(
         logger.debug(s"[$sinkName] HttpWriter.process, batch of ${batch.length}, queue size: $totalQueueSize"),
       )
+      // remove the batch from the queue before any of the operations
+      _ <- recordsQueue.dequeue(batch)
       _ <- IO.delay(logger.trace(s"[$sinkName] modifyCommitContext for batch of ${nonEmptyBatchInfo.batch.length}"))
       _ <- flush(nonEmptyBatchInfo.batch)
       updatedCommitContext = updateCommitContextPostCommit(nonEmptyBatchInfo.updatedCommitContext)
       _ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $updatedCommitContext"))
       _ <- commitContextRef.set(updatedCommitContext)
-      removedElements <- recordsQueue.dequeue(batch)
-      _ <- resetErrorsInCommitContext()
-    } yield removedElements
+
+      _ <- resetErrorsInCommitContext()
+    } yield ()
 
   def preCommit(
     initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata],
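The essential change in `HttpWriter.process` above is ordering: the batch now leaves the queue before the flush, so a batch whose HTTP call always fails can no longer sit in the queue and be replayed forever. A hedged sketch of that ordering, using simplified hypothetical signatures rather than the writer's actual methods:

```scala
// Illustrative only: take the batch off the queue first, then flush. If the
// flush throws, the batch is already gone from the queue; the failure is
// surfaced to the error reporter / Connect retry machinery instead of the
// same batch being re-sent from the queue indefinitely.
def processBatch[A](dequeue: () => List[A], flush: List[A] => Unit): Unit = {
  val batch = dequeue()            // 1. remove the batch up front
  if (batch.nonEmpty) flush(batch) // 2. a failure here does not re-queue it
}
```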
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
index 965725ec2..8b46df3dd 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala
@@ -26,6 +26,7 @@ import com.typesafe.scalalogging.LazyLogging
 import com.typesafe.scalalogging.StrictLogging
 import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow
 import io.lenses.streamreactor.common.utils.CyclopsToScalaOption.convertToScalaOption
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
 import io.lenses.streamreactor.connect.cloud.common.model.Topic
 import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
 import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
@@ -50,6 +51,7 @@ import java.net.http.HttpClient
 import java.time.Duration
 import scala.collection.immutable.Queue
 import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * The `HttpWriterManager` object provides a factory method to create an instance of `HttpWriterManager`.
@@ -123,6 +125,8 @@ object HttpWriterManager extends StrictLogging {
       config.tidyJson,
       config.errorReportingController,
       config.successReportingController,
+      config.maxQueueSize,
+      config.maxQueueOfferTimeout,
     )
   }
 
@@ -172,6 +176,8 @@ class HttpWriterManager(
   tidyJson: Boolean,
   errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData],
   successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData],
+  maxQueueSize: Int,
+  maxQueueOfferTimeout: FiniteDuration,
 )(
   implicit
   t: Temporal[IO],
@@ -184,15 +190,21 @@ class HttpWriterManager(
    */
   private def createNewHttpWriter(): IO[HttpWriter] =
     for {
-      batchPolicy <- IO.pure(batchPolicy)
       recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty)
       commitContextRef <- Ref.of[IO, HttpCommitContext](HttpCommitContext.default(sinkName))
+      offsetsRef <- Ref.of[IO, Map[TopicPartition, Offset]](Map.empty)
     } yield new HttpWriter(
       sinkName = sinkName,
       sender = httpRequestSender,
       template = template,
       recordsQueue =
-        new RecordsQueue(recordsQueueRef, commitContextRef, batchPolicy),
+        new RecordsQueue(recordsQueueRef,
+                         commitContextRef,
+                         batchPolicy,
+                         maxQueueSize,
+                         maxQueueOfferTimeout,
+                         offsetsRef,
+        ),
       errorThreshold = errorThreshold,
       tidyJson = tidyJson,
       errorReporter = errorReportingController,
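Each writer's mutable state, the records queue plus the new last-seen offsets map, is held in per-writer cats-effect `Ref`s created in `createNewHttpWriter` above. A self-contained sketch of that shape, assuming cats-effect 3 and with simplified element types standing in for `RenderedRecord`, `TopicPartition` and `Offset`:

```scala
import cats.effect.{ IO, Ref }
import scala.collection.immutable.Queue

// Simplified per-writer state: the real code stores Queue[RenderedRecord]
// and Map[TopicPartition, Offset]; String, (String, Int) and Long stand in here.
final case class WriterState(
  queue:   Ref[IO, Queue[String]],
  offsets: Ref[IO, Map[(String, Int), Long]],
)

// One fresh, independent state per writer, mirroring the wiring above.
val newWriterState: IO[WriterState] =
  for {
    q <- Ref.of[IO, Queue[String]](Queue.empty)
    o <- Ref.of[IO, Map[(String, Int), Long]](Map.empty)
  } yield WriterState(q, o)
```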
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala
index 02429930e..e4f460f83 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala
@@ -22,8 +22,14 @@ import io.lenses.streamreactor.connect.http.sink.RecordsQueueBatcher.takeBatch
 import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy
 import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
 import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
+import cats.implicits.toFoldableOps
+import io.lenses.streamreactor.connect.cloud.common.model.Offset
+import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
+import org.apache.kafka.connect.errors.RetriableException
 
 import scala.collection.immutable.Queue
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * The `RecordsQueue` class manages a queue of `RenderedRecord` objects and handles the logic for
@@ -37,19 +43,85 @@ class RecordsQueue(
   val recordsQueue: Ref[IO, Queue[RenderedRecord]],
   commitContextRef: Ref[IO, HttpCommitContext],
   batchPolicy: BatchPolicy,
+  maxSize: Int,
+  offerTimeout: FiniteDuration,
+  offsetMapRef: Ref[IO, Map[TopicPartition, Offset]],
 ) extends LazyLogging {
 
   /**
-   * Enqueues a sequence of `RenderedRecord` objects into the queue.
+   * Enqueues a sequence of `RenderedRecord` objects into the queue, with a maximum size limit.
+   * If the queue is full, it retries adding the remaining records within the specified timeout.
+   * If records remain after the timeout, it throws a RetriableException.
+   * Also, it discards any records whose offset was already queued.
    *
    * @param records The records to be enqueued.
-   * @return An `IO` action that enqueues the records.
+   * @return An `IO` action that enqueues the records or throws a RetriableException if the queue remains full.
    */
-  def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] =
+  def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] = {
+
+    // Filter out records with offsets that have already been processed
+    def filterDuplicates(records: List[RenderedRecord], offsetMap: Map[TopicPartition, Offset]): List[RenderedRecord] =
+      records.filter { record =>
+        val tp = record.topicPartitionOffset.toTopicPartition
+        offsetMap.get(tp) match {
+          case Some(lastOffset) if record.topicPartitionOffset.offset.value <= lastOffset.value =>
+            // Offset already processed, discard this record
+            false
+          case _ =>
+            true
+        }
+      }
+
+    def attemptEnqueue(remainingRecords: List[RenderedRecord], startTime: Long): IO[Unit] =
+      if (remainingRecords.isEmpty) {
+        IO.unit
+      } else {
+        for {
+          currentTime <- IO.realTime.map(_.toMillis)
+          elapsedTime = currentTime - startTime
+          _ <- if (elapsedTime >= offerTimeout.toMillis) {
+            IO.raiseError(new RetriableException("Enqueue timed out and records remain"))
+          } else {
+            for {
+              (recordsToAdd, recordsRemaining) <- recordsQueue.modify { queue =>
+                val queueSize = queue.size
+                val spaceAvailable = maxSize - queueSize
+                val recordsToAdd = remainingRecords.take(spaceAvailable)
+                val recordsRemaining = remainingRecords.drop(spaceAvailable)
+                val newQueue = queue.enqueueAll(recordsToAdd)
+                (newQueue, (recordsToAdd, recordsRemaining))
+              }
+              _ <- if (recordsToAdd.nonEmpty) {
+                // Update the offset map with the offsets of the records that were actually enqueued
+                offsetMapRef.update { offsetMap =>
+                  recordsToAdd.foldLeft(offsetMap) { (accOffsets, record) =>
+                    val tp = record.topicPartitionOffset.toTopicPartition
+                    val offset = record.topicPartitionOffset.offset
+                    // Only update if the new offset is greater
+                    val updatedOffset: Offset = accOffsets.get(tp) match {
+                      case Some(existingOffset) if existingOffset.value >= offset.value => existingOffset
+                      case _ => offset
+                    }
+                    accOffsets.updated(tp, updatedOffset)
+                  }
+                }
+              } else IO.unit
+              _ <- if (recordsRemaining.nonEmpty) {
+                IO.sleep(5.millis) *>
+                  attemptEnqueue(recordsRemaining, startTime)
+              } else IO.unit
+            } yield ()
+          }
+        } yield ()
+      }
+
     for {
-      _ <- IO.delay(logger.debug(s"${records.length} records added to $recordsQueue"))
-      _ <- recordsQueue.getAndUpdate(q => q ++ records.toSeq).void
+      offsetMap <- offsetMapRef.get
+      uniqueRecords = filterDuplicates(records.toList, offsetMap)
+      startTime <- IO.realTime.map(_.toMillis)
+      _ <- attemptEnqueue(uniqueRecords, startTime)
     } yield ()
+  }
 
   /**
    * Takes a batch of records from the queue based on the commit policy.
@@ -78,8 +150,9 @@ class RecordsQueue(
   def dequeue(nonEmptyBatch: NonEmptySeq[RenderedRecord]): IO[Unit] =
     recordsQueue.access.flatMap {
       case (records, updater) =>
+        val lookup = nonEmptyBatch.toSeq.toSet
         for {
-          newQueue <- IO(records.dropWhile(nonEmptyBatch.toSeq.contains))
+          newQueue <- IO(records.dropWhile(lookup.contains))
           _ <- updater(newQueue)
           _ <- IO.delay(logger.debug("Queue before: {}, after: {}", records, newQueue))
         } yield ()
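Two details of the class above are worth calling out. The duplicate handling in `enqueueAll` reduces to a fold that keeps a per-partition high-water mark and drops anything at or below it; the `dequeue` change simply converts the batch to a `Set` so each membership check during `dropWhile` is O(1) rather than a linear scan. A standalone sketch of the high-water-mark fold, with a plain tuple standing in for the connector's `TopicPartition`/`Offset` model classes:

```scala
// Illustrative model: (topic, partition) plus a Long offset.
final case class TP(topic: String, partition: Int)

// Keep only records above the last-seen offset for their partition and
// return the updated high-water marks alongside the surviving records.
def dedupe(
  records:  List[(TP, Long)],
  lastSeen: Map[TP, Long],
): (List[(TP, Long)], Map[TP, Long]) = {
  val (keptReversed, marks) =
    records.foldLeft((List.empty[(TP, Long)], lastSeen)) {
      case ((kept, seen), rec @ (tp, offset)) =>
        if (seen.get(tp).exists(_ >= offset)) (kept, seen) // duplicate: drop
        else (rec :: kept, seen.updated(tp, offset))       // new high-water mark
    }
  (keptReversed.reverse, marks)
}

// Example: the repeated offset 100 and the stale offset 99 are discarded.
val (kept, marks) = dedupe(
  List(TP("t", 0) -> 100L, TP("t", 0) -> 100L, TP("t", 0) -> 99L, TP("t", 0) -> 101L),
  Map.empty,
)
// kept  == List(TP("t", 0) -> 100L, TP("t", 0) -> 101L)
// marks == Map(TP("t", 0) -> 101L)
```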
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
index 919ec7b93..9bba537a8 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala
@@ -46,6 +46,7 @@ import java.net.MalformedURLException
 import java.net.URL
 import java.time.Clock
 import java.time.Duration
+import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
 import scala.util.Try
 
@@ -101,6 +102,8 @@ case class HttpSinkConfig(
   tidyJson: Boolean,
   errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData],
   successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData],
+  maxQueueSize: Int,
+  maxQueueOfferTimeout: FiniteDuration,
 )
 
 object HttpSinkConfig {
@@ -159,6 +162,12 @@ object HttpSinkConfig {
         connectConfig,
       ),
     )
+
+    maxQueueSize = connectConfig.getInt(HttpSinkConfigDef.MaxQueueSizeProp)
+    maxQueueOfferTimeout = FiniteDuration(
+      connectConfig.getLong(HttpSinkConfigDef.MaxQueueOfferTimeoutProp),
+      scala.concurrent.duration.MILLISECONDS,
+    )
   } yield HttpSinkConfig(
     method,
     endpoint,
@@ -175,6 +184,8 @@ object HttpSinkConfig {
     jsonTidy,
     errorReportingController,
     successReportingController,
+    maxQueueSize,
+    maxQueueOfferTimeout,
   )
 }
diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala
index ed3a6fe29..ca7872146 100644
--- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala
+++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala
@@ -158,6 +158,20 @@ object HttpSinkConfigDef {
       |Literal to output in templates in place of a null payload. Values are `error` (raises an error), `empty` (empty string, eg ""), `null` (the literal 'null') or `custom` (a string of your choice, as defined by `$CustomNullPayloadHandler`). `Defaults to `error`.
       |""".stripMargin
 
+  val MaxQueueSizeProp: String = "connect.http.max.queue.size"
+  val MaxQueueSizeDoc: String =
+    """
+      |The maximum number of records to queue per topic before blocking. If the queue limit is reached, the connector throws a RetriableException and the connector's retry settings determine how the write is retried.
+      |""".stripMargin
+  val MaxQueueSizeDefault = 1000000
+
+  val MaxQueueOfferTimeoutProp: String = "connect.http.max.queue.offer.timeout.ms"
+  val MaxQueueOfferTimeoutDoc: String =
+    """
+      |The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector throws a RetriableException and the connector's retry settings determine how the write is retried.
+      |""".stripMargin
+  val MaxQueueOfferTimeoutDefault = 120000
+
   val config: ConfigDef = {
     val configDef = new ConfigDef()
       .withClientSslSupport()
@@ -292,6 +306,20 @@ object HttpSinkConfigDef {
         Importance.HIGH,
         CustomNullPayloadHandlerDoc,
       )
+      .define(
+        MaxQueueSizeProp,
+        Type.INT,
+        MaxQueueSizeDefault,
+        Importance.HIGH,
+        MaxQueueSizeDoc,
+      )
+      .define(
+        MaxQueueOfferTimeoutProp,
+        Type.LONG,
+        MaxQueueOfferTimeoutDefault,
+        Importance.HIGH,
+        MaxQueueOfferTimeoutDoc,
+      )
     ReporterConfig.withErrorRecordReportingSupport(configDef)
     ReporterConfig.withSuccessRecordReportingSupport(configDef)
     OAuth2Config.append(configDef)
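For reference, here is a hypothetical task configuration exercising the two new options. The two `connect.http.*` keys are the ones defined above; the connector class name is an assumption for illustration:

```scala
// Hypothetical settings: cap the per-topic queue at 50,000 records and stop
// offering after 30 seconds, at which point a RetriableException is thrown
// and Connect's retry settings take over.
val connectorProps: Map[String, String] = Map(
  "connector.class"                         -> "io.lenses.streamreactor.connect.http.sink.HttpSinkConnector", // assumed class name
  "connect.http.max.queue.size"             -> "50000",
  "connect.http.max.queue.offer.timeout.ms" -> "30000",
)
```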
+ |""".stripMargin + val MaxQueueSizeDefault = 1000000 + + val MaxQueueOfferTimeoutProp: String = "connect.http.max.queue.offer.timeout.ms" + val MaxQueueOfferTimeoutDoc: String = + """ + |The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw RetriableException and the connector settings to handle retries will be used. + |""".stripMargin + val MaxQueueOfferTimeoutDefault = 120000 + val config: ConfigDef = { val configDef = new ConfigDef() .withClientSslSupport() @@ -292,6 +306,20 @@ object HttpSinkConfigDef { Importance.HIGH, CustomNullPayloadHandlerDoc, ) + .define( + MaxQueueSizeProp, + Type.INT, + MaxQueueSizeDefault, + Importance.HIGH, + MaxQueueSizeDoc, + ) + .define( + MaxQueueOfferTimeoutProp, + Type.LONG, + MaxQueueOfferTimeoutDefault, + Importance.HIGH, + MaxQueueOfferTimeoutDoc, + ) ReporterConfig.withErrorRecordReportingSupport(configDef) ReporterConfig.withSuccessRecordReportingSupport(configDef) OAuth2Config.append(configDef) diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala index 0398e14df..167fe8966 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala @@ -34,18 +34,21 @@ import cats.data.NonEmptySeq import cats.effect.IO import cats.effect.kernel.Ref import cats.effect.testing.scalatest.AsyncIOSpec +import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy import io.lenses.streamreactor.connect.http.sink.commit.BatchResult import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord +import org.apache.kafka.connect.errors.RetriableException import org.mockito.ArgumentMatchers.any import org.mockito.MockitoSugar import org.scalatest.funsuite.AsyncFunSuiteLike import org.scalatest.matchers.should.Matchers import scala.collection.immutable.Queue +import scala.concurrent.duration.DurationInt class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSugar with Matchers { @@ -59,13 +62,15 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu private val record1 = RenderedRecord(topicPartition.atOffset(100), timestamp, "record1", Seq.empty, testEndpoint) private val record2 = RenderedRecord(topicPartition.atOffset(101), timestamp, "record2", Seq.empty, testEndpoint) + private val record3 = RenderedRecord(topicPartition.atOffset(102), timestamp, "record3", Seq.empty, testEndpoint) test("enqueueAll should add all records to the queue") { { for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef) _ <- recordsQueue.enqueueAll(NonEmptySeq.of(record1, record2)) refValue <- recordsQueueRef.get } yield refValue @@ -87,7 
@@ -87,7 +92,8 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu
       for {
         commitContext <- Ref[IO].of(defaultContext)
         recordsQueueRef <- Ref[IO].of(Queue(record1, record2))
-        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy)
+        offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 10000, 1.minute, offsetsRef)
         batchInfo <- recordsQueue.popBatch()
       } yield batchInfo
     } asserting {
@@ -101,7 +107,8 @@
       for {
         commitContext <- Ref[IO].of(defaultContext)
         recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord])
-        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy])
+        offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef)
         batchInfo <- recordsQueue.popBatch()
       } yield batchInfo
     } asserting {
@@ -117,7 +124,8 @@
       for {
         commitContext <- Ref[IO].of(defaultContext)
         recordsQueueRef <- Ref[IO].of(Queue(record1, record2))
-        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy])
+        offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef)
         _ <- recordsQueue.dequeue(records)
         refValue <- recordsQueueRef.get
       } yield refValue
@@ -134,7 +142,8 @@
       for {
         commitContext <- Ref[IO].of(defaultContext)
         recordsQueueRef <- Ref[IO].of(Queue(record1))
-        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy])
+        offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+        recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef)
         _ <- recordsQueue.dequeue(records)
         refValue <- recordsQueueRef.get
       } yield refValue
@@ -143,4 +152,70 @@
       backingQueue should contain theSameElementsInOrderAs Seq(record1)
     }
   }
+
+  test("enqueue all should throw RetriableException if the queue size is exceeded") {
+    val records = NonEmptySeq.of(record1, record2)
+    val commitPolicy = mock[BatchPolicy]
+
+    val ioAction = for {
+      commitContext <- Ref[IO].of(defaultContext)
+      recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord])
+      offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+      recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 1, 2.seconds, offsetsRef)
+      _ <- recordsQueue.enqueueAll(records)
+    } yield ()
+
+    ioAction.attempt.map {
+      case Left(e: RetriableException) =>
+        e.getMessage should be("Enqueue timed out and records remain")
+      case Left(e) =>
+        fail(s"Expected RetriableException but got $e")
+      case Right(_) =>
+        fail("Expected RetriableException but enqueueAll succeeded")
+    }
+
+  }
+
+  test("does not enqueue a record which was enqueued before") {
+    val records = NonEmptySeq.of(record1, record2)
+    val commitPolicy = mock[BatchPolicy]
+    val newRecords = NonEmptySeq.of(record2, record3)
+
+    val ioAction = for {
+      commitContext <- Ref[IO].of(defaultContext)
+      recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord])
+      offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+      recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 10000, 1.minute, offsetsRef)
+      _ <- recordsQueue.enqueueAll(records)
+      _ <- recordsQueue.enqueueAll(newRecords)
+      queue <- recordsQueueRef.get
+    } yield queue.toList
+
+    ioAction asserting {
+      queue =>
+        queue should contain theSameElementsInOrderAs List(record1, record2, record3)
+    }
+  }
+
+  test("enqueue when the queue size is 1") {
+    val records = NonEmptySeq.of(record1)
+    val commitPolicy = mock[BatchPolicy]
+    val newRecords = NonEmptySeq.of(record2)
+
+    val ioAction = for {
+      commitContext <- Ref[IO].of(defaultContext)
+      recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord])
+      offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset])
+      recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 1, 10.seconds, offsetsRef)
+      _ <- recordsQueue.enqueueAll(records)
+      _ <- recordsQueue.dequeue(records)
+      _ <- recordsQueue.enqueueAll(newRecords)
+      queue <- recordsQueueRef.get
+    } yield queue.toList
+
+    ioAction asserting {
+      queue =>
+        queue should contain theSameElementsInOrderAs List(record2)
+    }
+  }
 }