diff --git a/lib/adapters/PersistingEventBusAdapter.ts b/lib/adapters/PersistingEventBusAdapter.ts index bb453020..6b938774 100644 --- a/lib/adapters/PersistingEventBusAdapter.ts +++ b/lib/adapters/PersistingEventBusAdapter.ts @@ -13,6 +13,7 @@ import { createUserResponseEvent, createWorkflowStateEvent, } from "@/lib/neo4j/services/event" +import { upsertCreatedPullRequestFromToolResult } from "@/lib/neo4j/services/pullRequest" /** * EventBus adapter that both publishes to the transport (Redis Streams) @@ -127,12 +128,34 @@ async function persistToNeo4j( case "tool.result": { const toolName = (event.metadata?.["toolName"] as string) || "unknown" const toolCallId = (event.metadata?.["toolCallId"] as string) || "" - await createToolCallResultEvent({ + const created = await createToolCallResultEvent({ workflowId, toolName, toolCallId, content: event.content ?? "", }) + + // Normalize agent-created PRs into first-class nodes + if (toolName === "create_pull_request") { + try { + const parsed = JSON.parse(event.content || "{}") + const pr = parsed?.pullRequest?.data + if (parsed?.status === "success" && pr?.number && pr?.url) { + const number = Number(pr.number) + const url = String(pr.url) + const title = pr?.title ? String(pr.title) : undefined + await upsertCreatedPullRequestFromToolResult({ + eventId: created.id, + url, + number, + title, + }) + } + } catch { + // ignore parse errors – content may be arbitrary + } + } + return } diff --git a/lib/neo4j/repositories/pullRequest.ts b/lib/neo4j/repositories/pullRequest.ts new file mode 100644 index 00000000..d85ce7fa --- /dev/null +++ b/lib/neo4j/repositories/pullRequest.ts @@ -0,0 +1,59 @@ +import { int, Integer, ManagedTransaction, Node } from "neo4j-driver" +import { z } from "zod" + +import { pullRequestSchema } from "@/lib/types/db/neo4j" + +export type PullRequestNode = z.infer + +export async function upsertPullRequest( + tx: ManagedTransaction, + { + repoFullName, + number, + url, + title, + }: { + repoFullName: string + number: number + url: string + title?: string | null + } +): Promise { + const result = await tx.run<{ + pr: Node + }>( + ` + MERGE (pr:PullRequest { repoFullName: $repoFullName, number: $number }) + ON CREATE SET pr.createdAt = datetime(), pr.url = $url, pr.title = $title + ON MATCH SET + pr.url = coalesce(pr.url, $url), + pr.title = coalesce(pr.title, $title) + RETURN pr + `, + { repoFullName, number: int(number), url, title: title ?? null } + ) + const raw = result.records[0]?.get("pr")?.properties + return pullRequestSchema.parse(raw) +} + +export async function linkEventCreatedPR( + tx: ManagedTransaction, + { + eventId, + repoFullName, + number, + }: { + eventId: string + repoFullName: string + number: number + } +): Promise { + await tx.run( + ` + MATCH (e:Event:Message {id: $eventId}) + MATCH (pr:PullRequest { repoFullName: $repoFullName, number: $number }) + MERGE (e)-[:CREATED_PR]->(pr) + `, + { eventId, repoFullName, number: int(number) } + ) +} diff --git a/lib/neo4j/services/pullRequest.ts b/lib/neo4j/services/pullRequest.ts new file mode 100644 index 00000000..211ed538 --- /dev/null +++ b/lib/neo4j/services/pullRequest.ts @@ -0,0 +1,64 @@ +import { ManagedTransaction } from "neo4j-driver" + +import { n4j } from "@/lib/neo4j/client" +import { + linkEventCreatedPR, + upsertPullRequest, +} from "@/lib/neo4j/repositories/pullRequest" + +function deriveRepoFullNameFromUrl(url: string): string | null { + // Expected formats: + // - https://github.com/owner/repo/pull/123 + // - http://github.com/owner/repo/pull/123 + try { + const u = new URL(url) + if (u.hostname !== "github.com") return null + const parts = u.pathname.split("/").filter(Boolean) + // [owner, repo, 'pull', number] + if (parts.length >= 4 && parts[2] === "pull") { + const owner = parts[0] + const repo = parts[1] + if (owner && repo) return `${owner}/${repo}` + } + return null + } catch { + return null + } +} + +export async function upsertCreatedPullRequestFromToolResult({ + eventId, + repoFullName, + url, + number, + title, +}: { + eventId: string + repoFullName?: string | null + url: string + number: number + title?: string | null +}) { + const session = await n4j.getSession() + try { + const finalRepoFullName = + repoFullName || deriveRepoFullNameFromUrl(url) || undefined + if (!finalRepoFullName) return // cannot upsert without repo context + + await session.executeWrite(async (tx: ManagedTransaction) => { + await upsertPullRequest(tx, { + repoFullName: finalRepoFullName, + number, + url, + title, + }) + await linkEventCreatedPR(tx, { + eventId, + repoFullName: finalRepoFullName, + number, + }) + }) + } finally { + await session.close() + } +} diff --git a/lib/types/db/neo4j.ts b/lib/types/db/neo4j.ts index 6a9dfa58..e0cd5502 100644 --- a/lib/types/db/neo4j.ts +++ b/lib/types/db/neo4j.ts @@ -64,6 +64,23 @@ export const taskSchema = appTaskSchema.merge( }) ) +// PullRequest node schema (normalized PRs ingested from tool results) +export const pullRequestSchema = z.object({ + repoFullName: z.string(), + number: z.instanceof(Integer), + url: z.string(), + title: z.string().optional(), + createdAt: z.instanceof(DateTime), + lastCheckedAt: z.instanceof(DateTime).optional(), + state: z.string().optional(), + isDraft: z.boolean().optional(), + merged: z.boolean().optional(), + mergedAt: z.instanceof(DateTime).optional(), + closedAt: z.instanceof(DateTime).optional(), + mergeable: z.string().optional(), + mergeStateStatus: z.string().optional(), +}) + // Event schemas export const errorEventSchema = appErrorEventSchema .merge( @@ -164,6 +181,7 @@ export type ToolCallResult = z.infer export type UserMessage = z.infer export type WorkflowRun = z.infer export type WorkflowStateEvent = z.infer +export type PullRequestNode = z.infer export function isLLMResponseWithPlan( event: LLMResponse