Commit 4f6cc29

worker cache bust
1 parent 0e8841d commit 4f6cc29

File tree

4 files changed: +30 -56 lines changed


align_app/adm/decider/tests/test_worker.py

Lines changed: 29 additions & 0 deletions
@@ -1,5 +1,34 @@
 import multiprocessing as mp
 from align_app.adm.decider.types import DeciderParams
+from align_app.adm.decider.worker import extract_cache_key
+
+
+class TestExtractCacheKey:
+    def test_same_config_produces_same_key(self):
+        config = {"model_name": "test-model", "temperature": 0.7}
+        key1 = extract_cache_key(config)
+        key2 = extract_cache_key(config)
+        assert key1 == key2
+
+    def test_different_configs_produce_different_keys(self):
+        config1 = {"model_name": "test-model", "temperature": 0.7}
+        config2 = {"model_name": "test-model", "temperature": 0.8}
+        key1 = extract_cache_key(config1)
+        key2 = extract_cache_key(config2)
+        assert key1 != key2
+
+    def test_same_model_different_settings_produce_different_keys(self):
+        config1 = {
+            "structured_inference_engine": {"model_name": "same-model"},
+            "setting_a": "value1",
+        }
+        config2 = {
+            "structured_inference_engine": {"model_name": "same-model"},
+            "setting_a": "value2",
+        }
+        key1 = extract_cache_key(config1)
+        key2 = extract_cache_key(config2)
+        assert key1 != key2
 
 
 class TestDeciderWorker:
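
The added tests use plain test classes with bare assert statements, i.e. pytest style. Assuming pytest is the project's test runner, the new class can be run on its own with:

pytest align_app/adm/decider/tests/test_worker.py::TestExtractCacheKey -v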

align_app/adm/decider/worker.py

Lines changed: 0 additions & 16 deletions
@@ -10,11 +10,6 @@
 
 
 def extract_cache_key(resolved_config: Dict[str, Any]) -> str:
-    engine = resolved_config.get("structured_inference_engine", {})
-    model_name = engine.get("model_name") if isinstance(engine, dict) else None
-    if model_name:
-        return model_name
-
     cache_str = json.dumps(resolved_config, sort_keys=True)
     return hashlib.md5(cache_str.encode()).hexdigest()
 
@@ -31,26 +26,15 @@ def decider_worker_func(task_queue: Queue, result_queue: Queue):
             params: DeciderParams = task
             cache_key = extract_cache_key(params.resolved_config)
 
-            print(f"[WORKER DEBUG] Processing task")
-            print(f"[WORKER DEBUG] model_cache_key: {cache_key}")
-            print(f"[WORKER DEBUG] model_cache size: {len(model_cache)}")
-            print(f"[WORKER DEBUG] model_cache keys: {list(model_cache.keys())}")
-
             if cache_key not in model_cache:
-                print(f"[WORKER DEBUG] Model cache MISS - instantiating ADM...")
                 choose_action_func, cleanup_func = instantiate_adm(
                     params.resolved_config
                 )
                 model_cache[cache_key] = (choose_action_func, cleanup_func)
             else:
-                print(f"[WORKER DEBUG] Model cache HIT - reusing model")
                 choose_action_func, _ = model_cache[cache_key]
 
-            print(f"[WORKER DEBUG] Running choose_action...")
             result: ADMResult = choose_action_func(params)
-            print(
-                f"[WORKER DEBUG] Decision: {result.decision.unstructured[:50]}..."
-            )
             result_queue.put(result)
 
         except (KeyboardInterrupt, SystemExit):
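
With this change, extract_cache_key no longer short-circuits on structured_inference_engine.model_name: the key is always the MD5 of the full resolved config, so any edited setting yields a new key and busts the worker's model_cache. A minimal standalone sketch of the resulting behavior (the function body mirrors the diff context above; the two configs are made-up examples):

import hashlib
import json
from typing import Any, Dict


def extract_cache_key(resolved_config: Dict[str, Any]) -> str:
    # Hash the whole resolved config so any settings change yields a new key.
    cache_str = json.dumps(resolved_config, sort_keys=True)
    return hashlib.md5(cache_str.encode()).hexdigest()


# Same model name, different settings (hypothetical configs):
config_a = {"structured_inference_engine": {"model_name": "same-model"}, "setting_a": "value1"}
config_b = {"structured_inference_engine": {"model_name": "same-model"}, "setting_a": "value2"}

# The old model_name short-circuit returned "same-model" for both;
# the full-config hash gives distinct keys, as TestExtractCacheKey asserts.
assert extract_cache_key(config_a) != extract_cache_key(config_b)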

align_app/app/runs_registry.py

Lines changed: 0 additions & 10 deletions
@@ -68,22 +68,12 @@ def populate_cache_bulk(self, runs: List[Run]) -> None:
     async def _execute_with_cache(self, run: Run, probe_choices: List[Dict]) -> Run:
         cache_key = run.compute_cache_key()
 
-        print(f"[CACHE DEBUG] execute_with_cache for run {run.id}")
-        print(f"[CACHE DEBUG] decider_name: {run.decider_name}")
-        print(f"[CACHE DEBUG] cache_key: {cache_key}")
-        print(
-            f"[CACHE DEBUG] resolved_config keys: {list(run.decider_params.resolved_config.keys())[:5]}..."
-        )
-        print(f"[CACHE DEBUG] cache size: {len(self._runs.decision_cache)}")
-
         cached = runs_core.get_cached_decision(self._runs, cache_key)
         if cached:
-            print(f"[CACHE DEBUG] >>> CACHE HIT! Returning cached decision")
             updated_run = run.model_copy(update={"decision": cached})
             self._runs = runs_core.add_run(self._runs, updated_run)
             return updated_run
 
-        print(f"[CACHE DEBUG] Cache miss, fetching new decision...")
         decision = await runs_core.fetch_decision(run, probe_choices)
         updated_run = run.model_copy(update={"decision": decision})
         self._runs = runs_core.add_run(self._runs, updated_run)
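
The runs_registry change is purely removal of the print-based [CACHE DEBUG] diagnostics around the decision-cache lookup; the cache logic itself is untouched. If hit/miss visibility is still wanted without the console noise, one option (not what this commit does, just a sketch) is to emit the same information through the standard logging module at DEBUG level:

import logging

logger = logging.getLogger(__name__)


def log_cache_lookup(cache_key: str, hit: bool) -> None:
    # Hypothetical helper: record cache telemetry only when DEBUG logging is enabled.
    logger.debug("decision cache %s for key %s", "hit" if hit else "miss", cache_key)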

align_app/app/runs_state_adapter.py

Lines changed: 1 addition & 30 deletions
@@ -363,17 +363,6 @@ def _create_run_with_edited_config(self, run_id: str) -> Optional[str]:
         ui_yaml = self.state.runs[run_id]["prompt"]["resolved_config_yaml"]
         new_config = yaml.safe_load(ui_yaml)
 
-        print(f"[CONFIG DEBUG] _create_run_with_edited_config")
-        print(f"[CONFIG DEBUG] old run_id: {run_id}")
-        print(f"[CONFIG DEBUG] old decider_name: {run.decider_name}")
-        print(
-            f"[CONFIG DEBUG] old config keys: {list(run.decider_params.resolved_config.keys())[:5]}"
-        )
-        print(f"[CONFIG DEBUG] new config keys: {list(new_config.keys())[:5]}")
-        print(
-            f"[CONFIG DEBUG] configs equal: {run.decider_params.resolved_config == new_config}"
-        )
-
         decider_options = self.decider_registry.get_decider_options(
             run.probe_id, run.decider_name
         )
@@ -385,8 +374,6 @@ def _create_run_with_edited_config(self, run_id: str) -> Optional[str]:
             run.decider_name, new_config, llm_backbones
         )
 
-        print(f"[CONFIG DEBUG] new decider_name: {new_decider_name}")
-
         updated_params = run.decider_params.model_copy(
             update={"resolved_config": new_config}
         )
@@ -399,13 +386,6 @@ def _create_run_with_edited_config(self, run_id: str) -> Optional[str]:
                 "decision": None,
             }
         )
-
-        old_cache_key = run.compute_cache_key()
-        new_cache_key = new_run.compute_cache_key()
-        print(f"[CONFIG DEBUG] old cache_key: {old_cache_key}")
-        print(f"[CONFIG DEBUG] new cache_key: {new_cache_key}")
-        print(f"[CONFIG DEBUG] cache keys equal: {old_cache_key == new_cache_key}")
-
         self.runs_registry.add_run(new_run)
 
         self.state.runs_to_compare = [
@@ -425,10 +405,7 @@ def _remove_pending_cache_key(self, cache_key: str):
         ]
 
     async def _execute_run_decision(self, run_id: str):
-        print(f"[EXEC DEBUG] _execute_run_decision called with run_id: {run_id}")
-
         if self._is_probe_edited(run_id):
-            print("[EXEC DEBUG] probe was edited, creating new probe...")
             new_probe_id = self._create_edited_probe_for_run(run_id)
             if not new_probe_id:
                 return
@@ -444,19 +421,13 @@ async def _execute_run_decision(self, run_id: str):
             ]
             run_id = updated_run.id
 
-        is_config_edited = self._is_config_edited(run_id)
-        print(f"[EXEC DEBUG] is_config_edited: {is_config_edited}")
-
-        if is_config_edited:
-            print("[EXEC DEBUG] config was edited, creating new run...")
+        if self._is_config_edited(run_id):
            edited_run_id = self._create_run_with_edited_config(run_id)
            if not edited_run_id:
                return
            run_id = edited_run_id
-            print(f"[EXEC DEBUG] new run_id after config edit: {run_id}")
 
         cache_key = self.state.runs.get(run_id, {}).get("cache_key")
-        print(f"[EXEC DEBUG] cache_key from state: {cache_key}")
 
         with self.state:
             self._add_pending_cache_key(cache_key)
