Skip to content

Commit 71d0d79

Browse files
authored
fix(NODE-5052): prevent cursor and changestream close logic from running more than once (mongodb#3562)
1 parent 4bac63c commit 71d0d79

File tree

9 files changed

+228
-196
lines changed

9 files changed

+228
-196
lines changed

.mocharc.json

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
"require": [
44
"source-map-support/register",
55
"ts-node/register",
6-
"test/tools/runner/chai-addons.js"
6+
"test/tools/runner/chai-addons.js",
7+
"test/tools/runner/hooks/unhandled_checker.ts"
8+
],
9+
"extension": [
10+
"js",
11+
"ts"
712
],
8-
"extension": ["js", "ts"],
913
"recursive": true,
1014
"timeout": 60000,
1115
"failZero": true,

src/cursor/abstract_cursor.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -828,13 +828,16 @@ function cleanupCursor(
828828

829829
cursor[kKilled] = true;
830830

831+
if (session.hasEnded) {
832+
return completeCleanup();
833+
}
834+
831835
executeOperation(
832836
cursor[kClient],
833837
new KillCursorsOperation(cursorId, cursorNs, server, { session })
834-
).finally(() => {
835-
completeCleanup();
836-
});
837-
return;
838+
)
839+
.catch(() => null)
840+
.finally(completeCleanup);
838841
}
839842

840843
/** @internal */

test/integration/change-streams/change_stream.test.ts

+24
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,30 @@ describe('Change Streams', function () {
10191019
}
10201020
}
10211021
);
1022+
1023+
it(
1024+
'when closed throws "ChangeStream is closed"',
1025+
{ requires: { topology: '!single' } },
1026+
async function () {
1027+
changeStream = collection.watch();
1028+
1029+
const loop = (async function () {
1030+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
1031+
for await (const _change of changeStream) {
1032+
return 'loop entered'; // loop should never be entered
1033+
}
1034+
return 'loop ended without error'; // loop should not finish without error
1035+
})();
1036+
1037+
await sleep(1);
1038+
const closeResult = changeStream.close().catch(error => error);
1039+
expect(closeResult).to.not.be.instanceOf(Error);
1040+
1041+
const result = await loop.catch(error => error);
1042+
expect(result).to.be.instanceOf(MongoAPIError);
1043+
expect(result.message).to.match(/ChangeStream is closed/i);
1044+
}
1045+
);
10221046
});
10231047

10241048
describe('#return', function () {

test/integration/crud/crud_api.test.ts

+70-107
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { expect } from 'chai';
2+
import { on } from 'events';
23

34
import { MongoClient, MongoError, ObjectId, ReturnDocument } from '../../mongodb';
45
import { assert as test } from '../shared';
@@ -60,130 +61,92 @@ describe('CRUD API', function () {
6061
await client.close();
6162
});
6263

63-
it('should correctly execute find method using crud api', function (done) {
64-
const db = client.db();
65-
66-
db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }], function (err) {
67-
expect(err).to.not.exist;
68-
69-
//
70-
// Cursor
71-
// --------------------------------------------------
72-
const makeCursor = () => {
73-
// Possible methods on the the cursor instance
74-
return db
75-
.collection('t')
76-
.find({})
77-
.filter({ a: 1 })
78-
.addCursorFlag('noCursorTimeout', true)
79-
.addQueryModifier('$comment', 'some comment')
80-
.batchSize(2)
81-
.comment('some comment 2')
82-
.limit(2)
83-
.maxTimeMS(50)
84-
.project({ a: 1 })
85-
.skip(0)
86-
.sort({ a: 1 });
87-
};
64+
context('when creating a cursor with find', () => {
65+
let collection;
8866

89-
//
90-
// Exercise count method
91-
// -------------------------------------------------
92-
const countMethod = function () {
93-
// Execute the different methods supported by the cursor
94-
const cursor = makeCursor();
95-
cursor.count(function (err, count) {
96-
expect(err).to.not.exist;
97-
test.equal(2, count);
98-
eachMethod();
99-
});
100-
};
67+
beforeEach(async () => {
68+
collection = client.db().collection('t');
69+
await collection.drop().catch(() => null);
70+
await collection.insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }]);
71+
});
10172

102-
//
103-
// Exercise legacy method each
104-
// -------------------------------------------------
105-
const eachMethod = function () {
106-
let count = 0;
73+
afterEach(async () => {
74+
await collection?.drop().catch(() => null);
75+
});
10776

77+
const makeCursor = () => {
78+
// Possible methods on the the cursor instance
79+
return collection
80+
.find({})
81+
.filter({ a: 1 })
82+
.addCursorFlag('noCursorTimeout', true)
83+
.addQueryModifier('$comment', 'some comment')
84+
.batchSize(1)
85+
.comment('some comment 2')
86+
.limit(2)
87+
.maxTimeMS(50)
88+
.project({ a: 1 })
89+
.skip(0)
90+
.sort({ a: 1 });
91+
};
92+
93+
describe('#count()', () => {
94+
it('returns the number of documents', async () => {
10895
const cursor = makeCursor();
109-
cursor.forEach(
110-
() => {
111-
count = count + 1;
112-
},
113-
err => {
114-
expect(err).to.not.exist;
115-
test.equal(2, count);
116-
toArrayMethod();
117-
}
118-
);
119-
};
96+
const res = await cursor.count();
97+
expect(res).to.equal(2);
98+
});
99+
});
120100

121-
//
122-
// Exercise toArray
123-
// -------------------------------------------------
124-
const toArrayMethod = function () {
101+
describe('#forEach()', () => {
102+
it('iterates all the documents', async () => {
125103
const cursor = makeCursor();
126-
cursor.toArray(function (err, docs) {
127-
expect(err).to.not.exist;
128-
test.equal(2, docs.length);
129-
nextMethod();
104+
let count = 0;
105+
await cursor.forEach(() => {
106+
count += 1;
130107
});
131-
};
108+
expect(count).to.equal(2);
109+
});
110+
});
132111

133-
//
134-
// Exercise next method
135-
// -------------------------------------------------
136-
const nextMethod = function () {
112+
describe('#toArray()', () => {
113+
it('returns an array with all documents', async () => {
137114
const cursor = makeCursor();
138-
cursor.next(function (err, doc) {
139-
expect(err).to.not.exist;
140-
test.ok(doc != null);
141-
142-
cursor.next(function (err, doc) {
143-
expect(err).to.not.exist;
144-
test.ok(doc != null);
115+
const res = await cursor.toArray();
116+
expect(res).to.have.lengthOf(2);
117+
});
118+
});
145119

146-
cursor.next(function (err, doc) {
147-
expect(err).to.not.exist;
148-
expect(doc).to.not.exist;
149-
streamMethod();
150-
});
151-
});
152-
});
153-
};
120+
describe('#next()', () => {
121+
it('is callable without blocking', async () => {
122+
const cursor = makeCursor();
123+
const doc0 = await cursor.next();
124+
expect(doc0).to.exist;
125+
const doc1 = await cursor.next();
126+
expect(doc1).to.exist;
127+
const doc2 = await cursor.next();
128+
expect(doc2).to.not.exist;
129+
});
130+
});
154131

155-
//
156-
// Exercise stream
157-
// -------------------------------------------------
158-
const streamMethod = function () {
159-
let count = 0;
132+
describe('#stream()', () => {
133+
it('creates a node stream that emits data events', async () => {
134+
const count = 0;
160135
const cursor = makeCursor();
161136
const stream = cursor.stream();
162-
stream.on('data', function () {
163-
count = count + 1;
164-
});
165-
137+
on(stream, 'data');
166138
cursor.once('close', function () {
167-
test.equal(2, count);
168-
explainMethod();
139+
expect(count).to.equal(2);
169140
});
170-
};
141+
});
142+
});
171143

172-
//
173-
// Explain method
174-
// -------------------------------------------------
175-
const explainMethod = function () {
144+
describe('#explain()', () => {
145+
it('returns an explain document', async () => {
176146
const cursor = makeCursor();
177-
cursor.explain(function (err, result) {
178-
expect(err).to.not.exist;
179-
test.ok(result != null);
180-
181-
client.close(done);
182-
});
183-
};
184-
185-
// Execute all the methods
186-
countMethod();
147+
const result = await cursor.explain();
148+
expect(result).to.exist;
149+
});
187150
});
188151
});
189152

test/integration/crud/find_cursor_methods.test.js

+43-35
Original file line numberDiff line numberDiff line change
@@ -108,50 +108,58 @@ describe('Find Cursor', function () {
108108
});
109109
});
110110

111-
context('#close', function () {
112-
it('should send a killCursors command when closed before completely iterated', function (done) {
113-
const commands = [];
114-
client.on('commandStarted', filterForCommands(['killCursors'], commands));
111+
describe('#close', function () {
112+
let collection;
115113

116-
const coll = client.db().collection('abstract_cursor');
117-
const cursor = coll.find({}, { batchSize: 2 });
118-
cursor.next(err => {
119-
expect(err).to.not.exist;
120-
cursor.close(err => {
121-
expect(err).to.not.exist;
122-
expect(commands).to.have.length(1);
123-
done();
124-
});
125-
});
114+
beforeEach(async function () {
115+
collection = client.db().collection('abstract_cursor');
116+
await collection.drop().catch(() => null);
117+
await collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
126118
});
127119

128-
it('should not send a killCursors command when closed after completely iterated', function (done) {
129-
const commands = [];
130-
client.on('commandStarted', filterForCommands(['killCursors'], commands));
120+
afterEach(async function () {
121+
await collection?.drop().catch(() => null);
122+
});
131123

132-
const coll = client.db().collection('abstract_cursor');
133-
const cursor = coll.find({}, { batchSize: 2 });
134-
cursor.toArray(err => {
135-
expect(err).to.not.exist;
124+
context('when closed before completely iterated', () => {
125+
it('sends a killCursors command', async () => {
126+
const killCursorsCommands = [];
127+
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));
136128

137-
cursor.close(err => {
138-
expect(err).to.not.exist;
139-
expect(commands).to.have.length(0);
140-
done();
141-
});
129+
const cursor = collection.find({}, { batchSize: 2 });
130+
131+
const doc = await cursor.next();
132+
expect(doc).property('a', 1);
133+
134+
expect(killCursorsCommands).to.have.length(0);
135+
await cursor.close();
136+
expect(killCursorsCommands).to.have.length(1);
142137
});
143138
});
144139

145-
it('should not send a killCursors command when closed before initialization', function (done) {
146-
const commands = [];
147-
client.on('commandStarted', filterForCommands(['killCursors'], commands));
140+
context('when closed after completely iterated', () => {
141+
it('does not send a killCursors command', async () => {
142+
const killCursorsCommands = [];
143+
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));
148144

149-
const coll = client.db().collection('abstract_cursor');
150-
const cursor = coll.find({}, { batchSize: 2 });
151-
cursor.close(err => {
152-
expect(err).to.not.exist;
153-
expect(commands).to.have.length(0);
154-
done();
145+
const cursor = collection.find();
146+
await cursor.toArray();
147+
expect(killCursorsCommands).to.have.length(0);
148+
await cursor.close();
149+
expect(killCursorsCommands).to.have.length(0);
150+
});
151+
});
152+
153+
context('when closed before initialization', () => {
154+
it('does not send a killCursors command', async () => {
155+
const killCursorsCommands = [];
156+
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));
157+
158+
const cursor = collection.find();
159+
160+
expect(killCursorsCommands).to.have.length(0);
161+
await cursor.close();
162+
expect(killCursorsCommands).to.have.length(0);
155163
});
156164
});
157165
});

0 commit comments

Comments
 (0)