Skip to content

Commit 2c8600e

Browse files
committed
2 parents 40686b4 + d375547 commit 2c8600e

10 files changed

Lines changed: 550 additions & 28 deletions

File tree

src/db/crud.py

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,13 @@ async def template_delete(db: aiosqlite.Connection, template_id: str) -> None:
790790
_VALID_PRIORITIES = {"normal", "urgent", "system"}
791791

792792

793+
async def msg_get(db: aiosqlite.Connection, message_id: str) -> Optional[Message]:
794+
"""Fetch a single message by ID. Returns None if not found."""
795+
async with db.execute("SELECT * FROM messages WHERE id = ?", (message_id,)) as cur:
796+
row = await cur.fetchone()
797+
return _row_to_message(row) if row else None
798+
799+
793800
async def msg_post(
794801
db: aiosqlite.Connection,
795802
thread_id: str,
@@ -800,11 +807,25 @@ async def msg_post(
800807
role: str = "user",
801808
metadata: Optional[dict] = None,
802809
priority: str = "normal",
810+
reply_to_msg_id: Optional[str] = None,
803811
) -> Message:
804812
# Validate priority (UP-16)
805813
if priority not in _VALID_PRIORITIES:
806814
raise ValueError(f"Invalid priority '{priority}'. Must be one of: {', '.join(sorted(_VALID_PRIORITIES))}")
807815

816+
# Validate reply_to_msg_id (UP-14): must exist and belong to the same thread
817+
if reply_to_msg_id is not None:
818+
async with db.execute(
819+
"SELECT thread_id FROM messages WHERE id = ?", (reply_to_msg_id,)
820+
) as cur:
821+
parent_row = await cur.fetchone()
822+
if parent_row is None:
823+
raise ValueError(f"reply_to_msg_id '{reply_to_msg_id}' does not exist.")
824+
if parent_row["thread_id"] != thread_id:
825+
raise ValueError(
826+
f"reply_to_msg_id '{reply_to_msg_id}' belongs to a different thread."
827+
)
828+
808829
# Content filter: block known secret patterns before any DB interaction
809830
if CONTENT_FILTER_ENABLED:
810831
blocked, pattern_name = check_content(content)
@@ -890,8 +911,8 @@ async def msg_post(
890911
seq = await next_seq(db)
891912
meta_json = json.dumps(metadata) if metadata else None
892913
await db.execute(
893-
"INSERT INTO messages (id, thread_id, author, role, content, seq, created_at, metadata, author_id, author_name, priority) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
894-
(mid, thread_id, actual_author, role, content, seq, now, meta_json, author_id, author_name, priority),
914+
"INSERT INTO messages (id, thread_id, author, role, content, seq, created_at, metadata, author_id, author_name, priority, reply_to_msg_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
915+
(mid, thread_id, actual_author, role, content, seq, now, meta_json, author_id, author_name, priority, reply_to_msg_id),
895916
)
896917
await db.execute(
897918
"UPDATE threads SET updated_at = ? WHERE id = ?", (now, thread_id)
@@ -940,11 +961,19 @@ async def msg_post(
940961
"msg_id": mid, "thread_id": thread_id,
941962
"agent": author_name, "reason": stop_reason,
942963
})
943-
logger.debug(f"Message posted: seq={seq} author={author_name} thread={thread_id} priority={priority}")
964+
# SSE event for reply-to threading (UP-14)
965+
if reply_to_msg_id is not None:
966+
await _emit_event(db, "msg.reply", thread_id, {
967+
"msg_id": mid, "reply_to_msg_id": reply_to_msg_id,
968+
"thread_id": thread_id, "author": author_name, "seq": seq,
969+
})
970+
971+
logger.debug(f"Message posted: seq={seq} author={author_name} thread={thread_id} priority={priority} reply_to={reply_to_msg_id}")
944972
return Message(
945973
id=mid, thread_id=thread_id, author=actual_author, role=role,
946974
content=content, seq=seq, created_at=_parse_dt(now), metadata=meta_json,
947-
author_id=author_id, author_name=author_name, priority=priority
975+
author_id=author_id, author_name=author_name, priority=priority,
976+
reply_to_msg_id=reply_to_msg_id,
948977
)
949978

950979

@@ -1072,6 +1101,7 @@ def _row_to_message(row: aiosqlite.Row) -> Message:
10721101
if not author_name:
10731102
author_name = row["author"]
10741103
priority = row["priority"] if "priority" in keys else "normal"
1104+
reply_to_msg_id = row["reply_to_msg_id"] if "reply_to_msg_id" in keys else None
10751105

10761106
return Message(
10771107
id=row["id"],
@@ -1085,6 +1115,7 @@ def _row_to_message(row: aiosqlite.Row) -> Message:
10851115
author_id=author_id,
10861116
author_name=author_name,
10871117
priority=priority,
1118+
reply_to_msg_id=reply_to_msg_id,
10881119
)
10891120

10901121

@@ -1636,9 +1667,9 @@ async def thread_export_markdown(db: aiosqlite.Connection, thread_id: str) -> Op
16361667
return "\n".join(lines)
16371668

16381669

1639-
# ─────────────────────────────────────────────
1640-
# Metrics (UP-22)
1641-
# ─────────────────────────────────────────────
1670+
# ──────────────────────────────────────────────────────────────────────────────
1671+
# UP-22: Bus-level observability metrics
1672+
# ──────────────────────────────────────────────────────────────────────────────
16421673

16431674
async def get_bus_metrics(db: aiosqlite.Connection) -> dict:
16441675
"""Return a snapshot of bus-level observability metrics.
@@ -1664,21 +1695,20 @@ async def get_bus_metrics(db: aiosqlite.Connection) -> dict:
16641695
AGENT_HEARTBEAT_TIMEOUT window
16651696
"""
16661697
now = datetime.now(timezone.utc)
1667-
now_iso = now.isoformat()
16681698

1669-
# ── Thread counts ────────────────────────────────────────────────────────
1699+
# ── Thread counts ──────────────────────────────────────────────────────────
16701700
threads_by_status: dict[str, int] = {}
16711701
async with db.execute("SELECT status, COUNT(*) AS cnt FROM threads GROUP BY status") as cur:
16721702
async for row in cur:
16731703
threads_by_status[row["status"]] = row["cnt"]
16741704
threads_total = sum(threads_by_status.values())
16751705

1676-
# ── Message total ────────────────────────────────────────────────────────
1706+
# ── Message total ──────────────────────────────────────────────────────────
16771707
async with db.execute("SELECT COUNT(*) AS cnt FROM messages") as cur:
16781708
row = await cur.fetchone()
16791709
messages_total = row["cnt"] if row else 0
16801710

1681-
# ── Message rates (1m / 5m / 15m) ───────────────────────────────────────
1711+
# ── Message rates (1m / 5m / 15m) ─────────────────────────────────────────
16821712
cutoffs = {
16831713
"last_1m": (now - timedelta(minutes=1)).isoformat(),
16841714
"last_5m": (now - timedelta(minutes=5)).isoformat(),
@@ -1692,11 +1722,9 @@ async def get_bus_metrics(db: aiosqlite.Connection) -> dict:
16921722
row = await cur.fetchone()
16931723
message_rate[key] = row["cnt"] if row else 0
16941724

1695-
# ── Inter-message latency (avg ms, threads active in last 15 min) ────────
1725+
# ── Inter-message latency (avg ms, threads active in last 15 min) ────────
16961726
# Uses LAG() window function (SQLite >= 3.25.0) to compute time gaps
16971727
# between consecutive messages within each thread, then averages them.
1698-
# Only considers threads that had activity in the last 15 minutes to keep
1699-
# the metric relevant to current bus load.
17001728
cutoff_15m = cutoffs["last_15m"]
17011729
avg_latency_ms: Optional[float] = None
17021730
try:
@@ -1718,10 +1746,9 @@ async def get_bus_metrics(db: aiosqlite.Connection) -> dict:
17181746
if row and row["avg_gap"] is not None:
17191747
avg_latency_ms = round(row["avg_gap"], 1)
17201748
except Exception:
1721-
# Defensive: if the window query fails for any reason, degrade gracefully
17221749
avg_latency_ms = None
17231750

1724-
# ── stop_reason distribution (UP-17) ─────────────────────────────────────
1751+
# ── stop_reason distribution (UP-17) ─────────────────────────────────────
17251752
canonical_reasons = ("convergence", "timeout", "complete", "error", "impasse")
17261753
stop_reasons: dict[str, int] = {r: 0 for r in canonical_reasons}
17271754
async with db.execute(
@@ -1736,7 +1763,7 @@ async def get_bus_metrics(db: aiosqlite.Connection) -> dict:
17361763
reason = row["reason"]
17371764
stop_reasons[reason] = stop_reasons.get(reason, 0) + row["cnt"]
17381765

1739-
# ── Agent counts ─────────────────────────────────────────────────────────
1766+
# ── Agent counts ───────────────────────────────────────────────────────────
17401767
agents_total = 0
17411768
agents_online = 0
17421769
heartbeat_cutoff = (

src/db/database.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,17 @@ async def init_schema(db: aiosqlite.Connection) -> None:
486486
# Migration: Add priority column to messages (UP-16)
487487
await _add_column_if_missing(db, "messages", "priority", "TEXT NOT NULL DEFAULT 'normal'")
488488

489+
# Migration: Add reply_to_msg_id column to messages (UP-14)
490+
await _add_column_if_missing(db, "messages", "reply_to_msg_id", "TEXT")
491+
try:
492+
await db.execute(
493+
"CREATE INDEX IF NOT EXISTS idx_messages_reply ON messages(reply_to_msg_id)"
494+
)
495+
await db.commit()
496+
logger.info("Migration: ensured reply_to_msg_id column + index exist (UP-14)")
497+
except Exception as e:
498+
logger.error(f"Migration failed for reply_to_msg_id index: {e}")
499+
489500
# Migration: Create reactions table if it does not exist (UP-13)
490501
# Safe for existing DBs — CREATE TABLE IF NOT EXISTS + CREATE UNIQUE INDEX IF NOT EXISTS
491502
try:

src/db/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class Message:
4545
metadata: Optional[str] # JSON string
4646
author_id: Optional[str] = None
4747
author_name: Optional[str] = None
48+
reply_to_msg_id: Optional[str] = None
4849
priority: str = "normal" # normal | urgent | system (UP-16)
4950

5051

src/main.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def _agent_label(agent: object | None, fallback_id: str | None = None) -> str:
106106
# Support environment variable override via AGENTCHATBUS_DB_TIMEOUT
107107
DB_TIMEOUT = int(os.getenv("AGENTCHATBUS_DB_TIMEOUT", "5"))
108108

109-
# Server start time — set in lifespan(), used by /api/metrics
109+
# Server start time — set in lifespan(), used by /api/metrics (UP-22)
110110
_start_time: datetime | None = None
111111

112112

@@ -599,6 +599,7 @@ async def api_messages(
599599
"created_at": m.created_at.isoformat(),
600600
"metadata": m.metadata,
601601
"priority": m.priority,
602+
"reply_to_msg_id": m.reply_to_msg_id,
602603
"reactions": reactions_map.get(m.id, []),
603604
}
604605
for m in msgs
@@ -850,6 +851,7 @@ class MessageCreate(BaseModel):
850851
metadata: dict | None = None
851852
images: list[dict] | None = None # [{url: str, name: str}, ...]
852853
priority: Literal["normal", "urgent", "system"] = "normal" # UP-16
854+
reply_to_msg_id: str | None = None # UP-14: optional parent message ID
853855

854856
class SyncContextRequest(BaseModel):
855857
agent_id: str | None = None
@@ -1079,7 +1081,8 @@ async def api_post_message(thread_id: str, body: MessageCreate, x_agent_token: s
10791081
reply_token=reply_token,
10801082
role=body.role,
10811083
metadata=msg_metadata if msg_metadata else None,
1082-
priority=body.priority),
1084+
priority=body.priority,
1085+
reply_to_msg_id=body.reply_to_msg_id),
10831086
timeout=DB_TIMEOUT
10841087
)
10851088
except MissingSyncFieldsError as e:
@@ -1132,7 +1135,7 @@ async def api_post_message(thread_id: str, body: MessageCreate, x_agent_token: s
11321135
# Return the full message with metadata
11331136
result = {"id": m.id, "seq": m.seq, "author": m.author,
11341137
"role": m.role, "content": m.content, "created_at": m.created_at.isoformat(),
1135-
"priority": m.priority}
1138+
"priority": m.priority, "reply_to_msg_id": m.reply_to_msg_id}
11361139

11371140
# Add metadata (includes mentions and images)
11381141
if m.metadata:
@@ -1720,12 +1723,9 @@ async def api_thread_admin_decision(thread_id: str, body: AdminDecisionRequest):
17201723

17211724

17221725
# ─────────────────────────────────────────────
1723-
# Health check
1724-
# ΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇ
1725-
1726-
# ─────────────────────────────────────────────
1726+
# ──────────────────────────────────────────────────────────────────────────────
17271727
# Metrics (UP-22)
1728-
# ─────────────────────────────────────────────
1728+
# ──────────────────────────────────────────────────────────────────────────────
17291729

17301730
@app.get("/api/metrics")
17311731
async def get_metrics():
@@ -1755,6 +1755,9 @@ async def get_metrics():
17551755
}
17561756

17571757

1758+
# Health check
1759+
# ──────────────────────────────────────────────────────────────────────────────
1760+
17581761
@app.get("/health")
17591762
async def health():
17601763
return {"status": "ok", "service": "AgentChatBus"}

src/mcp_server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,10 @@ async def list_tools() -> list[types.Tool]:
231231
},
232232
},
233233
},
234+
"reply_to_msg_id": {
235+
"type": "string",
236+
"description": "Optional ID of the message being replied to. Must belong to the same thread. Triggers a msg.reply SSE event.",
237+
},
234238
},
235239
"required": ["thread_id", "author", "content", "expected_last_seq", "reply_token"],
236240
},

src/static/css/main.css

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,28 @@ body[data-theme="light"] .handoff-badge {
869869
color: #2563eb;
870870
}
871871

872+
/* Reply-to quote block (UP-14) */
873+
.msg-reply-quote {
874+
display: inline-block;
875+
font-size: 0.78em;
876+
padding: 2px 8px;
877+
margin-bottom: 4px;
878+
border-left: 3px solid rgba(150,150,255,0.6);
879+
background: rgba(150,150,255,0.08);
880+
border-radius: 0 4px 4px 0;
881+
color: rgba(200,200,220,0.8);
882+
font-style: normal;
883+
max-width: 100%;
884+
white-space: nowrap;
885+
overflow: hidden;
886+
text-overflow: ellipsis;
887+
}
888+
body[data-theme="light"] .msg-reply-quote {
889+
border-left-color: rgba(80,80,200,0.4);
890+
background: rgba(80,80,200,0.06);
891+
color: #555;
892+
}
893+
872894
body[data-theme="light"] .stop-tag {
873895
background: rgba(100,100,100,0.08);
874896
color: #555;

src/static/index.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,11 @@
800800
priorityBadgeHtml = '<span class="msg-priority-badge msg-priority-system" title="System message">SYSTEM</span>';
801801
}
802802

803+
// Reply-to quote (UP-14)
804+
const replyQuoteHtml = m.reply_to_msg_id
805+
? `<div class="msg-reply-quote" title="In reply to message ${esc(m.reply_to_msg_id)}">&#8617; In reply to: <em>${esc(m.reply_to_msg_id.slice(0, 8))}…</em></div>`
806+
: '';
807+
803808
row.innerHTML = `
804809
<div class="msg-avatar" style="background:${bgAlpha};color:${color};border:1px solid ${color}44">${avatarEmoji}</div>
805810
<div class="msg-col">
@@ -809,6 +814,7 @@
809814
${priorityBadgeHtml}
810815
${handoffBadgeHtml}
811816
</div>
817+
${replyQuoteHtml}
812818
<div class="bubble-v2" style="border-color:${color}55;${isHuman ? 'border-right-color' : 'border-left-color'}:${color}"></div>
813819
${isAgentMessage ? '<acb-message-tail-meta></acb-message-tail-meta>' : ''}
814820
<div class="msg-reactions"></div>

src/tools/dispatch.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ async def handle_thread_create(db, arguments: dict[str, Any]) -> list[types.Text
223223
agent_info = await crud.agent_get(db, agent_id)
224224
if agent_info:
225225
await crud.thread_settings_set_creator_admin(db, result.id, agent_id, agent_info.name)
226-
226+
227227
token_payload = await crud.issue_reply_token(db, thread_id=result.id, agent_id=agent_id)
228228

229229
return [types.TextContent(type="text", text=json.dumps({
@@ -346,6 +346,7 @@ async def handle_msg_post(db, arguments: dict[str, Any]) -> list[types.TextConte
346346
role=arguments.get("role", "user"),
347347
metadata=arguments.get("metadata"),
348348
priority=arguments.get("priority", "normal"),
349+
reply_to_msg_id=arguments.get("reply_to_msg_id"),
349350
)
350351

351352
# Agent posted a message - exit wait state for this thread
@@ -395,9 +396,19 @@ async def handle_msg_post(db, arguments: dict[str, Any]) -> list[types.TextConte
395396
"error": "Content blocked by filter",
396397
"pattern": e.pattern_name,
397398
}))]
399+
except ValueError as e:
400+
return [types.TextContent(type="text", text=json.dumps({
401+
"error": "INVALID_ARGUMENT",
402+
"detail": str(e),
403+
}))]
398404

399405
meta = _safe_json_loads(msg.metadata)
400-
result: dict[str, Any] = {"msg_id": msg.id, "seq": msg.seq, "priority": msg.priority}
406+
result: dict[str, Any] = {
407+
"msg_id": msg.id,
408+
"seq": msg.seq,
409+
"priority": msg.priority,
410+
"reply_to_msg_id": msg.reply_to_msg_id,
411+
}
401412
if isinstance(meta, dict):
402413
if meta.get("handoff_target"):
403414
result["handoff_target"] = meta["handoff_target"]
@@ -438,6 +449,7 @@ async def handle_msg_list(db, arguments: dict[str, Any]) -> list[types.Content]:
438449
"created_at": m.created_at.isoformat(),
439450
"metadata": m.metadata,
440451
"priority": m.priority,
452+
"reply_to_msg_id": m.reply_to_msg_id,
441453
"reactions": reactions_map.get(m.id, []),
442454
}
443455
for m in msgs

0 commit comments

Comments
 (0)