Skip to content

Commit

Permalink
test(realtime): add tests for Message and RealtimeClient
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Nov 23, 2023
1 parent afd835b commit 43aa423
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 231 deletions.
51 changes: 34 additions & 17 deletions Sources/Realtime/Message.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import Foundation

/// Data that is received from the Server.
public struct Message: Sendable {
public struct Message: Sendable, Hashable {
/// Reference number. Empty if missing
public let ref: String

Expand All @@ -40,17 +40,15 @@ public struct Message: Sendable {

/// Message payload
public var payload: Payload {
guard let response = rawPayload["response"] as? Payload
else { return rawPayload }
return response
rawPayload["response"]?.objectValue ?? rawPayload
}

/// Convenience accessor. Equivalent to getting the status as such:
/// ```swift
/// message.payload["status"]
/// ```
public var status: PushStatus? {
(rawPayload["status"] as? String).flatMap(PushStatus.init(rawValue:))
rawPayload["status"]?.stringValue.flatMap(PushStatus.init(rawValue:))
}

init(
Expand All @@ -66,21 +64,40 @@ public struct Message: Sendable {
rawPayload = payload
self.joinRef = joinRef
}
}

extension Message: Decodable {
public init(from decoder: Decoder) throws {
var container = try decoder.unkeyedContainer()

let joinRef = try container.decodeIfPresent(String.self)
let ref = try container.decodeIfPresent(String.self)
let topic = try container.decode(String.self)
let event = try container.decode(String.self)
let payload = try container.decode(Payload.self)
self.init(
ref: ref ?? "",
topic: topic,
event: event,
payload: payload,
joinRef: joinRef
)
}
}

init?(json: [Any?]) {
guard json.count > 4 else { return nil }
joinRef = json[0] as? String
ref = json[1] as? String ?? ""
extension Message: Encodable {
public func encode(to encoder: Encoder) throws {
var container = encoder.unkeyedContainer()

if let topic = json[2] as? String,
let event = json[3] as? String,
let payload = json[4] as? Payload
{
self.topic = topic
self.event = event
rawPayload = payload
if let joinRef {
try container.encode(joinRef)
} else {
return nil
try container.encodeNil()
}

try container.encode(ref)
try container.encode(topic)
try container.encode(event)
try container.encode(payload)
}
}
9 changes: 5 additions & 4 deletions Sources/Realtime/PhoenixTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public protocol PhoenixTransportDelegate {
- Parameter message: Message received from the server
*/
func onMessage(message: String)
func onMessage(message: Data)

/**
Notified when the `Transport` closes.
Expand Down Expand Up @@ -276,10 +276,11 @@ open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketD
switch result {
case let .success(message):
switch message {
case .data:
print("Data received. This method is unsupported by the Client")
case let .data(data):
self?.delegate?.onMessage(message: data)
case let .string(text):
self?.delegate?.onMessage(message: text)
let data = Data(text.utf8)
self?.delegate?.onMessage(message: data)
default:
fatalError("Unknown result was received. [\(result)]")
}
Expand Down
14 changes: 8 additions & 6 deletions Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ public class Push {
startTimeout()
sent = true
channel?.socket?.push(
topic: channel?.topic ?? "",
event: event,
payload: payload,
ref: ref,
joinRef: channel?.joinRef
message: Message(
ref: ref ?? "",
topic: channel?.topic ?? "",
event: event,
payload: payload,
joinRef: channel?.joinRef
)
)
}

Expand Down Expand Up @@ -222,7 +224,7 @@ public class Push {
guard let refEvent else { return }

var mutPayload = payload
mutPayload["status"] = status.rawValue
mutPayload["status"] = .string(status.rawValue)

channel?.trigger(event: refEvent, payload: mutPayload)
}
Expand Down
64 changes: 35 additions & 29 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ public struct RealtimeChannelOptions {
}

/// Parameters used to configure the channel
var params: [String: [String: Any]] {
var params: [String: AnyJSON] {
[
"config": [
"presence": [
"key": presenceKey ?? "",
"key": .string(presenceKey ?? ""),
],
"broadcast": [
"ack": broadcastAcknowledge,
"self": broadcastSelf,
"ack": .bool(broadcastAcknowledge),
"self": .bool(broadcastSelf),
],
],
]
Expand Down Expand Up @@ -185,7 +185,7 @@ public class RealtimeChannel {
/// - parameter topic: Topic of the RealtimeChannel
/// - parameter params: Optional. Parameters to send when joining.
/// - parameter socket: Socket that the channel is a part of
init(topic: String, params: [String: Any] = [:], socket: RealtimeClient) {
init(topic: String, params: [String: AnyJSON] = [:], socket: RealtimeClient) {
state = ChannelState.closed
self.topic = topic
subTopic = topic.replacingOccurrences(of: "realtime:", with: "")
Expand Down Expand Up @@ -381,22 +381,27 @@ public class RealtimeChannel {
self.timeout = safeTimeout
}

let broadcast = (params["config"] as? [String: any Sendable])?["broadcast"]
let presence = (params["config"] as? [String: any Sendable])?["presence"]
let broadcast = params["config"]?.objectValue?["broadcast"]
let presence = params["config"]?.objectValue?["presence"]

var accessTokenPayload: Payload = [:]

var config: Payload = [
"postgres_changes": bindings["postgres_changes"]?.map(\.filter) ?? [],
"postgres_changes": .array(
(bindings["postgres_changes"]?.map(\.filter) ?? []).map { filter in
AnyJSON.object(filter.mapValues(AnyJSON.string))
}
),
]

config["broadcast"] = broadcast
config["presence"] = presence

if let accessToken = socket?.accessToken {
accessTokenPayload["access_token"] = accessToken
accessTokenPayload["access_token"] = .string(accessToken)
}

params["config"] = config
params["config"] = .object(config)

joinedOnce = true
rejoin()
Expand All @@ -411,8 +416,8 @@ public class RealtimeChannel {
self.socket?.setAuth(self.socket?.accessToken)
}

guard let serverPostgresFilters = message
.payload["postgres_changes"] as? [[String: any Sendable]]
guard let serverPostgresFilters = message.payload["postgres_changes"]?.arrayValue?
.compactMap(\.objectValue)
else {
callback?(.subscribed, nil)
return
Expand All @@ -432,17 +437,17 @@ public class RealtimeChannel {

let serverPostgresFilter = serverPostgresFilters[i]

if serverPostgresFilter["event"] as? String == event,
serverPostgresFilter["schema"] as? String == schema,
serverPostgresFilter["table"] as? String == table,
serverPostgresFilter["filter"] as? String == filter
if serverPostgresFilter["event"]?.stringValue == event,
serverPostgresFilter["schema"]?.stringValue == schema,
serverPostgresFilter["table"]?.stringValue == table,
serverPostgresFilter["filter"]?.stringValue == filter
{
newPostgresBindings.append(
Binding(
type: clientPostgresBinding.type,
filter: clientPostgresBinding.filter,
callback: clientPostgresBinding.callback,
id: (serverPostgresFilter["id"] as? Int).flatMap(String.init)
id: serverPostgresFilter["id"]?.numberValue.map { Int($0) }.flatMap(String.init)
)
)
} else {
Expand Down Expand Up @@ -481,7 +486,7 @@ public class RealtimeChannel {
type: .presence,
payload: [
"event": "track",
"payload": payload,
"payload": .object(payload),
],
opts: opts
)
Expand Down Expand Up @@ -643,9 +648,9 @@ public class RealtimeChannel {
opts: Payload = [:]
) async -> ChannelResponse {
var payload = payload
payload["type"] = type.rawValue
payload["type"] = .string(type.rawValue)
if let event {
payload["event"] = event
payload["event"] = .string(event)
}

if !canPush, type == .broadcast {
Expand Down Expand Up @@ -681,14 +686,14 @@ public class RealtimeChannel {
return await withCheckedContinuation { continuation in
let push = self.push(
type.rawValue, payload: payload,
timeout: (opts["timeout"] as? TimeInterval) ?? self.timeout
timeout: opts["timeout"]?.numberValue ?? self.timeout
)

if let type = payload["type"] as? String, type == "broadcast",
let config = self.params["config"] as? [String: any Sendable],
let broadcast = config["broadcast"] as? [String: any Sendable]
if let type = payload["type"]?.stringValue, type == "broadcast",
let config = self.params["config"]?.objectValue,
let broadcast = config["broadcast"]?.objectValue
{
let ack = broadcast["ack"] as? Bool
let ack = broadcast["ack"]?.boolValue
if ack == nil || ack == false {
continuation.resume(returning: .ok)
return
Expand Down Expand Up @@ -848,13 +853,14 @@ public class RealtimeChannel {
let bindEvent = bind.filter["event"]?.lowercased()

if let bindId = bind.id.flatMap(Int.init) {
let ids = message.payload["ids"] as? [Int] ?? []
let data = message.payload["data"] as? [String: any Sendable] ?? [:]
let type = data["type"] as? String
let ids = (message.payload["ids"]?.arrayValue ?? []).compactMap(\.numberValue)
.map(Int.init)
let data = message.payload["data"]?.objectValue ?? [:]
let type = data["type"]?.stringValue
return ids.contains(bindId) && (bindEvent == "*" || bindEvent == type?.lowercased())
}

let messageEvent = message.payload["event"] as? String
let messageEvent = message.payload["event"]?.stringValue
return bindEvent == "*" || bindEvent == messageEvent?.lowercased()
}

Expand Down
Loading

0 comments on commit 43aa423

Please sign in to comment.