Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/installer/agent-cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ ${workPrompt}
Reply with a short summary of what you spawned.`;
}

export async function setupAgentCrons(workflow: WorkflowSpec): Promise<void> {
export async function setupAgentCrons(workflow: WorkflowSpec, onlyAgentIds?: Set<string>): Promise<void> {
const agents = workflow.agents;
// Allow per-workflow cron interval via cron.interval_ms in workflow.yml
const everyMs = (workflow as any).cron?.interval_ms ?? DEFAULT_EVERY_MS;
Expand All @@ -168,6 +168,8 @@ export async function setupAgentCrons(workflow: WorkflowSpec): Promise<void> {

for (let i = 0; i < agents.length; i++) {
const agent = agents[i];
// Skip agents that already have crons when creating only missing ones
if (onlyAgentIds && !onlyAgentIds.has(agent.id)) continue;
const anchorMs = i * 60_000; // stagger by 1 minute each
const cronName = `antfarm/${workflow.id}/${agent.id}`;
const agentId = `${workflow.id}_${agent.id}`;
Expand Down Expand Up @@ -214,29 +216,40 @@ function countActiveRuns(workflowId: string): number {
}

/**
* Check if crons already exist for a workflow.
* Return the set of agent IDs that already have crons for a workflow.
* Cron names follow the pattern: antfarm/{workflowId}/{agentId}
*/
async function workflowCronsExist(workflowId: string): Promise<boolean> {
async function getExistingCronAgentIds(workflowId: string): Promise<Set<string>> {
const result = await listCronJobs();
if (!result.ok || !result.jobs) return false;
if (!result.ok || !result.jobs) return new Set();
const prefix = `antfarm/${workflowId}/`;
return result.jobs.some((j) => j.name.startsWith(prefix));
const ids = new Set<string>();
for (const job of result.jobs) {
if (job.name.startsWith(prefix)) {
ids.add(job.name.slice(prefix.length));
}
}
return ids;
}

/**
* Start crons for a workflow when a run begins.
* No-ops if crons already exist (another run of the same workflow is active).
* Checks that ALL expected agent crons exist and creates only missing ones.
*/
export async function ensureWorkflowCrons(workflow: WorkflowSpec): Promise<void> {
if (await workflowCronsExist(workflow.id)) return;
const existingAgentIds = await getExistingCronAgentIds(workflow.id);
const expectedAgentIds = workflow.agents.map((a) => a.id);
const missingAgentIds = expectedAgentIds.filter((id) => !existingAgentIds.has(id));

if (missingAgentIds.length === 0) return;

// Preflight: verify cron tool is accessible before attempting to create jobs
const preflight = await checkCronToolAvailable();
if (!preflight.ok) {
throw new Error(preflight.error!);
}

await setupAgentCrons(workflow);
await setupAgentCrons(workflow, new Set(missingAgentIds));
}

/**
Expand Down