Add WebSocket support to airframe-http-netty#4183

Merged: xerial merged 4 commits into main from feature/20260617_092240-websocket-netty on Jun 17, 2026

xerial (Member) commented Jun 17, 2026

Why

The GPT Realtime API support in LLM Proxy needs a server-side WebSocket. Airframe's airframe-http-netty had no WebSocket support even though Netty supports it natively (io.netty.handler.codec.http.websocketx.* is already on the classpath). This is a general Airframe capability; the LLM Proxy is the first consumer.

Design

WebSocket connections are stateful, long-lived, and callback-driven, so they don't fit Airframe's Request => Rx[Response] route model used by @Endpoint/@RPC/RxRouter. Rather than force them into the RxRouter/Surface reflection path, WebSocket routes are registered by path on the Netty server config, with an optional per-route RxHttpFilter so auth/logging/metrics filters apply to the upgrade handshake.

Netty.server
  .withRouter(RxRouter.of[MyApi])
  .withWebSocketRoute("/ws/realtime") { req =>
    new WebSocketHandler {
      override def onOpen(ctx: WebSocketContext)                       = ctx.send("welcome")
      override def onTextMessage(ctx: WebSocketContext, msg: String)   = ctx.send(s"echo:$msg")
      override def onClose(ctx: WebSocketContext)                      = { /* cleanup */ }
    }
  }
  // optional filter applied to the upgrade handshake (non-2xx rejects the upgrade):
  .withWebSocketRoute("/ws/secure", authFilter) { req => ... }

What's included

  • WebSocketContext / WebSocketHandler in airframe-http (pure Scala, no Netty deps): send(text/binary), close(), and onOpen/onTextMessage/onBinaryMessage/onClose/onError callbacks. A fresh handler is created per connection.
  • Netty integration in airframe-http-netty: upgrade detection in NettyRequestHandler, a filter-aware handshake, and a NettyWebSocketHandler frame bridge + NettyWebSocketContext.
  • Filter chain integration: the upgrade request runs through attachContextFilter + the route's RxHttpFilter; a non-2xx response rejects the upgrade with that HTTP response.
  • withWebSocketMaxFrameSize config knob (default 1MB).

Correctness notes

  • onOpen is delivered synchronously on the event loop right after the handshake, so it always precedes any inbound frame (avoids a frame-before-open race under write backpressure).
  • The retained upgrade request is released exactly once via a handled guard — covers normal upgrade, filter rejection, filter error, and the empty-Rx (no response) case without leaking or double-releasing the ByteBuf.
  • Frame callbacks offload to the handler executor when handlerExecutorThreads is configured, matching HTTP handler behavior so blocking callbacks don't stall the event loop.
  • onClose is delivered exactly once (CAS guard over both the Close frame and channelInactive).
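
The "exactly once" guarantees above (the handled guard on the retained request, and the CAS guard on onClose) both come down to an atomic compare-and-set on a boolean flag. A minimal stdlib-only sketch of that pattern follows; `OnceGuard` and `OnceGuardDemo` are hypothetical names used for illustration and are not part of Airframe or Netty.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper (not part of Airframe): runs `action` at most once,
// no matter how many callers try to trigger it.
final class OnceGuard(action: () => Unit) {
  private val fired = new AtomicBoolean(false)

  // Returns true only for the single caller that won the CAS and ran the action.
  def fire(): Boolean = {
    if (fired.compareAndSet(false, true)) {
      action()
      true
    } else {
      false
    }
  }
}

object OnceGuardDemo {
  // Simulates two independent signals (for example a Close frame and a later
  // channelInactive) reporting the same connection close.
  def closeCount(): Int = {
    var count = 0
    val guard = new OnceGuard(() => count += 1)
    guard.fire() // first signal wins and runs the action
    guard.fire() // second signal is ignored
    count
  }
}
```

Because `compareAndSet(false, true)` succeeds for only one caller, the same guard can protect a release or a callback regardless of which code path reaches it first.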

Testing

New WebSocketTest (AirSpec, uses the JDK11 java.net.http WebSocket client — no new dependency):
text echo, binary echo, server push on open, onOpen/onClose lifecycle, filter rejection, empty-filter rejection, WebSocket over a handler executor pool, and a no-regression check that normal @Endpoint routes work alongside WS routes.

  • ./sbt 'netty/testOnly *WebSocketTest' → 8 passed
  • ./sbt 'netty/test' → 125 passed (no regressions)
  • Cross-compiles on Scala 3 and 2.13

Scope

Server-side only. No WS client, no @WebSocket annotation, and no RxRouter node in this PR — documented as possible follow-ups.

🤖 Generated with Claude Code

The GPT Realtime API support in LLM Proxy needs a server-side WebSocket, which
Airframe did not expose even though Netty supports it natively.

WebSocket connections are stateful, long-lived, and callback-driven, so they do
not fit Airframe's Request => Rx[Response] route model. Instead of forcing them
into RxRouter/Surface reflection, WebSocket routes are registered by path on the
Netty server config:

  Netty.server
    .withWebSocketRoute("/ws") { req => new WebSocketHandler { ... } }

The HTTP upgrade request flows through an optional per-route RxHttpFilter so
auth/logging/metrics filters apply to the handshake (a non-2xx response rejects
the upgrade). After a successful handshake the connection is wired to a fresh
WebSocketHandler, with onOpen/onTextMessage/onBinaryMessage/onClose/onError
callbacks and a thread-safe WebSocketContext for sending frames and closing.

- WebSocketContext/WebSocketHandler live in airframe-http (no Netty deps)
- Frame handling, handshake, and context impl live in airframe-http-netty
- onOpen is delivered synchronously on the event loop so it precedes any frame
- frame callbacks offload to handlerExecutorThreads when configured

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

xerial (Member, Author) commented Jun 17, 2026

/gemini review

gemini-code-assist (Bot, Contributor) left a comment

Code Review

This pull request introduces WebSocket support to the Netty-based HTTP server in Airframe, adding new traits and classes (WebSocketRoute, WebSocketContext, WebSocketHandler, and NettyWebSocketHandler) to manage WebSocket lifecycles and frame handling. It updates NettyRequestHandler to intercept upgrade requests, apply filters, and perform handshakes, and adds corresponding configuration options and tests. The review feedback highlights two critical issues: a potential memory leak of the retained ByteBuf if a synchronous exception is thrown during the filter chain execution, and a potential event loop stall if onOpen contains blocking operations, suggesting offloading notifyOpen to the configured handler executor.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +119 to +151
    val filtered: Rx[Response] =
      wsUpgradeFilter
        .andThen(route.filter).andThen { (_: Request) => Rx.single(Http.response(HttpStatus.Ok_200)) }
        .apply(req)
    RxRunner.run(filtered) {
      case OnNext(v) =>
        if (handled.compareAndSet(false, true)) {
          val resp = v.asInstanceOf[Response]
          if (resp.status.isSuccessful) {
            doWebSocketHandshake(ctx, msg, req, route)
          } else {
            // A filter rejected the upgrade (e.g. failed auth); return its response without upgrading
            writeResponse(msg, ctx, toNettyResponse(resp))
            msg.release()
          }
        }
      case OnError(ex) =>
        if (handled.compareAndSet(false, true)) {
          writeResponse(
            msg,
            ctx,
            toNettyResponse(RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse)
          )
          msg.release()
        }
      case OnCompletion =>
        // The filter chain completed without emitting a response: release the retained request and close the
        // connection so the client does not hang waiting for a handshake response that will never come.
        if (handled.compareAndSet(false, true)) {
          msg.release()
          ctx.close()
        }
    }

high

If an exception is thrown synchronously during the filter chain construction or when calling RxRunner.run (for example, due to a synchronous failure in a custom filter), the retained msg will never be released, leading to a memory leak of the direct buffer (ByteBuf). Wrapping the execution in a try-catch block ensures that the message is safely released if a synchronous error occurs.

    try {
      val filtered: Rx[Response] =
        wsUpgradeFilter
          .andThen(route.filter).andThen { (_: Request) => Rx.single(Http.response(HttpStatus.Ok_200)) }
          .apply(req)
      RxRunner.run(filtered) {
        case OnNext(v) =>
          if (handled.compareAndSet(false, true)) {
            val resp = v.asInstanceOf[Response]
            if (resp.status.isSuccessful) {
              doWebSocketHandshake(ctx, msg, req, route)
            } else {
              // A filter rejected the upgrade (e.g. failed auth); return its response without upgrading
              writeResponse(msg, ctx, toNettyResponse(resp))
              msg.release()
            }
          }
        case OnError(ex) =>
          if (handled.compareAndSet(false, true)) {
            writeResponse(
              msg,
              ctx,
              toNettyResponse(RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse)
            )
            msg.release()
          }
        case OnCompletion =>
          // The filter chain completed without emitting a response: release the retained request and close the
          // connection so the client does not hang waiting for a handshake response that will never come.
          if (handled.compareAndSet(false, true)) {
            msg.release()
            ctx.close()
          }
      }
    } catch {
      case NonFatal(ex) =>
        if (handled.compareAndSet(false, true)) {
          msg.release()
        }
        throw ex
    }

Comment on lines +180 to +188
    wsHandlerExecutor match {
      case Some(executor) => pipeline.addLast(executor, "wsHandler", wsHandler)
      case None           => pipeline.addLast("wsHandler", wsHandler)
    }
    pipeline.remove(NettyRequestHandler.this)
    // Notify onOpen synchronously on the event loop so it always precedes any inbound frame (the event loop
    // will not process the next read until this task returns). Outbound writes from onOpen are still queued
    // after the handshake response, so frame ordering on the wire is preserved.
    wsHandler.notifyOpen()

high

When handlerExecutorThreads is configured, the user expects all handler callbacks to run on the offloaded thread pool to avoid blocking the Netty event loop. However, wsHandler.notifyOpen() is currently invoked directly on the event loop thread. If the user's onOpen implementation contains blocking operations, it will stall the event loop.

To fix this while preserving the exact task ordering (ensuring onOpen always runs before any inbound frames are processed), we can submit the notifyOpen task to the specific EventExecutor assigned to the handler context in the pipeline. Netty guarantees that all tasks submitted to a channel's assigned executor are executed sequentially.

          wsHandlerExecutor match {
            case Some(executor) =>
              pipeline.addLast(executor, "wsHandler", wsHandler)
              pipeline.remove(NettyRequestHandler.this)
              val handlerCtx = pipeline.context("wsHandler")
              if (handlerCtx != null) {
                handlerCtx.executor().execute { () => wsHandler.notifyOpen() }
              } else {
                wsHandler.notifyOpen()
              }
            case None =>
              pipeline.addLast("wsHandler", wsHandler)
              pipeline.remove(NettyRequestHandler.this)
              wsHandler.notifyOpen()
          }
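
The ordering argument in this comment rests on one property: tasks handed to a single serial executor run in submission order, so an "open" task submitted first completes before any later frame task starts. The sketch below demonstrates only that property, using a single-threaded `java.util.concurrent` executor as a stand-in for the handler's Netty `EventExecutor`; `SerialOrderingDemo` is a hypothetical name and none of this is Airframe or Netty code.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object SerialOrderingDemo {
  // Submits an "onOpen" task followed by two frame tasks to one single-threaded
  // executor and returns the order in which they actually ran.
  def run(): List[String] = {
    val executor = Executors.newSingleThreadExecutor()
    // Only the executor thread writes to this buffer; it is read after termination.
    val events = ArrayBuffer.empty[String]

    def record(name: String, delayMillis: Long): Runnable = () => {
      Thread.sleep(delayMillis) // simulate a slow (blocking) callback
      events += name
      ()
    }

    try {
      executor.execute(record("onOpen", 20)) // slow, but submitted first
      executor.execute(record("frame-1", 0))
      executor.execute(record("frame-2", 0))
    } finally {
      executor.shutdown()
      executor.awaitTermination(5, TimeUnit.SECONDS)
    }
    events.toList
  }
}
```

Even though the first task is the slowest, it still finishes before the frame tasks begin, and the blocking happens on the executor's thread rather than on the thread that submitted the work.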

gemini-code-assist (Bot, Contributor) left a comment

Code Review

This pull request introduces native WebSocket support to the Netty backend of Airframe HTTP, adding configuration options, upgrade handshake handling, and frame bridging to user-defined handlers. The reviewer provided valuable feedback on three key areas: adding a WebSocketFrameAggregator to correctly handle fragmented frames, refactoring the upgrade filter execution to prevent memory leaks of the request buffer on synchronous exceptions, and dynamically detecting the WebSocket scheme (ws vs wss) via the X-Forwarded-Proto header to support deployments behind TLS-terminating reverse proxies.

Comment on lines +179 to +184
    val pipeline = ctx.pipeline()
    wsHandlerExecutor match {
      case Some(executor) => pipeline.addLast(executor, "wsHandler", wsHandler)
      case None           => pipeline.addLast("wsHandler", wsHandler)
    }
    pipeline.remove(NettyRequestHandler.this)

high

Without a WebSocketFrameAggregator in the pipeline, fragmented WebSocket frames (which are common for larger payloads) will be received as ContinuationWebSocketFrames and silently ignored by NettyWebSocketHandler.

To ensure correct handling of fragmented frames, add a WebSocketFrameAggregator to the pipeline before the wsHandler.

Suggested change
-   val pipeline = ctx.pipeline()
-   wsHandlerExecutor match {
-     case Some(executor) => pipeline.addLast(executor, "wsHandler", wsHandler)
-     case None           => pipeline.addLast("wsHandler", wsHandler)
-   }
-   pipeline.remove(NettyRequestHandler.this)
+   val pipeline = ctx.pipeline()
+   pipeline.addLast("wsAggregator", new io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator(config.webSocketMaxFrameSize))
+   wsHandlerExecutor match {
+     case Some(executor) => pipeline.addLast(executor, "wsHandler", wsHandler)
+     case None           => pipeline.addLast("wsHandler", wsHandler)
+   }
+   pipeline.remove(NettyRequestHandler.this)
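
To make concrete what aggregating fragmented frames means, here is a simplified model: a message starts with a text frame, continues with zero or more continuation frames, and ends at the frame marked final. The `Text` and `Continuation` case classes are stand-ins invented for this sketch, not Netty's frame classes, and the size limit counts characters rather than bytes. It illustrates the idea only, not how `WebSocketFrameAggregator` is implemented.

```scala
object FragmentAggregationDemo {
  // Simplified stand-ins for WebSocket frames; these are NOT Netty's classes.
  sealed trait Frame
  final case class Text(data: String, isFinal: Boolean)         extends Frame
  final case class Continuation(data: String, isFinal: Boolean) extends Frame

  // Coalesces fragments into whole messages. Returns Left(reason) when a message
  // exceeds maxSize characters, the frame sequence is invalid, or the input ends
  // in the middle of a message.
  def aggregate(frames: List[Frame], maxSize: Int): Either[String, List[String]] = {
    @scala.annotation.tailrec
    def loop(rest: List[Frame], pending: Option[String], done: List[String]): Either[String, List[String]] =
      rest match {
        case Nil =>
          if (pending.isEmpty) Right(done.reverse) else Left("stream ended mid-message")
        case frame :: tail =>
          val (data, isFinal, isStart) = frame match {
            case Text(d, f)         => (d, f, true)
            case Continuation(d, f) => (d, f, false)
          }
          if (isStart == pending.isDefined) {
            // Either a new message started while one was pending, or a
            // continuation arrived with no message in progress.
            Left("unexpected frame sequence")
          } else {
            val combined = pending.getOrElse("") + data
            if (combined.length > maxSize) Left("message exceeds max size")
            else if (isFinal) loop(tail, None, combined :: done)
            else loop(tail, Some(combined), done)
          }
      }
    loop(frames, None, Nil)
  }
}
```

With this model, a handler that looks only at `Text` frames would see just the first fragment of a split message, which is the failure mode the review comment describes.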

Comment on lines +113 to +123
    // The handshake may run later (async filter and/or event loop hop), so keep the request alive
    msg.retain()
    // Act on the first terminal outcome only: guards against a filter that emits multiple responses (double
    // handshake/release) or completes without emitting one (leaking the retained request)
    val handled = new java.util.concurrent.atomic.AtomicBoolean(false)
    // Run the upgrade request through the filter chain, terminating in a 200 marker that signals "upgrade allowed"
    val filtered: Rx[Response] =
      wsUpgradeFilter
        .andThen(route.filter).andThen { (_: Request) => Rx.single(Http.response(HttpStatus.Ok_200)) }
        .apply(req)
    RxRunner.run(filtered) {

medium

If wsUpgradeFilter.andThen(route.filter).apply(req) throws a synchronous exception (e.g., due to validation or a null pointer in a custom filter), msg.retain() has already been called, but msg.release() will never be called because the asynchronous RxRunner.run block is never reached. This will cause a memory leak of the FullHttpRequest buffer.

To prevent this, construct the filtered Rx stream first within a try-catch block, and only call msg.retain() right before executing RxRunner.run.

    val filtered: Rx[Response] = try {
      wsUpgradeFilter
        .andThen(route.filter).andThen { (_: Request) => Rx.single(Http.response(HttpStatus.Ok_200)) }
        .apply(req)
    } catch {
      case NonFatal(ex) =>
        writeResponse(
          msg,
          ctx,
          toNettyResponse(RPCStatus.INTERNAL_ERROR_I0.newException(ex.getMessage, ex).toResponse)
        )
        return
    }

    // The handshake may run later (async filter and/or event loop hop), so keep the request alive
    msg.retain()
    // Act on the first terminal outcome only: guards against a filter that emits multiple responses (double
    // handshake/release) or completes without emitting one (leaking the retained request)
    val handled = new java.util.concurrent.atomic.AtomicBoolean(false)
    RxRunner.run(filtered) {

Comment on lines +379 to +382
  private[netty] def webSocketLocation(req: HttpRequest): String = {
    val host = Option(req.headers().get(HttpHeaderNames.HOST)).getOrElse("localhost")
    s"ws://${host}${req.uri()}"
  }

medium

In production environments, the Netty server is often deployed behind a TLS-terminating reverse proxy or load balancer (e.g., AWS ALB, NGINX, Kubernetes Ingress). In such cases, the client connects via secure WebSockets (wss://), but the proxy forwards the request to Netty over plain HTTP.

If we hardcode the scheme to ws://, the handshake location header might mismatch the client's request scheme, causing handshake failures or security policy violations in some clients. We should detect the scheme dynamically by checking the standard X-Forwarded-Proto header.

  private[netty] def webSocketLocation(req: HttpRequest): String = {
    val host = Option(req.headers().get(HttpHeaderNames.HOST)).getOrElse("localhost")
    val scheme = Option(req.headers().get("X-Forwarded-Proto"))
      .map(_.toLowerCase)
      .map {
        case "https" => "wss"
        case _       => "ws"
      }
      .getOrElse("ws")
    s"${scheme}://${host}${req.uri()}"
  }

- Add WebSocketFrameAggregator so fragmented (continuation) frames are coalesced
  into whole text/binary messages instead of being dropped (matters for large
  payloads such as realtime audio)
- Run onOpen on the frame handler's own executor (the handler executor when
  configured, else the event loop) so it does not block the event loop while
  still preceding any inbound frame via Netty's per-handler task serialization
- Build the upgrade filter chain before retain() so a synchronous filter failure
  cannot leak the FullHttpRequest buffer

Adds a test for fragmented-message aggregation.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

xerial (Member, Author) commented Jun 17, 2026

/gemini review

gemini-code-assist (Bot, Contributor) left a comment

Code Review

This pull request introduces WebSocket support to the airframe-http-netty module, allowing users to register WebSocket routes, apply upgrade filters, and handle text/binary frames. The feedback highlights two important improvements: handling comma-separated values in the Connection header to ensure compatibility with various clients, and detecting SslHandler in the Netty pipeline to correctly support secure WebSocket connections (wss://).

Comment on lines +389 to +393
  private[netty] def isWebSocketUpgrade(msg: HttpRequest): Boolean = {
    val headers = msg.headers()
    headers.containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE, true) &&
    headers.containsValue(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET, true)
  }

high

The Connection header can contain multiple comma-separated values (e.g., keep-alive, Upgrade sent by some browsers/clients). Using containsValue on the Connection header with HttpHeaderValues.UPGRADE will fail in those cases because it expects an exact match. We should split the Connection header values by comma and check if any of them matches Upgrade.

  private[netty] def isWebSocketUpgrade(msg: HttpRequest): Boolean = {
    val headers = msg.headers()
    headers.containsValue(HttpHeaderNames.UPGRADE, HttpHeaderValues.WEBSOCKET, true) &&
    headers.getAll(HttpHeaderNames.CONNECTION).asScala.exists { value =>
      value.split(",").exists(_.trim.equalsIgnoreCase(HttpHeaderValues.UPGRADE.toString))
    }
  }

xerial (Member, Author) replied:

Verified against netty 4.2.10: HttpHeaders.containsValue(name, value, ignoreCase=true) delegates to containsCommaSeparatedTrimmed, so Connection: keep-alive, Upgrade already matches each token individually (confirmed it returns true). Keeping containsValue and added a comment to prevent a future exact-match regression.
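
The token-wise matching described in this reply can be pictured with a small stdlib-only helper: split each header value on commas, trim each token, and compare case-insensitively. `HeaderTokenDemo.containsToken` is a hypothetical function written for this illustration; it mirrors the behavior reported above but is not Netty's implementation.

```scala
object HeaderTokenDemo {
  // Hypothetical helper: true if any header value, split on commas and trimmed,
  // equals `token` ignoring case.
  def containsToken(values: Seq[String], token: String): Boolean =
    values.exists { value =>
      value.split(',').exists(part => part.trim.equalsIgnoreCase(token))
    }
}
```

For example, `HeaderTokenDemo.containsToken(Seq("keep-alive, Upgrade"), "upgrade")` matches on the second token, while a value such as `"Upgraded"` does not match because tokens are compared whole rather than by substring.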

  ): Unit = {
    ctx.channel().eventLoop().execute { () =>
      try {
        val location = NettyRequestHandler.webSocketLocation(msg)

medium

Pass ctx to webSocketLocation to allow detecting if the connection is secure (SSL/TLS) and using the wss:// scheme accordingly.

Suggested change
-   val location = NettyRequestHandler.webSocketLocation(msg)
+   val location = NettyRequestHandler.webSocketLocation(ctx, msg)

Comment on lines +399 to +402
  private[netty] def webSocketLocation(req: HttpRequest): String = {
    val host = Option(req.headers().get(HttpHeaderNames.HOST)).getOrElse("localhost")
    s"ws://${host}${req.uri()}"
  }

medium

Update webSocketLocation to accept ChannelHandlerContext and check if the pipeline contains an SslHandler. This ensures that if the server is configured with SSL/TLS, the correct wss:// scheme is used for the WebSocket handshake.

Suggested change
-   private[netty] def webSocketLocation(req: HttpRequest): String = {
-     val host = Option(req.headers().get(HttpHeaderNames.HOST)).getOrElse("localhost")
-     s"ws://${host}${req.uri()}"
-   }
+   private[netty] def webSocketLocation(ctx: ChannelHandlerContext, req: HttpRequest): String = {
+     val isSecure = ctx.pipeline().get(classOf[io.netty.handler.ssl.SslHandler]) != null
+     val scheme   = if (isSecure) "wss" else "ws"
+     val host     = Option(req.headers().get(HttpHeaderNames.HOST)).getOrElse("localhost")
+     s"${scheme}://${host}${req.uri()}"
+   }

xerial and others added 2 commits June 17, 2026 10:09
Addresses Gemini review: webSocketLocation now picks wss:// when an SslHandler
is present in the pipeline, ws:// otherwise. Also document that
isWebSocketUpgrade relies on containsValue handling comma-separated Connection
header lists (verified against netty 4.2.10), so it is not "fixed" into an
exact match later.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Add a WebSocket section to docs/airframe-http.md covering withWebSocketRoute,
the WebSocketHandler callbacks, WebSocketContext, handshake filters, frame
aggregation, and the max-frame-size knob.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the doc Documentation task label Jun 17, 2026
@xerial xerial merged commit 938ab3b into main Jun 17, 2026
23 checks passed
@xerial xerial deleted the feature/20260617_092240-websocket-netty branch June 17, 2026 04:32
Labels

doc Documentation task feature
