-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathrpc.py
277 lines (232 loc) · 9.95 KB
/
rpc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
from livekit import rtc, api
import os
import json
import asyncio
from dotenv import load_dotenv
from livekit.rtc.rpc import RpcInvocationData
# Pull connection settings from .env.local without clobbering variables that
# are already present in the process environment (override=False).
load_dotenv(dotenv_path=".env.local", override=False)

LIVEKIT_API_KEY = os.environ.get("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.environ.get("LIVEKIT_API_SECRET")
LIVEKIT_URL = os.environ.get("LIVEKIT_URL")

# All three settings are mandatory; an unset or empty value aborts at import time.
if not all((LIVEKIT_API_KEY, LIVEKIT_API_SECRET, LIVEKIT_URL)):
    raise ValueError("Missing required environment variables. Please check your .env.local file.")
async def main():
    """Run the RPC demo: connect three participants, run each example, clean up.

    Three participants ("caller", "greeter", "math-genius") join a randomly
    named room. The greeter and math-genius rooms get RPC handlers registered,
    then the caller runs each example in turn. Every example sits in its own
    ``try``/``except`` so a failure in one is printed and the next still runs.

    All rooms that connected successfully are disconnected in ``finally``,
    including when one of the initial connection attempts fails.
    """
    rooms = []  # Every room that connected successfully; disconnected in ``finally``.

    try:
        room_name = f"rpc-test-{os.urandom(4).hex()}"
        print(f"Connecting participants to room: {room_name}")

        # Collect results instead of failing fast: if one participant cannot
        # connect, the rooms that did connect must still be tracked so the
        # ``finally`` block can disconnect them.
        results = await asyncio.gather(
            connect_participant("caller", room_name),
            connect_participant("greeter", room_name),
            connect_participant("math-genius", room_name),
            return_exceptions=True,
        )
        rooms = [result for result in results if not isinstance(result, BaseException)]
        for result in results:
            if isinstance(result, BaseException):
                # Surface the first connection failure, as a fail-fast gather would.
                raise result
        callers_room, greeters_room, math_genius_room = results

        register_receiver_methods(greeters_room, math_genius_room)

        try:
            print("\n\nRunning greeting example...")
            await perform_greeting(callers_room)
        except Exception as error:
            print("Error:", error)

        try:
            print("\n\nRunning error handling example...")
            await perform_divide(callers_room)
        except Exception as error:
            print("Error:", error)

        try:
            print("\n\nRunning math example...")
            await perform_square_root(callers_room)
            await asyncio.sleep(2)
            await perform_quantum_hypergeometric_series(callers_room)
        except Exception as error:
            print("Error:", error)

        try:
            print("\n\nRunning long calculation with timeout...")
            await perform_long_calculation(callers_room)
        except Exception as error:
            print("Error:", error)

        try:
            print("\n\nRunning long calculation with disconnect...")
            # Start the long calculation in the background so the math genius
            # can be disconnected while the call is still in flight.
            long_calc_task = asyncio.create_task(perform_long_calculation(callers_room))

            # Wait a bit then disconnect the math genius
            await asyncio.sleep(5)
            print("\nDisconnecting math genius early...")
            await math_genius_room.disconnect()

            # Wait for the calculation to fail
            await long_calc_task
        except Exception as error:
            print("Error:", error)

        print("\n\nParticipants done, disconnecting remaining participants...")
        await callers_room.disconnect()
        await greeters_room.disconnect()

        print("Participants disconnected. Example completed.")

    except KeyboardInterrupt:
        print("\nReceived interrupt signal, cleaning up...")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Clean up all rooms; errors from individual disconnects are collected
        # rather than raised so every room gets a disconnect attempt.
        print("Disconnecting all participants...")
        await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True)
        print("Cleanup complete")
def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
    """Register the demo RPC handlers on the two receiving rooms.

    ``greeters_room`` gets the "arrival" method. ``math_genius_room`` gets
    "square-root", "divide" and "long-calculation". The three math handlers
    return their results as JSON strings; "arrival" returns plain text.
    """

    @greeters_room.register_rpc_method("arrival")
    async def on_arrival(data: RpcInvocationData):
        # Acknowledge the caller, pause briefly, then reply with a greeting.
        print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
        await asyncio.sleep(2)
        return "Welcome and have a wonderful day!"

    @math_genius_room.register_rpc_method("square-root")
    async def on_square_root(data: RpcInvocationData):
        # The payload is a JSON object carrying the operand under "number".
        number = json.loads(data.payload)["number"]
        print(
            f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. "
            f"I've only got {data.response_timeout} seconds to respond but I think I can pull it off."
        )

        print("[Math Genius] *doing math*…")
        await asyncio.sleep(2)

        root = number**0.5
        print(f"[Math Genius] Aha! It's {root}")
        return json.dumps({"result": root})

    @math_genius_room.register_rpc_method("divide")
    async def on_divide(data: RpcInvocationData):
        # The payload is a JSON object with "dividend" and "divisor".
        operands = json.loads(data.payload)
        dividend, divisor = operands["dividend"], operands["divisor"]
        print(f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}.")

        # No guard against a zero divisor: the division error is left to
        # propagate out of the handler.
        return json.dumps({"result": dividend / divisor})

    @math_genius_room.register_rpc_method("long-calculation")
    async def on_long_calculation(data: RpcInvocationData):
        print(f"[Math Genius] Starting a very long calculation for {data.caller_identity}")
        print(
            f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds"
        )
        # Sleep for 30 seconds before answering, regardless of the caller's timeout.
        await asyncio.sleep(30)
        return json.dumps({"result": "Calculation complete!"})
async def perform_greeting(room: rtc.Room):
    """Call the greeter's "arrival" RPC method and print the reply.

    Any failure is printed and then re-raised to the caller.
    """
    print("[Caller] Letting the greeter know that I've arrived")
    caller = room.local_participant
    try:
        reply = await caller.perform_rpc(
            destination_identity="greeter",
            method="arrival",
            payload="Hello",
        )
        print(f'[Caller] That\'s nice, the greeter said: "{reply}"')
    except Exception as exc:
        print(f"[Caller] RPC call failed: {exc}")
        raise
async def perform_square_root(room: rtc.Room):
    """Ask the math genius for the square root of 16 and print the answer.

    The request and the response are both JSON-encoded. Any failure is
    printed and then re-raised to the caller.
    """
    print("[Caller] What's the square root of 16?")
    request = json.dumps({"number": 16})
    try:
        raw_reply = await room.local_participant.perform_rpc(
            destination_identity="math-genius",
            method="square-root",
            payload=request,
        )
        answer = json.loads(raw_reply)["result"]
        print(f"[Caller] Nice, the answer was {answer}")
    except Exception as exc:
        print(f"[Caller] RPC call failed: {exc}")
        raise
async def perform_quantum_hypergeometric_series(room: rtc.Room):
    """Call the "quantum-hypergeometric-series" method on the math genius.

    An ``UNSUPPORTED_METHOD`` RPC error is reported with a friendly message
    and swallowed. Every other error is printed and re-raised.
    """
    print("[Caller] What's the quantum hypergeometric series of 42?")
    request = json.dumps({"number": 42})
    try:
        raw_reply = await room.local_participant.perform_rpc(
            destination_identity="math-genius",
            method="quantum-hypergeometric-series",
            payload=request,
        )
        answer = json.loads(raw_reply)["result"]
        print(f"[Caller] genius says {answer}!")
    except rtc.RpcError as rpc_error:
        if rpc_error.code != rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD:
            print("[Caller] Unexpected error:", rpc_error)
            raise
        # Unsupported method: report it and carry on without raising.
        print("[Caller] Aww looks like the genius doesn't know that one.")
    except Exception as exc:
        print("[Caller] Unexpected error:", exc)
        raise
async def perform_divide(room: rtc.Room):
    """Ask the math genius to divide 10 by 0 and report what happens.

    All errors are printed and swallowed; nothing is re-raised.
    """
    print("[Caller] Let's divide 10 by 0.")
    request = json.dumps({"dividend": 10, "divisor": 0})
    try:
        raw_reply = await room.local_participant.perform_rpc(
            destination_identity="math-genius",
            method="divide",
            payload=request,
        )
        quotient = json.loads(raw_reply)["result"]
        print(f"[Caller] The result is {quotient}")
    except rtc.RpcError as rpc_error:
        is_application_error = rpc_error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR
        if not is_application_error:
            print(f"[Caller] RPC call failed with unexpected RpcError: {rpc_error}")
        else:
            print("[Caller] Aww something went wrong with that one, lets try something else.")
    except Exception as exc:
        print(f"[Caller] RPC call failed with unexpected error: {exc}")
async def perform_long_calculation(room: rtc.Room):
    """Call "long-calculation" on the math genius with a 10 second response timeout.

    Timeouts, recipient disconnects and any other errors are printed and
    swallowed; nothing is re-raised.
    """
    print("[Caller] Giving the math genius 10s to complete a long calculation")
    try:
        raw_reply = await room.local_participant.perform_rpc(
            destination_identity="math-genius",
            method="long-calculation",
            payload=json.dumps({}),
            response_timeout=10,
        )
        outcome = json.loads(raw_reply)["result"]
        print(f"[Caller] Result: {outcome}")
    except rtc.RpcError as rpc_error:
        # Known failure codes map to fixed messages; anything else falls
        # through to the generic report below.
        codes = rtc.RpcError.ErrorCode
        known_failures = {
            codes.RESPONSE_TIMEOUT: "[Caller] Math genius took too long to respond",
            codes.RECIPIENT_DISCONNECTED: "[Caller] Math genius disconnected before response was received",
        }
        message = known_failures.get(rpc_error.code)
        if message is None:
            message = f"[Caller] Unexpected RPC error: {rpc_error}"
        print(message)
    except Exception as exc:
        print(f"[Caller] Unexpected error: {exc}")
def create_token(identity: str, room_name: str):
    """Build an access token for ``identity`` scoped to ``room_name``.

    The token grants joining the room plus publishing and subscribing, is
    signed with the module-level API key and secret, and is returned as
    produced by ``to_jwt()``.
    """
    grants = api.VideoGrants(
        room=room_name,
        room_join=True,
        can_publish=True,
        can_subscribe=True,
    )
    access_token = api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
    access_token = access_token.with_identity(identity).with_grants(grants)
    return access_token.to_jwt()
async def connect_participant(identity: str, room_name: str) -> rtc.Room:
    """Connect ``identity`` to ``room_name`` and wait for another participant.

    Args:
        identity: Participant identity to join as; also used as the log prefix
            when the room disconnects.
        room_name: Name of the room to join.

    Returns:
        The connected room, once at least one remote participant is present.

    Raises:
        TimeoutError: If no remote participant is present within 5 seconds of
            connecting. The room is disconnected before the error is raised,
            so a failed attempt does not leave a live connection behind.
    """
    room = rtc.Room()
    token = create_token(identity, room_name)

    def on_disconnected(reason: str):
        print(f"[{identity}] Disconnected from room: {reason}")

    room.on("disconnected", on_disconnected)

    await room.connect(LIVEKIT_URL, token)

    async def wait_for_participants():
        # Someone is already in the room: nothing to wait for.
        if room.remote_participants:
            return

        participant_connected = asyncio.Event()

        def _on_participant_connected(participant: rtc.RemoteParticipant):
            # One-shot listener: unregister itself, then release the waiter.
            room.off("participant_connected", _on_participant_connected)
            participant_connected.set()

        room.on("participant_connected", _on_participant_connected)
        await participant_connected.wait()

    try:
        await asyncio.wait_for(wait_for_participants(), timeout=5.0)
    except asyncio.TimeoutError as err:
        # The caller never receives this room, so it must be released here;
        # otherwise the connection stays open until the process exits.
        await room.disconnect()
        raise TimeoutError("Timed out waiting for participants") from err

    return room
def _run_cli():
    """Run the demo under asyncio, turning Ctrl-C into a short exit message."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram terminated by user")


if __name__ == "__main__":
    _run_cli()