Skip to content

feat: v4 deadlock detection #1970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 25, 2025
Merged

feat: v4 deadlock detection #1970

merged 15 commits into from
Apr 25, 2025

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Apr 23, 2025

This PR implements robust deadlock detection when triggering and waiting for a child task chain. It's possible, when not releasing concurrency (e.g. on a queue with a concurrency limit), to get into a situation where the parent tasks are holding all the concurrency available on a queue, so the child task could never be executed, causing the parent task chain to become stuck. We now detect this situation by propagating "Run chain state" through the task hierarchy that accounts for the amount of concurrency behind held in queues along the direct ancestor chain, allowing us to detect issues.

Some required work needed for this PR to work are also useful additions:

  • When triggering a task that is locked to a version (e.g. when calling triggerAndWait), we will now resolve lockedQueueId and the task to the version that's passed in, ensuring they exist. An error will be thrown if they do not exist. Previously this was allowed and runs would end up in PENDING_VERSION and never get executed.
  • API 4xx errors will no longer cause tasks to get retried.
  • Massive cleanup of the RunEngineTriggerTaskService, including new tests in the webapp for it

CleanShot 2025-04-23 at 17 07 57

TODO:

  • Test batchTriggerAndWait

Summary by CodeRabbit

  • New Features

    • Introduced advanced run chain state management with concurrency release control and deadlock detection featuring detailed error messages and documentation links.
    • Added support for idempotency keys with caching and traceable payload processing.
    • Modularized queue management with validation of queue size limits and master queue retrieval.
    • Enhanced tracing capabilities for task runs including idempotent runs with detailed span attributes.
    • Added comprehensive validation for tags, entitlement, max attempts, and parent run status.
    • Extended database schema and types to support run chain state and queue locking.
    • Provided detailed types and interfaces for trigger task domain services and validations.
  • Bug Fixes

    • Improved retry logic to skip retries on unretryable client API errors and correctly handle rate limit delays.
    • Enhanced error handling with specific concurrency deadlock error enhancements and one-time-use token error support.
  • Documentation

    • Added example tasks and queues illustrating deadlock scenarios and invalid queue/task resolution.
  • Tests

    • Added extensive integration tests covering run engine task triggering, idempotency, queue locking, run chains, concurrency control, and error scenarios.
  • Refactor

    • Refactored core task triggering service for explicit dependency injection, modular concerns, and improved traceability.
  • Chores

    • Updated debug configurations and package exports to improve developer experience and testing utilities.

Copy link

changeset-bot bot commented Apr 23, 2025

🦋 Changeset detected

Latest commit: 21d9eee

The changes in this PR will be included in the next version bump.

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Apr 23, 2025

Walkthrough

This update introduces a comprehensive modular refactor and feature expansion of the task run engine, focusing on queue management, idempotency, concurrency control, payload processing, and tracing. It adds new classes and interfaces for each concern, updates the data model and schema to support run chain state and locked queue identifiers, and enhances error handling for deadlock and concurrency scenarios. The trigger task service is refactored to inject these concerns, and extensive integration tests are added. Supporting changes include new error types, improved API error handling, schema and migration updates, and demonstration tasks for deadlock and queue locking scenarios.

Changes

File(s) / Path(s) Change Summary
apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
apps/webapp/app/runEngine/concerns/payloads.server.ts
apps/webapp/app/runEngine/concerns/queues.server.ts
apps/webapp/app/runEngine/concerns/runChainStates.server.ts
apps/webapp/app/runEngine/concerns/runNumbers.server.ts
apps/webapp/app/runEngine/concerns/traceEvents.server.ts
apps/webapp/app/runEngine/validators/triggerTaskValidator.ts
Introduced new modular concern classes for idempotency key handling, payload processing, queue management, run chain state management, run number incrementing, trace event handling, and trigger task validation. Each class implements a corresponding interface and encapsulates logic for its domain (e.g., queue resolution, concurrency validation, payload offloading, tracing, etc.).
apps/webapp/app/runEngine/services/triggerTask.server.ts Refactored RunEngineTriggerTaskService to remove inheritance and inject new concerns for queue, validation, payload, idempotency, run number, tracing, and run chain state. The call method is rewritten to use these services, improving modularity, error handling, and validation. Old helper functions removed.
apps/webapp/app/runEngine/types.ts Added comprehensive type definitions and interfaces for trigger task requests, results, queue management, validation, tracing, and run chain state management.
apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts Added handling for EngineServiceValidationError in the trigger API, returning status from the error or defaulting to 422. Updated existing error handling to use error status if available.
apps/webapp/app/runEngine/concerns/errors.ts Added new error class EngineServiceValidationError for use in engine service validation failures.
apps/webapp/app/components/runs/v3/SpanEvents.tsx
apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
Updated error message rendering to use a <pre> block with monospace font and color styling for improved readability and whitespace preservation.
apps/webapp/app/v3/models/workerDeployment.server.ts Added optional prismaClient parameter to worker deployment query functions to support dependency injection and transactional context.
apps/webapp/app/runEngine/services/batchTrigger.server.ts Moved import statement for startActiveSpan for code organization; no logic changes.
apps/webapp/app/v3/services/triggerTask.server.ts Enhanced TriggerTaskService.callV2 to inject all new concerns and utilities into RunEngineTriggerTaskService, enabling advanced run engine features.
apps/webapp/test/engine/triggerTask.test.ts Added comprehensive integration tests for the new trigger task service, covering basic triggering, idempotency, queue locking, run chains, concurrency, deadlock detection, and queue edge cases.
internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql
internal-packages/database/prisma/schema.prisma
Added runChainState JSONB field to the TaskRun model and database schema for storing run chain/concurrency state.
internal-packages/run-engine/src/engine/db/worker.ts Modified logic to prioritize queue lookup by lockedQueueId if present, falling back to name-based lookup.
internal-packages/run-engine/src/engine/errors.ts
internal-packages/run-engine/src/index.ts
Added and exported new error class RunOneTimeUseTokenError for handling one-time-use token errors.
internal-packages/run-engine/src/engine/index.ts
internal-packages/run-engine/src/engine/types.ts
Extended RunEngine.trigger and related types to accept and propagate lockedQueueId and runChainState parameters, storing them in the run data and tracing context.
internal-packages/run-engine/src/engine/tests/setup.ts Updated test environment setup to create worker instance groups and associate tasks with queues explicitly.
internal-packages/run-engine/package.json Added new subpath export "./tests" to allow importing test setup utilities from the package.
internal-packages/testcontainers/package.json Removed trailing newline; no functional changes.
internal-packages/testcontainers/src/index.ts Refactored to extract and log database connection URI before initializing PrismaClient.
packages/core/src/v3/apiClient/errors.ts Improved error message construction logic in ApiError.makeMessage, adding explicit handling for string errors and refining fallback order.
packages/core/src/v3/errors.ts Added specific enhancement for concurrency deadlock errors in taskRunErrorEnhancer, returning user-friendly info and docs link.
packages/core/src/v3/links.ts Added deadlock documentation link under docs.concurrency.
packages/core/src/v3/schemas/schemas.ts Added RunChainState Zod schema and type for concurrency/chain state tracking.
packages/core/src/v3/workers/taskExecutor.ts Updated error handling to skip retries for unretryable API errors (400–499 except 408/429) and improved rate limit retry delay logic.
packages/core/test/taskExecutor.test.ts Added tests for API error retry behavior, ensuring unretryable errors are skipped and rate limit delays are respected.
references/hello-world/src/trigger/deadlocks.ts Added demonstration tasks and queue that intentionally create a concurrency deadlock scenario.
references/hello-world/src/trigger/queues.ts Added tasks demonstrating locked queue and identifier scenarios, including triggering with non-existent queues/tasks.
.vscode/launch.json Updated Node.js debug launch config to target triggerTask.test.ts instead of fairDequeuingStrategy.test.ts.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant TriggerAPI
    participant RunEngineTriggerTaskService
    participant QueueManager
    participant PayloadProcessor
    participant IdempotencyKeyConcern
    participant RunNumberIncrementer
    participant TraceEventConcern
    participant RunChainStateManager
    participant RunEngine
    participant DB

    Client->>TriggerAPI: POST /api.v1/tasks/:taskId/trigger
    TriggerAPI->>RunEngineTriggerTaskService: call(request)
    RunEngineTriggerTaskService->>TriggerTaskValidator: validate (tags, entitlement, attempts, parent run)
    RunEngineTriggerTaskService->>IdempotencyKeyConcern: handleTriggerRequest(request)
    alt Cached run exists
        IdempotencyKeyConcern-->>RunEngineTriggerTaskService: {cached: true, run}
        RunEngineTriggerTaskService->>TraceEventConcern: traceIdempotentRun(...)
        TraceEventConcern-->>RunEngineTriggerTaskService: span
        RunEngineTriggerTaskService-->>TriggerAPI: Return cached run
    else No cached run
        IdempotencyKeyConcern-->>RunEngineTriggerTaskService: {cached: false, idempotencyKey, expiration}
        RunEngineTriggerTaskService->>QueueManager: validateQueueLimits(env)
        RunEngineTriggerTaskService->>PayloadProcessor: process(request)
        RunEngineTriggerTaskService->>QueueManager: resolveQueueProperties(request)
        RunEngineTriggerTaskService->>RunChainStateManager: validateRunChain(request, context)
        RunEngineTriggerTaskService->>TraceEventConcern: traceRun(request, callback)
        TraceEventConcern->>RunNumberIncrementer: incrementRunNumber(request, callback)
        RunNumberIncrementer->>RunEngine: trigger(request, queue info, runChainState)
        RunEngine->>DB: Create TaskRun (w/ lockedQueueId, runChainState)
        RunEngine-->>RunNumberIncrementer: run
        RunNumberIncrementer-->>TraceEventConcern: run
        TraceEventConcern-->>RunEngineTriggerTaskService: run
        RunEngineTriggerTaskService-->>TriggerAPI: Return new run
    end
Loading

Possibly related PRs

  • triggerdotdev/trigger.dev#1852: Minor query enhancement in RunEngineTriggerTaskService.call related to database select clause; related at code level but less extensive than this modular refactor.
  • triggerdotdev/trigger.dev#1935: Refactors and improves parent run validation in trigger task service, closely related to concurrency and run chain handling in this PR.

Suggested reviewers

  • matt-aitken

Poem

In the engine's warren, code hops anew,
With queues and chains and payloads too.
Deadlocks detected, concurrency tamed,
Idempotent rabbits—no run is the same!
Tracing each hop, with errors well caught,
This modular meadow is more robust than we thought.
🐇✨

—A celebratory hop from your CodeRabbit!

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

🔭 Outside diff range comments (2)
apps/webapp/app/v3/models/workerDeployment.server.ts (2)

83-90: ⚠️ Potential issue

$prisma is ignored for the fallback query – breaks transactional safety

You correctly derive const $prisma = prismaClient ?? prisma; but the second query (lines 127‑154) still calls the global prisma.workerDeployment.findFirst.
Inside a transaction this will silently jump out of the txn context, defeating the whole point of the new prismaClient parameter and can even lead to inconsistent reads.

-  const latestDeployment = await prisma.workerDeployment.findFirst({
+  const latestDeployment = await $prisma.workerDeployment.findFirst({

Please sweep the rest of the file (and callers) for similar slips.

Also applies to: 127-154


197-215: 🛠️ Refactor suggestion

Possible type mismatch – environment.projectId may not exist

WorkerGroupService#getDefaultWorkerGroupForProject expects { projectId } but AuthenticatedEnvironment (see relevant snippets) only guarantees a nested project object.
Unless a custom type augmentation adds projectId, this will be a TypeScript error once strict mode is enabled and could be a runtime undefined at line 207.

Guard or derive the id explicitly:

const projectId = environment.project?.id ?? environment.projectId;
if (!projectId) { /* throw */ }
🧹 Nitpick comments (23)
apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1)

4-14: Expose the dependency on autoIncrementCounter for testability

DefaultRunNumberIncrementer directly imports the singleton autoIncrementCounter, which makes unit‑testing harder (no easy way to stub). Passing the counter in through the constructor keeps the class pure and easier to mock.

-export class DefaultRunNumberIncrementer implements RunNumberIncrementer {
-  async incrementRunNumber<T>(
+export class DefaultRunNumberIncrementer implements RunNumberIncrementer {
+  constructor(
+    private readonly counter = autoIncrementCounter, // default to singleton
+  ) {}
+
+  async incrementRunNumber<T>(
     request: TriggerTaskRequest,
     callback: (num: number) => Promise<T>
   ): Promise<T | undefined> {
-    return await autoIncrementCounter.incrementInTransaction(
+    return await this.counter.incrementInTransaction(
       `v3-run:${request.environment.id}:${request.taskId}`,
       callback
     );
   }
 }
apps/webapp/app/v3/services/triggerTask.server.ts (2)

105-107: Environment flag parsed inline – extract a typed helper

env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "1" is repeated in several places across the codebase. Wrapping this in a helper (e.g., isConcurrencyReleaseEnabled()) prevents string‑comparison typos and centralises documentation of the flag.


3-7: Import list becoming long – consider barrel files for concerns

With eight concerns injected, the import section is getting unwieldy. A simple ~/runEngine/concerns/index.ts barrel export would improve readability:

import {
  DefaultQueueManager,
  DefaultPayloadProcessor,
  ...
} from "~/runEngine/concerns";
references/hello-world/src/trigger/deadlocks.ts (1)

11-16: Unused context parameter

{ ctx } is destructured in both tasks but never used. Removing it avoids the unused‑var lint error and clarifies the example.

-run: async (payload: any, { ctx }) => {
+run: async (payload: any) => {
packages/core/test/taskExecutor.test.ts (2)

10-11: Unused imports can be safely removed

TaskRunExecutionResult and TaskRunExecutionRetry are imported but are not referenced anywhere in this test file. Removing them keeps the import list tidy and avoids misleading readers into thinking the types are used.

-import {
-  TaskRunExecutionResult,
-  TaskRunExecutionRetry,
-} from "../src/v3/index.js";
+// (No longer needed – remove)

1773-1813: Flaky upper bound on 429‑retry delay

delay is expected to be < 32 000 ms (2 s of jitter) but the real implementation might legitimately apply more jitter or simply round up to the nearest second. This could cause sporadic CI failures.

Consider asserting only the lower bound (must be greater than ≈ 29 s) or widening the upper bound:

-    expect(delay).toBeLessThan(32000); // Account for max 2000ms jitter
+    expect(delay).toBeLessThan(40000); // Allow wider jitter tolerance

or

expect(delay).toBeGreaterThan(29500);

This keeps the test focused on “did we honour the rate‑limit reset?” rather than a tight jitter window.

apps/webapp/app/runEngine/concerns/payloads.server.ts (1)

52-61: Broaden #createPayloadPacket support beyond string & JSON

Binary or stream payloads (e.g. Uint8Array, ReadableStream) are silently treated as “no packet data”, meaning they will never be off‑loaded or uploaded. Consider:

if (payload instanceof Uint8Array) {
  return { data: payload, dataType: payloadType };
}

if (payload instanceof ReadableStream) {
  return { data: payload, dataType: payloadType };
}

to make the processor future‑proof.

apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (2)

24-36: Tag validation ignores comma‑separated string form

When tags is a single string (e.g. “foo,bar”) the max‑tags check is skipped.
Either document that only an array is supported or split the string:

const tagList = Array.isArray(tags) ? tags : tags.split(",");
if (tagList.length > MAX_TAGS_PER_RUN) {  }

87-99: Parent‑run validation may double‑log on concurrent calls

If multiple children attempt to resume the same completed parent, each will log the “terminal state” message. Consider throttling or downgrading to trace to avoid noisy logs in large fan‑out scenarios.

apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

44-51: Avoid any‐casts when forwarding attributes to the underlying event builder

The helper passed to callers casts the attribute key to any before forwarding to event.setAttribute.
This defeats the compile‑time guarantees the SemanticInternalAttributes enum is giving you and silently allows misspelled / unknown attributes to creep in.

- setAttribute: (key, value) => event.setAttribute(key as any, value),
+ setAttribute: <K extends keyof typeof SemanticInternalAttributes>(
+   key: K,
+   value: string
+ ) => event.setAttribute(
+   SemanticInternalAttributes[key],
+   value
+ ),

By typing the function generically you keep strict type‑safety and the IDE will auto‑complete valid keys.

apps/webapp/app/runEngine/types.ts (2)

10-11: Remove unused import to keep the build clean

z is imported but never referenced in this file.

-import { z } from "zod";

Leaving dead imports can produce linter noise and slower type‑checking.


112-118: Make validateTags & friends consistently async

All validation methods except validateTags, validateMaxAttempts, and validateParentRun are asynchronous.
For consistency and future‑proofing (e.g., remote feature flags), you may want to standardise on Promise<ValidationResult> for the entire interface.

No code diff provided to avoid breaking callers, but worth considering.

apps/webapp/app/v3/models/workerDeployment.server.ts (1)

162-185: Consider allowing dependency‑injection here too

getCurrentWorkerDeploymentEngineVersion still hard‑codes the global prisma instance.
If callers start executing it inside a transaction we’ll hit the same consistency issue as above. Exporting an overload or optional prismaClient parameter keeps the API symmetrical.

apps/webapp/app/runEngine/concerns/queues.server.ts (3)

37-45: Inefficient lookup pattern for locked queues

taskQueue.findFirst is executed with no projection (select) which pulls every column even though you only need id and name. Reducing the payload cuts memory and lowers wire latency.

- const specifiedQueue = await this.prisma.taskQueue.findFirst({
-   where: { ... }
- });
+ const specifiedQueue = await this.prisma.taskQueue.findFirst({
+   where: { ... },
+   select: { id: true, name: true },
+ });

75-92: Duplicate sanitisation & missed empty‑string edge case

queueName is sanitised twice (sanitizeQueueName(queueName) into sanitizedQueueName and potentially once more in the fallback).
Also, if sanitizeQueueName returns "" for the fallback string, we end up with an empty queue name again.

A simpler, single‑pass approach:

- const sanitized = sanitizeQueueName(queueName || `task/${request.taskId}`);
- if (!sanitized) {
-   throw new EngineServiceValidationError("Queue name cannot be empty");
- }
- queueName = sanitized;
+ queueName = sanitizeQueueName(queueName || `task/${request.taskId}`);
+ if (!queueName) {
+   throw new EngineServiceValidationError("Queue name cannot be empty after sanitisation");
+ }

201-215: Instantiate WorkerGroupService on each call – cache or inject

getMasterQueue creates a new WorkerGroupService every invocation. In hot paths this may be hundreds of objects per second. Either:

  • Accept a pre‑built service via the constructor, or
  • Promote a singleton via singleton() helper like elsewhere in the codebase.
apps/webapp/test/engine/triggerTask.test.ts (4)

548-556: Stray console.log pollutes test output

console.log(result); will spam CI logs and can hide useful Vitest output. Remove or gate behind a DEBUG flag.


632-659: Engine configuration duplicated 4× – extract a helper

The boilerplate new RunEngine({ prisma, worker: { … block is repeated in every test suite.
A factory like createTestEngine(redisOptions, prisma) would reduce >150 LOC and guarantee consistent config across tests.

Also applies to: 832-860, 963-990, 1092-1120


740-776: Potential brittle assertions on run.runChainState

The deep‑equality checks hard‑code queue ordering (parent queue first, child second). If implementation switches to Set/Map order or sorts differently these tests will fail although behaviour is identical.

Consider validating with expect.arrayContaining([...]) or custom matcher that ignores ordering and asserts presence & counts instead.


430-447: Error message assertion tightly couples to wording

The rejection expectation matches the exact string emitted by EngineServiceValidationError. Minor copy edits will break the test suite.

Prefer a regex or at least toContain("Specified queue").

apps/webapp/app/runEngine/concerns/runChainStates.server.ts (3)

165-172: Prefer findUnique to findFirst when querying by primary key

All three queries use an id equality filter, which is Prisma’s primary key.
findUnique is semantically clearer, marginally faster (skips LIMIT 1 scan logic), and prevents accidental “first match” behaviour if future engineers extend the where clause.

-const queue = await this.prisma.taskQueue.findFirst({
+const queue = await this.prisma.taskQueue.findUnique({
   where: { id: lockedQueueId },
   select: { concurrencyLimit: true },
 });

Apply the same change to the queries at lines 200‑208 and 237‑241.

Also applies to: 200-208, 237-241


25-28: Return-by-reference risks accidental mutation of the parent chain state

parentRunChainState (and {} in the earlier branch) is returned directly.
Because the object is later stored on the child run, any in‑process mutation by callers would also mutate the parent’s cached state, producing subtle race conditions.

A cheap defensive copy avoids the issue:

-    if (!parentRun) {
-      return {};
-    }
+    if (!parentRun) {
+      return {};
+    }

 ...

-      return parentRunChainState;
+      return { ...parentRunChainState };

(Use a deep clone if nested objects may subsequently be mutated.)

Also applies to: 38-48


140-142: Verbose debug log can bloat application logs

runChainState may become large as chains grow. Consider either:

  1. Logging only a serialised summary (e.g. JSON.stringify(runChainState).slice(0, 1_000)), or
  2. Masking fields you don’t need at debug time.

This keeps CloudWatch/Datadog bills and log‑processing latency under control.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 853a7ef and c2595f4.

📒 Files selected for processing (37)
  • .vscode/launch.json (1 hunks)
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1 hunks)
  • apps/webapp/app/runEngine/concerns/errors.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/payloads.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/queues.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/triggerTask.server.ts (3 hunks)
  • apps/webapp/app/runEngine/types.ts (1 hunks)
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (5 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • apps/webapp/test/engine/triggerTask.test.ts (1 hunks)
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/db/worker.ts (1 hunks)
  • internal-packages/run-engine/src/engine/errors.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/setup.ts (4 hunks)
  • internal-packages/run-engine/src/engine/types.ts (3 hunks)
  • internal-packages/run-engine/src/index.ts (1 hunks)
  • internal-packages/testcontainers/package.json (1 hunks)
  • internal-packages/testcontainers/src/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/errors.ts (1 hunks)
  • packages/core/src/v3/errors.ts (1 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/schemas.ts (1 hunks)
  • packages/core/src/v3/workers/taskExecutor.ts (2 hunks)
  • packages/core/test/taskExecutor.test.ts (2 hunks)
  • references/hello-world/src/trigger/deadlocks.ts (1 hunks)
  • references/hello-world/src/trigger/queues.ts (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (9)
internal-packages/run-engine/src/engine/types.ts (1)
packages/core/src/v3/schemas/schemas.ts (2)
  • RunChainState (301-308)
  • RunChainState (310-310)
packages/core/src/v3/errors.ts (1)
packages/core/src/v3/links.ts (1)
  • links (1-28)
apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (5)
apps/webapp/app/runEngine/types.ts (6)
  • TriggerTaskValidator (112-117)
  • TagValidationParams (84-86)
  • ValidationResult (103-110)
  • EntitlementValidationParams (88-90)
  • MaxAttemptsValidationParams (92-95)
  • ParentRunValidationParams (97-101)
apps/webapp/app/models/taskRunTag.server.ts (1)
  • MAX_TAGS_PER_RUN (6-6)
apps/webapp/app/runEngine/concerns/errors.ts (1)
  • EngineServiceValidationError (1-6)
apps/webapp/app/services/platform.v3.server.ts (1)
  • getEntitlement (328-346)
apps/webapp/app/v3/services/triggerTask.server.ts (2)
  • OutOfEntitlementError (34-38)
  • MAX_ATTEMPTS (45-45)
apps/webapp/app/runEngine/concerns/payloads.server.ts (3)
apps/webapp/app/runEngine/types.ts (2)
  • PayloadProcessor (73-75)
  • TriggerTaskRequest (28-34)
apps/webapp/app/v3/r2.server.ts (1)
  • uploadPacketToObjectStore (22-62)
apps/webapp/app/runEngine/concerns/errors.ts (1)
  • EngineServiceValidationError (1-6)
packages/core/test/taskExecutor.test.ts (2)
packages/core/src/v3/lifecycle-hooks-api.ts (1)
  • lifecycleHooks (5-5)
packages/core/src/v3/workers/taskExecutor.ts (1)
  • result (1139-1186)
apps/webapp/app/v3/models/workerDeployment.server.ts (3)
packages/core/src/v3/isomorphic/consts.ts (2)
  • CURRENT_DEPLOYMENT_LABEL (1-1)
  • CURRENT_UNMANAGED_DEPLOYMENT_LABEL (2-2)
internal-packages/run-engine/src/engine/tests/setup.ts (1)
  • AuthenticatedEnvironment (16-18)
apps/webapp/app/services/apiAuth.server.ts (1)
  • AuthenticatedEnvironment (30-33)
apps/webapp/app/runEngine/concerns/queues.server.ts (8)
apps/webapp/app/runEngine/types.ts (5)
  • QueueManager (63-71)
  • TriggerTaskRequest (28-34)
  • LockedBackgroundWorker (57-60)
  • QueueProperties (52-55)
  • QueueValidationResult (42-50)
apps/webapp/app/v3/runEngine.server.ts (1)
  • engine (9-9)
apps/webapp/app/runEngine/concerns/errors.ts (1)
  • EngineServiceValidationError (1-6)
apps/webapp/app/v3/services/triggerTaskV1.server.ts (1)
  • taskId (671-718)
apps/webapp/app/v3/models/workerDeployment.server.ts (1)
  • findCurrentWorkerFromEnvironment (197-223)
internal-packages/run-engine/src/engine/tests/setup.ts (1)
  • AuthenticatedEnvironment (16-18)
apps/webapp/app/v3/queueSizeLimits.server.ts (1)
  • guardQueueSizeLimitsForEnv (11-34)
apps/webapp/app/v3/services/worker/workerGroupService.server.ts (1)
  • WorkerGroupService (7-269)
apps/webapp/app/runEngine/concerns/runNumbers.server.ts (2)
apps/webapp/app/runEngine/types.ts (2)
  • RunNumberIncrementer (77-82)
  • TriggerTaskRequest (28-34)
apps/webapp/app/services/autoIncrementCounter.server.ts (1)
  • autoIncrementCounter (63-63)
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (3)
apps/webapp/app/runEngine/types.ts (3)
  • TraceEventConcern (131-146)
  • TriggerTaskRequest (28-34)
  • TracedEventSpan (119-129)
apps/webapp/app/v3/eventRepository.server.ts (2)
  • EventRepository (189-1262)
  • eventRepository (1264-1264)
packages/core/src/v3/isomorphic/friendlyId.ts (1)
  • BatchId (96-96)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (43)
internal-packages/testcontainers/package.json (1)

23-23: Trivial change, no functional impact.

The only change here is the removal of the trailing newline at the end of the file, which does not affect the package configuration or behavior.

packages/core/src/v3/links.ts (1)

21-21: Documentation link added for deadlock detection feature.

This addition provides a helpful reference link for the new deadlock detection feature mentioned in the PR title. It's properly placed in the concurrency section of documentation links.

internal-packages/run-engine/src/engine/errors.ts (1)

83-88: New error class for one-time use token validation.

This error class follows the same pattern as existing error classes and provides a more semantic error type for token validation failures, improving error handling compared to catching generic Prisma constraint errors.

apps/webapp/app/runEngine/concerns/errors.ts (1)

1-6: Well-structured validation error class for run engine services.

This new error class is properly implemented with support for HTTP status codes, which aligns with its usage in API routes. The class follows TypeScript best practices and will help standardize validation error handling across the run engine.

internal-packages/testcontainers/src/index.ts (1)

64-72: Improved Prisma initialization with better logging.

Extracting the database URL to a local variable improves code readability, and adding a console log helps with debugging test database connections. These are good improvements for test infrastructure.

internal-packages/database/prisma/schema.prisma (1)

1856-1858: Addition of runChainState field for deadlock detection.

This new optional runChainState JSON field in the TaskRun model aligns perfectly with the concurrency control and deadlock detection features introduced in the run engine. Using a nullable Json? preserves backward compatibility with existing runs.

internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql (1)

2-2: Add runChainState JSONB column to TaskRun table.

The migration correctly introduces the optional runChainState column. Ensure your deployment pipeline applies this migration before rolling out the updated application to avoid schema mismatches.

apps/webapp/app/runEngine/services/batchTrigger.server.ts (1)

21-21: Reposition import of startActiveSpan.

The startActiveSpan import has been moved to group it with other tracing-related imports, improving consistency with tracing concerns in this service.

.vscode/launch.json (1)

28-29: Update debug configuration for triggerTask.test.ts.

The launch target and command have been updated to point to the new triggerTask.test.ts suite, ensuring the debugger runs the correct test file for the trigger task service.

internal-packages/run-engine/src/index.ts (1)

2-2: Export new RunOneTimeUseTokenError.

Exporting RunOneTimeUseTokenError alongside RunDuplicateIdempotencyKeyError provides upstream services with more precise error handling for one-time token scenarios.

apps/webapp/app/components/runs/v3/SpanEvents.tsx (1)

86-92: Improved error message formatting with preserved whitespace

The error message is now wrapped in a <pre> element with appropriate styling, which preserves whitespace and improves readability with consistent font styling and color.

internal-packages/run-engine/src/engine/db/worker.ts (1)

164-166: Enhanced queue selection logic for locked queues

The queue selection now prioritizes finding queues by lockedQueueId if present, falling back to the name-based lookup. This change properly supports the new locked queue feature for deadlock detection and concurrency management.

packages/core/src/v3/schemas/schemas.ts (1)

301-310: Well-structured schema for run chain concurrency state

The new RunChainState schema provides a clear structure for tracking concurrency state in run chains, including which queues are holding concurrent executions and environment-level concurrency. The optional fields allow for flexibility while maintaining structure.

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1)

811-817: Consistent error message formatting improvements

Similar to the change in SpanEvents.tsx, error messages are now wrapped in a <pre> element with styling that preserves whitespace and provides consistent formatting. This ensures better readability of error details, especially for structured errors related to deadlocks and concurrency issues.

internal-packages/run-engine/src/engine/tests/setup.ts (4)

14-14: Good change to use type-only import

Changing to a type-only import is appropriate since RunEngine is only used as a type in function parameters, which helps with tree-shaking and prevents potential circular dependencies.


33-44: Good addition of worker group setup

Adding the worker instance group creation is essential for properly testing the run chain state management and queue locking features. This provides the necessary infrastructure for the tests to validate the deadlock detection capabilities.


53-53: Correctly associates project with worker group

Setting the project's defaultWorkerGroupId to the created worker group's ID establishes the necessary relationship between projects and worker groups, which is required for the run engine's queue management features.


173-177: Good explicit task-queue relationships

Explicitly connecting tasks to their queues by ID during both create and update operations ensures proper relational integrity in the test database schema. This is important for testing queue locking and concurrency management features.

Also applies to: 189-193

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2)

10-10: Good import of new error type

Adding the import for EngineServiceValidationError from the run engine concerns aligns with the modular refactoring approach of the PR.


121-122: Enhanced error handling for validation errors

The addition of error handling for EngineServiceValidationError properly surfaces validation errors from the run engine with appropriate status codes. This is important for the deadlock detection feature to provide meaningful error responses to API clients.

packages/core/src/v3/errors.ts (1)

693-706: Excellent deadlock error enhancement

This addition provides user-friendly error handling for concurrency deadlocks by:

  1. Detecting errors with messages that start with "Deadlock detected:"
  2. Transforming them into a more readable "Concurrency Deadlock Error"
  3. Including a link to relevant documentation

This improves developer experience by making deadlock errors more understandable and providing guidance on how to resolve them.

internal-packages/run-engine/src/engine/types.ts (3)

4-10: Good import organization

The expanded import to include RunChainState is organized in a clean, alphabetical manner. This properly imports the type needed for deadlock detection functionality.


93-93: Good addition of lockedQueueId

Adding the optional lockedQueueId property to TriggerParams enables queue locking functionality, which is an important aspect of the deadlock detection feature. This property allows tasks to be directed to specific queues, supporting more predictable concurrency control.


121-121: Good addition of runChainState

The optional runChainState property in TriggerParams is essential for tracking concurrency state across queues and environments. This enables the run engine to detect and prevent deadlocks by maintaining awareness of resource allocation throughout the run chain.

apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1)

9-12: Key prefix hard‑coded to "v3" – verify it matches the new engine version

The increment key is hard‑coded as v3-run:${request.environment.id}:${request.taskId} even though this PR introduces v4‑specific functionality (deadlock detection, new run‑chain state column, etc.).
If the run‑number counter is supposed to be version‑scoped, using the wrong prefix can mix counters across versions. Please double‑check the expected namespace and either:

-      `v3-run:${request.environment.id}:${request.taskId}`,
+      `v4-run:${request.environment.id}:${request.taskId}`,

or extract the prefix to a shared constant so it can’t drift.

internal-packages/run-engine/package.json (1)

15-19: Confirm the build emits the new tests files into dist/

The new sub‑path export maps runtime and type files to:

./dist/src/engine/tests/setup.{js,d.ts}

Please ensure tsconfig.build.json includes src/engine/tests/**/* in its include patterns; otherwise these files won’t be present after pnpm run build, causing consumers to fail at runtime.

references/hello-world/src/trigger/deadlocks.ts (1)

21-25: Demonstration task creates an intentional deadlock – add safeguard for production

This sample mutually triggers deadlockTester and deadlockNestedTask on a single‑slot queue with releaseConcurrencyOnWaitpoint: false, guaranteeing a deadlock. That’s great for testing, but if someone accidentally deploys it in a non‑test project it will consume a worker indefinitely.

Consider adding a prominent comment or a runtime guard (e.g., throw if process.env.NODE_ENV === "production") to avoid surprises.

packages/core/src/v3/workers/taskExecutor.ts (2)

1024-1035: Improved error handling for non-retryable client errors

The addition of this early exit check for client errors (HTTP 400-499, excluding 408 and 429) enhances the retry logic by preventing unnecessary retry attempts for errors that are unlikely to be resolved by retrying.

This change aligns well with the rest of the error handling strategy and will help reduce resource waste on unproductive retries.


1054-1057: Refined rate limit retry delay logic

The updated logic only sets a custom delay when the rate limit error includes a valid reset time. This is more robust than the previous implementation, as it falls back to the default delay calculation when the rate limit response doesn't include the expected reset time header.

internal-packages/run-engine/src/engine/index.ts (4)

339-340: Added support for locked queue IDs

This new parameter supports the deadlock detection feature by allowing tasks to be locked to a specific queue ID.


364-365: Added support for run chain state

This new parameter enables the engine to track the state of run chains, which is essential for deadlock detection functionality.


420-421: Persisting lockedQueueId in task run data

The lockedQueueId is now properly persisted in the database when creating a task run, allowing for consistent queue locking behavior.


450-451: Persisting runChainState in task run data

The runChainState is now stored with the task run, enabling deadlock detection and state management across the run chain.

references/hello-world/src/trigger/queues.ts (3)

239-247: New task for testing locked queue behavior

The lockedQueueTask demonstrates how to handle non-existent queue scenarios, which is useful for testing error handling in the queue locking system. However, this implementation intentionally triggers errors by using a non-existent queue name.

Ensure this is intended behavior for testing or demonstration purposes, as it will generate runtime errors when executed.


249-254: Added simple child task for queue testing

This straightforward implementation provides a target for the locked queue tests.


256-264: New task for testing non-existent task identifiers

Similar to lockedQueueTask, this task intentionally triggers errors by trying to use a non-existent task identifier. This is useful for testing error handling, but also note that it will generate runtime errors.

Verify this is intended behavior for testing the system's response to invalid task identifiers. Consider adding a comment explaining this is for testing error handling.

packages/core/src/v3/apiClient/errors.ts (2)

31-39: Enhanced error message extraction

Added explicit handling for when the error is a string, improving the error message construction logic. This change ensures proper extraction of error messages regardless of their format.


41-55: Improved error message fallback logic

The refactored code provides a clearer priority order for constructing error messages, using a more readable if-then-else structure instead of nested ternary expressions. This makes the code more maintainable and the error message construction more predictable.

apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (1)

59-71: Clarify whether attempt is 0‑ or 1‑based

attempt > MAX_ATTEMPTS will allow exactly MAX_ATTEMPTS attempts.
If the caller is 0‑based this off‑by‑one blocks one extra try.
Recommend renaming to attemptNumber and documenting expected base, or changing the comparison to >=.

-    if (attempt > MAX_ATTEMPTS) {
+    if (attempt >= MAX_ATTEMPTS) {
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

96-110: recordEvent failure isn’t isolated – add error handling

If eventRepository.recordEvent fails (network/db hiccup) the whole trace callback will reject, bubbling the error to the caller and turning a cache‑hit into a 500. Consider isolating the side‑effect:

- await this.eventRepository.recordEvent(
+ try {
+   await this.eventRepository.recordEvent(
       `There's an existing run for idempotencyKey: ${idempotencyKey}`,
       /* … */
     );
+ } catch (err) {
+   // best‑effort – don’t block the cached run path
+   logger.error("Failed to record idempotent‑run note", err);
+ }

Please double‑check with ops whether losing this auxiliary event is acceptable or whether the error should be re‑thrown.

apps/webapp/app/runEngine/services/triggerTask.server.ts (1)

240-329: Side‑effects before the payload packet is materialised can skew spans

event.setAttribute("queueName", …) and incrementRunNumber occur before the (async) payloadProcessor.process() completes.
If that promise hangs or fails, the span already advertises queue info although no run exists.

Consider moving the side‑effects after payloadProcessor.process resolves to ensure attributes belong to a valid run.

- event.setAttribute("queueName", queueName);
- span.setAttribute("queueName", queueName);
-
- const payloadPacket = await this.payloadProcessor.process(triggerRequest);
+ const payloadPacket = await this.payloadProcessor.process(triggerRequest);
+ event.setAttribute("queueName", queueName);
+ span.setAttribute("queueName", queueName);
apps/webapp/app/runEngine/concerns/queues.server.ts (1)

122-174: Un‑sanitised external input when queue?.name is provided

When the caller supplies options.queue.name, getQueueName returns the value verbatim (line 126) skipping sanitisation, but later resolveQueueProperties relies on getQueueName for non‑locked paths and only sanitises in the outer method. That’s okay.
However, if getQueueName is used directly elsewhere we may push unsafe names into Redis. Consider always sanitising at the boundary (inside getQueueName).

apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1)

80-81: Confirm there are no unwanted shared‑reference side effects after mutating parentQueueState.holding

parentQueueState is mutated in‑place:

parentQueueState.holding += 1;

If the object originates from runChainState.concurrency?.queues, the mutation also updates the already‑stored state before you filter it out and re‑insert it. That’s currently benign, but if another part of the function throws between the mutation and the re‑assignment the parent state will be left in a partially modified state.

Please double‑check that:

  1. No code between the mutation and the re‑assignment can throw, and
  2. parentRunChainState is never reused after this method returns.

If either condition might be violated, clone parentQueueState before mutating.

Also applies to: 108-109

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
apps/webapp/app/runEngine/services/triggerTask.server.ts (2)

162-168: ⚠️ Potential issue

Cached idempotent runs are returned without a trace span

The early‑return branch still shortcuts the method before traceEventConcern.traceIdempotentRun() is invoked, so “cached” executions never appear in the traces/telemetry dashboards.

The same issue was raised previously and remains unresolved.

-      if (idempotencyKeyConcernResult.isCached) {
-        return idempotencyKeyConcernResult;
-      }
+      if (idempotencyKeyConcernResult.isCached) {
+        return await this.traceEventConcern.traceIdempotentRun(
+          triggerRequest,
+          {
+            existingRun: idempotencyKeyConcernResult.run,
+            idempotencyKey: options?.idempotencyKey ?? "",
+            incomplete: false,
+            isError: Boolean(idempotencyKeyConcernResult.error),
+          },
+          async () => idempotencyKeyConcernResult
+        );
+      }

331-334: ⚠️ Potential issue

Unbounded recursion on duplicate idempotency keys

RunDuplicateIdempotencyKeyError retries via this.call() with an incremented attempt but with no hard ceiling.
If the race condition never resolves the stack will overflow and/or the function will loop indefinitely.

Previous feedback flagged this – please add a bounded retry or back‑off loop.

const MAX_DUPLICATE_RETRIES = 3;
if (error instanceof RunDuplicateIdempotencyKeyError && attempt < MAX_DUPLICATE_RETRIES) {
   return await this.call({ taskId, environment, body, options, attempt: attempt + 1 });
}
apps/webapp/test/engine/triggerTask.test.ts (1)

108-200: 🛠️ Refactor suggestion

Ensure engine.quit() always runs to avoid hanging test runners

engine.quit() is only called on the happy path. If an assertion fails the Redis & worker pools stay alive and Vitest hangs.
Please wrap each containerTest body in try / finally (or use an afterEach hook) to guarantee clean‑up, as suggested in the earlier review.

containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
-  const engine = new RunEngine({...});
-  ...
-  expect(queueLength).toBe(1);
-  await engine.quit();
+  const engine = new RunEngine({...});
+  try {
+    ...
+    expect(queueLength).toBe(1);
+  } finally {
+    await engine.quit().catch(() => {});
+  }
});

Also applies to: 202-314, 318-478, 482-638, 641-839, 842-968, 971-1099, 1101-1254

🧹 Nitpick comments (4)
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

49-50: Avoid the as any cast when forwarding attributes

Both callback wrappers rely on event.setAttribute(key as any, value) which circumvents type‑safety and risks silently accepting unsupported attribute types.
Expose a strongly‑typed helper on TracedEventSpan (e.g. setInternalAttribute<T extends keyof InternalEventAttributes>(key: T, value: InternalEventAttributes[T])) or overload setAttribute directly so that callers get compile‑time feedback.

This minor tweak tightens contracts without affecting runtime behaviour.

Also applies to: 116-117

apps/webapp/app/runEngine/concerns/runChainStates.server.ts (3)

146-149: Good fix for unlimited concurrency check, but consider adding a comment.

You've correctly fixed the issue mentioned in the previous review by checking request.environment.maximumConcurrencyLimit > 0. This prevents false deadlock detection when the environment has unlimited concurrency.

if (
  request.environment.maximumConcurrencyLimit > 0 &&
+ // Only check for deadlocks if there's a limit (0 means unlimited)
  environmentConcurrency > request.environment.maximumConcurrencyLimit
) {

Consider adding a comment explaining that 0 means unlimited concurrency for better code readability and maintainability.


246-248: Consider using a more specific error type.

Instead of using a generic Error, consider using the EngineServiceValidationError class that's already imported and used elsewhere in this file. This would make error handling more consistent.

- throw new Error("Deadlock detection failed, parent queue not found");
+ throw new EngineServiceValidationError("Deadlock detection failed, parent queue not found");

61-68: Consider consistent error handling for parent runs without locked queues.

Currently, the code logs an error and returns an empty state when the parent run has no locked queue. For consistency with other error handling in this class, consider throwing an EngineServiceValidationError instead.

if (!parentLockedQueueId) {
  logger.error("Parent run has no locked queue, cannot determine run chain state", {
    runId: parentRun.id,
    runState: parentRun.runChainState,
  });

-  return {};
+  throw new EngineServiceValidationError("Parent run has no locked queue, cannot determine run chain state");
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2595f4 and 4ed9286.

📒 Files selected for processing (37)
  • .vscode/launch.json (1 hunks)
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1 hunks)
  • apps/webapp/app/runEngine/concerns/errors.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/payloads.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/queues.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/triggerTask.server.ts (3 hunks)
  • apps/webapp/app/runEngine/types.ts (1 hunks)
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (5 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • apps/webapp/test/engine/triggerTask.test.ts (1 hunks)
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/db/worker.ts (1 hunks)
  • internal-packages/run-engine/src/engine/errors.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/setup.ts (4 hunks)
  • internal-packages/run-engine/src/engine/types.ts (3 hunks)
  • internal-packages/run-engine/src/index.ts (1 hunks)
  • internal-packages/testcontainers/package.json (1 hunks)
  • internal-packages/testcontainers/src/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/errors.ts (1 hunks)
  • packages/core/src/v3/errors.ts (1 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/schemas.ts (1 hunks)
  • packages/core/src/v3/workers/taskExecutor.ts (2 hunks)
  • packages/core/test/taskExecutor.test.ts (2 hunks)
  • references/hello-world/src/trigger/deadlocks.ts (1 hunks)
  • references/hello-world/src/trigger/queues.ts (2 hunks)
✅ Files skipped from review due to trivial changes (4)
  • packages/core/src/v3/links.ts
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
🚧 Files skipped from review as they are similar to previous changes (29)
  • apps/webapp/app/runEngine/concerns/errors.ts
  • internal-packages/testcontainers/package.json
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql
  • .vscode/launch.json
  • internal-packages/testcontainers/src/index.ts
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx
  • internal-packages/run-engine/src/engine/errors.ts
  • references/hello-world/src/trigger/deadlocks.ts
  • internal-packages/run-engine/src/index.ts
  • packages/core/src/v3/errors.ts
  • internal-packages/run-engine/package.json
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
  • packages/core/src/v3/schemas/schemas.ts
  • packages/core/src/v3/workers/taskExecutor.ts
  • references/hello-world/src/trigger/queues.ts
  • apps/webapp/app/v3/services/triggerTask.server.ts
  • apps/webapp/app/v3/models/workerDeployment.server.ts
  • packages/core/src/v3/apiClient/errors.ts
  • internal-packages/run-engine/src/engine/db/worker.ts
  • internal-packages/database/prisma/schema.prisma
  • apps/webapp/app/runEngine/concerns/payloads.server.ts
  • packages/core/test/taskExecutor.test.ts
  • internal-packages/run-engine/src/engine/tests/setup.ts
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts
  • apps/webapp/app/runEngine/types.ts
  • apps/webapp/app/runEngine/concerns/queues.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck
🔇 Additional comments (3)
apps/webapp/app/runEngine/concerns/runChainStates.server.ts (3)

183-187: Well-implemented queue concurrency limit check.

The queue concurrency check properly handles the case where a concurrency limit of 0 means unlimited concurrency. This is consistent with the environment concurrency check and follows best practices.


257-275: Excellent error message for deadlock situations.

The deadlock error message is comprehensive and educational. It clearly explains:

  • What a deadlock is
  • The current situation that caused it
  • Common scenarios that lead to deadlocks
  • Multiple approaches to resolve the issue

This is an excellent example of a user-friendly error message that helps developers understand and solve the problem.


1-276: Overall well-designed deadlock detection implementation.

The implementation of the deadlock detection mechanism is thorough and addresses the core issue described in the PR objectives. Key strengths include:

  1. Clear separation of concerns with well-named methods
  2. Proper handling of both environment and queue concurrency limits
  3. Consideration of concurrency release settings
  4. Comprehensive error messaging

The code successfully propagates the run chain state through the task hierarchy and accounts for concurrency held in queues along the direct ancestor chain, which was the main goal of this PR.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
apps/webapp/app/runEngine/services/triggerTask.server.ts (2)

162-168: isCached branch still skips tracing – duplicate of earlier feedback

The early-return path bypasses traceEventConcern.traceIdempotentRun(), so cached executions remain invisible to observability dashboards, exactly as pointed out in the previous review.
Please wrap the result in the tracing helper.


331-334: Unbounded recursion when the duplicate-idempotency race persists

The retry still recurses indefinitely if the key collision never resolves, risking stack overflow / lambda timeout. A bounded loop or MAX_DUPLICATE_RETRIES guard proposed earlier is still required.

apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1)

146-152: Fixed the unlimited environment concurrency check.

You've correctly addressed the previous issue with environment concurrency limits by checking if maximumConcurrencyLimit > 0 before comparing with environmentConcurrency. This properly handles the case where 0 means unlimited concurrency.

🧹 Nitpick comments (5)
apps/webapp/test/engine/triggerTask.test.ts (1)

559-561: Remove stray console.log from test output

console.log(result); pollutes test logs and slows CI. Drop or convert to a logger.debug behind a verbose flag.

-      console.log(result);
apps/webapp/app/runEngine/concerns/queues.server.ts (3)

155-170: Use structured logger instead of console.log for consistency

console.log is used twice while the rest of the codebase relies on logger.
This bypasses log formatting & log-level control.

-      console.log("Failed to get queue name: No task found", {
+      logger.debug("Failed to get queue name: No task found", {
         taskId,
         environmentId: environment.id,
       });

Apply the same change in the following console.log block.


176-194: lengthOfEnvQueue can be expensive – consider caching or rate-limiting

validateQueueLimits() calls engine.lengthOfEnvQueue() on every trigger, which may perform a full scan on Redis for large installations.
If queue size rarely changes, cache the value for a short TTL or batch calls per request cycle to reduce load.


126-129: Sanitize user-supplied queue names before early return

When a caller specifies options.queue.name, the raw value is returned immediately.
Although later resolveQueueProperties() re-sanitises, direct callers of getQueueName() (other concerns/tests) might skip that layer.

-    if (queue?.name) {
-      return queue.name;
+    if (queue?.name) {
+      return sanitizeQueueName(queue.name);
     }
apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1)

231-255: Potential unhandled error in parent queue lookup.

If the parent queue isn't found, you throw a generic error. Consider enhancing this with more context to aid debugging.

-      throw new Error("Deadlock detection failed, parent queue not found");
+      throw new Error(`Deadlock detection failed, parent queue not found: ${parentLockedQueueId}`);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 76e8aa1 and 7f46a27.

📒 Files selected for processing (37)
  • .vscode/launch.json (1 hunks)
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1 hunks)
  • apps/webapp/app/runEngine/concerns/errors.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/payloads.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/queues.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/triggerTask.server.ts (3 hunks)
  • apps/webapp/app/runEngine/types.ts (1 hunks)
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (5 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • apps/webapp/test/engine/triggerTask.test.ts (1 hunks)
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/db/worker.ts (1 hunks)
  • internal-packages/run-engine/src/engine/errors.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/setup.ts (4 hunks)
  • internal-packages/run-engine/src/engine/types.ts (3 hunks)
  • internal-packages/run-engine/src/index.ts (1 hunks)
  • internal-packages/testcontainers/package.json (1 hunks)
  • internal-packages/testcontainers/src/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/errors.ts (1 hunks)
  • packages/core/src/v3/errors.ts (1 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/schemas.ts (1 hunks)
  • packages/core/src/v3/workers/taskExecutor.ts (2 hunks)
  • packages/core/test/taskExecutor.test.ts (2 hunks)
  • references/hello-world/src/trigger/deadlocks.ts (1 hunks)
  • references/hello-world/src/trigger/queues.ts (2 hunks)
✅ Files skipped from review due to trivial changes (2)
  • internal-packages/run-engine/src/engine/errors.ts
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts
🚧 Files skipped from review as they are similar to previous changes (31)
  • packages/core/src/v3/links.ts
  • internal-packages/run-engine/src/index.ts
  • internal-packages/testcontainers/package.json
  • .vscode/launch.json
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx
  • internal-packages/run-engine/src/engine/tests/setup.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
  • internal-packages/database/prisma/schema.prisma
  • internal-packages/testcontainers/src/index.ts
  • packages/core/src/v3/apiClient/errors.ts
  • internal-packages/run-engine/src/engine/db/worker.ts
  • packages/core/src/v3/schemas/schemas.ts
  • apps/webapp/app/runEngine/concerns/errors.ts
  • apps/webapp/app/v3/services/triggerTask.server.ts
  • internal-packages/run-engine/package.json
  • references/hello-world/src/trigger/deadlocks.ts
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts
  • packages/core/src/v3/workers/taskExecutor.ts
  • packages/core/src/v3/errors.ts
  • apps/webapp/app/runEngine/concerns/payloads.server.ts
  • internal-packages/run-engine/src/engine/index.ts
  • packages/core/test/taskExecutor.test.ts
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
  • internal-packages/run-engine/src/engine/types.ts
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
  • apps/webapp/app/runEngine/concerns/traceEvents.server.ts
  • references/hello-world/src/trigger/queues.ts
  • apps/webapp/app/v3/models/workerDeployment.server.ts
  • apps/webapp/app/runEngine/types.ts
🧰 Additional context used
🧬 Code Graph Analysis (2)
apps/webapp/app/runEngine/concerns/queues.server.ts (8)
apps/webapp/app/runEngine/types.ts (5)
  • QueueManager (63-71)
  • TriggerTaskRequest (28-34)
  • LockedBackgroundWorker (57-60)
  • QueueProperties (52-55)
  • QueueValidationResult (42-50)
apps/webapp/app/v3/runEngine.server.ts (1)
  • engine (9-9)
apps/webapp/app/runEngine/concerns/errors.ts (1)
  • EngineServiceValidationError (1-6)
apps/webapp/app/v3/services/triggerTaskV1.server.ts (1)
  • taskId (671-718)
apps/webapp/app/v3/models/workerDeployment.server.ts (1)
  • findCurrentWorkerFromEnvironment (197-223)
internal-packages/run-engine/src/engine/tests/setup.ts (1)
  • AuthenticatedEnvironment (16-18)
apps/webapp/app/v3/queueSizeLimits.server.ts (1)
  • guardQueueSizeLimitsForEnv (11-34)
apps/webapp/app/v3/services/worker/workerGroupService.server.ts (1)
  • WorkerGroupService (7-269)
apps/webapp/app/runEngine/concerns/runChainStates.server.ts (3)
apps/webapp/app/runEngine/types.ts (2)
  • RunChainStateManager (148-153)
  • TriggerTaskRequest (28-34)
packages/core/src/v3/schemas/schemas.ts (2)
  • RunChainState (301-308)
  • RunChainState (310-310)
apps/webapp/app/runEngine/concerns/errors.ts (1)
  • EngineServiceValidationError (1-6)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (8)
apps/webapp/app/runEngine/concerns/runChainStates.server.ts (8)

1-14: Well-structured class initialization with feature flag support.

The DefaultRunChainStateManager is cleanly defined with proper dependency injection for the Prisma client and the concurrency release feature flag. This approach allows for flexible configuration and easier testing.


16-48: Good defensive programming with validation of parent run chain state.

The method properly handles edge cases:

  1. Returns empty state when there's no parent run
  2. Safely parses parent run chain state with error logging
  3. Respects the resumeParentOnCompletion flag

This ensures robust behavior even with corrupted or missing data.


50-68: Proper validation of parent locked queue ID.

The code correctly validates the existence of a parent locked queue ID before proceeding with concurrency calculations. The error logging provides helpful context for troubleshooting.


75-98: Concurrency management with feature flag control.

The code handles both the legacy and new concurrency release behaviors through the feature flag, ensuring backward compatibility while allowing for the new functionality.


100-127: Well-implemented selective concurrency increment based on release settings.

The code only increments the queue's holding count when the parent won't release concurrency, which effectively prevents unnecessary concurrency reservation.


183-190: Proper queue concurrency limit check with unlimited support.

The code correctly handles the case where queueConcurrencyLimit === 0 means unlimited by including an explicit check, preventing false positives in deadlock detection.


195-229: Comprehensive concurrency release determination logic.

The method effectively determines if concurrency should be released by:

  1. First checking explicit request options
  2. Then falling back to queue configuration
  3. Handling undefined/null concurrency limits
  4. Providing a sensible default

This prioritization gives callers control while maintaining consistent behavior.


257-275: Excellent error message with clear explanation and actionable suggestions.

The deadlock error message is comprehensive and helpful:

  1. Explains what a deadlock is
  2. Describes the specific situation
  3. Lists common causes
  4. Provides multiple remediation options

This will significantly improve the developer experience when encountering this error.

Comment on lines +108 to +201
describe("RunEngineTriggerTaskService", () => {
containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
tracer: trace.getTracer("test", "0.0.0"),
});

const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const taskIdentifier = "test-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

const queuesManager = new DefaultQueueManager(prisma, engine);

const idempotencyKeyConcern = new IdempotencyKeyConcern(
prisma,
engine,
new MockTraceEventConcern()
);

const runChainStateManager = new DefaultRunChainStateManager(prisma, true);

const triggerTaskService = new RunEngineTriggerTaskService({
engine,
prisma,
runNumberIncrementer: new MockRunNumberIncrementer(),
payloadProcessor: new MockPayloadProcessor(),
queueConcern: queuesManager,
idempotencyKeyConcern,
validator: new MockTriggerTaskValidator(),
traceEventConcern: new MockTraceEventConcern(),
runChainStateManager,
tracer: trace.getTracer("test", "0.0.0"),
});

const result = await triggerTaskService.call({
taskId: taskIdentifier,
environment: authenticatedEnvironment,
body: { payload: { test: "test" } },
});

expect(result).toBeDefined();
expect(result?.run.friendlyId).toBeDefined();
expect(result?.run.status).toBe("PENDING");
expect(result?.isCached).toBe(false);

const run = await prisma.taskRun.findUnique({
where: {
id: result?.run.id,
},
});

expect(run).toBeDefined();
expect(run?.friendlyId).toBe(result?.run.friendlyId);
expect(run?.engine).toBe("V2");
expect(run?.queuedAt).toBeDefined();
expect(run?.queue).toBe(`task/${taskIdentifier}`);

// Lets make sure the task is in the queue
const queueLength = await engine.runQueue.lengthOfQueue(
authenticatedEnvironment,
`task/${taskIdentifier}`
);
expect(queueLength).toBe(1);

await engine.quit();
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure engine.quit() always runs to avoid hanging test runners

engine.quit() is invoked only on the happy path; an assertion failure will leak Redis connections and leave Vitest hanging.
Wrap each containerTest body in try / finally, or register an afterEach() that gracefully stops the engine.

-containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
-  const engine = new RunEngine({ /* … */ });
-  /* tests */
-  await engine.quit();
-});
+containerTest("should trigger a task with minimal options", async ({ prisma, redisOptions }) => {
+  const engine = new RunEngine({ /* … */ });
+  try {
+    /* tests */
+  } finally {
+    await engine.quit().catch(() => {});
+  }
+});

Repeat for the remaining containerTest blocks.

Also applies to: 202-314, 316-479, 481-638, 640-839, 841-969, 971-1099, 1100-1256

Comment on lines 39 to 44
where: {
name: specifiedQueueName,
workers: { some: { id: lockedBackgroundWorker.id } }, // Ensure the queue is associated with any task of the locked worker
},
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericallam you should add the environmentId in here because the index on this table is name + runtimeEnvironmentId

@@ -61,10 +61,14 @@ const prisma = async (
{ postgresContainer }: { postgresContainer: StartedPostgreSqlContainer },
use: Use<PrismaClient>
) => {
const url = postgresContainer.getConnectionUri();

console.log("Initializing Prisma with URL:", url);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericallam did you mean to leave this in here?

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
apps/webapp/test/engine/triggerTask.test.ts (1)

108-201: Ensure engine.quit() always executes – still outstanding

Previous reviews flagged the resource-leak risk. Assertions thrown before the final line skip the shutdown call, leaving Redis connections open and hanging Vitest.

Wrap each containerTest body in try / finally (or add an afterEach) so await engine.quit() runs no matter what.

This comment repeats earlier feedback.

apps/webapp/app/runEngine/services/triggerTask.server.ts (2)

162-168: ⚠️ Potential issue

Idempotent-run path still exits without a trace span

The early-return for cached runs bypasses TraceEventConcern, so these executions never show up in observability dashboards.
A virtually identical issue was raised in the previous review and still applies.

Consider wrapping the cached-run branch in traceIdempotentRun() (or an equivalent helper) before returning. This preserves the fast path while keeping tracing complete.


331-334: ⚠️ Potential issue

Unbounded recursion on RunDuplicateIdempotencyKeyError can overflow the stack

The method self-recurses indefinitely if the duplicate idempotency key condition persists.
The earlier review suggested capping the retries, but the guard is still missing.

+const MAX_DUPLICATE_RETRIES = 3;
 ...
-  if (error instanceof RunDuplicateIdempotencyKeyError) {
-    //retry calling this function, because this time it will return the idempotent run
-    return await this.call({ taskId, environment, body, options, attempt: attempt + 1 });
-  }
+  if (error instanceof RunDuplicateIdempotencyKeyError && attempt < MAX_DUPLICATE_RETRIES) {
+    // Retry (bounded) – next call should return the cached run
+    return await this.call({ taskId, environment, body, options, attempt: attempt + 1 });
+  }

After the limit, either surface a clear error or return the cached run through the tracing pathway.

🧹 Nitpick comments (3)
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

18-42: Minor duplication – extract common attribute builder

traceRun and traceIdempotentRun build almost-identical attributes, context, and span* objects. A small helper reduces copy-paste risk:

private buildBaseOptions(request: TriggerTaskRequest) {
  return {
    context: request.options?.traceContext,
    spanParentAsLink: request.options?.spanParentAsLink,
    parentAsLinkType: request.options?.parentAsLinkType,
    kind: "SERVER" as const,
    environment: request.environment,
    taskSlug: request.taskId,
  };
}

Then spread:

this.eventRepository.traceEvent(
  request.taskId,
  {
    ...this.buildBaseOptions(request),
    attributes: { /* … */ },
    incomplete: true,
    immediate: true,
  },
  
);

Reduces maintenance overhead when the attribute list evolves.

apps/webapp/test/engine/triggerTask.test.ts (2)

559-561: Remove stray console.log to keep test output clean

Left-over logging clutters CI output and can mask important debug traces.

-      console.log(result);

40-70: Centralise mock & engine setup to cut duplication

Each test block re-declares identical Mock* classes and lengthy new RunEngine({...}) configs. Extract helpers into test/utils.ts, e.g.:

export function createTestEngine(prisma, redis) { /* … */ }
export function createTriggerService(engine, prisma) { /* … */ }

Benefits: smaller tests, single source of truth for defaults, easier future refactors.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f46a27 and 21d9eee.

📒 Files selected for processing (38)
  • .changeset/tender-jobs-collect.md (1 hunks)
  • .vscode/launch.json (1 hunks)
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2 hunks)
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx (1 hunks)
  • apps/webapp/app/runEngine/concerns/errors.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/payloads.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/queues.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runChainStates.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts (1 hunks)
  • apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1 hunks)
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts (14 hunks)
  • apps/webapp/app/runEngine/services/triggerTask.server.ts (3 hunks)
  • apps/webapp/app/runEngine/types.ts (1 hunks)
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (5 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • apps/webapp/test/engine/triggerTask.test.ts (1 hunks)
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • internal-packages/run-engine/package.json (1 hunks)
  • internal-packages/run-engine/src/engine/db/worker.ts (1 hunks)
  • internal-packages/run-engine/src/engine/errors.ts (1 hunks)
  • internal-packages/run-engine/src/engine/index.ts (4 hunks)
  • internal-packages/run-engine/src/engine/tests/setup.ts (4 hunks)
  • internal-packages/run-engine/src/engine/types.ts (3 hunks)
  • internal-packages/run-engine/src/index.ts (1 hunks)
  • internal-packages/testcontainers/package.json (1 hunks)
  • internal-packages/testcontainers/src/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/errors.ts (1 hunks)
  • packages/core/src/v3/errors.ts (1 hunks)
  • packages/core/src/v3/links.ts (1 hunks)
  • packages/core/src/v3/schemas/schemas.ts (1 hunks)
  • packages/core/src/v3/workers/taskExecutor.ts (2 hunks)
  • packages/core/test/taskExecutor.test.ts (2 hunks)
  • references/hello-world/src/trigger/deadlocks.ts (1 hunks)
  • references/hello-world/src/trigger/queues.ts (2 hunks)
✅ Files skipped from review due to trivial changes (6)
  • .changeset/tender-jobs-collect.md
  • internal-packages/run-engine/src/engine/errors.ts
  • internal-packages/run-engine/package.json
  • apps/webapp/app/runEngine/services/batchTrigger.server.ts
  • apps/webapp/app/runEngine/concerns/runNumbers.server.ts
  • packages/core/src/v3/links.ts
🚧 Files skipped from review as they are similar to previous changes (28)
  • internal-packages/testcontainers/package.json
  • apps/webapp/app/runEngine/concerns/errors.ts
  • internal-packages/database/prisma/migrations/20250422152423_add_run_chain_state_to_task_run/migration.sql
  • packages/core/src/v3/errors.ts
  • apps/webapp/app/components/runs/v3/SpanEvents.tsx
  • internal-packages/run-engine/src/engine/db/worker.ts
  • apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx
  • .vscode/launch.json
  • apps/webapp/app/runEngine/concerns/payloads.server.ts
  • packages/core/src/v3/schemas/schemas.ts
  • internal-packages/testcontainers/src/index.ts
  • apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
  • apps/webapp/app/runEngine/validators/triggerTaskValidator.ts
  • references/hello-world/src/trigger/queues.ts
  • packages/core/test/taskExecutor.test.ts
  • internal-packages/run-engine/src/engine/types.ts
  • internal-packages/run-engine/src/engine/index.ts
  • packages/core/src/v3/workers/taskExecutor.ts
  • internal-packages/database/prisma/schema.prisma
  • internal-packages/run-engine/src/engine/tests/setup.ts
  • internal-packages/run-engine/src/index.ts
  • apps/webapp/app/v3/services/triggerTask.server.ts
  • packages/core/src/v3/apiClient/errors.ts
  • apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
  • apps/webapp/app/runEngine/concerns/queues.server.ts
  • apps/webapp/app/runEngine/types.ts
  • apps/webapp/app/v3/models/workerDeployment.server.ts
  • apps/webapp/app/runEngine/concerns/runChainStates.server.ts
🧰 Additional context used
🧬 Code Graph Analysis (1)
apps/webapp/app/runEngine/concerns/traceEvents.server.ts (3)
apps/webapp/app/runEngine/types.ts (3)
  • TraceEventConcern (131-146)
  • TriggerTaskRequest (28-34)
  • TracedEventSpan (119-129)
apps/webapp/app/v3/eventRepository.server.ts (2)
  • EventRepository (189-1262)
  • eventRepository (1264-1264)
packages/core/src/v3/isomorphic/friendlyId.ts (1)
  • BatchId (96-96)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (4)
references/hello-world/src/trigger/deadlocks.ts (2)

14-34: Prefer explicit payload typing instead of any

Both the payload parameter and the { ctx } object are typed as any, forfeiting compile-time safety and IntelliSense.
Define a small interface for your message shape and pass it to task<Payload>() so callers and maintainers know exactly what the task expects.

-import { task, queue } from "@trigger.dev/sdk";
+import { task, queue } from "@trigger.dev/sdk";

+interface MessagePayload {
+  message: string;
+}

-export const deadlockTester = task({
+export const deadlockTester = task<MessagePayload>({
   id: "deadlock-tester",
-  run: async (payload: any, { ctx }) => {
+  run: async (payload, { ctx }) => {
     /* … */
   },
 });

Repeat for deadlockNestedTask.
[ suggest_optional_refactor ]


36-61: Mutual batch-triggering risks unbounded recursion

deadlockTester waits for two deadlockNestedTask runs, each of which immediately triggers two additional deadlockTester runs.
Without a guard (depth counter, unique idempotency key, etc.) this can explode exponentially or recurse forever outside the new deadlock-detection path, exhausting queue capacity or blowing up the DB.

If this file is only a synthetic demonstration that deliberately causes a deadlock, add a comment that it must never be imported in production code. Otherwise, add a maximum depth or a “rootRunId” check.

Would you like an example of such a guard?

apps/webapp/app/runEngine/concerns/traceEvents.server.ts (1)

68-96: Mismatch between incomplete flag and recorded “cached” event

traceIdempotentRun forwards the caller-supplied incomplete boolean to the repository, but always records an additional child event (“There’s an existing run …”).
When incomplete === false that parent span is closed, yet the child event is attached and might violate trace semantics (child must finish before parent). Ensure your EventRepository tolerates this ordering or flip incomplete to true while adding the child record, then mark the span complete afterwards.

apps/webapp/app/runEngine/services/triggerTask.server.ts (1)

215-219: lockedQueueId can be undefined while the run is flagged as version-locked

resolveQueueProperties() is called with lockedToBackgroundWorker ?? undefined, but if the worker lookup fails (e.g., typo in lockToVersion), the service continues and eventually triggers a run without a valid lockedQueueId.

Downstream consumers may assume the field is non-null when lockToVersion was provided, potentially leading to mismatches or dead-locks.

Please verify that resolveQueueProperties (and subsequent logic) either:

  1. Throws when lockedToBackgroundWorker is null while body.options?.lockToVersion is specified, or
  2. Validates the presence of lockedQueueId before proceeding.

If not, augment the validation layer accordingly.

Comment on lines +130 to 135
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));

if (parseDelayError) {
throw new EngineServiceValidationError(`Invalid delay ${body.options?.delay}`);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

parseDelay validation can silently accept invalid strings

tryCatch only captures thrown errors. If parseDelay() returns undefined for an unparsable string (the common case), parseDelayError is undefined and the code proceeds, creating a run with delayUntil = undefined.

-const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));
-
-if (parseDelayError) {
-  throw new EngineServiceValidationError(`Invalid delay ${body.options?.delay}`);
-}
+const [parseDelayError, delayUntil] = await tryCatch(() =>
+  parseDelay(body.options?.delay)
+);
+
+if (
+  parseDelayError ||                     // threw
+  (body.options?.delay && !delayUntil)   // returned undefined for an input
+) {
+  throw new EngineServiceValidationError(
+    `Invalid delay value "${body.options?.delay}"`
+  );
+}

This prevents silent fall-through and gives users immediate feedback.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const [parseDelayError, delayUntil] = await tryCatch(parseDelay(body.options?.delay));
if (parseDelayError) {
throw new EngineServiceValidationError(`Invalid delay ${body.options?.delay}`);
}
const [parseDelayError, delayUntil] = await tryCatch(() =>
parseDelay(body.options?.delay)
);
if (
parseDelayError ||
(body.options?.delay && !delayUntil)
) {
throw new EngineServiceValidationError(
`Invalid delay value "${body.options?.delay}"`
);
}

@ericallam ericallam merged commit e837500 into main Apr 25, 2025
12 checks passed
@ericallam ericallam deleted the ea-branch-50 branch April 25, 2025 11:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants