Skip to content

Commit

Permalink
test(realtime): fix MockWebSocketClient (#461)
Browse files Browse the repository at this point in the history
* test(realtime): use mainSerialExecutor

* fix MockWebSocketClient
  • Loading branch information
grdsdev committed Jul 12, 2024
1 parent 3c09d6e commit 0667b9f
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 200 deletions.
16 changes: 15 additions & 1 deletion Sources/Helpers/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ConcurrencyExtras
import Foundation

public final class ObservationToken: Sendable {
public final class ObservationToken: Sendable, Hashable {
let _onCancel = LockIsolated((@Sendable () -> Void)?.none)

package init(_ onCancel: (@Sendable () -> Void)? = nil) {
Expand All @@ -34,6 +34,20 @@ public final class ObservationToken: Sendable {
deinit {
cancel()
}

public static func == (lhs: ObservationToken, rhs: ObservationToken) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}

public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}

extension ObservationToken {
public func store(in set: inout Set<ObservationToken>) {
set.insert(self)
}
}

package final class EventEmitter<Event: Sendable>: Sendable {
Expand Down
11 changes: 11 additions & 0 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public final class RealtimeChannelV2: Sendable {
statusEventEmitter.stream()
}

/// Listen for connection status changes.
/// - Parameter listener: Closure that will be called when connection status changes.
/// - Returns: An observation handle that can be used to stop listening.
///
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
public func onStatusChange(
_ listener: @escaping @Sendable (Status) -> Void
) -> ObservationToken {
statusEventEmitter.attach(listener)
}

init(
topic: String,
config: RealtimeChannelConfig,
Expand Down
184 changes: 94 additions & 90 deletions Tests/IntegrationTests/RealtimeIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,126 +37,130 @@ final class RealtimeIntegrationTests: XCTestCase {
)

func testBroadcast() async throws {
let expectation = expectation(description: "receivedBroadcastMessages")
expectation.expectedFulfillmentCount = 3
try await withMainSerialExecutor {
let expectation = expectation(description: "receivedBroadcastMessages")
expectation.expectedFulfillmentCount = 3

let channel = realtime.channel("integration") {
$0.broadcast.receiveOwnBroadcasts = true
}
let channel = realtime.channel("integration") {
$0.broadcast.receiveOwnBroadcasts = true
}

let receivedMessages = LockIsolated<[JSONObject]>([])
let receivedMessages = LockIsolated<[JSONObject]>([])

Task {
for await message in channel.broadcastStream(event: "test") {
receivedMessages.withValue {
$0.append(message)
Task {
for await message in channel.broadcastStream(event: "test") {
receivedMessages.withValue {
$0.append(message)
}
expectation.fulfill()
}
expectation.fulfill()
}
}

await Task.megaYield()
await Task.yield()

await channel.subscribe()
await channel.subscribe()

struct Message: Codable {
var value: Int
}
struct Message: Codable {
var value: Int
}

try await channel.broadcast(event: "test", message: Message(value: 1))
try await channel.broadcast(event: "test", message: Message(value: 2))
try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42])
try await channel.broadcast(event: "test", message: Message(value: 1))
try await channel.broadcast(event: "test", message: Message(value: 2))
try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42])

await fulfillment(of: [expectation], timeout: 0.5)
await fulfillment(of: [expectation], timeout: 0.5)

XCTAssertNoDifference(
receivedMessages.value,
[
XCTAssertNoDifference(
receivedMessages.value,
[
"event": "test",
"payload": [
"value": 1,
[
"event": "test",
"payload": [
"value": 1,
],
"type": "broadcast",
],
"type": "broadcast",
],
[
"event": "test",
"payload": [
"value": 2,
[
"event": "test",
"payload": [
"value": 2,
],
"type": "broadcast",
],
"type": "broadcast",
],
[
"event": "test",
"payload": [
"value": 3,
"another_value": 42,
[
"event": "test",
"payload": [
"value": 3,
"another_value": 42,
],
"type": "broadcast",
],
"type": "broadcast",
],
]
)
]
)

await channel.unsubscribe()
await channel.unsubscribe()
}
}

func testPresence() async throws {
let channel = realtime.channel("integration") {
$0.broadcast.receiveOwnBroadcasts = true
}
try await withMainSerialExecutor {
let channel = realtime.channel("integration") {
$0.broadcast.receiveOwnBroadcasts = true
}

let expectation = expectation(description: "presenceChange")
expectation.expectedFulfillmentCount = 4
let expectation = expectation(description: "presenceChange")
expectation.expectedFulfillmentCount = 4

let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([])
let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([])

Task {
for await presence in channel.presenceChange() {
receivedPresenceChanges.withValue {
$0.append(presence)
Task {
for await presence in channel.presenceChange() {
receivedPresenceChanges.withValue {
$0.append(presence)
}
expectation.fulfill()
}
expectation.fulfill()
}
}

await Task.megaYield()

await channel.subscribe()
await Task.yield()

struct UserState: Codable, Equatable {
let email: String
}
await channel.subscribe()

try await channel.track(UserState(email: "[email protected]"))
try await channel.track(["email": "[email protected]"])

await channel.untrack()
struct UserState: Codable, Equatable {
let email: String
}

await fulfillment(of: [expectation], timeout: 0.5)
try await channel.track(UserState(email: "[email protected]"))
try await channel.track(["email": "[email protected]"])

let joins = try receivedPresenceChanges.value.map { try $0.decodeJoins(as: UserState.self) }
let leaves = try receivedPresenceChanges.value.map { try $0.decodeLeaves(as: UserState.self) }
XCTAssertNoDifference(
joins,
[
[], // This is the first PRESENCE_STATE event.
[UserState(email: "[email protected]")],
[UserState(email: "[email protected]")],
[],
]
)
await channel.untrack()

XCTAssertNoDifference(
leaves,
[
[], // This is the first PRESENCE_STATE event.
[],
[UserState(email: "[email protected]")],
[UserState(email: "[email protected]")],
]
)
await fulfillment(of: [expectation], timeout: 0.5)

await channel.unsubscribe()
let joins = try receivedPresenceChanges.value.map { try $0.decodeJoins(as: UserState.self) }
let leaves = try receivedPresenceChanges.value.map { try $0.decodeLeaves(as: UserState.self) }
XCTAssertNoDifference(
joins,
[
[], // This is the first PRESENCE_STATE event.
[UserState(email: "[email protected]")],
[UserState(email: "[email protected]")],
[],
]
)

XCTAssertNoDifference(
leaves,
[
[], // This is the first PRESENCE_STATE event.
[],
[UserState(email: "[email protected]")],
[UserState(email: "[email protected]")],
]
)

await channel.unsubscribe()
}
}

// FIXME: Test getting stuck
Expand All @@ -179,7 +183,7 @@ final class RealtimeIntegrationTests: XCTestCase {
// await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect()
// }
//
// await Task.megaYield()
// await Task.yield()
// await channel.subscribe()
//
// struct Entry: Codable, Equatable {
Expand Down
69 changes: 54 additions & 15 deletions Tests/RealtimeTests/MockWebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,81 @@ import XCTestDynamicOverlay
#endif

final class MockWebSocketClient: WebSocketClient {
let sentMessages = LockIsolated<[RealtimeMessageV2]>([])
struct MutableState {
var receiveContinuation: AsyncThrowingStream<RealtimeMessageV2, any Error>.Continuation?
var sentMessages: [RealtimeMessageV2] = []
var onCallback: ((RealtimeMessageV2) -> RealtimeMessageV2?)?
var connectContinuation: AsyncStream<ConnectionStatus>.Continuation?

var sendMessageBuffer: [RealtimeMessageV2] = []
var connectionStatusBuffer: [ConnectionStatus] = []
}

private let mutableState = LockIsolated(MutableState())

var sentMessages: [RealtimeMessageV2] {
mutableState.sentMessages
}

func send(_ message: RealtimeMessageV2) async throws {
sentMessages.withValue {
$0.append(message)
}
mutableState.withValue {
$0.sentMessages.append(message)

if let callback = onCallback.value, let response = callback(message) {
mockReceive(response)
if let callback = $0.onCallback, let response = callback(message) {
mockReceive(response)
}
}
}

private let receiveContinuation =
LockIsolated<AsyncThrowingStream<RealtimeMessageV2, any Error>.Continuation?>(nil)
func mockReceive(_ message: RealtimeMessageV2) {
receiveContinuation.value?.yield(message)
mutableState.withValue {
if let continuation = $0.receiveContinuation {
continuation.yield(message)
} else {
$0.sendMessageBuffer.append(message)
}
}
}

private let onCallback = LockIsolated<((RealtimeMessageV2) -> RealtimeMessageV2?)?>(nil)
func on(_ callback: @escaping (RealtimeMessageV2) -> RealtimeMessageV2?) {
onCallback.setValue(callback)
mutableState.withValue {
$0.onCallback = callback
}
}

func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
let (stream, continuation) = AsyncThrowingStream<RealtimeMessageV2, any Error>.makeStream()
receiveContinuation.setValue(continuation)
mutableState.withValue {
$0.receiveContinuation = continuation

while !$0.sendMessageBuffer.isEmpty {
let message = $0.sendMessageBuffer.removeFirst()
$0.receiveContinuation?.yield(message)
}
}
return stream
}

private let connectContinuation = LockIsolated<AsyncStream<ConnectionStatus>.Continuation?>(nil)
func mockConnect(_ status: ConnectionStatus) {
connectContinuation.value?.yield(status)
mutableState.withValue {
if let continuation = $0.connectContinuation {
continuation.yield(status)
} else {
$0.connectionStatusBuffer.append(status)
}
}
}

func connect() -> AsyncStream<ConnectionStatus> {
let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
connectContinuation.setValue(continuation)
mutableState.withValue {
$0.connectContinuation = continuation

while !$0.connectionStatusBuffer.isEmpty {
let status = $0.connectionStatusBuffer.removeFirst()
$0.connectContinuation?.yield(status)
}
}
return stream
}

Expand Down
Loading

0 comments on commit 0667b9f

Please sign in to comment.