Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/agent-chain-propagation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@funkai/agents": minor
---

Add `AgentChainEntry` type and `agentChain` field to `StepInfo` and `StepFinishEvent` for agent ancestry tracking. Forward `onStepStart`/`onStepFinish` hooks from flow agent `$.agent()` to sub-agents, enabling full observability of nested agent steps from root hooks.
1 change: 1 addition & 0 deletions packages/agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
},
"devDependencies": {
"@ai-sdk/devtools": "^0.0.15",
"@funkai/utils": "workspace:*",
"@types/node": "catalog:",
"@vitest/coverage-v8": "catalog:",
"tsdown": "catalog:",
Expand Down
16 changes: 14 additions & 2 deletions packages/agents/src/core/agents/base/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { resolveOutput } from "@/core/agents/base/output.js";
import type { OutputParam, OutputSpec } from "@/core/agents/base/output.js";
import {
buildAITools,
extractAgentChain,
resolveValue,
resolveOptionalValue,
buildPrompt,
Expand All @@ -26,7 +27,7 @@ import { createDefaultLogger } from "@/core/logger.js";
import type { Logger } from "@/core/logger.js";
import type { LanguageModel } from "@/core/provider/types.js";
import type { Tool } from "@/core/tool.js";
import type { Model, StepFinishEvent, StreamPart } from "@/core/types.js";
import type { AgentChainEntry, Model, StepFinishEvent, StreamPart } from "@/core/types.js";
import { fireHooks, wrapHook } from "@/lib/hooks.js";
import { withModelMiddleware } from "@/lib/middleware.js";
import { AGENT_CONFIG, RUNNABLE_META } from "@/lib/runnable.js";
Expand Down Expand Up @@ -215,6 +216,10 @@ export function agent<
const hasTools = Object.keys(mergedTools).length > 0;
const hasAgents = Object.keys(mergedAgents).length > 0;

// Build agent chain: extend incoming chain with this agent's identity
const incomingChain = extractAgentChain(params);
const currentChain: readonly AgentChainEntry[] = [...incomingChain, { id: config.name }];

// Only fixed-type hooks (onStepStart, onStepFinish) are forwarded to
// Sub-agents. Generic hooks (onStart, onFinish, onError) are NOT
// Forwarded because their event types are parameterized by TInput/TOutput
Expand All @@ -226,6 +231,7 @@ export function agent<
log,
onStepStart: params.onStepStart,
onStepFinish: buildMergedHook(log, config.onStepFinish, params.onStepFinish),
agentChain: currentChain,
};

const aiTools = buildAITools(
Expand Down Expand Up @@ -266,7 +272,13 @@ export function agent<
return { toolName: tr.toolName, resultTextLength: safeSerializedLength(result) };
});
const usage = extractUsage(step.usage);
const event: StepFinishEvent = { stepId, toolCalls, toolResults, usage };
const event: StepFinishEvent = {
stepId,
toolCalls,
toolResults,
usage,
agentChain: currentChain,
};
await fireHooks(
log,
wrapHook(config.onStepFinish, event),
Expand Down
71 changes: 66 additions & 5 deletions packages/agents/src/core/agents/base/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { privateField } from "@funkai/utils";
import type { LanguageModelUsage } from "ai";
import { tool } from "ai";
import { isFunction, isNil, isNotNil, isString, omitBy } from "es-toolkit";
Expand All @@ -9,7 +10,7 @@ import type { Agent, Message, Resolver } from "@/core/agents/types.js";
import type { Logger } from "@/core/logger.js";
import type { TokenUsage } from "@/core/provider/types.js";
import type { Tool } from "@/core/tool.js";
import type { StepFinishEvent, StepInfo } from "@/core/types.js";
import type { AgentChainEntry, StepFinishEvent, StepInfo } from "@/core/types.js";
import { RUNNABLE_META } from "@/lib/runnable.js";
import type { RunnableMeta } from "@/lib/runnable.js";

Expand Down Expand Up @@ -68,6 +69,13 @@ export interface ParentAgentContext {
* Uses `StepFinishEvent` — a fixed (non-generic) type, safe to forward.
*/
onStepFinish?: (event: StepFinishEvent) => void | Promise<void>;

/**
* Agent ancestry chain from root to the current agent.
*
* @internal Framework-only — not exposed on public `GenerateParams`.
*/
agentChain?: readonly AgentChainEntry[];
}

/**
Expand Down Expand Up @@ -366,18 +374,27 @@ function buildAgentTool(
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- ToolSet requires `any` values; `unknown` breaks assignability with AI SDK
): ReturnType<typeof tool<any, any>> {
const parentParams = buildParentParams(parentCtx);
let parentChain: readonly AgentChainEntry[] | undefined;
if (isNotNil(parentCtx)) {
parentChain = parentCtx.agentChain;
}

if (isNotNil(meta) && isNotNil(meta.inputSchema)) {
return tool({
description: `Delegate to ${toolName}`,
inputSchema: meta.inputSchema,
execute: async (input, { abortSignal }) => {
const r = await runnable.generate({
const generateParams = {
input,
signal: abortSignal,
tools,
...parentParams,
});
};
// Stamp after spread — Symbol fields don't survive spread
if (isNotNil(parentChain)) {
_agentChainField.set(generateParams, parentChain);
}
const r = await runnable.generate(generateParams);
if (!r.ok) {
throw new Error(r.error.message);
}
Expand All @@ -389,12 +406,16 @@ function buildAgentTool(
description: `Delegate to ${toolName}`,
inputSchema: z.object({ prompt: z.string().describe("The prompt to send") }),
execute: async (input: { prompt: string }, { abortSignal }) => {
const r = await runnable.generate({
const generateParams = {
prompt: input.prompt,
signal: abortSignal,
tools,
...parentParams,
});
};
if (isNotNil(parentChain)) {
_agentChainField.set(generateParams, parentChain);
}
const r = await runnable.generate(generateParams);
if (!r.ok) {
throw new Error(r.error.message);
}
Expand All @@ -403,6 +424,46 @@ function buildAgentTool(
});
}

/**
* Private field for transporting agent ancestry chain through params.
*
* Uses a Symbol key so it is invisible to `Object.keys()`,
* `JSON.stringify()`, `for...in`, and object spread.
*
* @internal
*/
export const _agentChainField = privateField<readonly AgentChainEntry[]>("funkai:agent-chain");

/**
* Shared empty chain — avoids allocating a new `[]` on every
* top-level agent call where no parent chain exists.
*
* @private
*/
const EMPTY_CHAIN: readonly AgentChainEntry[] = [];

/**
* Extract the internal `agentChain` from raw generate params.
*
* Reads the agent chain from a Symbol-keyed private field on
* the params object. Returns an empty array when absent.
*
* @param params - The raw generate params object.
* @returns The agent chain array, or an empty array if absent.
*
* @example
* ```ts
* const chain = extractAgentChain(params);
* // => [{ id: "root" }] or []
* ```
*/
export function extractAgentChain(params: unknown): readonly AgentChainEntry[] {
if (typeof params !== "object" || params === null) {
return EMPTY_CHAIN;
}
return _agentChainField.get(params, EMPTY_CHAIN);
}

/**
* Build the per-call params to forward from parent context to sub-agent.
*
Expand Down
9 changes: 7 additions & 2 deletions packages/agents/src/core/agents/flow/flow-agent.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AsyncIterableStream } from "ai";
import { isNil, isNotNil } from "es-toolkit";

import { resolveOptionalValue } from "@/core/agents/base/utils.js";
import { extractAgentChain, resolveOptionalValue } from "@/core/agents/base/utils.js";
import {
collectTextFromMessages,
createAssistantMessage,
Expand All @@ -24,7 +24,7 @@ import type { GenerateParams, GenerateResult, Message, StreamResult } from "@/co
import { createDefaultLogger } from "@/core/logger.js";
import type { Logger } from "@/core/logger.js";
import type { TokenUsage } from "@/core/provider/types.js";
import type { StepFinishEvent, StepInfo, StreamPart } from "@/core/types.js";
import type { AgentChainEntry, StepFinishEvent, StepInfo, StreamPart } from "@/core/types.js";
import type { Context } from "@/lib/context.js";
import { fireHooks, wrapHook } from "@/lib/hooks.js";
import { FLOW_AGENT_CONFIG, RUNNABLE_META } from "@/lib/runnable.js";
Expand Down Expand Up @@ -326,6 +326,10 @@ export function flowAgent<TInput, TOutput = any>(
const messages: Message[] = [];
const ctx: Context = { signal, log, trace, messages };

// Build agent chain: extend incoming chain with this flow agent's identity
const incomingChain = extractAgentChain(params);
const currentChain: readonly AgentChainEntry[] = [...incomingChain, { id: config.name }];

const mergedOnStepStart = buildMergedStepStartHook(log, config.onStepStart, params.onStepStart);
const mergedOnStepFinish = buildMergedStepFinishHook(
log,
Expand All @@ -340,6 +344,7 @@ export function flowAgent<TInput, TOutput = any>(
onStepFinish: mergedOnStepFinish,
},
writer,
agentChain: currentChain,
});

const $ = augmentStepBuilder(base$, ctx, _internal);
Expand Down
93 changes: 87 additions & 6 deletions packages/agents/src/core/agents/flow/steps/factory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { isNil, isNotNil } from "es-toolkit";
import { isObject } from "es-toolkit/compat";

import { _agentChainField } from "@/core/agents/base/utils.js";
import {
buildToolCallId,
createToolCallMessage,
Expand All @@ -19,7 +20,7 @@ import type { WhileConfig } from "@/core/agents/flow/steps/while.js";
/* oxlint-disable import/max-dependencies -- step factory requires many internal modules */
import type { GenerateResult, StreamResult } from "@/core/agents/types.js";
import type { TokenUsage } from "@/core/provider/types.js";
import type { StepFinishEvent, StepInfo, StreamPart } from "@/core/types.js";
import type { AgentChainEntry, StepFinishEvent, StepInfo, StreamPart } from "@/core/types.js";
import type { Context } from "@/lib/context.js";
import { fireHooks } from "@/lib/hooks.js";
import type { TraceEntry, OperationType } from "@/lib/trace.js";
Expand Down Expand Up @@ -55,6 +56,17 @@ export interface StepBuilderOptions {
* step events through the readable stream.
*/
writer?: WritableStreamDefaultWriter<StreamPart>;

/**
* Agent ancestry chain from root to the current flow agent.
*
* Attached to every `StepInfo` and `StepFinishEvent` so hook
* consumers can trace which agent produced the event. Also
* forwarded to sub-agents called via `$.agent()`.
*
* @internal Framework-only — not exposed to users.
*/
agentChain?: readonly AgentChainEntry[];
}

/**
Expand Down Expand Up @@ -88,7 +100,7 @@ export function createStepBuilder(options: StepBuilderOptions): StepBuilder {
* the same ref so step indices are globally unique.
*/
function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexRef): StepBuilder {
const { ctx, parentHooks, writer } = options;
const { ctx, parentHooks, writer, agentChain } = options;

/**
* Core step primitive — every other method delegates here.
Expand Down Expand Up @@ -120,7 +132,7 @@ function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexR
}): Promise<StepResult<T>> {
const { id, type, execute, input, onStart, onFinish, onError } = params;

const stepInfo: StepInfo = { id, index: indexRef.current++, type };
const stepInfo: StepInfo = { id, index: indexRef.current++, type, agentChain };
const startedAt = Date.now();

const childTrace: TraceEntry[] = [];
Expand All @@ -130,7 +142,10 @@ function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexR
trace: childTrace,
messages: ctx.messages,
};
const child$ = createStepBuilderInternal({ ctx: childCtx, parentHooks, writer }, indexRef);
const child$ = createStepBuilderInternal(
{ ctx: childCtx, parentHooks, writer, agentChain },
indexRef,
);

// Build synthetic tool-call message and push to context
const toolCallId = buildToolCallId(id, stepInfo.index);
Expand Down Expand Up @@ -202,7 +217,7 @@ function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexR
fn({ id, result: value as T, duration }),
);
const parentOnStepFinishHook = buildParentHookCallback(parentHooks, "onStepFinish", (fn) =>
fn({ step: stepInfo, result: value, duration }),
fn({ step: stepInfo, result: value, duration, agentChain }),
);

await fireHooks(ctx.log, onFinishHook, parentOnStepFinishHook);
Expand Down Expand Up @@ -255,7 +270,7 @@ function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexR

const onErrorHook = buildHookCallback(onError, (fn) => fn({ id, error }));
const parentOnStepFinishHook = buildParentHookCallback(parentHooks, "onStepFinish", (fn) =>
fn({ step: stepInfo, result: undefined, duration }),
fn({ step: stepInfo, result: undefined, duration, agentChain }),
);

await fireHooks(ctx.log, onErrorHook, parentOnStepFinishHook);
Expand All @@ -274,12 +289,20 @@ function createStepBuilderInternal(options: StepBuilderOptions, indexRef: IndexR
type: "agent",
input: config.input,
execute: async () => {
// Forward fixed-type step hooks and agent chain to sub-agent.
// Merge parent + child hooks so neither side is clobbered.
const mergedHooks = mergeStepHooks(parentHooks, config.config);
const agentParams = {
...config.config,
input: config.input,
signal: ctx.signal,
logger: ctx.log.child({ stepId: config.id }),
...mergedHooks,
};
// Stamp after spread — Symbol fields don't survive spread
if (isNotNil(agentChain)) {
_agentChainField.set(agentParams, agentChain);
}

// When stream: true and a writer is available, use agent.stream()
// To pipe events through the parent flow's stream
Expand Down Expand Up @@ -585,6 +608,64 @@ function buildOnFinishHandlerRace(
return (event) => onFinish({ id: event.id, result: event.result, duration: event.duration });
}

/**
* Merge parent flow step hooks with delegated-agent step hooks.
*
* When both parent and child define the same hook, the merged callback
* fires the child hook first, then the parent hook. Only defined hooks
* are included in the result so `undefined` values never clobber
* a child-only hook via object spread.
*
* @private
*/
function mergeStepHooks(
parentHooks: StepBuilderOptions["parentHooks"],
childConfig: Record<string, unknown> | undefined,
): Record<string, unknown> {
type StepStartHook = (event: { step: StepInfo }) => void | Promise<void>;
type StepFinishHook = (event: StepFinishEvent) => void | Promise<void>;

let parentStart: StepStartHook | undefined;
let parentFinish: StepFinishHook | undefined;
if (isNotNil(parentHooks)) {
parentStart = parentHooks.onStepStart;
parentFinish = parentHooks.onStepFinish;
}

let childStart: StepStartHook | undefined;
let childFinish: StepFinishHook | undefined;
if (isNotNil(childConfig)) {
childStart = childConfig.onStepStart as StepStartHook | undefined;
childFinish = childConfig.onStepFinish as StepFinishHook | undefined;
}

const result: Record<string, unknown> = {};

if (isNotNil(parentStart) && isNotNil(childStart)) {
result.onStepStart = async (event: { step: StepInfo }) => {
await childStart(event);
await parentStart(event);
};
} else if (isNotNil(parentStart)) {
result.onStepStart = parentStart;
} else if (isNotNil(childStart)) {
result.onStepStart = childStart;
}

if (isNotNil(parentFinish) && isNotNil(childFinish)) {
result.onStepFinish = async (event: StepFinishEvent) => {
await childFinish(event);
await parentFinish(event);
};
} else if (isNotNil(parentFinish)) {
result.onStepFinish = parentFinish;
} else if (isNotNil(childFinish)) {
result.onStepFinish = childFinish;
}

return result;
}

/**
* Sequentially reduce items with abort support using tail-recursive iteration.
*
Expand Down
Loading