diff --git a/src/Connectors/AsyncWebSocketConnector.swift b/src/Connectors/AsyncWebSocketConnector.swift
new file mode 100644
index 0000000..95e50ec
--- /dev/null
+++ b/src/Connectors/AsyncWebSocketConnector.swift
@@ -0,0 +1,93 @@
+import Foundation
+#if canImport(FoundationNetworking)
+import FoundationNetworking
+#endif
+
+public final class AsyncWebSocketConnector: NSObject, Connector, Sendable {
+
+	@MainActor public private(set) var onDisconnect: (@Sendable () -> Void)? = nil
+	public let events: AsyncThrowingStream<ServerEvent, Error>
+
+	private let webSocket: URLSessionWebSocketTask
+	private let stream: AsyncThrowingStream<ServerEvent, Error>.Continuation
+	private let task: Task<Void, Never>
+
+	private let encoder: JSONEncoder = {
+		let encoder = JSONEncoder()
+		encoder.keyEncodingStrategy = .convertToSnakeCase
+		return encoder
+	}()
+
+	public init(connectingTo request: URLRequest) {
+		let (events, stream) = AsyncThrowingStream.makeStream(of: ServerEvent.self)
+
+		let webSocket = URLSession.shared.webSocketTask(with: request)
+		webSocket.resume()
+
+		task = Task.detached { [webSocket, stream] in
+			var isActive = true
+
+			let decoder = JSONDecoder()
+			decoder.keyDecodingStrategy = .convertFromSnakeCase
+
+			while isActive && webSocket.closeCode == .invalid && !Task.isCancelled {
+				guard webSocket.closeCode == .invalid else {
+					NSLog("πŸ•ΈοΈ socket closed WebSocketConnector")
+					stream.yield(error: RealtimeAPIError.disconnected(webSocket.closeCode))
+					break
+				}
+				do {
+					let message = try await webSocket.receive()
+
+					switch message {
+						case let .string(text):
+							do {
+								guard let data = text.data(using: .utf8) else {
+									stream.yield(error: RealtimeAPIError.invalidMessage)
+									continue
+								}
+								let event = try decoder.decode(ServerEvent.self, from: data)
+								stream.yield(event)
+							} catch {
+								NSLog("πŸ•ΈοΈ parse error WebSocketConnector")
+								stream.yield(error: error)
+							}
+						case .data:
+							NSLog("πŸ•ΈοΈ invalid type WebSocketConnector")
+							stream.yield(error: RealtimeAPIError.invalidMessage)
+						@unknown default:
+							NSLog("πŸ•ΈοΈ unexpected type WebSocketConnector")
+							stream.yield(error: RealtimeAPIError.invalidMessage)
+					}
+				} catch {
+					NSLog("πŸ•ΈοΈ catch WebSocketConnector")
+					stream.yield(error: error)
+					isActive = false
+				}
+			}
+
+			webSocket.cancel(with: .goingAway, reason: nil)
+		}
+
+		self.events = events
+		self.stream = stream
+		self.webSocket = webSocket
+	}
+
+	deinit {
+		NSLog("πŸ•ΈοΈ deinit WebSocketConnector")
+		task.cancel()
+		stream.finish()
+		onDisconnect?()
+	}
+
+	public func send(event: ClientEvent) async throws {
+		let message = try URLSessionWebSocketTask.Message.string(String(data: encoder.encode(event), encoding: .utf8)!)
+		try await webSocket.send(message)
+	}
+
+	@MainActor public func onDisconnect(_ action: (@Sendable () -> Void)?) {
+		onDisconnect = action
+	}
+}
diff --git a/src/Conversation.swift b/src/Conversation.swift
index 2b30489..f58cd1b 100644
--- a/src/Conversation.swift
+++ b/src/Conversation.swift
@@ -1,4 +1,4 @@
-import Foundation
+import SwiftUI
 @preconcurrency import AVFoundation
 
 public enum ConversationError: Error {
@@ -7,23 +7,35 @@
 }
 
 @Observable
-public final class Conversation: Sendable {
+public final class Conversation: @unchecked Sendable {
 	private let client: RealtimeAPI
-	@MainActor private var cancelTask: (() -> Void)?
+	public let voice: Session.Voice
+	private let errorStream: AsyncStream<ServerError>.Continuation
+	private let responseStream: AsyncStream<Response>.Continuation
 
 	private let audioEngine = AVAudioEngine()
+	private let speedControl = AVAudioUnitVarispeed()
+	private let pitchControl = AVAudioUnitTimePitch()
 	private let playerNode = AVAudioPlayerNode()
 	private let queuedSamples = UnsafeMutableArray<String>()
 	private let apiConverter = UnsafeInteriorMutable<AVAudioConverter>()
 	private let userConverter = UnsafeInteriorMutable<AVAudioConverter>()
 	private let desiredFormat = AVAudioFormat(commonFormat: .pcmFormatInt16, sampleRate: 24000, channels: 1, interleaved: false)!
 
+	/// Local reference ID for this Conversation
+	public let id: UUID
+
 	/// A stream of errors that occur during the conversation.
 	public let errors: AsyncStream<ServerError>
 
+	/// A stream of responses received during the conversation.
+	public let responses: AsyncStream<Response>
+
+	private let audioInterruptHandler: AudioInterruptHandler
+
 	/// The unique ID of the conversation.
-	@MainActor public private(set) var id: String?
+	@MainActor public private(set) var conversationId: String?
 
 	/// The current session for this conversation.
 	@MainActor public private(set) var session: Session?
 
@@ -55,55 +67,116 @@ public final class Conversation: Sendable {
 			default: return nil
 		}
 	}
-
-	private init(client: RealtimeAPI) {
-		self.client = client
+
+	@MainActor public var voiceSpeed: Float = 1.0 { // 0.25 - 4.0
+		didSet {
+			speedControl.rate = voiceSpeed
+		}
+	}
+	@MainActor public var voicePitch: Float = 0 { // -2400 - 2400
+		didSet {
+			pitchControl.pitch = voicePitch
+		}
+	}
+
+	// Do not mutate this from anywhere other than the init.
+	private var task: Task<Void, Never>!
+
+	private init(id: UUID, client: RealtimeAPI, voice: Session.Voice = .alloy, voiceSpeed: Float = 1.0, voicePitch: Float = 0.0) {
+		self.id = id
+		self.client = client
+		self.voice = voice
+		audioInterruptHandler = AudioInterruptHandler()
+		speedControl.rate = voiceSpeed
+		pitchControl.pitch = voicePitch
 		(errors, errorStream) = AsyncStream.makeStream(of: ServerError.self)
-
-		let task = Task.detached { [weak self] in
-			guard let self else { return }
-
-			for try await event in client.events {
-				await self.handleEvent(event)
-			}
-
-			await MainActor.run {
-				self.connected = false
+		(responses, responseStream) = AsyncStream.makeStream(of: Response.self)
+
+		audioInterruptHandler.audioInterrupted = { [weak self] reason in
+			self?.audioInterrupted(reason: reason)
+		}
+		audioInterruptHandler.audioInterruptionEnded = { [weak self] shouldResume in
+			self?.audioInterruptionEnded(shouldResume: shouldResume)
+		}
+
+		let events = client.events
+
+		self.task = Task.detached { [weak self] in
+			do {
+				for try await event in events {
+					guard !Task.isCancelled else { break }
+					await self?.handleEvent(event)
+				}
+				NSLog("πŸ—£οΈ Event stream completed successfully.")
+			} catch {
+				NSLog("πŸ—£οΈβŒ Event stream completed with error: \(error)")
+				self?.errorStream.yield(
+					ServerError(
+						type: "Disconnected",
+						code: nil,
+						message: "Error: \(error)",
+						param: nil,
+						eventId: nil
+					)
+				)
+			}
+
+			await MainActor.run { [weak self] in
+				NSLog("πŸ—£οΈ Setting Conversation Disconnected...")
+				self?.setDisconnected()
 			}
 		}
 
-		Task { @MainActor in
-			self.cancelTask = task.cancel
+		Task { @MainActor in
+			self.voiceSpeed = voiceSpeed
+			self.voicePitch = voicePitch
 
 			client.onDisconnect = { [weak self] in
-				guard let self else { return }
-
+				guard let self else { return }
+
 				Task { @MainActor in
-					self.connected = false
+					self.setDisconnected()
 				}
 			}
 
-			_keepIsPlayingPropertyUpdated()
+			_keepIsPlayingPropertyUpdated()
 		}
 	}
 
 	deinit {
+		NSLog("πŸ—£οΈ deinit Conversation (id: \(id))")
+		task.cancel()
 		errorStream.finish()
-
-		DispatchQueue.main.asyncAndWait {
-			cancelTask?()
-			stopHandlingVoice()
-		}
+		responseStream.finish()
+
+		Task { [playerNode, audioEngine] in
+			Self.cleanUpAudio(
+				playerNode: playerNode,
+				audioEngine: audioEngine
+			)
+		}
 	}
 
 	/// Create a new conversation providing an API token and, optionally, a model.
-	public convenience init(authToken token: String, model: String = "gpt-4o-realtime-preview") {
-		self.init(client: RealtimeAPI.webSocket(authToken: token, model: model))
+	public convenience init(id: UUID, authToken token: String, model: String = "gpt-4o-realtime-preview", voice: Session.Voice = .alloy, voiceSpeed: Float = 1.0, voicePitch: Float = 0.0) {
+		self.init(
+			id: id,
+			client: RealtimeAPI.webSocket(authToken: token, model: model),
+			voice: voice,
+			voiceSpeed: voiceSpeed,
+			voicePitch: voicePitch
+		)
 	}
 
 	/// Create a new conversation that connects using a custom `URLRequest`.
-	public convenience init(connectingTo request: URLRequest) {
-		self.init(client: RealtimeAPI.webSocket(connectingTo: request))
+	public convenience init(id: UUID, connectingTo request: URLRequest, voice: Session.Voice = .alloy, voiceSpeed: Float = 1.0, voicePitch: Float = 0.0) {
+		self.init(
+			id: id,
+			client: RealtimeAPI.webSocket(connectingTo: request),
+			voice: voice,
+			voiceSpeed: voiceSpeed,
+			voicePitch: voicePitch
+		)
 	}
 
 	/// Wait for the connection to be established
@@ -184,8 +257,8 @@
 		guard !isListening else { return }
 		if !handlingVoice { try startHandlingVoice() }
 
-		Task.detached {
-			self.audioEngine.inputNode.installTap(onBus: 0, bufferSize: 4096, format: self.audioEngine.inputNode.outputFormat(forBus: 0)) { [weak self] buffer, _ in
+		Task.detached { [audioEngine] in
+			audioEngine.inputNode.installTap(onBus: 0, bufferSize: 4096, format: audioEngine.inputNode.outputFormat(forBus: 0)) { [weak self] buffer, _ in
 				self?.processAudioBufferFromUser(buffer: buffer)
 			}
 		}
@@ -211,14 +284,13 @@
 		}
 		userConverter.set(converter)
 
-		#if os(iOS)
-			let audioSession = AVAudioSession.sharedInstance()
-			try audioSession.setCategory(.playAndRecord, mode: .voiceChat, options: [.defaultToSpeaker, .allowBluetooth])
-			try audioSession.setActive(true)
-		#endif
-
 		audioEngine.attach(playerNode)
-		audioEngine.connect(playerNode, to: audioEngine.mainMixerNode, format: converter.inputFormat)
+		audioEngine.attach(pitchControl)
+		audioEngine.attach(speedControl)
+
+		audioEngine.connect(playerNode, to: speedControl, format: converter.inputFormat)
+		audioEngine.connect(speedControl, to: pitchControl, format: converter.inputFormat)
+		audioEngine.connect(pitchControl, to: audioEngine.mainMixerNode, format: converter.inputFormat)
 
 		#if os(iOS)
 			try audioEngine.inputNode.setVoiceProcessingEnabled(true)
@@ -227,6 +299,13 @@
 		audioEngine.prepare()
 		do {
 			try audioEngine.start()
+
+			#if os(iOS)
+				let audioSession = AVAudioSession.sharedInstance()
+				try audioSession.setCategory(.playAndRecord, mode: .voiceChat, options: [.defaultToSpeaker, .allowBluetooth])
+				try audioSession.setActive(true)
+			#endif
+
 			handlingVoice = true
 		} catch {
 			print("Failed to enable audio engine: \(error)")
@@ -248,7 +327,7 @@
 		{
 			let audioTimeInMiliseconds = Int((Double(playerTime.sampleTime) / playerTime.sampleRate) * 1000)
 
-			Task {
+			Task { [client] in
 				do {
 					try await client.send(event: .truncateConversationItem(forItem: itemID, atAudioMs: audioTimeInMiliseconds))
 				} catch {
@@ -264,39 +343,77 @@
 	/// Stop playing audio responses from the model and listening to the user's microphone.
 	@MainActor func stopHandlingVoice() {
 		guard handlingVoice else { return }
+		NSLog("πŸ—£οΈ Conversation stopHandlingVoice")
 
-		audioEngine.inputNode.removeTap(onBus: 0)
-		audioEngine.stop()
-		audioEngine.disconnectNodeInput(playerNode)
-		audioEngine.disconnectNodeOutput(playerNode)
-
-		#if os(iOS)
-			try? AVAudioSession.sharedInstance().setActive(false)
-		#elseif os(macOS)
-			if audioEngine.isRunning {
-				audioEngine.stop()
-				audioEngine.reset()
-			}
-		#endif
+		Self.cleanUpAudio(
+			playerNode: playerNode,
+			audioEngine: audioEngine
+		)
 
 		isListening = false
 		handlingVoice = false
 	}
+
+	static func cleanUpAudio(playerNode: AVAudioPlayerNode, audioEngine: AVAudioEngine) {
+		// If attachedNodes does not contain the playerNode, then `startHandlingVoice` was never called
+		guard audioEngine.attachedNodes.contains(playerNode) else { return }
+
+		audioEngine.inputNode.removeTap(onBus: 0)
+		audioEngine.stop()
+		audioEngine.disconnectNodeInput(playerNode)
+		audioEngine.disconnectNodeOutput(playerNode)
+
+		#if os(iOS)
+			try? AVAudioSession.sharedInstance().setActive(false)
+		#elseif os(macOS)
+			if audioEngine.isRunning {
+				audioEngine.stop()
+				audioEngine.reset()
+			}
+		#endif
+
+		NSLog("πŸ—£οΈ AudioEngine Cleaned-up")
+	}
 }
 
 /// Event handling private API
 private extension Conversation {
+
+	@MainActor func setConnected() {
+		withAnimation {
+			connected = true
+		}
+	}
+
+	@MainActor func setDisconnected() {
+		withAnimation {
+			connected = false
+		}
+	}
+
+	@MainActor func setUserSpeaking() {
+		withAnimation {
+			isUserSpeaking = true
+		}
+	}
+
+	@MainActor func setUserNotSpeaking() {
+		withAnimation {
+			isUserSpeaking = false
+		}
+	}
+
 	@MainActor func handleEvent(_ event: ServerEvent) {
 		switch event {
 			case let .error(event):
 				errorStream.yield(event.error)
 			case let .sessionCreated(event):
-				connected = true
+				setConnected()
 				session = event.session
 			case let .sessionUpdated(event):
 				session = event.session
 			case let .conversationCreated(event):
-				id = event.conversation.id
+				conversationId = event.conversation.id
 			case let .conversationItemCreated(event):
 				entries.append(event.item)
 			case let .conversationItemDeleted(event):
@@ -355,10 +472,12 @@
 					functionCall.arguments = event.arguments
 				}
 			case .inputAudioBufferSpeechStarted:
-				isUserSpeaking = true
+				setUserSpeaking()
 				if handlingVoice { interruptSpeech() }
 			case .inputAudioBufferSpeechStopped:
-				isUserSpeaking = false
+				setUserNotSpeaking()
+			case .responseCreated(let event), .responseDone(let event):
+				responseStream.yield(event.response)
 			case let .responseOutputItemDone(event):
 				updateEvent(id: event.item.id) { message in
 					guard case let .message(newMessage) = event.item else { return }
@@ -482,12 +601,42 @@ extension Conversation {
 	/// This is because updating the `queuedSamples` array on a background thread will trigger a re-render of any views that depend on it on that thread.
 	/// So, instead, we observe the property and update `isPlaying` on the main actor.
 	private func _keepIsPlayingPropertyUpdated() {
-		withObservationTracking { _ = queuedSamples.isEmpty } onChange: {
+		withObservationTracking { _ = queuedSamples.isEmpty } onChange: { [weak self] in
 			Task { @MainActor in
-				self.isPlaying = self.queuedSamples.isEmpty
+				guard let self else { return }
+				withAnimation {
+					self.isPlaying = self.queuedSamples.isEmpty
+				}
 			}
 
-			self._keepIsPlayingPropertyUpdated()
+			self?._keepIsPlayingPropertyUpdated()
 		}
 	}
 }
+
+extension Conversation {
+	func audioInterrupted(reason: AVAudioSession.InterruptionReason?) {
+		Task { @MainActor in
+			NSLog("πŸ—£οΈπŸš« Audio Interrupted (reason: \(String(describing: reason))), stopping voice handling...")
+			stopListening()
+			// If you stop handling voice here you won't get the interrupt ended event.
+		}
+	}
+
+	func audioInterruptionEnded(shouldResume: Bool) {
+		guard shouldResume else {
+			NSLog("πŸ—£οΈπŸš« Audio Interrupt ended, should resume false.")
+			return
+		}
+		Task { @MainActor in
+			do {
+				// We need to stop handling voice to reset the audio stack
+				stopHandlingVoice()
+				try startListening()
+				NSLog("πŸ—£οΈπŸŸ’ Audio Interrupt ended, voice handling can restart.")
+			} catch {
+				NSLog("πŸ—£οΈπŸš« Audio Interrupt ended, should not restart: \(error)")
+			}
+		}
+	}
+}
diff --git a/src/Models/Item.swift b/src/Models/Item.swift
index e2a7b67..ec6e40b 100644
--- a/src/Models/Item.swift
+++ b/src/Models/Item.swift
@@ -30,7 +30,7 @@ public enum Item: Identifiable, Equatable, Sendable {
 		case audio(Audio)
 	}
 
-	public struct Message: Codable, Equatable, Sendable {
+	public struct Message: Codable, Equatable, Sendable, Identifiable {
 		public enum Content: Equatable, Sendable {
 			case text(String)
 			case audio(Audio)
@@ -70,7 +70,7 @@ public enum Item: Identifiable, Equatable, Sendable {
 		}
 	}
 
-	public struct FunctionCall: Codable, Equatable, Sendable {
+	public struct FunctionCall: Codable, Equatable, Sendable, Identifiable {
 		/// The unique ID of the item.
 		public var id: String
 		/// The type of the item
@@ -85,7 +85,7 @@ public enum Item: Identifiable, Equatable, Sendable {
 		public var arguments: String
 	}
 
-	public struct FunctionCallOutput: Codable, Equatable, Sendable {
+	public struct FunctionCallOutput: Codable, Equatable, Sendable, Identifiable {
 		/// The unique ID of the item.
 		public var id: String
 		/// The type of the item
diff --git a/src/OpenAI.swift b/src/OpenAI.swift
index 0254e85..863e06b 100644
--- a/src/OpenAI.swift
+++ b/src/OpenAI.swift
@@ -5,6 +5,7 @@ import FoundationNetworking
 
 enum RealtimeAPIError: Error {
 	case invalidMessage
+	case disconnected(URLSessionWebSocketTask.CloseCode)
 }
 
 public final class RealtimeAPI: NSObject, Sendable {
@@ -35,7 +36,7 @@
 extension RealtimeAPI {
 	/// Connect to the OpenAI WebSocket Realtime API with the given request.
 	static func webSocket(connectingTo request: URLRequest) -> RealtimeAPI {
-		RealtimeAPI(connector: WebSocketConnector(connectingTo: request))
+		RealtimeAPI(connector: AsyncWebSocketConnector(connectingTo: request))
 	}
 
 	/// Connect to the OpenAI WebSocket Realtime API with the given authentication token and model.
diff --git a/src/Support/AudioInterruptHandler.swift b/src/Support/AudioInterruptHandler.swift
new file mode 100644
index 0000000..25979d5
--- /dev/null
+++ b/src/Support/AudioInterruptHandler.swift
@@ -0,0 +1,95 @@
+//
+//  AudioInterruptHandler.swift
+//  OpenAI
+//
+//  Created by Eric DeLabar on 2/20/25.
+//
+
+import Foundation
+import AVFoundation
+import os
+
+protocol AudioInterruptHandlerDelegate: Sendable {
+	func audioInterrupted(reason: AVAudioSession.InterruptionReason?)
+	func audioInterruptionEnded(shouldResume: Bool)
+}
+
+final class AudioInterruptHandler: NSObject, @unchecked Sendable {
+
+	let audioInterruptedLock = OSAllocatedUnfairLock()
+	var _audioInterrupted: (@Sendable (AVAudioSession.InterruptionReason?) -> Void)?
+	var audioInterrupted: (@Sendable (AVAudioSession.InterruptionReason?) -> Void)? {
+		get {
+			audioInterruptedLock.withLock { self._audioInterrupted }
+		}
+		set {
+			audioInterruptedLock.withLock { self._audioInterrupted = newValue }
+		}
+	}
+
+	let audioInterruptionEndedLock = OSAllocatedUnfairLock()
+	var _audioInterruptionEnded: (@Sendable (Bool) -> Void)?
+	var audioInterruptionEnded: (@Sendable (Bool) -> Void)? {
+		get {
+			audioInterruptionEndedLock.withLock { self._audioInterruptionEnded }
+		}
+		set {
+			audioInterruptionEndedLock.withLock { self._audioInterruptionEnded = newValue }
+		}
+	}
+
+	override init() {
+		super.init()
+
+		// Get the default notification center instance.
+		let nc = NotificationCenter.default
+		nc.addObserver(self,
+		               selector: #selector(handleAudioSessionInterruption),
+		               name: AVAudioSession.interruptionNotification,
+		               object: AVAudioSession.sharedInstance())
+	}
+
+	deinit {
+		NotificationCenter.default.removeObserver(self)
+	}
+
+	@MainActor
+	@objc func handleAudioSessionInterruption(notification: Notification) {
+		guard let info = notification.userInfo else {
+			return
+		}
+
+		var interruptReason: AVAudioSession.InterruptionReason?
+		if let reasonValue = info[AVAudioSessionInterruptionReasonKey] as? UInt,
+		   let reason = AVAudioSession.InterruptionReason(rawValue: reasonValue) {
+			interruptReason = reason
+		}
+
+		guard let typeValue = info[AVAudioSessionInterruptionTypeKey] as? UInt,
+		      let type = AVAudioSession.InterruptionType(rawValue: typeValue) else {
+			return
+		}
+
+		switch type {
+			case .began: // system began interrupting the audio
+
+				audioInterrupted?(interruptReason)
+
+			case .ended: // system ended interrupting the audio
+				guard let info = notification.userInfo else {
+					return
+				}
+
+				guard let optionValue = info[AVAudioSessionInterruptionOptionKey] as? UInt else {
+					return
+				}
+				let options = AVAudioSession.InterruptionOptions(rawValue: optionValue)
+
+				audioInterruptionEnded?(options.contains(.shouldResume))
+
+			@unknown default:
+				fatalError("🎀❌ Unknown AVAudioSession.InterruptionType: \(typeValue)")
+		}
+	}
+
+}
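For reviewers, here is a minimal usage sketch (not part of the diff) of how the pieces added above fit together. `Conversation`, `Session.Voice`, `Response`, and `ServerError` are the types from this diff; the API key, the speed/pitch values, and the `runDemo` wrapper are placeholders, and `startListening()` is assumed to be the same throwing `@MainActor` method the new interrupt handling already calls.

```swift
import Foundation

/// Illustrative only: exercises the new id/voice/speed/pitch parameters and the
/// `responses` stream. The token string is a placeholder, not a real credential.
@MainActor
func runDemo() throws {
	let conversation = Conversation(
		id: UUID(),                       // local reference ID, new in this diff
		authToken: "YOUR_OPENAI_API_KEY", // placeholder
		voice: .alloy,
		voiceSpeed: 1.25,                 // documented range: 0.25 - 4.0
		voicePitch: -150                  // cents; documented range: -2400 - 2400
	)

	// The new `responses` stream yields the payloads of responseCreated/responseDone events.
	Task {
		for await response in conversation.responses {
			print("πŸ—£οΈ response: \(response)")
		}
	}

	// `errors` now also carries the synthesized "Disconnected" ServerError emitted
	// when AsyncWebSocketConnector's event stream throws.
	Task {
		for await error in conversation.errors {
			print("πŸ—£οΈβŒ \(error.message)")
		}
	}

	try conversation.startListening()

	// Both knobs are mutable mid-conversation; their didSet observers push the
	// new values into AVAudioUnitVarispeed / AVAudioUnitTimePitch.
	conversation.voiceSpeed = 0.9
}
```

Because the speed and pitch units sit between `playerNode` and `mainMixerNode`, they shape only the model's audio output; the user's microphone tap is unaffected.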