Skip to content

Commit

Permalink
add broadcast through HTTP again
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Aug 3, 2024
1 parent 156cffc commit 1da76dd
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 27 deletions.
109 changes: 85 additions & 24 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,63 @@ import ConcurrencyExtras
import Foundation
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking

extension HTTPURLResponse {
convenience init() {
self.init(
url: URL(string: "http://127.0.0.1")!,
statusCode: 200,
httpVersion: nil,
headerFields: nil
)!
}
}
#endif

@available(*, deprecated, renamed: "RealtimeChannel")
public typealias RealtimeChannelV2 = RealtimeChannel

public struct RealtimeChannelConfig: Sendable {
public var broadcast: BroadcastJoinConfig
public var presence: PresenceJoinConfig
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClient.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannel) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannel) async -> Void
var push: @Sendable (_ message: RealtimeMessage) async -> Void
var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse
}

extension Socket {
init(client: RealtimeClient) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) }
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
)
}
}

@available(*, deprecated, renamed: "RealtimeChannel")
public typealias RealtimeChannelV2 = RealtimeChannel

public final class RealtimeChannel: Sendable {
public typealias Subscription = ObservationToken

Expand Down Expand Up @@ -76,6 +97,10 @@ public final class RealtimeChannel: Sendable {
set { statusEventEmitter.emit(newValue) }
}

public var statusChange: AsyncStream<Status> {
statusEventEmitter.stream()
}

/// Listen for connection status changes.
/// - Parameter listener: Closure that will be called when connection status changes.
/// - Returns: An observation handle that can be used to stop listening.
Expand All @@ -87,10 +112,6 @@ public final class RealtimeChannel: Sendable {
statusEventEmitter.attach(listener)
}

public var statusChange: AsyncStream<Status> {
statusEventEmitter.stream()
}

init(
topic: String,
config: RealtimeChannelConfig,
Expand Down Expand Up @@ -205,24 +226,64 @@ public final class RealtimeChannel: Sendable {
/// - event: Broadcast message event.
/// - message: Message payload.
public func broadcast(event: String, message: JSONObject) async {
assert(
status == .subscribed,
"You can only broadcast after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
if status != .subscribed {
struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}

await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
var headers = HTTPHeaders(["content-type": "application/json"])
if let apiKey = socket.apiKey() {
headers["apikey"] = apiKey
}
if let accessToken = socket.accessToken() {
headers["authorization"] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
HTTPRequest(
url: socket.broadcastURL(),
method: .post,
headers: headers,
body: JSONEncoder().encode(
[
"messages": [
Message(
topic: topic,
event: event,
payload: message,
private: config.isPrivate
),
],
]
)
)
)
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
await task.value
}
}
} else {
await push(
RealtimeMessage(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
)
)
)
}
}

public func track(_ state: some Codable) async throws {
Expand Down
21 changes: 19 additions & 2 deletions Sources/Realtime/RealtimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public final class RealtimeClient: Sendable {
let options: RealtimeClientOptions
let ws: any WebSocketClient
let mutableState = LockIsolated(MutableState())
let http: any HTTPClientType
let apikey: String?

public var subscriptions: [String: RealtimeChannel] {
Expand Down Expand Up @@ -131,6 +132,12 @@ public final class RealtimeClient: Sendable {
}

public convenience init(url: URL, options: RealtimeClientOptions) {
var interceptors: [any HTTPClientInterceptor] = []

if let logger = options.logger {
interceptors.append(LoggerInterceptor(logger: logger))
}

self.init(
url: url,
options: options,
Expand All @@ -140,14 +147,24 @@ public final class RealtimeClient: Sendable {
apikey: options.apikey
),
options: options
),
http: HTTPClient(
fetch: options.fetch ?? { try await URLSession.shared.data(for: $0) },
interceptors: interceptors
)
)
}

init(url: URL, options: RealtimeClientOptions, ws: any WebSocketClient) {
init(
url: URL,
options: RealtimeClientOptions,
ws: any WebSocketClient,
http: any HTTPClientType
) {
self.url = url
self.options = options
self.ws = ws
self.http = http
apikey = options.apikey

mutableState.withValue {
Expand Down Expand Up @@ -474,7 +491,7 @@ public final class RealtimeClient: Sendable {
return url
}

private var broadcastURL: URL {
var broadcastURL: URL {
url.appendingPathComponent("api/broadcast")
}
}
2 changes: 1 addition & 1 deletion Sources/Realtime/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @

struct MutableState {
var continuation: AsyncStream<ConnectionStatus>.Continuation?
var connection: WebSocketConnection<RealtimeMessageV2, RealtimeMessageV2>?
var connection: WebSocketConnection<RealtimeMessage, RealtimeMessage>?
}

private let mutableState = LockIsolated(MutableState())
Expand Down

0 comments on commit 1da76dd

Please sign in to comment.