Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not abort when stream payload consumed #63

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ module.exports = fp(
if (raw.socket.destroyed) {
throw new Errors.SOCKET_CLOSED(reqId)
} else {
raw.once(
raw.socket.once(
'close',
function () {
if (controllers.has(this)) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried checking if the request was completed by doing

Suggested change
if (controllers.has(this)) {
if (!raw.completed && controllers.has(this)) {

but that doesn't seem to work as intended for a http1.1 client like unidici.

Do we need some other check, as i'm not sure if it will work for http2 clients, as they can close their sending side of the socket while keeping the receiving side open?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is a must, otherwise we can abort the request as soon as the transmission has finished (request payload received and parsed).

A keep-alive connection is independent of a request lifecycle; the keep alive is handled by the socket while the request lifecycle by the IncomingMessage API (or HTTPServerRequest for HTTP/2).

Let's add it, I believe your tests already covers these cases. Maybe one more for HTTP/2 can be helpful

Expand Down
105 changes: 103 additions & 2 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
'use strict'
const { pipeline } = require('node:stream/promises')
const { Readable, PassThrough } = require('node:stream')

const { promisify } = require('util')

const tap = require('tap')
Expand Down Expand Up @@ -86,7 +89,7 @@ tap.test('fastify-racing#decoration', subtest => {
})

tap.test('fastify-racing#promise', { only: true }, subtest => {
subtest.plan(4)
subtest.plan(6)

subtest.test('Should handle a request aborted', t => {
t.plan(3)
Expand Down Expand Up @@ -124,13 +127,111 @@ tap.test('fastify-racing#promise', { only: true }, subtest => {
t.ok(err)
}
)

// Allow a full event loop cycle
await sleep(5)
abtCtlr.abort()
})
})

subtest.test('Should not incorrectly abort request when body stream consumed', t => {
t.plan(4)

const app = fastify()
app.register(plugin)
app.addContentTypeParser('application/octet-stream', {}, (_req, payload, done) => done(null, payload))

t.teardown(() => app.close())

app.post('/', async (req, _reply) => {
const signal = req.race()

// consume stream
await pipeline(req.body, new PassThrough())
await sleep(5) // Allow a full event loop cycle
t.equal(signal.aborted, false)

const result = await Promise.race([signal, dummy(signal)])
t.equal(typeof result, 'string')

if (result.type === 'aborted') return ''
else return `${result}-world`
})

app
.ready()
.then(() => app.listen({ port: 0 }))
.then(async () => {
request(
`http://localhost:${app.server.address().port}`,
{
method: 'POST',
path: '/',
signal: undefined,
body: Readable.from('stream data'),
headers: {
'Content-Type': 'application/octet-stream'
}
},
(err, res) => {
t.error(err)
t.equal(res.statusCode, 200)
}
)
})
})

subtest.test('Should handle a stream request aborted', t => {
t.plan(5)

const app = fastify()
const abtCtlr = new AbortController()
app.register(plugin)
app.addContentTypeParser('application/octet-stream', {}, (_req, payload, done) => done(null, payload))

t.teardown(() => app.close())

app.post('/', async (req, _reply) => {
const signal = req.race()

// consume stream
await pipeline(req.body, new PassThrough())
await sleep(5) // Allow a full event loop cycle
t.equal(signal.aborted, false)

const result = await Promise.race([signal, dummy(signal)])
t.equal(signal.aborted, true)
t.equal(typeof result, 'object')
t.equal(result.type, 'abort')

if (result.type === 'aborted') return ''
else return `${result}-world`
})

app
.ready()
.then(() => app.listen({ port: 0 }))
.then(async () => {
request(
`http://localhost:${app.server.address().port}`,
{
method: 'POST',
path: '/',
signal: abtCtlr.signal,
body: Readable.from('stream data'),
headers: {
'Content-Type': 'application/octet-stream'
}
},
err => {
t.ok(err)
}
)
// Allow multiple event loop cycles
await sleep(20)
abtCtlr.abort()
})
})

subtest.test(
'Should be able to handle more than one race check within a request',
t => {
Expand Down