Commit 5b9cd21
feat: add streaming optimizer and structured request logging (Gitlawb#703)
* Integrate request logging and streaming optimizer
  - Add logApiCallStart/End for API request tracking with correlation IDs
  - Add streaming state tracking with processStreamChunk
  - Flush buffer and log stream stats at stream end
  - Resolve merge conflict with main branch

* feat: add streaming optimizer and structured request logging

* fix: address PR review feedback
  - Remove buffering from streamingOptimizer - now purely observational
  - Use logForDebugging instead of console.log for structured logging
  - Remove dead code (streamResponse, bufferedStreamResponse, etc.)
  - Use existing logging infrastructure instead of raw console.log
  - Keep only used functions: createStreamState, processStreamChunk, getStreamStats

* test: add unit tests for requestLogging and streamingOptimizer
  - streamingOptimizer.test.ts: 6 tests for createStreamState, processStreamChunk, getStreamStats
  - requestLogging.test.ts: 6 tests for createCorrelationId, logApiCallStart, logApiCallEnd

* fix: correct durationMs test to be >= 0 instead of exactly 0

* fix: address PR Gitlawb#703 blockers and non-blockers
  1. BLOCKER FIX: Skip clone() for streaming responses
     - Only call response.clone() + .json() for non-streaming requests
     - For streaming, usage comes via stream chunks anyway
  2. NON-BLOCKER: Document dead code in flushStreamBuffer
     - Added comment explaining it's a no-op kept for API compat
  3. NON-BLOCKER: vi.mock in tests - left as-is (test framework issue)

* fix: address all remaining non-blockers for PR Gitlawb#703
  1. Remove dead code: flushStreamBuffer call and unused import
  2. Fix test for Bun: remove vi.mock, use simple no-throw tests
1 parent e92e527 commit 5b9cd21

5 files changed

Lines changed: 326 additions & 0 deletions
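A minimal sketch of how the new helpers compose around a request (hypothetical caller, not code from this commit; the import paths, fetch target, and simplified provider check are illustrative):

import { logApiCallStart, logApiCallEnd } from './src/utils/requestLogging.js'

// Hypothetical wrapper showing the pattern the shim follows below: open a
// correlated log record, make the call, then close the record with token
// counts (zero when usage is unavailable).
async function callModel(baseUrl: string, model: string, body: unknown) {
  const provider = baseUrl.includes('anthropic') ? 'anthropic' : 'openai'
  const { correlationId, startTime } = logApiCallStart(provider, model)
  try {
    const response = await fetch(`${baseUrl}/chat/completions`, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify(body),
    })
    const data = (await response.json()) as {
      usage?: { prompt_tokens?: number; completion_tokens?: number }
    }
    logApiCallEnd(
      correlationId,
      startTime,
      model,
      'success',
      data.usage?.prompt_tokens ?? 0,
      data.usage?.completion_tokens ?? 0,
      false, // non-streaming in this sketch
    )
    return data
  } catch (err) {
    logApiCallEnd(correlationId, startTime, model, 'error', 0, 0, false, undefined, undefined, String(err))
    throw err
  }
}

The shim below does the same thing with a fuller provider table and a retry loop.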

File tree

src/services/api/openaiShim.ts
src/utils/requestLogging.test.ts
src/utils/requestLogging.ts
src/utils/streamingOptimizer.test.ts
src/utils/streamingOptimizer.ts

src/services/api/openaiShim.ts

Lines changed: 39 additions & 0 deletions
@@ -67,6 +67,8 @@ import {
   normalizeToolArguments,
   hasToolFieldMapping,
 } from './toolArgumentNormalization.js'
+import { logApiCallStart, logApiCallEnd } from '../../utils/requestLogging.js'
+import { createStreamState, processStreamChunk, getStreamStats } from '../../utils/streamingOptimizer.js'
 
 type SecretValueSource = Partial<{
   OPENAI_API_KEY: string

@@ -857,6 +859,7 @@ async function* openaiStreamToAnthropic(
   let lastStopReason: 'tool_use' | 'max_tokens' | 'end_turn' | null = null
   let hasEmittedFinalUsage = false
   let hasProcessedFinishReason = false
+  const streamState = createStreamState()
 
   // Emit message_start
   yield {

@@ -1020,6 +1023,7 @@ async function* openaiStreamToAnthropic(
           delta: { type: 'text_delta', text: visible },
         }
       }
+      processStreamChunk(streamState, delta.content)
     }
 
     // Tool calls

@@ -1039,6 +1043,7 @@ async function* openaiStreamToAnthropic(
       const toolBlockIndex = contentBlockIndex
       const initialArguments = tc.function.arguments ?? ''
       const normalizeAtStop = hasToolFieldMapping(tc.function.name)
+      processStreamChunk(streamState, tc.function.arguments ?? '')
       activeToolCalls.set(tc.index, {
         id: tc.id,
         name: tc.function.name,

@@ -1236,6 +1241,20 @@ async function* openaiStreamToAnthropic(
     reader.releaseLock()
   }
 
+  const stats = getStreamStats(streamState)
+  if (stats.totalChunks > 0) {
+    logForDebugging(
+      JSON.stringify({
+        type: 'stream_stats',
+        model,
+        total_chunks: stats.totalChunks,
+        first_token_ms: stats.firstTokenMs,
+        duration_ms: stats.durationMs,
+      }),
+      { level: 'debug' },
+    )
+  }
+
   yield { type: 'message_stop' }
 }
 

@@ -1715,6 +1734,12 @@ class OpenAIShimMessages {
     }
 
     let response: Response | undefined
+    const provider = request.baseUrl.includes('nvidia') ? 'nvidia-nim'
+      : request.baseUrl.includes('minimax') ? 'minimax'
+      : request.baseUrl.includes('localhost:11434') || request.baseUrl.includes('localhost:11435') ? 'ollama'
+      : request.baseUrl.includes('anthropic') ? 'anthropic'
+      : 'openai'
+    const { correlationId, startTime } = logApiCallStart(provider, request.resolvedModel)
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
       try {
         response = await fetchWithProxyRetry(

@@ -1752,6 +1777,20 @@ class OpenAIShimMessages {
     }
 
     if (response.ok) {
+      let tokensIn = 0
+      let tokensOut = 0
+      // Skip clone() for streaming responses - it blocks until the full body is
+      // received, defeating the purpose of streaming. Usage data is already sent via
+      // stream_options: { include_usage: true } and can be extracted from the stream.
+      if (!params.stream) {
+        try {
+          const clone = response.clone()
+          const data = await clone.json()
+          tokensIn = data.usage?.prompt_tokens ?? 0
+          tokensOut = data.usage?.completion_tokens ?? 0
+        } catch { /* ignore */ }
+      }
+      logApiCallEnd(correlationId, startTime, request.resolvedModel, 'success', tokensIn, tokensOut, Boolean(params.stream))
       return response
     }
 
src/utils/requestLogging.test.ts

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
import { describe, expect, it } from 'bun:test'
import {
  createCorrelationId,
  logApiCallStart,
  logApiCallEnd,
} from './requestLogging.js'

describe('requestLogging', () => {
  describe('createCorrelationId', () => {
    it('returns a non-empty string', () => {
      const id = createCorrelationId()
      expect(id).toBeTruthy()
      expect(typeof id).toBe('string')
    })

    it('returns unique IDs', () => {
      const id1 = createCorrelationId()
      const id2 = createCorrelationId()
      expect(id1).not.toBe(id2)
    })
  })

  describe('logApiCallStart', () => {
    it('returns correlation ID and start time', () => {
      const result = logApiCallStart('openai', 'gpt-4o')
      expect(result.correlationId).toBeTruthy()
      expect(result.startTime).toBeGreaterThan(0)
    })

    it('logs without throwing', () => {
      expect(() => logApiCallStart('ollama', 'llama3')).not.toThrow()
    })
  })

  describe('logApiCallEnd', () => {
    it('logs success without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'success',
          100,
          50,
          false,
        ),
      ).not.toThrow()
    })

    it('logs error without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'error',
          0,
          0,
          false,
          undefined,
          undefined,
          'Network error',
        ),
      ).not.toThrow()
    })

    it('logs with all parameters without throwing', () => {
      const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
      expect(() =>
        logApiCallEnd(
          correlationId,
          startTime,
          'gpt-4o',
          'success',
          100,
          50,
          true,
          123, // representative firstTokenMs
          7, // representative totalChunks
          'error message',
        ),
      ).not.toThrow()
    })
  })
})

src/utils/requestLogging.ts

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
/**
 * Structured Request Logging
 *
 * Uses existing logForDebugging for structured logging.
 */

import { randomUUID } from 'crypto'
import { logForDebugging } from './debug.js'

export interface RequestLog {
  correlationId: string
  timestamp: number
  provider: string
  model: string
  duration: number
  status: 'success' | 'error'
  tokensIn: number
  tokensOut: number
  error?: string
  streaming: boolean
}

export function createCorrelationId(): string {
  return randomUUID()
}

export function logApiCallStart(
  provider: string,
  model: string,
): { correlationId: string; startTime: number } {
  const correlationId = createCorrelationId()
  const startTime = Date.now()

  logForDebugging(
    JSON.stringify({
      type: 'api_call_start',
      correlationId,
      provider,
      model,
      timestamp: startTime,
    }),
    { level: 'debug' },
  )

  return { correlationId, startTime }
}

export function logApiCallEnd(
  correlationId: string,
  startTime: number,
  model: string,
  status: 'success' | 'error',
  tokensIn: number,
  tokensOut: number,
  streaming: boolean,
  firstTokenMs?: number,
  totalChunks?: number,
  error?: string,
): void {
  const duration = Date.now() - startTime

  const logData: Record<string, unknown> = {
    type: status === 'error' ? 'api_call_error' : 'api_call_end',
    correlationId,
    model,
    duration_ms: duration,
    status,
    tokens_in: tokensIn,
    tokens_out: tokensOut,
    streaming,
  }

  if (firstTokenMs !== undefined) {
    logData.first_token_ms = firstTokenMs
  }

  if (totalChunks !== undefined) {
    logData.total_chunks = totalChunks
  }

  if (error) {
    logData.error = error
  }

  logForDebugging(
    JSON.stringify(logData),
    { level: status === 'error' ? 'error' : 'debug' },
  )
}
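For reference, the records this module emits look roughly like this (a sketch; the UUID and timings are made up, and exact output depends on how logForDebugging renders its message):

import { logApiCallStart, logApiCallEnd } from './requestLogging.js'

const { correlationId, startTime } = logApiCallStart('openai', 'gpt-4o')
// logs: {"type":"api_call_start","correlationId":"3f2a...","provider":"openai",
//        "model":"gpt-4o","timestamp":1700000000000}

logApiCallEnd(correlationId, startTime, 'gpt-4o', 'success', 100, 50, false)
// logs: {"type":"api_call_end","correlationId":"3f2a...","model":"gpt-4o",
//        "duration_ms":245,"status":"success","tokens_in":100,"tokens_out":50,
//        "streaming":false}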
src/utils/streamingOptimizer.test.ts

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
import { describe, expect, it, beforeEach } from 'bun:test'
import {
  createStreamState,
  processStreamChunk,
  flushStreamBuffer,
  getStreamStats,
} from './streamingOptimizer.js'

describe('streamingOptimizer', () => {
  let state: ReturnType<typeof createStreamState>

  beforeEach(() => {
    state = createStreamState()
  })

  describe('createStreamState', () => {
    it('creates initial state with zero counts', () => {
      expect(state.chunkCount).toBe(0)
      expect(state.firstTokenTime).toBeNull()
      expect(state.startTime).toBeGreaterThan(0)
    })
  })

  describe('processStreamChunk', () => {
    it('tracks first token time on first chunk', () => {
      processStreamChunk(state, 'hello')
      expect(state.firstTokenTime).not.toBeNull()
      expect(state.chunkCount).toBe(1)
    })

    it('increments chunk count', () => {
      processStreamChunk(state, 'chunk1')
      processStreamChunk(state, 'chunk2')
      expect(state.chunkCount).toBe(2)
    })
  })

  describe('getStreamStats', () => {
    it('returns zero values for empty stream', () => {
      const stats = getStreamStats(state)
      expect(stats.totalChunks).toBe(0)
      expect(stats.firstTokenMs).toBeNull()
      expect(stats.durationMs).toBeGreaterThanOrEqual(0)
    })

    it('returns correct stats after processing chunks', () => {
      processStreamChunk(state, 'test')
      const stats = getStreamStats(state)
      expect(stats.totalChunks).toBe(1)
      expect(stats.firstTokenMs).toBeGreaterThanOrEqual(0)
      expect(stats.durationMs).toBeGreaterThanOrEqual(0)
    })
  })

  describe('flushStreamBuffer', () => {
    it('returns empty string (no-op)', () => {
      const result = flushStreamBuffer(state)
      expect(result).toBe('')
    })
  })
})

src/utils/streamingOptimizer.ts

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
/**
 * Streaming Stats Tracker
 *
 * Observational stats tracking for streaming responses.
 * No buffering - purely tracks metrics for monitoring.
 */

export interface StreamStats {
  totalChunks: number
  firstTokenMs: number | null
  durationMs: number
}

export interface StreamState {
  chunkCount: number
  firstTokenTime: number | null
  startTime: number
}

export function createStreamState(): StreamState {
  return {
    chunkCount: 0,
    firstTokenTime: null,
    startTime: Date.now(),
  }
}

export function processStreamChunk(state: StreamState, _chunk: string): void {
  if (state.firstTokenTime === null) {
    state.firstTokenTime = Date.now()
  }
  state.chunkCount++
}

export function flushStreamBuffer(_state: StreamState): string {
  return '' // No-op - kept for API compatibility
}

export function getStreamStats(state: StreamState): StreamStats {
  const now = Date.now()
  // Time to first token, measured from stream start.
  const firstTokenMs = state.firstTokenTime !== null
    ? state.firstTokenTime - state.startTime
    : null
  const durationMs = now - state.startTime

  return {
    totalChunks: state.chunkCount,
    firstTokenMs,
    durationMs,
  }
}
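A short usage sketch (hypothetical consumer loop, mirroring the shim integration above; the logForDebugging call matches the one in requestLogging.ts):

import { logForDebugging } from './debug.js'
import { createStreamState, processStreamChunk, getStreamStats } from './streamingOptimizer.js'

// Hypothetical consumer: feed each incoming chunk to the tracker, then log
// the aggregate stats once the stream completes.
async function trackStream(stream: AsyncIterable<string>): Promise<void> {
  const state = createStreamState()
  for await (const chunk of stream) {
    processStreamChunk(state, chunk) // stamps first-token time, bumps chunk count
  }
  const stats = getStreamStats(state)
  logForDebugging(
    JSON.stringify({
      type: 'stream_stats',
      total_chunks: stats.totalChunks,
      first_token_ms: stats.firstTokenMs,
      duration_ms: stats.durationMs,
    }),
    { level: 'debug' },
  )
}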
