diff --git a/packages/docs/content/docs/api-reference.mdx b/packages/docs/content/docs/api-reference.mdx index 3282d0ad8..1a4e1a45d 100644 --- a/packages/docs/content/docs/api-reference.mdx +++ b/packages/docs/content/docs/api-reference.mdx @@ -1,547 +1,1637 @@ --- title: API Reference -description: Complete API reference for Motia framework - types, interfaces, and utilities +description: Complete API reference for Motia framework --- -# API Reference +Everything you need to know about Motia's APIs. This reference covers all the types, methods, and configurations available when building with Motia. -Complete reference documentation for Motia's TypeScript/JavaScript and Python APIs. +If you're new to Motia, start with the [Steps guide](/docs/concepts/steps) to understand the basics. -## Core Types +## Step Configurations -### Step Configuration Types +Every Step needs a config. Here's what you can put in it. - - +### ApiRouteConfig -#### ApiRouteConfig +Use this for HTTP endpoints. -Configuration for API Steps (HTTP endpoints). + + ```typescript -interface ApiRouteConfig { - type: 'api' - name: string - description?: string - path: string - method: 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH' | 'OPTIONS' | 'HEAD' - emits: Emit[] - virtualEmits?: Emit[] - virtualSubscribes?: string[] - flows?: string[] - middleware?: ApiMiddleware[] - bodySchema?: ZodInput - responseSchema?: Record - queryParams?: QueryParam[] - includeFiles?: string[] +import { ApiRouteConfig } from 'motia' + +const config: ApiRouteConfig = { + type: 'api', + name: 'CreateUser', + path: '/users', + method: 'POST', + emits: ['user.created'], + + // Optional fields + description: 'Creates a new user', + flows: ['user-management'], + bodySchema: z.object({ name: z.string() }), + responseSchema: { + 201: z.object({ id: z.string(), name: z.string() }) + }, + middleware: [authMiddleware], + queryParams: [{ name: 'invite', description: 'Invite code' }], + virtualEmits: ['notification.sent'], + virtualSubscribes: ['user.invited'], + includeFiles: ['../../assets/template.html'] } ``` -#### EventConfig - -Configuration for Event Steps (background tasks). - -```typescript -interface EventConfig { - type: 'event' - name: string - description?: string - subscribes: string[] - emits: Emit[] - virtualEmits?: Emit[] - input: ZodInput - flows?: string[] - includeFiles?: string[] + + + +```javascript +const config = { + type: 'api', + name: 'CreateUser', + path: '/users', + method: 'POST', + emits: ['user.created'], + + // Optional fields + description: 'Creates a new user', + flows: ['user-management'], + bodySchema: z.object({ name: z.string() }), + responseSchema: { + 201: z.object({ id: z.string(), name: z.string() }) + }, + middleware: [authMiddleware], + queryParams: [{ name: 'invite', description: 'Invite code' }], + virtualEmits: ['notification.sent'], + virtualSubscribes: ['user.invited'], + includeFiles: ['../../assets/template.html'] } ``` -#### CronConfig + + -Configuration for Cron Steps (scheduled tasks). +```python +from pydantic import BaseModel -```typescript -interface CronConfig { - type: 'cron' - name: string - description?: string - cron: string - emits: Emit[] - virtualEmits?: Emit[] - flows?: string[] - includeFiles?: string[] +class UserResponse(BaseModel): + id: str + name: str + +config = { + "type": "api", + "name": "CreateUser", + "path": "/users", + "method": "POST", + "emits": ["user.created"], + + # Optional fields + "description": "Creates a new user", + "flows": ["user-management"], + "bodySchema": {"type": "object", "properties": {"name": {"type": "string"}}}, + "responseSchema": {201: UserResponse.model_json_schema()}, + "middleware": [auth_middleware], + "queryParams": [{"name": "invite", "description": "Invite code"}], + "virtualEmits": ["notification.sent"], + "virtualSubscribes": ["user.invited"], + "includeFiles": ["../../assets/template.html"] } ``` -#### NoopConfig + + + +**Required fields:** +- `type` - Always `'api'` +- `name` - Unique identifier for this Step +- `path` - URL path (supports params like `/users/:id`) +- `method` - HTTP method (`GET`, `POST`, `PUT`, `DELETE`, `PATCH`, `OPTIONS`, `HEAD`) +- `emits` - Topics this Step can emit (list all, even if empty `[]`) + +**Optional fields:** +- `description` - Human-readable description +- `flows` - Flow names for Workbench grouping +- `bodySchema` - Zod schema (TS/JS) or JSON Schema (Python). Can be ZodObject or ZodArray. **Note:** Schema is not validated automatically. Use middleware or validate manually in your handler with `.parse()` or `.safeParse()`. +- `responseSchema` - Map of status codes to response schemas (used for type generation and OpenAPI) +- `middleware` - Functions to run before the handler (executed in array order) +- `queryParams` - Query parameter docs for Workbench +- `virtualEmits` - Topics shown in Workbench but not actually emitted (gray connections) +- `virtualSubscribes` - Topics shown in Workbench for flow visualization (useful for chaining HTTP requests) +- `includeFiles` - Files to bundle with this Step (supports glob patterns, relative to Step file) + +--- + +### EventConfig -Configuration for NOOP Steps (visual connectors). +Use this for background jobs and event-driven tasks. + + + ```typescript -interface NoopConfig { - type: 'noop' - name: string - description?: string - virtualEmits: Emit[] - virtualSubscribes: string[] - flows?: string[] +import { EventConfig } from 'motia' + +const config: EventConfig = { + type: 'event', + name: 'ProcessOrder', + subscribes: ['order.created'], + input: z.object({ orderId: z.string(), amount: z.number() }), + emits: ['order.processed'], + + // Optional fields + description: 'Processes new orders', + flows: ['orders'], + virtualEmits: ['payment.initiated'], + virtualSubscribes: ['order.cancelled'], + includeFiles: ['./templates/*.html'], + infrastructure: { + handler: { ram: 2048, timeout: 60 }, + queue: { type: 'fifo', maxRetries: 3, visibilityTimeout: 90 } + } } ``` - - -#### API Step Config - -```python -config = { - "type": "api", - "name": str, - "description": str, # Optional - "path": str, - "method": str, # GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD - "emits": list[str], - "virtualEmits": list[str], # Optional - "virtualSubscribes": list[str], # Optional - "flows": list[str], # Optional - "middleware": list, # Optional - "bodySchema": dict, # JSON Schema or Pydantic model schema - "responseSchema": dict, # Dict of status code to schema - "queryParams": list[dict], # Optional - "includeFiles": list[str] # Optional + + +```javascript +const config = { + type: 'event', + name: 'ProcessOrder', + subscribes: ['order.created'], + input: z.object({ orderId: z.string(), amount: z.number() }), + emits: ['order.processed'], + + // Optional fields + description: 'Processes new orders', + flows: ['orders'], + virtualEmits: ['payment.initiated'], + virtualSubscribes: ['order.cancelled'], + includeFiles: ['./templates/*.html'] } ``` -#### Event Step Config + + ```python +from pydantic import BaseModel + +class OrderInput(BaseModel): + order_id: str + amount: float + config = { "type": "event", - "name": str, - "description": str, # Optional - "subscribes": list[str], - "emits": list[str], - "virtualEmits": list[str], # Optional - "input": dict, # JSON Schema or Pydantic model schema - "flows": list[str], # Optional - "includeFiles": list[str] # Optional + "name": "ProcessOrder", + "subscribes": ["order.created"], + "input": OrderInput.model_json_schema(), + "emits": ["order.processed"], + + # Optional fields + "description": "Processes new orders", + "flows": ["orders"], + "virtualEmits": ["payment.initiated"], + "virtualSubscribes": ["order.cancelled"], + "includeFiles": ["./templates/*.html"], + "infrastructure": { + "handler": {"ram": 2048, "timeout": 60}, + "queue": {"type": "fifo", "maxRetries": 3, "visibilityTimeout": 90} + } } ``` -#### Cron Step Config + + + +**Required fields:** +- `type` - Always `'event'` +- `name` - Unique identifier +- `subscribes` - Topic names to listen to +- `input` - Zod schema (TS/JS) or JSON Schema (Python) for event data. **Note:** Validation is not automatic. In Python, manually validate with Pydantic if needed. +- `emits` - Topics this Step can emit + +**Optional fields:** +- `description` - Human-readable description +- `flows` - Flow names for Workbench +- `virtualEmits` / `virtualSubscribes` - For Workbench visualization only +- `includeFiles` - Files to bundle with this Step (supports glob patterns) +- `infrastructure` - Resource limits and queue config (Event Steps only, Motia Cloud) + +**Infrastructure config** (Motia Cloud only): +- `handler.ram` - Memory in MB (128-10240, required) +- `handler.cpu` - CPU vCPUs (optional, auto-calculated from RAM if not provided, must be proportional) +- `handler.timeout` - Timeout in seconds (1-900, required) +- `queue.type` - `'fifo'` or `'standard'` (required) +- `queue.maxRetries` - Max retry attempts (0+, required) +- `queue.visibilityTimeout` - Timeout in seconds (required, must be > handler.timeout to prevent premature redelivery) +- `queue.delaySeconds` - Optional delay before message becomes visible (0-900) -```python -config = { - "type": "cron", - "name": str, - "description": str, # Optional - "cron": str, # Cron expression - "emits": list[str], - "virtualEmits": list[str], # Optional - "flows": list[str], # Optional - "includeFiles": list[str] # Optional +--- + +### CronConfig + +Use this for scheduled tasks. + + + + +```typescript +import { CronConfig } from 'motia' + +const config: CronConfig = { + type: 'cron', + name: 'DailyReport', + cron: '0 9 * * *', + emits: ['report.generated'], + + // Optional fields + description: 'Generates daily reports at 9 AM', + flows: ['reporting'], + virtualEmits: ['email.sent'], + virtualSubscribes: ['report.requested'], + includeFiles: ['./templates/report.html'] +} +``` + + + + +```javascript +const config = { + type: 'cron', + name: 'DailyReport', + cron: '0 9 * * *', + emits: ['report.generated'], + + // Optional fields + description: 'Generates daily reports at 9 AM', + flows: ['reporting'], + virtualEmits: ['email.sent'], + virtualSubscribes: ['report.requested'], + includeFiles: ['./templates/report.html'] } ``` -#### NOOP Step Config + + ```python config = { - "type": "noop", - "name": str, - "description": str, # Optional - "virtualEmits": list[str], - "virtualSubscribes": list[str], - "flows": list[str] # Optional + "type": "cron", + "name": "DailyReport", + "cron": "0 9 * * *", + "emits": ["report.generated"], + + # Optional fields + "description": "Generates daily reports at 9 AM", + "flows": ["reporting"], + "virtualEmits": ["email.sent"], + "virtualSubscribes": ["report.requested"], + "includeFiles": ["./templates/report.html"] } ``` +**Required fields:** +- `type` - Always `'cron'` +- `name` - Unique identifier +- `cron` - Cron expression (e.g., `'0 9 * * *'` for 9 AM daily) +- `emits` - Topics this Step can emit + +**Optional fields:** +- `description`, `flows`, `virtualEmits`, `virtualSubscribes`, `includeFiles` - Same as above + +👉 Use [crontab.guru](https://crontab.guru) to build cron expressions. + --- -## Context API +### NoopConfig -The context object available in all Step handlers. +Use this for visual-only nodes in Workbench (no code execution). - + -### FlowContext - ```typescript -interface FlowContext { - // Event emission - emit: (event: EmitEvent) => Promise - - // Structured logging - logger: Logger - - // State management - state: StateManager +import { NoopConfig } from 'motia' + +const config: NoopConfig = { + type: 'noop', + name: 'ManualApproval', + virtualEmits: ['approved', 'rejected'], + virtualSubscribes: ['approval.requested'], - // Real-time streams - streams: StreamsManager + // Optional fields + description: 'Manager approval gate', + flows: ['approvals'] +} +``` + + + + +```javascript +const config = { + type: 'noop', + name: 'ManualApproval', + virtualEmits: ['approved', 'rejected'], + virtualSubscribes: ['approval.requested'], - // Request tracing - traceId: string + // Optional fields + description: 'Manager approval gate', + flows: ['approvals'] } ``` -### Logger + + -```typescript -interface Logger { - info(message: string, metadata?: Record): void - error(message: string, metadata?: Record): void - warn(message: string, metadata?: Record): void - debug(message: string, metadata?: Record): void +```python +config = { + "type": "noop", + "name": "ManualApproval", + "virtualEmits": ["approved", "rejected"], + "virtualSubscribes": ["approval.requested"], + + # Optional fields + "description": "Manager approval gate", + "flows": ["approvals"] } ``` -### StateManager + + + +**Required fields:** +- `type` - Always `'noop'` +- `name` - Unique identifier +- `virtualEmits` - Topics shown in Workbench +- `virtualSubscribes` - Topics shown in Workbench + +**No handler needed** - NOOP Steps don't execute code. + +--- + +## Handler Context + +Every handler gets a context object (`ctx` in TypeScript/JavaScript, `context` in Python) with these tools. + +### emit + +Trigger other Steps by publishing events. + + + ```typescript -interface StateManager { - get(groupId: string, key: string): Promise - set(groupId: string, key: string, value: T): Promise - delete(groupId: string, key: string): Promise - getGroup(groupId: string): Promise - clear(groupId: string): Promise -} +// Standard emit +await emit({ + topic: 'order.created', + data: { orderId: '123', total: 99.99 } +}) + +// FIFO queue emit (when subscriber uses queue.type: 'fifo') +await emit({ + topic: 'order.processing', + data: { orderId: '123', items: [...] }, + messageGroupId: 'user-456' // Required for FIFO queues +}) ``` -### StreamsManager +**FIFO queues:** When emitting to a topic that has a FIFO queue subscriber, you **must** include `messageGroupId`. Messages with the same `messageGroupId` are processed sequentially. Different groups are processed in parallel. -```typescript -interface MotiaStream { - get(groupId: string, id: string): Promise | null> - set(groupId: string, id: string, data: TData): Promise> - delete(groupId: string, id: string): Promise | null> - getGroup(groupId: string): Promise[]> - send(channel: StateStreamEventChannel, event: StateStreamEvent): Promise -} + + + +```javascript +// Standard emit +await emit({ + topic: 'order.created', + data: { orderId: '123', total: 99.99 } +}) + +// FIFO queue emit (when subscriber uses queue.type: 'fifo') +await emit({ + topic: 'order.processing', + data: { orderId: '123', items: [...] }, + messageGroupId: 'user-456' // Required for FIFO queues +}) ``` +**FIFO queues:** When emitting to a topic that has a FIFO queue subscriber, you **must** include `messageGroupId`. Messages with the same `messageGroupId` are processed sequentially. Different groups are processed in parallel. + -### Context - ```python -class Context: - # Event emission - async def emit(self, event: dict) -> None - - # Structured logging - logger: Logger - - # State management - state: StateManager - - # Real-time streams - streams: StreamsManager - - # Request tracing - trace_id: str +# Standard emit +await context.emit({ + "topic": "order.created", + "data": {"order_id": "123", "total": 99.99} +}) + +# FIFO queue emit (when subscriber uses queue.type: 'fifo') +await context.emit({ + "topic": "order.processing", + "data": {"order_id": "123", "items": [...]}, + "messageGroupId": "user-456" # Required for FIFO queues +}) ``` -### Logger +**FIFO queues:** When emitting to a topic that has a FIFO queue subscriber, you **must** include `messageGroupId`. Messages with the same `messageGroupId` are processed sequentially. Different groups are processed in parallel. -```python -class Logger: - def info(self, message: str, metadata: dict = None) -> None - def error(self, message: str, metadata: dict = None) -> None - def warn(self, message: str, metadata: dict = None) -> None - def debug(self, message: str, metadata: dict = None) -> None + + + +The `data` must match the `input` schema of Steps subscribing to that topic. + +--- + +### logger + +Structured logging with automatic trace ID correlation. + + + + +```typescript +logger.info('User created', { userId: '123', email: 'user@example.com' }) +logger.warn('Rate limit approaching', { current: 95, limit: 100 }) +logger.error('Payment failed', { error: err.message, orderId: '456' }) +logger.debug('Cache miss', { key: 'user:123' }) ``` -### StateManager + + -```python -class StateManager: - async def get(self, group_id: str, key: str) -> Any | None - async def set(self, group_id: str, key: str, value: Any) -> Any - async def delete(self, group_id: str, key: str) -> Any | None - async def get_group(self, group_id: str) -> list[Any] - async def clear(self, group_id: str) -> None +```javascript +logger.info('User created', { userId: '123', email: 'user@example.com' }) +logger.warn('Rate limit approaching', { current: 95, limit: 100 }) +logger.error('Payment failed', { error: err.message, orderId: '456' }) +logger.debug('Cache miss', { key: 'user:123' }) ``` -### StreamsManager + + ```python -class MotiaStream: - async def get(self, group_id: str, id: str) -> dict | None - async def set(self, group_id: str, id: str, data: dict) -> dict - async def delete(self, group_id: str, id: str) -> dict | None - async def get_group(self, group_id: str) -> list[dict] - async def send(self, channel: dict, event: dict) -> None +context.logger.info("User created", {"user_id": "123", "email": "user@example.com"}) +context.logger.warn("Rate limit approaching", {"current": 95, "limit": 100}) +context.logger.error("Payment failed", {"error": str(err), "order_id": "456"}) +context.logger.debug("Cache miss", {"key": "user:123"}) ``` ---- +All logs are automatically tagged with: +- Timestamp +- Step name +- Trace ID +- Any metadata you pass -## Handler Types +👉 [Learn more about Observability →](/docs/development-guide/observability) - - +--- -### API Handler +### state -```typescript -type ApiRouteHandler< - TRequestBody = unknown, - TResponseBody extends ApiResponse = ApiResponse, - TEmitData = never -> = ( - req: ApiRequest, - ctx: FlowContext -) => Promise -``` +Persistent key-value storage shared across Steps. -### Event Handler + + ```typescript -type EventHandler< - TInput = unknown, - TEmitData = never -> = ( - input: TInput, - ctx: FlowContext -) => Promise -``` +// Store data +await state.set('users', 'user-123', { name: 'Alice', email: 'alice@example.com' }) -### Cron Handler +// Retrieve data +const user = await state.get('users', 'user-123') -```typescript -type CronHandler = ( - ctx: FlowContext -) => Promise -``` +// Get all items in a group +const allUsers = await state.getGroup('users') -### Middleware +// Delete an item +await state.delete('users', 'user-123') -```typescript -type ApiMiddleware = ( - req: ApiRequest, - ctx: FlowContext, - next: () => Promise -) => Promise +// Clear entire group +await state.clear('users') ``` - + -### API Handler +```javascript +// Store data +await state.set('users', 'user-123', { name: 'Alice', email: 'alice@example.com' }) -```python -async def handler( - req: dict, # Contains: body, headers, pathParams, queryParams - context: Context -) -> dict # Returns: {"status": int, "body": dict, "headers": dict (optional)} -``` +// Retrieve data +const user = await state.get('users', 'user-123') -### Event Handler +// Get all items in a group +const allUsers = await state.getGroup('users') -```python -async def handler( - input_data: dict, # Data from the emitted event - context: Context -) -> None +// Delete an item +await state.delete('users', 'user-123') + +// Clear entire group +await state.clear('users') ``` -### Cron Handler + + ```python -async def handler( - context: Context -) -> None -``` +# Store data +await context.state.set("users", "user-123", {"name": "Alice", "email": "alice@example.com"}) -### Middleware +# Retrieve data +user = await context.state.get("users", "user-123") -```python -async def middleware( - req: dict, - context: Context, - next_fn: Callable -) -> dict # Returns: {"status": int, "body": dict, "headers": dict (optional)} +# Get all items in a group +all_users = await context.state.get_group("users") + +# Delete an item +await context.state.delete("users", "user-123") + +# Clear entire group +await context.state.clear("users") ``` +**Methods:** + +- `get(groupId, key)` - Returns the value or `null` +- `set(groupId, key, value)` - Stores and returns the value +- `delete(groupId, key)` - Removes and returns the value (or `null`) +- `getGroup(groupId)` - Returns array of all values in the group +- `clear(groupId)` - Removes all items in the group + +👉 [Learn more about State →](/docs/development-guide/state-management) + --- -## Request & Response Types +### streams - - +Real-time data channels for pushing updates to connected clients. -### ApiRequest + + ```typescript -interface ApiRequest { - pathParams: Record - queryParams: Record - body: TBody - headers: Record -} +// Set a stream item (create or update) +await streams.chatMessages.set('room-123', 'msg-456', { + text: 'Hello!', + author: 'Alice', + timestamp: new Date().toISOString() +}) + +// Get a specific item +const message = await streams.chatMessages.get('room-123', 'msg-456') + +// Get all items in a group +const messages = await streams.chatMessages.getGroup('room-123') + +// Delete an item +await streams.chatMessages.delete('room-123', 'msg-456') + +// Send ephemeral event (doesn't create an item) +await streams.chatMessages.send( + { groupId: 'room-123' }, + { type: 'user.typing', data: { userId: 'alice' } } +) ``` -### ApiResponse - -```typescript -interface ApiResponse { - status: TStatus - body: TBody - headers?: Record -} + + + +```javascript +// Set a stream item (create or update) +await streams.chatMessages.set('room-123', 'msg-456', { + text: 'Hello!', + author: 'Alice', + timestamp: new Date().toISOString() +}) + +// Get a specific item +const message = await streams.chatMessages.get('room-123', 'msg-456') + +// Get all items in a group +const messages = await streams.chatMessages.getGroup('room-123') + +// Delete an item +await streams.chatMessages.delete('room-123', 'msg-456') + +// Send ephemeral event (doesn't create an item) +await streams.chatMessages.send( + { groupId: 'room-123' }, + { type: 'user.typing', data: { userId: 'alice' } } +) ``` -### EmitEvent + + -```typescript -interface EmitEvent { - topic: string - data: TData -} +```python +# Set a stream item (create or update) +await context.streams.chatMessages.set("room-123", "msg-456", { + "text": "Hello!", + "author": "Alice", + "timestamp": datetime.now().isoformat() +}) + +# Get a specific item +message = await context.streams.chatMessages.get("room-123", "msg-456") + +# Get all items in a group +messages = await context.streams.chatMessages.getGroup("room-123") + +# Delete an item +await context.streams.chatMessages.delete("room-123", "msg-456") + +# Send ephemeral event (doesn't create an item) +await context.streams.chatMessages.send( + {"groupId": "room-123"}, + {"type": "user.typing", "data": {"user_id": "alice"}} +) ``` - + -### Request +**Methods:** -```python -req = { - "pathParams": dict[str, str], - "queryParams": dict[str, str | list[str]], - "body": dict | list, - "headers": dict[str, str | list[str]] +- `set(groupId, id, data)` - Create or update an item (returns the full item with metadata) +- `get(groupId, id)` - Retrieve an item or `null` +- `getGroup(groupId)` - Get all items in a group +- `delete(groupId, id)` - Remove an item +- `send(channel, event)` - Send an ephemeral event (e.g., typing indicators, reactions) + +👉 [Learn more about Streams →](/docs/development-guide/streams) + +--- + +### traceId + +Unique ID for tracking requests across Steps. + + + + +```typescript +export const handler: Handlers['MyStep'] = async (req, { traceId, logger }) => { + logger.info('Processing request', { traceId }) + return { status: 200, body: { traceId } } } ``` -### Response + + -```python -response = { - "status": int, - "body": dict | list, - "headers": dict[str, str | list[str]] # Optional +```javascript +const handler = async (req, { traceId, logger }) => { + logger.info('Processing request', { traceId }) + return { status: 200, body: { traceId } } } ``` -### Emit Event + + ```python -event = { - "topic": str, - "data": dict | list -} +async def handler(req, context): + context.logger.info("Processing request", {"trace_id": context.trace_id}) + return {"status": 200, "body": {"trace_id": context.trace_id}} ``` +The trace ID is automatically generated for each request and passed through all Steps in the workflow. Use it to correlate logs, state, and events. + --- -## Stream Configuration +## Handlers - - +Handlers are the functions that execute your business logic. The signature depends on the Step type. + +### API Step Handler + +Receives a request, returns a response. -### StreamConfig + + ```typescript -interface StreamConfig { - name: string - schema: ZodInput - baseConfig: { - storageType: 'default' +import { Handlers } from 'motia' + +export const handler: Handlers['CreateUser'] = async (req, ctx) => { + const { name, email } = req.body + const userId = crypto.randomUUID() + + await ctx.emit({ + topic: 'user.created', + data: { userId, email } + }) + + return { + status: 201, + body: { id: userId, name, email }, + headers: { 'X-Request-ID': ctx.traceId } // Optional } } ``` +**Parameters:** +- `req` - Request object (see below) +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` + +**Returns:** `{ status, body, headers? }` + - + + +```javascript +const handler = async (req, ctx) => { + const { name, email } = req.body + const userId = crypto.randomUUID() + + await ctx.emit({ + topic: 'user.created', + data: { userId, email } + }) + + return { + status: 201, + body: { id: userId, name, email }, + headers: { 'X-Request-ID': ctx.traceId } // Optional + } +} +``` + +**Parameters:** +- `req` - Request object (see below) +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` -### Stream Config +**Returns:** `{ status, body, headers? }` + + + ```python -config = { - "name": str, - "schema": dict, # JSON Schema or Pydantic model schema - "baseConfig": { - "storageType": "default" +import uuid + +async def handler(req, context): + name = req.get("body", {}).get("name") + email = req.get("body", {}).get("email") + user_id = str(uuid.uuid4()) + + await context.emit({ + "topic": "user.created", + "data": {"user_id": user_id, "email": email} + }) + + return { + "status": 201, + "body": {"id": user_id, "name": name, "email": email}, + "headers": {"X-Request-ID": context.trace_id} # Optional } -} ``` +**Parameters:** +- `req` - Dictionary with `body`, `headers`, `pathParams`, `queryParams` +- `context` - Context object with `emit`, `logger`, `state`, `streams`, `trace_id` + +**Returns:** `{"status": int, "body": dict, "headers": dict}` + --- -## Utility Types +### Event Step Handler - - +Receives event data, processes it. No return value. -### Emit + + ```typescript -type Emit = string | { - topic: string - label?: string - conditional?: boolean +import { Handlers } from 'motia' + +export const handler: Handlers['ProcessOrder'] = async (input, ctx) => { + const { orderId, amount } = input + + ctx.logger.info('Processing order', { orderId, amount }) + + await ctx.state.set('orders', orderId, { + id: orderId, + amount, + status: 'processed' + }) + + await ctx.emit({ + topic: 'order.processed', + data: { orderId } + }) } ``` -### QueryParam +**Parameters:** +- `input` - Event data (matches the `input` schema in config) +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` -```typescript -interface QueryParam { - name: string - description: string +**Returns:** Nothing (void/None) + + + + +```javascript +const handler = async (input, ctx) => { + const { orderId, amount } = input + + ctx.logger.info('Processing order', { orderId, amount }) + + await ctx.state.set('orders', orderId, { + id: orderId, + amount, + status: 'processed' + }) + + await ctx.emit({ + topic: 'order.processed', + data: { orderId } + }) } ``` -### StreamItem +**Parameters:** +- `input` - Event data (matches the `input` schema in config) +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` -```typescript -interface BaseStreamItem { - groupId: string - id: string - data: TData - createdAt: string - updatedAt: string -} +**Returns:** Nothing (void/None) + + + + +```python +async def handler(input_data, context): + order_id = input_data.get("order_id") + amount = input_data.get("amount") + + context.logger.info("Processing order", {"order_id": order_id, "amount": amount}) + + await context.state.set("orders", order_id, { + "id": order_id, + "amount": amount, + "status": "processed" + }) + + await context.emit({ + "topic": "order.processed", + "data": {"order_id": order_id} + }) ``` +**Parameters:** +- `input_data` - Event data (matches the `input` schema in config) +- `context` - Context object with `emit`, `logger`, `state`, `streams`, `trace_id` + +**Returns:** Nothing (None) + --- -## What's Next? +### Cron Step Handler - - - Learn how to use these types in your Steps - +Runs on a schedule. Only receives context. + + + + +```typescript +import { Handlers } from 'motia' + +export const handler: Handlers['DailyCleanup'] = async (ctx) => { + ctx.logger.info('Running daily cleanup') - - Deep dive into the State API - + const oldOrders = await ctx.state.getGroup('orders') + const cutoff = Date.now() - (30 * 24 * 60 * 60 * 1000) // 30 days ago + + for (const order of oldOrders) { + if (order.createdAt < cutoff) { + await ctx.state.delete('orders', order.id) + } + } +} +``` + +**Parameters:** +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` + +**Returns:** Nothing (void/None) + + + + +```javascript +const handler = async (ctx) => { + ctx.logger.info('Running daily cleanup') + + const oldOrders = await ctx.state.getGroup('orders') + const cutoff = Date.now() - (30 * 24 * 60 * 60 * 1000) // 30 days ago - - Learn about real-time streaming + for (const order of oldOrders) { + if (order.createdAt < cutoff) { + await ctx.state.delete('orders', order.id) + } + } +} +``` + +**Parameters:** +- `ctx` - Context object with `emit`, `logger`, `state`, `streams`, `traceId` + +**Returns:** Nothing (void/None) + + + + +```python +from datetime import datetime, timedelta + +async def handler(context): + context.logger.info("Running daily cleanup") + + old_orders = await context.state.get_group("orders") + cutoff = (datetime.now() - timedelta(days=30)).timestamp() + + for order in old_orders: + if order.get("created_at") < cutoff: + await context.state.delete("orders", order.get("id")) +``` + +**Parameters:** +- `context` - Context object with `emit`, `logger`, `state`, `streams`, `trace_id` + +**Returns:** Nothing (None) + + + + +--- + +### Middleware + +Intercepts API requests before and after the handler. + + + + +```typescript +import { ApiMiddleware } from 'motia' + +export const authMiddleware: ApiMiddleware = async (req, ctx, next) => { + const token = req.headers.authorization + + if (!token) { + return { status: 401, body: { error: 'Unauthorized' } } + } + + // Verify token, attach user to request... + + return await next() // Continue to next middleware or handler +} +``` + +**Parameters:** +- `req` - Request object +- `ctx` - Context object +- `next` - Function to call the next middleware/handler + +**Returns:** Response object + + + + +```javascript +const authMiddleware = async (req, ctx, next) => { + const token = req.headers.authorization + + if (!token) { + return { status: 401, body: { error: 'Unauthorized' } } + } + + // Verify token, attach user to request... + + return await next() // Continue to next middleware or handler +} +``` + +**Parameters:** +- `req` - Request object +- `ctx` - Context object +- `next` - Function to call the next middleware/handler + +**Returns:** Response object + + + + +```python +async def auth_middleware(req, context, next_fn): + token = req.get("headers", {}).get("authorization") + + if not token: + return {"status": 401, "body": {"error": "Unauthorized"}} + + # Verify token, attach user to request... + + return await next_fn() # Continue to next middleware or handler +``` + +**Parameters:** +- `req` - Request dictionary +- `context` - Context object +- `next_fn` - Function to call the next middleware/handler + +**Returns:** Response dictionary + + + + +👉 [Learn more about Middleware →](/docs/development-guide/middleware) + +--- + +## Request Object + +API handlers receive a request object with these fields. + + + + +```typescript +export const handler: Handlers['GetUser'] = async (req, ctx) => { + // Path parameters (from /users/:id) + const userId = req.pathParams.id + + // Query parameters (?page=1&limit=10) + const page = req.queryParams.page // string or string[] + const limit = req.queryParams.limit + + // Request body + const { name, email } = req.body + + // Headers + const auth = req.headers.authorization + const userAgent = req.headers['user-agent'] + + return { status: 200, body: { userId, name } } +} +``` + +**Fields:** +- `pathParams` - Object with path parameters (e.g., `:id` from `/users/:id`) +- `queryParams` - Object with query string params (values can be string or array) +- `body` - Parsed request body (validated against `bodySchema` if defined) +- `headers` - Object with request headers (values can be string or array) + + + + +```javascript +const handler = async (req, ctx) => { + // Path parameters (from /users/:id) + const userId = req.pathParams.id + + // Query parameters (?page=1&limit=10) + const page = req.queryParams.page // string or string[] + const limit = req.queryParams.limit + + // Request body + const { name, email } = req.body + + // Headers + const auth = req.headers.authorization + const userAgent = req.headers['user-agent'] + + return { status: 200, body: { userId, name } } +} +``` + +**Fields:** +- `pathParams` - Object with path parameters (e.g., `:id` from `/users/:id`) +- `queryParams` - Object with query string params (values can be string or array) +- `body` - Parsed request body (validated against `bodySchema` if defined) +- `headers` - Object with request headers (values can be string or array) + + + + +```python +async def handler(req, context): + # Path parameters (from /users/:id) + user_id = req.get("pathParams", {}).get("id") + + # Query parameters (?page=1&limit=10) + page = req.get("queryParams", {}).get("page") # str or list[str] + limit = req.get("queryParams", {}).get("limit") + + # Request body + body = req.get("body", {}) + name = body.get("name") + email = body.get("email") + + # Headers + auth = req.get("headers", {}).get("authorization") + user_agent = req.get("headers", {}).get("user-agent") + + return {"status": 200, "body": {"user_id": user_id, "name": name}} +``` + +**Fields:** +- `pathParams` - Dictionary with path parameters +- `queryParams` - Dictionary with query params (values can be str or list) +- `body` - Dictionary with parsed request body +- `headers` - Dictionary with request headers (values can be str or list) + + + + +--- + +## Response Object + +API handlers must return an object with these fields. + + + + +```typescript +return { + status: 200, // Required: HTTP status code + body: { id: '123', name: 'Alice' }, // Required: response data + headers: { // Optional: custom response headers + 'Cache-Control': 'max-age=3600', + 'X-Custom-Header': 'value' + } +} +``` + + + + +```javascript +return { + status: 200, // Required: HTTP status code + body: { id: '123', name: 'Alice' }, // Required: response data + headers: { // Optional: custom response headers + 'Cache-Control': 'max-age=3600', + 'X-Custom-Header': 'value' + } +} +``` + + + + +```python +return { + "status": 200, # Required: HTTP status code + "body": {"id": "123", "name": "Alice"}, # Required: response data + "headers": { # Optional: custom response headers + "Cache-Control": "max-age=3600", + "X-Custom-Header": "value" + } +} +``` + + + + +**Fields:** +- `status` - HTTP status code (200, 201, 400, 404, 500, etc.) +- `body` - Response data (will be JSON-encoded automatically) +- `headers` - Optional custom headers + +--- + +## Stream Configuration + +Define real-time data streams for your app. + + + + +```typescript title="steps/chat-messages.stream.ts" +import { StreamConfig } from 'motia' +import { z } from 'zod' + +export const config: StreamConfig = { + name: 'chatMessages', + schema: z.object({ + text: z.string(), + author: z.string(), + timestamp: z.string() + }), + baseConfig: { + storageType: 'default' + } +} +``` + + + + +```javascript title="steps/chat-messages.stream.js" +const { z } = require('zod') + +export const config = { + name: 'chatMessages', + schema: z.object({ + text: z.string(), + author: z.string(), + timestamp: z.string() + }), + baseConfig: { + storageType: 'default' + } +} +``` + + + + +```python title="steps/chat_messages_stream.py" +from pydantic import BaseModel + +class ChatMessage(BaseModel): + text: str + author: str + timestamp: str + +config = { + "name": "chatMessages", + "schema": ChatMessage.model_json_schema(), + "baseConfig": { + "storageType": "default" + } +} +``` + + + + +**Fields:** +- `name` - Unique stream name (used in `ctx.streams.`) +- `schema` - Zod schema (TS/JS) or JSON Schema (Python) for data validation +- `baseConfig.storageType` - Always `'default'` (custom storage coming soon) + +File naming: +- TypeScript/JavaScript: `*.stream.ts` or `*.stream.js` +- Python: `*_stream.py` + +--- + +## CLI Commands + +Motia's command-line tools for development and deployment. + +### `motia version` + +Show Motia CLI version. + +```bash +motia version +motia -V +motia --version +``` + +--- + +### `motia create` + +Create a new Motia project. + +```bash +npx motia create my-app +npx motia create . # Use current directory +npx motia create --template python my-python-app +``` + +**Options:** +- `[name]` - Project name (or `.` for current directory) +- `-t, --template ` - Template to use (`nodejs` or `python`) +- `-c, --cursor` - Add Cursor IDE rules + +--- + +### `motia rules pull` + +Install AI development guides (AGENTS.md, CLAUDE.md) and Cursor IDE rules. + +```bash +motia rules pull +motia rules pull --force # Overwrite existing files +``` + +**Options:** +- `-f, --force` - Overwrite existing files + +--- + +### `motia dev` + +Start development server with Workbench and hot reload. + +```bash +npm run dev +# or +motia dev --port 4000 --host 0.0.0.0 +``` + +**Options:** +- `-p, --port ` - Port number (default: 3000) +- `-H, --host
` - Host address (default: localhost) +- `-d, --debug` - Enable debug logging +- `-m, --mermaid` - Generate Mermaid diagrams +- `--motia-dir ` - Custom path for `.motia` folder + +--- + +### `motia start` + +Start production server (no Workbench, no hot reload). + +```bash +motia start +motia start --port 8080 --host 0.0.0.0 +``` + +**Options:** +- `-p, --port ` - Port number (default: 3000) +- `-H, --host
` - Host address (default: localhost) +- `-d, --debug` - Enable debug logging +- `--motia-dir ` - Custom path for `.motia` folder + +--- + +### `motia build` + +Build your project for deployment. + +```bash +motia build +``` + +Compiles all Steps and generates deployment artifacts. + +--- + +### `motia generate-types` + +Generate TypeScript types from your Step configs. + +```bash +motia generate-types +``` + +Creates `types.d.ts` with type-safe `Handlers` interface. Run this after changing Step configs. + +--- + +### `motia generate step` + +Create a new Step interactively. + +```bash +motia generate step +motia generate step --dir users/create-user +``` + +**Options:** +- `-d, --dir ` - Path relative to `steps/` directory + +--- + +### `motia generate openapi` + +Generate OpenAPI specification. + +```bash +motia generate openapi +motia generate openapi --output api-spec.json --title "My API" +``` + +**Options:** +- `-t, --title ` - API title (default: package.json name) +- `-v, --version <version>` - API version (default: 1.0.0) +- `-o, --output <file>` - Output file (default: openapi.json) + +--- + +### `motia install` + +Set up Python virtual environment and install dependencies. + +```bash +motia install +npm run dev # Auto-runs motia install via postinstall hook +``` + +--- + +### `motia emit` + +Manually emit an event (for testing). + +```bash +motia emit --topic user.created --message '{"userId":"123"}' +motia emit --topic order.created --message '{"orderId":"456"}' --port 3000 +``` + +**Options:** +- `--topic <topic>` - Event topic name +- `--message <json>` - Event data as JSON string +- `-p, --port <number>` - Server port (default: 3000) + +--- + +### `motia docker setup` + +Generate Dockerfile and .dockerignore. + +```bash +motia docker setup +``` + +--- + +### `motia docker build` + +Build Docker image. + +```bash +motia docker build +motia docker build --project-name my-app +``` + +--- + +### `motia docker run` + +Build and run Docker container. + +```bash +motia docker run +motia docker run --port 8080 --skip-build +``` + +**Options:** +- `-p, --port <number>` - Host port to map (default: 3000) +- `-n, --project-name <name>` - Docker image name +- `-s, --skip-build` - Skip building the image + +--- + +### `motia cloud deploy` + +Deploy to Motia Cloud. + +```bash +motia cloud deploy -k YOUR_API_KEY -v v1.0.0 +motia cloud deploy --api-key YOUR_API_KEY --version-name v1.2.0 --environment-name production +``` + +**Options:** +- `-k, --api-key <key>` - Motia Cloud API key (or set `MOTIA_API_KEY` env var) +- `-v, --version-name <version>` - Version name/tag for this deployment +- `-n, --project-name <name>` - Project name (for new projects) +- `-s, --environment-id <id>` - Environment ID +- `--environment-name <name>` - Environment name +- `-e, --env-file <path>` - Path to environment variables file +- `-d, --version-description <desc>` - Version description +- `-c, --ci` - CI mode (non-interactive) + +--- + +## Common Patterns + +### Emit Types + +You can emit topics as strings or objects with labels. + +<Tabs items={['TypeScript', 'JavaScript', 'Python']}> +<Tab value='TypeScript'> + +```typescript +// Simple emit +emits: ['user.created', 'email.sent'] + +// With labels and conditional flags +emits: [ + { topic: 'order.approved', label: 'Auto-approved' }, + { topic: 'order.rejected', label: 'Requires review', conditional: true } +] +``` + +</Tab> +<Tab value='JavaScript'> + +```javascript +// Simple emit +emits: ['user.created', 'email.sent'] + +// With labels and conditional flags +emits: [ + { topic: 'order.approved', label: 'Auto-approved' }, + { topic: 'order.rejected', label: 'Requires review', conditional: true } +] +``` + +</Tab> +<Tab value='Python'> + +```python +# Simple emit +"emits": ["user.created", "email.sent"] + +# With labels and conditional flags +"emits": [ + {"topic": "order.approved", "label": "Auto-approved"}, + {"topic": "order.rejected", "label": "Requires review", "conditional": True} +] +``` + +</Tab> +</Tabs> + +The `label` and `conditional` fields are for Workbench visualization only. They don't affect execution. + +--- + +### Query Parameters + +Document query params for Workbench. + +```typescript +queryParams: [ + { name: 'page', description: 'Page number for pagination' }, + { name: 'limit', description: 'Number of items per page' }, + { name: 'sort', description: 'Sort field (e.g., createdAt, name)' } +] +``` + +This shows up in the Workbench endpoint tester. + +--- + +### Include Files + +Bundle files with your Step (useful for templates, assets, binaries). + +```typescript +// Relative to the Step file +includeFiles: [ + './templates/email.html', + './assets/*.png', + '../../lib/stockfish' +] +``` + +Files are copied into the deployment bundle and accessible at runtime. + +--- + +## What's Next? + +<Cards> + <Card href="/docs/concepts/steps" title="Steps"> + Learn how to build with Steps + </Card> + + <Card href="/docs/development-guide/state-management" title="State Management"> + Deep dive into the State API + </Card> + + <Card href="/docs/development-guide/streams" title="Streams"> + Real-time streaming guide + </Card> + + <Card href="/docs/development-guide/middleware" title="Middleware"> + Request/response middleware patterns </Card> - <Card href="/docs/examples" title="💡 Examples"> + <Card href="/docs/examples" title="Examples"> See these APIs in action </Card> </Cards>