Skip to content

Commit e2db49b

Browse files
committed
feat: enhance agent identification and connection management across multiple modules
1 parent 8a26431 commit e2db49b

7 files changed

Lines changed: 87 additions & 26 deletions

File tree

src/db/crud.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,11 @@ async def msg_post(
267267
author_id = None
268268
author_name = author
269269

270-
async with db.execute("SELECT id, name FROM agents WHERE id = ?", (author,)) as cur:
270+
# Try to find agent by ID, name, or display_name
271+
async with db.execute(
272+
"SELECT id, name FROM agents WHERE id = ? OR name = ? OR display_name = ?",
273+
(author, author, author)
274+
) as cur:
271275
row = await cur.fetchone()
272276
if row:
273277
actual_author = row["name"]

src/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async def __call__(self, scope, receive, send):
139139
@app.get("/mcp/sse")
140140
async def mcp_sse_endpoint(request: Request):
141141
"""MCP SSE endpoint consumed by MCP clients (Claude Desktop, Cursor, …)."""
142-
from src.mcp_server import init_session_id
142+
from src.mcp_server import init_session_id, clear_connection_agent
143143

144144
# Initialize unique session ID for this SSE connection
145145
session_id = init_session_id()
@@ -161,6 +161,9 @@ async def mcp_sse_endpoint(request: Request):
161161
# Most are normal disconnects (anyio.ClosedResourceError, CancelledError…).
162162
# Log at DEBUG to avoid polluting the terminal.
163163
logger.debug("MCP SSE session ended: %s: %s", type(exc).__name__, exc)
164+
finally:
165+
# Clean up connection agent registry on disconnect
166+
clear_connection_agent(session_id)
164167
return _SseCompletedResponse()
165168

166169

src/mcp_server.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ def get_connection_agent() -> tuple[str | None, str | None]:
7171
return agent_info["agent_id"], agent_info["token"]
7272
return None, None
7373

74+
def clear_connection_agent(session_id: str) -> None:
75+
"""Clear agent identity for a session (call on SSE disconnect)."""
76+
if session_id in _connection_agents:
77+
agent_info = _connection_agents.pop(session_id)
78+
logger.info(f"[clear_connection_agent] removed session {session_id[:8]}: agent_id={agent_info.get('agent_id')}")
79+
7480
# Create the MCP server instance
7581
server = Server("AgentChatBus")
7682

src/static/js/shared-agent-status.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@
88
}
99

1010
const secondsAgo = (now - activityTime) / 1000;
11-
if (agent.last_activity === "msg_wait" && secondsAgo < 60) return "Waiting";
12-
if (secondsAgo < 30) return "Active";
13-
if (secondsAgo < 300) return "Idle";
11+
12+
if (agent.last_activity === "msg_wait" && secondsAgo < 60) {
13+
return "Waiting";
14+
}
15+
if (secondsAgo < 30) {
16+
return "Active";
17+
}
18+
if (secondsAgo < 300) {
19+
return "Idle";
20+
}
1421
return agent.is_online ? "Idle" : "Offline";
1522
}
1623

src/static/js/shared-agents.js

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,26 +155,44 @@
155155
const msgArea = document.getElementById("messages");
156156
if (msgArea) {
157157
const rows = msgArea.querySelectorAll("[data-author-id]");
158+
// Use agent's real UUID as key for deduplication
159+
const agentUuidMap = new Map(); // UUID -> agent object
160+
158161
rows.forEach((row) => {
159162
const authorId = row.getAttribute("data-author-id");
160-
if (authorId && authorId !== "system" && authorId !== "human" && !participantIdMap.has(authorId)) {
161-
const agent = allAgents.find((a) => a.id === authorId || a.agent_id === authorId);
163+
if (authorId && authorId !== "system" && authorId !== "human") {
164+
// Try to find agent by id (UUID), author_id, or name/display_name
165+
// Also try partial match for name/display_name (e.g., "iFlow CLI" matches "iFlow CLI (glm-5)")
166+
const agent = allAgents.find((a) =>
167+
a.id === authorId ||
168+
a.agent_id === authorId ||
169+
a.name === authorId ||
170+
a.display_name === authorId ||
171+
a.author_id === authorId ||
172+
(a.name && a.name.includes(authorId)) ||
173+
(a.display_name && a.display_name.includes(authorId))
174+
);
162175
if (agent) {
163-
participantIdMap.set(authorId, agent);
176+
// Dedupe by agent's real UUID
177+
if (!agentUuidMap.has(agent.id)) {
178+
agentUuidMap.set(agent.id, agent);
179+
}
164180
} else {
165-
participantIdMap.set(authorId, {
166-
id: authorId,
167-
display_name: authorId,
168-
name: authorId,
169-
is_online: false,
170-
});
181+
// Fallback: use authorId as pseudo-UUID
182+
if (!agentUuidMap.has(authorId)) {
183+
agentUuidMap.set(authorId, {
184+
id: authorId,
185+
display_name: authorId,
186+
name: authorId,
187+
is_online: false,
188+
});
189+
}
171190
}
172191
}
173192
});
193+
// When a thread is selected, only show participants for that thread
194+
participants = Array.from(agentUuidMap.values());
174195
}
175-
176-
// When a thread is selected, only show participants for that thread
177-
participants = Array.from(participantIdMap.values());
178196
} else {
179197
// When no thread is selected, show only online agents
180198
participants = allAgents.filter((a) => a.is_online);

src/static/js/shared-api.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
if (!response.ok) {
1010
console.warn(`[API] ${options.method || 'GET'} ${path} → HTTP ${response.status}`);
1111
}
12-
return response.json();
12+
return await response.json();
1313
} catch (err) {
1414
console.warn(`[API] ${options.method || 'GET'} ${path} → network error:`, err);
1515
return null;

src/tools/dispatch.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -290,22 +290,45 @@ async def handle_msg_wait(db, arguments: dict[str, Any]) -> list[types.Content]:
290290

291291
logger.info(f"[msg_wait] explicit: agent_id={explicit_agent_id}, connection: agent_id={connection_agent_id}, final_agent_id={agent_id}")
292292

293-
if agent_id and token:
294-
try:
295-
result = await crud.agent_msg_wait(db, agent_id, token)
296-
logger.info(f"[msg_wait] activity recorded: agent_id={agent_id}, result={result}")
297-
except Exception as e:
298-
logger.warning(f"[msg_wait] Failed to record activity for {agent_id}: {e}")
299-
else:
300-
logger.warning(f"[msg_wait] No credentials available: agent_id={agent_id}, token={'***' if token else None}")
293+
# Heartbeat interval: refresh every 20 seconds to stay online (AGENT_HEARTBEAT_TIMEOUT is 30s)
294+
HEARTBEAT_INTERVAL = 20.0
295+
296+
async def _refresh_heartbeat():
297+
"""Refresh heartbeat to keep agent online during long wait."""
298+
if agent_id and token:
299+
try:
300+
await crud.agent_msg_wait(db, agent_id, token)
301+
logger.debug(f"[msg_wait] heartbeat refreshed for agent_id={agent_id}")
302+
except Exception as e:
303+
logger.warning(f"[msg_wait] Failed to refresh heartbeat for {agent_id}: {e}")
301304

302305
async def _poll():
306+
"""Poll for new messages, refreshing heartbeat periodically."""
307+
last_heartbeat = asyncio.get_event_loop().time()
303308
while True:
309+
# Check for new messages
304310
msgs = await crud.msg_list(db, thread_id, after_seq=after_seq, include_system_prompt=False)
305311
if msgs:
306312
return msgs
313+
314+
# Refresh heartbeat if interval elapsed
315+
now = asyncio.get_event_loop().time()
316+
if now - last_heartbeat >= HEARTBEAT_INTERVAL:
317+
await _refresh_heartbeat()
318+
last_heartbeat = now
319+
307320
await asyncio.sleep(0.5)
308321

322+
# Record initial activity
323+
if agent_id and token:
324+
try:
325+
result = await crud.agent_msg_wait(db, agent_id, token)
326+
logger.info(f"[msg_wait] initial activity recorded: agent_id={agent_id}, result={result}")
327+
except Exception as e:
328+
logger.warning(f"[msg_wait] Failed to record activity for {agent_id}: {e}")
329+
else:
330+
logger.warning(f"[msg_wait] No credentials available: agent_id={agent_id}, token={'***' if token else None}")
331+
309332
try:
310333
msgs = await asyncio.wait_for(_poll(), timeout=timeout_s)
311334
except asyncio.TimeoutError:

0 commit comments

Comments
 (0)