Skip to content

Commit f0abaff

Browse files
committed
feat: Implement acb-compose-shell custom element for chat composition, enabling mentions and image uploads, and update chat message handling.
1 parent d4edad0 commit f0abaff

17 files changed

Lines changed: 604 additions & 59 deletions

src/config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
MSG_WAIT_TIMEOUT = int(os.getenv("AGENTCHATBUS_WAIT_TIMEOUT", config_data.get("MSG_WAIT_TIMEOUT", "300")))
4141
BUS_VERSION = "0.1.0"
4242

43+
# Strict message sync mode (mandatory)
44+
REPLY_TOKEN_LEASE_SECONDS = int(os.getenv("AGENTCHATBUS_REPLY_TOKEN_LEASE_SECONDS", "10"))
45+
SEQ_TOLERANCE = int(os.getenv("AGENTCHATBUS_SEQ_TOLERANCE", "5"))
46+
SEQ_MISMATCH_MAX_MESSAGES = int(os.getenv("AGENTCHATBUS_SEQ_MISMATCH_MAX_MESSAGES", "20"))
47+
4348
# Rate limiting: max messages per minute per author identity (0 = disabled)
4449
RATE_LIMIT_MSG_PER_MINUTE = int(os.getenv("AGENTCHATBUS_RATE_LIMIT", "30"))
4550
RATE_LIMIT_ENABLED = RATE_LIMIT_MSG_PER_MINUTE > 0
@@ -62,6 +67,9 @@ def get_config_dict():
6267
"PORT": PORT,
6368
"AGENT_HEARTBEAT_TIMEOUT": AGENT_HEARTBEAT_TIMEOUT,
6469
"MSG_WAIT_TIMEOUT": MSG_WAIT_TIMEOUT,
70+
"REPLY_TOKEN_LEASE_SECONDS": REPLY_TOKEN_LEASE_SECONDS,
71+
"SEQ_TOLERANCE": SEQ_TOLERANCE,
72+
"SEQ_MISMATCH_MAX_MESSAGES": SEQ_MISMATCH_MAX_MESSAGES,
6573
"EXPOSE_THREAD_RESOURCES": EXPOSE_THREAD_RESOURCES,
6674
}
6775

src/db/crud.py

Lines changed: 165 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,15 @@
1313
import aiosqlite
1414

1515
from src.db.models import Thread, Message, AgentInfo, Event
16-
from src.config import AGENT_HEARTBEAT_TIMEOUT, RATE_LIMIT_MSG_PER_MINUTE, RATE_LIMIT_ENABLED
17-
from src.config import AGENT_HEARTBEAT_TIMEOUT, CONTENT_FILTER_ENABLED
16+
from src.config import (
17+
AGENT_HEARTBEAT_TIMEOUT,
18+
RATE_LIMIT_MSG_PER_MINUTE,
19+
RATE_LIMIT_ENABLED,
20+
CONTENT_FILTER_ENABLED,
21+
REPLY_TOKEN_LEASE_SECONDS,
22+
SEQ_TOLERANCE,
23+
SEQ_MISMATCH_MAX_MESSAGES,
24+
)
1825
from src.content_filter import check_content, ContentFilterError
1926

2027
logger = logging.getLogger(__name__)
@@ -30,6 +37,46 @@ def __init__(self, limit: int, window: int, retry_after: int, scope: str) -> Non
3037
super().__init__(f"Rate limit exceeded: {limit} messages/{window}s")
3138

3239

40+
class MissingSyncFieldsError(Exception):
41+
"""Raised when strict sync fields are absent from msg_post."""
42+
43+
def __init__(self, missing_fields: list[str]) -> None:
44+
self.missing_fields = missing_fields
45+
super().__init__(f"Missing required sync fields: {', '.join(missing_fields)}")
46+
47+
48+
class SeqMismatchError(Exception):
49+
"""Raised when too many unseen messages exist since expected seq."""
50+
51+
def __init__(self, expected_last_seq: int, current_seq: int, new_messages: list[dict]) -> None:
52+
self.expected_last_seq = expected_last_seq
53+
self.current_seq = current_seq
54+
self.new_messages = new_messages
55+
super().__init__(
56+
f"SEQ_MISMATCH: expected_last_seq={expected_last_seq}, current_seq={current_seq}"
57+
)
58+
59+
60+
class ReplyTokenInvalidError(Exception):
61+
def __init__(self, token: str) -> None:
62+
self.token = token
63+
super().__init__("TOKEN_INVALID")
64+
65+
66+
class ReplyTokenExpiredError(Exception):
67+
def __init__(self, token: str, expires_at: str) -> None:
68+
self.token = token
69+
self.expires_at = expires_at
70+
super().__init__("TOKEN_EXPIRED")
71+
72+
73+
class ReplyTokenReplayError(Exception):
74+
def __init__(self, token: str, consumed_at: Optional[str]) -> None:
75+
self.token = token
76+
self.consumed_at = consumed_at
77+
super().__init__("TOKEN_REPLAY")
78+
79+
3380
GLOBAL_SYSTEM_PROMPT = """**SYSTEM DIRECTIVE: ACTIVE AGENT COLLABORATION WORKSPACE**
3481
3582
Welcome to this Thread. You are participating in a multi-agent workspace sharing the same underlying codebase and execution environment. You MUST collaborate proactively and keep progress moving.
@@ -233,6 +280,69 @@ async def thread_latest_seq(db: aiosqlite.Connection, thread_id: str) -> int:
233280
return row["max_seq"] or 0
234281

235282

283+
async def _expire_old_reply_tokens(db: aiosqlite.Connection) -> None:
284+
now = _now()
285+
await db.execute(
286+
"UPDATE reply_tokens SET status = 'expired' "
287+
"WHERE status = 'issued' AND expires_at <= ?",
288+
(now,),
289+
)
290+
await db.commit()
291+
292+
293+
async def issue_reply_token(
294+
db: aiosqlite.Connection,
295+
thread_id: str,
296+
agent_id: Optional[str] = None,
297+
) -> dict:
298+
"""Issue a short-lived reply token bound to a thread (and optionally an agent)."""
299+
await _expire_old_reply_tokens(db)
300+
token = secrets.token_urlsafe(24)
301+
issued_at = _now()
302+
expires_at = (datetime.now(timezone.utc) + timedelta(seconds=REPLY_TOKEN_LEASE_SECONDS)).isoformat()
303+
await db.execute(
304+
"INSERT INTO reply_tokens (token, thread_id, agent_id, issued_at, expires_at, consumed_at, status) "
305+
"VALUES (?, ?, ?, ?, ?, NULL, 'issued')",
306+
(token, thread_id, agent_id, issued_at, expires_at),
307+
)
308+
await db.commit()
309+
current_seq = await thread_latest_seq(db, thread_id)
310+
return {
311+
"reply_token": token,
312+
"current_seq": current_seq,
313+
"reply_window": {
314+
"expires_at": expires_at,
315+
"max_new_messages": SEQ_TOLERANCE,
316+
},
317+
}
318+
319+
320+
async def _get_new_messages_since(
321+
db: aiosqlite.Connection,
322+
thread_id: str,
323+
expected_last_seq: int,
324+
limit: int = SEQ_MISMATCH_MAX_MESSAGES,
325+
) -> list[dict]:
326+
msgs = await msg_list(
327+
db,
328+
thread_id=thread_id,
329+
after_seq=expected_last_seq,
330+
limit=limit,
331+
include_system_prompt=False,
332+
)
333+
return [
334+
{
335+
"msg_id": m.id,
336+
"seq": m.seq,
337+
"author": m.author,
338+
"role": m.role,
339+
"content": m.content,
340+
"created_at": m.created_at.isoformat(),
341+
}
342+
for m in msgs
343+
]
344+
345+
236346
def _row_to_thread(row: aiosqlite.Row) -> Thread:
237347
system_prompt = row["system_prompt"] if "system_prompt" in row.keys() else None
238348
updated_at = _parse_dt(row["updated_at"]) if "updated_at" in row.keys() and row["updated_at"] else None
@@ -258,6 +368,8 @@ async def msg_post(
258368
thread_id: str,
259369
author: str,
260370
content: str,
371+
expected_last_seq: int,
372+
reply_token: str,
261373
role: str = "user",
262374
metadata: Optional[dict] = None,
263375
) -> Message:
@@ -310,6 +422,41 @@ async def msg_post(
310422
scope=scope,
311423
)
312424

425+
missing_fields: list[str] = []
426+
if expected_last_seq is None:
427+
missing_fields.append("expected_last_seq")
428+
if not reply_token:
429+
missing_fields.append("reply_token")
430+
if missing_fields:
431+
raise MissingSyncFieldsError(missing_fields)
432+
433+
await _expire_old_reply_tokens(db)
434+
async with db.execute(
435+
"SELECT token, thread_id, agent_id, expires_at, consumed_at, status "
436+
"FROM reply_tokens WHERE token = ?",
437+
(reply_token,),
438+
) as cur:
439+
token_row = await cur.fetchone()
440+
441+
if token_row is None:
442+
raise ReplyTokenInvalidError(reply_token)
443+
if token_row["thread_id"] != thread_id:
444+
raise ReplyTokenInvalidError(reply_token)
445+
if token_row["status"] == "consumed":
446+
raise ReplyTokenReplayError(reply_token, token_row["consumed_at"])
447+
if token_row["status"] == "expired":
448+
raise ReplyTokenExpiredError(reply_token, token_row["expires_at"])
449+
450+
token_agent_id = token_row["agent_id"]
451+
if token_agent_id and author_id and token_agent_id != author_id:
452+
raise ReplyTokenInvalidError(reply_token)
453+
454+
current_seq = await thread_latest_seq(db, thread_id)
455+
new_messages_count = current_seq - expected_last_seq
456+
if new_messages_count > SEQ_TOLERANCE:
457+
new_messages = await _get_new_messages_since(db, thread_id, expected_last_seq)
458+
raise SeqMismatchError(expected_last_seq, current_seq, new_messages)
459+
313460
mid = str(uuid.uuid4())
314461
now = _now()
315462
seq = await next_seq(db)
@@ -323,6 +470,22 @@ async def msg_post(
323470
"UPDATE threads SET updated_at = ? WHERE id = ?",
324471
(now, thread_id),
325472
)
473+
async with db.execute(
474+
"UPDATE reply_tokens SET status = 'consumed', consumed_at = ? "
475+
"WHERE token = ? AND status = 'issued'",
476+
(now, reply_token),
477+
) as cur:
478+
consumed = cur.rowcount
479+
if consumed == 0:
480+
await db.rollback()
481+
async with db.execute(
482+
"SELECT consumed_at FROM reply_tokens WHERE token = ?",
483+
(reply_token,),
484+
) as cur:
485+
row = await cur.fetchone()
486+
consumed_at = row["consumed_at"] if row else None
487+
raise ReplyTokenReplayError(reply_token, consumed_at)
488+
326489
await db.commit()
327490
if author_id:
328491
await agent_msg_post(db, author_id)

src/db/database.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,24 @@ async def init_schema(db: aiosqlite.Connection) -> None:
150150
CREATE INDEX IF NOT EXISTS idx_messages_thread_seq
151151
ON messages(thread_id, seq);
152152
153+
-- ----------------------------------------------------------------
154+
-- Reply token lease: mandatory sync token for msg_post in strict mode
155+
-- ----------------------------------------------------------------
156+
CREATE TABLE IF NOT EXISTS reply_tokens (
157+
token TEXT PRIMARY KEY,
158+
thread_id TEXT NOT NULL REFERENCES threads(id),
159+
agent_id TEXT,
160+
issued_at TEXT NOT NULL,
161+
expires_at TEXT NOT NULL,
162+
consumed_at TEXT,
163+
status TEXT NOT NULL CHECK (status IN ('issued', 'consumed', 'expired'))
164+
);
165+
166+
CREATE INDEX IF NOT EXISTS idx_reply_tokens_thread_status
167+
ON reply_tokens(thread_id, status);
168+
CREATE INDEX IF NOT EXISTS idx_reply_tokens_expires_at
169+
ON reply_tokens(expires_at);
170+
153171
-- ----------------------------------------------------------------
154172
-- Sequence counter: single-row table for thread-safe seq increment
155173
-- ----------------------------------------------------------------

src/main.py

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,14 @@
2828
from src.config import HOST, PORT, get_config_dict, save_config_dict
2929
from src.db.database import get_db, close_db
3030
from src.db import crud
31-
from src.db.crud import RateLimitExceeded
31+
from src.db.crud import (
32+
RateLimitExceeded,
33+
MissingSyncFieldsError,
34+
SeqMismatchError,
35+
ReplyTokenInvalidError,
36+
ReplyTokenExpiredError,
37+
ReplyTokenReplayError,
38+
)
3239
from src.config import THREAD_TIMEOUT_ENABLED, THREAD_TIMEOUT_MINUTES, THREAD_TIMEOUT_SWEEP_INTERVAL, RELOAD_ENABLED
3340
from src.mcp_server import server as mcp_server, _session_language
3441
from src.content_filter import ContentFilterError
@@ -385,10 +392,34 @@ class MessageCreate(BaseModel):
385392
author: str = "human"
386393
role: Literal["user", "assistant", "system"] = "user"
387394
content: str
395+
expected_last_seq: int
396+
reply_token: str
388397
mentions: list[str] | None = None
389398
metadata: dict | None = None
390399
images: list[dict] | None = None # [{url: str, name: str}, ...]
391400

401+
402+
class SyncContextRequest(BaseModel):
403+
agent_id: str | None = None
404+
405+
406+
@app.post("/api/threads/{thread_id}/sync-context")
407+
async def api_sync_context(thread_id: str, body: SyncContextRequest | None = None):
408+
try:
409+
db = await asyncio.wait_for(get_db(), timeout=DB_TIMEOUT)
410+
t = await asyncio.wait_for(crud.thread_get(db, thread_id), timeout=DB_TIMEOUT)
411+
except asyncio.TimeoutError:
412+
raise HTTPException(status_code=503, detail="Database operation timeout")
413+
if t is None:
414+
raise HTTPException(status_code=404, detail="Thread not found")
415+
416+
agent_id = body.agent_id if body else None
417+
sync = await asyncio.wait_for(
418+
crud.issue_reply_token(db, thread_id=thread_id, agent_id=agent_id),
419+
timeout=DB_TIMEOUT,
420+
)
421+
return sync
422+
392423
@app.post("/api/threads", status_code=201)
393424
async def api_create_thread(body: ThreadCreate):
394425
try:
@@ -421,10 +452,44 @@ async def api_post_message(thread_id: str, body: MessageCreate):
421452
try:
422453
m = await asyncio.wait_for(
423454
crud.msg_post(db, thread_id=thread_id, author=body.author,
424-
content=body.content, role=body.role,
455+
content=body.content,
456+
expected_last_seq=body.expected_last_seq,
457+
reply_token=body.reply_token,
458+
role=body.role,
425459
metadata=msg_metadata if msg_metadata else None),
426460
timeout=DB_TIMEOUT
427461
)
462+
except MissingSyncFieldsError as e:
463+
raise HTTPException(status_code=400, detail={
464+
"error": "MISSING_SYNC_FIELDS",
465+
"missing_fields": e.missing_fields,
466+
"action": "CALL_SYNC_CONTEXT_THEN_RETRY",
467+
})
468+
except SeqMismatchError as e:
469+
raise HTTPException(status_code=409, detail={
470+
"error": "SEQ_MISMATCH",
471+
"expected_last_seq": e.expected_last_seq,
472+
"current_seq": e.current_seq,
473+
"new_messages": e.new_messages,
474+
"action": "RE_READ_AND_RETRY",
475+
})
476+
except ReplyTokenInvalidError:
477+
raise HTTPException(status_code=400, detail={
478+
"error": "TOKEN_INVALID",
479+
"action": "CALL_SYNC_CONTEXT_THEN_RETRY",
480+
})
481+
except ReplyTokenExpiredError as e:
482+
raise HTTPException(status_code=400, detail={
483+
"error": "TOKEN_EXPIRED",
484+
"expires_at": e.expires_at,
485+
"action": "CALL_SYNC_CONTEXT_THEN_RETRY",
486+
})
487+
except ReplyTokenReplayError as e:
488+
raise HTTPException(status_code=400, detail={
489+
"error": "TOKEN_REPLAY",
490+
"consumed_at": e.consumed_at,
491+
"action": "CALL_SYNC_CONTEXT_THEN_RETRY",
492+
})
428493
except ContentFilterError as e:
429494
raise HTTPException(status_code=400, detail={"error": "Content blocked by filter", "pattern": e.pattern_name})
430495
except asyncio.TimeoutError:

src/mcp_server.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ async def list_tools() -> list[types.Tool]:
158158
"thread_id": {"type": "string"},
159159
"author": {"type": "string", "description": "Agent ID, 'system', or 'human'."},
160160
"content": {"type": "string"},
161+
"expected_last_seq": {
162+
"type": "integer",
163+
"description": "Strict sync field. Thread seq the sender used as context baseline.",
164+
},
165+
"reply_token": {
166+
"type": "string",
167+
"description": "Strict sync field. Unconsumed reply token from msg_wait.",
168+
},
161169
"role": {"type": "string", "enum": ["user", "assistant", "system"], "default": "user"},
162170
"mentions": {
163171
"type": "array",
@@ -166,7 +174,7 @@ async def list_tools() -> list[types.Tool]:
166174
},
167175
"metadata": {"type": "object"},
168176
},
169-
"required": ["thread_id", "author", "content"],
177+
"required": ["thread_id", "author", "content", "expected_last_seq", "reply_token"],
170178
},
171179
),
172180
types.Tool(
@@ -202,6 +210,8 @@ async def list_tools() -> list[types.Tool]:
202210
description=(
203211
"Block until at least one new message arrives in the thread after `after_seq`. "
204212
"Returns immediately if messages are already available. "
213+
"Always includes sync context (`current_seq`, `reply_token`, `reply_window`) "
214+
"for the next strict `msg_post` call. "
205215
"If this tool returns an empty list (timeout), avoid spammy waiting messages, "
206216
"but after repeated timeouts you SHOULD send a concise, meaningful progress update "
207217
"(status/blocker/next action) and optionally @mention a relevant online agent."

0 commit comments

Comments
 (0)