Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
d6b816f
feat: add atlas-streams-discover tool
jwongmongodb Mar 12, 2026
155c541
feat: add atlas-streams-build tool
jwongmongodb Mar 12, 2026
120c54a
feat: add atlas-streams-manage tool
jwongmongodb Mar 12, 2026
ca7ec77
feat: add atlas-streams-teardown tool
jwongmongodb Mar 12, 2026
543d993
test: add cross-tool workflow integration tests
jwongmongodb Mar 12, 2026
00e0375
fix: address PR review — deduplicate error handlers, remove PII telem…
jwongmongodb Mar 12, 2026
8ef6217
fix: address PR review — remove find/list-all-processors, fix project…
jwongmongodb Mar 12, 2026
2d0efc9
fix: regenerate README to remove find-processor and list-all-processors
jwongmongodb Mar 12, 2026
6735f6c
refactor: rename SKIP_ATLAS_TESTS to SKIP_ATLAS_INTEGRATION_TESTS
jwongmongodb Mar 12, 2026
3747b80
fix: correct descriptions, add tier z.enum validation, harden workspa…
jwongmongodb Mar 13, 2026
b74eb35
fix: make gunzipSync polyfill return input buffer instead of undefined
jwongmongodb Mar 13, 2026
46469bc
fix: add streams workspace cleanup to cleanup script and wait for wor…
jwongmongodb Mar 13, 2026
afdb691
fix: parallelize cleanup and wait for cluster deletion before deletin…
jwongmongodb Mar 13, 2026
0e8c519
fix: address copilot review — validate args before confirmation, fix …
jwongmongodb Mar 13, 2026
68efdc6
fix: support SASL_INHERIT auth type for SchemaRegistry connections
jwongmongodb Mar 13, 2026
7e60d80
fix: use async gunzip to avoid blocking event loop with concurrent MC…
jwongmongodb Mar 16, 2026
82e7842
fix: add missing metrics mock to streams test files after rebase
jwongmongodb Mar 16, 2026
3f75134
fix: prevent privateLinkConfig from overwriting provider and only lis…
jwongmongodb Mar 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/code-health-fork.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Run tests
run: pnpm test
env:
SKIP_ATLAS_TESTS: "true"
SKIP_ATLAS_INTEGRATION_TESTS: "true"
SKIP_ATLAS_LOCAL_TESTS: "true"
run-atlas-local-tests:
name: Run Atlas Local tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/code-health.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: Run tests
run: pnpm test
env:
SKIP_ATLAS_TESTS: "true"
SKIP_ATLAS_INTEGRATION_TESTS: "true"
SKIP_ATLAS_LOCAL_TESTS: "true"
MDB_MONGOT_PASSWORD: ${{ secrets.TEST_MDB_MONGOT_PASSWORD }}
MDB_VOYAGE_API_KEY: ${{ secrets.TEST_MDB_MCP_VOYAGE_API_KEY }}
Expand Down
100 changes: 55 additions & 45 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"test:accuracy": "sh ./scripts/accuracy/runAccuracyTests.sh",
"test:browser": "pnpm --filter browser test",
"test:long-running-tests": "vitest --project long-running-tests --coverage",
"test:local": "SKIP_ATLAS_TESTS=true SKIP_ATLAS_LOCAL_TESTS=true pnpm run test",
"test:local": "SKIP_ATLAS_INTEGRATION_TESTS=true SKIP_ATLAS_LOCAL_TESTS=true pnpm run test",
"atlas:cleanup": "vitest --project atlas-cleanup"
},
"license": "Apache-2.0",
Expand Down
147 changes: 119 additions & 28 deletions scripts/cleanupAtlasTestLeftovers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { ConsoleLogger } from "../src/common/logging/index.js";
import { Keychain } from "../src/lib.js";
import { describe, it } from "vitest";

/** Resolves after roughly the given number of milliseconds. */
function sleep(ms: number): Promise<void> {
    return new Promise<void>((done) => {
        setTimeout(() => done(), ms);
    });
}

function isOlderThanTwoHours(date: string): boolean {
const twoHoursInMs = 2 * 60 * 60 * 1000;
const projectDate = new Date(date);
Expand Down Expand Up @@ -35,6 +39,78 @@ async function findAllTestProjects(client: ApiClient, orgId: string): Promise<Gr
return testProjects.filter((proj) => isOlderThanTwoHours(proj.created));
}

/**
 * Best-effort removal of every stream workspace (and its processors) in a
 * stale test project.
 *
 * Processors are deleted before the workspace because a workspace cannot be
 * removed while it still hosts processors; deleting a processor also stops a
 * running one. After issuing the workspace delete, the function polls until
 * the workspace GET starts failing (presumably a 404 — TODO confirm), so
 * callers know the deletion has actually completed.
 *
 * @param client - Atlas API client used for all requests.
 * @param projectId - ID of the project whose stream workspaces are removed.
 * @returns Human-readable error strings for workspaces that could not be
 *          deleted (or whose deletion timed out); empty when everything
 *          succeeded or the project has streams disabled.
 */
async function deleteAllWorkspacesOnStaleProject(client: ApiClient, projectId: string): Promise<string[]> {
    const errors: string[] = [];

    try {
        const workspaces = await client
            .listStreamWorkspaces({
                params: {
                    path: {
                        groupId: projectId,
                    },
                },
            })
            .then((res) => res.results || []);

        await Promise.allSettled(
            workspaces.map(async (workspace) => {
                const name = workspace.name || "";
                try {
                    // Delete all processors first (auto-stops running ones)
                    try {
                        const processors = await client
                            .getStreamProcessors({
                                params: { path: { groupId: projectId, tenantName: name } },
                            })
                            .then((res) => res.results || []);
                        await Promise.allSettled(
                            processors.map((p) =>
                                client.deleteStreamProcessor({
                                    params: {
                                        path: {
                                            groupId: projectId,
                                            tenantName: name,
                                            processorName: p.name || "",
                                        },
                                    },
                                })
                            )
                        );
                    } catch {
                        // Best-effort: listing/deleting processors may fail for
                        // workspaces that are already being torn down. Ignore.
                    }
                    await client.deleteStreamWorkspace({
                        params: {
                            path: { groupId: projectId, tenantName: name },
                        },
                    });
                    // Wait for workspace to be fully deleted (up to 120s). The
                    // GET is expected to fail once deletion has completed.
                    let confirmedDeleted = false;
                    for (let i = 0; i < 120; i++) {
                        try {
                            await client.getStreamWorkspace({
                                params: {
                                    path: { groupId: projectId, tenantName: name },
                                },
                            });
                            await sleep(1000);
                        } catch {
                            confirmedDeleted = true;
                            break;
                        }
                    }
                    if (confirmedDeleted) {
                        console.log(` Deleted workspace: ${name}`);
                    } else {
                        // Previously this case was logged as a success, hiding
                        // stuck deletions from the cleanup error report.
                        errors.push(
                            `Timed out waiting for workspace ${name} in project ${projectId} to be deleted`
                        );
                    }
                } catch (error) {
                    errors.push(`Failed to delete workspace ${name} in project ${projectId}: ${String(error)}`);
                }
            })
        );
    } catch {
        // Project may not have streams enabled, ignore
    }

    return errors;
}

async function deleteAllClustersOnStaleProject(client: ApiClient, projectId: string): Promise<string[]> {
const errors: string[] = [];

Expand All @@ -50,12 +126,13 @@ async function deleteAllClustersOnStaleProject(client: ApiClient, projectId: str

await Promise.allSettled(
allClusters.map(async (cluster) => {
const name = cluster.name || "";
try {
await client.deleteCluster({
params: { path: { groupId: projectId || "", clusterName: cluster.name || "" } },
params: { path: { groupId: projectId || "", clusterName: name } },
});
} catch (error) {
errors.push(`Failed to delete cluster ${cluster.name} in project ${projectId}: ${String(error)}`);
errors.push(`Failed to delete cluster ${name} in project ${projectId}: ${String(error)}`);
}
})
);
Expand Down Expand Up @@ -88,34 +165,48 @@ async function main(): Promise<void> {

const allErrors: string[] = [];

for (const project of testProjects) {
console.log(`Cleaning up project: ${project.name} (${project.id})`);
if (!project.id) {
console.warn(`Skipping project with missing ID: ${project.name}`);
continue;
}
const projectsWithIds = testProjects.filter((p): p is Group & { id: string } => !!p.id);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Atlas enforces a dependency chain: a project cannot be deleted while it has clusters, and clusters cannot be deleted while a stream workspace holds a connection to them.

Phase 1 issues all the delete requests (processors → workspaces → clusters) in parallel across projects. Phase 2 waits for clusters to fully terminate before attempting to delete each project — without this wait, the project deletion fails with a 409 because the async cluster teardown hasn't finished yet.

Both phases run all projects in parallel so the total wait time is bounded by the slowest single project, not the sum.


// Try to delete all clusters first
const clusterErrors = await deleteAllClustersOnStaleProject(apiClient, project.id);
allErrors.push(...clusterErrors);
// Phase 1: Delete all workspaces and clusters in parallel across all projects
await Promise.allSettled(
projectsWithIds.map(async (project) => {
console.log(`Cleaning up project: ${project.name} (${project.id})`);
const workspaceErrors = await deleteAllWorkspacesOnStaleProject(apiClient, project.id);
allErrors.push(...workspaceErrors);
const clusterErrors = await deleteAllClustersOnStaleProject(apiClient, project.id);
allErrors.push(...clusterErrors);
})
);

// Try to delete the project
try {
await apiClient.deleteGroup({
params: {
path: {
groupId: project.id,
},
},
});
console.log(`Deleted project: ${project.name} (${project.id})`);
} catch (error) {
const errorStr = String(error);
const errorMessage = `Failed to delete project ${project.name} (${project.id}): ${errorStr}`;
console.error(errorMessage);
allErrors.push(errorMessage);
}
}
// Phase 2: Wait for clusters to terminate, then delete projects in parallel
await Promise.allSettled(
projectsWithIds.map(async (project) => {
// Wait for clusters to be fully deleted (up to 300s)
for (let i = 0; i < 300; i++) {
try {
const remaining = await apiClient
.listClusters({ params: { path: { groupId: project.id } } })
.then((res) => res.results || []);
if (remaining.length === 0) {
break;
}
await sleep(1000);
} catch {
break;
}
}
try {
await apiClient.deleteGroup({
params: { path: { groupId: project.id } },
});
console.log(`Deleted project: ${project.name} (${project.id})`);
} catch (error) {
const errorMessage = `Failed to delete project ${project.name} (${project.id}): ${String(error)}`;
console.error(errorMessage);
allErrors.push(errorMessage);
}
})
);

if (allErrors.length > 0) {
const errorList = allErrors.map((err, i) => `${i + 1}. ${err}`).join("\n");
Expand Down
2 changes: 2 additions & 0 deletions src/common/config/userConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const ServerConfigSchema = z4.object({
"drop-collection",
"delete-many",
"drop-index",
"atlas-streams-manage",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed so that tools which modify or delete a user's streams resources trigger elicitation (user confirmation) before executing.

"atlas-streams-teardown",
])
.describe(
"An array of tool names that require user confirmation before execution. Requires the client to support elicitation."
Expand Down
2 changes: 1 addition & 1 deletion src/common/schemas.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const previewFeatureValues = ["search", "mcpUI"] as const;
export const previewFeatureValues = ["search", "mcpUI", "streams"] as const;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be removed once the follow-up PR that adds the accuracy tests is merged.

export type PreviewFeature = (typeof previewFeatureValues)[number];

export const monitoringServerFeatureValues = ["health-check", "metrics"] as const;
Expand Down
58 changes: 53 additions & 5 deletions src/elicitation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import type { ElicitRequestFormParams } from "@modelcontextprotocol/sdk/types.js";
import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

/**
 * Result of a form-based elicitation request, as a discriminated union so
 * callers can narrow on `accepted`: when true, `fields` holds the user's
 * string answers keyed by field name; when false, `fields` is absent.
 */
export type ElicitedInputResult =
  | { accepted: true; fields: Record<string, string> }
  | { accepted: false; fields?: undefined };

// Upper bound on how long the server waits for the user to answer an
// elicitation request before the call fails instead of hanging indefinitely.
const ELICITATION_TIMEOUT_MS = 300_000; // 5 minutes for user interaction
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout was added to the existing confirmAction method in elicitation.ts — the server.elicitInput() call previously had no timeout, which means it would use the SDK's default (which could hang indefinitely if a client never responds). We added { timeout: 300_000 } (5 minutes) to give users reasonable time to respond while preventing the server from hanging forever.


export class Elicitation {
private readonly server: McpServer["server"];
constructor({ server }: { server: McpServer["server"] }) {
Expand All @@ -26,14 +32,56 @@ export class Elicitation {
return true;
}

const result = await this.server.elicitInput({
mode: "form",
message,
requestedSchema: Elicitation.CONFIRMATION_SCHEMA,
});
const result = await this.server.elicitInput(
{
mode: "form",
message,
requestedSchema: Elicitation.CONFIRMATION_SCHEMA,
},
{ timeout: ELICITATION_TIMEOUT_MS }
);
return result.action === "accept" && result.content?.confirmation === "Yes";
}

/**
 * Requests structured input from the user via a form.
 * Returns the accepted fields, or { accepted: false } if the client doesn't
 * support elicitation or the user declined.
 *
 * @param message - The message/title to display in the form.
 * @param schema - A JSON Schema describing the fields to collect.
 * @returns `{ accepted: true, fields }` with the user-provided string values
 *          keyed by field name, or `{ accepted: false }` when the client does
 *          not support elicitation, the user declined, or no content came back.
 */
public async requestInput(
    message: string,
    schema: ElicitRequestFormParams["requestedSchema"]
): Promise<ElicitedInputResult> {
    // Clients that never advertised the elicitation capability cannot be asked.
    if (!this.supportsElicitation()) {
        return { accepted: false };
    }

    // Bounded timeout so a client that never responds cannot hang the server.
    const result = await this.server.elicitInput(
        {
            mode: "form",
            message,
            requestedSchema: schema,
        },
        { timeout: ELICITATION_TIMEOUT_MS }
    );

    if (result.action !== "accept" || !result.content) {
        return { accepted: false };
    }

    // Keep only string-valued answers; non-string entries are dropped so the
    // result always satisfies Record<string, string>.
    const fields: Record<string, string> = {};
    for (const [key, value] of Object.entries(result.content)) {
        if (typeof value === "string") {
            fields[key] = value;
        }
    }
    return { accepted: true, fields };
}

/**
* The schema for the confirmation question.
* TODO: In the future would be good to use Zod 4's toJSONSchema() to generate the schema.
Expand Down
6 changes: 6 additions & 0 deletions src/telemetry/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export type TelemetryToolMetadata =
| AtlasMetadata
| ConnectionMetadata
| PerfAdvisorToolMetadata
| StreamsToolMetadata
| AutoEmbeddingsUsageMetadata;

export type AtlasMetadata = {
Expand All @@ -169,6 +170,11 @@ export type PerfAdvisorToolMetadata = AtlasMetadata &
operations: string[];
};

/**
 * Telemetry metadata emitted by the Atlas Streams tools; extends the base
 * Atlas metadata with the operation performed and the resource it targeted.
 */
export type StreamsToolMetadata = AtlasMetadata & {
    // Presumably the streams operation that was executed — TODO confirm value set against the streams tools.
    action?: string;
    // Presumably the streams resource type that was targeted — TODO confirm value set against the streams tools.
    resource?: string;
};

export type AutoEmbeddingsUsageMetadata = ConnectionMetadata & {
/**
* Indicates which component generated the embeddings.
Expand Down
Loading
Loading