Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion lib/adapters/PersistingEventBusAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
createUserResponseEvent,
createWorkflowStateEvent,
} from "@/lib/neo4j/services/event"
import { upsertCreatedPullRequestFromToolResult } from "@/lib/neo4j/services/pullRequest"

/**
* EventBus adapter that both publishes to the transport (Redis Streams)
Expand Down Expand Up @@ -127,12 +128,34 @@ async function persistToNeo4j(
case "tool.result": {
const toolName = (event.metadata?.["toolName"] as string) || "unknown"
const toolCallId = (event.metadata?.["toolCallId"] as string) || ""
await createToolCallResultEvent({
const created = await createToolCallResultEvent({
workflowId,
toolName,
toolCallId,
content: event.content ?? "",
})

// Normalize agent-created PRs into first-class nodes
if (toolName === "create_pull_request") {
try {
const parsed = JSON.parse(event.content || "{}")
const pr = parsed?.pullRequest?.data
if (parsed?.status === "success" && pr?.number && pr?.url) {
const number = Number(pr.number)
const url = String(pr.url)
const title = pr?.title ? String(pr.title) : undefined
await upsertCreatedPullRequestFromToolResult({
eventId: created.id,
url,
number,
title,
})
}
} catch {
// ignore parse errors – content may be arbitrary
}
}

return
}

Expand Down
59 changes: 59 additions & 0 deletions lib/neo4j/repositories/pullRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { int, Integer, ManagedTransaction, Node } from "neo4j-driver"
import { z } from "zod"

import { pullRequestSchema } from "@/lib/types/db/neo4j"

export type PullRequestNode = z.infer<typeof pullRequestSchema>

export async function upsertPullRequest(
tx: ManagedTransaction,
{
repoFullName,
number,
url,
title,
}: {
repoFullName: string
number: number
url: string
title?: string | null
}
): Promise<PullRequestNode> {
const result = await tx.run<{
pr: Node<Integer, PullRequestNode, "PullRequest">
}>(
`
MERGE (pr:PullRequest { repoFullName: $repoFullName, number: $number })
ON CREATE SET pr.createdAt = datetime(), pr.url = $url, pr.title = $title
ON MATCH SET
pr.url = coalesce(pr.url, $url),
pr.title = coalesce(pr.title, $title)
RETURN pr
`,
{ repoFullName, number: int(number), url, title: title ?? null }
)
const raw = result.records[0]?.get("pr")?.properties
return pullRequestSchema.parse(raw)
}

export async function linkEventCreatedPR(
tx: ManagedTransaction,
{
eventId,
repoFullName,
number,
}: {
eventId: string
repoFullName: string
number: number
}
): Promise<void> {
await tx.run(
`
MATCH (e:Event:Message {id: $eventId})
MATCH (pr:PullRequest { repoFullName: $repoFullName, number: $number })
MERGE (e)-[:CREATED_PR]->(pr)
`,
{ eventId, repoFullName, number: int(number) }
)
}
64 changes: 64 additions & 0 deletions lib/neo4j/services/pullRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { ManagedTransaction } from "neo4j-driver"

import { n4j } from "@/lib/neo4j/client"
import {
linkEventCreatedPR,
upsertPullRequest,
} from "@/lib/neo4j/repositories/pullRequest"

function deriveRepoFullNameFromUrl(url: string): string | null {
// Expected formats:
// - https://github.com/owner/repo/pull/123
// - http://github.com/owner/repo/pull/123
try {
const u = new URL(url)
if (u.hostname !== "github.com") return null
const parts = u.pathname.split("/").filter(Boolean)
// [owner, repo, 'pull', number]
if (parts.length >= 4 && parts[2] === "pull") {
const owner = parts[0]
const repo = parts[1]
if (owner && repo) return `${owner}/${repo}`
}
return null
} catch {
return null
}
}

export async function upsertCreatedPullRequestFromToolResult({
eventId,
repoFullName,
url,
number,
title,
}: {
eventId: string
repoFullName?: string | null
url: string
number: number
title?: string | null
}) {
const session = await n4j.getSession()
try {
const finalRepoFullName =
repoFullName || deriveRepoFullNameFromUrl(url) || undefined
if (!finalRepoFullName) return // cannot upsert without repo context

await session.executeWrite(async (tx: ManagedTransaction) => {
await upsertPullRequest(tx, {
repoFullName: finalRepoFullName,
number,
url,
title,
})
await linkEventCreatedPR(tx, {
eventId,
repoFullName: finalRepoFullName,
number,
})
})
} finally {
await session.close()
}
}
18 changes: 18 additions & 0 deletions lib/types/db/neo4j.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ export const taskSchema = appTaskSchema.merge(
})
)

// PullRequest node schema (normalized PRs ingested from tool results)
export const pullRequestSchema = z.object({
repoFullName: z.string(),
number: z.instanceof(Integer),
url: z.string(),
title: z.string().optional(),
createdAt: z.instanceof(DateTime),
lastCheckedAt: z.instanceof(DateTime).optional(),
state: z.string().optional(),
isDraft: z.boolean().optional(),
merged: z.boolean().optional(),
mergedAt: z.instanceof(DateTime).optional(),
closedAt: z.instanceof(DateTime).optional(),
mergeable: z.string().optional(),
mergeStateStatus: z.string().optional(),
})

// Event schemas
export const errorEventSchema = appErrorEventSchema
.merge(
Expand Down Expand Up @@ -164,6 +181,7 @@ export type ToolCallResult = z.infer<typeof toolCallResultSchema>
export type UserMessage = z.infer<typeof userMessageSchema>
export type WorkflowRun = z.infer<typeof workflowRunSchema>
export type WorkflowStateEvent = z.infer<typeof workflowStateEventSchema>
export type PullRequestNode = z.infer<typeof pullRequestSchema>

export function isLLMResponseWithPlan(
event: LLMResponse
Expand Down