Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3e99dff
upload assets in a separate request when needed
AbanoubGhadban Jun 30, 2025
920ddfd
add ndjson end point to accept the rendering request in chunks
AbanoubGhadban Aug 10, 2025
2c792ed
Implement Incremental Render Request Manager and Bundle Validation
AbanoubGhadban Aug 11, 2025
9514f8d
WIP: handle errors happen during incremental rendering
AbanoubGhadban Aug 11, 2025
b1cb931
handle errors happen at the InrecementalRequestManager
AbanoubGhadban Aug 13, 2025
d9f47a7
replace pending operations with content buffer
AbanoubGhadban Aug 13, 2025
1438fa5
Refactor incremental rendering to use a new function stream handler
AbanoubGhadban Aug 14, 2025
f3dc8b2
Enhance error handling in incremental rendering stream
AbanoubGhadban Aug 14, 2025
49093a2
Refactor incremental render tests for improved readability and mainta…
AbanoubGhadban Aug 14, 2025
8d8d34b
create a test to test the streaming from server to client
AbanoubGhadban Aug 15, 2025
0ba4ec4
Refactor incremental render tests to use custom waitFor function
AbanoubGhadban Aug 15, 2025
91d5cb4
Enhance incremental render tests with helper functions for setup and …
AbanoubGhadban Aug 15, 2025
16763f4
Remove unnecessary console logs from worker and test files
AbanoubGhadban Aug 15, 2025
634965c
Refactor incremental render tests to use jest mock functions for sink…
AbanoubGhadban Aug 15, 2025
8b5a511
add echo server test and enhance error reporting in waitFor function
AbanoubGhadban Aug 15, 2025
b17c721
Refactor incremental render request handling and improve error manage…
AbanoubGhadban Aug 18, 2025
5af15e1
Refactor request handling by consolidating prechecks
AbanoubGhadban Aug 19, 2025
ba0c803
make asset-exists endpoint check authentication only
AbanoubGhadban Aug 20, 2025
afa949b
linting
AbanoubGhadban Aug 20, 2025
8264a7b
Enhance asset upload handling to support bundles
AbanoubGhadban Aug 20, 2025
ac4548e
Enhance tests for asset upload handling
AbanoubGhadban Aug 20, 2025
f885339
Add test for asset upload with bundles in hash directories
AbanoubGhadban Aug 20, 2025
f7af8dc
Add incremental render endpoint tests
AbanoubGhadban Aug 21, 2025
64d6db6
Refactor and enhance incremental render endpoint tests
AbanoubGhadban Aug 21, 2025
675676e
make buildVM returns the built vm
AbanoubGhadban Sep 5, 2025
ff526eb
Refactor VM handling and introduce ExecutionContext
AbanoubGhadban Sep 5, 2025
5775d07
Fix runOnOtherBundle function parameters and improve global context h…
AbanoubGhadban Sep 9, 2025
bb6f166
Refactor incremental render handling and improve error management
AbanoubGhadban Sep 9, 2025
befe24a
Enhance incremental render functionality and improve test coverage
AbanoubGhadban Sep 9, 2025
b299408
tmp
AbanoubGhadban Oct 22, 2025
d69eb8b
Fix incremental render tests (#2032)
AbanoubGhadban Nov 16, 2025
aa4a3b3
Add AsyncPropManager to react-on-rails-pro package (#2049)
AbanoubGhadban Nov 19, 2025
d1f4395
Refactor generateRSCPayload from global to parameter (#2061)
AbanoubGhadban Nov 20, 2025
931d45e
Refactor authentication and protocol version handling to use request …
AbanoubGhadban Nov 24, 2025
8703f5d
Add support for incremental rendering at ruby side (#2076)
AbanoubGhadban Nov 24, 2025
03db7f8
Fix HTTPX thread-safety issue with thread-local connections for incre…
AbanoubGhadban Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions packages/react-on-rails-pro-node-renderer/src/shared/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as errorReporter from './errorReporter.js';
import { getConfig } from './configBuilder.js';
import log from './log.js';
import type { RenderResult } from '../worker/vm.js';
import fileExistsAsync from './fileExistsAsync.js';

export const TRUNCATION_FILLER = '\n... TRUNCATED ...\n';

Expand Down Expand Up @@ -169,3 +170,29 @@ export function getAssetPath(bundleTimestamp: string | number, filename: string)
const bundleDirectory = getBundleDirectory(bundleTimestamp);
return path.join(bundleDirectory, filename);
}

export async function validateBundlesExist(
bundleTimestamp: string | number,
dependencyBundleTimestamps?: (string | number)[],
): Promise<ResponseResult | null> {
const missingBundles = (
await Promise.all(
[...(dependencyBundleTimestamps ?? []), bundleTimestamp].map(async (timestamp) => {
const bundleFilePath = getRequestBundleFilePath(timestamp);
const fileExists = await fileExistsAsync(bundleFilePath);
return fileExists ? null : timestamp;
}),
)
).filter((timestamp) => timestamp !== null);

if (missingBundles.length > 0) {
const missingBundlesText = missingBundles.length > 1 ? 'bundles' : 'bundle';
log.info(`No saved ${missingBundlesText}: ${missingBundles.join(', ')}`);
return {
headers: { 'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate' },
status: 410,
data: 'No bundle uploaded',
};
}
return null;
}
209 changes: 162 additions & 47 deletions packages/react-on-rails-pro-node-renderer/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@ import log, { sharedLoggerOptions } from './shared/log.js';
import packageJson from './shared/packageJson.js';
import { buildConfig, Config, getConfig } from './shared/configBuilder.js';
import fileExistsAsync from './shared/fileExistsAsync.js';
import type { FastifyInstance, FastifyReply, FastifyRequest } from './worker/types.js';
import checkProtocolVersion from './worker/checkProtocolVersionHandler.js';
import authenticate from './worker/authHandler.js';
import { handleRenderRequest, type ProvidedNewBundle } from './worker/handleRenderRequest.js';
import type { FastifyInstance, FastifyReply } from './worker/types.js';
import { performRequestPrechecks } from './worker/requestPrechecks.js';
import { type AuthBody, authenticate } from './worker/authHandler.js';
import {
handleRenderRequest,
type ProvidedNewBundle,
handleNewBundlesProvided,
} from './worker/handleRenderRequest.js';
import handleGracefulShutdown from './worker/handleGracefulShutdown.js';
import {
handleIncrementalRenderRequest,
type IncrementalRenderInitialRequest,
type IncrementalRenderSink,
} from './worker/handleIncrementalRenderRequest.js';
import { handleIncrementalRenderStream } from './worker/handleIncrementalRenderStream.js';
import {
errorResponseResult,
formatExceptionMessage,
Expand Down Expand Up @@ -163,41 +173,11 @@ export default function run(config: Partial<Config>) {
},
});

const isProtocolVersionMatch = async (req: FastifyRequest, res: FastifyReply) => {
// Check protocol version
const protocolVersionCheckingResult = checkProtocolVersion(req);

if (typeof protocolVersionCheckingResult === 'object') {
await setResponse(protocolVersionCheckingResult, res);
return false;
}

return true;
};

const isAuthenticated = async (req: FastifyRequest, res: FastifyReply) => {
// Authenticate Ruby client
const authResult = authenticate(req);

if (typeof authResult === 'object') {
await setResponse(authResult, res);
return false;
}

return true;
};

const requestPrechecks = async (req: FastifyRequest, res: FastifyReply) => {
if (!(await isProtocolVersionMatch(req, res))) {
return false;
}

if (!(await isAuthenticated(req, res))) {
return false;
}

return true;
};
// Ensure NDJSON bodies are not buffered and are available as a stream immediately
app.addContentTypeParser('application/x-ndjson', (req, payload, done) => {
// Pass through the raw stream; the route will consume req.raw
done(null, payload);
});

// See https://github.com/shakacode/react_on_rails_pro/issues/119 for why
// the digest is part of the request URL. Yes, it's not used here, but the
Expand All @@ -212,7 +192,9 @@ export default function run(config: Partial<Config>) {
// Can't infer from the route like Express can
Params: { bundleTimestamp: string; renderRequestDigest: string };
}>('/bundles/:bundleTimestamp/render/:renderRequestDigest', async (req, res) => {
if (!(await requestPrechecks(req, res))) {
const precheckResult = performRequestPrechecks(req.body);
if (precheckResult) {
await setResponse(precheckResult, res);
return;
}

Expand Down Expand Up @@ -254,7 +236,7 @@ export default function run(config: Partial<Config>) {
providedNewBundles,
assetsToCopy,
});
await setResponse(result, res);
await setResponse(result.response, res);
} catch (err) {
const exceptionMessage = formatExceptionMessage(
renderingRequest,
Expand All @@ -272,17 +254,129 @@ export default function run(config: Partial<Config>) {
}
});

// Streaming NDJSON incremental render endpoint
app.post<{
Params: { bundleTimestamp: string; renderRequestDigest: string };
}>('/bundles/:bundleTimestamp/incremental-render/:renderRequestDigest', async (req, res) => {
const { bundleTimestamp } = req.params;

// Stream parser state
let incrementalSink: IncrementalRenderSink | undefined;

try {
// Handle the incremental render stream
await handleIncrementalRenderStream({
request: req,
onRenderRequestReceived: async (obj: unknown) => {
// Build a temporary FastifyRequest shape for protocol/auth check
const tempReqBody = typeof obj === 'object' && obj !== null ? (obj as Record<string, unknown>) : {};

// Perform request prechecks
const precheckResult = performRequestPrechecks(tempReqBody);
if (precheckResult) {
return {
response: precheckResult,
shouldContinue: false,
};
}

// Extract data for incremental render request
const dependencyBundleTimestamps = extractBodyArrayField(
tempReqBody as WithBodyArrayField<Record<string, unknown>, 'dependencyBundleTimestamps'>,
'dependencyBundleTimestamps',
);

const initial: IncrementalRenderInitialRequest = {
firstRequestChunk: obj,
bundleTimestamp,
dependencyBundleTimestamps,
};

try {
const { response, sink } = await handleIncrementalRenderRequest(initial);
incrementalSink = sink;

return {
response,
shouldContinue: !!incrementalSink,
};
} catch (err) {
const errorResponse = errorResponseResult(
formatExceptionMessage(
'IncrementalRender',
err,
'Error while handling incremental render request',
),
);
return {
response: errorResponse,
shouldContinue: false,
};
}
},

onUpdateReceived: async (obj: unknown) => {
if (!incrementalSink) {
log.error({ msg: 'Unexpected update chunk received after rendering was aborted', obj });
return;
}

try {
log.info(`Received a new update chunk ${JSON.stringify(obj)}`);
await incrementalSink.add(obj);
} catch (err) {
// Log error but don't stop processing
log.error({ err, msg: 'Error processing update chunk' });
}
},

onResponseStart: async (response: ResponseResult) => {
await setResponse(response, res);
},

onRequestEnded: () => {
if (!incrementalSink) {
return;
}

incrementalSink.handleRequestClosed();
},
});
} catch (err) {
// If an error occurred during stream processing, send error response
const errorResponse = errorResponseResult(
formatExceptionMessage('IncrementalRender', err, 'Error while processing incremental render stream'),
);
await setResponse(errorResponse, res);
}
});

// There can be additional files that might be required at the runtime.
// Since the remote renderer doesn't contain any assets, they must be uploaded manually.
app.post<{
Body: WithBodyArrayField<Record<string, Asset>, 'targetBundles'>;
}>('/upload-assets', async (req, res) => {
if (!(await requestPrechecks(req, res))) {
const precheckResult = performRequestPrechecks(req.body);
if (precheckResult) {
await setResponse(precheckResult, res);
return;
}
let lockAcquired = false;
let lockfileName: string | undefined;
const assets: Asset[] = Object.values(req.body).filter(isAsset);
const assets: Asset[] = [];

// Extract bundles that start with 'bundle_' prefix
const bundles: Array<{ timestamp: string; bundle: Asset }> = [];
Object.entries(req.body).forEach(([key, value]) => {
if (isAsset(value)) {
if (key.startsWith('bundle_')) {
const timestamp = key.replace('bundle_', '');
bundles.push({ timestamp, bundle: value });
} else {
assets.push(value);
}
}
});

// Handle targetBundles as either a string or an array
const targetBundles = extractBodyArrayField(req.body, 'targetBundles');
Expand All @@ -294,7 +388,9 @@ export default function run(config: Partial<Config>) {
}

const assetsDescription = JSON.stringify(assets.map((asset) => asset.filename));
const taskDescription = `Uploading files ${assetsDescription} to bundle directories: ${targetBundles.join(', ')}`;
const bundlesDescription =
bundles.length > 0 ? ` and bundles ${JSON.stringify(bundles.map((b) => b.bundle.filename))}` : '';
const taskDescription = `Uploading files ${assetsDescription}${bundlesDescription} to bundle directories: ${targetBundles.join(', ')}`;

try {
const { lockfileName: name, wasLockAcquired, errorMessage } = await lock('transferring-assets');
Expand Down Expand Up @@ -333,7 +429,24 @@ export default function run(config: Partial<Config>) {

await Promise.all(assetCopyPromises);

// Delete assets from uploads directory
// Handle bundles using the existing logic from handleRenderRequest
if (bundles.length > 0) {
const providedNewBundles = bundles.map(({ timestamp, bundle }) => ({
timestamp,
bundle,
}));

// Use the existing bundle handling logic
// Note: handleNewBundlesProvided will handle deleting the uploaded bundle files
// Pass null for assetsToCopy since we handle assets separately in this endpoint
const bundleResult = await handleNewBundlesProvided('upload-assets', providedNewBundles, null);
if (bundleResult) {
await setResponse(bundleResult, res);
return;
}
}

// Delete assets from uploads directory (bundles are already handled by handleNewBundlesProvided)
await deleteUploadedAssets(assets);

await setResponse(
Expand All @@ -344,7 +457,7 @@ export default function run(config: Partial<Config>) {
res,
);
} catch (err) {
const msg = 'ERROR when trying to copy assets';
const msg = 'ERROR when trying to copy assets and bundles';
const message = `${msg}. ${err}. Task: ${taskDescription}`;
log.error({
msg,
Expand Down Expand Up @@ -376,7 +489,9 @@ export default function run(config: Partial<Config>) {
Querystring: { filename: string };
Body: WithBodyArrayField<Record<string, unknown>, 'targetBundles'>;
}>('/asset-exists', async (req, res) => {
if (!(await isAuthenticated(req, res))) {
const authResult = authenticate(req.body as AuthBody);
if (authResult) {
await setResponse(authResult, res);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ import { timingSafeEqual } from 'crypto';
import type { FastifyRequest } from './types.js';
import { getConfig } from '../shared/configBuilder.js';

export default function authenticate(req: FastifyRequest) {
export interface AuthBody {
password?: string;
}

export function authenticate(body: AuthBody) {
const { password } = getConfig();

if (password) {
const reqPassword = (req.body as { password?: string }).password || '';
const reqPassword = body.password || '';

// Use timing-safe comparison to prevent timing attacks
// Both strings must be converted to buffers of the same length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Logic for checking protocol version.
* @module worker/checkProtocVersionHandler
*/
import type { FastifyRequest } from './types.js';
import packageJson from '../shared/packageJson.js';
import log from '../shared/log.js';

Expand Down Expand Up @@ -35,14 +34,14 @@ function normalizeVersion(version: string): string {
return normalized;
}

interface RequestBody {
export interface RequestBody {
protocolVersion?: string;
gemVersion?: string;
railsEnv?: string;
}

export default function checkProtocolVersion(req: FastifyRequest) {
const { protocolVersion: reqProtocolVersion, gemVersion, railsEnv } = req.body as RequestBody;
export function checkProtocolVersion(body: RequestBody) {
const { protocolVersion: reqProtocolVersion, gemVersion, railsEnv } = body;

// Check protocol version
if (reqProtocolVersion !== packageJson.protocolVersion) {
Expand All @@ -52,7 +51,7 @@ export default function checkProtocolVersion(req: FastifyRequest) {
data: `Unsupported renderer protocol version ${
reqProtocolVersion
? `request protocol ${reqProtocolVersion}`
: `MISSING with body ${JSON.stringify(req.body)}`
: `MISSING with body ${JSON.stringify(body)}`
} does not match installed renderer protocol ${packageJson.protocolVersion} for version ${packageJson.version}.
Update either the renderer or the Rails server`,
};
Expand Down
Loading
Loading