Skip to content

Commit fd9ea41

Browse files
authored
Merge pull request #2 from trycortexai/stream-response-support
feat: add streamResponse
2 parents 62c7289 + 35049c4 commit fd9ea41

File tree

9 files changed

+294
-40
lines changed

9 files changed

+294
-40
lines changed

examples/hono/src/index.ts

+25-12
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import dotenv from 'dotenv';
44
import {Hono, type Context} from 'hono';
55
import {env} from 'hono/adapter';
66

7-
import {stepStream, workflowStream} from './stream.js';
7+
import {stepStreamResponse, worklflowStreamResponse} from './stream.js';
88

99
dotenv.config();
1010

1111
const PORT = 8085;
1212

1313
const app = new Hono();
1414

15-
const createCortex = (c: Context) => {
15+
const createCortex = (ctx: Context) => {
1616
const {CORTEX_API_KEY, BASE_URL} = env<{
1717
CORTEX_API_KEY: string;
1818
BASE_URL: string;
19-
}>(c);
19+
}>(ctx);
2020

2121
const cortex = new Cortex({
2222
apiKey: CORTEX_API_KEY,
@@ -30,17 +30,30 @@ app.get('/', c => {
3030
return c.text('Cortex SDK');
3131
});
3232

33-
app.get('/stream/step', c => {
34-
const cortex = createCortex(c);
35-
stepStream(cortex);
36-
return c.text('Stream Step');
33+
app.get('/stream/step', async ctx => {
34+
const cortex = createCortex(ctx);
35+
36+
const response = await stepStreamResponse(cortex);
37+
38+
if (response instanceof Response) {
39+
return response;
40+
}
41+
42+
return ctx.text(response);
3743
});
3844

39-
app.get('/stream/workflow', c => {
40-
const {WORKFLOW_ID} = env<{WORKFLOW_ID: string}>(c);
41-
const cortex = createCortex(c);
42-
workflowStream(cortex, WORKFLOW_ID);
43-
return c.text('Stream Workflow');
45+
app.get('/stream/workflow', async ctx => {
46+
const {WORKFLOW_ID} = env<{WORKFLOW_ID: string}>(ctx);
47+
48+
const cortex = createCortex(ctx);
49+
50+
const response = await worklflowStreamResponse(cortex, WORKFLOW_ID);
51+
52+
if (response instanceof Response) {
53+
return response;
54+
}
55+
56+
return ctx.text(response);
4457
});
4558

4659
console.log(`Server is running on http://localhost:${PORT}`);

examples/hono/src/stream.ts

+15-12
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import {
44
type StepModelOutputSchema,
55
} from '@cortex-ai/sdk';
66

7-
export const stepStream = async (cortex: Cortex) => {
7+
export const stepStreamResponse = async (cortex: Cortex) => {
88
try {
9-
const result = await cortex.apps.runs.step.stream(
9+
const result = await cortex.apps.runs.step.streamResponse(
1010
{
1111
step: {
1212
type: 'model',
@@ -24,45 +24,48 @@ export const stepStream = async (cortex: Cortex) => {
2424
},
2525
},
2626
{
27-
onStream: partial => {
27+
onStream: (partial, event) => {
2828
const stepOutput = partial.output as
2929
| StepModelOutputSchema
3030
| undefined;
3131

32-
console.log('Chunk Message:', stepOutput?.message);
32+
console.log(event, stepOutput?.message);
3333
},
3434
},
3535
);
3636

37-
const stepOutput = result.output as StepModelOutputSchema | undefined;
38-
39-
console.log('\n\nFull Message: ', stepOutput?.message);
37+
return result;
4038
} catch (error) {
4139
console.log('Error', error);
40+
return error?.toString() ?? 'Error';
4241
}
4342
};
4443

45-
export const workflowStream = async (cortex: Cortex, workflowId: string) => {
44+
export const worklflowStreamResponse = async (
45+
cortex: Cortex,
46+
workflowId: string,
47+
) => {
4648
try {
47-
const result = await cortex.apps.workflows.runs.stream(
49+
const result = await cortex.apps.workflows.runs.streamResponse(
4850
workflowId,
4951
{
5052
input: {
5153
message: 'Hello, World!',
5254
},
5355
},
5456
{
55-
onStream: partial => {
57+
onStream: (partial, event) => {
5658
const stepOutput = partial.output
5759
?.MODEL as CastRunStepOutputSchema<StepModelOutputSchema>;
5860

59-
console.log('chunk', stepOutput?.output?.message);
61+
console.log(event, stepOutput?.output?.message);
6062
},
6163
},
6264
);
6365

64-
console.log('\n\nresult: ', JSON.stringify(result, null, 2));
66+
return result;
6567
} catch (error) {
6668
console.log('Error', error);
69+
return error?.toString() ?? 'Error';
6770
}
6871
};

openapi.utils.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type MethodOptions = {
8686
body?: string;
8787
query?: string;
8888
returns: string;
89+
finalReturns?: string;
8990
description?: string;
9091
};
9192

@@ -144,6 +145,13 @@ export const generateAPIMethods = (schema: OpenAPI3): string => {
144145
name: 'stream',
145146
stream: true,
146147
});
148+
149+
methodsToPush.push({
150+
...apiMethod,
151+
name: 'streamResponse',
152+
finalReturns: 'Response',
153+
stream: true,
154+
});
147155
}
148156

149157
//
@@ -379,6 +387,7 @@ const createMethod = ({
379387
body,
380388
stream,
381389
returns,
390+
finalReturns,
382391
}: MethodOptions) => {
383392
const params = Object.entries(parameters);
384393

@@ -412,12 +421,12 @@ const createMethod = ({
412421

413422
const parsedMethod = `(${allParameters
414423
.map(([key, value]) => `${convertToCamelCase(key)}: ${value}`)
415-
.join(', ')}): Promise<${returns}> => {
424+
.join(', ')}): Promise<${finalReturns ?? returns}> => {
416425
return callAPI(${encodeParam(name)}, {
417426
method: ${encodeParam(method)},
418427
endpoint: ${encodeParam(endpoint)},
419428
${paramsString}${body ? `body,` : ''}options,
420-
}) as Promise<${returns}>;
429+
}) as Promise<${finalReturns ?? returns}>;
421430
}`;
422431

423432
return parsedMethod;

src/classes/Cortex.ts

+26-8
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
import {APP_LESS_PARAM} from '../constants/api';
22
import {STREAM_PARSERS} from '../constants/stream';
3-
import {
4-
APIMethodRequest,
5-
APIMethods,
6-
createAPI,
7-
paths,
8-
} from '../generated/openapi';
3+
import {APIMethodRequest, APIMethods, createAPI} from '../generated/openapi';
94
import {APIFetchClient, ClientOptions, ErrorResponse} from '../types/api';
105
import {createAPIFetchClient, readSSE} from '../utils/api';
116
import {getObjectProperty} from '../utils/object';
127
import {CortexAPIError} from './CortexAPIError';
8+
import {HttpStream} from './HttpStream';
139

1410
export class Cortex {
1511
private readonly apiMethods: APIMethods;
@@ -64,7 +60,11 @@ export class Cortex {
6460
const isPagination = name === 'list';
6561

6662
const streamParser = isStream
67-
? STREAM_PARSERS[endpoint as keyof paths]
63+
? (STREAM_PARSERS[endpoint as keyof typeof STREAM_PARSERS] as (
64+
stream: unknown,
65+
event: string,
66+
data: unknown,
67+
) => unknown)
6868
: null;
6969

7070
if (isStream && isPagination) {
@@ -120,17 +120,35 @@ export class Cortex {
120120
}
121121

122122
let resultStream: unknown = null;
123+
const httpStream = name === 'streamResponse' ? new HttpStream() : null;
123124

124-
await readSSE(response, (event, data) => {
125+
const sse = readSSE(response, (event, data) => {
125126
if (!event) {
126127
throw new Error(`Event is missing in stream for ${endpoint}`);
127128
}
128129

130+
httpStream?.writeSSE({
131+
event,
132+
data: data as object,
133+
});
134+
129135
resultStream = streamParser?.(resultStream, event, data);
130136

131137
onStream?.(resultStream, event, data);
132138
});
133139

140+
if (httpStream) {
141+
(async () => {
142+
await sse;
143+
144+
httpStream.close();
145+
})();
146+
147+
return httpStream.getResponse();
148+
}
149+
150+
await sse;
151+
134152
return resultStream;
135153
}
136154

src/classes/HttpStream.ts

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
export interface SSEMessage {
2+
data: string | object;
3+
event?: string;
4+
id?: string;
5+
}
6+
7+
export class HttpStream {
8+
private _writer: WritableStreamDefaultWriter<Uint8Array>;
9+
10+
private _readable: ReadableStream;
11+
12+
textEncoder: InstanceType<typeof TextEncoder>;
13+
14+
constructor() {
15+
const {readable, writable} = new TransformStream();
16+
17+
this._readable = readable;
18+
this._writer = writable.getWriter();
19+
this.textEncoder = new TextEncoder();
20+
21+
//
22+
}
23+
24+
getResponse() {
25+
return new Response(this._readable, {
26+
headers: {
27+
'Transfer-Encoding': 'chunked',
28+
'Content-Type': 'text/event-stream',
29+
'Cache-Control': 'no-cache',
30+
Connection: 'keep-alive',
31+
},
32+
});
33+
}
34+
35+
async write(input: Uint8Array | string) {
36+
try {
37+
if (typeof input === 'string') {
38+
input = this.textEncoder.encode(input);
39+
}
40+
await this._writer.write(input);
41+
} catch (e) {
42+
if (e) {
43+
console.warn('Error writing to stream', e?.toString());
44+
}
45+
}
46+
return this;
47+
}
48+
49+
async writeln(input: string) {
50+
await this.write(input + '\n');
51+
return this;
52+
}
53+
54+
async close() {
55+
try {
56+
await this._writer.close();
57+
} catch (e) {
58+
console.warn('Error closing stream', e?.toString());
59+
}
60+
}
61+
62+
async writeSSE(message: SSEMessage) {
63+
const parsedData =
64+
message.data && typeof message.data === 'object'
65+
? JSON.stringify(message.data)
66+
: message.data;
67+
68+
const data = parsedData
69+
.split('\n')
70+
.map(line => {
71+
return `data: ${line}`;
72+
})
73+
.join('\n');
74+
75+
const sseData =
76+
[
77+
message.event && `event: ${message.event}`,
78+
data,
79+
message.id && `id: ${message.id}`,
80+
]
81+
.filter(Boolean)
82+
.join('\n') + '\n\n';
83+
84+
await this.write(sseData);
85+
}
86+
}

src/constants/stream.ts

+17-6
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ import {paths, RunOutputSchema, RunSchema} from '../generated/openapi';
22
import {CastRunStepOutputSchema} from '../types/runs';
33
import {handleStepStream} from '../utils/runs';
44

5-
export const STREAM_PARSERS: Partial<
6-
Record<keyof paths, (current: any, event: any, data: any) => void>
7-
> = {
5+
export const STREAM_PARSERS = {
86
'/apps/{app_id}/workflows/{workflow_id}/runs': (
97
run: RunSchema,
108
event: 'run' | 'step' | 'chunk',
119
data: unknown,
12-
) => {
10+
): RunSchema => {
1311
switch (event) {
1412
case 'run': {
1513
return data as RunSchema;
@@ -37,19 +35,23 @@ export const STREAM_PARSERS: Partial<
3735
};
3836
}
3937
}
38+
39+
return run;
4040
},
4141

4242
'/apps/{app_id}/runs/step': (
4343
step: CastRunStepOutputSchema,
4444
event: 'step' | 'chunk',
4545
data: unknown,
46-
) => {
46+
): CastRunStepOutputSchema => {
4747
switch (event) {
4848
case 'step': {
4949
return data as CastRunStepOutputSchema;
5050
}
5151
case 'chunk': {
52-
if (!step) return;
52+
if (!step) {
53+
break;
54+
}
5355

5456
step = handleStepStream(
5557
event,
@@ -60,5 +62,14 @@ export const STREAM_PARSERS: Partial<
6062
return step;
6163
}
6264
}
65+
66+
return step;
6367
},
68+
} satisfies Partial<
69+
Record<keyof paths, (current: any, event: any, data: any) => void>
70+
>;
71+
72+
export const STREAM_PARSERS_TYPES = {
73+
step: STREAM_PARSERS['/apps/{app_id}/runs/step'],
74+
run: STREAM_PARSERS['/apps/{app_id}/workflows/{workflow_id}/runs'],
6475
};

0 commit comments

Comments
 (0)