Skip to content

Commit fd0d105

Browse files
committed
Change FlagsUpdateSubscriber.flow to emit strings
It seems more proper to keep usages of gRPC service definition classes in FlagSyncService.
1 parent b2849a0 commit fd0d105

File tree

5 files changed

+14
-18
lines changed

5 files changed

+14
-18
lines changed

src/main/kotlin/io/github/tobyhs/redisflagd/FlagSyncService.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import jakarta.inject.Singleton
1212
import kotlinx.coroutines.flow.Flow
1313
import kotlinx.coroutines.flow.emitAll
1414
import kotlinx.coroutines.flow.flow
15+
import kotlinx.coroutines.flow.map
1516

1617
/**
1718
* Implementation of flagd's FlagSyncService that uses Redis
@@ -27,7 +28,9 @@ class FlagSyncService(
2728
.setFlagConfiguration(flagsRepository.getFlagConfiguration())
2829
.build()
2930
emit(firstResult)
30-
emitAll(flagsUpdateSubscriber.flow)
31+
emitAll(flagsUpdateSubscriber.flow.map { flagConfiguration ->
32+
SyncFlagsResponse.newBuilder().setFlagConfiguration(flagConfiguration).build()
33+
})
3134
}
3235

3336
override suspend fun fetchAllFlags(request: FetchAllFlagsRequest): FetchAllFlagsResponse {
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package io.github.tobyhs.redisflagd.data
22

3-
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse
43
import kotlinx.coroutines.flow.SharedFlow
54

65
/**
76
* Something that subscribes to flag configuration updates
87
*/
98
interface FlagsUpdateSubscriber {
10-
/** A [SharedFlow] that emits flag configuration updates */
11-
val flow: SharedFlow<SyncFlagsResponse>
9+
/** A [SharedFlow] that emits updated flag configurations */
10+
val flow: SharedFlow<String>
1211
}

src/main/kotlin/io/github/tobyhs/redisflagd/data/RedisFlagsUpdateSubscriber.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.github.tobyhs.redisflagd.data
22

3-
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse
43
import io.github.tobyhs.redisflagd.di.AppCoroutineScope
54
import io.lettuce.core.RedisURI
65
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
@@ -22,8 +21,8 @@ class RedisFlagsUpdateSubscriber(
2221
@Property(name = "redis.uri") private val redisUri: String,
2322
@AppCoroutineScope private val appScope: CoroutineScope,
2423
) : FlagsUpdateSubscriber {
25-
private val _flow = MutableSharedFlow<SyncFlagsResponse>()
26-
override val flow: SharedFlow<SyncFlagsResponse> = _flow
24+
private val _flow = MutableSharedFlow<String>()
25+
override val flow: SharedFlow<String> = _flow
2726

2827
/**
2928
* Subscribes to the Redis keyspace notification channels for flag configuration updates
@@ -32,9 +31,7 @@ class RedisFlagsUpdateSubscriber(
3231
fun subscribe() {
3332
pubSubConnection.addListener {
3433
appScope.launch {
35-
val flagConfiguration = flagsRepository.refreshFlagConfiguration()
36-
val response = SyncFlagsResponse.newBuilder().setFlagConfiguration(flagConfiguration).build()
37-
_flow.emit(response)
34+
_flow.emit(flagsRepository.refreshFlagConfiguration())
3835
}
3936
}
4037
val db = RedisURI.create(redisUri).database

src/test/kotlin/io/github/tobyhs/redisflagd/FlagSyncServiceTest.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import io.kotest.common.ExperimentalKotest
1010
import io.kotest.core.spec.style.DescribeSpec
1111
import io.kotest.core.test.testCoroutineScheduler
1212
import io.kotest.matchers.collections.shouldHaveSize
13-
import io.kotest.matchers.shouldBe
1413
import io.mockk.coEvery
1514
import io.mockk.every
1615
import io.mockk.mockk
@@ -36,7 +35,7 @@ class FlagSyncServiceTest : DescribeSpec({
3635

3736
describe("syncFlags").config(coroutineTestScope = true) {
3837
it("returns a flow that emits flag updates") {
39-
val updateFlow = MutableSharedFlow<SyncFlagsResponse>()
38+
val updateFlow = MutableSharedFlow<String>()
4039
every { flagsUpdateSubscriber.flow } returns updateFlow
4140
val responses = mutableListOf<SyncFlagsResponse>()
4241
val flowCollectJob = launch {
@@ -50,13 +49,12 @@ class FlagSyncServiceTest : DescribeSpec({
5049
"myflag": {"state": "ENABLED", "variants": {"on": true, "off": false}, "defaultVariant": "on"}
5150
}
5251
}""".trimIndent()
53-
val nextResponse = SyncFlagsResponse.newBuilder().setFlagConfiguration(nextFlagConfiguration).build()
54-
updateFlow.emit(nextResponse)
52+
updateFlow.emit(nextFlagConfiguration)
5553
testCoroutineScheduler.advanceUntilIdle()
5654

5755
responses.shouldHaveSize(2)
5856
responses[0].flagConfiguration.shouldEqualJson(initialFlagConfiguration)
59-
responses[1].shouldBe(nextResponse)
57+
responses[1].flagConfiguration.shouldEqualJson(nextFlagConfiguration)
6058
flowCollectJob.cancel()
6159
}
6260
}

src/test/kotlin/io/github/tobyhs/redisflagd/data/RedisFlagsUpdateSubscriberTest.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.github.tobyhs.redisflagd.data
22

3-
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse
43
import io.kotest.common.ExperimentalKotest
54
import io.kotest.core.spec.style.DescribeSpec
65
import io.kotest.core.test.testCoroutineScheduler
@@ -44,7 +43,7 @@ class RedisFlagsUpdateSubscriberTest : DescribeSpec({
4443
coEvery { flagsRepository.refreshFlagConfiguration() } returns flagConfiguration
4544

4645
subscriber = RedisFlagsUpdateSubscriber(flagsRepository, pubSubConnection, redisUri, this)
47-
val responses = mutableListOf<SyncFlagsResponse>()
46+
val responses = mutableListOf<String>()
4847
val flowCollectJob = launch {
4948
subscriber.flow.collect { r -> responses.add(r) }
5049
}
@@ -57,7 +56,7 @@ class RedisFlagsUpdateSubscriberTest : DescribeSpec({
5756

5857
responses.shouldHaveSize(1)
5958
val response = responses.first()
60-
response.flagConfiguration.shouldBe(flagConfiguration)
59+
response.shouldBe(flagConfiguration)
6160
flowCollectJob.cancel()
6261
}
6362
}

0 commit comments

Comments
 (0)