From b1b4902c252f5ee71a7b34ddcfa7b9ffa4eeb4dd Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 07:49:49 +0200 Subject: [PATCH 01/13] chore(ws-caps): ip address rate limiting --- server/websocket.go | 140 +++++++++++++++++++++++++++++++++++++-- server/websocket_test.go | 132 ++++++++++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+), 6 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index e7da3b5124..c9c677d5fa 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -3,7 +3,9 @@ package server import ( "encoding/json" "math/big" + "net" "net/http" + "net/netip" "net/url" "os" "runtime/debug" @@ -29,6 +31,11 @@ const defaultTimeout = 60 * time.Second const unknownMethodLabel = "unknown" const maxWebsocketMessageBytes int64 = 4 * 1024 * 1024 const maxWebsocketPendingRequests = 48 +const maxWebsocketConnectionAttemptsPerIP = 64 +const maxWebsocketConnectionsPerIP = 128 +const websocketConnectionAttemptWindow = time.Minute +const websocketConnectionLimiterTTL = 10 * time.Minute +const websocketConnectionLimiterCleanupInterval = time.Minute const websocketLogPreviewBytes = 256 // allRates is a special "currency" parameter that means all available currencies @@ -90,6 +97,19 @@ type WebsocketServer struct { fiatRatesSubscriptionsLock sync.Mutex allowedOrigins map[string]struct{} allowedRpcCallTo map[string]struct{} + websocketLimiter *websocketConnectionLimiter +} + +type websocketClientLimit struct { + active int + attempts []time.Time + lastSeen time.Time +} + +type websocketConnectionLimiter struct { + mux sync.Mutex + clients map[string]*websocketClientLimit + lastCleanup time.Time } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle @@ -118,6 +138,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails), fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string), + websocketLimiter: newWebsocketConnectionLimiter(), } s.upgrader = &websocket.Upgrader{ ReadBufferSize: 1024 * 32, @@ -191,16 +212,106 @@ func normalizeOrigin(origin string) (string, bool) { return strings.ToLower(u.Scheme) + "://" + strings.ToLower(u.Host), true } +func newWebsocketConnectionLimiter() *websocketConnectionLimiter { + return &websocketConnectionLimiter{ + clients: make(map[string]*websocketClientLimit), + } +} + +func (l *websocketConnectionLimiter) accept(ip string, now time.Time) (bool, string) { + l.mux.Lock() + defer l.mux.Unlock() + + l.cleanupLocked(now) + client := l.clients[ip] + if client == nil { + client = &websocketClientLimit{} + l.clients[ip] = client + } + client.lastSeen = now + client.trimAttempts(now) + + if client.active >= maxWebsocketConnectionsPerIP { + return false, "connection_limit" + } + if len(client.attempts) >= maxWebsocketConnectionAttemptsPerIP { + return false, "connection_attempt_limit" + } + + client.attempts = append(client.attempts, now) + client.active++ + return true, "" +} + +func (l *websocketConnectionLimiter) release(ip string, now time.Time) { + l.mux.Lock() + defer l.mux.Unlock() + + client := l.clients[ip] + if client == nil { + return + } + if client.active > 0 { + client.active-- + } + client.lastSeen = now + l.cleanupLocked(now) +} + +func (l *websocketConnectionLimiter) cleanupLocked(now time.Time) { + if !l.lastCleanup.IsZero() && now.Sub(l.lastCleanup) < websocketConnectionLimiterCleanupInterval { + return + } + l.lastCleanup = now + for ip, client := range l.clients { + client.trimAttempts(now) + if client.active == 0 && now.Sub(client.lastSeen) > websocketConnectionLimiterTTL { + delete(l.clients, ip) + } + } +} + +func (client *websocketClientLimit) trimAttempts(now time.Time) { + cutoff := now.Add(-websocketConnectionAttemptWindow) + i := 0 + for i < len(client.attempts) && client.attempts[i].Before(cutoff) { + i++ + } + if i > 0 { + copy(client.attempts, client.attempts[i:]) + client.attempts = client.attempts[:len(client.attempts)-i] + } +} + func getIP(r *http.Request) string { - ip := r.Header.Get("cf-connecting-ip") - if ip != "" { + if ip, ok := parseIP(r.Header.Get("CF-Connecting-IPv6")); ok { return ip } - ip = r.Header.Get("X-Real-Ip") - if ip != "" { + if ip, ok := parseIP(r.Header.Get("CF-Connecting-IP")); ok { return ip } - return r.RemoteAddr + + host := r.RemoteAddr + if h, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { + host = h + } + if ip, ok := parseIP(host); ok { + return ip + } + + return strings.TrimSpace(r.RemoteAddr) +} + +func parseIP(value string) (string, bool) { + value = strings.TrimSpace(value) + if value == "" { + return "", false + } + ip, err := netip.ParseAddr(value) + if err != nil { + return "", false + } + return ip.String(), true } func getWebsocketPayloadPreview(d []byte) string { @@ -216,8 +327,22 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable) return } + ip := getIP(r) + limited := false + if s.websocketLimiter != nil { + ok, reason := s.websocketLimiter.accept(ip, time.Now()) + if !ok { + glog.Warning("Websocket connection rejected, ", ip, ", ", reason) + http.Error(w, "Too many websocket connections", http.StatusTooManyRequests) + return + } + limited = true + } conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { + if limited { + s.websocketLimiter.release(ip, time.Now()) + } http.Error(w, upgradeFailed+err.Error(), http.StatusServiceUnavailable) return } @@ -227,7 +352,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { conn: conn, out: make(chan *WsRes, outChannelSize), pendingRequests: make(chan struct{}, maxWebsocketPendingRequests), - ip: getIP(r), + ip: ip, requestHeader: r.Header, alive: true, } @@ -381,6 +506,9 @@ func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewTransaction(c) s.unsubscribeAddresses(c) s.unsubscribeFiatRates(c) + if s.websocketLimiter != nil { + s.websocketLimiter.release(c.ip, time.Now()) + } glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } diff --git a/server/websocket_test.go b/server/websocket_test.go index 8073f70da5..06b4b53692 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -8,6 +8,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/trezor/blockbook/api" "github.com/trezor/blockbook/bchain" @@ -161,6 +162,137 @@ func TestParseAllowedOrigins(t *testing.T) { } } +func TestGetIP(t *testing.T) { + tests := []struct { + name string + headers map[string]string + remoteAddr string + want string + }{ + { + name: "cloudflare ipv6 is preferred", + headers: map[string]string{ + "CF-Connecting-IPv6": "2001:db8::1", + "CF-Connecting-IP": "192.0.2.10", + }, + remoteAddr: "198.51.100.1:12345", + want: "2001:db8::1", + }, + { + name: "cloudflare ip is canonicalized", + headers: map[string]string{ + "CF-Connecting-IP": " 192.0.2.10 ", + }, + remoteAddr: "198.51.100.1:12345", + want: "192.0.2.10", + }, + { + name: "invalid cloudflare ip falls back to remote address", + headers: map[string]string{ + "CF-Connecting-IP": "not-an-ip", + "X-Real-Ip": "203.0.113.10", + }, + remoteAddr: "198.51.100.1:12345", + want: "198.51.100.1", + }, + { + name: "remote ipv6 address strips port", + remoteAddr: "[2001:db8::2]:443", + want: "2001:db8::2", + }, + { + name: "remote address without port is accepted", + remoteAddr: "198.51.100.2", + want: "198.51.100.2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &http.Request{ + Header: make(http.Header), + RemoteAddr: tt.remoteAddr, + } + for k, v := range tt.headers { + r.Header.Set(k, v) + } + + got := getIP(r) + if got != tt.want { + t.Fatalf("getIP() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestWebsocketConnectionLimiterConnectionAttempts(t *testing.T) { + limiter := newWebsocketConnectionLimiter() + now := time.Unix(1700000000, 0) + ip := "192.0.2.10" + + for i := 0; i < maxWebsocketConnectionAttemptsPerIP; i++ { + ok, reason := limiter.accept(ip, now) + if !ok { + t.Fatalf("accept(%d) rejected with %q", i, reason) + } + limiter.release(ip, now) + } + + ok, reason := limiter.accept(ip, now) + if ok || reason != "connection_attempt_limit" { + t.Fatalf("accept() = %v, %q, want false, connection_attempt_limit", ok, reason) + } + + ok, reason = limiter.accept(ip, now.Add(websocketConnectionAttemptWindow+time.Second)) + if !ok { + t.Fatalf("accept() after window rejected with %q", reason) + } +} + +func TestWebsocketConnectionLimiterActiveConnections(t *testing.T) { + limiter := newWebsocketConnectionLimiter() + now := time.Unix(1700000000, 0) + ip := "192.0.2.20" + + for i := 0; i < maxWebsocketConnectionsPerIP; i++ { + if i > 0 && i%maxWebsocketConnectionAttemptsPerIP == 0 { + now = now.Add(websocketConnectionAttemptWindow + time.Second) + } + ok, reason := limiter.accept(ip, now) + if !ok { + t.Fatalf("accept(%d) rejected with %q", i, reason) + } + } + + ok, reason := limiter.accept(ip, now) + if ok || reason != "connection_limit" { + t.Fatalf("accept() = %v, %q, want false, connection_limit", ok, reason) + } + + limiter.release(ip, now) + ok, reason = limiter.accept(ip, now.Add(websocketConnectionAttemptWindow+time.Second)) + if !ok { + t.Fatalf("accept() after release rejected with %q", reason) + } +} + +func TestWebsocketConnectionLimiterCleanup(t *testing.T) { + limiter := newWebsocketConnectionLimiter() + now := time.Unix(1700000000, 0) + ip := "192.0.2.30" + + ok, reason := limiter.accept(ip, now) + if !ok { + t.Fatalf("accept() rejected with %q", reason) + } + limiter.release(ip, now) + + _, _ = limiter.accept("192.0.2.31", now.Add(websocketConnectionLimiterTTL+websocketConnectionLimiterCleanupInterval+time.Second)) + if _, ok := limiter.clients[ip]; ok { + t.Fatal("idle client limit entry was not cleaned up") + } +} + func TestSetConfirmedBlockTxMetadataSetsConfirmedFields(t *testing.T) { tx := bchain.Tx{ Confirmations: 0, From 09f1e92bfb87736eec4e9a27460bce37af8a3bb8 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 08:28:27 +0200 Subject: [PATCH 02/13] chore(ws-caps): estimateFee.blocks cap --- server/websocket.go | 4 ++++ server/websocket_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/server/websocket.go b/server/websocket.go index c9c677d5fa..8b383bbd7e 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -33,6 +33,7 @@ const maxWebsocketMessageBytes int64 = 4 * 1024 * 1024 const maxWebsocketPendingRequests = 48 const maxWebsocketConnectionAttemptsPerIP = 64 const maxWebsocketConnectionsPerIP = 128 +const maxWebsocketEstimateFeeBlocks = 32 const websocketConnectionAttemptWindow = time.Minute const websocketConnectionLimiterTTL = 10 * time.Minute const websocketConnectionLimiterCleanupInterval = time.Minute @@ -920,6 +921,9 @@ func (s *WebsocketServer) estimateFee(params []byte) (interface{}, error) { if err != nil { return nil, err } + if len(r.Blocks) > maxWebsocketEstimateFeeBlocks { + return nil, api.NewAPIError("blocks max "+strconv.Itoa(maxWebsocketEstimateFeeBlocks), true) + } res := make([]WsEstimateFeeRes, len(r.Blocks)) if s.chainParser.GetChainType() == bchain.ChainEthereumType { gas, err := s.chain.EthereumTypeEstimateGas(r.Specific) diff --git a/server/websocket_test.go b/server/websocket_test.go index 06b4b53692..c3b13fdd05 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -4,6 +4,7 @@ package server import ( + "encoding/json" "errors" "net/http" "strings" @@ -293,6 +294,30 @@ func TestWebsocketConnectionLimiterCleanup(t *testing.T) { } } +func TestEstimateFeeRejectsTooManyBlocks(t *testing.T) { + blocks := make([]int, maxWebsocketEstimateFeeBlocks+1) + params, err := json.Marshal(WsEstimateFeeReq{Blocks: blocks}) + if err != nil { + t.Fatal(err) + } + + s := &WebsocketServer{} + _, err = s.estimateFee(params) + if err == nil { + t.Fatal("expected error") + } + apiErr, ok := err.(*api.APIError) + if !ok { + t.Fatalf("expected *api.APIError, got %T", err) + } + if !apiErr.Public { + t.Fatal("expected public api error") + } + if !strings.Contains(apiErr.Error(), "blocks max 32") { + t.Fatalf("unexpected error message %q", apiErr.Error()) + } +} + func TestSetConfirmedBlockTxMetadataSetsConfirmedFields(t *testing.T) { tx := bchain.Tx{ Confirmations: 0, From 252d1c7ec82dd0359d7dcb034dc27a9648cdea07 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 08:35:03 +0200 Subject: [PATCH 03/13] chore(ws-caps): subscribeAddresses caps --- server/websocket.go | 9 +++++++ server/websocket_test.go | 51 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/server/websocket.go b/server/websocket.go index 8b383bbd7e..bbc4e1a542 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -34,6 +34,8 @@ const maxWebsocketPendingRequests = 48 const maxWebsocketConnectionAttemptsPerIP = 64 const maxWebsocketConnectionsPerIP = 128 const maxWebsocketEstimateFeeBlocks = 32 +const maxWebsocketSubscribeAddresses = 1000 +const maxWebsocketSubscribeAddressesWithNewBlockTxs = 100 const websocketConnectionAttemptWindow = time.Minute const websocketConnectionLimiterTTL = 10 * time.Minute const websocketConnectionLimiterCleanupInterval = time.Minute @@ -1149,6 +1151,13 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, err if err != nil { return nil, false, api.NewAPIError("Invalid subscribeAddresses params", true) } + limit := maxWebsocketSubscribeAddresses + if r.NewBlockTxs { + limit = maxWebsocketSubscribeAddressesWithNewBlockTxs + } + if len(r.Addresses) > limit { + return nil, false, api.NewAPIError("addresses max "+strconv.Itoa(limit), true) + } rv := make([]string, len(r.Addresses)) for i, a := range r.Addresses { ad, err := s.chainParser.GetAddrDescFromAddress(a) diff --git a/server/websocket_test.go b/server/websocket_test.go index c3b13fdd05..166987f42a 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -318,6 +318,57 @@ func TestEstimateFeeRejectsTooManyBlocks(t *testing.T) { } } +func TestUnmarshalAddressesRejectsTooManyAddresses(t *testing.T) { + addresses := make([]string, maxWebsocketSubscribeAddresses+1) + params, err := json.Marshal(WsSubscribeAddressesReq{Addresses: addresses}) + if err != nil { + t.Fatal(err) + } + + s := &WebsocketServer{} + _, _, err = s.unmarshalAddresses(params) + if err == nil { + t.Fatal("expected error") + } + apiErr, ok := err.(*api.APIError) + if !ok { + t.Fatalf("expected *api.APIError, got %T", err) + } + if !apiErr.Public { + t.Fatal("expected public api error") + } + if !strings.Contains(apiErr.Error(), "addresses max 1000") { + t.Fatalf("unexpected error message %q", apiErr.Error()) + } +} + +func TestUnmarshalAddressesRejectsTooManyNewBlockTxAddresses(t *testing.T) { + addresses := make([]string, maxWebsocketSubscribeAddressesWithNewBlockTxs+1) + params, err := json.Marshal(WsSubscribeAddressesReq{ + Addresses: addresses, + NewBlockTxs: true, + }) + if err != nil { + t.Fatal(err) + } + + s := &WebsocketServer{} + _, _, err = s.unmarshalAddresses(params) + if err == nil { + t.Fatal("expected error") + } + apiErr, ok := err.(*api.APIError) + if !ok { + t.Fatalf("expected *api.APIError, got %T", err) + } + if !apiErr.Public { + t.Fatal("expected public api error") + } + if !strings.Contains(apiErr.Error(), "addresses max 100") { + t.Fatalf("unexpected error message %q", apiErr.Error()) + } +} + func TestSetConfirmedBlockTxMetadataSetsConfirmedFields(t *testing.T) { tx := bchain.Tx{ Confirmations: 0, From d2897d6b6a243e7afc9754b37c74f36cc6a06db3 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 09:26:42 +0200 Subject: [PATCH 04/13] chore(ws-caps): accountInfo pagination clamping --- server/public_test.go | 28 ++++++++++++++++++++++++++++ server/websocket.go | 5 ++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/server/public_test.go b/server/public_test.go index ee6d8c9662..1199981332 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -2766,6 +2766,11 @@ func Test_sanitizePagingParams(t *testing.T) { {"oversized page size", 1, maxWebsocketBlockPageSize + 1, txsInAPI, maxWebsocketBlockPageSize, 1, maxWebsocketBlockPageSize}, {"negative values", -1, -1, txsInAPI, maxWebsocketBlockPageSize, 0, txsInAPI}, {"safe offset clamp", maxPageNumber, maxPageNumber, maxPageNumber, maxPageNumber, maxSafePagingOffset / maxPageNumber, maxPageNumber}, + // WS getAccountInfo arguments: default 25, cap at txsInAPI. + {"ws getAccountInfo default", 0, 0, txsOnPage, txsInAPI, 0, txsOnPage}, + {"ws getAccountInfo within limit", 1, 100, txsOnPage, txsInAPI, 1, 100}, + {"ws getAccountInfo caps at txsInAPI", 1, txsInAPI + 1, txsOnPage, txsInAPI, 1, txsInAPI}, + {"ws getAccountInfo negative defaults", 0, -5, txsOnPage, txsInAPI, 0, txsOnPage}, } for _, tt := range tests { @@ -2779,3 +2784,26 @@ func Test_sanitizePagingParams(t *testing.T) { }) } } + +func Test_validateIntValue_gapClamp(t *testing.T) { + // Mirrors the WS getAccountInfo gap clamp: validateIntValue(req.Gap, 0, 0, maxGapValue). + tests := []struct { + name string + val int + want int + }{ + {"unset passes through as 0", 0, 0}, + {"suite default 20 passes through", 20, 20}, + {"negative defaults to 0", -1, 0}, + {"caps at maxGapValue", maxGapValue + 1, maxGapValue}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := validateIntValue(tt.val, 0, 0, maxGapValue) + if got != tt.want { + t.Errorf("validateIntValue(%d, 0, 0, %d) = %d, want %d", + tt.val, maxGapValue, got, tt.want) + } + }) + } +} diff --git a/server/websocket.go b/server/websocket.go index bbc4e1a542..c95cceb1a1 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -820,9 +820,8 @@ func (s *WebsocketServer) getAccountInfo(req *WsAccountInfoReq) (res *api.Addres TokensToReturn: tokensToReturn, Protocols: req.Protocols, } - if req.PageSize == 0 { - req.PageSize = txsOnPage - } + req.Page, req.PageSize = sanitizePagingParams(req.Page, req.PageSize, txsOnPage, txsInAPI) + req.Gap = validateIntValue(req.Gap, 0, 0, maxGapValue) a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap, strings.ToLower(req.SecondaryCurrency)) if err != nil { return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, strings.ToLower(req.SecondaryCurrency)) From 0ae4d093dfc94b4c522df08603ae6aa0a0bce736 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 09:54:32 +0200 Subject: [PATCH 05/13] chore(ws-caps): trust X-Real-Ip from private networks Honor X-Real-Ip when the TCP peer is on a loopback/RFC1918/ULA/link-local network, i.e. an upstream proxy on the same host or LAN. For direct internet peers the header stays ignored so it can't be used to spoof past the per-IP rate limiter. Auto-detected via netip predicates, no config. --- server/websocket.go | 35 +++++++++++++++++++++++++++++------ server/websocket_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index c95cceb1a1..3c9e6d247b 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -298,23 +298,46 @@ func getIP(r *http.Request) string { if h, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { host = h } - if ip, ok := parseIP(host); ok { - return ip + remote, remoteOK := parseAddr(host) + + // Trust X-Real-Ip only when the TCP peer is on a private/loopback network, + // i.e. an upstream proxy on the same host or LAN. For direct internet + // peers the header is attacker-controlled and would let any client spoof + // their IP past the per-IP rate limiter. + if remoteOK && isTrustedProxy(remote) { + if ip, ok := parseIP(r.Header.Get("X-Real-Ip")); ok { + return ip + } } + if remoteOK { + return remote.String() + } return strings.TrimSpace(r.RemoteAddr) } func parseIP(value string) (string, bool) { + addr, ok := parseAddr(value) + if !ok { + return "", false + } + return addr.String(), true +} + +func parseAddr(value string) (netip.Addr, bool) { value = strings.TrimSpace(value) if value == "" { - return "", false + return netip.Addr{}, false } - ip, err := netip.ParseAddr(value) + addr, err := netip.ParseAddr(value) if err != nil { - return "", false + return netip.Addr{}, false } - return ip.String(), true + return addr, true +} + +func isTrustedProxy(addr netip.Addr) bool { + return addr.IsLoopback() || addr.IsPrivate() || addr.IsLinkLocalUnicast() } func getWebsocketPayloadPreview(d []byte) string { diff --git a/server/websocket_test.go b/server/websocket_test.go index 166987f42a..df17b5ad62 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -206,6 +206,38 @@ func TestGetIP(t *testing.T) { remoteAddr: "198.51.100.2", want: "198.51.100.2", }, + { + name: "x-real-ip honored when remote is loopback", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.10", + }, + remoteAddr: "127.0.0.1:54321", + want: "203.0.113.10", + }, + { + name: "x-real-ip honored when remote is private network", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.11", + }, + remoteAddr: "10.0.0.5:54321", + want: "203.0.113.11", + }, + { + name: "x-real-ip ignored when remote is public", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.12", + }, + remoteAddr: "198.51.100.3:54321", + want: "198.51.100.3", + }, + { + name: "invalid x-real-ip from trusted proxy falls back to remote", + headers: map[string]string{ + "X-Real-Ip": "not-an-ip", + }, + remoteAddr: "127.0.0.1:54321", + want: "127.0.0.1", + }, } for _, tt := range tests { From 54a91f5d7e156661406a7571c0b45d0dc8b24d7f Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 10:31:24 +0200 Subject: [PATCH 06/13] chore(ws-caps): periodically sweep limiter from a goroutine Background ticker that calls limiter.sweep() every cleanupInterval, so TTL-expired idle entries are evicted even when no new connections arrive to drive cleanup. The goroutine is started once in NewWebsocketServer --- server/websocket.go | 24 ++++++++++++++++++++++++ server/websocket_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/server/websocket.go b/server/websocket.go index 3c9e6d247b..e8e345aa2e 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -162,6 +162,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. if s.metrics != nil { s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0) } + go s.websocketLimiter.runPeriodicCleanup(websocketConnectionLimiterCleanupInterval) return s, nil } @@ -265,6 +266,10 @@ func (l *websocketConnectionLimiter) cleanupLocked(now time.Time) { if !l.lastCleanup.IsZero() && now.Sub(l.lastCleanup) < websocketConnectionLimiterCleanupInterval { return } + l.sweepLocked(now) +} + +func (l *websocketConnectionLimiter) sweepLocked(now time.Time) { l.lastCleanup = now for ip, client := range l.clients { client.trimAttempts(now) @@ -274,6 +279,25 @@ func (l *websocketConnectionLimiter) cleanupLocked(now time.Time) { } } +// sweep evicts TTL-expired idle entries unconditionally. Used by the +// background ticker so that idle servers don't retain stale entries. +func (l *websocketConnectionLimiter) sweep(now time.Time) { + l.mux.Lock() + defer l.mux.Unlock() + l.sweepLocked(now) +} + +// runPeriodicCleanup ticks every interval and sweeps the limiter. It does not +// terminate; it is started once per WebsocketServer at construction time and +// runs for the lifetime of the process. +func (l *websocketConnectionLimiter) runPeriodicCleanup(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for now := range ticker.C { + l.sweep(now) + } +} + func (client *websocketClientLimit) trimAttempts(now time.Time) { cutoff := now.Add(-websocketConnectionAttemptWindow) i := 0 diff --git a/server/websocket_test.go b/server/websocket_test.go index df17b5ad62..d38230a060 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -309,6 +309,36 @@ func TestWebsocketConnectionLimiterActiveConnections(t *testing.T) { } } +func TestWebsocketConnectionLimiterSweepEvictsIdleEntries(t *testing.T) { + limiter := newWebsocketConnectionLimiter() + now := time.Unix(1700000000, 0) + idle := "192.0.2.40" + active := "192.0.2.41" + + if ok, reason := limiter.accept(idle, now); !ok { + t.Fatalf("accept(idle) rejected with %q", reason) + } + limiter.release(idle, now) + if ok, reason := limiter.accept(active, now); !ok { + t.Fatalf("accept(active) rejected with %q", reason) + } + + // sweep() is what the periodic-cleanup goroutine calls; verify it evicts + // TTL-expired idle entries while keeping entries with active connections. + limiter.sweep(now.Add(websocketConnectionLimiterTTL + time.Second)) + + limiter.mux.Lock() + _, idleStillTracked := limiter.clients[idle] + _, activeStillTracked := limiter.clients[active] + limiter.mux.Unlock() + if idleStillTracked { + t.Fatal("idle TTL-expired entry was not evicted by sweep") + } + if !activeStillTracked { + t.Fatal("entry with active connection was evicted by sweep") + } +} + func TestWebsocketConnectionLimiterCleanup(t *testing.T) { limiter := newWebsocketConnectionLimiter() now := time.Unix(1700000000, 0) From 69e68281e7aa033435da28a3f92b90fb4d543aa2 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 15:31:14 +0200 Subject: [PATCH 07/13] chore(ws-caps): account-history paging cap --- server/public.go | 15 ++++++++++++--- server/public_test.go | 33 +++++++++++++++++++++++++++++---- server/websocket.go | 2 +- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/server/public.go b/server/public.go index b150987a03..74da6f12ac 100644 --- a/server/public.go +++ b/server/public.go @@ -36,6 +36,7 @@ const maxWebsocketBlockPageSize = 10000 const maxPageNumber = 1000000 const maxGapValue = 10000 const maxSafePagingOffset = 1000000000 +const maxAccountHistoryPagingOffset = 100000 const maxSendTxBodyBytes int64 = 8 * 1024 * 1024 const secondaryCoinCookieName = "secondary_coin" @@ -908,13 +909,21 @@ func validateIntParam(value string, defaultValue int, min int, max int) int { } func sanitizePagingParams(page, pageSize, defaultPageSize, maxPageSize int) (int, int) { + return sanitizePagingParamsWithMaxOffset(page, pageSize, defaultPageSize, maxPageSize, maxSafePagingOffset) +} + +func sanitizeAccountPagingParams(page, pageSize, defaultPageSize, maxPageSize int) (int, int) { + return sanitizePagingParamsWithMaxOffset(page, pageSize, defaultPageSize, maxPageSize, maxAccountHistoryPagingOffset) +} + +func sanitizePagingParamsWithMaxOffset(page, pageSize, defaultPageSize, maxPageSize, maxPagingOffset int) (int, int) { page = validateIntValue(page, 0, 0, maxPageNumber) pageSize = validateIntValue(pageSize, defaultPageSize, 0, maxPageSize) if pageSize == 0 { pageSize = defaultPageSize } - if page > 0 && pageSize > 0 && page > maxSafePagingOffset/pageSize { - page = maxSafePagingOffset / pageSize + if page > 0 && pageSize > 0 && page > maxPagingOffset/pageSize { + page = maxPagingOffset / pageSize } return page, pageSize } @@ -923,7 +932,7 @@ func (s *PublicServer) getAddressQueryParams(r *http.Request, accountDetails api var voutFilter = api.AddressFilterVoutOff page := validateIntParam(r.URL.Query().Get("page"), 0, 0, maxPageNumber) pageSize := validateIntParam(r.URL.Query().Get("pageSize"), maxPageSize, 0, maxPageSize) - page, pageSize = sanitizePagingParams(page, pageSize, maxPageSize, maxPageSize) + page, pageSize = sanitizeAccountPagingParams(page, pageSize, maxPageSize, maxPageSize) from := validateIntParam(r.URL.Query().Get("from"), 0, 0, 10000000000) to := validateIntParam(r.URL.Query().Get("to"), 0, 0, 10000000000) diff --git a/server/public_test.go b/server/public_test.go index 1199981332..007220ac74 100644 --- a/server/public_test.go +++ b/server/public_test.go @@ -2766,18 +2766,43 @@ func Test_sanitizePagingParams(t *testing.T) { {"oversized page size", 1, maxWebsocketBlockPageSize + 1, txsInAPI, maxWebsocketBlockPageSize, 1, maxWebsocketBlockPageSize}, {"negative values", -1, -1, txsInAPI, maxWebsocketBlockPageSize, 0, txsInAPI}, {"safe offset clamp", maxPageNumber, maxPageNumber, maxPageNumber, maxPageNumber, maxSafePagingOffset / maxPageNumber, maxPageNumber}, - // WS getAccountInfo arguments: default 25, cap at txsInAPI. + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + page, pageSize := sanitizePagingParams(tt.page, tt.pageSize, tt.defaultPageSize, tt.maxPageSize) + if page != tt.wantPage || pageSize != tt.wantPageSize { + t.Errorf("sanitizePagingParams(%d, %d, %d, %d) = (%d, %d), want (%d, %d)", + tt.page, tt.pageSize, tt.defaultPageSize, tt.maxPageSize, + page, pageSize, tt.wantPage, tt.wantPageSize) + } + }) + } +} + +func Test_sanitizeAccountPagingParams(t *testing.T) { + tests := []struct { + name string + page int + pageSize int + defaultPageSize int + maxPageSize int + wantPage int + wantPageSize int + }{ {"ws getAccountInfo default", 0, 0, txsOnPage, txsInAPI, 0, txsOnPage}, {"ws getAccountInfo within limit", 1, 100, txsOnPage, txsInAPI, 1, 100}, - {"ws getAccountInfo caps at txsInAPI", 1, txsInAPI + 1, txsOnPage, txsInAPI, 1, txsInAPI}, + {"ws getAccountInfo caps page size at txsInAPI", 1, txsInAPI + 1, txsOnPage, txsInAPI, 1, txsInAPI}, {"ws getAccountInfo negative defaults", 0, -5, txsOnPage, txsInAPI, 0, txsOnPage}, + {"api address caps history offset", maxPageNumber, txsInAPI, txsInAPI, txsInAPI, maxAccountHistoryPagingOffset / txsInAPI, txsInAPI}, + {"explorer address caps history offset", maxPageNumber, txsOnPage, txsOnPage, txsOnPage, maxAccountHistoryPagingOffset / txsOnPage, txsOnPage}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - page, pageSize := sanitizePagingParams(tt.page, tt.pageSize, tt.defaultPageSize, tt.maxPageSize) + page, pageSize := sanitizeAccountPagingParams(tt.page, tt.pageSize, tt.defaultPageSize, tt.maxPageSize) if page != tt.wantPage || pageSize != tt.wantPageSize { - t.Errorf("sanitizePagingParams(%d, %d, %d, %d) = (%d, %d), want (%d, %d)", + t.Errorf("sanitizeAccountPagingParams(%d, %d, %d, %d) = (%d, %d), want (%d, %d)", tt.page, tt.pageSize, tt.defaultPageSize, tt.maxPageSize, page, pageSize, tt.wantPage, tt.wantPageSize) } diff --git a/server/websocket.go b/server/websocket.go index e8e345aa2e..cf0d390507 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -867,7 +867,7 @@ func (s *WebsocketServer) getAccountInfo(req *WsAccountInfoReq) (res *api.Addres TokensToReturn: tokensToReturn, Protocols: req.Protocols, } - req.Page, req.PageSize = sanitizePagingParams(req.Page, req.PageSize, txsOnPage, txsInAPI) + req.Page, req.PageSize = sanitizeAccountPagingParams(req.Page, req.PageSize, txsOnPage, txsInAPI) req.Gap = validateIntValue(req.Gap, 0, 0, maxGapValue) a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap, strings.ToLower(req.SecondaryCurrency)) if err != nil { From 24b95e66d8a16da2812272d81ff38736a8134c2e Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 15:53:46 +0200 Subject: [PATCH 08/13] chore(ws-caps): configurable trusted proxy CIDR allowlist Add _WS_TRUSTED_PROXIES env var to extend X-Real-Ip trust beyond loopback/RFC1918 for non-Cloudflare deployments. Fails startup on /0 or otherwise overly broad prefixes (< /8 IPv4, < /16 IPv6) so misconfig can't silently turn the header into a spoofing primitive. --- server/websocket.go | 85 ++++++++++++++++++++++++++++++------- server/websocket_test.go | 91 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 161 insertions(+), 15 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index cf0d390507..b9bb9ff09d 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -2,6 +2,7 @@ package server import ( "encoding/json" + "fmt" "math/big" "net" "net/http" @@ -100,6 +101,7 @@ type WebsocketServer struct { fiatRatesSubscriptionsLock sync.Mutex allowedOrigins map[string]struct{} allowedRpcCallTo map[string]struct{} + trustedProxyPrefixes []netip.Prefix websocketLimiter *websocketConnectionLimiter } @@ -159,6 +161,15 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. } glog.Info("Support of rpcCall for these contracts: ", envRpcCall) } + trustedEnvName := strings.ToUpper(is.GetNetwork()) + "_WS_TRUSTED_PROXIES" + prefixes, err := parseTrustedProxies(trustedEnvName, os.Getenv(trustedEnvName)) + if err != nil { + return nil, err + } + s.trustedProxyPrefixes = prefixes + if len(prefixes) > 0 { + glog.Info("Trusted proxy CIDRs: ", prefixes) + } if s.metrics != nil { s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0) } @@ -192,6 +203,42 @@ func parseAllowedOrigins(originEnvName, envAllowedOrigins string) map[string]str return allowedOrigins } +// parseTrustedProxies parses a comma-separated list of CIDRs that augment the +// loopback/RFC1918/link-local defaults for trusting X-Real-Ip. Any prefix +// broad enough to cover meaningful chunks of the public internet is rejected +// with an error so misconfiguration fails fast at startup rather than +// silently turning X-Real-Ip into an IP-spoofing primitive. +func parseTrustedProxies(envName, value string) ([]netip.Prefix, error) { + if strings.TrimSpace(value) == "" { + return nil, nil + } + const minIPv4Bits = 8 + const minIPv6Bits = 16 + var prefixes []netip.Prefix + for _, raw := range strings.Split(value, ",") { + raw = strings.TrimSpace(raw) + if raw == "" { + continue + } + p, err := netip.ParsePrefix(raw) + if err != nil { + return nil, fmt.Errorf("%s: invalid CIDR %q: %w", envName, raw, err) + } + if p.Addr().Is4In6() { + return nil, fmt.Errorf("%s: refusing IPv4-mapped CIDR %q; use IPv4 CIDR notation", envName, raw) + } + bits := p.Bits() + if p.Addr().Is4() && bits < minIPv4Bits { + return nil, fmt.Errorf("%s: refusing CIDR %q: prefix /%d is too broad (minimum /%d for IPv4)", envName, raw, bits, minIPv4Bits) + } + if p.Addr().Is6() && !p.Addr().Is4In6() && bits < minIPv6Bits { + return nil, fmt.Errorf("%s: refusing CIDR %q: prefix /%d is too broad (minimum /%d for IPv6)", envName, raw, bits, minIPv6Bits) + } + prefixes = append(prefixes, p.Masked()) + } + return prefixes, nil +} + func (s *WebsocketServer) checkOrigin(r *http.Request) bool { origin := r.Header.Get("Origin") if origin == "" { @@ -310,12 +357,14 @@ func (client *websocketClientLimit) trimAttempts(now time.Time) { } } -func getIP(r *http.Request) string { - if ip, ok := parseIP(r.Header.Get("CF-Connecting-IPv6")); ok { - return ip - } - if ip, ok := parseIP(r.Header.Get("CF-Connecting-IP")); ok { - return ip +func getIP(r *http.Request, trustedProxies []netip.Prefix) string { + if len(trustedProxies) == 0 { + if ip, ok := parseIP(r.Header.Get("CF-Connecting-IPv6")); ok { + return ip + } + if ip, ok := parseIP(r.Header.Get("CF-Connecting-IP")); ok { + return ip + } } host := r.RemoteAddr @@ -324,11 +373,11 @@ func getIP(r *http.Request) string { } remote, remoteOK := parseAddr(host) - // Trust X-Real-Ip only when the TCP peer is on a private/loopback network, - // i.e. an upstream proxy on the same host or LAN. For direct internet - // peers the header is attacker-controlled and would let any client spoof - // their IP past the per-IP rate limiter. - if remoteOK && isTrustedProxy(remote) { + // Trust X-Real-Ip only when the TCP peer is on a private/loopback network + // (an upstream proxy on the same host or LAN) or in a configured trusted + // CIDR. For direct internet peers the header is attacker-controlled and + // would let any client spoof their IP past the per-IP rate limiter. + if remoteOK && isTrustedProxy(remote, trustedProxies) { if ip, ok := parseIP(r.Header.Get("X-Real-Ip")); ok { return ip } @@ -360,8 +409,16 @@ func parseAddr(value string) (netip.Addr, bool) { return addr, true } -func isTrustedProxy(addr netip.Addr) bool { - return addr.IsLoopback() || addr.IsPrivate() || addr.IsLinkLocalUnicast() +func isTrustedProxy(addr netip.Addr, extras []netip.Prefix) bool { + if addr.IsLoopback() || addr.IsPrivate() || addr.IsLinkLocalUnicast() { + return true + } + for _, p := range extras { + if p.Contains(addr) { + return true + } + } + return false } func getWebsocketPayloadPreview(d []byte) string { @@ -377,7 +434,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable) return } - ip := getIP(r) + ip := getIP(r, s.trustedProxyPrefixes) limited := false if s.websocketLimiter != nil { ok, reason := s.websocketLimiter.accept(ip, time.Now()) diff --git a/server/websocket_test.go b/server/websocket_test.go index d38230a060..22993c7e0a 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "net/http" + "net/netip" "strings" "testing" "time" @@ -163,11 +164,62 @@ func TestParseAllowedOrigins(t *testing.T) { } } +func TestParseTrustedProxies(t *testing.T) { + tests := []struct { + name string + value string + want []string + wantErr bool + errSubstr string + }{ + {name: "empty value yields nil", value: "", want: nil}, + {name: "whitespace only yields nil", value: " , , ", want: nil}, + {name: "single ipv4 cidr", value: "203.0.113.0/24", want: []string{"203.0.113.0/24"}}, + {name: "multiple cidrs with spaces", value: " 203.0.113.0/24 , 2001:db8::/32 ", want: []string{"203.0.113.0/24", "2001:db8::/32"}}, + {name: "single host as /32 is fine", value: "10.0.0.5/32", want: []string{"10.0.0.5/32"}}, + {name: "rejects 0.0.0.0/0", value: "0.0.0.0/0", wantErr: true, errSubstr: "too broad"}, + {name: "rejects ::/0", value: "::/0", wantErr: true, errSubstr: "too broad"}, + {name: "rejects ipv4 broader than /8", value: "10.0.0.0/4", wantErr: true, errSubstr: "too broad"}, + {name: "rejects ipv6 broader than /16", value: "2000::/8", wantErr: true, errSubstr: "too broad"}, + {name: "rejects broad ipv4-mapped cidr", value: "::ffff:0.0.0.0/0", wantErr: true, errSubstr: "IPv4-mapped"}, + {name: "rejects specific ipv4-mapped cidr", value: "::ffff:192.0.2.0/120", wantErr: true, errSubstr: "IPv4-mapped"}, + {name: "rejects malformed cidr", value: "not-a-cidr", wantErr: true, errSubstr: "invalid CIDR"}, + {name: "rejects bare ip without prefix", value: "10.0.0.5", wantErr: true, errSubstr: "invalid CIDR"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseTrustedProxies("TEST_ENV", tt.value) + if tt.wantErr { + if err == nil { + t.Fatalf("parseTrustedProxies(%q) = nil err, want error containing %q", tt.value, tt.errSubstr) + } + if !strings.Contains(err.Error(), tt.errSubstr) { + t.Fatalf("parseTrustedProxies(%q) err = %q, want substring %q", tt.value, err.Error(), tt.errSubstr) + } + return + } + if err != nil { + t.Fatalf("parseTrustedProxies(%q) unexpected error: %v", tt.value, err) + } + if len(got) != len(tt.want) { + t.Fatalf("parseTrustedProxies(%q) = %v, want %v", tt.value, got, tt.want) + } + for i, p := range got { + if p.String() != tt.want[i] { + t.Errorf("parseTrustedProxies(%q)[%d] = %q, want %q", tt.value, i, p.String(), tt.want[i]) + } + } + }) + } +} + func TestGetIP(t *testing.T) { tests := []struct { name string headers map[string]string remoteAddr string + trusted []netip.Prefix want string }{ { @@ -238,6 +290,43 @@ func TestGetIP(t *testing.T) { remoteAddr: "127.0.0.1:54321", want: "127.0.0.1", }, + { + name: "x-real-ip honored when remote matches configured public CIDR", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.50", + }, + remoteAddr: "198.51.100.5:54321", + trusted: []netip.Prefix{netip.MustParsePrefix("198.51.100.0/24")}, + want: "203.0.113.50", + }, + { + name: "custom trusted proxy ignores spoofed cloudflare header", + headers: map[string]string{ + "CF-Connecting-IP": "192.0.2.99", + "X-Real-Ip": "203.0.113.52", + }, + remoteAddr: "198.51.100.5:54321", + trusted: []netip.Prefix{netip.MustParsePrefix("198.51.100.0/24")}, + want: "203.0.113.52", + }, + { + name: "custom trusted proxy ignores cloudflare header without x-real-ip", + headers: map[string]string{ + "CF-Connecting-IP": "192.0.2.100", + }, + remoteAddr: "198.51.100.5:54321", + trusted: []netip.Prefix{netip.MustParsePrefix("198.51.100.0/24")}, + want: "198.51.100.5", + }, + { + name: "x-real-ip ignored for public remote outside configured CIDRs", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.51", + }, + remoteAddr: "198.51.100.6:54321", + trusted: []netip.Prefix{netip.MustParsePrefix("203.0.113.0/24")}, + want: "198.51.100.6", + }, } for _, tt := range tests { @@ -250,7 +339,7 @@ func TestGetIP(t *testing.T) { r.Header.Set(k, v) } - got := getIP(r) + got := getIP(r, tt.trusted) if got != tt.want { t.Fatalf("getIP() = %q, want %q", got, tt.want) } From 8a4289377d734aca10430ecfd83f8463c97bb99d Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 15:57:24 +0200 Subject: [PATCH 09/13] chore(ws-caps): trusted proxy docs --- docs/env.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/env.md b/docs/env.md index f4e4af4ed1..87d0343bb1 100644 --- a/docs/env.md +++ b/docs/env.md @@ -6,6 +6,17 @@ Some behavior of Blockbook can be modified by environment variables. The variabl - `_WS_ALLOWED_ORIGINS` - Comma-separated list of allowed WebSocket origins (e.g. `https://example.com`, `http://localhost:3000`). If omitted, all origins are allowed and it is the operator's responsibility to enforce origin access (for example via proxy). +- `_WS_TRUSTED_PROXIES` - Comma-separated list of trusted proxy CIDRs whose `X-Real-Ip` header should be used as the WebSocket client IP. This IP is used by per-IP WebSocket connection and connection-attempt limits. + Blockbook always trusts `X-Real-Ip` from loopback, RFC1918/private, and link-local peers, so this variable is only needed for additional non-local proxies. + + If this variable is unset, Blockbook keeps the default Cloudflare behavior and uses `CF-Connecting-IPv6` first, then `CF-Connecting-IP`, when either header contains a valid IP address. This is intended for deployments where the origin only accepts traffic from Cloudflare IP ranges, for example enforced by nginx or a firewall. Blockbook does not validate the TCP peer against Cloudflare ranges itself. + + If this variable is set, Blockbook switches to generic trusted-proxy mode: `CF-Connecting-IP` and `CF-Connecting-IPv6` are ignored, and `X-Real-Ip` is used only when the TCP peer is a built-in trusted proxy or matches one of the configured CIDRs. In this mode the proxy must overwrite or strip any client-supplied `X-Real-Ip` header before forwarding requests to Blockbook. + + Do not set this variable for a normal Cloudflare-only deployment unless the proxy in front of Blockbook sets `X-Real-Ip` to the real visitor IP. Otherwise all clients may collapse to the proxy or Cloudflare address for rate limiting. + + To avoid unsafe configuration, Blockbook fails startup if a configured prefix is too broad (`/<8` for IPv4, `/<16` for IPv6), malformed, or uses IPv4-mapped IPv6 notation. Use regular IPv4 CIDR notation instead, for example `198.51.100.0/24` rather than `::ffff:198.51.100.0/120`. + - `_STAKING_POOL_CONTRACT` - The pool name and contract used for Ethereum staking. The format of the variable is `/`. If missing, staking support is disabled. - `COINGECKO_API_KEY`, `_COINGECKO_API_KEY`, or `_COINGECKO_API_KEY` - API key for making requests to CoinGecko in the paid tier. From 42c6dfac9515f3cd837a5afc61559d7f59aba2d2 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 16:24:54 +0200 Subject: [PATCH 10/13] chore(ws-caps): drain WS goroutines before RocksDB close Wrap the four DB-touching go ... spawn sites with a sync.WaitGroup gate and add WebsocketServer.Shutdown(ctx) that flips a shuttingDown flag, closes all registered channels, and waits for in-flight goroutines to drain. PublicServer.Shutdown now drives it after http.Server.Shutdown, so a long getAccountInfo can no longer race rocksdb_close in cgo and SIGSEGV on graceful restart. --- server/public.go | 12 +++- server/websocket.go | 120 ++++++++++++++++++++++++++++++++++++++- server/websocket_test.go | 79 ++++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 4 deletions(-) diff --git a/server/public.go b/server/public.go index 74da6f12ac..6ca8a23df8 100644 --- a/server/public.go +++ b/server/public.go @@ -251,10 +251,18 @@ func (s *PublicServer) Close() error { return s.https.Close() } -// Shutdown shuts down the server +// Shutdown shuts down the server. http.Server.Shutdown does not drain +// hijacked WebSocket connections, so after the HTTP listener stops we also +// drain the WebSocket server's in-flight DB-touching goroutines; otherwise a +// long getAccountInfo can race rocksdb_close in cgo and SIGSEGV the process. func (s *PublicServer) Shutdown(ctx context.Context) error { glog.Infof("public server: shutdown") - return s.https.Shutdown(ctx) + httpErr := s.https.Shutdown(ctx) + wsErr := s.websocket.Shutdown(ctx) + if httpErr != nil { + return httpErr + } + return wsErr } // OnNewBlock notifies users subscribed to bitcoind/hashblock about new block diff --git a/server/websocket.go b/server/websocket.go index b9bb9ff09d..75f6109ac1 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1,6 +1,7 @@ package server import ( + "context" "encoding/json" "fmt" "math/big" @@ -103,6 +104,12 @@ type WebsocketServer struct { allowedRpcCallTo map[string]struct{} trustedProxyPrefixes []netip.Prefix websocketLimiter *websocketConnectionLimiter + // Shutdown coordination: protects shuttingDown + activeChannels and gates + // trackWork so RocksDB cannot be closed while a WS goroutine is mid-read. + shutdownMu sync.Mutex + shuttingDown bool + activeChannels map[*websocketChannel]struct{} + requestWg sync.WaitGroup } type websocketClientLimit struct { @@ -144,6 +151,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string), websocketLimiter: newWebsocketConnectionLimiter(), + activeChannels: make(map[*websocketChannel]struct{}), } s.upgrader = &websocket.Upgrader{ ReadBufferSize: 1024 * 32, @@ -434,6 +442,13 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable) return } + s.shutdownMu.Lock() + shuttingDown := s.shuttingDown + s.shutdownMu.Unlock() + if shuttingDown { + http.Error(w, "Server shutting down", http.StatusServiceUnavailable) + return + } ip := getIP(r, s.trustedProxyPrefixes) limited := false if s.websocketLimiter != nil { @@ -466,6 +481,13 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.is.WsGetAccountInfoLimit > 0 { c.getAddressInfoDescriptors = make(map[string]struct{}) } + if !s.registerChannel(c) { + conn.Close() + if limited { + s.websocketLimiter.release(ip, time.Now()) + } + return + } go s.inputLoop(c) go s.outputLoop(c) s.onConnect(c) @@ -476,6 +498,79 @@ func (s *WebsocketServer) GetHandler() http.Handler { return s } +// registerChannel adds channel to activeChannels unless the server is shutting +// down. Returns false on shutdown so the caller can close the connection. +func (s *WebsocketServer) registerChannel(c *websocketChannel) bool { + s.shutdownMu.Lock() + defer s.shutdownMu.Unlock() + if s.shuttingDown { + return false + } + s.activeChannels[c] = struct{}{} + return true +} + +func (s *WebsocketServer) unregisterChannel(c *websocketChannel) { + s.shutdownMu.Lock() + defer s.shutdownMu.Unlock() + delete(s.activeChannels, c) +} + +// trackWork increments requestWg unless the server is shutting down. Callers +// that get true must invoke workDone exactly once when the goroutine they +// spawn returns. Used to gate goroutines that touch the DB/chain/api so that +// Shutdown can wait for them to drain before RocksDB is closed. +func (s *WebsocketServer) trackWork() bool { + s.shutdownMu.Lock() + defer s.shutdownMu.Unlock() + if s.shuttingDown { + return false + } + s.requestWg.Add(1) + return true +} + +func (s *WebsocketServer) workDone() { + s.requestWg.Done() +} + +// Shutdown initiates graceful WebSocket server shutdown: it refuses new +// connections, closes existing ones, and blocks until in-flight DB-touching +// goroutines finish or ctx is canceled. This must run before RocksDB is +// closed; otherwise a long-running getAccountInfo can race rocksdb_close in +// cgo and SIGSEGV the process. +func (s *WebsocketServer) Shutdown(ctx context.Context) error { + s.shutdownMu.Lock() + if s.shuttingDown { + s.shutdownMu.Unlock() + return nil + } + s.shuttingDown = true + chans := make([]*websocketChannel, 0, len(s.activeChannels)) + for c := range s.activeChannels { + chans = append(chans, c) + } + s.shutdownMu.Unlock() + + for _, c := range chans { + s.closeChannel(c, "server_shutdown") + } + + done := make(chan struct{}) + go func() { + s.requestWg.Wait() + close(done) + }() + select { + case <-done: + glog.Info("websocket: shutdown complete, all in-flight requests drained") + return nil + case <-ctx.Done(): + glog.Warning("websocket: shutdown timed out waiting for in-flight requests; proceeding anyway") + return ctx.Err() + } +} + func (s *WebsocketServer) closeChannel(c *websocketChannel, reason string) bool { if closed, closeReason := c.CloseOut(reason); closed { if s.metrics != nil { @@ -566,7 +661,13 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) { s.closeChannel(c, "pending_requests_limit") return } + if !s.trackWork() { + c.releaseRequestSlot() + s.closeChannel(c, "server_shutdown") + return + } go func(req WsReq) { + defer s.workDone() defer c.releaseRequestSlot() s.onRequest(c, &req) }(req) @@ -616,6 +717,7 @@ func (s *WebsocketServer) onDisconnect(c *websocketChannel) { if s.websocketLimiter != nil { s.websocketLimiter.release(c.ip, time.Now()) } + s.unregisterChannel(c) glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } @@ -1518,9 +1620,13 @@ func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { observeNewBlockTxDuration(s.metrics, "match", matchStart) if len(subscribed) > 0 { incNewBlockTxMetric(s.metrics, "matched", "success", 1) + if !s.trackWork() { + return + } // Convert and publish asynchronously so heavy tx conversion does not // block processing of other transactions in the same block. go func(tx bchain.Tx, subscribed map[string]struct{}) { + defer s.workDone() if chainType == bchain.ChainEthereumType { receiptStatus := setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt) if s.metrics != nil { @@ -1552,7 +1658,12 @@ func (s *WebsocketServer) OnNewBlock(block *bchain.Block) { go s.onNewBlockAsync(block.Hash, block.Height) if s.newBlockTxsSubscriptionCount > 0 { // Skip per-tx address matching when nobody opted into newBlockTxs. - go s.publishNewBlockTxsByAddr(block) + if s.trackWork() { + go func() { + defer s.workDone() + s.publishNewBlockTxsByAddr(block) + }() + } } } @@ -1677,7 +1788,12 @@ func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[stri func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { subscribed := s.getNewTxSubscriptions(tx.Vin, tx.Vout, tx.TokenTransfers, nil) if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 { - go s.onNewTxAsync(tx, subscribed) + if s.trackWork() { + go func() { + defer s.workDone() + s.onNewTxAsync(tx, subscribed) + }() + } } } diff --git a/server/websocket_test.go b/server/websocket_test.go index 22993c7e0a..a3433a6dbe 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -4,6 +4,7 @@ package server import ( + "context" "encoding/json" "errors" "net/http" @@ -736,3 +737,81 @@ func TestPopulateBitcoinVinAddrDescsEnablesSenderOnlyMatching(t *testing.T) { t.Fatal("sender subscription did not match after vin descriptor resolution") } } + +func newShutdownTestServer() *WebsocketServer { + return &WebsocketServer{activeChannels: make(map[*websocketChannel]struct{})} +} + +func TestWebsocketShutdownWaitsForInFlightWork(t *testing.T) { + s := newShutdownTestServer() + if !s.trackWork() { + t.Fatal("trackWork() returned false before shutdown") + } + + finished := make(chan struct{}) + go func() { + // Simulate a DB-touching goroutine that takes some time. + time.Sleep(50 * time.Millisecond) + s.workDone() + close(finished) + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + start := time.Now() + if err := s.Shutdown(ctx); err != nil { + t.Fatalf("Shutdown() = %v, want nil", err) + } + elapsed := time.Since(start) + if elapsed < 50*time.Millisecond { + t.Fatalf("Shutdown returned in %v, expected to wait for in-flight work (~50ms)", elapsed) + } + select { + case <-finished: + default: + t.Fatal("Shutdown returned before tracked goroutine finished") + } +} + +func TestWebsocketShutdownTimesOutOnStuckWork(t *testing.T) { + s := newShutdownTestServer() + if !s.trackWork() { + t.Fatal("trackWork() returned false before shutdown") + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) + defer cancel() + if err := s.Shutdown(ctx); err == nil { + t.Fatal("Shutdown() = nil, want context deadline error") + } + // Release after the timeout so the test goroutine doesn't leak. + s.workDone() +} + +func TestWebsocketShutdownRefusesNewWork(t *testing.T) { + s := newShutdownTestServer() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := s.Shutdown(ctx); err != nil { + t.Fatalf("Shutdown() = %v, want nil", err) + } + if s.trackWork() { + t.Fatal("trackWork() returned true after shutdown") + } + dummy := &websocketChannel{} + if s.registerChannel(dummy) { + t.Fatal("registerChannel() returned true after shutdown") + } +} + +func TestWebsocketShutdownIsIdempotent(t *testing.T) { + s := newShutdownTestServer() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := s.Shutdown(ctx); err != nil { + t.Fatalf("first Shutdown() = %v, want nil", err) + } + if err := s.Shutdown(ctx); err != nil { + t.Fatalf("second Shutdown() = %v, want nil", err) + } +} From 5bd42c174f96b8ab27afe489dc42e995c1444b3d Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Wed, 29 Apr 2026 22:29:49 +0200 Subject: [PATCH 11/13] chore(ws-caps): RocksDB must not close until tracked WS work is done --- server/websocket.go | 4 +++- server/websocket_test.go | 20 +++++++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index 75f6109ac1..61b708e338 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -566,7 +566,9 @@ func (s *WebsocketServer) Shutdown(ctx context.Context) error { glog.Info("websocket: shutdown complete, all in-flight requests drained") return nil case <-ctx.Done(): - glog.Warning("websocket: shutdown timed out waiting for in-flight requests; proceeding anyway") + glog.Warning("websocket: shutdown timed out waiting for in-flight requests; waiting to avoid RocksDB close race") + <-done + glog.Info("websocket: shutdown complete after timeout") return ctx.Err() } } diff --git a/server/websocket_test.go b/server/websocket_test.go index a3433a6dbe..d7c4a9be18 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -781,11 +781,25 @@ func TestWebsocketShutdownTimesOutOnStuckWork(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) defer cancel() - if err := s.Shutdown(ctx); err == nil { - t.Fatal("Shutdown() = nil, want context deadline error") + start := time.Now() + finished := make(chan error) + go func() { + finished <- s.Shutdown(ctx) + }() + + time.Sleep(60 * time.Millisecond) + select { + case err := <-finished: + t.Fatalf("Shutdown returned before tracked work finished: %v", err) + default: } - // Release after the timeout so the test goroutine doesn't leak. s.workDone() + if err := <-finished; err == nil { + t.Fatal("Shutdown() = nil, want context deadline error") + } + if elapsed := time.Since(start); elapsed < 60*time.Millisecond { + t.Fatalf("Shutdown returned in %v, expected to wait for tracked work after timeout", elapsed) + } } func TestWebsocketShutdownRefusesNewWork(t *testing.T) { From 9d5aba745671bd8d30f9d4270f10ba709b5b4173 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 30 Apr 2026 04:57:42 +0000 Subject: [PATCH 12/13] Strip IPv6 zone from parseAddr and add link-local zone test Agent-Logs-Url: https://github.com/trezor/blockbook/sessions/0d3d9125-200e-45e5-ae57-a54919829a60 Co-authored-by: pragmaxim <8983344+pragmaxim@users.noreply.github.com> --- server/websocket.go | 4 +++- server/websocket_test.go | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/websocket.go b/server/websocket.go index 61b708e338..4c6932b311 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -414,7 +414,9 @@ func parseAddr(value string) (netip.Addr, bool) { if err != nil { return netip.Addr{}, false } - return addr, true + // Strip IPv6 zone identifier so that rate-limit keys are zone-free and + // netip.Prefix.Contains matches unzoned prefixes against link-local peers. + return addr.WithZone(""), true } func isTrustedProxy(addr netip.Addr, extras []netip.Prefix) bool { diff --git a/server/websocket_test.go b/server/websocket_test.go index d7c4a9be18..3e4ea5a452 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -328,6 +328,14 @@ func TestGetIP(t *testing.T) { trusted: []netip.Prefix{netip.MustParsePrefix("203.0.113.0/24")}, want: "198.51.100.6", }, + { + name: "link-local ipv6 peer with zone is trusted and zone is stripped from key", + headers: map[string]string{ + "X-Real-Ip": "203.0.113.60", + }, + remoteAddr: "[fe80::1%eth0]:12345", + want: "203.0.113.60", + }, } for _, tt := range tests { From 369ff93fb58b0ac9993568b9c505e0fe132bc516 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 30 Apr 2026 05:00:25 +0000 Subject: [PATCH 13/13] Fix test name casing and add zone-stripping verification test Agent-Logs-Url: https://github.com/trezor/blockbook/sessions/0d3d9125-200e-45e5-ae57-a54919829a60 Co-authored-by: pragmaxim <8983344+pragmaxim@users.noreply.github.com> --- server/websocket_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/websocket_test.go b/server/websocket_test.go index 3e4ea5a452..2fd440008d 100644 --- a/server/websocket_test.go +++ b/server/websocket_test.go @@ -329,13 +329,18 @@ func TestGetIP(t *testing.T) { want: "198.51.100.6", }, { - name: "link-local ipv6 peer with zone is trusted and zone is stripped from key", + name: "link-local IPv6 peer with zone is trusted and zone is stripped from key", headers: map[string]string{ "X-Real-Ip": "203.0.113.60", }, remoteAddr: "[fe80::1%eth0]:12345", want: "203.0.113.60", }, + { + name: "link-local IPv6 zone identifier is stripped from returned address", + remoteAddr: "[fe80::1%eth0]:12345", + want: "fe80::1", + }, } for _, tt := range tests {