diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala index 3b1f65ea3..ff62f1e8c 100644 --- a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala +++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala @@ -132,45 +132,6 @@ class HttpSinkTest } } - /** - * Retries occur by default and the failed HTTP post will be retried twice before succeeding. - */ - test("failing scenario written to error reporter") { - - setUpWiremockFailureResponse() - - sendRecordsWithProducer( - stringProducer, - stringConverters, - randomTestId, - topic, - "My Static Content Template", - BatchSizeSingleRecord, - false, - 1, - 2, - "Record number 1", - ).asserting { - case (requests, successReporterRecords, failureReporterRecords) => - requests.size should be(3) - requests.map(_.getBody).map(new String(_)).toSet should contain only "My Static Content Template" - requests.map(_.getMethod).toSet should be(Set(RequestMethod.POST)) - - failureReporterRecords.size should be(2) - failureReporterRecords.foreach { - rec => - rec.topic() should be(failureTopicName) - rec.value() should be("My Static Content Template") - } - - successReporterRecords.size should be(1) - val successRecord = successReporterRecords.head - successRecord.topic() should be(successTopicName) - successRecord.value() should be("My Static Content Template") - - } - } - test("dynamic string template containing message content should be sent to endpoint") { setUpWiremockResponse() @@ -311,45 +272,6 @@ class HttpSinkTest () } - private def setUpWiremockFailureResponse(): Unit = { - WireMock.configureFor(container.getHost, container.getFirstMappedPort) - WireMock.resetAllScenarios() - WireMock.resetAllRequests() - WireMock.resetToDefault() - WireMock.reset() - - val url = s"/$randomTestId" - - stubFor( - post(urlEqualTo(url)) - .inScenario("failure") - .whenScenarioStateIs("STARTED") - .willSetStateTo("ONE ATTEMPT") - .willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")), - ) - - stubFor( - post(urlEqualTo(url)) - .inScenario("failure") - .whenScenarioStateIs("ONE ATTEMPT") - .willSetStateTo("TWO ATTEMPTS") - .willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")), - ) - - stubFor( - post(urlEqualTo(url)) - .inScenario("failure") - .whenScenarioStateIs("TWO ATTEMPTS") - .willReturn(aResponse.withHeader("Content-Type", "text/plain") - .withBody("Hello world!")), - ) - - WireMock.setScenarioState("failure", "STARTED") - - () - - } - def getBootstrapServers: String = s"PLAINTEXT://kafka:9092" private def sendRecordsWithProducer[K, V]( diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala index f5941ad44..8a4b2fd40 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala @@ -80,19 +80,21 @@ class HttpWriter( nonEmptyBatchInfo: NonEmptyBatchInfo, batch: NonEmptySeq[RenderedRecord], totalQueueSize: Int, - ) = + ): IO[Unit] = for { _ <- IO( logger.debug(s"[$sinkName] HttpWriter.process, batch of ${batch.length}, queue size: $totalQueueSize"), ) + // remove the batch from the queue before any of the operation + _ <- recordsQueue.dequeue(batch) _ <- IO.delay(logger.trace(s"[$sinkName] modifyCommitContext for batch of ${nonEmptyBatchInfo.batch.length}")) _ <- flush(nonEmptyBatchInfo.batch) updatedCommitContext = updateCommitContextPostCommit(nonEmptyBatchInfo.updatedCommitContext) _ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $updatedCommitContext")) _ <- commitContextRef.set(updatedCommitContext) - removedElements <- recordsQueue.dequeue(batch) - _ <- resetErrorsInCommitContext() - } yield removedElements + + _ <- resetErrorsInCommitContext() + } yield () def preCommit( initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata], diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala index 965725ec2..8b46df3dd 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala @@ -26,6 +26,7 @@ import com.typesafe.scalalogging.LazyLogging import com.typesafe.scalalogging.StrictLogging import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow import io.lenses.streamreactor.common.utils.CyclopsToScalaOption.convertToScalaOption +import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender @@ -50,6 +51,7 @@ import java.net.http.HttpClient import java.time.Duration import scala.collection.immutable.Queue import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.FiniteDuration /** * The `HttpWriterManager` object provides a factory method to create an instance of `HttpWriterManager`. @@ -123,6 +125,8 @@ object HttpWriterManager extends StrictLogging { config.tidyJson, config.errorReportingController, config.successReportingController, + config.maxQueueSize, + config.maxQueueOfferTimeout, ) } @@ -172,6 +176,8 @@ class HttpWriterManager( tidyJson: Boolean, errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData], successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData], + maxQueueSize: Int, + maxQueueOfferTimeout: FiniteDuration, )( implicit t: Temporal[IO], @@ -184,15 +190,21 @@ class HttpWriterManager( */ private def createNewHttpWriter(): IO[HttpWriter] = for { - batchPolicy <- IO.pure(batchPolicy) recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty) commitContextRef <- Ref.of[IO, HttpCommitContext](HttpCommitContext.default(sinkName)) + offsetsRef <- Ref.of[IO, Map[TopicPartition, Offset]](Map.empty) } yield new HttpWriter( sinkName = sinkName, sender = httpRequestSender, template = template, recordsQueue = - new RecordsQueue(recordsQueueRef, commitContextRef, batchPolicy), + new RecordsQueue(recordsQueueRef, + commitContextRef, + batchPolicy, + maxQueueSize, + maxQueueOfferTimeout, + offsetsRef, + ), errorThreshold = errorThreshold, tidyJson = tidyJson, errorReporter = errorReportingController, diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala index 02429930e..e4f460f83 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueue.scala @@ -22,8 +22,14 @@ import io.lenses.streamreactor.connect.http.sink.RecordsQueueBatcher.takeBatch import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord +import cats.implicits.toFoldableOps +import io.lenses.streamreactor.connect.cloud.common.model.Offset +import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition +import org.apache.kafka.connect.errors.RetriableException import scala.collection.immutable.Queue +import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.FiniteDuration /** * The `RecordsQueue` class manages a queue of `RenderedRecord` objects and handles the logic for @@ -37,19 +43,85 @@ class RecordsQueue( val recordsQueue: Ref[IO, Queue[RenderedRecord]], commitContextRef: Ref[IO, HttpCommitContext], batchPolicy: BatchPolicy, + maxSize: Int, + offerTimeout: FiniteDuration, + offsetMapRef: Ref[IO, Map[TopicPartition, Offset]], ) extends LazyLogging { /** - * Enqueues a sequence of `RenderedRecord` objects into the queue. + * Enqueues a sequence of `RenderedRecord` objects into the queue, with a maximum size limit. + * If the queue is full, it retries adding the remaining records within the specified timeout. + * If after the timeout records remain, it throws a RetriableException. + * Also, it discards any records for which the offset was already queued. * * @param records The records to be enqueued. - * @return An `IO` action that enqueues the records. + * @return An `IO` action that enqueues the records or throws a RetriableException if the queue remains full. */ - def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] = + def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] = { + + // Filter out records with offsets that have already been processed + def filterDuplicates(records: List[RenderedRecord], offsetMap: Map[TopicPartition, Offset]): List[RenderedRecord] = + records.filter { record => + val tp = record.topicPartitionOffset.toTopicPartition + offsetMap.get(tp) match { + case Some(lastOffset) if record.topicPartitionOffset.offset.value <= lastOffset.value => + // Offset already processed, discard this record + false + case _ => + true + } + } + + def attemptEnqueue(remainingRecords: List[RenderedRecord], startTime: Long): IO[Unit] = + if (remainingRecords.isEmpty) { + IO.unit + } else { + for { + currentTime <- IO.realTime.map(_.toMillis) + elapsedTime = currentTime - startTime + _ <- if (elapsedTime >= offerTimeout.toMillis) { + IO.raiseError(new RetriableException("Enqueue timed out and records remain")) + } else { + for { + (recordsToAdd, recordsRemaining) <- recordsQueue.modify { queue => + val queueSize = queue.size + val spaceAvailable = maxSize - queueSize + val recordsToAdd = remainingRecords.take(spaceAvailable) + val recordsRemaining = remainingRecords.drop(spaceAvailable) + val newQueue = queue.enqueueAll(recordsToAdd) + (newQueue, (recordsToAdd, recordsRemaining)) + } + _ <- if (recordsToAdd.nonEmpty) { + // Update the offset map with the offsets of the records that were actually enqueued + offsetMapRef.update { offsetMap => + recordsToAdd.foldLeft(offsetMap) { (accOffsets, record) => + val tp = record.topicPartitionOffset.toTopicPartition + val offset = record.topicPartitionOffset.offset + // Only update if the new offset is greater + val updatedOffset: Offset = accOffsets.get(tp) match { + case Some(existingOffset) if existingOffset.value >= offset.value => existingOffset + case _ => offset + } + accOffsets.updated(tp, updatedOffset) + } + } + } else IO.unit + _ <- if (recordsRemaining.nonEmpty) { + IO.sleep(5.millis) *> + attemptEnqueue(recordsRemaining, startTime) + } else IO.unit + } yield () + } + } yield () + } + for { - _ <- IO.delay(logger.debug(s"${records.length} records added to $recordsQueue")) - _ <- recordsQueue.getAndUpdate(q => q ++ records.toSeq).void + offsetMap <- offsetMapRef.get + uniqueRecords = filterDuplicates(records.toList, offsetMap) + startTime <- IO.realTime.map(_.toMillis) + _ <- attemptEnqueue(uniqueRecords, startTime) } yield () + } /** * Takes a batch of records from the queue based on the commit policy. @@ -78,8 +150,9 @@ class RecordsQueue( def dequeue(nonEmptyBatch: NonEmptySeq[RenderedRecord]): IO[Unit] = recordsQueue.access.flatMap { case (records, updater) => + val lookup = nonEmptyBatch.toSeq.toSet for { - newQueue <- IO(records.dropWhile(nonEmptyBatch.toSeq.contains)) + newQueue <- IO(records.dropWhile(lookup.contains)) _ <- updater(newQueue) _ <- IO.delay(logger.debug("Queue before: {}, after: {}", records, newQueue)) } yield () diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala index 919ec7b93..9bba537a8 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala @@ -46,6 +46,7 @@ import java.net.MalformedURLException import java.net.URL import java.time.Clock import java.time.Duration +import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.Try @@ -101,6 +102,8 @@ case class HttpSinkConfig( tidyJson: Boolean, errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData], successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData], + maxQueueSize: Int, + maxQueueOfferTimeout: FiniteDuration, ) object HttpSinkConfig { @@ -159,6 +162,12 @@ object HttpSinkConfig { connectConfig, ), ) + + maxQueueSize = connectConfig.getInt(HttpSinkConfigDef.MaxQueueSizeProp) + maxQueueOfferTimeout = FiniteDuration( + connectConfig.getLong(HttpSinkConfigDef.MaxQueueOfferTimeoutProp), + scala.concurrent.duration.MILLISECONDS, + ) } yield HttpSinkConfig( method, endpoint, @@ -175,6 +184,8 @@ object HttpSinkConfig { jsonTidy, errorReportingController, successReportingController, + maxQueueSize, + maxQueueOfferTimeout, ) } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala index ed3a6fe29..ca7872146 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala @@ -158,6 +158,20 @@ object HttpSinkConfigDef { |Literal to output in templates in place of a null payload. Values are `error` (raises an error), `empty` (empty string, eg ""), `null` (the literal 'null') or `custom` (a string of your choice, as defined by `$CustomNullPayloadHandler`). `Defaults to `error`. |""".stripMargin + val MaxQueueSizeProp: String = "connect.http.max.queue.size" + val MaxQueueSizeDoc: String = + """ + |The maximum number of records to queue per topic before blocking. If the queue limit is reached the connector will throw RetriableException and the connector settings to handle retries will be used. + |""".stripMargin + val MaxQueueSizeDefault = 1000000 + + val MaxQueueOfferTimeoutProp: String = "connect.http.max.queue.offer.timeout.ms" + val MaxQueueOfferTimeoutDoc: String = + """ + |The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw RetriableException and the connector settings to handle retries will be used. + |""".stripMargin + val MaxQueueOfferTimeoutDefault = 120000 + val config: ConfigDef = { val configDef = new ConfigDef() .withClientSslSupport() @@ -292,6 +306,20 @@ object HttpSinkConfigDef { Importance.HIGH, CustomNullPayloadHandlerDoc, ) + .define( + MaxQueueSizeProp, + Type.INT, + MaxQueueSizeDefault, + Importance.HIGH, + MaxQueueSizeDoc, + ) + .define( + MaxQueueOfferTimeoutProp, + Type.LONG, + MaxQueueOfferTimeoutDefault, + Importance.HIGH, + MaxQueueOfferTimeoutDoc, + ) ReporterConfig.withErrorRecordReportingSupport(configDef) ReporterConfig.withSuccessRecordReportingSupport(configDef) OAuth2Config.append(configDef) diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala index 0398e14df..167fe8966 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/RecordsQueueTest.scala @@ -34,18 +34,21 @@ import cats.data.NonEmptySeq import cats.effect.IO import cats.effect.kernel.Ref import cats.effect.testing.scalatest.AsyncIOSpec +import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy import io.lenses.streamreactor.connect.http.sink.commit.BatchResult import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord +import org.apache.kafka.connect.errors.RetriableException import org.mockito.ArgumentMatchers.any import org.mockito.MockitoSugar import org.scalatest.funsuite.AsyncFunSuiteLike import org.scalatest.matchers.should.Matchers import scala.collection.immutable.Queue +import scala.concurrent.duration.DurationInt class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSugar with Matchers { @@ -59,13 +62,15 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu private val record1 = RenderedRecord(topicPartition.atOffset(100), timestamp, "record1", Seq.empty, testEndpoint) private val record2 = RenderedRecord(topicPartition.atOffset(101), timestamp, "record2", Seq.empty, testEndpoint) + private val record3 = RenderedRecord(topicPartition.atOffset(102), timestamp, "record3", Seq.empty, testEndpoint) test("enqueueAll should add all records to the queue") { { for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef) _ <- recordsQueue.enqueueAll(NonEmptySeq.of(record1, record2)) refValue <- recordsQueueRef.get } yield refValue @@ -87,7 +92,8 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue(record1, record2)) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 10000, 1.minute, offsetsRef) batchInfo <- recordsQueue.popBatch() } yield batchInfo } asserting { @@ -101,7 +107,8 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef) batchInfo <- recordsQueue.popBatch() } yield batchInfo } asserting { @@ -117,7 +124,8 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue(record1, record2)) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef) _ <- recordsQueue.dequeue(records) refValue <- recordsQueueRef.get } yield refValue @@ -134,7 +142,8 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu for { commitContext <- Ref[IO].of(defaultContext) recordsQueueRef <- Ref[IO].of(Queue(record1)) - recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, mock[BatchPolicy], 10000, 1.minute, offsetsRef) _ <- recordsQueue.dequeue(records) refValue <- recordsQueueRef.get } yield refValue @@ -143,4 +152,70 @@ class RecordsQueueTest extends AsyncFunSuiteLike with AsyncIOSpec with MockitoSu backingQueue should contain theSameElementsInOrderAs Seq(record1) } } + + test("enqueue all should throw RetriableException if the queue size is exceeded") { + val records = NonEmptySeq.of(record1, record2) + val commitPolicy = mock[BatchPolicy] + + val ioAction = for { + commitContext <- Ref[IO].of(defaultContext) + recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 1, 2.seconds, offsetsRef) + _ <- recordsQueue.enqueueAll(records) + } yield () + + ioAction.attempt.map { + case Left(e: RetriableException) => + e.getMessage should be("Enqueue timed out and records remain") + case Left(e) => + fail(s"Expected RetriableException but got ${e}") + case Right(_) => + fail("Expected RetriableException but enqueueAll succeeded") + } + + } + + test(" does not enqueue a record which was enqueued before") { + val records = NonEmptySeq.of(record1, record2) + val commitPolicy = mock[BatchPolicy] + val newRecords = NonEmptySeq.of(record2, record3) + + val ioAction = for { + commitContext <- Ref[IO].of(defaultContext) + recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 10000, 1.minute, offsetsRef) + _ <- recordsQueue.enqueueAll(records) + _ <- recordsQueue.enqueueAll(newRecords) + queue <- recordsQueueRef.get + } yield queue.toList + + ioAction asserting { + queue => + queue should contain theSameElementsInOrderAs List(record1, record2, record3) + } + } + + test("enqueue when the queue size is 1") { + val records = NonEmptySeq.of(record1) + val commitPolicy = mock[BatchPolicy] + val newRecords = NonEmptySeq.of(record2) + + val ioAction = for { + commitContext <- Ref[IO].of(defaultContext) + recordsQueueRef <- Ref[IO].of(Queue.empty[RenderedRecord]) + offsetsRef <- Ref[IO].of(Map.empty[TopicPartition, Offset]) + recordsQueue = new RecordsQueue(recordsQueueRef, commitContext, commitPolicy, 1, 10.seconds, offsetsRef) + _ <- recordsQueue.enqueueAll(records) + _ <- recordsQueue.dequeue(records) + _ <- recordsQueue.enqueueAll(newRecords) + queue <- recordsQueueRef.get + } yield queue.toList + + ioAction asserting { + queue => + queue should contain theSameElementsInOrderAs List(record2) + } + } }