Skip to content

Commit

Permalink
basic test case
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Feb 8, 2025
1 parent cdc05f4 commit 038035c
Showing 1 changed file with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.pekko.persistence.testkit.query

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
Expand All @@ -27,15 +25,20 @@ import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.query.EventEnvelope
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.testkit.query.javadsl
import pekko.persistence.testkit.query.scaladsl
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
import pekko.persistence.typed.scaladsl.EventSourcedBehavior
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object EventsByTagSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
Expand Down Expand Up @@ -74,8 +77,15 @@ class EventsByTagSpec

implicit val classic: pekko.actor.ActorSystem = system.classicSystem

val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private val persistenceQuery = PersistenceQuery(system)

private val queries =
persistenceQuery.readJournalFor[scaladsl.PersistenceTestKitReadJournal](scaladsl.PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor(
classOf[javadsl.PersistenceTestKitReadJournal],
javadsl.PersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String, tags: Set[String]): ActorRef[Command] = {
val probe = createTestProbe[Done]()
Expand Down Expand Up @@ -115,6 +125,19 @@ class EventsByTagSpec
probe.expectNext("c-4")
}

"find new events (Java DSL)" in {
val ackProbe = createTestProbe[Done]()
val tag = "c-tag"
val ref = setup("c", Set(tag))
val src = queriesJava.eventsByTag(tag, NoOffset).asScala
val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3")

ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)

probe.expectNext("c-4")
}

"find new events after batched setup" in {
val ackProbe = createTestProbe[Done]()
val tag = "d-tag"
Expand Down

0 comments on commit 038035c

Please sign in to comment.