Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/define-runevent-schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": minor
---

Define RunEvent schema and update ApiClient to use it
7 changes: 7 additions & 0 deletions .changeset/runevent-schema-apiclient.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/core": patch
---

fix(core): define RunEvent schema and update ApiClient validation

Defines proper RunEvent schema with Zod validation and updates ApiClient to use the new schema for improved type safety and runtime validation.
89 changes: 38 additions & 51 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,9 +501,8 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
const version = deployment.version;

const rawDeploymentLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`;
const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${
resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`;
const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`;

const deploymentLink = cliLink("View deployment", rawDeploymentLink);
const testLink = cliLink("Test tasks", rawTestLink);
Expand Down Expand Up @@ -720,8 +719,7 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
}
} else {
outro(
`Version ${version} deployed with ${taskCount} detected task${taskCount === 1 ? "" : "s"} ${
isLinksSupported ? `| ${deploymentLink} | ${testLink}` : ""
`Version ${version} deployed with ${taskCount} detected task${taskCount === 1 ? "" : "s"} ${isLinksSupported ? `| ${deploymentLink} | ${testLink}` : ""
}`
);

Expand All @@ -745,18 +743,16 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
TRIGGER_VERSION: version,
TRIGGER_DEPLOYMENT_SHORT_CODE: deployment.shortCode,
TRIGGER_DEPLOYMENT_URL: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`,
TRIGGER_TEST_URL: `${authorization.dashboardUrl}/projects/v3/${
resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`,
TRIGGER_TEST_URL: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`,
},
outputs: {
deploymentVersion: version,
workerVersion: version,
deploymentShortCode: deployment.shortCode,
deploymentUrl: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`,
testUrl: `${authorization.dashboardUrl}/projects/v3/${
resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`,
testUrl: `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project
}/test?environment=${options.env === "prod" ? "prod" : "stg"}`,
needsPromotion: options.skipPromotion ? "true" : "false",
},
});
Expand Down Expand Up @@ -799,8 +795,7 @@ async function failDeploy(
checkLogsForErrors(logs);

outro(
`${chalkError(`${prefix}:`)} ${
error.message
`${chalkError(`${prefix}:`)} ${error.message
}. Full build logs have been saved to ${logPath}`
);

Expand Down Expand Up @@ -1100,9 +1095,8 @@ async function handleNativeBuildServerDeploy({
const deployment = initializeDeploymentResult.data;

const rawDeploymentLink = `${dashboardUrl}/projects/v3/${config.project}/deployments/${deployment.shortCode}`;
const rawTestLink = `${dashboardUrl}/projects/v3/${config.project}/test?environment=${
options.env === "prod" ? "prod" : "stg"
}`;
const rawTestLink = `${dashboardUrl}/projects/v3/${config.project}/test?environment=${options.env === "prod" ? "prod" : "stg"
}`;

const exposedDeploymentLink = isLinksSupported
? cliLink(chalk.bold(rawDeploymentLink), rawDeploymentLink)
Expand Down Expand Up @@ -1155,8 +1149,9 @@ async function handleNativeBuildServerDeploy({
const [readSessionError, readSession] = await tryCatch(
stream.readSession(
{
start: { from: { seqNum: 0 }, clamp: true },
stop: { waitSecs: 60 * 20 }, // 20 minutes
start: {
from: { seqNum: 0 },
},
},
Comment on lines +1152 to 1155
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Removal of S2 read session timeout allows CLI to hang indefinitely

The old code had stop: { waitSecs: 60 * 20 } which provided a 20-minute safety timeout for the S2 read session. This PR removes it entirely. The for await (const record of readSession) loop at packages/cli-v3/src/commands/deploy.ts:1175 will now block indefinitely if no finalized event is ever emitted (e.g., build server crashes, stream infrastructure issues, or any scenario where the deployment process hangs without producing a terminal event). The abortController is only triggered upon receiving a finalized event (packages/cli-v3/src/commands/deploy.ts:1227), so without the server-side timeout, there is no fallback to break the loop.

Suggested change
start: {
from: { seqNum: 0 },
},
},
start: {
from: { seqNum: 0 },
},
stop: { waitSecs: 60 * 20 }, // 20 minutes
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

{ signal: abortController.signal }
)
Expand All @@ -1167,8 +1162,7 @@ async function handleNativeBuildServerDeploy({
log.warn(`Failed streaming build logs, open the deployment in the dashboard to view the logs`);

outro(
`Version ${deployment.version} is being deployed ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} is being deployed ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);

Expand Down Expand Up @@ -1214,10 +1208,10 @@ async function handleNativeBuildServerDeploy({
level === "error"
? chalk.bold(chalkError(message))
: level === "warn"
? chalkWarning(message)
: level === "debug"
? chalkGrey(message)
: message;
? chalkWarning(message)
: level === "debug"
? chalkGrey(message)
: message;

// We use console.log here instead of clack's logger as the current version does not support changing the line spacing.
// And the logs look verbose with the default spacing.
Expand Down Expand Up @@ -1250,8 +1244,7 @@ async function handleNativeBuildServerDeploy({
log.error("Failed dequeueing build, please try again shortly");

throw new OutroCommandError(
`Version ${deployment.version} ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);
}
Expand All @@ -1266,8 +1259,7 @@ async function handleNativeBuildServerDeploy({
}

throw new OutroCommandError(
`Version ${deployment.version} ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);
}
Expand All @@ -1293,13 +1285,12 @@ async function handleNativeBuildServerDeploy({
}

outro(
`Version ${deployment.version} was deployed ${
isLinksSupported
? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink(
"View deployment",
rawDeploymentLink
)}`
: ""
`Version ${deployment.version} was deployed ${isLinksSupported
? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink(
"View deployment",
rawDeploymentLink
)}`
: ""
}`
);
return process.exit(0);
Expand All @@ -1313,14 +1304,13 @@ async function handleNativeBuildServerDeploy({
chalk.bold(
chalkError(
"Deployment failed" +
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
)
)
);

throw new OutroCommandError(
`Version ${deployment.version} deployment failed ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} deployment failed ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);
}
Expand All @@ -1333,14 +1323,13 @@ async function handleNativeBuildServerDeploy({
chalk.bold(
chalkError(
"Deployment timed out" +
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
)
)
);

throw new OutroCommandError(
`Version ${deployment.version} deployment timed out ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} deployment timed out ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);
}
Expand All @@ -1353,14 +1342,13 @@ async function handleNativeBuildServerDeploy({
chalk.bold(
chalkError(
"Deployment was canceled" +
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
(finalDeploymentEvent.message ? `: ${finalDeploymentEvent.message}` : "")
)
)
);

throw new OutroCommandError(
`Version ${deployment.version} deployment canceled ${
isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
`Version ${deployment.version} deployment canceled ${isLinksSupported ? `| ${cliLink("View deployment", rawDeploymentLink)}` : ""
}`
);
}
Expand All @@ -1379,13 +1367,12 @@ async function handleNativeBuildServerDeploy({
}

outro(
`Version ${deployment.version} ${
isLinksSupported
? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink(
"View deployment",
rawDeploymentLink
)}`
: ""
`Version ${deployment.version} ${isLinksSupported
? `| ${cliLink("Test tasks", rawTestLink)} | ${cliLink(
"View deployment",
rawDeploymentLink
)}`
: ""
}`
);
return process.exit(0);
Expand Down
Binary file added packages/cli-v3/tsc_output.txt
Binary file not shown.
4 changes: 3 additions & 1 deletion packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
EnvironmentVariableResponseBody,
EnvironmentVariableWithSecret,
ListQueueOptions,
ListRunEventsResponse,
ListRunResponseItem,
ListScheduleOptions,
QueueItem,
Expand All @@ -44,6 +45,7 @@ import {
RetrieveQueueParam,
RetrieveRunResponse,
RetrieveRunTraceResponseBody,
RunEvent,
ScheduleObject,
SendInputStreamResponseBody,
StreamBatchItemsResponse,
Expand Down Expand Up @@ -702,7 +704,7 @@ export class ApiClient {

listRunEvents(runId: string, requestOptions?: ZodFetchOptions) {
return zodfetch(
z.any(), // TODO: define a proper schema for this
ListRunEventsResponse,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Schema tightening from z.any() to ListRunEventsResponse may strip fields from API response

The change from z.any() to ListRunEventsResponse at packages/core/src/v3/apiClient/index.ts:707 introduces strict Zod parsing. The API route at apps/webapp/app/routes/api.v1.runs.$runId.events.ts:48-54 returns all fields from RunPreparedEvent which includes idempotencyKey and environmentType (selected at apps/webapp/app/v3/eventRepository/eventRepository.server.ts:704,715). These fields are NOT in the new RunEvent schema and will be silently stripped by Zod's default behavior. While this is likely intentional (moving from untyped to typed), any existing consumers of listRunEvents() that relied on idempotencyKey or environmentType will lose access to those fields. This is a semantic API contract change worth noting in the changeset.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

`${this.baseUrl}/api/v1/runs/${runId}/events`,
{
method: "GET",
Expand Down
16 changes: 9 additions & 7 deletions packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
accessToken: options.accessToken,
...(options.endpoint
? {
endpoints: {
account: options.endpoint,
basin: options.endpoint,
},
}
endpoints: {
account: options.endpoint,
basin: options.endpoint,
},
}
: {}),
});
this.flushIntervalMs = options.flushIntervalMs ?? 200;
Expand Down Expand Up @@ -152,7 +152,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
return;
}
// Convert each chunk to JSON string and wrap in AppendRecord
controller.enqueue(AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) }));
controller.enqueue(
AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) })
);
},
})
)
Expand Down Expand Up @@ -223,5 +225,5 @@ async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterab
function safeReleaseLock(reader: ReadableStreamDefaultReader<any>) {
try {
reader.releaseLock();
} catch (error) {}
} catch (error) { }
}
Loading