Description
Part of: #455
Story: Collaborative and Competitive Delegation Modes for execute_open_delegation
As a power user or admin using MCP delegation
I want to submit collaborative (DAG-based multi-step) and competitive (parallel competing agents) delegation jobs
So that I can leverage Claude Server's orchestrated and competitive execution pipelines for complex coding objectives that benefit from structured decomposition or multi-engine comparison
Status: BLOCKED -- Depends on Claude Server merging feature/competitive-decomposer-pipeline branch. Do not implement until that branch is merged and deployed.
Implementation Status:
- ClaudeServerClient.create_orchestrated_job() method (POST /jobs/orchestrated)
- ClaudeServerClient.create_competitive_job() method (POST /jobs/competitive)
- _validate_open_delegation_params updated for mode-specific parameter validation
- _submit_collaborative_delegation_job helper function in handlers.py
- _submit_competitive_delegation_job helper function in handlers.py
- handle_execute_open_delegation mode routing updated (remove error returns, route to new helpers)
- execute_open_delegation tool schema updated with mode-specific optional parameters (tools.py TOOL_REGISTRY)
- execute_open_delegation.md tool doc updated (remove "not yet supported" notes)
- Unit tests for new client methods (create_orchestrated_job, create_competitive_job)
- Unit tests for mode-specific validation (collaborative steps, competitive engines)
- Unit tests for handler mode routing (collaborative/competitive/single dispatch)
- E2E manual testing completed by Claude Code
Completion: 0/12 tasks complete (0%)
Algorithm
=== ClaudeServerClient (new methods) ===
ClaudeServerClient.create_orchestrated_job(prompt, repositories, steps, engine, model, timeout, wave_timeout):
    """Create a DAG-based orchestrated job for collaborative mode."""
    options = {"agentEngine": engine}
    IF model IS NOT None:
        options["model"] = model
    IF timeout IS NOT None:
        options["timeout"] = timeout
    IF wave_timeout IS NOT None:
        options["waveTimeout"] = wave_timeout
    steps_dto = []
    FOR each step IN steps:
        step_dto = {
            "name": step["name"],
            "prompt": step["prompt"],
            "dependencies": step.get("dependencies", [])
        }
        APPEND step_dto TO steps_dto
    json_data = {
        "Prompt": prompt,
        "Repositories": repositories,
        "Steps": steps_dto,
        "Options": options
    }
    response = _make_authenticated_request("POST", "/jobs/orchestrated", json_data)
    IF response.status IN (200, 201): RETURN response.json()
    ELIF response.status >= 500: RAISE ClaudeServerError("server error")
    ELSE: RAISE ClaudeServerError("job creation failed")

ClaudeServerClient.create_competitive_job(prompt, repositories, engines, distribution_strategy, min_success_threshold, model, timeout):
    """Create a competitive job with decompose-compete-judge pipeline."""
    options = {}
    IF model IS NOT None:
        options["model"] = model
    IF timeout IS NOT None:
        options["timeout"] = timeout
    json_data = {
        "Prompt": prompt,
        "Repositories": repositories,
        "Engines": engines,
        "DistributionStrategy": distribution_strategy,
        "MinSuccessThreshold": min_success_threshold,
        "Options": options
    }
    response = _make_authenticated_request("POST", "/jobs/competitive", json_data)
    IF response.status IN (200, 201): RETURN response.json()
    ELIF response.status >= 500: RAISE ClaudeServerError("server error")
    ELSE: RAISE ClaudeServerError("job creation failed")
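The payload assembly in the two client methods above can be sketched in Python as pure functions, which makes the "omit when None" behaviour easy to unit test. This is a minimal sketch, not the client implementation: the function names `build_orchestrated_payload` and `build_competitive_payload` are hypothetical, and the HTTP call and error handling are deliberately left out.

```python
from typing import Any, Dict, List, Optional


def build_orchestrated_payload(
    prompt: str,
    repositories: List[str],
    steps: List[Dict[str, Any]],
    engine: str,
    model: Optional[str] = None,
    timeout: Optional[int] = None,
    wave_timeout: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST /jobs/orchestrated (hypothetical helper)."""
    # agentEngine is always sent; the other options only when provided.
    options: Dict[str, Any] = {"agentEngine": engine}
    if model is not None:
        options["model"] = model
    if timeout is not None:
        options["timeout"] = timeout
    if wave_timeout is not None:
        options["waveTimeout"] = wave_timeout
    # Copy only the three documented step fields; dependencies default to [].
    steps_dto = [
        {
            "name": step["name"],
            "prompt": step["prompt"],
            "dependencies": list(step.get("dependencies", [])),
        }
        for step in steps
    ]
    return {
        "Prompt": prompt,
        "Repositories": repositories,
        "Steps": steps_dto,
        "Options": options,
    }


def build_competitive_payload(
    prompt: str,
    repositories: List[str],
    engines: List[str],
    distribution_strategy: str,
    min_success_threshold: int,
    model: Optional[str] = None,
    timeout: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST /jobs/competitive (hypothetical helper)."""
    options: Dict[str, Any] = {}
    if model is not None:
        options["model"] = model
    if timeout is not None:
        options["timeout"] = timeout
    return {
        "Prompt": prompt,
        "Repositories": repositories,
        "Engines": engines,
        "DistributionStrategy": distribution_strategy,
        "MinSuccessThreshold": min_success_threshold,
        "Options": options,
    }
```

Keeping payload construction separate from the request call is one possible design; the real methods may build the dict inline as create_job_with_options() does.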
=== Validation (updated _validate_open_delegation_params) ===
_validate_open_delegation_params(args):
    # Existing validation for prompt, repositories, engine (UNCHANGED)
    prompt = args.get("prompt")
    IF NOT prompt: RETURN error "Missing required parameter: prompt"
    repositories = args.get("repositories")
    IF NOT repositories: RETURN error "Missing required parameter: repositories"
    engine = args.get("engine", "claude-code")
    IF engine NOT IN _VALID_DELEGATION_ENGINES: RETURN error "Invalid engine"
    mode = args.get("mode", "single")
    IF mode NOT IN _VALID_DELEGATION_MODES: RETURN error "Invalid mode"

    # REMOVED: the early-return error block for collaborative/competitive
    # (was: IF mode IN ("collaborative", "competitive"): RETURN error "not yet supported")

    # NEW: collaborative mode validation
    IF mode == "collaborative":
        steps = args.get("steps")
        IF steps IS None OR NOT isinstance(steps, list) OR len(steps) == 0:
            RETURN error "collaborative mode requires 'steps' parameter with at least one step"
        step_names = SET()
        FOR step IN steps:
            IF "name" NOT IN step OR "prompt" NOT IN step:
                RETURN error "each step requires 'name' and 'prompt' fields"
            IF step["name"] IN step_names:
                RETURN error "duplicate step name: {step['name']}"
            ADD step["name"] TO step_names
        # Validate dependency references after collecting all names
        FOR step IN steps:
            FOR dep IN step.get("dependencies", []):
                IF dep NOT IN step_names:
                    RETURN error "step '{step['name']}' references unknown dependency '{dep}'"
                IF dep == step["name"]:
                    RETURN error "step '{step['name']}' cannot depend on itself"
        wave_timeout = args.get("wave_timeout")
        IF wave_timeout IS NOT None AND wave_timeout <= 0:
            RETURN error "wave_timeout must be a positive integer"

    # NEW: competitive mode validation
    IF mode == "competitive":
        engines = args.get("engines")
        IF engines IS None OR NOT isinstance(engines, list):
            RETURN error "competitive mode requires 'engines' parameter"
        IF len(engines) < 2:
            RETURN error "competitive mode requires at least two engines"
        FOR eng IN engines:
            IF eng NOT IN _VALID_DELEGATION_ENGINES:
                RETURN error "invalid engine '{eng}' in engines list"
        distribution_strategy = args.get("distribution_strategy", "round-robin")
        IF distribution_strategy NOT IN ("round-robin", "decomposer-decides"):
            RETURN error "invalid distribution_strategy; must be 'round-robin' or 'decomposer-decides'"
        min_thresh = args.get("min_success_threshold")
        IF min_thresh IS NOT None:
            IF min_thresh < 1 OR min_thresh > len(engines):
                RETURN error "min_success_threshold must be between 1 and number of engines"

    RETURN None  # all valid
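The mode-specific checks above translate fairly directly into Python. The sketch below returns an error string or None; the function names and the `VALID_ENGINES` set are placeholders for illustration (the real engine list is `_VALID_DELEGATION_ENGINES` in handlers.py, whose contents are not shown in this issue), and the real validator may return a structured error response rather than a string.

```python
from typing import Any, Dict, Optional

# Placeholder values for illustration only; not the real engine registry.
VALID_ENGINES = {"claude-code", "codex"}
VALID_STRATEGIES = ("round-robin", "decomposer-decides")


def validate_collaborative(args: Dict[str, Any]) -> Optional[str]:
    """Return an error message for invalid collaborative args, else None."""
    steps = args.get("steps")
    if not isinstance(steps, list) or len(steps) == 0:
        return "collaborative mode requires 'steps' parameter with at least one step"
    names = set()
    for step in steps:
        if "name" not in step or "prompt" not in step:
            return "each step requires 'name' and 'prompt' fields"
        if step["name"] in names:
            return f"duplicate step name: {step['name']}"
        names.add(step["name"])
    # Second pass: dependencies may reference steps declared later in the list.
    for step in steps:
        for dep in step.get("dependencies", []):
            if dep not in names:
                return f"step '{step['name']}' references unknown dependency '{dep}'"
            if dep == step["name"]:
                return f"step '{step['name']}' cannot depend on itself"
    wave_timeout = args.get("wave_timeout")
    if wave_timeout is not None and wave_timeout <= 0:
        return "wave_timeout must be a positive integer"
    return None


def validate_competitive(args: Dict[str, Any]) -> Optional[str]:
    """Return an error message for invalid competitive args, else None."""
    engines = args.get("engines")
    if not isinstance(engines, list):
        return "competitive mode requires 'engines' parameter"
    if len(engines) < 2:
        return "competitive mode requires at least two engines"
    for eng in engines:
        if eng not in VALID_ENGINES:
            return f"invalid engine '{eng}' in engines list"
    strategy = args.get("distribution_strategy", "round-robin")
    if strategy not in VALID_STRATEGIES:
        return "invalid distribution_strategy; must be 'round-robin' or 'decomposer-decides'"
    min_thresh = args.get("min_success_threshold")
    if min_thresh is not None and (min_thresh < 1 or min_thresh > len(engines)):
        return "min_success_threshold must be between 1 and number of engines"
    return None
```

As specified, the checks catch self-references and unknown names but not multi-step cycles (A depends on B, B depends on A); this issue does not say whether Claude Server rejects those.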
=== Handler Mode Routing (updated handle_execute_open_delegation) ===
handle_execute_open_delegation(args, user, session_state):
    # ... existing: config check, permission check, validation, guardrails, prompt assembly ...
    # ... existing: engine/mode resolution from args or config defaults ...
    # (ALL UNCHANGED up to the point where client context manager opens)
    async with ClaudeServerClient(...) as client:
        IF mode == "single":
            result = await _submit_open_delegation_job(
                client, prompt, repositories, engine, model, timeout, repo_ready_timeout
            )
        ELIF mode == "collaborative":
            result = await _submit_collaborative_delegation_job(
                client, prompt, repositories, steps=args["steps"],
                engine=engine, model=args.get("model"), job_timeout=args.get("timeout"),
                wave_timeout=args.get("wave_timeout"), repo_ready_timeout=repo_ready_timeout
            )
        ELIF mode == "competitive":
            result = await _submit_competitive_delegation_job(
                client, prompt, repositories, engines=args["engines"],
                distribution_strategy=args.get("distribution_strategy", "round-robin"),
                min_success_threshold=args.get("min_success_threshold", 1),
                model=args.get("model"), job_timeout=args.get("timeout"),
                repo_ready_timeout=repo_ready_timeout
            )
    # ... existing: audit logging (mode already captured), error handling ...
    # (ALL UNCHANGED)
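The mode dispatch above can be exercised in isolation with stubbed helpers, which is roughly what the handler routing unit tests would need. In this sketch the three `submit_*` coroutines are stand-ins that only echo their keyword arguments; they are not the real helpers, and `route_by_mode` is a hypothetical name for the IF/ELIF block.

```python
import asyncio
from typing import Any, Dict


async def submit_single(**kwargs: Any) -> Dict[str, Any]:
    # Stand-in for _submit_open_delegation_job.
    return {"helper": "single", "kwargs": kwargs}


async def submit_collaborative(**kwargs: Any) -> Dict[str, Any]:
    # Stand-in for _submit_collaborative_delegation_job.
    return {"helper": "collaborative", "kwargs": kwargs}


async def submit_competitive(**kwargs: Any) -> Dict[str, Any]:
    # Stand-in for _submit_competitive_delegation_job.
    return {"helper": "competitive", "kwargs": kwargs}


async def route_by_mode(args: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch to one helper based on args['mode'] (defaults to single)."""
    mode = args.get("mode", "single")
    if mode == "single":
        return await submit_single(
            model=args.get("model"), job_timeout=args.get("timeout")
        )
    if mode == "collaborative":
        return await submit_collaborative(
            steps=args["steps"],
            model=args.get("model"),
            job_timeout=args.get("timeout"),
            wave_timeout=args.get("wave_timeout"),
        )
    if mode == "competitive":
        return await submit_competitive(
            engines=args["engines"],
            distribution_strategy=args.get("distribution_strategy", "round-robin"),
            min_success_threshold=args.get("min_success_threshold", 1),
            model=args.get("model"),
            job_timeout=args.get("timeout"),
        )
    # Validation should have rejected any other mode before this point.
    raise ValueError(f"unsupported mode: {mode}")


if __name__ == "__main__":
    result = asyncio.run(route_by_mode({"mode": "competitive", "engines": ["claude-code", "codex"]}))
    print(result["helper"], result["kwargs"]["distribution_strategy"])
```

In the real handler, the unit tests would more likely patch the actual helpers with mocks and assert which one was awaited.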
=== New Helper: _submit_collaborative_delegation_job ===
_submit_collaborative_delegation_job(client, prompt, repositories, steps, engine, model, job_timeout, wave_timeout, repo_ready_timeout):
    """Check repo readiness, create orchestrated job, register callback, start."""
    # Repo readiness check (same pattern as _submit_open_delegation_job)
    golden_repo_manager = getattr(app_module, "golden_repo_manager", None)
    FOR alias IN repositories:
        git_url, branch = _lookup_repo_git_info(golden_repo_manager, alias)
        ready = await client.wait_for_repo_ready(alias, repo_ready_timeout, git_url, branch)
        IF NOT ready:
            RETURN error response "Repository '{alias}' failed to become ready"
    job_result = await client.create_orchestrated_job(
        prompt, repositories, steps, engine, model, job_timeout, wave_timeout
    )
    job_id = job_result.get("jobId") OR job_result.get("job_id")
    IF NOT job_id:
        RETURN error "Job created but no job_id returned"
    await _register_open_delegation_callback(client, job_id)
    start_result = await client.start_job(job_id)
    DelegationJobTracker.get_instance().register_job(job_id, ...)
    RETURN success response with job_id, mode="collaborative"
=== New Helper: _submit_competitive_delegation_job ===
_submit_competitive_delegation_job(client, prompt, repositories, engines, distribution_strategy, min_success_threshold, model, job_timeout, repo_ready_timeout):
    """Check repo readiness, create competitive job, register callback, start."""
    # Repo readiness check (same pattern)
    golden_repo_manager = getattr(app_module, "golden_repo_manager", None)
    FOR alias IN repositories:
        git_url, branch = _lookup_repo_git_info(golden_repo_manager, alias)
        ready = await client.wait_for_repo_ready(alias, repo_ready_timeout, git_url, branch)
        IF NOT ready:
            RETURN error response "Repository '{alias}' failed to become ready"
    job_result = await client.create_competitive_job(
        prompt, repositories, engines, distribution_strategy, min_success_threshold, model, job_timeout
    )
    job_id = job_result.get("jobId") OR job_result.get("job_id")
    IF NOT job_id:
        RETURN error "Job created but no job_id returned"
    await _register_open_delegation_callback(client, job_id)
    start_result = await client.start_job(job_id)
    DelegationJobTracker.get_instance().register_job(job_id, ...)
    RETURN success response with job_id, mode="competitive"
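Both new helpers repeat the same repo-readiness loop and job-id extraction. As an illustration only, those two pieces could be written once and tested against a fake client. Everything in this sketch is hypothetical (`FakeClient`, `lookup_git_info`, `first_unready_repo`, `extract_job_id`); the `wait_for_repo_ready` argument order simply mirrors the pseudocode above.

```python
import asyncio
from typing import Any, Dict, Iterable, Optional, Set, Tuple


class FakeClient:
    """Test double; only mimics the wait_for_repo_ready call used above."""

    def __init__(self, ready_aliases: Iterable[str]) -> None:
        self.ready: Set[str] = set(ready_aliases)

    async def wait_for_repo_ready(
        self, alias: str, timeout: int, git_url: Optional[str], branch: Optional[str]
    ) -> bool:
        return alias in self.ready


def lookup_git_info(alias: str) -> Tuple[Optional[str], Optional[str]]:
    # Stand-in for _lookup_repo_git_info; returns no URL or branch.
    return None, None


async def first_unready_repo(
    client: FakeClient, repositories: Iterable[str], repo_ready_timeout: int
) -> Optional[str]:
    """Return the first alias that fails to become ready, or None if all are ready."""
    for alias in repositories:
        git_url, branch = lookup_git_info(alias)
        ready = await client.wait_for_repo_ready(alias, repo_ready_timeout, git_url, branch)
        if not ready:
            return alias
    return None


def extract_job_id(job_result: Dict[str, Any]) -> Optional[str]:
    """Accept either the camelCase or snake_case key, as the pseudocode does."""
    return job_result.get("jobId") or job_result.get("job_id")


if __name__ == "__main__":
    client = FakeClient({"repo-a"})
    print(asyncio.run(first_unready_repo(client, ["repo-a", "repo-b"], 30)))
```

Whether to factor this out or keep each helper self-contained, matching `_submit_open_delegation_job`, is a judgment call for the implementer.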
Acceptance Criteria
Scenario 1: Collaborative delegation creates orchestrated job
Given I am an admin with delegate_open permission
When I call execute_open_delegation with mode="collaborative" and a valid steps DAG
Then a job is created via POST /jobs/orchestrated on Claude Server
And the response contains a job_id for async polling
And the job is registered in DelegationJobTracker
And the audit log records mode="collaborative"
Scenario 2: Competitive delegation creates competitive job
Given I am an admin with delegate_open permission
When I call execute_open_delegation with mode="competitive" and engines=["claude-code", "codex"]
Then a job is created via POST /jobs/competitive on Claude Server
And the response contains a job_id for async polling
And the job is registered in DelegationJobTracker
And the audit log records mode="competitive"
Scenario 3: Collaborative mode requires steps parameter
Given I call execute_open_delegation with mode="collaborative" but no steps parameter
When the handler validates parameters
Then it returns an error "collaborative mode requires 'steps' parameter with at least one step"
Scenario 4: Collaborative mode validates step dependency references
Given I call execute_open_delegation with mode="collaborative" and steps containing a dependency referencing a nonexistent step name
When the handler validates parameters
Then it returns an error identifying the unknown step dependency
Scenario 5: Competitive mode requires at least two engines
Given I call execute_open_delegation with mode="competitive" and only one engine in engines list
When the handler validates parameters
Then it returns an error "competitive mode requires at least two engines"
Scenario 6: Competitive mode validates engine names
Given I call execute_open_delegation with mode="competitive" and an invalid engine name in engines
When the handler validates parameters
Then it returns an error identifying the invalid engine
Scenario 7: Competitive mode passes distribution strategy
Given I call execute_open_delegation with mode="competitive" and distribution_strategy="decomposer-decides"
When the job is created
Then the DistributionStrategy field is set to "decomposer-decides" in the POST /jobs/competitive payload
Scenario 8: Competitive mode passes min_success_threshold
Given I call execute_open_delegation with mode="competitive" and min_success_threshold=2
When the job is created
Then the MinSuccessThreshold is set to 2 in the competitive job payload
Scenario 9: Single mode backward compatibility
Given I call execute_open_delegation with mode="single" (or no mode specified)
When the handler processes the request
Then the existing single-mode flow executes unchanged via _submit_open_delegation_job
And no collaborative or competitive code paths are invoked
Scenario 10: Repository readiness for collaborative and competitive modes
Given repositories are not yet cloned on Claude Server
When I submit a collaborative or competitive delegation job
Then the handler waits for repository readiness before creating the job
And returns an error if any repository fails to become ready within timeout
Scenario 11: Guardrails apply to all modes
Given guardrails are enabled in server configuration
When I submit a collaborative or competitive delegation job
Then guardrails text is prepended to the prompt (same as single mode)
And the guardrails repository is appended to the repositories list
Scenario 12: Polling works for collaborative and competitive jobs
Given I call poll_delegation_job with a job_id from a collaborative or competitive job
When the job is still running or completed
Then the existing polling mechanism returns status and results without changes
Testing Requirements
Unit Tests (automated, TDD):
- ClaudeServerClient.create_orchestrated_job():
  - Builds correct payload structure with Prompt, Repositories, Steps, Options
  - Includes agentEngine in Options
  - Includes model in Options when provided, omits when None
  - Includes timeout in Options when provided, omits when None
  - Includes waveTimeout in Options when provided, omits when None
  - Calls POST /jobs/orchestrated endpoint
  - Returns response JSON on 200/201
  - Raises ClaudeServerError on 4xx/5xx
- ClaudeServerClient.create_competitive_job():
  - Builds correct payload with Prompt, Repositories, Engines, DistributionStrategy, MinSuccessThreshold, Options
  - Passes engines list unchanged
  - Passes distribution_strategy as DistributionStrategy field
  - Passes min_success_threshold as MinSuccessThreshold field
  - Calls POST /jobs/competitive endpoint
  - Returns response JSON on 200/201
  - Raises ClaudeServerError on 4xx/5xx
- _validate_open_delegation_params (collaborative mode):
  - Rejects missing steps parameter
  - Rejects empty steps list
  - Rejects steps without name field
  - Rejects steps without prompt field
  - Rejects duplicate step names
  - Rejects dependency referencing unknown step name
  - Rejects self-referencing dependency
  - Rejects non-positive wave_timeout
  - Accepts valid steps with dependencies
- _validate_open_delegation_params (competitive mode):
  - Rejects missing engines parameter
  - Rejects single engine (requires minimum 2)
  - Rejects invalid engine name in list
  - Rejects invalid distribution_strategy
  - Rejects min_success_threshold < 1
  - Rejects min_success_threshold > number of engines
  - Accepts valid engines with defaults for optional params
- handle_execute_open_delegation mode routing:
  - mode="single" calls _submit_open_delegation_job (existing path)
  - mode="collaborative" calls _submit_collaborative_delegation_job
  - mode="competitive" calls _submit_competitive_delegation_job
  - No mode specified defaults to single
- Backward compatibility:
  - Existing single-mode calls produce identical behavior to before changes
E2E Manual Testing (against running CIDX server with Claude Server):
- Submit collaborative job via MCP with a 2-step DAG (step A with no deps, step B depending on A), poll for result
- Submit competitive job via MCP with engines=["claude-code", "codex"], poll for result
- Submit single-mode job to verify unchanged behavior
- Verify audit logs capture mode for all three modes
Technical Notes
Files to Modify
- src/code_indexer/server/clients/claude_server_client.py -- Add create_orchestrated_job() and create_competitive_job() methods. Follow the same pattern as the existing create_job_with_options() (line 482): build the payload dict, call _make_authenticated_request("POST", endpoint, json_data), and handle 200/201 success and error status codes.
- src/code_indexer/server/mcp/handlers.py -- Three areas of change:
  - _validate_open_delegation_params() (line 13489): Remove the early-return error block at lines 13537-13544 for collaborative/competitive. Add mode-specific validation for steps (collaborative) and engines/distribution_strategy/min_success_threshold (competitive).
  - New helpers _submit_collaborative_delegation_job() and _submit_competitive_delegation_job(): Same repo-readiness + create + callback + start + tracker registration pattern as the existing _submit_open_delegation_job() (line 13573), but calling the new client methods.
  - handle_execute_open_delegation() (line 13642): Replace the single call to _submit_open_delegation_job at line 13719 with an IF/ELIF mode dispatch to the appropriate helper.
- src/code_indexer/server/mcp/tools.py -- Update the TOOL_REGISTRY entry for execute_open_delegation inputSchema to add optional properties: steps (array of objects), wave_timeout (integer), engines (array of strings), distribution_strategy (string enum), min_success_threshold (integer).
- src/code_indexer/server/mcp/tool_docs/admin/execute_open_delegation.md -- Update inputSchema to include new parameters. Update description to remove "not yet supported" notes from collaborative and competitive modes. Update SUPPORTED MODES, EXECUTION FLOW, and ERRORS sections.
Claude Server API Contracts (from conversation research)
POST /jobs/orchestrated (collaborative mode):
- DAG-based execution with waves of parallel steps
- Steps within the same wave (no inter-dependencies) execute in parallel
- Output from completed steps is injected into dependent step prompts
- Request body: { Prompt, Repositories, Steps: [{ name, prompt, dependencies }], Options: { agentEngine, model, timeout, waveTimeout } }
- Response: { jobId, status, ... } (same job lifecycle as single-mode jobs)
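To make the "waves of parallel steps" idea concrete, the sketch below groups a steps list into waves by repeatedly collecting every step whose dependencies have all completed. This only illustrates the concept as described here; how Claude Server actually schedules waves is not documented in this issue, and `compute_waves` is a hypothetical name.

```python
from typing import Any, Dict, List, Set


def compute_waves(steps: List[Dict[str, Any]]) -> List[List[str]]:
    """Group step names into waves; each wave depends only on earlier waves."""
    remaining: Dict[str, Set[str]] = {
        step["name"]: set(step.get("dependencies", [])) for step in steps
    }
    done: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        # A step is ready once all of its dependencies are in `done`.
        ready = sorted(name for name, deps in remaining.items() if deps <= done)
        if not ready:
            # Nothing can run: a cycle, or a dependency on a step that does not exist.
            raise ValueError("unresolvable dependencies (cycle or unknown step)")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


if __name__ == "__main__":
    dag = [
        {"name": "A", "prompt": "analyse the module"},
        {"name": "B", "prompt": "refactor using A's findings", "dependencies": ["A"]},
        {"name": "C", "prompt": "update docs"},
    ]
    print(compute_waves(dag))
```

For the two-step DAG used in the E2E test plan (A with no dependencies, B depending on A), this yields two waves of one step each.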
POST /jobs/competitive (competitive mode):
- 3-phase pipeline: decompose (break prompt into approaches) -> compete (parallel execution across engines) -> judge (select best result)
- round-robin: approaches distributed evenly across engines
- decomposer-decides: decomposer assigns specific engines per approach
- Request body: { Prompt, Repositories, Engines: [...], DistributionStrategy, MinSuccessThreshold, Options: { model, timeout } }
- Response: { jobId, status, ... } (same job lifecycle as single-mode jobs)
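As a rough illustration of what "distributed evenly across engines" could mean for the round-robin strategy, the sketch below cycles through the engines list in order. The actual assignment happens inside Claude Server and may differ; `assign_round_robin` and the approach labels are made up for this example.

```python
from typing import Dict, List


def assign_round_robin(approaches: List[str], engines: List[str]) -> Dict[str, str]:
    """Map each approach to an engine, cycling through engines in order."""
    if not engines:
        raise ValueError("at least one engine is required")
    return {
        approach: engines[index % len(engines)]
        for index, approach in enumerate(approaches)
    }


if __name__ == "__main__":
    plan = assign_round_robin(
        ["approach-1", "approach-2", "approach-3"],
        ["claude-code", "codex"],
    )
    for approach, engine in plan.items():
        print(approach, "->", engine)
```

With decomposer-decides, by contrast, the mapping would come from the decomposer's output rather than from list position.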
Existing Infrastructure -- No Changes Needed
| Component | Why Unchanged |
|---|---|
| delegate_open permission | Permission check happens before mode routing; covers all modes |
| Guardrails (_resolve_guardrails) | Prepended to prompt before mode dispatch; applies uniformly |
| Audit logging | Already logs mode field in audit details dict (line 13746) |
| _register_open_delegation_callback | Registers callback by job_id; mode-agnostic |
| DelegationJobTracker | Tracks any job_id regardless of how the job was created |
| poll_delegation_job handler | Polls job status by job_id; works for orchestrated/competitive jobs |
| PayloadCache truncation | Operates on response payload text; mode-agnostic |
New Tool Parameters
| Parameter | Type | Mode | Required | Description |
|---|---|---|---|---|
| steps | array of objects | collaborative | Yes (when mode=collaborative) | DAG definition. Each object: { name: string, prompt: string, dependencies: string[] }. Dependencies reference other step names. |
| wave_timeout | integer | collaborative | No | Optional timeout per execution wave in seconds |
| engines | array of strings | competitive | Yes (when mode=competitive) | List of engines for competing approaches (minimum 2). Values from supported engine enum. |
| distribution_strategy | string | competitive | No | "round-robin" (default) or "decomposer-decides" |
| min_success_threshold | integer | competitive | No | Minimum successful approaches required (default 1, max = number of engines) |
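The parameters in the table above could be expressed as JSON Schema properties for the tool's inputSchema along the following lines. This is a sketch under assumptions: the exact shape of the TOOL_REGISTRY entry is not shown in this issue, the constant name is hypothetical, and the engine enum is left out because the supported engine list is defined elsewhere.

```python
import json
from typing import Any, Dict

# Hypothetical fragment to merge into the execute_open_delegation inputSchema.
NEW_INPUT_SCHEMA_PROPERTIES: Dict[str, Any] = {
    "steps": {
        "type": "array",
        "minItems": 1,
        "description": "DAG definition (collaborative mode only).",
        "items": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "prompt": {"type": "string"},
                "dependencies": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["name", "prompt"],
        },
    },
    "wave_timeout": {
        "type": "integer",
        "minimum": 1,
        "description": "Timeout per execution wave in seconds (collaborative mode only).",
    },
    "engines": {
        "type": "array",
        "minItems": 2,
        # An "enum" of supported engines would go on "items"; omitted here.
        "items": {"type": "string"},
        "description": "Engines for competing approaches (competitive mode only).",
    },
    "distribution_strategy": {
        "type": "string",
        "enum": ["round-robin", "decomposer-decides"],
        "default": "round-robin",
    },
    "min_success_threshold": {
        "type": "integer",
        "minimum": 1,
        "default": 1,
    },
}

if __name__ == "__main__":
    print(json.dumps(NEW_INPUT_SCHEMA_PROPERTIES, indent=2))
```

None of these properties can be listed as required at the schema level, since each applies to only one mode; the per-mode requirement and the upper bound on min_success_threshold stay in _validate_open_delegation_params.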
Definition of Done
- All 12 acceptance criteria satisfied
- 90% unit test coverage for new client methods and validation logic
- Unit tests for handler mode routing confirm correct dispatch
- Backward compatibility tests confirm single mode unchanged
- E2E manual testing: submit collaborative job with DAG, poll for result
- E2E manual testing: submit competitive job with multiple engines, poll for result
- E2E manual testing: single mode still works identically
- Code review approved (tdd-engineer + code-reviewer workflow)
- No lint/type errors (ruff, mypy clean)
- Tool doc updated and passes python3 tools/verify_tool_docs.py
- fast-automation.sh passes with zero failures
- Working software deployable to staging