diff --git a/__tests__/api/webhook/github.route.test.ts b/__tests__/api/webhook/github.route.test.ts index 5f6d6973d..e865055a3 100644 --- a/__tests__/api/webhook/github.route.test.ts +++ b/__tests__/api/webhook/github.route.test.ts @@ -13,22 +13,34 @@ jest.mock( jest.mock("@/lib/webhook/github/handlers/issue/label.resolve.handler", () => ({ handleIssueLabelResolve: jest.fn(), })) -jest.mock("@/lib/webhook/github/handlers/pullRequest/closed.removeContainer.handler", () => ({ - handlePullRequestClosedRemoveContainer: jest.fn(), -})) -jest.mock("@/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler", () => ({ - handlePullRequestLabelCreateDependentPR: jest.fn(), -})) -jest.mock("@/lib/webhook/github/handlers/repository/edited.revalidateRepoCache.handler", () => ({ - handleRepositoryEditedRevalidate: jest.fn(), -})) +jest.mock( + "@/lib/webhook/github/handlers/pullRequest/closed.removeContainer.handler", + () => ({ + handlePullRequestClosedRemoveContainer: jest.fn(), + }) +) +jest.mock( + "@/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler", + () => ({ + handlePullRequestLabelCreateDependentPR: jest.fn(), + }) +) +jest.mock( + "@/lib/webhook/github/handlers/repository/edited.revalidateRepoCache.handler", + () => ({ + handleRepositoryEditedRevalidate: jest.fn(), + }) +) import { POST } from "@/app/api/webhook/github/route" import { handleIssueLabelAutoResolve } from "@/lib/webhook/github/handlers/issue/label.autoResolveIssue.handler" import { handlePullRequestLabelCreateDependentPR } from "@/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler" -jest.mock("@/lib/webhook/github/handlers/issue/label.autoResolveIssue.handler", () => ({ - handleIssueLabelAutoResolve: jest.fn(), -})) +jest.mock( + "@/lib/webhook/github/handlers/issue/label.autoResolveIssue.handler", + () => ({ + handleIssueLabelAutoResolve: jest.fn(), + }) +) describe("POST /api/webhook/github", () => { const secret = "test-secret" @@ -55,7 +67,8 @@ describe("POST /api/webhook/github", () => { const rawBody = Buffer.from(JSON.stringify(payload)) const signature = - "sha256=" + crypto.createHmac("sha256", secret).update(rawBody).digest("hex") + "sha256=" + + crypto.createHmac("sha256", secret).update(rawBody).digest("hex") const headers = new Headers({ "x-hub-signature-256": signature, @@ -75,7 +88,9 @@ describe("POST /api/webhook/github", () => { const callArgs = jest.mocked(handleIssueLabelAutoResolve).mock.calls[0]?.[0] expect(callArgs).toBeDefined() expect(callArgs.installationId).toBe(String(payload.installation.id)) - expect(callArgs.payload.repository.full_name).toBe(payload.repository.full_name) + expect(callArgs.payload.repository.full_name).toBe( + payload.repository.full_name + ) expect(callArgs.payload.issue.number).toBe(payload.issue.number) expect(callArgs.payload.sender.login).toBe(payload.sender.login) }) @@ -93,7 +108,8 @@ describe("POST /api/webhook/github", () => { const rawBody = Buffer.from(JSON.stringify(payload)) const signature = - "sha256=" + crypto.createHmac("sha256", secret).update(rawBody).digest("hex") + "sha256=" + + crypto.createHmac("sha256", secret).update(rawBody).digest("hex") const headers = new Headers({ "x-hub-signature-256": signature, @@ -110,11 +126,12 @@ describe("POST /api/webhook/github", () => { expect(response.status).toBe(200) expect(handlePullRequestLabelCreateDependentPR).toHaveBeenCalledTimes(1) - const callArgs = jest.mocked(handlePullRequestLabelCreateDependentPR).mock.calls[0]?.[0] + const callArgs = jest.mocked(handlePullRequestLabelCreateDependentPR).mock + .calls[0]?.[0] expect(callArgs).toBeDefined() expect(callArgs.installationId).toBe(String(payload.installation.id)) expect(callArgs.payload.number).toBe(payload.number) expect(callArgs.payload.label?.name).toBe(payload.label.name) expect(callArgs.payload.sender?.login).toBe(payload.sender.login) }) -}) \ No newline at end of file +}) diff --git a/__tests__/apps/workers/handler.autoResolveIssue.test.ts b/__tests__/apps/workers/handler.autoResolveIssue.test.ts new file mode 100644 index 000000000..b4697514a --- /dev/null +++ b/__tests__/apps/workers/handler.autoResolveIssue.test.ts @@ -0,0 +1,88 @@ +// Mock @octokit packages to prevent ES module issues +jest.mock("@octokit/auth-app", () => ({ + createAppAuth: jest.fn(() => () => Promise.resolve({ token: "fake-token" })), +})) + +jest.mock("@octokit/auth-oauth-user", () => ({ + createOAuthUserAuth: jest.fn( + () => () => Promise.resolve({ token: "fake-token" }) + ), +})) + +jest.mock("@octokit/graphql", () => ({ + graphql: jest.fn(), +})) + +jest.mock("@octokit/rest", () => ({ + Octokit: jest.fn().mockImplementation(() => ({ + rest: { + apps: { getInstallation: jest.fn() }, + repos: { get: jest.fn() }, + }, + })), +})) + +jest.mock("octokit", () => ({ + App: jest.fn().mockImplementation(() => ({ + getInstallationOctokit: jest.fn(), + })), +})) + +jest.mock( + "apps/workers/workflow-workers/src/orchestrators/autoResolveIssue", + () => ({ + autoResolveIssue: jest.fn(), + }) +) + +jest.mock("apps/workers/workflow-workers/src/helper", () => ({ + publishJobStatus: jest.fn(), +})) + +import { handler } from "apps/workers/workflow-workers/src/handler" +import { publishJobStatus } from "apps/workers/workflow-workers/src/helper" +import { autoResolveIssue } from "apps/workers/workflow-workers/src/orchestrators/autoResolveIssue" +import type { Job } from "bullmq" + +const mockAutoResolveIssue = jest.mocked(autoResolveIssue) +const mockPublishJobStatus = jest.mocked(publishJobStatus) + +describe("handler - autoResolveIssue", () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + it("routes autoResolveIssue jobs and publishes status updates", async () => { + const messages = [ + { role: "assistant" as const, content: "first message" }, + { role: "assistant" as const, content: "second message" }, + ] + mockAutoResolveIssue.mockResolvedValue(messages) + + const job = { + id: "job-123", + name: "autoResolveIssue", + data: { + repoFullName: "owner/repo", + issueNumber: 42, + branch: "feature-branch", + githubLogin: "octocat", + githubInstallationId: "install-1", + }, + } + + const result = await handler(job as Job) + + expect(mockAutoResolveIssue).toHaveBeenCalledWith(job.id, job.data) + expect(result).toBe("first message\nsecond message") + expect(mockPublishJobStatus).toHaveBeenCalledWith(job.id, "Parsing job") + expect(mockPublishJobStatus).toHaveBeenCalledWith( + job.id, + "Job: Auto resolve issue" + ) + expect(mockPublishJobStatus).toHaveBeenCalledWith( + job.id, + "Completed: first message\nsecond message" + ) + }) +}) diff --git a/__tests__/config/jest.config.base.ts b/__tests__/config/jest.config.base.ts index c204aadff..e6c6ee8db 100644 --- a/__tests__/config/jest.config.base.ts +++ b/__tests__/config/jest.config.base.ts @@ -6,15 +6,24 @@ const baseConfig: Config = { moduleNameMapper: { "^@/components/(.*)$": "/components/$1", "^@/styles/(.*)$": "/styles/$1", - "^@/lib/(.*)$": "/lib/$1", + "^@/lib/(.*)$": "/lib/$1", "^@/__tests__/(.*)$": "/__tests__/$1", + "^@/shared/(.*)$": "/shared/src/$1", "^@shared/(.*)$": "/shared/src/$1", "^shared/(.*)$": "/shared/src/$1", + "^apps/(.*)$": "/apps/$1", "^@workers/(.*)$": "/apps/workers/src/$1", - "^@/(adapters|entities|ports|providers|services|ui|usecases|utils)(/.*)?$": - "/shared/src/$1$2", + "^@/adapters/(.*)$": "/shared/src/adapters/$1", + "^@/entities/(.*)$": "/shared/src/entities/$1", + "^@/ports/(.*)$": "/shared/src/ports/$1", "^@/(.*)$": "/$1", }, + transform: { + "^.+\\.(ts|tsx)$": "ts-jest", + }, + transformIgnorePatterns: [ + "node_modules/(?!(shared|@octokit|universal-user-agent)/)" + ], coveragePathIgnorePatterns: ["/node_modules/", "/.next/", "/coverage/"], rootDir: "../..", } diff --git a/__tests__/lib/webhook/handlers/pullRequest/label.createDependentPR.handler.test.ts b/__tests__/lib/webhook/handlers/pullRequest/label.createDependentPR.handler.test.ts index 9efff275b..301cf881d 100644 --- a/__tests__/lib/webhook/handlers/pullRequest/label.createDependentPR.handler.test.ts +++ b/__tests__/lib/webhook/handlers/pullRequest/label.createDependentPR.handler.test.ts @@ -1,10 +1,21 @@ import { handlePullRequestLabelCreateDependentPR } from "@/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler" import type { PullRequestPayload } from "@/lib/webhook/github/types" +import { addJob } from "@/shared/services/job" -describe("handlePullRequestLabelCreateDependentPR (noop)", () => { +jest.mock("@/shared/services/job", () => ({ + addJob: jest.fn().mockResolvedValue("job-id-123"), +})) + +describe("handlePullRequestLabelCreateDependentPR", () => { const installationId = "123456" - function makePayload(overrides: Partial = {}): PullRequestPayload { + beforeEach(() => { + jest.clearAllMocks() + }) + + function makePayload( + overrides: Partial = {} + ): PullRequestPayload { return { action: "labeled", number: 42, @@ -17,15 +28,47 @@ describe("handlePullRequestLabelCreateDependentPR (noop)", () => { } } - beforeEach(() => { - jest.clearAllMocks() + it("enqueues a createDependentPR job with expected payload", async () => { + const payload = makePayload({ number: 100 }) + + process.env.REDIS_URL = "redis://localhost:6379" + + await handlePullRequestLabelCreateDependentPR({ payload, installationId }) + + const mockedAddJob = jest.mocked(addJob) + expect(mockedAddJob).toHaveBeenCalledTimes(1) + const [queueName, jobEvent, _opts, redisUrl] = mockedAddJob.mock.calls[0] + expect(queueName).toBe("workflow-jobs") + expect(jobEvent.name).toBe("createDependentPR") + expect(jobEvent.data).toEqual({ + repoFullName: "owner/repo", + pullNumber: 100, + githubLogin: "octocat", + githubInstallationId: installationId, + }) + expect(redisUrl).toBe("redis://localhost:6379") + }) + + it("handles valid payload correctly", async () => { + process.env.REDIS_URL = "redis://localhost:6379" + const payload = makePayload({ number: 42 }) + + const result = await handlePullRequestLabelCreateDependentPR({ + payload, + installationId, + }) + + expect(result.status).toBe("noop") }) it("logs a noop message with expected context", async () => { const logSpy = jest.spyOn(console, "log").mockImplementation(() => {}) const payload = makePayload({ number: 100 }) - const result = await handlePullRequestLabelCreateDependentPR({ payload, installationId }) + const result = await handlePullRequestLabelCreateDependentPR({ + payload, + installationId, + }) expect(result).toEqual({ status: "noop", @@ -44,13 +87,28 @@ describe("handlePullRequestLabelCreateDependentPR (noop)", () => { logSpy.mockRestore() }) - it("throws if required fields are missing", async () => { + it("throws if repository fields are missing", async () => { await expect( handlePullRequestLabelCreateDependentPR({ - payload: makePayload({ repository: { name: "", owner: { login: "" } } }), + payload: makePayload({ + repository: { name: "", owner: { login: "" } }, + }), installationId, }) ).rejects.toThrow() }) -}) + it("throws if REDIS_URL is not set", async () => { + const original = process.env.REDIS_URL + delete process.env.REDIS_URL + + await expect( + handlePullRequestLabelCreateDependentPR({ + payload: makePayload(), + installationId, + }) + ).rejects.toThrow("REDIS_URL is not set") + + if (original) process.env.REDIS_URL = original + }) +}) diff --git a/__tests__/lib/webhook/label.autoResolveIssue.handler.test.ts b/__tests__/lib/webhook/label.autoResolveIssue.handler.test.ts new file mode 100644 index 000000000..ff53de9f6 --- /dev/null +++ b/__tests__/lib/webhook/label.autoResolveIssue.handler.test.ts @@ -0,0 +1,87 @@ +import { WORKFLOW_JOBS_QUEUE } from "shared/entities/Queue" +import * as jobService from "shared/services/job" + +import { handleIssueLabelAutoResolve } from "@/lib/webhook/github/handlers/issue/label.autoResolveIssue.handler" +import type { IssuesPayload } from "@/lib/webhook/github/types" + +jest.mock("shared/services/job", () => ({ + addJob: jest.fn(), +})) + +describe("handleIssueLabelAutoResolve", () => { + const originalEnv = process.env + + const buildPayload = ( + overrides: Partial = {} + ): IssuesPayload => ({ + action: "labeled", + repository: { full_name: "octo/repo" }, + issue: { number: 42 }, + sender: { login: "octocat" }, + installation: { id: 99 }, + ...overrides, + }) + + beforeEach(() => { + jest.restoreAllMocks() + process.env = { ...originalEnv } + }) + + it("enqueues autoResolveIssue job with expected data", async () => { + const addJobSpy = jest + .spyOn(jobService, "addJob") + .mockResolvedValue("job-id-1") + + process.env.REDIS_URL = "redis://localhost:6379" + + const payload = buildPayload() + + await handleIssueLabelAutoResolve({ payload, installationId: "99" }) + + expect(addJobSpy).toHaveBeenCalledTimes(1) + expect(addJobSpy).toHaveBeenCalledWith( + WORKFLOW_JOBS_QUEUE, + { + name: "autoResolveIssue", + data: { + repoFullName: "octo/repo", + issueNumber: 42, + githubLogin: "octocat", + githubInstallationId: "99", + }, + }, + {}, + "redis://localhost:6379" + ) + }) + + it("throws when REDIS_URL is missing", async () => { + // Make sure we DON'T have REDIS_URL set in the environment + delete process.env.REDIS_URL + + const payload = buildPayload() + + await expect( + handleIssueLabelAutoResolve({ payload, installationId: "99" }) + ).rejects.toThrow("REDIS_URL is not set") + }) + + it("throws when required payload fields are missing", async () => { + process.env.REDIS_URL = "redis://localhost:6379" + + const incompletePayload = { + action: "labeled", + repository: {}, + issue: {}, + sender: {}, + installation: {}, + } as unknown as IssuesPayload + + await expect( + handleIssueLabelAutoResolve({ + payload: incompletePayload, + installationId: "missing", + }) + ).rejects.toThrow("Missing required fields for autoResolveIssue job") + }) +}) diff --git a/__tests__/lib/workers/createDependentPR.worker.test.ts b/__tests__/lib/workers/createDependentPR.worker.test.ts new file mode 100644 index 000000000..65700e8fb --- /dev/null +++ b/__tests__/lib/workers/createDependentPR.worker.test.ts @@ -0,0 +1,76 @@ +import { createDependentPR } from "apps/workers/workflow-workers/src/orchestrators/createDependentPR" + +// Mock all external dependencies that make network calls +jest.mock("@octokit/auth-app", () => ({ + createAppAuth: jest.fn(() => () => Promise.resolve({ token: "fake-token" })), +})) + +jest.mock("@octokit/auth-oauth-user", () => ({ + createOAuthUserAuth: jest.fn( + () => () => Promise.resolve({ token: "fake-token" }) + ), +})) + +jest.mock("@octokit/graphql", () => ({ + graphql: jest.fn(), +})) + +jest.mock("@octokit/rest", () => ({ + Octokit: jest.fn().mockImplementation(() => ({ + rest: { + repos: { + get: jest.fn(), + createFork: jest.fn(), + }, + pulls: { + create: jest.fn(), + }, + git: { + getRef: jest.fn(), + createRef: jest.fn(), + }, + }, + })), +})) + +jest.mock("octokit", () => ({ + App: jest.fn().mockImplementation(() => ({ + getInstallationOctokit: jest.fn(), + })), +})) + +jest.mock( + "apps/workers/workflow-workers/src/orchestrators/createDependentPR", + () => ({ + createDependentPR: jest.fn().mockResolvedValue("branch-xyz"), + }) +) + +import { handler, JobInput } from "apps/workers/workflow-workers/src/handler" + +jest.mock("shared/entities/events/Job", () => { + const actual = jest.requireActual("shared/entities/events/Job") + return { + ...actual, + } +}) + +describe("workflow worker handler - createDependentPR", () => { + it("routes createDependentPR jobs to orchestrator and returns branch name", async () => { + const job: JobInput = { + id: "job-123", + name: "createDependentPR", + data: { + repoFullName: "owner/repo", + pullNumber: 55, + githubLogin: "octocat", + githubInstallationId: "123", + }, + } + + const result = await handler(job) + + expect(createDependentPR).toHaveBeenCalledTimes(1) + expect(result).toBe("branch-xyz") + }) +}) diff --git a/app/api/webhook/github/route.ts b/app/api/webhook/github/route.ts index c4fc2bdb5..82392036e 100644 --- a/app/api/webhook/github/route.ts +++ b/app/api/webhook/github/route.ts @@ -359,3 +359,4 @@ export async function POST(req: NextRequest) { return new Response("Error", { status: 500 }) } } + diff --git a/apps/workers/workflow-workers/src/handler.ts b/apps/workers/workflow-workers/src/handler.ts index 0fd63c226..b42cdac8d 100644 --- a/apps/workers/workflow-workers/src/handler.ts +++ b/apps/workers/workflow-workers/src/handler.ts @@ -12,15 +12,21 @@ * @returns The result of the job. Currently, this is not being used. */ -import { Job } from "bullmq" import { JobEventSchema } from "shared/entities/events/Job" import { publishJobStatus } from "./helper" import { autoResolveIssue } from "./orchestrators/autoResolveIssue" +import { createDependentPR } from "./orchestrators/createDependentPR" import { simulateLongRunningWorkflow } from "./orchestrators/simulateLongRunningWorkflow" import { summarizeIssue } from "./orchestrators/summarizeIssue" -export async function handler(job: Job): Promise { +export interface JobInput { + id?: string | undefined + name: string + data: D +} + +export async function handler(job: JobInput): Promise { console.log(`Received job ${job.id}: ${job.name}`) if (!job.id) { @@ -66,6 +72,12 @@ export async function handler(job: Job): Promise { ) return result.map((m) => m.content).join("\n") } + case "createDependentPR": { + await publishJobStatus(job.id, "Job: Create dependent PR") + const branch = await createDependentPR(job.id, jobData) + await publishJobStatus(job.id, `Completed: created branch ${branch}`) + return branch + } default: { await publishJobStatus(job.id, "Failed: Unknown job name") throw new Error(`Unknown job name: ${job.name}`) diff --git a/apps/workers/workflow-workers/src/helper.ts b/apps/workers/workflow-workers/src/helper.ts index 8bbf5677e..cbf27d4fb 100644 --- a/apps/workers/workflow-workers/src/helper.ts +++ b/apps/workers/workflow-workers/src/helper.ts @@ -3,6 +3,7 @@ import dotenv from "dotenv" import type IORedis from "ioredis" import path from "path" import { getRedisConnection } from "shared/adapters/ioredis/client" +import type { Neo4jDataSource } from "shared/adapters/neo4j/dataSource" import { JOB_STATUS_CHANNEL } from "shared/entities/Channels" import { JobStatusUpdateSchema } from "shared/entities/events/JobStatus" import { fileURLToPath } from "url" @@ -54,15 +55,17 @@ export async function publishJobStatus(jobId: string, status: string) { } // Register graceful shutdown handlers for the worker process. -// Stops taking new jobs, waits for in-flight jobs to finish, then exits. +// Stops taking new jobs, waits for in-flight jobs to finish, +// shuts down Redis and Neo4j connections, then exits. // If the timeout elapses, forces exit(1) after disconnecting Redis. export function registerGracefulShutdown(opts: { worker: Worker queueEvents: QueueEvents - connection: IORedis + redis: IORedis[] + neo4jDs: Neo4jDataSource timeoutMs?: number }) { - const { worker, queueEvents, connection, timeoutMs: timeoutMsOpt } = opts + const { worker, queueEvents, redis, neo4jDs, timeoutMs: timeoutMsOpt } = opts const { SHUTDOWN_TIMEOUT_MS: timeoutMsEnv } = getEnvVar() const timeoutMs = timeoutMsOpt ?? Number(timeoutMsEnv) @@ -79,7 +82,7 @@ export function registerGracefulShutdown(opts: { ) // Ensure connections are dropped before forcing exit (fire-and-forget) try { - connection.disconnect() + redis.forEach((connection) => connection.disconnect()) } catch {} process.exit(1) }, timeoutMs) @@ -89,8 +92,11 @@ export function registerGracefulShutdown(opts: { await worker.close() // Close queue event listener await queueEvents.close() - // Close the redis connection - await connection.quit() + // Close the redis connections + await Promise.all(redis.map((connection) => connection.quit())) + // Close the Neo4j driver + await neo4jDs.getDriver().close() + clearTimeout(timeout) console.log("[worker] Shutdown complete. Exiting…") process.exit(0) diff --git a/apps/workers/workflow-workers/src/index.ts b/apps/workers/workflow-workers/src/index.ts index f0864db97..1c640eaf8 100644 --- a/apps/workers/workflow-workers/src/index.ts +++ b/apps/workers/workflow-workers/src/index.ts @@ -11,16 +11,14 @@ */ import { QueueEvents, Worker } from "bullmq" -import { getRedisConnection } from "shared/adapters/ioredis/client" import { WORKFLOW_JOBS_QUEUE } from "shared/entities/Queue" import { handler } from "./handler" import { getEnvVar, registerGracefulShutdown } from "./helper" +import { neo4jDs } from "./neo4j" +import { eventsConn, workerConn } from "./redis" -const { REDIS_URL, WORKER_CONCURRENCY } = getEnvVar() - -const workerConn = getRedisConnection(REDIS_URL, "bullmq:worker") -const eventsConn = getRedisConnection(REDIS_URL, "bullmq:events") +const { WORKER_CONCURRENCY } = getEnvVar() const concurrency = Math.max(1, WORKER_CONCURRENCY) @@ -94,4 +92,9 @@ queueEvents.on("error", (err) => { }) // Register graceful shutdown with a default 1 hour timeout (overridable via SHUTDOWN_TIMEOUT_MS) -registerGracefulShutdown({ worker, queueEvents, connection: workerConn }) +registerGracefulShutdown({ + worker, + queueEvents, + redis: [workerConn, eventsConn], + neo4jDs, +}) diff --git a/apps/workers/workflow-workers/src/neo4j.ts b/apps/workers/workflow-workers/src/neo4j.ts new file mode 100644 index 000000000..bbc002a6d --- /dev/null +++ b/apps/workers/workflow-workers/src/neo4j.ts @@ -0,0 +1,12 @@ +// apps/workers/workflow-workers/src/neo4j.ts +import { createNeo4jDataSource } from "shared/adapters/neo4j/dataSource" + +import { getEnvVar } from "./helper" + +const { NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD } = getEnvVar() + +export const neo4jDs = createNeo4jDataSource({ + uri: NEO4J_URI, + user: NEO4J_USER, + password: NEO4J_PASSWORD, +}) diff --git a/apps/workers/workflow-workers/src/orchestrators/createDependentPR.ts b/apps/workers/workflow-workers/src/orchestrators/createDependentPR.ts new file mode 100644 index 000000000..a924b8f73 --- /dev/null +++ b/apps/workers/workflow-workers/src/orchestrators/createDependentPR.ts @@ -0,0 +1,108 @@ +import { createAppAuth } from "@octokit/auth-app" +import { Octokit } from "@octokit/rest" +import type { Transaction } from "neo4j-driver" +import { makeContainerCheckoutCommitAdapter } from "shared/adapters/git/checkoutCommit" +import { EventBusAdapter } from "shared/adapters/ioredis/EventBusAdapter" +import { makeSettingsReaderAdapter } from "shared/adapters/neo4j/repositories/SettingsReaderAdapter" +import { setAccessToken } from "shared/auth" +import { getPrivateKeyFromFile } from "shared/services/fs" +import { createDependentPRWorkflow } from "shared/usecases/workflows/createDependentPR" + +import { getEnvVar, publishJobStatus } from "../helper" +import { neo4jDs } from "../neo4j" + +// Minimal user repository implementation for SettingsReaderAdapter +const userRepo = { + async getUserSettings(tx: Transaction, username: string) { + const res = await tx.run( + ` + MATCH (u:User {username: $username})-[:HAS_SETTINGS]->(s:Settings) + RETURN s LIMIT 1 + `, + { username } + ) + const settings = res.records?.[0]?.get?.("s")?.properties ?? null + if (!settings) return null + return { + openAIApiKey: settings.openAIApiKey ?? null, + } + }, +} + +export type CreateDependentPRJobData = { + repoFullName: string + pullNumber: number + githubLogin: string + githubInstallationId: string +} + +export async function createDependentPR( + jobId: string, + { + repoFullName, + pullNumber, + githubLogin, + githubInstallationId, + }: CreateDependentPRJobData +) { + await publishJobStatus( + jobId, + `Preparing to create dependent PR for ${repoFullName}#${pullNumber}` + ) + + const { GITHUB_APP_ID, GITHUB_APP_PRIVATE_KEY_PATH, REDIS_URL } = getEnvVar() + + const settingsAdapter = makeSettingsReaderAdapter({ + getSession: () => neo4jDs.getSession(), + userRepo, + }) + + const privateKey = await getPrivateKeyFromFile(GITHUB_APP_PRIVATE_KEY_PATH) + const octokit = new Octokit({ + authStrategy: createAppAuth, + auth: { + appId: GITHUB_APP_ID, + privateKey, + // Octokit expects installationId as a number + installationId: Number(githubInstallationId), + }, + }) + + // Setup adapters for event bus + const eventBusAdapter = new EventBusAdapter(REDIS_URL) + const auth = await octokit.auth({ type: "installation" }) + if ( + !auth || + typeof auth !== "object" || + !("token" in auth) || + typeof auth.token !== "string" + ) { + throw new Error("Failed to get installation token") + } + setAccessToken(auth.token) + + await publishJobStatus(jobId, "Running dependent PR workflow") + + // Setup adapters for git checkout + const gitCheckoutAdapter = makeContainerCheckoutCommitAdapter() + + const result = await createDependentPRWorkflow( + { + repoFullName, + pullNumber, + login: githubLogin, + jobId, + }, + { + settings: settingsAdapter, + eventBus: eventBusAdapter, + gitCheckout: gitCheckoutAdapter, + } + ) + + await publishJobStatus( + jobId, + `Completed: created dependent branch ${result.branch}` + ) + return result.branch +} diff --git a/apps/workers/workflow-workers/src/redis.ts b/apps/workers/workflow-workers/src/redis.ts new file mode 100644 index 000000000..c1a2c3485 --- /dev/null +++ b/apps/workers/workflow-workers/src/redis.ts @@ -0,0 +1,8 @@ +import { getRedisConnection } from "shared/adapters/ioredis/client" + +import { getEnvVar } from "./helper" + +const { REDIS_URL } = getEnvVar() + +export const workerConn = getRedisConnection(REDIS_URL, "bullmq:worker") +export const eventsConn = getRedisConnection(REDIS_URL, "bullmq:events") diff --git a/lib/fetch/github/issues.ts b/lib/fetch/github/issues.ts index ba7427490..2bf2fccc7 100644 --- a/lib/fetch/github/issues.ts +++ b/lib/fetch/github/issues.ts @@ -20,7 +20,7 @@ export const getIssue = async ( const sessionProvider = makeSessionProvider(() => auth()) const accessTokenProvider = makeAccessTokenProviderFrom( sessionProvider, - (s) => s?.token?.access_token as unknown as string | null | undefined + (s) => s?.token?.access_token ?? null ) // Ensure auth present; if not, treat as forbidden diff --git a/lib/github/issues.ts b/lib/github/issues.ts index ac1498871..2d250a828 100644 --- a/lib/github/issues.ts +++ b/lib/github/issues.ts @@ -133,7 +133,7 @@ async function getUserAccessToken(): Promise { const sessionProvider = makeSessionProvider(() => auth()) const accessTokenProvider = makeAccessTokenProviderFrom( sessionProvider, - (s) => s?.token?.access_token as unknown as string | null | undefined + (s) => s?.token?.access_token ?? null ) try { return await accessTokenProvider() diff --git a/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler.ts b/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler.ts index b2541fb9b..d677c8e06 100644 --- a/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler.ts +++ b/lib/webhook/github/handlers/pullRequest/label.createDependentPR.handler.ts @@ -1,9 +1,11 @@ import type { PullRequestPayload } from "@/lib/webhook/github/types" +import { QueueEnum, WORKFLOW_JOBS_QUEUE } from "@/shared/entities/Queue" +import { addJob } from "@/shared/services/job" /** * Handler: PR labeled with "I2PR: Update PR" - * - For now, this is a no-op boundary that validates payload and logs receipt - * - Actual job enqueueing will be added in a follow-up PR once worker schema supports it + * - Enqueues the createDependentPR job onto the workflow-jobs queue + * - Includes installation id and labeler login in job data */ export async function handlePullRequestLabelCreateDependentPR({ payload, @@ -12,6 +14,11 @@ export async function handlePullRequestLabelCreateDependentPR({ payload: PullRequestPayload installationId: string }) { + const redisUrl = process.env.REDIS_URL + if (!redisUrl) { + throw new Error("REDIS_URL is not set") + } + const owner = payload.repository?.owner?.login const repo = payload.repository?.name const pullNumber = payload.number || payload.pull_request?.number @@ -19,13 +26,28 @@ export async function handlePullRequestLabelCreateDependentPR({ if (!owner || !repo || typeof pullNumber !== "number" || !githubLogin) { throw new Error( - "Missing required fields for createDependentPR (owner, repo, pullNumber, sender.login)" + "Missing required fields for createDependentPR job (owner, repo, pullNumber, sender.login)" ) } const repoFullName = `${owner}/${repo}` + const queue: QueueEnum = WORKFLOW_JOBS_QUEUE + + await addJob( + queue, + { + name: "createDependentPR", + data: { + repoFullName, + pullNumber, + githubLogin, + githubInstallationId: installationId, + }, + }, + {}, + redisUrl + ) - // No-op: Log receipt and defer enqueueing to a later PR when worker is ready console.log( `[Webhook] Received PR label 'I2PR: Update PR' for ${repoFullName}#${pullNumber} by ${githubLogin}. Enqueue skipped (noop). installationId=${installationId}` ) @@ -38,4 +60,3 @@ export async function handlePullRequestLabelCreateDependentPR({ installationId, } } - diff --git a/shared/src/adapters/git/checkoutCommit.ts b/shared/src/adapters/git/checkoutCommit.ts new file mode 100644 index 000000000..825b4adae --- /dev/null +++ b/shared/src/adapters/git/checkoutCommit.ts @@ -0,0 +1,84 @@ +// shared/src/adapters/git/containerCheckoutCommit.ts +import { err, ok, type Result } from "@/entities/result" +import { execInContainerWithDockerode } from "@/lib/docker" +import type { + CheckoutCommitErrors, + CheckoutCommitInput, + CheckoutCommitPort, +} from "@/ports/git/checkoutCommit" + +export function makeContainerCheckoutCommitAdapter( + workdir: string | undefined = "/workspace" +): CheckoutCommitPort { + const exec = async (container: string, command: string | string[]) => + await execInContainerWithDockerode({ + name: container, + command, + cwd: workdir, + }) + + async function checkoutCommit( + container: string, + input: CheckoutCommitInput + ): Promise> { + const { sha, branch } = input + + try { + // 1) Make sure we have the commit locally + const fetchRes = await exec(container, [ + "git", + "fetch", + "origin", + "--prune", + ]) + if (fetchRes.exitCode !== 0) { + return err("GitCommandFailed", { + step: "fetch", + stderr: fetchRes.stderr, + } as never) + } + + // 2) Verify the commit exists + const revRes = await exec(container, [ + "git", + "cat-file", + "-e", + `${sha}^{commit}`, + ]) + if (revRes.exitCode !== 0) { + return err("CommitNotFound") + } + + // 3) Checkout the commit in detached mode + const detachRes = await exec(container, [ + "git", + "checkout", + "--detach", + sha, + ]) + if (detachRes.exitCode !== 0) { + return err("GitCommandFailed", { + step: "checkout-detach", + stderr: detachRes.stderr, + } as never) + } + + // 4) Create or reset the working branch at that commit (idempotent) + const branchRes = await exec(container, ["git", "checkout", "-B", branch]) + if (branchRes.exitCode !== 0) { + return err("GitCommandFailed", { + step: "checkout-branch", + stderr: branchRes.stderr, + } as never) + } + + return ok(undefined) + } catch (e: unknown) { + return err("Unknown", { + message: e instanceof Error ? e.message : String(e), + } as never) + } + } + + return { checkoutCommit } +} diff --git a/shared/src/adapters/neo4j/dataSource.ts b/shared/src/adapters/neo4j/dataSource.ts index ab3833b41..db0ec32e7 100644 --- a/shared/src/adapters/neo4j/dataSource.ts +++ b/shared/src/adapters/neo4j/dataSource.ts @@ -7,6 +7,11 @@ export type Neo4jConfig = { maxConnectionLifetimeMs?: number } +export type Neo4jDataSource = { + getDriver: () => Driver + getSession: (mode: "READ" | "WRITE") => Session +} + // Factory returns a *singleton* driver accessor closed over the config export function createNeo4jDataSource(cfg: Neo4jConfig) { let driver: Driver | undefined diff --git a/shared/src/entities/events/Job.ts b/shared/src/entities/events/Job.ts index 26be89d88..88a1a0d98 100644 --- a/shared/src/entities/events/Job.ts +++ b/shared/src/entities/events/Job.ts @@ -34,10 +34,26 @@ export const AutoResolveIssueJobSchema = z.object({ export type AutoResolveIssueJob = z.infer +export const CreateDependentPRJobSchema = z.object({ + name: z.literal("createDependentPR"), + data: z.object({ + repoFullName: z.string(), + pullNumber: z.number().int().positive(), + githubLogin: z.string(), + githubInstallationId: z.string(), + }), +}) + +export type CreateDependentPRJob = z.infer< + typeof CreateDependentPRJobSchema +> + export const JobEventSchema = z.discriminatedUnion("name", [ SummarizeIssueJobSchema, SimulateLongRunningWorkflowJobSchema, AutoResolveIssueJobSchema, + CreateDependentPRJobSchema, ]) export type JobEvent = z.infer + diff --git a/shared/src/lib/agents/DependentPRAgent.ts b/shared/src/lib/agents/DependentPRAgent.ts new file mode 100644 index 000000000..10ae36304 --- /dev/null +++ b/shared/src/lib/agents/DependentPRAgent.ts @@ -0,0 +1,104 @@ +import { ResponsesAPIAgent } from "@/shared/lib/agents/base" +import { createBranchTool } from "@/shared/lib/tools/Branch" +import { createCommitTool } from "@/shared/lib/tools/Commit" +import { createContainerExecTool } from "@/shared/lib/tools/ContainerExecTool" +import { createCreatePRTool } from "@/shared/lib/tools/CreatePRTool" +import { createFileCheckTool } from "@/shared/lib/tools/FileCheckTool" +import { createGetFileContentTool } from "@/shared/lib/tools/GetFileContent" +import { createRipgrepSearchTool } from "@/shared/lib/tools/RipgrepSearchTool" +import { createSetupRepoTool } from "@/shared/lib/tools/SetupRepoTool" +import { createSyncBranchTool } from "@/shared/lib/tools/SyncBranchTool" +import { createWriteFileContentTool } from "@/shared/lib/tools/WriteFileContent" +import { AgentConstructorParams, RepoEnvironment } from "@/shared/lib/types" +import { GitHubRepository, repoFullNameSchema } from "@/shared/lib/types/github" + +const DEVELOPER_PROMPT = ` +You are a senior software engineer focused on follow-up changes for an existing pull request. + +Objective +- Read reviewer comments, reviews, and code-review threads for a given PR. +- Make small, targeted changes that address the feedback without altering the original intent. +- Use the provided tools to search, read, edit, and verify code. Keep commits minimal and meaningful. +- When finished, push your dependent branch and create a new PR targeting the repository's default branch (for example, main) using the provided tool. + +Operating principles +1) Understand the PR and feedback: skim the diff and read comments/reviews to determine concrete follow-ups. +2) Inspect before editing: search and read files first. Never modify files you haven't inspected. +3) Keep changes scoped: only address the feedback. Avoid refactors unless necessary. +4) Verify: run repository checks (type-checks, lint). Fix issues until clean. +5) Communicate: in the PR body, summarize what feedback you addressed and any notable decisions. + +Required end state +- All changes committed on the dependent branch. +- Branch synchronized to remote. +- A PR has been created using the tool with the correct base (the repository's default branch). +` + +export interface DependentPRAgentParams extends AgentConstructorParams { + env: RepoEnvironment + defaultBranch: string + /** GitHub repository metadata */ + repository?: GitHubRepository + /** Issue number that the dependent PR should close (if any) */ + issueNumber?: number + /** GitHub token with push permissions (for SyncBranchTool) */ + sessionToken?: string + jobId?: string +} + +export class DependentPRAgent extends ResponsesAPIAgent { + constructor(params: DependentPRAgentParams) { + const { + env, + defaultBranch, + repository, + issueNumber, + sessionToken, + jobId, + ...base + } = params + + super({ model: "gpt-5", ...base }) + + if (jobId) { + this.jobId = jobId + } + + // Attach prompt with reasoning usage enabled by Responses API + this.setDeveloperPrompt(DEVELOPER_PROMPT).catch((error) => { + console.error("Error initializing DependentPRAgent system prompt:", error) + }) + + // Core workspace tools + this.addTool(createSetupRepoTool(env)) + this.addTool(createGetFileContentTool(env)) + this.addTool(createRipgrepSearchTool(env)) + this.addTool(createWriteFileContentTool(env)) + this.addTool(createBranchTool(env)) + this.addTool(createCommitTool(env, defaultBranch)) + this.addTool(createFileCheckTool(env)) + + if (env.kind === "container") { + this.addTool(createContainerExecTool(env.name)) + } + + // Remote tools + try { + if (repository) { + const repo = repoFullNameSchema.parse(repository.full_name) + if (sessionToken) { + this.addTool(createSyncBranchTool(repo, env, sessionToken)) + } + // Use the original Create PR tool so that the underlying issue is appended automatically. + this.addTool(createCreatePRTool(repository, issueNumber)) + } + } catch (err) { + console.warn( + "DependentPRAgent: Failed to attach remote tools – invalid repo info:", + err + ) + } + } +} + +export default DependentPRAgent diff --git a/shared/src/lib/docker.ts b/shared/src/lib/docker.ts index c4560a549..0d32e6615 100644 --- a/shared/src/lib/docker.ts +++ b/shared/src/lib/docker.ts @@ -128,25 +128,25 @@ export async function startContainer({ /** * Executes a command in a running container using Dockerode. - * + * * SECURITY NOTICE: * - String commands use shell interpolation (sh -c) and are vulnerable to injection * - Array commands are executed directly and are safe from injection attacks * - Always prefer array form when user input is involved - * + * * @example * // UNSAFE - vulnerable to injection if userInput contains malicious characters - * await execInContainerWithDockerode({ - * name: "container", - * command: `git checkout ${userInput}` + * await execInContainerWithDockerode({ + * name: "container", + * command: `git checkout ${userInput}` * }) - * + * * // SAFE - no shell interpolation, injection-proof - * await execInContainerWithDockerode({ - * name: "container", - * command: ["git", "checkout", userInput] + * await execInContainerWithDockerode({ + * name: "container", + * command: ["git", "checkout", userInput] * }) - * + * * @param name Container name or ID * @param command Shell command to run (string uses sh -c, array avoids shell) * @param cwd Optional working directory inside container @@ -197,7 +197,7 @@ export async function execInContainerWithDockerode({ // If command is an array, pass it directly (safer, no shell interpolation) // If command is a string, use shell to match parity with the CLI version const cmd = Array.isArray(command) ? command : ["sh", "-c", command] - + const exec = await container.exec({ Cmd: cmd, AttachStdout: true, diff --git a/shared/src/lib/github/content.ts b/shared/src/lib/github/content.ts index 90fec0c31..a902db17a 100644 --- a/shared/src/lib/github/content.ts +++ b/shared/src/lib/github/content.ts @@ -1,10 +1,9 @@ -import { withTiming } from "shared/utils/telemetry" - -import getOctokit from "@/lib/github" +import getOctokit from "@/shared/lib/github" import { AuthenticatedUserRepository, GitHubRepository, -} from "@/lib/types/github" +} from "@/shared/lib/types/github" +import { withTiming } from "@/shared/utils/telemetry" export class GitHubError extends Error { constructor( @@ -177,15 +176,12 @@ export async function checkBranchExists( } try { - await withTiming( - `GitHub REST: repos.getBranch ${repoFullName}:${branch}`, - () => - octokit.rest.repos.getBranch({ - owner, - repo, - branch, - }) - ) + octokit.rest.repos.getBranch({ + owner, + repo, + branch, + }) + return true } catch (error: unknown) { const http = error as HttpLikeError | undefined diff --git a/shared/src/lib/github/index.ts b/shared/src/lib/github/index.ts index 0df039f21..ffc4afcc9 100644 --- a/shared/src/lib/github/index.ts +++ b/shared/src/lib/github/index.ts @@ -7,8 +7,8 @@ import { Octokit } from "@octokit/rest" import * as fs from "fs/promises" import { App } from "octokit" -import { getAccessTokenOrThrow } from "@/auth" -import { ExtendedOctokit } from "@/lib/types/github" +import { getAccessTokenOrThrow } from "@/shared/auth" +import { ExtendedOctokit } from "@/shared/lib/types/github" export async function getPrivateKeyFromFile(): Promise { const privateKeyPath = process.env.GITHUB_APP_PRIVATE_KEY_PATH diff --git a/shared/src/lib/github/pullRequests.ts b/shared/src/lib/github/pullRequests.ts index 7f3a8d9f0..4b628186c 100644 --- a/shared/src/lib/github/pullRequests.ts +++ b/shared/src/lib/github/pullRequests.ts @@ -1,12 +1,11 @@ -import { logEnd, logStart, withTiming } from "shared/utils/telemetry" - -import getOctokit, { getGraphQLClient } from "@/lib/github" +import getOctokit, { getGraphQLClient } from "@/shared/lib/github" import { IssueComment, PullRequest, PullRequestList, PullRequestReview, -} from "@/lib/types/github" +} from "@/shared/lib/types/github" +import { logEnd, logStart, withTiming } from "@/shared/utils/telemetry" export async function getPullRequestOnBranch({ repoFullName, diff --git a/shared/src/lib/tools/CreatePRTool.ts b/shared/src/lib/tools/CreatePRTool.ts index 5202ae0c8..f38a042d4 100644 --- a/shared/src/lib/tools/CreatePRTool.ts +++ b/shared/src/lib/tools/CreatePRTool.ts @@ -4,9 +4,9 @@ import { addLabelsToPullRequest, createPullRequest, getPullRequestOnBranch, -} from "@/lib/github/pullRequests" -import { createTool } from "@/lib/tools/helper" -import { GitHubRepository } from "@/lib/types/github" +} from "@/shared/lib/github/pullRequests" +import { createTool } from "@/shared/lib/tools/helper" +import { GitHubRepository } from "@/shared/lib/types/github" const createPRParameters = z.object({ branch: z diff --git a/shared/src/lib/tools/SyncBranchTool.ts b/shared/src/lib/tools/SyncBranchTool.ts index 7f29e36b4..54166c3cc 100644 --- a/shared/src/lib/tools/SyncBranchTool.ts +++ b/shared/src/lib/tools/SyncBranchTool.ts @@ -1,13 +1,13 @@ import { z } from "zod" -import { execInContainerWithDockerode } from "@/lib/docker" -import { pushBranch } from "@/lib/git" -import { checkBranchExists } from "@/lib/github/content" -import { BranchCreationStatus, createBranch } from "@/lib/github/git" -import { createTool } from "@/lib/tools/helper" -import { asRepoEnvironment, RepoEnvironment, Tool } from "@/lib/types" -import { RepoFullName } from "@/lib/types/github" -import { getCloneUrlWithAccessToken } from "@/lib/utils/utils-common" +import { execInContainerWithDockerode } from "@/shared/lib/docker" +import { pushBranch } from "@/shared/lib/git" +import { checkBranchExists } from "@/shared/lib/github/content" +import { BranchCreationStatus, createBranch } from "@/shared/lib/github/git" +import { createTool } from "@/shared/lib/tools/helper" +import { asRepoEnvironment, RepoEnvironment, Tool } from "@/shared/lib/types" +import { RepoFullName } from "@/shared/lib/types/github" +import { getCloneUrlWithAccessToken } from "@/shared/lib/utils/utils-common" const syncBranchParameters = z.object({ branch: z diff --git a/shared/src/ports/git/checkoutCommit.ts b/shared/src/ports/git/checkoutCommit.ts new file mode 100644 index 000000000..4bae3ee1a --- /dev/null +++ b/shared/src/ports/git/checkoutCommit.ts @@ -0,0 +1,31 @@ +// shared/src/ports/git/checkoutCommit.ts +import type { Result } from "@/entities/result" + +export type CheckoutCommitInput = { + /** Commit SHA to base the working branch on */ + sha: string + /** Name of the working branch to (re)create pointing at sha */ + branch: string +} + +export type CheckoutCommitErrors = + | "RepoNotFound" + | "CommitNotFound" + | "GitCommandFailed" + | "Unknown" + +export interface CheckoutCommitPort { + /** + * Ensure a working branch points at the given commit. + * + * Implementations should: + * - Fetch so that `sha` exists locally + * - Verify the commit exists + * - Checkout the commit (usually detached) + * - Create/reset `branch` to point at that commit + */ + checkoutCommit( + container: string, + input: CheckoutCommitInput + ): Promise> +} diff --git a/shared/src/services/job.ts b/shared/src/services/job.ts index 611360867..e593934ab 100644 --- a/shared/src/services/job.ts +++ b/shared/src/services/job.ts @@ -1,8 +1,8 @@ import { JobsOptions } from "bullmq" -import { JobEvent } from "@/entities/events/Job" -import type { QueueEnum } from "@/entities/Queue" -import { getQueue } from "@/services/queue" +import { JobEvent } from "@/shared/entities/events/Job" +import type { QueueEnum } from "@/shared/entities/Queue" +import { getQueue } from "@/shared/services/queue" /** * Enqueue a job onto a specific queue with a specific job name. diff --git a/shared/src/services/queue.ts b/shared/src/services/queue.ts index b8c5cd96f..caf31616f 100644 --- a/shared/src/services/queue.ts +++ b/shared/src/services/queue.ts @@ -1,7 +1,7 @@ import { Queue } from "bullmq" -import { getRedisConnection } from "@/adapters/ioredis/client" -import { QueueEnum } from "@/entities/Queue" +import { getRedisConnection } from "@/shared/adapters/ioredis/client" +import { QueueEnum } from "@/shared/entities/Queue" const queuesByKey = new Map() export function getQueue(name: QueueEnum, redisUrl: string): Queue { diff --git a/shared/src/usecases/workflows/createDependentPR.ts b/shared/src/usecases/workflows/createDependentPR.ts new file mode 100644 index 000000000..02ab83a3a --- /dev/null +++ b/shared/src/usecases/workflows/createDependentPR.ts @@ -0,0 +1,346 @@ +import { v4 as uuidv4 } from "uuid" + +import DependentPRAgent from "@/shared/lib/agents/DependentPRAgent" +import { execInContainerWithDockerode } from "@/shared/lib/docker" +import { getRepoFromString } from "@/shared/lib/github/content" +import { getInstallationTokenFromRepo } from "@/shared/lib/github/installation" +import { getIssue } from "@/shared/lib/github/issues" +import { + getLinkedIssuesForPR, + getPullRequest, + getPullRequestComments, + getPullRequestDiff, + getPullRequestReviewCommentsGraphQL, + getPullRequestReviews, +} from "@/shared/lib/github/pullRequests" +import { checkRepoPermissions } from "@/shared/lib/github/users" +import { langfuse } from "@/shared/lib/langfuse" +import { + createErrorEvent, + createStatusEvent, + createWorkflowStateEvent, +} from "@/shared/lib/neo4j/services/event" +import { initializeWorkflowRun } from "@/shared/lib/neo4j/services/workflow" +import { createBranchTool } from "@/shared/lib/tools/Branch" +import { RepoEnvironment } from "@/shared/lib/types" +import { GitHubIssue, RepoPermissions } from "@/shared/lib/types/github" +import { + createContainerizedDirectoryTree, + createContainerizedWorkspace, +} from "@/shared/lib/utils/container" +import { setupLocalRepository } from "@/shared/lib/utils/utils-server" +import { EventBusPort } from "@/shared/ports/events/eventBus" +import { createWorkflowEventPublisher } from "@/shared/ports/events/publisher" +import { CheckoutCommitPort } from "@/shared/ports/git/checkoutCommit" +import { SettingsReaderPort } from "@/shared/ports/repositories/settings.reader" + +interface CreateDependentPRParams { + repoFullName: string + pullNumber: number + login: string + jobId?: string +} + +interface Ports { + settings: SettingsReaderPort + eventBus?: EventBusPort + gitCheckout: CheckoutCommitPort +} + +export async function createDependentPRWorkflow( + { repoFullName, pullNumber, login, jobId }: CreateDependentPRParams, + { settings, eventBus, gitCheckout }: Ports +) { + const workflowId = jobId || uuidv4() + let containerCleanup: (() => Promise) | null = null + + const pub = createWorkflowEventPublisher(eventBus, workflowId) + + try { + // Get API key from settings + const apiKeyResult = await settings.getOpenAIKey(login) + if (!apiKeyResult.ok || !apiKeyResult.value) { + pub.workflow.error("No API key provided and no user settings found") + throw new Error("No API key provided and no user settings found") + } + const apiKey = apiKeyResult.value + + // Initialize workflow run + await initializeWorkflowRun({ + id: workflowId, + type: "createDependentPR", + repoFullName, + postToGithub: true, + }) + + await createWorkflowStateEvent({ workflowId, state: "running" }) + + await createStatusEvent({ + workflowId, + content: `Starting dependent PR workflow for ${repoFullName}#${pullNumber}`, + }) + + // Fetch PR details and context + const pr = await getPullRequest({ repoFullName, pullNumber }) + const headRef = pr.head.ref + const headSha = pr.head.sha + const baseRef = pr.base.ref + + await createStatusEvent({ + workflowId, + content: `PR #${pullNumber}: ${baseRef} <- ${headRef}`, + }) + + // Fetch linked issue (first closing reference if any) and PR artifacts in parallel + let linkedIssue: GitHubIssue | undefined + const [linkedIssues, diff, comments, reviews, reviewThreads] = + await Promise.all([ + getLinkedIssuesForPR({ repoFullName, pullNumber }), + getPullRequestDiff({ repoFullName, pullNumber }), + getPullRequestComments({ repoFullName, pullNumber }), + getPullRequestReviews({ repoFullName, pullNumber }), + getPullRequestReviewCommentsGraphQL({ repoFullName, pullNumber }), + ]) + if (linkedIssues.length > 0) { + const res = await getIssue({ + fullName: repoFullName, + issueNumber: linkedIssues[0], + }) + if (res.type === "success") linkedIssue = res.issue + } + + // Ensure local repository exists and is up-to-date + const repo = await getRepoFromString(repoFullName) + const hostRepoPath = await setupLocalRepository({ + repoFullName, + workingBranch: repo.default_branch, + }) + + // Setup containerized workspace using the local copy + const { containerName, cleanup } = await createContainerizedWorkspace({ + repoFullName, + branch: repo.default_branch, + workflowId, + hostRepoPath, + }) + const env: RepoEnvironment = { kind: "container", name: containerName } + containerCleanup = cleanup + + // Authenticate remote for fetch/push + const [owner, repoName] = repoFullName.split("/") + const sessionToken = await getInstallationTokenFromRepo({ + owner, + repo: repoName, + }) + + // Check permissions + const permissions: RepoPermissions | null = + await checkRepoPermissions(repoFullName) + if (!permissions?.canPush || !permissions?.canCreatePR) { + await createStatusEvent({ + workflowId, + content: `Warning: Insufficient permissions to push or create PRs (${permissions?.reason || "unknown"}). Will still attempt local changes and report back.`, + }) + } + + // Ensure origin remote embeds credentials + if (sessionToken) { + const { exitCode: remoteSetExitCode } = + await execInContainerWithDockerode({ + name: containerName, + command: [ + "git", + "remote", + "set-url", + "origin", + `https://x-access-token:${sessionToken}@github.com/${repoFullName}.git`, + ], + }) + if (remoteSetExitCode !== 0) { + await createStatusEvent({ + workflowId, + content: `Failed to set origin remote: ${remoteSetExitCode}`, + }) + throw new Error(`Failed to set origin remote: ${remoteSetExitCode}`) + } + } + + // Fetch and checkout the PR head branch + const dependentBranch = `${headRef}-followup-${workflowId.slice(0, 8)}` + await createStatusEvent({ + workflowId, + content: `Checking out PR head commit ${headSha} into branch ${dependentBranch}`, + }) + + const checkoutResult = await gitCheckout.checkoutCommit(containerName, { + sha: headSha, + branch: dependentBranch, + }) + + if (!checkoutResult.ok) { + await createStatusEvent({ + workflowId, + content: `Failed to checkout PR head: ${checkoutResult.error}`, + }) + throw new Error(`Failed to checkout PR head: ${checkoutResult.error}`) + } + + // Create the branch tool + const branchTool = createBranchTool(env) + await branchTool.handler({ + branch: dependentBranch, + createIfNotExists: true, + }) + + // Create directory tree for context + let tree: string[] = [] + try { + tree = await createContainerizedDirectoryTree(containerName) + } catch (e) { + await createStatusEvent({ + workflowId, + content: `Warning: Failed to create directory tree: ${String(e)}`, + }) + tree = ["(directory tree not available)"] + } + + // Initialize the dependent PR agent (reasoning-enabled) + const agent = new DependentPRAgent({ + apiKey, + env, + defaultBranch: repo.default_branch, + repository: repo, + issueNumber: linkedIssue?.number, + sessionToken: sessionToken || undefined, + jobId: workflowId, + }) + + const trace = langfuse.trace({ + name: `Create dependent PR for #${pullNumber}`, + input: { repoFullName, pullNumber }, + }) + const span = trace.span({ name: "createDependentPR" }) + agent.addSpan({ span, generationName: "createDependentPR" }) + + // Seed agent with context and instructions + const formattedComments = comments + .map( + (c, i) => + `Comment ${i + 1} by ${c.user?.login || "unknown"} at ${ + c.created_at + ? new Date(c.created_at).toLocaleString() + : new Date().toLocaleString() + }\n${c.body}` + ) + .join("\n\n") + + const formattedReviews = reviews + .map( + (r, i) => + `Review ${i + 1} by ${r.user?.login || "unknown"} (${r.state}) at ${new Date( + r.submitted_at || new Date().toISOString() + ).toLocaleString()}\n${r.body || "No comment provided"}` + ) + .join("\n\n") + + // Include review line comments (code review threads) + const formattedReviewThreads = reviewThreads + .map((rev, i) => { + const header = `Review Thread ${i + 1} by ${rev.author || "unknown"} (${rev.state}) at ${new Date( + rev.submittedAt || new Date().toISOString() + ).toLocaleString()}\n${rev.body || "No review body"}` + const commentsBlock = (rev.comments || []) + .map((c) => { + const hunk = c.diffHunk ? `\n Hunk:\n${c.diffHunk}` : "" + return ` - [${c.file || "unknown file"}] ${c.body}${hunk}` + }) + .join("\n") + return commentsBlock ? `${header}\n${commentsBlock}` : header + }) + .join("\n\n") + + const message = ` +# Goal +Implement a follow-up patch that addresses reviewer comments and discussion on PR #${pullNumber}. Work directly on branch '${dependentBranch}' which is branched off '${headRef}'. When done, push this branch to origin using the sync tool, then create a new PR targeting the repository's default branch ('${repo.default_branch}') using the create_pull_request tool. + +# Repository +${repoFullName} + +# Linked Issue +${linkedIssue ? `#${linkedIssue.number} ${linkedIssue.title}\n${linkedIssue.body}` : "(none)"} + +# Codebase Directory +${tree.join("\n")} + +# Current PR Diff (truncated) +${diff.slice(0, 200000)} +... (truncated) + +${formattedComments ? `# Comments\n${formattedComments}\n` : ""} +${formattedReviews ? `# Reviews\n${formattedReviews}\n` : ""} +${formattedReviewThreads ? `# Review Line Comments\n${formattedReviewThreads}\n` : ""} + +# Requirements +- Make only the changes necessary to satisfy the feedback in comments and reviews. +- Keep changes small and focused. +- Use meaningful commit messages. +- Run repo checks (type-check/lint) via the provided tools before finishing. +- When finished, push branch '${dependentBranch}' to GitHub using the sync tool. +- Finally, create a PR with base '${repo.default_branch}' using the create_pull_request tool. Choose a clear title and provide a detailed description of the changes you made in response to the reviews. Do not manually append issue references in the body; they will be added automatically if applicable. +` + + await agent.addInput({ role: "user", type: "message", content: message }) + + await createStatusEvent({ + workflowId, + content: "Starting dependent PR agent", + }) + + const result = await agent.runWithFunctions() + + // Ensure branch is pushed + await createStatusEvent({ + workflowId, + content: `Ensuring branch ${dependentBranch} is pushed`, + }) + if (sessionToken) { + const { exitCode } = await execInContainerWithDockerode({ + name: containerName, + command: ["git", "push", "-u", "origin", dependentBranch], + }) + if (exitCode !== 0) { + throw new Error(`Failed to push branch ${dependentBranch}`) + } + } else { + await createStatusEvent({ + workflowId, + content: `Warning: No session token available, branch ${dependentBranch} was not pushed to remote.`, + }) + } + + await createWorkflowStateEvent({ workflowId, state: "completed" }) + return { + agentResult: result, + branch: dependentBranch, + } + } catch (error) { + await createErrorEvent({ workflowId, content: String(error) }) + await createWorkflowStateEvent({ + workflowId, + state: "error", + content: String(error), + }) + throw error + } finally { + if (containerCleanup) { + try { + await containerCleanup() + } catch (e) { + await createStatusEvent({ + workflowId, + content: `Warning: container cleanup failed: ${String(e)}`, + }) + } + } + } +} diff --git a/shared/tsconfig.json b/shared/tsconfig.json index 95e57326d..dca09e728 100644 --- a/shared/tsconfig.json +++ b/shared/tsconfig.json @@ -8,7 +8,8 @@ "baseUrl": ".", "paths": { "@/*": ["./src/*"], - "@shared/*": ["./src/*"] + "@shared/*": ["./src/*"], + "@/shared/*": ["./src/*"] }, /* Emit */ diff --git a/tsconfig.json b/tsconfig.json index ce86e042e..ab63d8453 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -21,6 +21,7 @@ "@/lib/*": ["lib/*"], "@/components/*": ["components/*"], "@/styles/*": ["styles/*"], + "@/shared/*": ["shared/src/*"], "@shared/*": ["shared/src/*"] }, @@ -41,5 +42,5 @@ ".next/types/**/*.ts", "shared/dist/**/*.d.ts" ], - "exclude": ["node_modules", "apps/**"] + "exclude": ["node_modules", "apps/**", "shared/**"] }