Commit bf2be56

Merge pull request #194 from awslabs/update-streaming-processing-python
Update streaming processing python
2 parents 357f95b + 51e0123

25 files changed: +2390 -387 lines changed

docs/src/content/docs/cookbook/examples/ecommerce-support-simulator.md

+1 -1

```diff
@@ -221,7 +221,7 @@ For production usage, it is crucial to implement proper security measures to pro
 
 - [Multi-Agent Orchestrator Documentation](https://github.com/awslabs/multi-agent-orchestrator)
 - [AWS AppSync Documentation](https://docs.aws.amazon.com/appsync)
-- [Claude API Documentation](https://docs.anthropic.com/claude/reference)
+- [Claude API Documentation](https://docs.anthropic.com/en/api/getting-started)
 
 
 Ready to build your own multi-agent chat application? Check out the complete [source code](https://github.com/awslabs/multi-agent-orchestrator/tree/main/examples/ecommerce-support-simulator) in our GitHub repository.
```

docs/src/content/docs/orchestrator/overview.mdx

+6 -4

````diff
@@ -172,7 +172,7 @@ The MultiAgentOrchestrator provides several key functions to manage agents, proc
 2. get_default_agent() -> Agent
 3. set_default_agent(agent: Agent) -> None
 4. get_all_agents() -> Dict[str, Dict[str, str]]
-5. route_request(user_input: str, user_id: str, session_id: str, additional_params: Dict[str, str] = {}) -> AgentResponse
+5. route_request(user_input: str, user_id: str, session_id: str, additional_params: Dict[str, str] = {}, stream_response: bool | None = False) -> AgentResponse
 ```
 </TabItem>
 </Tabs>
@@ -268,7 +268,7 @@ Here are practical examples of how to use each function:
 <TabItem label="Python" icon="seti:python">
 ```python
 from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
-from multi_agent_orchestrator.agents import BedrockLLMAgent, BedrockLLMAgentOptions
+from multi_agent_orchestrator.agents import BedrockLLMAgent, BedrockLLMAgentOptions, AgentStreamResponse
 from multi_agent_orchestrator.classifiers import AnthropicClassifier, AnthropicClassifierOptions
 import asyncio
 orchestrator = MultiAgentOrchestrator()
@@ -305,12 +305,14 @@ async def handle_user_query():
         "How do I optimize a Python script?",
         "user123",
         "session456",
-        {"priority": "high"}  # Additional parameters
+        {"priority": "high"},  # Additional parameters
+        True,
     )
 
     if response.streaming:
         async for chunk in response.output:
-            print(chunk, end='', flush=True)
+            if isinstance(chunk, AgentStreamResponse):
+                print(chunk.text, end='', flush=True)
     else:
         print(response.output)
````
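
Taken together, the new `stream_response` flag and the `AgentStreamResponse` chunk type give the following end-to-end pattern. A minimal sketch, assuming an orchestrator that already has at least one streaming-enabled agent registered (agent and classifier setup elided):

```python
import asyncio

from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator
from multi_agent_orchestrator.agents import AgentStreamResponse

orchestrator = MultiAgentOrchestrator()  # agent/classifier registration elided

async def ask(question: str) -> None:
    response = await orchestrator.route_request(
        question,
        "user123",     # user_id
        "session456",  # session_id
        {},            # additional_params
        True,          # stream_response: ask agents to stream if they can
    )
    if response.streaming:
        # Streaming path: output is an async iterable of chunks.
        async for chunk in response.output:
            if isinstance(chunk, AgentStreamResponse):
                print(chunk.text, end='', flush=True)
    else:
        # Non-streaming path: output is the completed message.
        print(response.output)

asyncio.run(ask("How do I optimize a Python script?"))
```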

examples/fast-api-streaming/main.py

+16 -73

```diff
@@ -1,28 +1,17 @@
-import asyncio
-from typing import AsyncIterable
-
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import StreamingResponse
 from pydantic import BaseModel
-from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, AgentResponse, OrchestratorConfig
-from multi_agent_orchestrator.types import ConversationMessage
+from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, OrchestratorConfig
 from multi_agent_orchestrator.agents import (
     BedrockLLMAgent,
-    AgentResponse,
-    AgentCallbacks,
     BedrockLLMAgentOptions,
+    AgentStreamResponse,
 )
 
-from multi_agent_orchestrator.classifiers import BedrockClassifier, BedrockClassifierOptions, AnthropicClassifier, AnthropicClassifierOptions
-
-from typing import Dict, List, Any
+from multi_agent_orchestrator.classifiers import BedrockClassifier, BedrockClassifierOptions
 
-import uuid
-import asyncio
-import argparse
-from queue import Queue, Empty
-from threading import Thread
+orchestrator = None
 
 app = FastAPI()
 app.add_middleware(
@@ -38,25 +27,7 @@ class Body(BaseModel):
     user_id: str
     session_id: str
 
-class MyCustomHandler(AgentCallbacks):
-    def __init__(self, queue) -> None:
-        super().__init__()
-        self._queue = queue
-        self._stop_signal = None
-        print("Custom handler Initialized")
-
-    def on_llm_new_token(self, token: str, **kwargs) -> None:
-        self._queue.put_nowait(token)
-
-    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
-        print("generation started")
-
-    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
-        print("\n\ngeneration concluded")
-
-        self._queue.put_nowait(self._stop_signal)
-
-def setup_orchestrator(streamer_queue):
+def setup_orchestrator():
     # Initialize the orchestrator
     orchestrator = MultiAgentOrchestrator(options=OrchestratorConfig(
         LOG_AGENT_CHAT=True,
@@ -72,22 +43,18 @@ def setup_orchestrator(streamer_queue):
         classifier = BedrockClassifier(BedrockClassifierOptions())
     )
 
-    my_handler = MyCustomHandler(streamer_queue)
-
     tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
         name="Tech agent",
         streaming=True,
         description="Expert in Technology and AWS services",
         save_chat=False,
-        callbacks=my_handler
     ))
 
     health = BedrockLLMAgent(BedrockLLMAgentOptions(
         name="Health agent",
         streaming=True,
         description="Expert health",
         save_chat=False,
-        callbacks=my_handler
     ))
 
     orchestrator.add_agent(tech_agent)
@@ -96,43 +63,19 @@ def setup_orchestrator(streamer_queue):
     return orchestrator
 
 
-async def start_generation(query, user_id, session_id, streamer_queue):
-    try:
-        # Create a new orchestrator for this query
-        orchestrator = setup_orchestrator(streamer_queue)
+async def response_generator(query, user_id, session_id):
 
-        response = await orchestrator.route_request(query, user_id, session_id)
-        if isinstance(response, AgentResponse) and response.streaming is False:
-            if isinstance(response.output, str):
-                streamer_queue.put_nowait(response.output)
-            elif isinstance(response.output, ConversationMessage):
-                streamer_queue.put_nowait(response.output.content[0].get('text'))
-    except Exception as e:
-        print(f"Error in start_generation: {e}")
-    finally:
-        streamer_queue.put_nowait(None)  # Signal the end of the response
+    response = await orchestrator.route_request(query, user_id, session_id, None, True)
+
+    if response.streaming:
+        async for chunk in response.output:
+            if isinstance(chunk, AgentStreamResponse):
+                yield chunk.text
 
-async def response_generator(query, user_id, session_id):
-    streamer_queue = asyncio.Queue()
-
-    # Start the generation process in a separate thread
-    Thread(target=lambda: asyncio.run(start_generation(query, user_id, session_id, streamer_queue))).start()
-
-    print("Waiting for the response...")
-    while True:
-        try:
-            try:
-                value = streamer_queue.get_nowait()  # or q.get_nowait()
-                if value is None:
-                    break
-                yield value
-                streamer_queue.task_done()
-            except asyncio.QueueEmpty:
-                pass
-        except Exception as e:
-            print(f"Error in response_generator: {str(e)}")
-            break
 
 
 @app.post("/stream_chat/")
 async def stream_chat(body: Body):
-    return StreamingResponse(response_generator(body.content, body.user_id, body.session_id), media_type="text/event-stream")
+    return StreamingResponse(response_generator(body.content, body.user_id, body.session_id), media_type="text/event-stream")
+
+
+orchestrator = setup_orchestrator()
```
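
For completeness, a client can consume this endpoint incrementally as chunks arrive. A small sketch using httpx (the client library and local URL are assumptions, not part of this commit); the `content`, `user_id`, and `session_id` fields mirror the `Body` model above:

```python
# Hypothetical client for the /stream_chat/ endpoint defined above.
# Assumes the app is served locally, e.g. `uvicorn main:app --port 8000`.
import httpx

payload = {
    "content": "Tell me about AWS Lambda",
    "user_id": "user123",
    "session_id": "session456",
}

with httpx.stream("POST", "http://localhost:8000/stream_chat/",
                  json=payload, timeout=60.0) as response:
    for text in response.iter_text():
        # Chunks arrive as they are yielded by response_generator.
        print(text, end="", flush=True)
```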

examples/python-demo/main-stream.py

+167 (new file)

```python
import uuid
import asyncio
from typing import Optional, Any
import json
import sys

from tools import weather_tool

from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, OrchestratorConfig
from multi_agent_orchestrator.agents import (BedrockLLMAgent,
                                             BedrockLLMAgentOptions,
                                             AgentResponse,
                                             AgentStreamResponse,
                                             AgentCallbacks)
from multi_agent_orchestrator.types import ConversationMessage, ParticipantRole
from multi_agent_orchestrator.utils import AgentTools

class LLMAgentCallbacks(AgentCallbacks):
    def on_llm_new_token(self, token: str) -> None:
        print(token, end='', flush=True)


async def handle_request(_orchestrator: MultiAgentOrchestrator, _user_input: str, _user_id: str, _session_id: str):
    stream_response = True
    response: AgentResponse = await _orchestrator.route_request(_user_input, _user_id, _session_id, {}, stream_response)

    # Print metadata
    print("\nMetadata:")
    print(f"Selected Agent: {response.metadata.agent_name}")
    if stream_response and response.streaming:
        async for chunk in response.output:
            if isinstance(chunk, AgentStreamResponse):
                print(chunk.text, end='', flush=True)
    else:
        if isinstance(response.output, ConversationMessage):
            print(response.output.content[0]['text'])
        elif isinstance(response.output, str):
            print(response.output)
        else:
            print(response.output)

def custom_input_payload_encoder(input_text: str,
                                 chat_history: list[Any],
                                 user_id: str,
                                 session_id: str,
                                 additional_params: Optional[dict[str, str]] = None) -> str:
    return json.dumps({
        'hello': 'world'
    })

def custom_output_payload_decoder(response: dict[str, Any]) -> Any:
    decoded_response = json.loads(
        json.loads(
            response['Payload'].read().decode('utf-8')
        )['body'])['response']
    return ConversationMessage(
        role=ParticipantRole.ASSISTANT.value,
        content=[{'text': decoded_response}]
    )

if __name__ == "__main__":

    # Initialize the orchestrator with some options
    orchestrator = MultiAgentOrchestrator(options=OrchestratorConfig(
        LOG_AGENT_CHAT=True,
        LOG_CLASSIFIER_CHAT=True,
        LOG_CLASSIFIER_RAW_OUTPUT=True,
        LOG_CLASSIFIER_OUTPUT=True,
        LOG_EXECUTION_TIMES=True,
        MAX_RETRIES=3,
        USE_DEFAULT_AGENT_IF_NONE_IDENTIFIED=True,
        MAX_MESSAGE_PAIRS_PER_AGENT=10,
    ))

    # Add a tech agent
    tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Tech Agent",
        streaming=True,
        description="Specializes in technology areas including software development, hardware, AI, \
cybersecurity, blockchain, cloud computing, emerging tech innovations, and pricing/costs \
related to technology products and services.",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        # callbacks=LLMAgentCallbacks()
    ))
    orchestrator.add_agent(tech_agent)

    # Add a health agent
    health_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Health Agent",
        streaming=False,
        description="Specializes in health and well being.",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    ))
    orchestrator.add_agent(health_agent)

    # Add an Anthropic weather agent with a tool in Anthropic's tool format
    # weather_agent = AnthropicAgent(AnthropicAgentOptions(
    #     api_key='api-key',
    #     name="Weather Agent",
    #     streaming=False,
    #     description="Specialized agent for giving weather condition from a city.",
    #     tool_config={
    #         'tool': [tool.to_claude_format() for tool in weather_tool.weather_tools.tools],
    #         'toolMaxRecursions': 5,
    #         'useToolHandler': weather_tool.anthropic_weather_tool_handler
    #     },
    #     callbacks=LLMAgentCallbacks()
    # ))

    # Add an Anthropic weather agent with Tools class
    # weather_agent = AnthropicAgent(AnthropicAgentOptions(
    #     api_key='api-key',
    #     name="Weather Agent",
    #     streaming=True,
    #     description="Specialized agent for giving weather condition from a city.",
    #     tool_config={
    #         'tool': weather_tool.weather_tools,
    #         'toolMaxRecursions': 5,
    #     },
    #     callbacks=LLMAgentCallbacks()
    # ))

    # Add a Bedrock weather agent with Tools class
    # weather_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
    #     name="Weather Agent",
    #     streaming=False,
    #     description="Specialized agent for giving weather condition from a city.",
    #     tool_config={
    #         'tool': weather_tool.weather_tools,
    #         'toolMaxRecursions': 5,
    #     },
    #     callbacks=LLMAgentCallbacks(),
    # ))

    # Add a Bedrock weather agent with custom handler and Bedrock's tool format
    weather_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Weather Agent",
        streaming=False,
        description="Specialized agent for giving weather condition from a city.",
        tool_config={
            'tool': [tool.to_bedrock_format() for tool in weather_tool.weather_tools.tools],
            'toolMaxRecursions': 5,
            'useToolHandler': weather_tool.bedrock_weather_tool_handler
        }
    ))

    weather_agent.set_system_prompt(weather_tool.weather_tool_prompt)
    orchestrator.add_agent(weather_agent)

    USER_ID = "user123"
    SESSION_ID = str(uuid.uuid4())

    print("Welcome to the interactive Multi-Agent system. Type 'quit' to exit.")

    while True:
        # Get user input
        user_input = input("\nYou: ").strip()

        if user_input.lower() == 'quit':
            print("Exiting the program. Goodbye!")
            sys.exit()

        # Run the async function
        asyncio.run(handle_request(orchestrator, user_input, USER_ID, SESSION_ID))
```
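
The commented-out `callbacks=LLMAgentCallbacks()` lines in this file point at the other way to surface tokens: per-token callbacks on the agent rather than iterating the response. A brief sketch, assuming `on_llm_new_token` fires once per streamed token as the `LLMAgentCallbacks` class above suggests:

```python
# Alternative to stream_response=True: print tokens from a callback.
class PrintingCallbacks(AgentCallbacks):
    def on_llm_new_token(self, token: str) -> None:
        print(token, end='', flush=True)

tech_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
    name="Tech Agent",
    streaming=True,
    description="Specializes in technology.",
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    callbacks=PrintingCallbacks(),  # tokens are printed as they arrive
))
```

The callback route suits side effects such as logging or console echo; the `AgentStreamResponse` iterator suits forwarding chunks to a client, as in the FastAPI example above.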

python/src/multi_agent_orchestrator/agents/__init__.py

+3 -3

```diff
@@ -1,7 +1,7 @@
 """
 Code for Agents.
 """
-from .agent import Agent, AgentOptions, AgentCallbacks, AgentProcessingResult, AgentResponse
+from .agent import Agent, AgentOptions, AgentCallbacks, AgentProcessingResult, AgentResponse, AgentStreamResponse
 
 
 try:
@@ -32,16 +32,16 @@
 
 from .supervisor_agent import SupervisorAgent, SupervisorAgentOptions
 
-
 __all__ = [
     'Agent',
     'AgentOptions',
     'AgentCallbacks',
     'AgentProcessingResult',
     'AgentResponse',
+    'AgentStreamResponse',
     'SupervisorAgent',
     'SupervisorAgentOptions'
-]
+]
 
 
 if _AWS_AVAILABLE :
```

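With the re-export in place, downstream code can import the new chunk type from the package's public agents module:

```python
# AgentStreamResponse is now importable from the public package surface.
from multi_agent_orchestrator.agents import AgentStreamResponse

def render(chunk: object) -> None:
    # Distinguish streamed chunks from any other payloads.
    if isinstance(chunk, AgentStreamResponse):
        print(chunk.text, end='', flush=True)
```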