Skip to content

Commit 43fd795

Browse files
authored
Fix issue where user-supplied enrichments were lost during the startup phase (#260)
* make enrichment closure a property of event * bug fix * add unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests * fix unit tests
1 parent 347333c commit 43fd795

File tree

6 files changed

+193
-15
lines changed

6 files changed

+193
-15
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -509,13 +509,13 @@ open class Analytics protected constructor(
509509
fun process(event: BaseEvent, enrichment: EnrichmentClosure? = null) {
510510
if (!enabled) return
511511

512-
event.applyBaseData()
512+
event.applyBaseData(enrichment)
513513

514514
log("applying base attributes on ${Thread.currentThread().name}")
515515
analyticsScope.launch(analyticsDispatcher) {
516516
event.applyBaseEventData(store)
517517
log("processing event on ${Thread.currentThread().name}")
518-
timeline.process(event, enrichment)
518+
timeline.process(event)
519519
}
520520
}
521521

core/src/main/java/com/segment/analytics/kotlin/core/Events.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ typealias AnalyticsContext = JsonObject
1212
typealias Integrations = JsonObject
1313
typealias Properties = JsonObject
1414
typealias Traits = JsonObject
15+
typealias EnrichmentClosure = (event: BaseEvent?) -> BaseEvent?
1516

1617
val emptyJsonObject = JsonObject(emptyMap())
1718
val emptyJsonArray = JsonArray(emptyList())
@@ -75,11 +76,14 @@ sealed class BaseEvent {
7576

7677
abstract var _metadata: DestinationMetadata
7778

79+
var enrichment: EnrichmentClosure? = null
80+
7881
companion object {
7982
internal const val ALL_INTEGRATIONS_KEY = "All"
8083
}
8184

82-
internal fun applyBaseData() {
85+
internal fun applyBaseData(enrichment: EnrichmentClosure?) {
86+
this.enrichment = enrichment
8387
this.timestamp = SegmentInstant.now()
8488
this.context = emptyJsonObject
8589
this.messageId = UUID.randomUUID().toString()
@@ -119,6 +123,7 @@ sealed class BaseEvent {
119123
integrations = original.integrations
120124
userId = original.userId
121125
_metadata = original._metadata
126+
enrichment = original.enrichment
122127
}
123128
@Suppress("UNCHECKED_CAST")
124129
return copy as T // This is ok because resultant type will be same as input type

core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt

+11-10
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ internal class Timeline {
2424
lateinit var analytics: Analytics
2525

2626
// initiate the event's lifecycle
27-
fun process(incomingEvent: BaseEvent, enrichmentClosure: EnrichmentClosure? = null): BaseEvent? {
27+
fun process(incomingEvent: BaseEvent): BaseEvent? {
2828
val beforeResult = applyPlugins(Plugin.Type.Before, incomingEvent)
2929
var enrichmentResult = applyPlugins(Plugin.Type.Enrichment, beforeResult)
30-
enrichmentClosure?.let {
30+
enrichmentResult?.enrichment?.let {
3131
enrichmentResult = it(enrichmentResult)
3232
}
3333

@@ -82,14 +82,6 @@ internal class Timeline {
8282
it["message"] = "Exception executing plugin"
8383
}
8484
}
85-
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
86-
it["message"] = "added"
87-
if (plugin is DestinationPlugin && plugin.key != "") {
88-
it["plugin"] = "${plugin.type}-${plugin.key}"
89-
} else {
90-
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
91-
}
92-
}
9385
plugins[plugin.type]?.add(plugin)
9486
with(analytics) {
9587
analyticsScope.launch(analyticsDispatcher) {
@@ -108,6 +100,15 @@ internal class Timeline {
108100
}
109101
}
110102
}
103+
104+
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
105+
it["message"] = "added"
106+
if (plugin is DestinationPlugin && plugin.key != "") {
107+
it["plugin"] = "${plugin.type}-${plugin.key}"
108+
} else {
109+
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
110+
}
111+
}
111112
}
112113

113114
// Remove a registered plugin

core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/StartupQueue.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class StartupQueue : Plugin, Subscriber {
7272
// after checking if the queue is empty so we only process if the event
7373
// if it is indeed not NULL.
7474
event?.let {
75-
analytics.process(it)
75+
analytics.process(it, it.enrichment)
7676
}
7777
}
7878
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt

+168-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant
88
import com.segment.analytics.kotlin.core.utilities.getString
99
import com.segment.analytics.kotlin.core.utilities.putInContext
1010
import com.segment.analytics.kotlin.core.utilities.updateJsonObject
11+
import com.segment.analytics.kotlin.core.utilities.set
12+
import com.segment.analytics.kotlin.core.utils.StubAfterPlugin
1113
import com.segment.analytics.kotlin.core.utils.StubPlugin
1214
import com.segment.analytics.kotlin.core.utils.TestRunPlugin
1315
import com.segment.analytics.kotlin.core.utils.clearPersistentStorage
@@ -17,7 +19,6 @@ import io.mockk.*
1719
import kotlinx.coroutines.runBlocking
1820
import kotlinx.coroutines.test.TestScope
1921
import kotlinx.coroutines.test.UnconfinedTestDispatcher
20-
import kotlinx.coroutines.test.runBlockingTest
2122
import kotlinx.coroutines.test.runTest
2223
import kotlinx.serialization.json.buildJsonObject
2324
import kotlinx.serialization.json.jsonObject
@@ -34,6 +35,7 @@ import java.io.ByteArrayInputStream
3435
import java.net.HttpURLConnection
3536
import java.util.Date
3637
import java.util.UUID
38+
import java.util.concurrent.Semaphore
3739

3840
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
3941
class AnalyticsTests {
@@ -979,4 +981,169 @@ class AnalyticsTests {
979981
context = baseContext
980982
integrations = emptyJsonObject
981983
}
984+
}
985+
986+
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
987+
class AsyncAnalyticsTests {
988+
private lateinit var analytics: Analytics
989+
990+
private lateinit var afterPlugin: StubAfterPlugin
991+
992+
private lateinit var httpSemaphore: Semaphore
993+
994+
private lateinit var assertSemaphore: Semaphore
995+
996+
private lateinit var actual: CapturingSlot<BaseEvent>
997+
998+
@BeforeEach
999+
fun setup() {
1000+
httpSemaphore = Semaphore(0)
1001+
assertSemaphore = Semaphore(0)
1002+
1003+
val settings = """
1004+
{"integrations":{"Segment.io":{"apiKey":"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ"}},"plan":{},"edgeFunction":{}}
1005+
""".trimIndent()
1006+
mockkConstructor(HTTPClient::class)
1007+
val settingsStream = ByteArrayInputStream(
1008+
settings.toByteArray()
1009+
)
1010+
val httpConnection: HttpURLConnection = mockk()
1011+
val connection = object : Connection(httpConnection, settingsStream, null) {}
1012+
every { anyConstructed<HTTPClient>().settings("cdn-settings.segment.com/v1") } answers {
1013+
// suspend http calls until we tracked events
1014+
// this will force events get into startup queue
1015+
httpSemaphore.acquire()
1016+
connection
1017+
}
1018+
1019+
afterPlugin = spyk(StubAfterPlugin())
1020+
actual = slot<BaseEvent>()
1021+
every { afterPlugin.execute(capture(actual)) } answers {
1022+
val input = firstArg<BaseEvent?>()
1023+
// since this is an after plugin, when its execute function is called,
1024+
// it is guaranteed that the enrichment closure has been called.
1025+
// so we can release the semaphore on assertions.
1026+
assertSemaphore.release()
1027+
input
1028+
}
1029+
analytics = Analytics(Configuration(writeKey = "123", application = "Test"))
1030+
analytics.add(afterPlugin)
1031+
}
1032+
1033+
@Test
1034+
fun `startup queue should replay with track enrichment closure`() {
1035+
val expectedEvent = "foo"
1036+
val expectedAnonymousId = "bar"
1037+
1038+
analytics.track(expectedEvent) {
1039+
it?.anonymousId = expectedAnonymousId
1040+
it
1041+
}
1042+
1043+
// now we have tracked event, i.e. event added to startup queue
1044+
// release the semaphore put on http client, so we startup queue will replay the events
1045+
httpSemaphore.release()
1046+
// now we need to wait for events being fully replayed before making assertions
1047+
assertSemaphore.acquire()
1048+
1049+
assertTrue(actual.isCaptured)
1050+
actual.captured.let {
1051+
assertTrue(it is TrackEvent)
1052+
val e = it as TrackEvent
1053+
assertTrue(e.properties.isEmpty())
1054+
assertEquals(expectedEvent, e.event)
1055+
assertEquals(expectedAnonymousId, e.anonymousId)
1056+
}
1057+
}
1058+
1059+
@Disabled
1060+
@Test
1061+
fun `startup queue should replay with identify enrichment closure`() {
1062+
val expected = buildJsonObject {
1063+
put("foo", "baz")
1064+
}
1065+
val expectedUserId = "newUserId"
1066+
1067+
analytics.identify(expectedUserId) {
1068+
if (it is IdentifyEvent) {
1069+
it.traits = updateJsonObject(it.traits) {
1070+
it["foo"] = "baz"
1071+
}
1072+
}
1073+
it
1074+
}
1075+
1076+
// now we have tracked event, i.e. event added to startup queue
1077+
// release the semaphore put on http client, so we startup queue will replay the events
1078+
httpSemaphore.release()
1079+
// now we need to wait for events being fully replayed before making assertions
1080+
assertSemaphore.acquire()
1081+
1082+
val actualUserId = analytics.userId()
1083+
1084+
assertTrue(actual.isCaptured)
1085+
actual.captured.let {
1086+
assertTrue(it is IdentifyEvent)
1087+
val e = it as IdentifyEvent
1088+
assertEquals(expected, e.traits)
1089+
assertEquals(expectedUserId, actualUserId)
1090+
}
1091+
}
1092+
1093+
@Disabled
1094+
@Test
1095+
fun `startup queue should replay with group enrichment closure`() {
1096+
val expected = buildJsonObject {
1097+
put("foo", "baz")
1098+
}
1099+
val expectedGroupId = "foo"
1100+
1101+
analytics.group(expectedGroupId) {
1102+
if (it is GroupEvent) {
1103+
it.traits = updateJsonObject(it.traits) {
1104+
it["foo"] = "baz"
1105+
}
1106+
}
1107+
it
1108+
}
1109+
1110+
// now we have tracked event, i.e. event added to startup queue
1111+
// release the semaphore put on http client, so we startup queue will replay the events
1112+
httpSemaphore.release()
1113+
// now we need to wait for events being fully replayed before making assertions
1114+
assertSemaphore.acquire()
1115+
1116+
assertTrue(actual.isCaptured)
1117+
actual.captured.let {
1118+
assertTrue(it is GroupEvent)
1119+
val e = it as GroupEvent
1120+
assertEquals(expected, e.traits)
1121+
assertEquals(expectedGroupId, e.groupId)
1122+
}
1123+
}
1124+
1125+
@Disabled
1126+
@Test
1127+
fun `startup queue should replay with alias enrichment closure`() {
1128+
val expected = "bar"
1129+
1130+
analytics.alias(expected) {
1131+
it?.anonymousId = "test"
1132+
it
1133+
}
1134+
1135+
// now we have tracked event, i.e. event added to startup queue
1136+
// release the semaphore put on http client, so we startup queue will replay the events
1137+
httpSemaphore.release()
1138+
// now we need to wait for events being fully replayed before making assertions
1139+
assertSemaphore.acquire()
1140+
1141+
assertTrue(actual.isCaptured)
1142+
actual.captured.let {
1143+
assertTrue(it is AliasEvent)
1144+
val e = it as AliasEvent
1145+
assertEquals(expected, e.userId)
1146+
assertEquals("test", e.anonymousId)
1147+
}
1148+
}
9821149
}

core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt

+5
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,9 @@ class TestRunPlugin(var closure: (BaseEvent?) -> Unit): EventPlugin {
6868
open class StubPlugin : EventPlugin {
6969
override val type: Plugin.Type = Plugin.Type.Before
7070
override lateinit var analytics: Analytics
71+
}
72+
73+
open class StubAfterPlugin : EventPlugin {
74+
override val type: Plugin.Type = Plugin.Type.After
75+
override lateinit var analytics: Analytics
7176
}

0 commit comments

Comments
 (0)