Skip to content
Open
149 changes: 146 additions & 3 deletions src/jsc/bindings/webcore/MessagePort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "MessagePort.h"

#include "BunClientData.h"
#include "Event.h"
#include "EventNames.h"
#include "MessageEvent.h"
#include "MessagePortPipe.h"
Expand Down Expand Up @@ -112,7 +113,19 @@
// it in hasPendingActivity()); marking our side Closed is sufficient.
m_pipe->close(m_side);

removeAllEventListeners();
// Wake the entangled peer so it can fire its own 'close' event after
// draining any queued messages — but only for a real close() from script.
// During context teardown (contextDestroyed) or GC (~MessagePort calls the
// pipe directly, bypassing this method), the scheduled drain task would
// never run and would leak, so gate it on the context being able to run JS.

Check warning on line 120 in src/jsc/bindings/webcore/MessagePort.cpp

View check run for this annotation

Claude / Claude Code Review

Stale/misleading comments left after 0b4576c3

nit (doc-only): two comments went stale during the 0b4576c3 rework. (1) Here and in the matching `wakePeerForClose()` impl comment: "the scheduled drain task would never run and would leak" is wrong — `wakePeerForClose()` posts to the *peer's* context (`scheduleDrain(1 - side, peerCtx)`), which is alive when this side is being torn down; the actual reason for the `canRunScript()` gate is now the design choice not to fire `close` on the peer for teardown/GC (per resolved #3448777406). (2) In `dis
Comment thread
robobun marked this conversation as resolved.
Outdated
if (canRunScript())
m_pipe->wakePeerForClose(m_side);

// Fire our own 'close' event asynchronously (Node + HTML semantics), then
// tear down listeners. If JS can't run (context teardown) tear down
// synchronously as before. scheduleCloseEvent() takes its own strong ref,
// so it is safe to run before releasing m_hasRef.
bool scheduledClose = scheduleCloseEvent();

// Release the self-reference taken by jsRef() (set when .onmessage is
// assigned or .ref() is called from JS). The JS .close() binding calls
Expand All @@ -127,6 +140,108 @@
context->unrefEventLoop();
deref();
}

if (!scheduledClose) {
// No 'close' task could be posted (context teardown / postTaskTo
// failed), so no 'close' event will ever fire for this port. Mark the
// close consumed and tear down now, so a close listener added after
// close() cannot pin the already-closed wrapper via hasPendingActivity().
m_closeEventDispatched = true;
removeAllEventListeners();
Comment thread
robobun marked this conversation as resolved.
}
}

void MessagePort::startForClose()
{
// Register with the pipe so MessagePortPipe::close() on the peer can
Comment thread
robobun marked this conversation as resolved.
Outdated
// schedule a drain that reaches us and dispatches 'close'. Unlike start(),
// this does not set m_started, so a later 'message' listener still runs
// start() and re-attaches to flush any buffered messages. attach() is
// idempotent, so calling it again from start() is harmless.
if (!isEntangled())
return;
auto* context = scriptExecutionContext();
if (!context)
return;
m_pipe->attach(m_side, context->identifier(), ThreadSafeWeakPtr<MessagePort> { *this });
}

bool MessagePort::canRunScript() const
{
auto* context = scriptExecutionContext();
if (!context || !context->globalObject())
return false;
auto* globalObject = defaultGlobalObject(context->globalObject());
return Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running;
}

void MessagePort::dispatchCloseEvent()
{
if (m_closeEventDispatched)
return;
m_closeEventDispatched = true;

if (!canRunScript())
return;

// Bypass MessagePort::dispatchEvent()'s detached guard: by the time the
// close task runs the port is already detached, but the 'close' event must
// still reach its listener.
EventTarget::dispatchEvent(Event::create(eventNames().closeEvent, Event::CanBubble::No, Event::IsCancelable::No));
}

void MessagePort::dispatchCloseEventSelf()
{
dispatchCloseEvent();
removeAllEventListeners();
m_hasMessageEventListener = false;
m_hasCloseEventListener = false;
}

bool MessagePort::scheduleCloseEvent()
{
if (m_closeEventDispatched)
return false;

// Post unconditionally when JS can run, even without a close listener yet:
// Node and the HTML spec queue the close task on close(), so a listener
// added synchronously afterwards (port.close(); port.on('close', cb)) still
// fires. dispatchCloseEventSelf() is a no-op dispatch when no listener
// exists and then tears the port down, so the wrapper still gets collected.
if (!canRunScript())
return false;

auto* context = scriptExecutionContext();
return ScriptExecutionContext::postTaskTo(context->identifier(), [protectedThis = Ref { *this }](ScriptExecutionContext&) {
protectedThis->dispatchCloseEventSelf();
});
}
Comment thread
robobun marked this conversation as resolved.

void MessagePort::dispatchCloseEventFromPeer()
{
if (m_isDetached || m_closeEventDispatched || !m_hasCloseEventListener)
return;

// Runs JS (the close handler), which may drop the last external ref.
Ref protectedThis { *this };
// Stop message delivery and make a re-entrant close() a no-op. The close
// branch of hasPendingActivity() (driven by pipe state + the listener
// flag) keeps the wrapper alive across the dispatch below.
m_isDetached = true;

dispatchCloseEvent();

m_pipe->close(m_side);
removeAllEventListeners();
m_hasMessageEventListener = false;
m_hasCloseEventListener = false;

if (m_hasRef) {
m_hasRef = false;
if (auto* context = scriptExecutionContext())
context->unrefEventLoop();
deref();
}
}

TransferredMessagePort MessagePort::disentangle()
Expand Down Expand Up @@ -244,12 +359,35 @@
// atomic loads. The plain bool reads can observe stale values but
// cannot crash — at worst the wrapper is collected one cycle early
// or late, which is the same tolerance as before this refactor.
if (!scriptExecutionContext() || m_isDetached)
if (!scriptExecutionContext())
return false;

uint64_t s = m_pipe->state(m_side);

// Keep the wrapper (and its 'close' listener) alive until a pending close
// event is dispatched. A close is pending, and will actually fire, when a
// close listener is registered, it has not been dispatched yet, and either:
// - this side has closed: the closing port's own 'close' task (posted by
// close() while JS can run) will fire it, or close() already set
// m_closeEventDispatched when it couldn't post (teardown); or
// - the peer has closed AND a drain is scheduled on this side: that drain
// is the only thing that calls dispatchCloseEventFromPeer(), so the pin
// is tied to its existence. A peer closed via ~MessagePort /
// ~TransferredMessagePort / teardown never wakes us (no drain), so such
// a port is not pinned and stays collectable.
// The !m_closeEventDispatched guard lets an already-closed port be
// collected once its close has fired (or was marked consumed), so a close
// listener added afterwards cannot pin the wrapper forever.
if (m_hasCloseEventListener && !m_closeEventDispatched
&& ((s & MessagePortPipe::Closed)
|| ((s & MessagePortPipe::DrainScheduled) && !m_pipe->isOtherSideOpen(m_side))))
return true;
Comment thread
robobun marked this conversation as resolved.

if (m_isDetached)
return false;
if (!m_hasMessageEventListener)
return false;

uint64_t s = m_pipe->state(m_side);
// Keep alive if there are messages already queued for us, or the peer
// is still open and could send more.
return MessagePortPipe::queuedCount(s) > 0 || m_pipe->isOtherSideOpen(m_side);
Expand Down Expand Up @@ -312,6 +450,9 @@
if (eventType == eventNames().messageEvent) {
start();
m_hasMessageEventListener = true;
} else if (eventType == eventNames().closeEvent) {
startForClose();
m_hasCloseEventListener = true;
}
Comment thread
robobun marked this conversation as resolved.
return EventTarget::addEventListener(eventType, WTF::move(listener), options);
}
Expand All @@ -321,6 +462,8 @@
auto result = EventTarget::removeEventListener(eventType, listener, options);
if (!hasEventListeners(eventNames().messageEvent))
m_hasMessageEventListener = false;
if (!hasEventListeners(eventNames().closeEvent))
m_hasCloseEventListener = false;
return result;
}

Expand Down
25 changes: 25 additions & 0 deletions src/jsc/bindings/webcore/MessagePort.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,
// Called by the pipe on this port's context thread with one dequeued message.
void dispatchOneMessage(ScriptExecutionContext&, MessageWithMessagePorts&&);

// Called by the pipe drain on this port's context thread once the inbox is
// empty and the entangled peer has closed: fires 'close' (if a listener is
// registered) and tears the port down.
void dispatchCloseEventFromPeer();

// Only here for JSMessagePortCustom's GC optimization; always null.
MessagePort* locallyEntangledPort() { return nullptr; }

Expand Down Expand Up @@ -117,6 +122,24 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,

void contextDestroyed() final;

// Attach to the pipe so the peer can wake this port to dispatch 'close',
// without enabling message delivery (unlike start(), which a 'message'
// listener calls). Idempotent with a later start().
void startForClose();

// Whether this port's context can currently run JS (not mid-teardown).
// Gates scheduling close-event work that would otherwise never run.
bool canRunScript() const;

// Fires the 'close' event on this port (once). Safe to call after the port
// is detached: it bypasses the detached guard in dispatchEvent().
void dispatchCloseEvent();
// Task body for the closing port's own asynchronous 'close' event.
void dispatchCloseEventSelf();
// Schedules dispatchCloseEventSelf() on this port's context. Returns true
// if a task was posted (in which case listener teardown is deferred to it).
bool scheduleCloseEvent();

bool isEntangled() const { return !m_isDetached; }

// Held for the port's entire lifetime — never nulled — so that the GC
Expand All @@ -128,6 +151,8 @@ class MessagePort final : public ContextDestructionObserver, public EventTarget,
bool m_started { false };
bool m_isDetached { false };
bool m_hasMessageEventListener { false };
bool m_hasCloseEventListener { false };
bool m_closeEventDispatched { false };
bool m_hasRef { false };

uint32_t m_messageEventCount { 0 };
Expand Down
67 changes: 63 additions & 4 deletions src/jsc/bindings/webcore/MessagePortPipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
auto& s = m_sides[side];

RefPtr<MessagePort> port;
size_t limit;
size_t limit = 0;
// Set when the inbox empties while the port is still attached: the port may
// then owe a 'close' event (dispatched below, outside the lock).
bool emptied = false;
{
Locker locker { s.lock };
// This task was posted to `expectedCtx` (and is running there). If
Expand All @@ -112,9 +115,18 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
uint64_t st = s.state.load(std::memory_order_relaxed);
if (!port || s.inbox.isEmpty()) {
s.state.store(st & ~DrainScheduled, std::memory_order_release);
return;
emptied = port && (st & Attached) != 0;
} else {
limit = std::max<size_t>(s.inbox.size(), 1000);
}
limit = std::max<size_t>(s.inbox.size(), 1000);
}

if (!limit) {
// Nothing queued. If our peer has closed, the attached port still owes
// a 'close' event.
if (emptied && !isOtherSideOpen(side))
port->dispatchCloseEventFromPeer();
return;
}

auto* context = port->scriptExecutionContext();
Expand All @@ -141,6 +153,20 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent
break;
uint64_t st = s.state.load(std::memory_order_relaxed);
if (!(st & Attached) || s.inbox.isEmpty()) {
s.state.store(st & ~DrainScheduled, std::memory_order_release);
emptied = (st & Attached) != 0;
break;
}
// A port attached only for a 'close' listener (start() was never
// called) must not have its queued messages dispatched to no one;
// an unstarted port buffers per the HTML spec. Leave them queued —
// adding a 'message' listener (or start()) runs start(), which
// re-attaches and reschedules this drain. Don't set `emptied`: the
// inbox is non-empty, so a pending 'close' waits behind the
// undelivered messages (matching Node). A *started* port with no
// listener still dispatches (to no one) below, as the spec requires,
// which keeps its inbox draining and avoids stranding/pinning it.
if (!port->started()) {
s.state.store(st & ~DrainScheduled, std::memory_order_release);
break;
}
Comment thread
robobun marked this conversation as resolved.
Expand All @@ -165,6 +191,10 @@ void MessagePortPipe::drainAndDispatch(uint8_t side, ScriptExecutionContextIdent

if (rescheduleCtx)
scheduleDrain(side, rescheduleCtx);
else if (emptied && !isOtherSideOpen(side))
// Inbox fully drained and the peer has closed: deliver 'close' after
// all queued messages, matching Node's ordering.
port->dispatchCloseEventFromPeer();
Comment thread
robobun marked this conversation as resolved.
Outdated
}

std::optional<MessageWithMessagePorts> MessagePortPipe::takeOne(uint8_t side)
Expand All @@ -189,7 +219,11 @@ void MessagePortPipe::attach(uint8_t side, ScriptExecutionContextIdentifier ctxI
s.port = WTF::move(port);
uint64_t st = s.state.load(std::memory_order_relaxed);
uint64_t ns = (st | Attached) & ~Closed;
if (queuedCount(st) > 0 && !(st & DrainScheduled)) {
// Schedule a drain if there is work to dispatch: queued messages, or a
// peer that has already closed (a 'close' event is owed). The latter
// covers a 'close' listener added after the peer closed, which would
// otherwise have missed the peer's wake-up in close().
if (!(st & DrainScheduled) && (queuedCount(st) > 0 || !isOtherSideOpen(side))) {
ns |= DrainScheduled;
wakeCtx = ctxId;
}
Expand All @@ -199,6 +233,31 @@ void MessagePortPipe::attach(uint8_t side, ScriptExecutionContextIdentifier ctxI
scheduleDrain(side, wakeCtx);
}

void MessagePortPipe::wakePeerForClose(uint8_t side)
{
ASSERT(side < 2);
// Called by MessagePort::close() (a real close() from script) after this
// side has been marked Closed: schedule a drain on the entangled peer so
// it can dispatch its 'close' event after its queued messages drain. Must
// NOT be called from ~MessagePort / context teardown — the scheduled task
// would never run (the event loop is gone) and would leak.
//
// If a drain is already in flight on the peer it will observe this side's
// Closed bit and dispatch close itself, so only schedule when none is.
auto& peer = m_sides[1 - side];
ScriptExecutionContextIdentifier peerCtx = 0;
{
Locker locker { peer.lock };
uint64_t ps = peer.state.load(std::memory_order_relaxed);
if ((ps & Attached) && !(ps & DrainScheduled)) {
peer.state.store(ps | DrainScheduled, std::memory_order_release);
peerCtx = peer.ctxId;
}
}
if (peerCtx)
scheduleDrain(1 - side, peerCtx);
}

void MessagePortPipe::detach(uint8_t side)
{
ASSERT(side < 2);
Expand Down
5 changes: 5 additions & 0 deletions src/jsc/bindings/webcore/MessagePortPipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class MessagePortPipe final : public ThreadSafeRefCounted<MessagePortPipe> {
void detach(uint8_t side);
void close(uint8_t side);

// Schedules the entangled peer to dispatch its 'close' event after this
// side has been marked Closed. Only safe from a live (script-running)
// context, never from a destructor or teardown — see the impl comment.
void wakePeerForClose(uint8_t side);

// Lockless snapshot for the GC visitor / hasPendingActivity.
uint64_t state(uint8_t side) const { return m_sides[side].state.load(std::memory_order_acquire); }
bool isOtherSideOpen(uint8_t side) const { return !(state(1 - side) & Closed); }
Expand Down
Loading
Loading