diff --git a/README.md b/README.md index 96a31cc..16986ee 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ The following RTVI transports are available in this repository: Add the following dependency to your `build.gradle` file: ``` -implementation "ai.pipecat:daily-transport:1.0.3" +implementation "ai.pipecat:daily-transport:1.1.0" ``` Instantiate from your code: @@ -32,7 +32,7 @@ val options = PipecatClientOptions(callbacks = callbacks) val client: PipecatClientDaily = PipecatClient(DailyTransport(context), options) -client.startBotAndConnect(startBotParams).withCallback { +client.startBotAndConnect(apiRequest).withCallback { // ... } ``` @@ -125,23 +125,26 @@ client.connect().withCallback { Add the following dependency to your `build.gradle` file: ``` -implementation "ai.pipecat:small-webrtc-transport:0.3.7" +implementation "ai.pipecat:small-webrtc-transport:1.1.0" ``` Instantiate from your code: ```kotlin -val options = RTVIClientOptions( - params = RTVIClientParams(baseUrl = null), - enableMic = true, - enableCam = true -) +val callbacks = object : PipecatEventCallbacks() { -val connectionUrl = "http://localhost:7860/api/offer" + override fun onBackendError(message: String) { + Log.e(TAG, "Error from backend: $message") + } -val client = RTVIClient(SmallWebRTCTransport.Factory(context, connectionUrl), callbacks, options) + // ... +} -client.connect().withCallback { +val options = PipecatClientOptions(callbacks = callbacks) + +val client: PipecatClientSmallWebRTC = PipecatClient(SmallWebRTCTransport(context), options) + +client.startBotAndConnect(apiRequest).withCallback { // ... } -``` \ No newline at end of file +``` diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7e9c5fc..76eef58 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -10,7 +10,7 @@ dokka = "1.9.20" androidxTest = "1.6.1" ktor = "2.3.5" okhttp = "4.12.0" -pipecatClient = "1.0.2" +pipecatClient = "1.1.0" [libraries] androidx-core-ktx = { group = "androidx.core", name = "core-ktx", version.ref = "coreKtx" } diff --git a/pipecat-client-android-daily/build.gradle.kts b/pipecat-client-android-daily/build.gradle.kts index 825ba60..a93c296 100644 --- a/pipecat-client-android-daily/build.gradle.kts +++ b/pipecat-client-android-daily/build.gradle.kts @@ -1,4 +1,4 @@ -val libraryVersion = "1.0.3" +val libraryVersion = "1.1.0" plugins { alias(libs.plugins.android.library) diff --git a/pipecat-client-android-daily/src/main/java/ai/pipecat/client/daily/DailyTransport.kt b/pipecat-client-android-daily/src/main/java/ai/pipecat/client/daily/DailyTransport.kt index 79d64b2..0cfd1f3 100644 --- a/pipecat-client-android-daily/src/main/java/ai/pipecat/client/daily/DailyTransport.kt +++ b/pipecat-client-android-daily/src/main/java/ai/pipecat/client/daily/DailyTransport.kt @@ -9,6 +9,7 @@ import ai.pipecat.client.transport.MsgClientToServer import ai.pipecat.client.transport.MsgServerToClient import ai.pipecat.client.transport.Transport import ai.pipecat.client.transport.TransportContext +import ai.pipecat.client.types.APIRequest import ai.pipecat.client.types.MediaDeviceId import ai.pipecat.client.types.MediaDeviceInfo import ai.pipecat.client.types.Participant @@ -238,7 +239,10 @@ class DailyTransport( thread = ctx.thread } - override fun deserializeConnectParams(json: String): DailyTransportConnectParams { + override fun deserializeConnectParams( + json: String, + startBotRequest: APIRequest + ): DailyTransportConnectParams { val authBundle: DailyTransportAuthBundle = JSON_INSTANCE.decodeFromString(json) val room = authBundle.actualRoom() ?: throw Exception("dailyRoom not set") diff --git a/pipecat-client-android-small-webrtc-transport/build.gradle.kts b/pipecat-client-android-small-webrtc-transport/build.gradle.kts index 0f21e6a..2912cfb 100644 --- a/pipecat-client-android-small-webrtc-transport/build.gradle.kts +++ b/pipecat-client-android-small-webrtc-transport/build.gradle.kts @@ -1,3 +1,5 @@ +val libraryVersion = "1.1.0" + plugins { alias(libs.plugins.android.library) alias(libs.plugins.jetbrains.kotlin.android) @@ -9,11 +11,12 @@ plugins { android { namespace = "ai.pipecat.client.small_webrtc_transport" - compileSdk = 34 + compileSdk = 35 defaultConfig { minSdk = 24 testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner" + buildConfigField("String", "VERSION_NAME", "\"$libraryVersion\"") } buildTypes { @@ -27,8 +30,8 @@ android { } compileOptions { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } lint { @@ -36,7 +39,11 @@ android { } kotlinOptions { - jvmTarget = "1.8" + jvmTarget = "11" + } + + buildFeatures { + buildConfig = true } sourceSets { @@ -56,8 +63,7 @@ dependencies { implementation(libs.ktor.client.content.negotiation) implementation(libs.ktor.serialization.kotlinx.json) - // Temporary override until transport is updated to 1.0.0 - api("ai.pipecat:client:0.3.4") + api(libs.pipecat.client) androidTestImplementation(libs.androidx.runner) androidTestImplementation(libs.androidx.rules) @@ -76,7 +82,7 @@ publishing { register("release") { groupId = "ai.pipecat" artifactId = "small-webrtc-transport" - version = "0.3.7" + version = libraryVersion pom { name.set("Small WebRTC Transport") diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceCandidatesRequestBody.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceCandidatesRequestBody.kt new file mode 100644 index 0000000..5615a1f --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceCandidatesRequestBody.kt @@ -0,0 +1,22 @@ +package ai.pipecat.client.small_webrtc_transport + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +internal data class IceCandidatesRequestBody( + @SerialName("pc_id") + val pcId: String, + val candidates: List, +) + +@Serializable +internal data class IceCandidateItem( + val candidate: String, + @SerialName("sdp_mid") + val sdpMid: String?, + @SerialName("sdp_mline_index") + val sdpMLineIndex: Int, +) + + diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceConfig.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceConfig.kt new file mode 100644 index 0000000..8608f50 --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/IceConfig.kt @@ -0,0 +1,15 @@ +package ai.pipecat.client.small_webrtc_transport + +import kotlinx.serialization.Serializable + +@Serializable +data class IceConfig( + val iceServers: List +) + +@Serializable +data class IceServer( + val urls: List, + val username: String? = null, + val credential: String? = null, +) \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OfferRequestBody.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OfferRequestBody.kt index ee54c79..f771505 100644 --- a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OfferRequestBody.kt +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OfferRequestBody.kt @@ -1,5 +1,6 @@ package ai.pipecat.client.small_webrtc_transport +import ai.pipecat.client.types.Value import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -10,5 +11,7 @@ internal data class OfferRequestBody( @SerialName("pc_id") val pcId: String?, @SerialName("restart_pc") - val restartPc: Boolean + val restartPc: Boolean, + @SerialName("request_data") + val requestData: Value ) \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OutboundSignalling.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OutboundSignalling.kt new file mode 100644 index 0000000..d650050 --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/OutboundSignalling.kt @@ -0,0 +1,43 @@ +package ai.pipecat.client.small_webrtc_transport + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +/** + * Outbound signalling messages sent over the WebRTC data channel. + */ +@Serializable +internal data class OutboundSignallingEvent( + val type: String, + val message: TrackStatusMessage, +) { + companion object { + fun create(message: TrackStatusMessage) = OutboundSignallingEvent( + type = "signalling", + message = message + ) + } +} + +@Serializable +internal data class TrackStatusMessage( + val type: String, + @SerialName("receiver_index") + val receiverIndex: Int, + val enabled: Boolean, +) { + companion object { + fun create(receiverIndex: Int, enabled: Boolean) = TrackStatusMessage( + type = "trackStatus", + receiverIndex = receiverIndex, + enabled = enabled + ) + } +} + +internal object SmallWebRTCTransceiverIndex { + const val AUDIO: Int = 0 + const val VIDEO: Int = 1 +} + + diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/PipecatClientSmallWebRTC.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/PipecatClientSmallWebRTC.kt new file mode 100644 index 0000000..a4f3dc7 --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/PipecatClientSmallWebRTC.kt @@ -0,0 +1,5 @@ +package ai.pipecat.client.small_webrtc_transport + +import ai.pipecat.client.PipecatClient + +typealias PipecatClientSmallWebRTC = PipecatClient \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCStartBotResult.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCStartBotResult.kt new file mode 100644 index 0000000..ab1a510 --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCStartBotResult.kt @@ -0,0 +1,9 @@ +package ai.pipecat.client.small_webrtc_transport + +import kotlinx.serialization.Serializable + +@Serializable +internal data class SmallWebRTCStartBotResult( + val sessionId: String, + val iceConfig: IceConfig? = null +) \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransport.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransport.kt index 70b6e38..38d6677 100644 --- a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransport.kt +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransport.kt @@ -5,24 +5,40 @@ import ai.pipecat.client.result.RTVIError import ai.pipecat.client.result.resolvedPromiseErr import ai.pipecat.client.result.resolvedPromiseOk import ai.pipecat.client.result.withPromise -import ai.pipecat.client.transport.AuthBundle import ai.pipecat.client.transport.MsgClientToServer import ai.pipecat.client.transport.MsgServerToClient import ai.pipecat.client.transport.Transport import ai.pipecat.client.transport.TransportContext -import ai.pipecat.client.transport.TransportFactory +import ai.pipecat.client.types.APIRequest import ai.pipecat.client.types.MediaDeviceId import ai.pipecat.client.types.MediaDeviceInfo import ai.pipecat.client.types.Participant import ai.pipecat.client.types.ParticipantId import ai.pipecat.client.types.TransportState +import ai.pipecat.client.types.Value +import ai.pipecat.client.utils.ThreadRef import android.annotation.SuppressLint import android.content.Context import android.media.AudioManager import android.util.Log +import io.ktor.client.HttpClient +import io.ktor.client.engine.android.Android +import io.ktor.client.plugins.ResponseException +import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.request.header +import io.ktor.client.request.patch +import io.ktor.client.request.setBody +import io.ktor.client.statement.bodyAsText +import io.ktor.http.ContentType +import io.ktor.http.contentType +import io.ktor.serialization.kotlinx.json.json +import kotlinx.coroutines.Job import kotlinx.coroutines.MainScope +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.serialization.json.decodeFromJsonElement +import org.webrtc.IceCandidate +import java.util.concurrent.atomic.AtomicBoolean private val BOT_PARTICIPANT = Participant( @@ -37,11 +53,10 @@ private val LOCAL_PARTICIPANT = Participant( local = true ) -class SmallWebRTCTransport internal constructor( - private val transportContext: TransportContext, - androidContext: Context, - private val serverUrl: String, -) : Transport() { +class SmallWebRTCTransport( + context: Context, + private val iceConfig: IceConfig? = null, +) : Transport() { companion object { private const val TAG = "SmallWebRTCTransport" @@ -64,30 +79,60 @@ class SmallWebRTCTransport internal constructor( val Rear = MediaDeviceInfo(id = MediaDeviceId("cam-rear"), name = "Rear Camera") } - class Factory( - private val androidContext: Context, - private val serverUrl: String - ) : TransportFactory { - override fun createTransport(context: TransportContext): Transport { - return SmallWebRTCTransport(context, androidContext, serverUrl) - } - } + private lateinit var transportContext: TransportContext + private lateinit var thread: ThreadRef private var state = TransportState.Disconnected - private val appContext = androidContext.applicationContext - private val thread = transportContext.thread + private val appContext = context.applicationContext private var client: WebRTCClient? = null private var selectedCam = CameraMode.Front + private var connectParams: SmallWebRTCTransportConnectParams? = null + + // Trickle ICE batching (send queued candidates every ~200ms via PATCH) + private val canSendIceCandidates = AtomicBoolean(false) + private val candidateQueue: MutableList = mutableListOf() + private var flushJob: Job? = null + private val flushDelayMs: Long = 200 + + override fun initialize(ctx: TransportContext) { + transportContext = ctx + thread = ctx.thread + } override fun initDevices(): Future = resolvedPromiseOk(thread, Unit) + private fun sendSignallingMessage(message: TrackStatusMessage) { + val currentClient = client ?: return + currentClient.sendDataMessage( + OutboundSignallingEvent.serializer(), + OutboundSignallingEvent.create(message = message) + ) + } + + private fun syncTrackStatus() { + sendSignallingMessage( + TrackStatusMessage.create( + receiverIndex = SmallWebRTCTransceiverIndex.AUDIO, + enabled = isMicEnabled() + ) + ) + sendSignallingMessage( + TrackStatusMessage.create( + receiverIndex = SmallWebRTCTransceiverIndex.VIDEO, + enabled = isCamEnabled() + ) + ) + } + @SuppressLint("MissingPermission") - override fun connect(authBundle: AuthBundle?): Future = + override fun connect( + transportParams: SmallWebRTCTransportConnectParams + ): Future = thread.runOnThreadReturningFuture { - Log.i(TAG, "connect(${authBundle})") + Log.i(TAG, "connect(${transportParams})") if (client != null) { return@runOnThreadReturningFuture resolvedPromiseErr( @@ -97,6 +142,17 @@ class SmallWebRTCTransport internal constructor( } setState(TransportState.Connecting) + connectParams = transportParams + canSendIceCandidates.set(false) + candidateQueue.clear() + flushJob?.cancel() + flushJob = null + + val iceConfig = transportParams.iceConfig ?: this.iceConfig ?: IceConfig( + iceServers = listOf(IceServer( + urls = listOf("stun:stun.l.google.com:19302") + )) + ) try { client = WebRTCClient( @@ -114,7 +170,7 @@ class SmallWebRTCTransport internal constructor( disconnect() } - "renegotiate" -> negotiate() + "renegotiate" -> negotiate(transportParams) } } else { @@ -132,6 +188,9 @@ class SmallWebRTCTransport internal constructor( mic = mic ) }, + onNewIceCandidate = { + onNewIceCandidate(it) + }, context = appContext, thread = transportContext.thread, initialCamMode = if (transportContext.options.enableCam) { @@ -139,7 +198,9 @@ class SmallWebRTCTransport internal constructor( } else { null }, - initialMicEnabled = transportContext.options.enableMic + initialMicEnabled = transportContext.options.enableMic, + rtviProtocolVersion = transportContext.protocolVersion, + iceConfig = iceConfig ) } catch (e: Exception) { return@runOnThreadReturningFuture resolvedPromiseErr( @@ -148,21 +209,93 @@ class SmallWebRTCTransport internal constructor( ) } - negotiate() + negotiate(transportParams) + } + + private fun onNewIceCandidate(iceCandidate: IceCandidate) { + thread.assertCurrent() + candidateQueue.add(iceCandidate) + + if (flushJob == null) { + flushJob = MainScope().launch { + delay(flushDelayMs) + thread.runOnThread { + flushIceCandidates() + } + } + } + } + + private fun flushIceCandidates() = thread.runOnThread { + flushJob = null + + if (!canSendIceCandidates.get()) return@runOnThread + + val currentConnectParams = connectParams ?: return@runOnThread + val pcId = client?.getPcId() ?: return@runOnThread + + if (candidateQueue.isEmpty()) return@runOnThread + + val candidates = candidateQueue.toList() + candidateQueue.clear() + + val requestBody = IceCandidatesRequestBody( + pcId = pcId, + candidates = candidates.map { + IceCandidateItem( + candidate = it.sdp, + sdpMid = it.sdpMid, + sdpMLineIndex = it.sdpMLineIndex + ) + } + ) + + Log.i(TAG, "Flushing ${requestBody.candidates.size} ICE candidates") + + MainScope().launch { + try { + HttpClient(Android) { + install(ContentNegotiation) { + json(JSON_INSTANCE) + } + }.use { httpClient -> + try { + val response = httpClient.patch(currentConnectParams.webrtcRequestParams.endpoint) { + contentType(ContentType.Application.Json) + setBody(requestBody) + currentConnectParams.webrtcRequestParams.headers.forEach(this::header) + } + // Ensure the request completes and surface any non-2xx response bodies. + response.bodyAsText() + + } catch (e: ResponseException) { + val errorBody = e.response.bodyAsText() + val status = e.response.status.value + Log.e(TAG, "ICE candidate PATCH failed ($status): $errorBody", e) + } + } + } catch (e: Exception) { + Log.e(TAG, "Failed to send ICE candidates", e) + } } + } - private fun negotiate() = withPromise(thread) { promise -> + private fun negotiate( + connectParams: SmallWebRTCTransportConnectParams + ) = withPromise(thread) { promise -> MainScope().launch { try { - client?.negotiateConnection( - url = serverUrl, - restartPc = false - ) + client?.negotiateConnection(connectParams) + canSendIceCandidates.set(true) + flushIceCandidates() + + client?.waitForDataChannelOpen() val cb = transportContext.callbacks setState(TransportState.Connected) + syncTrackStatus() cb.onConnected() cb.onParticipantJoined(LOCAL_PARTICIPANT) cb.onParticipantJoined(BOT_PARTICIPANT) @@ -174,17 +307,42 @@ class SmallWebRTCTransport internal constructor( } } + override fun deserializeConnectParams( + json: String, + startBotRequest: APIRequest + ): SmallWebRTCTransportConnectParams { + val startBotResult = JSON_INSTANCE.decodeFromString(json) + + return SmallWebRTCTransportConnectParams( + webrtcRequestParams = APIRequest( + endpoint = startBotRequest.endpoint.replace( + "/start", + "/sessions/${startBotResult.sessionId}/api/offer" + ), + requestData = Value.Object(), + headers = startBotRequest.headers + ), + iceConfig = startBotResult.iceConfig + ) + } + override fun disconnect(): Future = thread.runOnThreadReturningFuture { withPromise(thread) { promise -> val clientRef = client client = null + connectParams = null + canSendIceCandidates.set(false) + candidateQueue.clear() + flushJob?.cancel() + flushJob = null MainScope().launch { try { if (clientRef != null) { clientRef.dispose() setState(TransportState.Disconnected) + transportContext.onConnectionEnd() transportContext.callbacks.onDisconnected() } promise.resolveOk(Unit) @@ -253,14 +411,29 @@ class SmallWebRTCTransport internal constructor( override fun isMicEnabled() = client?.micEnabled ?: false - override fun enableMic(enable: Boolean): Future = client?.setMicEnabled(enable) - ?: resolvedPromiseErr(thread, RTVIError.TransportNotInitialized) - - override fun expiry() = null + override fun enableMic(enable: Boolean): Future { + val result = client?.setMicEnabled(enable) + ?: return resolvedPromiseErr(thread, RTVIError.TransportNotInitialized) + sendSignallingMessage( + TrackStatusMessage.create( + receiverIndex = SmallWebRTCTransceiverIndex.AUDIO, + enabled = enable + ) + ) + return result + } - override fun enableCam(enable: Boolean): Future = - client?.setCamMode(if (enable) selectedCam else null) - ?: resolvedPromiseErr(thread, RTVIError.TransportNotInitialized) + override fun enableCam(enable: Boolean): Future { + val result = client?.setCamMode(if (enable) selectedCam else null) + ?: return resolvedPromiseErr(thread, RTVIError.TransportNotInitialized) + sendSignallingMessage( + TrackStatusMessage.create( + receiverIndex = SmallWebRTCTransceiverIndex.VIDEO, + enabled = enable + ) + ) + return result + } override fun tracks() = client?.getTracks() ?: EMPTY_TRACKS diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransportConnectParams.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransportConnectParams.kt new file mode 100644 index 0000000..554b3df --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/SmallWebRTCTransportConnectParams.kt @@ -0,0 +1,8 @@ +package ai.pipecat.client.small_webrtc_transport + +import ai.pipecat.client.types.APIRequest + +data class SmallWebRTCTransportConnectParams( + val webrtcRequestParams: APIRequest, + val iceConfig: IceConfig? = null +) \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/VersionUtils.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/VersionUtils.kt new file mode 100644 index 0000000..8206666 --- /dev/null +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/VersionUtils.kt @@ -0,0 +1,3 @@ +package ai.pipecat.client.small_webrtc_transport + +val SMALL_WEBRTC_TRANSPORT_VERSION: String = ai.pipecat.client.small_webrtc_transport.BuildConfig.VERSION_NAME \ No newline at end of file diff --git a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/WebRTCClient.kt b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/WebRTCClient.kt index 3176427..6e3f9db 100644 --- a/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/WebRTCClient.kt +++ b/pipecat-client-android-small-webrtc-transport/src/main/java/ai/pipecat/client/small_webrtc_transport/WebRTCClient.kt @@ -12,17 +12,20 @@ import ai.pipecat.client.types.ParticipantTracks import ai.pipecat.client.types.Tracks import ai.pipecat.client.utils.ThreadRef import android.content.Context +import android.os.Build import android.util.Log import io.ktor.client.HttpClient import io.ktor.client.engine.android.Android import io.ktor.client.plugins.ResponseException import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.request.header import io.ktor.client.request.post import io.ktor.client.request.setBody import io.ktor.client.statement.bodyAsText import io.ktor.http.ContentType import io.ktor.http.contentType import io.ktor.serialization.kotlinx.json.json +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.coroutineScope @@ -32,7 +35,6 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonElement -import kotlinx.serialization.json.JsonObject import org.webrtc.AudioSource import org.webrtc.AudioTrack import org.webrtc.DataChannel @@ -89,10 +91,13 @@ internal class WebRTCClient( private val onIncomingEvent: (JsonElement) -> Unit, private val onTracksUpdated: (Tracks) -> Unit, private val onInputsUpdated: (cam: Boolean, mic: Boolean) -> Unit, + private val onNewIceCandidate: (iceCandidate: IceCandidate) -> Unit, private val context: Context, private val thread: ThreadRef, initialCamMode: CameraMode?, initialMicEnabled: Boolean, + rtviProtocolVersion: String, + iceConfig: IceConfig ) { private val peerConnectionFactory: PeerConnectionFactory private val peerConnection: PeerConnection @@ -110,11 +115,14 @@ internal class WebRTCClient( private val pcId = AtomicReference(null) private var tracks: Tracks? = null + private val dataChannelOpen = CompletableDeferred() companion object { private const val TAG = "WebRTCClient" } + fun getPcId(): String? = pcId.get() + val camMode: CameraMode? get() = enableCam.get() @@ -150,17 +158,31 @@ internal class WebRTCClient( Log.i(TAG, "Creating PeerConnection") - val iceServers = ArrayList() - iceServers.add( - PeerConnection.IceServer.builder("stun:stun.l.google.com:19302").createIceServer() - ) + Log.i(TAG, "ICE config: $iceConfig") + + val iceServers = iceConfig.iceServers.map { + PeerConnection.IceServer + .builder(it.urls) + .apply { + it.username?.let(::setUsername) + it.credential?.let(::setPassword) + } + .createIceServer() + } val rtcConfig = PeerConnection.RTCConfiguration(iceServers) rtcConfig.sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN - val observer = PeerConnectionObserver(onTrackCallback = { transceiver -> - transceiver?.receiver?.track()?.let(::handleRemoteTrack) - }) + val observer = PeerConnectionObserver( + onTrackCallback = { transceiver -> + transceiver?.receiver?.track()?.let(::handleRemoteTrack) + }, + onNewIceCandidateCallback = { candidate -> + thread.runOnThread { + onNewIceCandidate(candidate) + } + } + ) peerConnection = peerConnectionFactory.createPeerConnection(rtcConfig, observer) @@ -195,11 +217,17 @@ internal class WebRTCClient( dataChannelReady.set(true) sendDataMessage( MsgClientToServer.serializer(), - MsgClientToServer( - type = "client-ready", - data = JsonObject(emptyMap()) + MsgClientToServer.ClientReady( + rtviVersion = rtviProtocolVersion, + library = "Pipecat Android Client", + libraryVersion = SMALL_WEBRTC_TRANSPORT_VERSION, + platform = "Android", + platformVersion = Build.VERSION.RELEASE ) ) + if (!dataChannelOpen.isCompleted) { + dataChannelOpen.complete(Unit) + } } } @@ -225,6 +253,11 @@ internal class WebRTCClient( }) } + suspend fun waitForDataChannelOpen() { + if (dataChannel.state() == DataChannel.State.OPEN) return + dataChannelOpen.await() + } + fun getTracks() = tracks ?: EMPTY_TRACKS private fun updateLocalTracks(): Future = thread.runOnThreadReturningFuture { @@ -359,8 +392,6 @@ internal class WebRTCClient( try { setCamMode(null) setMicEnabled(false) - audioTransceiver.dispose() - videoTransceiver.dispose() audioSource?.dispose() videoSource?.dispose() dataChannel.dispose() @@ -432,8 +463,7 @@ internal class WebRTCClient( } suspend fun negotiateConnection( - url: String, - restartPc: Boolean + connectParams: SmallWebRTCTransportConnectParams ) { val job = coroutineScope { launch { @@ -450,16 +480,18 @@ internal class WebRTCClient( } }.use { client -> try { - val response = client.post(url) { + val response = client.post(connectParams.webrtcRequestParams.endpoint) { contentType(ContentType.Application.Json) setBody( OfferRequestBody( sdp = offer.description, type = offer.type.canonicalForm(), pcId = pcId.get(), - restartPc = restartPc + restartPc = false, + requestData = connectParams.webrtcRequestParams.requestData ) ) + connectParams.webrtcRequestParams.headers.forEach(this::header) } response.bodyAsText() @@ -501,7 +533,8 @@ internal class WebRTCClient( } private class PeerConnectionObserver( - private val onTrackCallback: (transceiver: RtpTransceiver?) -> Unit + private val onTrackCallback: (transceiver: RtpTransceiver?) -> Unit, + private val onNewIceCandidateCallback: (iceCandidate: IceCandidate) -> Unit, ) : PeerConnection.Observer { override fun onSignalingChange(signalingState: PeerConnection.SignalingState) { Log.i(TAG, "onSignalingChange: $signalingState") @@ -521,6 +554,7 @@ internal class WebRTCClient( override fun onIceCandidate(iceCandidate: IceCandidate) { Log.i(TAG, "onIceCandidate: $iceCandidate") + onNewIceCandidateCallback(iceCandidate) } override fun onIceCandidatesRemoved(iceCandidates: Array) {