16 changes: 15 additions & 1 deletion integration-tests/mcp_server_cyclic_schema.test.ts
@@ -17,7 +17,7 @@

import { writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { beforeAll, describe, it } from 'vitest';
import { afterAll, beforeAll, describe, it } from 'vitest';
import { TestRig } from './test-helper.js';

// Windows CI runners are unreliable for node-pty interactive tests (winpty agent "File not found").
@@ -175,6 +175,16 @@ describe.skipIf(skipOnWindowsCi)(
},
});

process.env.LLXPRT_CODE_WELCOME_CONFIG_PATH = join(
rig.testDir!,
'welcome-config.json',
);

writeFileSync(
process.env.LLXPRT_CODE_WELCOME_CONFIG_PATH,
JSON.stringify({ welcomeCompleted: true }, null, 2),
);

// Create server script in the test directory
const testServerPath = join(rig.testDir!, 'mcp-server.cjs');
writeFileSync(testServerPath, serverScript);
@@ -186,6 +196,10 @@
}
});

afterAll(() => {
delete process.env.LLXPRT_CODE_WELCOME_CONFIG_PATH;
});

it('mcp tool list should include tool with cyclic tool schema', async () => {
const run = await rig.runInteractive();

89 changes: 84 additions & 5 deletions packages/core/src/utils/retry.test.ts
@@ -605,14 +605,14 @@ describe('retryWithBackoff', () => {
* @when onPersistent429 callback returns true (switch succeeded)
* @then Retry should continue with new bucket
*/
it('should call onPersistent429 callback on consecutive 429 errors', async () => {
it('should call onPersistent429 callback on first 429 error', async () => {
vi.useFakeTimers();
let attempt = 0;
let failoverCalled = false;

const mockFn = vi.fn(async () => {
attempt++;
if (attempt <= 2) {
if (attempt <= 1) {
const error: HttpError = new Error('Rate limit exceeded');
error.status = 429;
throw error;
@@ -626,7 +626,7 @@
});

const promise = retryWithBackoff(mockFn, {
maxAttempts: 5,
maxAttempts: 1,
initialDelayMs: 100,
onPersistent429: failoverCallback,
authType: 'oauth-bucket',
@@ -635,9 +635,87 @@
await vi.runAllTimersAsync();
await expect(promise).resolves.toBe('success after bucket switch');

// onPersistent429 should be called after consecutive 429 errors
// onPersistent429 should be called after the first 429 error
expect(failoverCallback).toHaveBeenCalled();
expect(failoverCalled).toBe(true);
expect(mockFn).toHaveBeenCalledTimes(2);
});

/**
* @requirement PLAN-20251213issue490 Bucket failover
* @scenario Bucket failover on 402 errors
* @given A request that returns 402 for first bucket
* @when onPersistent429 callback returns true (switch succeeded)
* @then Retry should continue with new bucket
*/
it('should call onPersistent429 callback on first 402 error', async () => {
vi.useFakeTimers();
let attempt = 0;

const mockFn = vi.fn(async () => {
attempt++;
if (attempt === 1) {
const error: HttpError = new Error('Payment Required');
error.status = 402;
throw error;
}
return 'success after bucket switch';
});

const failoverCallback = vi.fn(async () => true);

const promise = retryWithBackoff(mockFn, {
maxAttempts: 1,
initialDelayMs: 100,
onPersistent429: failoverCallback,
authType: 'oauth-bucket',
});

await vi.runAllTimersAsync();
await expect(promise).resolves.toBe('success after bucket switch');
expect(failoverCallback).toHaveBeenCalledWith(
'oauth-bucket',
expect.any(Error),
);
expect(mockFn).toHaveBeenCalledTimes(2);
});

/**
* @requirement PLAN-20251213issue490 Bucket failover
* @scenario Bucket failover on 401 errors
* @given A request that returns 401 for first bucket
* @when onPersistent429 callback returns true (switch succeeded)
* @then Retry should continue with new bucket after refresh retry
*/
it('should retry once on 401 before bucket failover', async () => {
vi.useFakeTimers();
let failoverCalled = false;

const mockFn = vi.fn(async () => {
if (!failoverCalled) {
const error: HttpError = new Error('Unauthorized');
error.status = 401;
throw error;
}
return 'success after bucket switch';
});

const failoverCallback = vi.fn(async () => {
failoverCalled = true;
return true;
});

const promise = retryWithBackoff(mockFn, {
maxAttempts: 3,
initialDelayMs: 100,
onPersistent429: failoverCallback,
authType: 'oauth-bucket',
});

await vi.runAllTimersAsync();
await expect(promise).resolves.toBe('success after bucket switch');
expect(failoverCallback).toHaveBeenCalledTimes(1);
expect(mockFn).toHaveBeenCalledTimes(3);
});

/**
@@ -672,7 +750,8 @@ describe('retryWithBackoff', () => {

expect(result).toBeInstanceOf(Error);
expect(result.message).toBe('Rate limit exceeded');
expect(failoverCallback).toHaveBeenCalled();
expect(failoverCallback).toHaveBeenCalledTimes(1);
expect(mockFn).toHaveBeenCalledTimes(1);
});
});
});
59 changes: 50 additions & 9 deletions packages/core/src/utils/retry.ts
@@ -287,15 +287,17 @@ export async function retryWithBackoff<T>(
let attempt = 0;
let currentDelay = initialDelayMs;
let consecutive429s = 0;
const failoverThreshold = 2; // Attempt bucket failover after this many consecutive 429s
let consecutive401s = 0;
const failoverThreshold = 1; // Attempt bucket failover after this many consecutive 429s

while (attempt < maxAttempts) {
attempt++;
try {
const result = await fn();

// Reset 429 counter on success
// Reset error counters on success
consecutive429s = 0;
consecutive401s = 0;

if (
shouldRetryOnContent &&
@@ -311,9 +313,12 @@
return result;
} catch (error) {
const errorStatus = getErrorStatus(error);
const is429 = errorStatus === 429;
const is402 = errorStatus === 402;
const is401 = errorStatus === 401;

// Track consecutive 429 errors for bucket failover
if (errorStatus === 429) {
if (is429) {
consecutive429s++;
logger.debug(
() =>
@@ -323,17 +328,38 @@
consecutive429s = 0;
}

// Check if we've exhausted retries or shouldn't retry
if (attempt >= maxAttempts || !shouldRetryOnError(error as Error)) {
throw error;
if (is401) {
consecutive401s++;
} else {
consecutive401s = 0;
}

// Retry once to allow OAuth refresh or onPersistent429 to refresh before failover.
const shouldAttemptRefreshRetry =
is401 && options?.onPersistent429 && consecutive401s === 1;

if (shouldAttemptRefreshRetry) {
logger.debug(
() =>
'401 error detected, retrying once to allow refresh before bucket failover',
);
}

const canAttemptFailover = Boolean(options?.onPersistent429);
const shouldAttemptFailover =
canAttemptFailover &&
((is429 && consecutive429s >= failoverThreshold) ||
is402 ||
(is401 && consecutive401s > 1));

// Attempt bucket failover after threshold consecutive 429 errors
// @plan PLAN-20251213issue490 Bucket failover integration
if (consecutive429s >= failoverThreshold && options?.onPersistent429) {
if (shouldAttemptFailover && options?.onPersistent429) {
const failoverReason = is429
? `${consecutive429s} consecutive 429 errors`
: `status ${errorStatus}`;
logger.debug(
() =>
`Attempting bucket failover after ${consecutive429s} consecutive 429 errors`,
() => `Attempting bucket failover after ${failoverReason}`,
);
const failoverResult = await options.onPersistent429(
options.authType,
@@ -346,6 +372,7 @@
() => `Bucket failover successful, resetting retry state`,
);
consecutive429s = 0;
consecutive401s = 0;
currentDelay = initialDelayMs;
// Don't increment attempt counter - this is a fresh start with new bucket
attempt--;
@@ -360,6 +387,20 @@
// failoverResult === null means continue with normal retry
}

const shouldRetry = shouldRetryOnError(error as Error);

if (!shouldRetry && !shouldAttemptRefreshRetry) {
throw error;
}

if (attempt >= maxAttempts && !shouldAttemptRefreshRetry) {
throw error;
}

if (attempt >= maxAttempts && shouldAttemptRefreshRetry) {
attempt--;
}

const { delayDurationMs, errorStatus: delayErrorStatus } =
getDelayDurationAndStatus(error);
