Skip to content
Open
104 changes: 101 additions & 3 deletions src/jsc/bindings/webcore/MessagePort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "MessagePort.h"

#include "BunClientData.h"
#include "Event.h"
#include "EventNames.h"
#include "MessageEvent.h"
#include "MessagePortPipe.h"
Expand Down Expand Up @@ -110,9 +111,15 @@

// m_pipe is held for the port's whole lifetime (the GC thread reads
// it in hasPendingActivity()); marking our side Closed is sufficient.
// close() also wakes the entangled peer so it can fire its own 'close'
// event after draining any queued messages.
m_pipe->close(m_side);

removeAllEventListeners();
// Fire our own 'close' event asynchronously (Node + HTML semantics), then
// tear down listeners. If there is no close listener or JS can't run
// (context teardown), tear down synchronously as before. scheduleCloseEvent()
// takes its own strong ref, so it is safe to run before releasing m_hasRef.
bool scheduledClose = scheduleCloseEvent();

// Release the self-reference taken by jsRef() (set when .onmessage is
// assigned or .ref() is called from JS). The JS .close() binding calls
Expand All @@ -127,6 +134,82 @@
context->unrefEventLoop();
deref();
}

if (!scheduledClose)
removeAllEventListeners();
Comment thread
robobun marked this conversation as resolved.
}

void MessagePort::dispatchCloseEvent()
{
if (m_closeEventDispatched)
return;
m_closeEventDispatched = true;

auto* context = scriptExecutionContext();
if (!context || !context->globalObject())
return;

auto* globalObject = defaultGlobalObject(context->globalObject());
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
return;

// Bypass MessagePort::dispatchEvent()'s detached guard: by the time the
// close task runs the port is already detached, but the 'close' event must
// still reach its listener.
EventTarget::dispatchEvent(Event::create(eventNames().closeEvent, Event::CanBubble::No, Event::IsCancelable::No));
}

void MessagePort::dispatchCloseEventSelf()
{
dispatchCloseEvent();
removeAllEventListeners();
m_hasMessageEventListener = false;
m_hasCloseEventListener = false;
}

bool MessagePort::scheduleCloseEvent()
{
if (m_closeEventDispatched || !m_hasCloseEventListener)
return false;

auto* context = scriptExecutionContext();
if (!context || !context->globalObject())
return false;

auto* globalObject = defaultGlobalObject(context->globalObject());
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) != ScriptExecutionStatus::Running)
return false;

return ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }](ScriptExecutionContext&) {
protectedThis->dispatchCloseEventSelf();
});
}
Comment thread
robobun marked this conversation as resolved.

void MessagePort::dispatchCloseEventFromPeer()
{
if (m_isDetached || m_closeEventDispatched || !m_hasCloseEventListener)
return;

// Runs JS (the close handler), which may drop the last external ref.
Ref protectedThis { *this };
// Stop message delivery and make a re-entrant close() a no-op. The close
// branch of hasPendingActivity() (driven by pipe state + the listener
// flag) keeps the wrapper alive across the dispatch below.
m_isDetached = true;

dispatchCloseEvent();

m_pipe->close(m_side);
removeAllEventListeners();
m_hasMessageEventListener = false;
m_hasCloseEventListener = false;

if (m_hasRef) {
m_hasRef = false;
if (auto* context = scriptExecutionContext())
context->unrefEventLoop();
deref();
}
}

TransferredMessagePort MessagePort::disentangle()
Expand Down Expand Up @@ -244,12 +327,23 @@
// atomic loads. The plain bool reads can observe stale values but
// cannot crash — at worst the wrapper is collected one cycle early
// or late, which is the same tolerance as before this refactor.
if (!scriptExecutionContext() || m_isDetached)
if (!scriptExecutionContext())
return false;

uint64_t s = m_pipe->state(m_side);

// Keep the wrapper (and its 'close' listener) alive until a pending close
// event has been dispatched and torn down. A close is pending when this
// side or its peer has closed while a close listener is still registered;
// removeAllEventListeners() clears the flag once the event has fired.
if (m_hasCloseEventListener && ((s & MessagePortPipe::Closed) || !m_pipe->isOtherSideOpen(m_side)))
return true;

Check failure on line 340 in src/jsc/bindings/webcore/MessagePort.cpp

View check run for this annotation

Claude / Claude Code Review

hasPendingActivity() leak: close listener added after close() pins wrapper forever

Adding a `close` listener to an already-closed port pins its JS wrapper for the lifetime of the context. `close()` with no listener leaves `m_closeEventDispatched=false`, then `addEventListener('close', ...)` (which has no `m_isDetached` guard) sets `m_hasCloseEventListener=true`; this branch now returns `true` forever because it runs before the `m_isDetached` check and nothing will ever clear the flag. Either set `m_closeEventDispatched=true` in `close()`'s `!scheduledClose` fallthrough (and ga
Comment thread
robobun marked this conversation as resolved.
Outdated
Comment thread
robobun marked this conversation as resolved.

if (m_isDetached)
return false;
if (!m_hasMessageEventListener)
return false;

uint64_t s = m_pipe->state(m_side);
// Keep alive if there are messages already queued for us, or the peer
// is still open and could send more.
return MessagePortPipe::queuedCount(s) > 0 || m_pipe->isOtherSideOpen(m_side);
Expand Down Expand Up @@ -312,7 +406,9 @@
if (eventType == eventNames().messageEvent) {
start();
m_hasMessageEventListener = true;
} else if (eventType == eventNames().closeEvent) {
m_hasCloseEventListener = true;
}

Check failure on line 411 in src/jsc/bindings/webcore/MessagePort.cpp

View check run for this annotation

Claude / Claude Code Review

Close event never fires on unstarted peer with only a close listener; wrapper leaks

Adding only a `close` listener (no `message` listener, no explicit `start()`) leaves the port un-`Attached` on the pipe, so the new peer-wake block in `MessagePortPipe::close()` skips it and the peer's `close` event never fires — diverging from Node, where `port2.on('close', cb)` alone fires when `port1.close()` is called. Worse, the new `hasPendingActivity()` branch then returns `true` forever (`m_hasCloseEventListener && !isOtherSideOpen`), pinning the wrapper for the lifetime of the context —
Comment thread
robobun marked this conversation as resolved.
return EventTarget::addEventListener(eventType, WTF::move(listener), options);
}

Expand All @@ -321,6 +417,8 @@
auto result = EventTarget::removeEventListener(eventType, listener, options);
if (!hasEventListeners(eventNames().messageEvent))
m_hasMessageEventListener = false;
if (!hasEventListeners(eventNames().closeEvent))
m_hasCloseEventListener = false;
return result;
}

Expand Down
16 changes: 16 additions & 0 deletions src/jsc/bindings/webcore/MessagePort.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,
// Called by the pipe on this port's context thread with one dequeued message.
void dispatchOneMessage(ScriptExecutionContext&, MessageWithMessagePorts&&);

// Called by the pipe drain on this port's context thread once the inbox is
// empty and the entangled peer has closed: fires 'close' (if a listener is
// registered) and tears the port down.
void dispatchCloseEventFromPeer();

// Only here for JSMessagePortCustom's GC optimization; always null.
MessagePort* locallyEntangledPort() { return nullptr; }

Expand Down Expand Up @@ -117,6 +122,15 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,

void contextDestroyed() final;

// Fires the 'close' event on this port (once). Safe to call after the port
// is detached: it bypasses the detached guard in dispatchEvent().
void dispatchCloseEvent();
// Task body for the closing port's own asynchronous 'close' event.
void dispatchCloseEventSelf();
// Schedules dispatchCloseEventSelf() on this port's context. Returns true
// if a task was posted (in which case listener teardown is deferred to it).
bool scheduleCloseEvent();

bool isEntangled() const { return !m_isDetached; }

// Held for the port's entire lifetime — never nulled — so that the GC
Expand All @@ -128,6 +142,8 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,
bool m_started { false };
bool m_isDetached { false };
bool m_hasMessageEventListener { false };
bool m_hasCloseEventListener { false };
bool m_closeEventDispatched { false };
bool m_hasRef { false };

uint32_t m_messageEventCount { 0 };
Expand Down
44 changes: 41 additions & 3 deletions src/jsc/bindings/webcore/MessagePortPipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
auto& s = m_sides[side];

RefPtr<MessagePort> port;
size_t limit;
size_t limit = 0;
// Set when the inbox empties while the port is still attached: the port may
// then owe a 'close' event (dispatched below, outside the lock).
bool emptied = false;
{
Locker locker { s.lock };
// This task was posted to `expectedCtx` (and is running there). If
Expand All @@ -112,9 +115,18 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
uint64_t st = s.state.load(std::memory_order_relaxed);
if (!port || s.inbox.isEmpty()) {
s.state.store(st & ~DrainScheduled, std::memory_order_release);
return;
emptied = port && (st & Attached) != 0;
} else {
limit = std::max<size_t>(s.inbox.size(), 1000);
}
limit = std::max<size_t>(s.inbox.size(), 1000);
}

if (!limit) {
// Nothing queued. If our peer has closed, the attached port still owes
// a 'close' event.
if (emptied && !isOtherSideOpen(side))
port->dispatchCloseEventFromPeer();
return;
}

auto* context = port->scriptExecutionContext();
Expand Down Expand Up @@ -142,6 +154,7 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
uint64_t st = s.state.load(std::memory_order_relaxed);
if (!(st & Attached) || s.inbox.isEmpty()) {
s.state.store(st & ~DrainScheduled, std::memory_order_release);
emptied = (st & Attached) != 0;
break;
}
if (limit-- == 0) {
Expand All @@ -165,6 +178,10 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent

if (rescheduleCtx)
scheduleDrain(side, rescheduleCtx);
else if (emptied && !isOtherSideOpen(side))
// Inbox fully drained and the peer has closed: deliver 'close' after
// all queued messages, matching Node's ordering.
port->dispatchCloseEventFromPeer();
Comment thread
robobun marked this conversation as resolved.
Outdated
}

std::optional<MessageWithMessagePorts> MessagePortPipe::takeOne(uint8_t side)
Expand Down Expand Up @@ -241,6 +258,27 @@ void MessagePortPipe::close(uint8_t side)
dropped = std::exchange(s.inbox, {});
}

// Wake the entangled peer (after its queued messages drain) so it can
// dispatch a 'close' event now that this side has closed. Only needed
// when the peer is attached and no drain is already in flight; an
// in-flight drain observes the Closed bit we just stored (set before
// this check) and dispatches close itself. Marked Closed above →
// checked here preserves that ordering.
{
auto& peer = pipe->m_sides[1 - sd];
ScriptExecutionContextIdentifier peerCtx = 0;
{
Locker locker { peer.lock };
uint64_t ps = peer.state.load(std::memory_order_relaxed);
if ((ps & Attached) && !(ps & DrainScheduled)) {
peer.state.store(ps | DrainScheduled, std::memory_order_release);
peerCtx = peer.ctxId;
}
}
if (peerCtx)
pipe->scheduleDrain(1 - sd, peerCtx);
}

// Harvest transferred pipes before `dropped` destructs so their
// ~TransferredMessagePort sees pipe == nullptr and is a no-op.
for (auto& message : dropped) {
Expand Down
110 changes: 110 additions & 0 deletions test/js/web/workers/message-port-pipe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,116 @@ describe("MessagePort pipe", () => {
});
});

// Closing one end of a MessageChannel must fire a 'close' event on both the
// port that was closed and its entangled peer, asynchronously, after any
// already-queued messages have been delivered. This matches Node's
// node:worker_threads MessagePort and the HTML MessagePort 'close' event.
// https://github.com/oven-sh/bun/issues/32563
describe("MessagePort close event", () => {
async function run(src: string) {
await using proc = Bun.spawn({
cmd: [bunExe(), "-e", src],
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
return { stdout: stdout.trim(), stderr, exitCode };
}

test.concurrent("fires on the peer after queued messages drain (issue repro)", async () => {
const { stdout, stderr, exitCode } = await run(`
const { MessageChannel } = require("node:worker_threads");
const { port1, port2 } = new MessageChannel();
port2.on("message", (message) => console.log(message));
port2.on("close", () => console.log("closed!"));
port1.postMessage("foobar");
port1.close();
`);
expect(stderr).toBe("");
expect(stdout).toBe("foobar\nclosed!");
expect(exitCode).toBe(0);
});

test.concurrent("fires on both ports; message delivered before close", async () => {
const { stdout, stderr, exitCode } = await run(`
const { MessageChannel } = require("node:worker_threads");
const { port1, port2 } = new MessageChannel();
const log = [];
let n = 0;
const finish = () => { if (++n === 2) console.log(JSON.stringify(log)); };
port1.on("close", () => { log.push("port1 close"); finish(); });
port2.on("message", (m) => log.push("port2 message: " + m));
port2.on("close", () => { log.push("port2 close"); finish(); });
port1.postMessage("foobar");
port1.close();
`);
expect(stderr).toBe("");
expect(exitCode).toBe(0);
const log = JSON.parse(stdout);
// Both ports get 'close'; the queued message drains first on port2.
expect(log).toContain("port1 close");
expect(log).toContain("port2 close");
expect(log.indexOf("port2 message: foobar")).toBeGreaterThanOrEqual(0);
expect(log.indexOf("port2 message: foobar")).toBeLessThan(log.indexOf("port2 close"));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});

test.concurrent("closing port fires its own close event", async () => {
const { stdout, stderr, exitCode } = await run(`
const { MessageChannel } = require("node:worker_threads");
const { port1, port2 } = new MessageChannel();
port1.on("close", () => console.log("port1 closed"));
port2.on("message", () => {});
port1.close();
`);
expect(stderr).toBe("");
expect(stdout).toBe("port1 closed");
expect(exitCode).toBe(0);
});

test.concurrent("Web API: addEventListener('close') on MessageChannel", async () => {
const { stdout, stderr, exitCode } = await run(`
const { port1, port2 } = new MessageChannel();
port2.addEventListener("message", (e) => console.log("msg:" + e.data));
port2.addEventListener("close", () => console.log("closed"));
port1.postMessage("x");
port1.close();
`);
expect(stderr).toBe("");
expect(stdout).toBe("msg:x\nclosed");
expect(exitCode).toBe(0);
});

test.concurrent("close fires at most once", async () => {
const { stdout, stderr, exitCode } = await run(`
const { MessageChannel } = require("node:worker_threads");
const { port1, port2 } = new MessageChannel();
let count = 0;
port1.on("close", () => count++);
port2.on("message", () => {});
port1.close();
port1.close();
process.on("exit", () => console.log("count=" + count));
`);
expect(stderr).toBe("");
expect(stdout).toBe("count=1");
expect(exitCode).toBe(0);
});

test.concurrent("no close listener: channel closes and process exits cleanly", async () => {
const { stdout, stderr, exitCode } = await run(`
const { MessageChannel } = require("node:worker_threads");
const { port1, port2 } = new MessageChannel();
port2.on("message", (m) => console.log(m));
port1.postMessage("only-message");
port1.close();
`);
expect(stderr).toBe("");
expect(stdout).toBe("only-message");
expect(exitCode).toBe(0);
});
});

// worker.postMessage / parentPort.postMessage go through the same coalesced
// inbox+batch-drain path as MessagePortPipe. Verify the observable ordering
// matches Node: messages arrive in order with a microtask checkpoint between
Expand Down
Loading