Commit 41f45cc

agent: structure to test
Signed-off-by: vsoch <[email protected]>
1 parent 4c7f7a6 commit 41f45cc

9 files changed: +433 -283 lines changed

README.md

Lines changed: 16 additions & 2 deletions
@@ -46,6 +46,16 @@ Tools to add:
 - build
 - docker
 
+
+### Environment
+
+The following variables can be set in the environment.
+
+| Name | Description | Default |
+|-------|------------|---------------|
+| `FRACTALE_MCP_PORT` | Port to run MCP server on, if using http variant | 8089 |
+| `FRACTALE_MCP_TOKEN` | Token to use for testing | unset |
+| `FRACTALE_LLM_PROVIDER` | LLM Backend to use (gemini, openai, llama) | gemini |
 ### Testing
 
 Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is for http.
@@ -78,17 +88,21 @@ export FRACTALE_MCP_TOKEN=dude
 # In one terminal (start MCP)
 fractale start -t http --port 8089
 
+# Define the model (provider and endpoints) to use.
+export FRACTALE_LLM_PROVIDER=openai
+export OPENAI_API_KEY=xxxxxxxxxxxxxxxx
+export OPENAI_BASE_URL=https://my.custom.url/v1
+
 # In the other, run the plan
 fractale agent ./examples/plans/docker-build-lammps.yaml
 ```
 
-
 - `manager` agents know how to orchestrate step agents and choose between them (don't hold state, but could)
 - `step` agents are experts on doing specific tasks. This originally was an agent with specific functions to do something (e.g., docker build) and now is a generic MCP agent with a prompt that gives it context and a goal.
 
 The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool.
 
-The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governmed by an LLM with access to MCP tools and resources.
+The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governed by an LLM with access to MCP tools and resources.
 
 See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming!

examples/plans/docker-build-lammps.yaml

Lines changed: 2 additions & 2 deletions
@@ -4,11 +4,11 @@ plan:
   - name: "build"
     prompt: "docker-build-persona"
     inputs:
-      description: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)"
+      application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)"
       container: "ghcr.io/hpc-lab/lammps:cpu-latest"
       environment: "Rocky Linux 9, CPU Only"
 
   # - name: "deploy"
   #   prompt: "k8s-deploy-persona"
   #   inputs:
-  #     replicas: 4
+  #       replicas: 4
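The renamed `application` key matters because a step's inputs appear to be forwarded as template arguments when the agent renders its persona prompt (see `fetch_persona` in `fractale/agent/agent.py` below). A rough sketch of that hand-off, illustrative only and not project code:

```python
# Illustrative only: the step inputs from the plan above, as the persona prompt
# template would receive them (key names come from the YAML; routing them through
# step.partition_inputs() and MCPAgent.fetch_persona() is an assumption).
inputs = {
    "application": "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)",
    "container": "ghcr.io/hpc-lab/lammps:cpu-latest",
    "environment": "Rocky Linux 9, CPU Only",
}

# Conceptually, the agent then asks the MCP server to render the persona with these values:
#   prompt = await client.get_prompt(name="docker-build-persona", arguments=inputs)
```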

fractale/agent/agent.py

Lines changed: 129 additions & 81 deletions
@@ -6,6 +6,7 @@
 from fastmcp.client.transports import StreamableHttpTransport
 
 import fractale.agent.backends as backends
+import fractale.agent.defaults as defaults
 import fractale.agent.logger as logger
 from fractale.agent.base import Agent
 
@@ -17,7 +18,7 @@ class MCPAgent(Agent):
 
     def init(self):
         # 1. Setup MCP Client
-        port = os.environ.get("FRACTALE_MCP_PORT", "8089")
+        port = os.environ.get("FRACTALE_MCP_PORT", defaults.mcp_port)
         token = os.environ.get("FRACTALE_MCP_TOKEN")
         url = f"http://localhost:{port}/mcp"
 
@@ -27,8 +28,18 @@ def init(self):
         else:
             self.client = Client(url)
 
-        # 2. Select Backend based on Config/Env
-        provider = os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower()
+        # Initialize the provider. We will do this for each step.
+        self.init_provider()
+
+    def init_provider(self):
+        """
+        Initialize the provider.
+        """
+        # select Backend based on Config/Env first, then cached version
+        provider = self._provider or os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower()
+        self._provider = provider
+
+        # Other envars come from provider backend
         if provider in backends.BACKENDS:
            self.backend = backends.BACKENDS[provider]()
         else:
@@ -52,79 +63,118 @@ async def get_tools_list(self):
         tools = await self.client.list_tools()
         return tools
 
-    async def execute_mission_async(self, prompt_text: str):
+    async def execute(self, context, step):
         """
-        The Async Loop: Think -> Act -> Observe -> Think
+        The Async Loop that will start with a prompt name, retrieve it,
+        and then respond to it until the state is successful.
         """
         start_time = time.perf_counter()
 
-        # 1. Connect & Discover Tools
+        # We keep the client connection open for the duration of the step
         async with self.client:
-            mcp_tools = await self.client.list_tools()
 
-            # 2. Initialize Backend with these tools
+            # These are tools available to agent
+            # TODO need to filter these to be agent specific?
+            mcp_tools = await self.client.list_tools()
             await self.backend.initialize(mcp_tools)
 
-            # 3. Initial Prompt
-            # 'response_text' is what the LLM says to the user
-            # 'calls' is a list of tools it wants to run
-            response_text, calls = await self.backend.generate_response(prompt=prompt_text)
-
-            max_loops = 15
-            loops = 0
-
-            while loops < max_loops:
-                loops += 1
+            # Get prompt to give goal/task/personality to agent
+            args = getattr(context, "data", context)
 
-                # If there are tool calls, we MUST execute them and feed back results
-                if calls:
-                    tool_outputs = []
+            # This partitions inputs, adding inputs from the step and separating
+            # those from extra
+            args, extra = step.partition_inputs(args)
+            instruction = await self.fetch_persona(step.prompt, args)
+            # TODO STOPPED HERE should we add "extra" to context?
+            print("INSTRUCTION")
+            print(instruction)
+            print("EXTRA")
+            print(extra)
 
-                    for call in calls:
-                        t_name = call["name"]
-                        t_args = call["args"]
-                        t_id = call.get("id")  # Needed for OpenAI
+            # Run the loop up to some max attempts (internal state machine with MCP tools)
+            max_loops = context.get("max_loops", 15)
+            response_text = await self.run_llm_loop(instruction, max_loops)
 
-                        logger.info(f"🛠️ Tool Call: {t_name} {t_args}")
-
-                        # --- EXECUTE TOOL ---
-                        try:
-                            result = await self.client.call_tool(t_name, t_args)
-                            # Handle FastMCP result object
-                            output_str = (
-                                result.content[0].text
-                                if hasattr(result, "content")
-                                else str(result)
-                            )
-                        except Exception as e:
-                            output_str = f"Error: {str(e)}"
-
-                        # Record Metadata (Your Requirement)
-                        self._record_step(t_name, t_args, output_str)
+        self.record_usage(time.perf_counter() - start_time)
+        return response_text
 
-                        tool_outputs.append({"name": t_name, "content": output_str, "id": t_id})
+    async def run_llm_loop(self, instruction: str, max_loops: int) -> str:
+        """
+        Process -> Tool -> Process loop.
+        We need to return on some state of success or ultimate failure.
+        """
+        # Initial response to first prompt
+        response_text, calls = await self.backend.generate_response(prompt=instruction)
+
+        loops = 0
+        while loops < max_loops:
+            loops += 1
+
+            # If no tools called, we are done
+            if not calls:
+                break
+
+            # Execute all requested tools
+            tool_outputs = []
+            for call in calls:
+                t_name = call["name"]
+                t_args = call["args"]
+                t_id = call.get("id")
+                logger.info(f"🛠️ Calling: {t_name}")
+
+                try:
+                    # Get result and unpack (FastMCP format)
+                    result = await self.client.call_tool(t_name, t_args)
+                    if hasattr(result, "content") and isinstance(result.content, list):
+                        content = result.content[0].text
+                    else:
+                        content = str(result)
+                except Exception as e:
+                    content = f"Error executing {t_name}: {str(e)}"
+
+                # Record metadata about the step
+                self.record_step(t_name, t_args, content)
+
+                # Save outputs (name, id, and content)
+                tool_outputs.append({"id": t_id, "name": t_name, "content": content})
+
+            # Feed results back to backend with history.
+            response_text, calls = await self.backend.generate_response(tool_outputs=tool_outputs)
+            if not calls:
+                logger.info("🎢 Agent has not requested new calls, ending loop.")
+
+        # When we get here, we either have no calls, or we reached max attempts.
+        return response_text
 
-                    # --- FEEDBACK LOOP ---
-                    # We pass the outputs back to the backend.
-                    # It returns the NEXT thought.
-                    response_text, calls = await self.backend.generate_response(
-                        tool_outputs=tool_outputs
-                    )
+    async def fetch_persona(self, prompt_name: str, arguments: dict) -> str:
+        """
+        Asks the MCP Server to render the prompt template.
 
+        This is akin to rendering or fetching the person. E.g., "You are X and
+        here are your instructions for a task."
+        """
+        logger.info(f"📥 Bootstrapping Persona: {prompt_name}")
+        try:
+            prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments)
+            # MCP Prompts return a list of messages (User/Assistant/Text).
+            # We squash them into a single string for the instruction.
+            msgs = []
+            for m in prompt_result.messages:
+                if hasattr(m.content, "text"):
+                    msgs.append(m.content.text)
                 else:
-                    # No tool calls? The LLM is done thinking.
-                    break
-
-        end_time = time.perf_counter()
+                    msgs.append(str(m.content))
 
-        # Save Summary Metadata
-        self.save_mcp_metadata(end_time - start_time)
+            return "\n\n".join(msgs)
 
-        return response_text
+        except Exception as e:
+            raise RuntimeError(f"Failed to load persona '{prompt_name}': {e}")
 
-    def _record_step(self, tool, args, output):
-        if "steps" not in self.metadata:
-            self.metadata["steps"] = []
+    def record_step(self, tool, args, output):
+        """
+        Record step metadata.
+        TODO: refactor this into metadata registry (decorator)
+        """
         self.metadata["steps"].append(
             {
                 "tool": tool,
@@ -134,33 +184,31 @@ def _record_step(self, tool, args, output):
             }
         )
 
-    def save_mcp_metadata(self, duration):
-        """Save token usage from backend."""
-        usage = self.backend.token_usage
-        if "llm_usage" not in self.metadata:
-            self.metadata["llm_usage"] = []
-
-        self.metadata["llm_usage"].append(
-            {
-                "duration": duration,
-                "prompt_tokens": usage.get("prompt_tokens", 0),
-                "completion_tokens": usage.get("completion_tokens", 0),
-            }
-        )
-
-    def run_step(self, context):
+    def record_usage(self, duration):
         """
-        Bridge the sync Base Class to the async implementation.
+        Record token usage.
+        TODO: refactor this into metadata registry (decorator)
+        """
+        if hasattr(self.backend, "token_usage"):
+            usage = self.backend.token_usage
+            self.metadata["llm_usage"].append(
+                {
+                    "duration": duration,
+                    "prompt": usage.get("prompt_tokens", 0),
+                    "completion": usage.get("completion_tokens", 0),
+                }
            )
+
+    def run_step(self, context, step):
+        """
+        Run step is called from the Agent run (base class)
+        It's here so we can asyncio.run the thing!
         """
-        prompt_text = self.get_prompt(context)
-
        try:
-            # Run the loop
-            final_result = asyncio.run(self.execute_mission_async(prompt_text))
-            context["result"] = final_result
+            final_result = asyncio.run(self.execute(context, step))
+            context.result = final_result
         except Exception as e:
             context["error_message"] = str(e)
             logger.error(f"Agent failed: {e}")
-            raise  # Or handle gracefully depending on policy
-
+            raise e
         return context
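The refactored loop leans on a small implicit contract with the LLM backend: `initialize()` receives the MCP tool list, and `generate_response()` returns a `(response_text, calls)` pair where each call carries a tool name, arguments, and an optional id. A minimal stand-in backend that satisfies that contract, sketched under assumed shapes and not part of this commit:

```python
from typing import Any, Dict, List, Optional, Tuple


class EchoBackend:
    """Stand-in backend for wiring tests: it never requests tool calls."""

    def __init__(self):
        # Mirrors the token_usage attribute that record_usage() looks for.
        self.token_usage: Dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0}
        self.tools: List[Any] = []

    async def initialize(self, mcp_tools: List[Any]) -> None:
        # A real backend would translate MCP tool schemas into its own function-calling format.
        self.tools = mcp_tools

    async def generate_response(
        self,
        prompt: Optional[str] = None,
        tool_outputs: Optional[List[Dict[str, Any]]] = None,
    ) -> Tuple[str, List[Dict[str, Any]]]:
        # Returning an empty call list makes run_llm_loop exit after one iteration.
        return (prompt or "ok"), []
```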

fractale/agent/backends/openai.py

Lines changed: 8 additions & 2 deletions
@@ -1,6 +1,7 @@
+import os
 from typing import Any, Dict, List
 
-from openai import AsyncOpenAI
+from openai import AsyncOpenAI, OpenAI
 
 from .llm import LLMBackend
 
@@ -11,7 +12,12 @@ class OpenAIBackend(LLMBackend):
     """
 
     def __init__(self, model_name="gpt-4o"):
-        self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
+        # Needs to be tested if base url is None.
+        # Switch to async if/when needed. Annoying for development
+        # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL"))
+        self.client = OpenAI(
+            api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")
+        )
         self.model_name = model_name
         self.history = []
         self.tools_schema = []
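On the "Needs to be tested if base url is None" comment: in the openai Python SDK, a `base_url` of `None` is accepted; the client then falls back to the `OPENAI_BASE_URL` environment variable (which the SDK also reads on its own) and finally to the default API endpoint, so the explicit lookup above is mainly for readability. A quick standalone check, assuming a key is exported:

```python
import os

from openai import OpenAI

# base_url may be None here; the SDK then uses OPENAI_BASE_URL or its default endpoint.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
    base_url=os.environ.get("OPENAI_BASE_URL"),
)
print(client.base_url)
```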

fractale/agent/base.py

Lines changed: 4 additions & 5 deletions
@@ -21,6 +21,7 @@ def __init__(
         self.name = name
         self.attempts = 0
         self.max_attempts = max_attempts
+        self._provider = None
 
         self.results_dir = results_dir or os.getcwd()
         self.save_incremental = save_incremental
@@ -46,28 +47,26 @@ def init_metadata(self):
             "llm_usage": [],
         }
 
-    def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
+    def run(self, context, step):
         """
         Main execution wrapper
         """
         # Ensure max_attempts is set
         context["max_attempts"] = self.max_attempts or context.get("max_attempts")
-
-        # 3. RUN STEP
         logger.info(f"▶️ Running {self.name}...")
         start_time = time.time()
 
         try:
             # Call abstract method
-            context = self.run_step(context)
+            context = self.run_step(context, step)
 
         finally:
             duration = time.time() - start_time
             self.metadata["times"]["execution"] = duration
 
         return context
 
-    def run_step(self, context):
+    def run_step(self, context, step):
         """
         Abstract: Implemented by MCPAgent
         """

fractale/agent/defaults.py

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 environment = "generic cloud environment"
 gemini_model = "gemini-2.5-pro"
+mcp_port = "8089"
 
 # These are common / default args we don't need to give in any prompt.
 shared_args = {
