Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 312 additions & 0 deletions landing/index.html.bak

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ try {
import { installWorkflow } from "../installer/install.js";
import { uninstallAllWorkflows, uninstallWorkflow, checkActiveRuns } from "../installer/uninstall.js";
import { getWorkflowStatus, listRuns, stopWorkflow } from "../installer/status.js";
import { runWorkflow } from "../installer/run.js";
import { runWorkflow, dryRunWorkflow } from "../installer/run.js";
import { listBundledWorkflows } from "../installer/workflow-fetch.js";
import { readRecentLogs } from "../lib/logger.js";
import { getRecentEvents, getRunEvents, type AntfarmEvent } from "../installer/events.js";
Expand Down Expand Up @@ -93,7 +93,7 @@ function printUsage() {
"antfarm workflow install <name> Install a workflow",
"antfarm workflow uninstall <name> Uninstall a workflow (blocked if runs active)",
"antfarm workflow uninstall --all Uninstall all workflows (--force to override)",
"antfarm workflow run <name> <task> Start a workflow run",
"antfarm workflow run <name> <task> Start a workflow run (--dry-run to validate only)",
"antfarm workflow status <query> Check run status (task substring, run ID prefix)",
"antfarm workflow runs List all workflow runs",
"antfarm workflow resume <run-id> Resume a failed run from where it left off",
Expand Down Expand Up @@ -671,13 +671,23 @@ async function main() {

if (action === "run") {
let notifyUrl: string | undefined;
let dryRun = false;
const runArgs = args.slice(3);
const nuIdx = runArgs.indexOf("--notify-url");
if (nuIdx !== -1) {
notifyUrl = runArgs[nuIdx + 1];
runArgs.splice(nuIdx, 2);
}
const drIdx = runArgs.indexOf("--dry-run");
if (drIdx !== -1) {
dryRun = true;
runArgs.splice(drIdx, 1);
}
const taskTitle = runArgs.join(" ").trim();
if (dryRun) {
await dryRunWorkflow({ workflowId: target, taskTitle });
return;
}
if (!taskTitle) { process.stderr.write("Missing task title.\n"); printUsage(); process.exit(1); }
const run = await runWorkflow({ workflowId: target, taskTitle, notifyUrl });
process.stdout.write(
Expand Down
3 changes: 3 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ function migrate(db: DatabaseSync): void {
if (!colNames.has("abandoned_count")) {
db.exec("ALTER TABLE steps ADD COLUMN abandoned_count INTEGER DEFAULT 0");
}
if (!colNames.has("session_key")) {
db.exec("ALTER TABLE steps ADD COLUMN session_key TEXT");
}

// Add columns to runs table for backwards compat
const runCols = db.prepare("PRAGMA table_info(runs)").all() as Array<{ name: string }>;
Expand Down
53 changes: 10 additions & 43 deletions src/installer/agent-cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { createAgentCronJob, deleteAgentCronJobs, listCronJobs, checkCronToolAva
import type { WorkflowSpec } from "./types.js";
import { resolveAntfarmCli } from "./paths.js";
import { getDb } from "../db.js";
import { readOpenClawConfig } from "./openclaw-config.js";

const DEFAULT_EVERY_MS = 300_000; // 5 minutes
const DEFAULT_AGENT_TIMEOUT_SECONDS = 30 * 60; // 30 minutes
Expand Down Expand Up @@ -89,46 +88,13 @@ The workflow cannot advance until you report. Your session ending without report
}

const DEFAULT_POLLING_TIMEOUT_SECONDS = 120;
const DEFAULT_POLLING_MODEL = "default";

function extractModel(value: unknown): string | undefined {
if (!value) return undefined;
if (typeof value === "string") return value;
if (typeof value === "object" && value !== null) {
const primary = (value as { primary?: unknown }).primary;
if (typeof primary === "string") return primary;
}
return undefined;
}

async function resolveAgentCronModel(agentId: string, requestedModel?: string): Promise<string | undefined> {
if (requestedModel && requestedModel !== "default") {
return requestedModel;
}

try {
const { config } = await readOpenClawConfig();
const agents = config.agents?.list;
if (Array.isArray(agents)) {
const entry = agents.find((a: any) => a?.id === agentId);
const configured = extractModel(entry?.model);
if (configured) return configured;
}

const defaults = config.agents?.defaults;
const fallback = extractModel(defaults?.model);
if (fallback) return fallback;
} catch {
// best-effort — fallback below
}

return requestedModel;
}
const DEFAULT_POLLING_MODEL = "minimax/MiniMax-M2.5";

export function buildPollingPrompt(workflowId: string, agentId: string, workModel?: string): string {
export function buildPollingPrompt(workflowId: string, agentId: string, workModel?: string, workTimeoutSeconds?: number): string {
const fullAgentId = `${workflowId}_${agentId}`;
const cli = resolveAntfarmCli();
const model = workModel ?? "default";
const model = workModel ?? "minimax/MiniMax-M2.5";
const timeoutSec = workTimeoutSeconds ?? DEFAULT_AGENT_TIMEOUT_SECONDS;
const workPrompt = buildWorkPrompt(workflowId, agentId);

return `Step 1 — Quick check for pending work (lightweight, no side effects):
Expand All @@ -147,6 +113,7 @@ If JSON is returned, parse it to extract stepId, runId, and input fields.
Then call sessions_spawn with these parameters:
- agentId: "${fullAgentId}"
- model: "${model}"
- runTimeoutSeconds: ${timeoutSec}
- task: The full work prompt below, followed by "\\n\\nCLAIMED STEP JSON:\\n" and the exact JSON output from step claim.

Full work prompt to include in the spawned task:
Expand All @@ -173,11 +140,11 @@ export async function setupAgentCrons(workflow: WorkflowSpec): Promise<void> {
const agentId = `${workflow.id}_${agent.id}`;

// Two-phase: Phase 1 uses cheap polling model + minimal prompt
const requestedPollingModel = agent.pollingModel ?? workflowPollingModel;
const pollingModel = await resolveAgentCronModel(agentId, requestedPollingModel);
const requestedWorkModel = agent.model ?? workflowPollingModel;
const workModel = await resolveAgentCronModel(agentId, requestedWorkModel);
const prompt = buildPollingPrompt(workflow.id, agent.id, workModel);
const pollingModel = agent.pollingModel ?? workflowPollingModel;
const workModel = agent.model; // Phase 2 model (passed to sessions_spawn via prompt)
// Work agent timeout: per-agent > workflow default > library default (30 min)
const workTimeoutSeconds = agent.timeoutSeconds ?? DEFAULT_AGENT_TIMEOUT_SECONDS;
const prompt = buildPollingPrompt(workflow.id, agent.id, workModel, workTimeoutSeconds);
const timeoutSeconds = workflowPollingTimeout;

const result = await createAgentCronJob({
Expand Down
135 changes: 131 additions & 4 deletions src/installer/gateway-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export async function checkCronToolAvailable(): Promise<{ ok: boolean; error?: s
}
}

export async function listCronJobs(): Promise<{ ok: boolean; jobs?: Array<{ id: string; name: string }>; error?: string }> {
export async function listCronJobs(): Promise<{ ok: boolean; jobs?: Array<{ id: string; name: string; lastStatus?: string; consecutiveErrors?: number; enabled?: boolean }>; error?: string }> {
// --- Try HTTP first ---
const httpResult = await listCronJobsHTTP();
if (httpResult !== null) return httpResult;
Expand All @@ -273,7 +273,7 @@ export async function listCronJobs(): Promise<{ ok: boolean; jobs?: Array<{ id:
}

/** HTTP-only list. Returns null on 404/network error. */
async function listCronJobsHTTP(): Promise<{ ok: boolean; jobs?: Array<{ id: string; name: string }>; error?: string } | null> {
async function listCronJobsHTTP(): Promise<{ ok: boolean; jobs?: Array<{ id: string; name: string; lastStatus?: string; consecutiveErrors?: number; enabled?: boolean }>; error?: string } | null> {
const gateway = await getGatewayConfig();
try {
const headers: Record<string, string> = { "Content-Type": "application/json" };
Expand All @@ -296,12 +296,18 @@ async function listCronJobsHTTP(): Promise<{ ok: boolean; jobs?: Array<{ id: str
return { ok: false, error: result.error?.message ?? "Unknown error" };
}

let jobs: Array<{ id: string; name: string }> = [];
let jobs: Array<{ id: string; name: string; lastStatus?: string; consecutiveErrors?: number; enabled?: boolean }> = [];
const content = result.result?.content;
if (Array.isArray(content) && content[0]?.text) {
try {
const parsed = JSON.parse(content[0].text);
jobs = parsed.jobs ?? [];
jobs = (parsed.jobs ?? []).map((j: any) => ({
id: j.id,
name: j.name,
lastStatus: j.state?.lastStatus,
consecutiveErrors: j.state?.consecutiveErrors,
enabled: j.enabled,
}));
} catch { /* fallback */ }
}
if (jobs.length === 0) {
Expand Down Expand Up @@ -364,6 +370,49 @@ export async function deleteAgentCronJobs(namePrefix: string): Promise<void> {
}
}

/**
* Disable a cron job by ID (circuit breaker action).
*/
export async function disableCronJob(jobId: string): Promise<{ ok: boolean; error?: string }> {
// --- Try HTTP first ---
const httpResult = await disableCronJobHTTP(jobId);
if (httpResult !== null) return httpResult;

// --- CLI fallback ---
try {
await runCli(["cron", "disable", jobId, "--json"]);
return { ok: true };
} catch (err) {
return { ok: false, error: `CLI fallback failed: ${err}. ${UPDATE_HINT}` };
}
}

/** HTTP-only disable. Returns null on 404/network error. */
async function disableCronJobHTTP(jobId: string): Promise<{ ok: boolean; error?: string } | null> {
const gateway = await getGatewayConfig();
try {
const headers: Record<string, string> = { "Content-Type": "application/json" };
if (gateway.secret) headers["Authorization"] = `Bearer ${gateway.secret}`;

const response = await fetch(`${gateway.url}/tools/invoke`, {
method: "POST",
headers,
body: JSON.stringify({ tool: "cron", args: { action: "disable", id: jobId }, sessionKey: "agent:main:main" }),
});

if (isTransientGatewayFailure(response.status)) return null;

if (!response.ok) {
return { ok: false, error: `Gateway returned ${response.status}` };
}

const result = await response.json();
return result.ok ? { ok: true } : { ok: false, error: result.error?.message ?? "Unknown error" };
} catch {
return null;
}
}

export async function sendSessionMessage(params: { sessionKey: string; message: string }): Promise<{ ok: boolean; error?: string }> {
const payload = {
tool: "sessions_send",
Expand Down Expand Up @@ -420,3 +469,81 @@ export async function sendSessionMessage(params: { sessionKey: string; message:
return { ok: false, error: `CLI fallback failed: ${err}. ${UPDATE_HINT}` };
}
}

/**
* Kill a gateway session by session key.
* Sends a termination message to the session to gracefully shut it down.
*/
export async function killSession(sessionKey: string): Promise<{ ok: boolean; error?: string }> {
const gateway = await getGatewayConfig();

// Try HTTP first - use the gateway call to send a kill message
try {
const headers: Record<string, string> = { "Content-Type": "application/json" };
if (gateway.secret) headers["Authorization"] = `Bearer ${gateway.secret}`;

// Try calling the sessions API to kill the session
const response = await fetch(`${gateway.url}/tools/invoke`, {
method: "POST",
headers,
body: JSON.stringify({
tool: "sessions_send",
args: {
action: "kill",
sessionKey: sessionKey,
},
sessionKey: "agent:main:main",
}),
});

if (response.ok) {
const result = await response.json();
if (result.ok) return { ok: true };
// If the tool doesn't exist or failed, try alternative approach
}

// If the above didn't work, try a different approach - send a termination signal
const terminateResponse = await fetch(`${gateway.url}/tools/invoke`, {
method: "POST",
headers,
body: JSON.stringify({
tool: "exec",
args: {
command: `openclaw sessions kill ${sessionKey}`,
},
sessionKey: "agent:main:main",
}),
});

if (terminateResponse.ok) {
return { ok: true };
}
} catch {
// Fall through to CLI fallback
}

// --- Fallback to CLI ---
try {
// Try to kill the session via CLI
await runCli(["sessions", "kill", sessionKey, "--json"]);
return { ok: true };
} catch {
// sessions kill might not be a valid command, try using message to signal exit
try {
await runCli([
"tool",
"run",
"--tool",
"sessions_send",
"--session",
sessionKey,
"--json",
"--message",
"SESSION_KILL_REQUESTED: This session has been terminated by antfarm. Please stop immediately.",
]);
return { ok: true };
} catch (err) {
return { ok: false, error: `Failed to kill session: ${err}` };
}
}
}
Loading