Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { spawnSync } from 'child_process'
import { randomUUID } from 'crypto'
import express from 'express'
import fs from 'fs'
import path from 'path'
import { callStepFile } from '../call-step-file'
import { createEventManager } from '../event-manager'
import { LockedData } from '../locked-data'
import { Logger } from '../logger'
import type { Motia } from '../motia'
import { NoTracer } from '../observability/no-tracer'
import { NoPrinter } from '../printer'
import { QueueManager } from '../queue-manager'
import { MemoryStateAdapter } from '../state/adapters/memory-state-adapter'
import { createApiStep } from './fixtures/step-fixtures'

describe('callStepFile (large payload via temp file) - Python', () => {
beforeAll(() => {
process.env._MOTIA_TEST_MODE = 'true'
})

const hasPython = spawnSync('python', ['-V']).status === 0
const itif = hasPython ? it : it.skip

itif(
'handles >1MB payload using meta file and cleans up temp dir',
async () => {
const baseDir = path.join(__dirname, 'steps')
const queueManager = new QueueManager()
const eventManager = createEventManager(queueManager)
const state = new MemoryStateAdapter()

const step = createApiStep({ emits: [] }, path.join(baseDir, 'large-data-step.py'))
const printer = new NoPrinter()
const traceId = randomUUID()
const logger = new Logger()
const tracer = new NoTracer()

const motia: Motia = {
eventManager,
state,
printer,
queueManager,
lockedData: new LockedData(baseDir, 'memory', printer),
loggerFactory: { create: () => logger },
tracerFactory: { createTracer: () => tracer, clear: () => Promise.resolve() },
app: express(),
stateAdapter: state,
}

const bigData = 'x'.repeat(2 * 1024 * 1024)

let metaPath: string | undefined
const originalWrite = fs.writeFileSync
const writeSpy = jest.spyOn(fs, 'writeFileSync').mockImplementation((file: any, data: any, options?: any) => {
if (typeof file === 'string' && file.endsWith(path.join('meta.json'))) {
metaPath = file
}
return (originalWrite as any).call(fs, file, data, options)
})

await callStepFile<number>({ step, traceId, data: bigData, logger, tracer }, motia)

if (metaPath) {
expect(fs.existsSync(metaPath)).toBe(false)
}

writeSpy.mockRestore()
},
30000,
)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { spawnSync } from 'child_process'
import { randomUUID } from 'crypto'
import express from 'express'
import fs from 'fs'
import path from 'path'
import { callStepFile } from '../call-step-file'
import { createEventManager } from '../event-manager'
import { LockedData } from '../locked-data'
import { Logger } from '../logger'
import type { Motia } from '../motia'
import { NoTracer } from '../observability/no-tracer'
import { NoPrinter } from '../printer'
import { QueueManager } from '../queue-manager'
import { MemoryStateAdapter } from '../state/adapters/memory-state-adapter'
import { createApiStep } from './fixtures/step-fixtures'

describe('callStepFile (large payload via temp file) - Ruby', () => {
beforeAll(() => {
process.env._MOTIA_TEST_MODE = 'true'
})

const hasRuby = spawnSync('ruby', ['-v']).status === 0
const itif = hasRuby ? it : it.skip

itif(
'handles >1MB payload using meta file and cleans up temporary dir',
async () => {
const baseDir = path.join(__dirname, 'steps')
const queueManager = new QueueManager()
const eventManager = createEventManager(queueManager)
const state = new MemoryStateAdapter()

const step = createApiStep({ emits: [] }, path.join(baseDir, 'large-data-step.rb'))
const printer = new NoPrinter()
const traceId = randomUUID()
const logger = new Logger()
const tracer = new NoTracer()

const motia: Motia = {
eventManager,
state,
printer,
queueManager,
lockedData: new LockedData(baseDir, 'memory', printer),
loggerFactory: { create: () => logger },
tracerFactory: { createTracer: () => tracer, clear: () => Promise.resolve() },
app: express(),
stateAdapter: state,
}

const bigData = 'x'.repeat(2 * 1024 * 1024)

let metaPath: string | undefined
const originalWrite = fs.writeFileSync
const writeSpy = jest.spyOn(fs, 'writeFileSync').mockImplementation((file: any, data: any, options?: any) => {
if (typeof file === 'string' && file.endsWith(path.join('meta.json'))) {
metaPath = file
}
return (originalWrite as any).call(fs, file, data, options)
})

await callStepFile<number>({ step, traceId, data: bigData, logger, tracer }, motia)

if (metaPath) {
expect(fs.existsSync(metaPath)).toBe(false)
}

writeSpy.mockRestore()
},
30000,
)
})
70 changes: 70 additions & 0 deletions packages/core/src/__tests__/call-step-file.large-payload.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { randomUUID } from 'crypto'
import express from 'express'
import fs from 'fs'
import path from 'path'
import { callStepFile } from '../call-step-file'
import { createEventManager } from '../event-manager'
import { LockedData } from '../locked-data'
import { Logger } from '../logger'
import type { Motia } from '../motia'
import { NoTracer } from '../observability/no-tracer'
import { NoPrinter } from '../printer'
import { QueueManager } from '../queue-manager'
import { MemoryStateAdapter } from '../state/adapters/memory-state-adapter'
import { createApiStep } from './fixtures/step-fixtures'

describe('callStepFile (large payload via temp file)', () => {
beforeAll(() => {
// Ensure node-runner.ts is used (ts-node/register)
process.env._MOTIA_TEST_MODE = 'true'
})

it('handles >1MB payload using meta file and cleans up temp dir', async () => {
const baseDir = path.join(__dirname, 'steps')
const queueManager = new QueueManager()
const eventManager = createEventManager(queueManager)
const state = new MemoryStateAdapter()

const step = createApiStep({ emits: [] }, path.join(baseDir, 'large-data-step.ts'))
const printer = new NoPrinter()
const traceId = randomUUID()
const logger = new Logger()
const tracer = new NoTracer()

const motia: Motia = {
eventManager,
state,
printer,
queueManager,
lockedData: new LockedData(baseDir, 'memory', printer),
loggerFactory: { create: () => logger },
tracerFactory: { createTracer: () => tracer, clear: () => Promise.resolve() },
app: express(),
stateAdapter: state,
}

// Prepare a ~2MB string to exceed the 1MB threshold
const bigData = 'x'.repeat(2 * 1024 * 1024)

// Capture meta.json path to verify cleanup of its parent dir
let metaPath: string | undefined
const originalWrite = fs.writeFileSync
const writeSpy = jest.spyOn(fs, 'writeFileSync').mockImplementation((file: any, data: any, options?: any) => {
if (typeof file === 'string' && file.endsWith(path.join('meta.json'))) {
metaPath = file
}
// call through to original implementation captured before spy
return (originalWrite as any).call(fs, file, data, options)
})

// Act: call the step file and expect the handler to return the string length
await callStepFile<number>({ step, traceId, data: bigData, logger, tracer }, motia)

// Assert: temp meta file is cleaned up (shared temp dir may persist)
if (metaPath) {
expect(fs.existsSync(metaPath)).toBe(false)
}

writeSpy.mockRestore()
}, 30000)
})
28 changes: 28 additions & 0 deletions packages/core/src/__tests__/steps/large-data-step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
config = {
"type": "api",
"name": "large-data-step",
"emits": [],
"path": "/large-data",
"method": "POST",
}


async def handler(body, _context):
if isinstance(body, str):
return {
"status": 200,
"body": {
"return data": "random",
},
}

if isinstance(body, (bytes, bytearray)):
return len(body)

if body is not None and hasattr(body, "__len__"):
try:
return len(body)
except (TypeError, NotImplementedError):
pass

return 0
32 changes: 32 additions & 0 deletions packages/core/src/__tests__/steps/large-data-step.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
def config
{
type: "api",
name: "large-data-step",
emits: [],
path: "/large-data",
method: "POST"
}
end

def handler(body, _ctx)
if body.is_a?(String)
return {
"status" => 200,
"body" => { "return data" => "random" }
}
end

if body.respond_to?(:type) && body.type == "Buffer" && body.respond_to?(:data)
data = body.data
return data.length if data.respond_to?(:length)
end

if body.respond_to?(:length)
begin
return body.length
rescue StandardError
end
end

0
end
22 changes: 22 additions & 0 deletions packages/core/src/__tests__/steps/large-data-step.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { ApiResponse, ApiRouteConfig, ApiRouteHandler } from '../../types'

export const config: ApiRouteConfig = {
type: 'api',
name: 'large-data-step',
emits: [],
path: '/large-data',
method: 'POST',
}

export const handler: ApiRouteHandler<string, ApiResponse<number, unknown>> = async (body) => {
if (typeof body === 'string')
return {
status: 200,
body: {
'return data': 'random',
},
}
if (Buffer.isBuffer(body)) return body.byteLength
if (body && typeof (body as any).length === 'number') return (body as any).length
return 0
}
Loading
Loading