Skip to content

Commit c6ed802

Browse files
authored
Logos: Dynamic worker announcements, separate API keys table for workers (#500)
1 parent 9209b45 commit c6ed802

17 files changed

Lines changed: 544 additions & 179 deletions

logos/db/init.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ DROP TABLE IF EXISTS token_prices CASCADE;
2525
DROP TABLE IF EXISTS jobs CASCADE;
2626
DROP TABLE IF EXISTS ollama_provider_snapshots CASCADE;
2727
DROP TABLE IF EXISTS model_profiles CASCADE;
28+
DROP TABLE IF EXISTS logosnode_provider_keys CASCADE;
2829
DROP TABLE IF EXISTS schema_migrations CASCADE;
2930

3031
CREATE TABLE users (
@@ -111,6 +112,14 @@ CREATE TABLE model_api_keys (
111112
UNIQUE(model_id, provider_id)
112113
);
113114

115+
-- Per-provider key for logosnode workers (replaces per-model model_api_keys for workers)
116+
CREATE TABLE logosnode_provider_keys (
117+
id SERIAL PRIMARY KEY,
118+
provider_id INTEGER NOT NULL REFERENCES providers(id) ON DELETE CASCADE,
119+
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
120+
UNIQUE(provider_id)
121+
);
122+
114123
CREATE TABLE profile_model_permissions (
115124
id SERIAL PRIMARY KEY,
116125
profile_id INTEGER REFERENCES profiles(id) ON DELETE CASCADE,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
-- Migration 027: Dynamic logosnode deployments
2+
--
3+
-- For logosnode (worker-node) providers, models are announced dynamically via
4+
-- WebSocket capabilities. This migration:
5+
--
6+
-- 1. Creates a logosnode_provider_keys table so workernode providers don't need
7+
-- per-model entries in model_api_keys (they use a single shared key).
8+
-- 2. Auto-syncing of model_provider rows is handled at the application layer
9+
-- when capabilities are announced, so existing deployment queries continue
10+
-- to work without schema changes.
11+
12+
-- Step 1: Create logosnode_provider_keys table
13+
-- This replaces the need for per-model model_api_keys rows for logosnode providers.
14+
-- Each logosnode provider has exactly one key (stored in providers.api_key already),
15+
-- but this table makes the deployment query work without model_api_keys.
16+
CREATE TABLE IF NOT EXISTS logosnode_provider_keys (
17+
id SERIAL PRIMARY KEY,
18+
provider_id INTEGER NOT NULL REFERENCES providers(id) ON DELETE CASCADE,
19+
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
20+
UNIQUE(provider_id)
21+
);
22+
23+
-- Seed the table from existing logosnode providers
24+
INSERT INTO logosnode_provider_keys (provider_id)
25+
SELECT id FROM providers WHERE provider_type = 'logosnode'
26+
ON CONFLICT (provider_id) DO NOTHING;
27+
28+
-- Record migration
29+
INSERT INTO schema_migrations (filename)
30+
VALUES ('027_logosnode_dynamic_deployments.sql')
31+
ON CONFLICT (filename) DO NOTHING;

logos/db/migrations/AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
If you create a migration in this folder, ensure it is also added to "run_all_migration.sh"!

logos/db/migrations/run_all_migrations.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ MIGRATIONS=(
6161
"024_store_logosnode_runtime_payload.sql"
6262
"025_create_model_profiles_table.sql"
6363
"026_create_schema_migrations.sql"
64+
"027_logosnode_dynamic_deployments.sql"
6465
)
6566

6667
FAILED=0

logos/logos-workernode/logos_worker_node/calibration.py

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
_CALIBRATION_PORT = 11499
6161
_KV_CACHE_MIN_STEP_MB = 1024.0 # binary search precision and safety margin
6262
_KV_CACHE_VRAM_CAP_RATIO = 0.8 # fraction of total GPU VRAM used as KV search ceiling
63+
_FAILED_COMMANDS_FILE = "calibration_failed_commands.txt"
6364

6465
# ---------------------------------------------------------------------------
6566
# KV-cache size parsing
@@ -96,6 +97,47 @@ def _round_up_gb(mb: float) -> float:
9697
return math.ceil(mb / 1024.0) * 1024.0
9798

9899

100+
# ---------------------------------------------------------------------------
101+
# Failed-command blacklist
102+
# ---------------------------------------------------------------------------
103+
104+
105+
def _cmd_fingerprint(cmd: list[str]) -> str:
106+
"""Build a canonical one-line string from a vLLM command list.
107+
108+
Strips ``--host`` and ``--port`` (calibration infra, not model-specific)
109+
so that retries on a different port aren't falsely considered "new".
110+
"""
111+
filtered: list[str] = []
112+
skip_next = False
113+
for i, tok in enumerate(cmd):
114+
if skip_next:
115+
skip_next = False
116+
continue
117+
if tok in ("--host", "--port"):
118+
skip_next = True
119+
continue
120+
filtered.append(tok)
121+
return " ".join(filtered)
122+
123+
124+
def _load_failed_commands(failed_path: Path) -> set[str]:
125+
if not failed_path.exists():
126+
return set()
127+
return {
128+
line.strip()
129+
for line in failed_path.read_text(encoding="utf-8").splitlines()
130+
if line.strip() and not line.strip().startswith("#")
131+
}
132+
133+
134+
def _record_failed_command(failed_path: Path, fingerprint: str) -> None:
135+
failed_path.parent.mkdir(parents=True, exist_ok=True)
136+
with failed_path.open("a", encoding="utf-8") as f:
137+
f.write(fingerprint + "\n")
138+
logger.info(" Blacklisted command → %s", failed_path)
139+
140+
99141
# ---------------------------------------------------------------------------
100142
# GPU VRAM helpers
101143
# ---------------------------------------------------------------------------
@@ -198,30 +240,23 @@ def _post(
198240
# ---------------------------------------------------------------------------
199241

200242

201-
def spawn_vllm(
243+
def _build_vllm_cmd(
202244
plan: dict[str, Any],
203245
vllm_binary: str,
204246
host: str,
205247
port: int,
206-
log_path: Path,
207248
kv_cache_memory_bytes: str,
208-
) -> subprocess.Popen[str]:
249+
) -> list[str]:
250+
"""Build the vLLM command list without spawning a process."""
209251
model = plan["model"]
210252
tp = int(plan.get("tensor_parallel_size", 1))
211253
dtype = str(plan.get("dtype", "auto"))
212254
quant = str(plan.get("quantization") or "")
213255
max_model_len = plan.get("max_model_len")
214256
enforce_eager = bool(plan.get("enforce_eager", False))
215257
disable_custom_all_reduce = bool(plan.get("disable_custom_all_reduce", False))
216-
disable_nccl_p2p = bool(plan.get("disable_nccl_p2p", False))
217258
extra_args: list[str] = list(plan.get("extra_args") or [])
218259
kv_bytes = str(plan.get("kv_cache_memory_bytes") or kv_cache_memory_bytes)
219-
220-
# When kv_cache_memory_bytes is set, omit --gpu-memory-utilization and let
221-
# vLLM default to 0.9. kv_cache_memory_bytes controls the KV pool size
222-
# directly; adding gpu_memory_utilization=0.1 caps total VRAM to 10% which
223-
# prevents the model weights from loading at all.
224-
# An explicit per-model override takes precedence.
225260
explicit_gmu = plan.get("gpu_memory_utilization")
226261

227262
cmd = [
@@ -250,8 +285,23 @@ def spawn_vllm(
250285
cmd.append("--enforce-eager")
251286
if disable_custom_all_reduce:
252287
cmd.append("--disable-custom-all-reduce")
253-
# disable_nccl_p2p is applied via NCCL_P2P_DISABLE env var below (not a vLLM CLI flag)
254288
cmd.extend(extra_args)
289+
return cmd
290+
291+
292+
def spawn_vllm(
293+
plan: dict[str, Any],
294+
vllm_binary: str,
295+
host: str,
296+
port: int,
297+
log_path: Path,
298+
kv_cache_memory_bytes: str,
299+
) -> tuple[subprocess.Popen[str], list[str]]:
300+
"""Spawn vLLM and return ``(process, cmd_list)``."""
301+
tp = int(plan.get("tensor_parallel_size", 1))
302+
disable_nccl_p2p = bool(plan.get("disable_nccl_p2p", False))
303+
304+
cmd = _build_vllm_cmd(plan, vllm_binary, host, port, kv_cache_memory_bytes)
255305

256306
env = os.environ.copy()
257307
env["VLLM_SERVER_DEV_MODE"] = "1"
@@ -284,7 +334,7 @@ def spawn_vllm(
284334

285335
logger.info(" Spawned PID=%d log=%s", proc.pid, log_path)
286336
logger.info(" Command: %s", " ".join(cmd))
287-
return proc
337+
return proc, cmd
288338

289339

290340
def _kill_stale_vllm_workers() -> None:
@@ -554,13 +604,28 @@ def calibrate_model(
554604
kv_cache_sent_mb, _KV_CACHE_MIN_STEP_MB,
555605
)
556606

607+
failed_path = log_dir / _FAILED_COMMANDS_FILE
608+
failed_commands = _load_failed_commands(failed_path)
609+
557610
def _try_start(kv_mb: float) -> subprocess.Popen[str] | None:
558611
"""Try to start vLLM with the given KV cache. Returns the
559612
running process on success, ``None`` on failure (process is
560-
cleaned up)."""
613+
cleaned up). Blacklisted commands are skipped immediately."""
561614
kv_str = _format_kv_mb(kv_mb)
562-
proc = spawn_vllm(
563-
{**plan, "kv_cache_memory_bytes": kv_str},
615+
planned = {**plan, "kv_cache_memory_bytes": kv_str}
616+
# Check the blacklist *before* spawning to avoid wasting time.
617+
fingerprint = _cmd_fingerprint(
618+
_build_vllm_cmd(planned, vllm_binary, host, port, kv_str)
619+
)
620+
if fingerprint in failed_commands:
621+
logger.warning(
622+
" SKIP kv_cache=%s — command previously failed "
623+
"(remove line from %s to retry)",
624+
kv_str, failed_path,
625+
)
626+
return None
627+
proc, _ = spawn_vllm(
628+
planned,
564629
vllm_binary, host, port, log_path,
565630
kv_cache_memory_bytes=kv_str,
566631
)
@@ -585,6 +650,8 @@ def _try_start(kv_mb: float) -> subprocess.Popen[str] | None:
585650
logger.warning(" -- vLLM log tail --\n%s", log_tail)
586651
stop_vllm(proc)
587652
time.sleep(_VRAM_SETTLE_S)
653+
_record_failed_command(failed_path, fingerprint)
654+
failed_commands.add(fingerprint)
588655
return None
589656

590657
proc: subprocess.Popen[str] | None = None

logos/logos-workernode/logos_worker_node/logos_bridge.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,15 @@ async def _execute_command(self, action: str, params: dict[str, Any]) -> dict[st
490490
raise ValueError(f"Unsupported bridge command '{action}'")
491491

492492
@staticmethod
493-
def _lane_target_url(lane_status: dict[str, Any], payload: dict[str, Any] | None = None) -> str:
494-
# Detect embeddings requests from payload shape: has "input" but not "messages".
495-
# vLLM exposes embeddings at /v1/embeddings, not /v1/chat/completions.
496-
if payload and "input" in payload and "messages" not in payload:
497-
endpoint = "v1/embeddings"
493+
def _lane_target_url(
494+
lane_status: dict[str, Any],
495+
payload: dict[str, Any] | None = None,
496+
request_path: str | None = None,
497+
) -> str:
498+
# If the caller forwarded the original API path (e.g. "v1/embeddings",
499+
# "v2/embed", "tokenize"), use it directly so vLLM decides what it supports.
500+
if request_path:
501+
endpoint = request_path.strip("/")
498502
else:
499503
endpoint = str(lane_status.get("inference_endpoint") or "/v1/chat/completions").lstrip("/")
500504
return f"http://127.0.0.1:{lane_status['port']}/{endpoint}"
@@ -515,7 +519,8 @@ async def _execute_infer_command(self, params: dict[str, Any]) -> dict[str, Any]
515519
raise ValueError("payload must be an object")
516520

517521
lane_status = await self._resolve_lane_for_infer(lane_id)
518-
target_url = self._lane_target_url(lane_status, payload)
522+
request_path = params.get("request_path")
523+
target_url = self._lane_target_url(lane_status, payload, request_path=request_path)
519524

520525
await lane_manager.increment_active_requests(lane_id)
521526
try:
@@ -547,7 +552,8 @@ async def _execute_stream_command(self, ws, cmd_id: str, params: dict[str, Any])
547552

548553
try:
549554
lane_status = await self._resolve_lane_for_infer(lane_id)
550-
target_url = self._lane_target_url(lane_status, payload)
555+
request_path = params.get("request_path")
556+
target_url = self._lane_target_url(lane_status, payload, request_path=request_path)
551557
except Exception as exc: # noqa: BLE001
552558
await self._send_json(ws, {"type": "stream_end", "cmd_id": cmd_id, "success": False, "error": str(exc)})
553559
return

logos/logos-workernode/logos_worker_node/main.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,14 @@ async def _auto_calibrate_if_needed(
5858
elif profile.sleeping_residual_mb is None:
5959
reason = "sleeping_residual_mb is null"
6060
elif (
61-
profile.residency_source in ("calibrated", "measured")
61+
profile.residency_source == "calibrated"
6262
and profile.loaded_vram_mb is not None
6363
and abs(profile.base_residency_mb - profile.loaded_vram_mb) > 1.0
6464
):
65-
# Old-format profile: base_residency was stored as weights-only.
66-
# New format stores full loaded VRAM. Force recalibration.
65+
# Old-format calibrated profile: base_residency was stored as
66+
# weights-only. New format stores full loaded VRAM. Force recalibration.
67+
# Note: "measured" profiles intentionally differ (base=weights-only,
68+
# loaded=weights+KV) and must NOT be flagged as stale.
6769
reason = f"stale format (base={profile.base_residency_mb:.0f} != loaded={profile.loaded_vram_mb:.0f})"
6870
if reason:
6971
logger.info(" %s needs calibration: %s", model_name, reason)

logos/src/logos/capacity/capacity_planner.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def _log_cluster_summary(self, provider_ids: List[int]) -> None:
289289
snap = self._registry.peek_runtime_snapshot(pid) if self._registry else None
290290
if snap is None:
291291
name = self._facade.get_provider_name(pid) or "?"
292-
lines.append(f"{paint('⊘', RED)} provider={pid} worker={paint(name, BOLD)} {paint('offline', DIM)}")
292+
lines.append(f"{paint('⊘', RED)} provider={paint(name, BOLD)} {paint('offline', DIM)}")
293293
continue
294294

295295
connected += 1
@@ -311,7 +311,7 @@ def _log_cluster_summary(self, provider_ids: List[int]) -> None:
311311
worker_color = GREEN if heartbeat_age_s <= 15 else YELLOW if heartbeat_age_s <= 30 else RED
312312

313313
lines.append(
314-
f"{paint('●', worker_color)} provider={pid} worker={paint(str(worker_id), BOLD)} "
314+
f"{paint('●', worker_color)} provider={paint(str(worker_id), BOLD)} "
315315
f"status={paint('active', worker_color)} hb={heartbeat_age_s:.0f}s "
316316
f"vram={paint(f'{total_vram - free_vram:.0f}/{total_vram:.0f}MB', BOLD)} ({used_pct:.0f}%)"
317317
)
@@ -442,9 +442,10 @@ def _log_action_plan(self, actions: list[CapacityPlanAction]) -> None:
442442
}
443443
for action in actions:
444444
color = action_colors.get(action.action, CYAN)
445+
pname = self._facade.get_provider_name(action.provider_id) or str(action.provider_id)
445446
lines.append(
446447
f"{paint('→', color)} {paint(action.action, color, BOLD)} "
447-
f"provider={action.provider_id} lane={action.lane_id}"
448+
f"provider={pname} lane={action.lane_id}"
448449
)
449450
lines.extend(wrap_plain(f"model: {action.model_name}", indent=" "))
450451
lines.extend(wrap_plain(f"reason: {action.reason}", indent=" "))
@@ -565,7 +566,7 @@ async def _cold_load_for_request(
565566
profile = self._safe_get_profiles(provider_id).get(model_name)
566567
capacity = self._safe_get_capacity(provider_id)
567568
if capacity is None:
568-
logger.debug("No capacity info for provider %s, cannot cold-load %s", provider_id, model_name)
569+
logger.debug("No capacity info for provider %s, cannot cold-load %s", self._facade.get_provider_name(provider_id) or provider_id, model_name)
569570
return None
570571

571572
# No early feasibility bail-out here — the reclaim loop below will
@@ -662,11 +663,11 @@ async def _cold_load_for_request(
662663
if not ok:
663664
logger.info(
664665
"Cannot reclaim enough VRAM for cold load of %s on provider %s",
665-
model_name, provider_id,
666+
model_name, self._facade.get_provider_name(provider_id) or provider_id,
666667
)
667668
return None
668669

669-
logger.info("Cold-loading %s on provider %s (lane=%s)", model_name, provider_id, lane_id)
670+
logger.info("Cold-loading %s on provider %s (lane=%s)", model_name, self._facade.get_provider_name(provider_id) or provider_id, lane_id)
670671
async with self._lane_lock(provider_id, lane_id):
671672
loaded = await self._execute_action_with_confirmation(
672673
load_action, timeout_seconds=max(timeout_seconds, 180.0),
@@ -1725,12 +1726,12 @@ def _compute_preemptive_sleep_actions(
17251726
if candidates:
17261727
logger.info(
17271728
"Preemptive sleep candidates for provider=%s: %s",
1728-
provider_id,
1729+
self._facade.get_provider_name(provider_id) or provider_id,
17291730
", ".join(f"{name}(demand={score:.2f}, residual={profile.sleeping_residual_mb:.0f}MB)"
17301731
for score, name, profile in candidates),
17311732
)
17321733
else:
1733-
logger.info("Preemptive sleep: no candidates for provider=%s (no stopped models with known residual and demand>0)", provider_id)
1734+
logger.info("Preemptive sleep: no candidates for provider=%s (no stopped models with known residual and demand>0)", self._facade.get_provider_name(provider_id) or provider_id)
17341735

17351736
now = time.time()
17361737

@@ -2779,7 +2780,7 @@ def _validate_vram_budget(
27792780
- self.get_pending_vram_mb(provider_id)
27802781
)
27812782
except Exception:
2782-
logger.debug("Cannot check VRAM for provider %s, rejecting %s", provider_id, action.action)
2783+
logger.debug("Cannot check VRAM for provider %s, rejecting %s", self._facade.get_provider_name(provider_id) or provider_id, action.action)
27832784
continue
27842785

27852786
try:

0 commit comments

Comments
 (0)