Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move RPC handlers to room #395

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
8 changes: 4 additions & 4 deletions examples/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ async def main():


def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
@greeters_room.local_participant.register_rpc_method("arrival")
@greeters_room.register_rpc_method("arrival")
async def arrival_method(
data: RpcInvocationData,
):
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
await asyncio.sleep(2)
return "Welcome and have a wonderful day!"

@math_genius_room.local_participant.register_rpc_method("square-root")
@math_genius_room.register_rpc_method("square-root")
async def square_root_method(
data: RpcInvocationData,
):
Expand All @@ -110,7 +110,7 @@ async def square_root_method(
print(f"[Math Genius] Aha! It's {result}")
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("divide")
@math_genius_room.register_rpc_method("divide")
async def divide_method(
data: RpcInvocationData,
):
Expand All @@ -122,7 +122,7 @@ async def divide_method(
result = dividend / divisor
return json.dumps({"result": result})

@math_genius_room.local_participant.register_rpc_method("long-calculation")
@math_genius_room.register_rpc_method("long-calculation")
async def long_calculation_method(
data: RpcInvocationData,
):
Expand Down
36 changes: 18 additions & 18 deletions livekit-rtc/livekit/rtc/_proto/rpc_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions livekit-rtc/livekit/rtc/_proto/rpc_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 25 additions & 94 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from __future__ import annotations

import ctypes
import asyncio
import os
import mimetypes
import aiofiles
from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast
import weakref
from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, TYPE_CHECKING
from abc import abstractmethod, ABC
from deprecated import deprecated

from ._ffi_client import FfiClient, FfiHandle
from ._proto import ffi_pb2 as proto_ffi
Expand All @@ -43,8 +44,6 @@
)
from .transcription import Transcription
from .rpc import RpcError
from ._proto.rpc_pb2 import RpcMethodInvocationResponseRequest
from .log import logger

from .rpc import RpcInvocationData
from .data_stream import (
Expand All @@ -54,6 +53,9 @@
STREAM_CHUNK_SIZE,
)

if TYPE_CHECKING:
from .room import Room


class PublishTrackError(Exception):
def __init__(self, message: str) -> None:
Expand Down Expand Up @@ -151,13 +153,16 @@ def __init__(
self,
room_queue: BroadcastQueue[proto_ffi.FfiEvent],
owned_info: proto_participant.OwnedParticipant,
room: Room,
) -> None:
super().__init__(owned_info)
self._room_queue = room_queue
self._track_publications: dict[str, LocalTrackPublication] = {} # type: ignore
self._rpc_handlers: Dict[
str, Callable[[RpcInvocationData], Union[Awaitable[str], str]]
] = {}
self._room_ref = weakref.ref(room)

@property
def room(self) -> Room | None:
return self._room_ref()

@property
def track_publications(self) -> Mapping[str, LocalTrackPublication]:
Expand Down Expand Up @@ -325,12 +330,15 @@ async def perform_rpc(

return cb.perform_rpc.payload

@deprecated(reason="Use room.register_rpc_method instead.")
def register_rpc_method(
self,
method_name: str,
handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None,
) -> Union[None, Callable]:
"""
Deprecated

Establishes the participant as a receiver for calls of the specified RPC method.
Can be used either as a decorator or a regular method.

Expand Down Expand Up @@ -365,35 +373,24 @@ async def greet_handler(data: RpcInvocationData) -> str:

room.local_participant.register_rpc_method('greet', greet_handler)
"""
room = self.room
if room is not None:
return room.register_rpc_method(method_name, handler)
return None

def register(handler_func):
self._rpc_handlers[method_name] = handler_func
req = proto_ffi.FfiRequest()
req.register_rpc_method.local_participant_handle = self._ffi_handle.handle
req.register_rpc_method.method = method_name
FfiClient.instance.request(req)

if handler is not None:
register(handler)
return None
else:
# Called as a decorator
return register

@deprecated(reason="Use room.unregister_rpc_method instead.")
def unregister_rpc_method(self, method: str) -> None:
"""
Deprecated

Unregisters a previously registered RPC method.

Args:
method (str): The name of the RPC method to unregister
"""
self._rpc_handlers.pop(method, None)

req = proto_ffi.FfiRequest()
req.unregister_rpc_method.local_participant_handle = self._ffi_handle.handle
req.unregister_rpc_method.method = method

FfiClient.instance.request(req)
room = self.room
if room is not None:
room.unregister_rpc_method(method)

def set_track_subscription_permissions(
self,
Expand All @@ -417,72 +414,6 @@ def set_track_subscription_permissions(
req.set_track_subscription_permissions.permissions.extend(participant_permissions)
FfiClient.instance.request(req)

async def _handle_rpc_method_invocation(
self,
invocation_id: int,
method: str,
request_id: str,
caller_identity: str,
payload: str,
response_timeout: float,
) -> None:
response_error: Optional[RpcError] = None
response_payload: Optional[str] = None

params = RpcInvocationData(request_id, caller_identity, payload, response_timeout)

handler = self._rpc_handlers.get(method)

if not handler:
response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD)
else:
try:
if asyncio.iscoroutinefunction(handler):
async_handler = cast(Callable[[RpcInvocationData], Awaitable[str]], handler)

async def run_handler():
try:
return await async_handler(params)
except asyncio.CancelledError:
# This will be caught by the outer try-except if it's due to timeout
raise

try:
response_payload = await asyncio.wait_for(
run_handler(), timeout=response_timeout
)
except asyncio.TimeoutError:
raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT)
except asyncio.CancelledError:
raise RpcError._built_in(RpcError.ErrorCode.RECIPIENT_DISCONNECTED)
else:
sync_handler = cast(Callable[[RpcInvocationData], str], handler)
response_payload = sync_handler(params)
except RpcError as error:
response_error = error
except Exception as error:
logger.exception(
f"Uncaught error returned by RPC handler for {method}. "
"Returning APPLICATION_ERROR instead. "
f"Original error: {error}"
)
response_error = RpcError._built_in(RpcError.ErrorCode.APPLICATION_ERROR)

req = proto_ffi.FfiRequest(
rpc_method_invocation_response=RpcMethodInvocationResponseRequest(
local_participant_handle=self._ffi_handle.handle,
invocation_id=invocation_id,
error=response_error._to_proto() if response_error else None,
payload=response_payload,
)
)

res = FfiClient.instance.request(req)

if res.rpc_method_invocation_response.error:
message = res.rpc_method_invocation_response.error
logger.exception(f"error sending rpc method invocation response: {message}")

async def set_metadata(self, metadata: str) -> None:
"""
Set the metadata for the local participant.
Expand Down
Loading
Loading