Skip to content

Commit

Permalink
Fix example
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Nov 28, 2023
1 parent b76d1b0 commit 253e019
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 107 deletions.
168 changes: 100 additions & 68 deletions Examples/RealtimeSample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,107 @@
import Realtime
import SwiftUI

struct ContentView: View {
@State var inserts: [Message] = []
@State var updates: [Message] = []
@State var deletes: [Message] = []
@MainActor
final class ViewModel: ObservableObject {
@Published var inserts: [Message] = []
@Published var updates: [Message] = []
@Published var deletes: [Message] = []

@Published var socketStatus: String?
@Published var channelStatus: String?

@Published var publicSchema: RealtimeChannel?
@Published var isJoined: Bool = false

func createSubscription() async {
await supabase.realtime.connect()

publicSchema = await supabase.realtime.channel("public")
.on(
"postgres_changes",
filter: ChannelFilter(event: "INSERT", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
self?.inserts.append(message)
}
}
.on(
"postgres_changes",
filter: ChannelFilter(event: "UPDATE", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
self?.updates.append(message)
}
}
.on(
"postgres_changes",
filter: ChannelFilter(event: "DELETE", schema: "public")
) { [weak self] message in
await MainActor.run { [weak self] in
self?.deletes.append(message)
}
}

await publicSchema?.onError { @MainActor [weak self] _ in self?.channelStatus = "ERROR" }
await publicSchema?
.onClose { @MainActor [weak self] _ in self?.channelStatus = "Closed gracefully" }
await publicSchema?
.subscribe { @MainActor [weak self] state, _ in
self?.isJoined = await self?.publicSchema?.isJoined == true
switch state {
case .subscribed:
self?.channelStatus = "OK"
case .closed:
self?.channelStatus = "CLOSED"
case .timedOut:
self?.channelStatus = "Timed out"
case .channelError:
self?.channelStatus = "ERROR"
}
}

@State var socketStatus: String?
@State var channelStatus: String?
await supabase.realtime.connect()
await supabase.realtime.onOpen { @MainActor [weak self] in
self?.socketStatus = "OPEN"
}
await supabase.realtime.onClose { [weak self] _, _ in
await MainActor.run { [weak self] in
self?.socketStatus = "CLOSE"
}
}
await supabase.realtime.onError { @MainActor [weak self] error, _ in
self?.socketStatus = "ERROR: \(error.localizedDescription)"
}
}

func toggleSubscription() async {
if await publicSchema?.isJoined == true {
await publicSchema?.unsubscribe()
} else {
await createSubscription()
}
}
}

@State var publicSchema: RealtimeChannel?
struct ContentView: View {
@StateObject var model = ViewModel()

var body: some View {
List {
Section("INSERTS") {
ForEach(Array(zip(inserts.indices, inserts)), id: \.0) { _, message in
ForEach(Array(zip(model.inserts.indices, model.inserts)), id: \.0) { _, message in
Text(message.stringfiedPayload())
}
}

Section("UPDATES") {
ForEach(Array(zip(updates.indices, updates)), id: \.0) { _, message in
ForEach(Array(zip(model.updates.indices, model.updates)), id: \.0) { _, message in
Text(message.stringfiedPayload())
}
}

Section("DELETES") {
ForEach(Array(zip(deletes.indices, deletes)), id: \.0) { _, message in
ForEach(Array(zip(model.deletes.indices, model.deletes)), id: \.0) { _, message in
Text(message.stringfiedPayload())
}
}
Expand All @@ -42,77 +117,34 @@ struct ContentView: View {
VStack(alignment: .leading) {
Toggle(
"Toggle Subscription",
isOn: Binding(get: { publicSchema?.isJoined == true }, set: { _ in toggleSubscription() })
isOn: Binding(
get: { model.isJoined },
set: { _ in
Task {
await model.toggleSubscription()
}
}
)
)
Text("Socket: \(socketStatus ?? "")")
Text("Channel: \(channelStatus ?? "")")
Text("Socket: \(model.socketStatus ?? "")")
Text("Channel: \(model.channelStatus ?? "")")
}
.padding()
.background(.regularMaterial)
.padding()
}
.onAppear {
createSubscription()
}
}

func createSubscription() {
supabase.realtime.connect()

publicSchema = supabase.realtime.channel("public")
.on("postgres_changes", filter: ChannelFilter(event: "INSERT", schema: "public")) {
inserts.append($0)
}
.on("postgres_changes", filter: ChannelFilter(event: "UPDATE", schema: "public")) {
updates.append($0)
}
.on("postgres_changes", filter: ChannelFilter(event: "DELETE", schema: "public")) {
deletes.append($0)
}

publicSchema?.onError { _ in channelStatus = "ERROR" }
publicSchema?.onClose { _ in channelStatus = "Closed gracefully" }
publicSchema?
.subscribe { state, _ in
switch state {
case .subscribed:
channelStatus = "OK"
case .closed:
channelStatus = "CLOSED"
case .timedOut:
channelStatus = "Timed out"
case .channelError:
channelStatus = "ERROR"
}
}

supabase.realtime.connect()
supabase.realtime.onOpen {
socketStatus = "OPEN"
}
supabase.realtime.onClose {
socketStatus = "CLOSE"
}
supabase.realtime.onError { error, _ in
socketStatus = "ERROR: \(error.localizedDescription)"
}
}

func toggleSubscription() {
if publicSchema?.isJoined == true {
publicSchema?.unsubscribe()
} else {
createSubscription()
.task {
await model.createSubscription()
}
}
}

extension Message {
func stringfiedPayload() -> String {
do {
let data = try JSONSerialization.data(
withJSONObject: payload, options: [.prettyPrinted, .sortedKeys]
)
let encoder = JSONEncoder()
encoder.outputFormatting = [.prettyPrinted, .sortedKeys]
let data = try encoder.encode(payload)
return String(data: data, encoding: .utf8) ?? ""
} catch {
return ""
Expand Down
32 changes: 32 additions & 0 deletions Sources/Realtime/ArrayExtensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//
// ArrayExtensions.swift
//
//
// Created by Guilherme Souza on 28/11/23.
//

import Foundation

extension Array {
@_disfavoredOverload
@inlinable func filter(_ isIncluded: (Element) async throws -> Bool) async rethrows -> [Element] {
var result: [Element] = []
for element in self {
if try await isIncluded(element) {
result.append(element)
}
}
return result
}

@inlinable func first(where predicate: (Element) async throws -> Bool) async rethrows
-> Element?
{
for element in self {
if try await predicate(element) {
return element
}
}
return nil
}
}
6 changes: 3 additions & 3 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct Binding: Sendable {
let id: String?
}

public struct ChannelFilter {
public struct ChannelFilter: Sendable {
public let event: String?
public let schema: String?
public let table: String?
Expand Down Expand Up @@ -70,7 +70,7 @@ public enum RealtimeListenTypes: String {
}

/// Represents the broadcast and presence options for a channel.
public struct RealtimeChannelOptions {
public struct RealtimeChannelOptions: Sendable {
/// Used to track presence payload across clients. Must be unique per client. If `nil`, the server
/// will generate one.
var presenceKey: String?
Expand Down Expand Up @@ -393,7 +393,7 @@ public actor RealtimeChannel {
@discardableResult
public func subscribe(
timeout: TimeInterval? = nil,
callback: ((RealtimeSubscribeStates, Error?) async -> Void)? = nil
callback: (@Sendable (RealtimeSubscribeStates, Error?) async -> Void)? = nil
) async -> RealtimeChannel {
guard !joinedOnce else {
fatalError(
Expand Down
37 changes: 1 addition & 36 deletions Sources/Realtime/RealtimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public actor RealtimeClient: PhoenixTransportDelegate {
///
/// - parameter callback: Called when the Socket is opened
@discardableResult
public func onOpen(callback: @escaping () async -> Void) -> String {
public func onOpen(callback: @escaping @Sendable () async -> Void) -> String {
onOpen { _ in await callback() }
}

Expand Down Expand Up @@ -884,38 +884,3 @@ extension RealtimeClient {
}
}
}

extension Array {
@inlinable mutating func removeAll(
where shouldBeRemoved: (Element) async throws
-> Bool
) async rethrows {
for (index, element) in zip(indices, self) {
if try await shouldBeRemoved(element) {
remove(at: index)
}
}
}

@_disfavoredOverload
@inlinable func filter(_ isIncluded: (Element) async throws -> Bool) async rethrows -> [Element] {
var result: [Element] = []
for element in self {
if try await isIncluded(element) {
result.append(element)
}
}
return result
}

@inlinable func first(where predicate: (Element) async throws -> Bool) async rethrows
-> Element?
{
for element in self {
if try await predicate(element) {
return element
}
}
return nil
}
}

0 comments on commit 253e019

Please sign in to comment.