-
-
Notifications
You must be signed in to change notification settings - Fork 739
Websocket caps #1483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Websocket caps #1483
Changes from 4 commits
b1b4902
09f1e92
252d1c7
d2897d6
0ae4d09
54a91f5
69e6828
24b95e6
8a42893
42c6dfa
5bd42c1
9d5aba7
369ff93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,9 @@ package server | |
| import ( | ||
| "encoding/json" | ||
| "math/big" | ||
| "net" | ||
| "net/http" | ||
| "net/netip" | ||
| "net/url" | ||
| "os" | ||
| "runtime/debug" | ||
|
|
@@ -29,6 +31,14 @@ const defaultTimeout = 60 * time.Second | |
| const unknownMethodLabel = "unknown" | ||
| const maxWebsocketMessageBytes int64 = 4 * 1024 * 1024 | ||
| const maxWebsocketPendingRequests = 48 | ||
| const maxWebsocketConnectionAttemptsPerIP = 64 | ||
| const maxWebsocketConnectionsPerIP = 128 | ||
| const maxWebsocketEstimateFeeBlocks = 32 | ||
| const maxWebsocketSubscribeAddresses = 1000 | ||
| const maxWebsocketSubscribeAddressesWithNewBlockTxs = 100 | ||
| const websocketConnectionAttemptWindow = time.Minute | ||
| const websocketConnectionLimiterTTL = 10 * time.Minute | ||
| const websocketConnectionLimiterCleanupInterval = time.Minute | ||
| const websocketLogPreviewBytes = 256 | ||
|
|
||
| // allRates is a special "currency" parameter that means all available currencies | ||
|
|
@@ -90,6 +100,19 @@ type WebsocketServer struct { | |
| fiatRatesSubscriptionsLock sync.Mutex | ||
| allowedOrigins map[string]struct{} | ||
| allowedRpcCallTo map[string]struct{} | ||
| websocketLimiter *websocketConnectionLimiter | ||
| } | ||
|
|
||
| type websocketClientLimit struct { | ||
| active int | ||
| attempts []time.Time | ||
| lastSeen time.Time | ||
| } | ||
|
|
||
| type websocketConnectionLimiter struct { | ||
| mux sync.Mutex | ||
| clients map[string]*websocketClientLimit | ||
| lastCleanup time.Time | ||
| } | ||
|
|
||
| // NewWebsocketServer creates new websocket interface to blockbook and returns its handle | ||
|
|
@@ -118,6 +141,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. | |
| addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails), | ||
| fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), | ||
| fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string), | ||
| websocketLimiter: newWebsocketConnectionLimiter(), | ||
| } | ||
| s.upgrader = &websocket.Upgrader{ | ||
| ReadBufferSize: 1024 * 32, | ||
|
|
@@ -191,16 +215,106 @@ func normalizeOrigin(origin string) (string, bool) { | |
| return strings.ToLower(u.Scheme) + "://" + strings.ToLower(u.Host), true | ||
| } | ||
|
|
||
| func newWebsocketConnectionLimiter() *websocketConnectionLimiter { | ||
| return &websocketConnectionLimiter{ | ||
| clients: make(map[string]*websocketClientLimit), | ||
| } | ||
| } | ||
|
|
||
| func (l *websocketConnectionLimiter) accept(ip string, now time.Time) (bool, string) { | ||
| l.mux.Lock() | ||
| defer l.mux.Unlock() | ||
|
|
||
| l.cleanupLocked(now) | ||
| client := l.clients[ip] | ||
| if client == nil { | ||
| client = &websocketClientLimit{} | ||
| l.clients[ip] = client | ||
| } | ||
| client.lastSeen = now | ||
| client.trimAttempts(now) | ||
|
|
||
| if client.active >= maxWebsocketConnectionsPerIP { | ||
| return false, "connection_limit" | ||
| } | ||
| if len(client.attempts) >= maxWebsocketConnectionAttemptsPerIP { | ||
| return false, "connection_attempt_limit" | ||
| } | ||
|
|
||
| client.attempts = append(client.attempts, now) | ||
| client.active++ | ||
| return true, "" | ||
| } | ||
|
|
||
| func (l *websocketConnectionLimiter) release(ip string, now time.Time) { | ||
| l.mux.Lock() | ||
| defer l.mux.Unlock() | ||
|
|
||
| client := l.clients[ip] | ||
| if client == nil { | ||
| return | ||
| } | ||
| if client.active > 0 { | ||
| client.active-- | ||
| } | ||
| client.lastSeen = now | ||
| l.cleanupLocked(now) | ||
| } | ||
|
|
||
| func (l *websocketConnectionLimiter) cleanupLocked(now time.Time) { | ||
| if !l.lastCleanup.IsZero() && now.Sub(l.lastCleanup) < websocketConnectionLimiterCleanupInterval { | ||
| return | ||
| } | ||
| l.lastCleanup = now | ||
| for ip, client := range l.clients { | ||
| client.trimAttempts(now) | ||
| if client.active == 0 && now.Sub(client.lastSeen) > websocketConnectionLimiterTTL { | ||
| delete(l.clients, ip) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (client *websocketClientLimit) trimAttempts(now time.Time) { | ||
| cutoff := now.Add(-websocketConnectionAttemptWindow) | ||
| i := 0 | ||
| for i < len(client.attempts) && client.attempts[i].Before(cutoff) { | ||
| i++ | ||
| } | ||
| if i > 0 { | ||
| copy(client.attempts, client.attempts[i:]) | ||
| client.attempts = client.attempts[:len(client.attempts)-i] | ||
| } | ||
| } | ||
|
|
||
| func getIP(r *http.Request) string { | ||
| ip := r.Header.Get("cf-connecting-ip") | ||
| if ip != "" { | ||
| if ip, ok := parseIP(r.Header.Get("CF-Connecting-IPv6")); ok { | ||
| return ip | ||
| } | ||
| ip = r.Header.Get("X-Real-Ip") | ||
| if ip != "" { | ||
| if ip, ok := parseIP(r.Header.Get("CF-Connecting-IP")); ok { | ||
| return ip | ||
|
||
| } | ||
| return r.RemoteAddr | ||
|
|
||
| host := r.RemoteAddr | ||
| if h, _, err := net.SplitHostPort(r.RemoteAddr); err == nil { | ||
| host = h | ||
| } | ||
| if ip, ok := parseIP(host); ok { | ||
| return ip | ||
| } | ||
|
|
||
| return strings.TrimSpace(r.RemoteAddr) | ||
| } | ||
|
|
||
| func parseIP(value string) (string, bool) { | ||
| value = strings.TrimSpace(value) | ||
| if value == "" { | ||
| return "", false | ||
| } | ||
| ip, err := netip.ParseAddr(value) | ||
| if err != nil { | ||
| return "", false | ||
| } | ||
| return ip.String(), true | ||
| } | ||
|
|
||
| func getWebsocketPayloadPreview(d []byte) string { | ||
|
|
@@ -216,8 +330,22 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
| http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable) | ||
| return | ||
| } | ||
| ip := getIP(r) | ||
| limited := false | ||
| if s.websocketLimiter != nil { | ||
| ok, reason := s.websocketLimiter.accept(ip, time.Now()) | ||
| if !ok { | ||
| glog.Warning("Websocket connection rejected, ", ip, ", ", reason) | ||
| http.Error(w, "Too many websocket connections", http.StatusTooManyRequests) | ||
| return | ||
| } | ||
| limited = true | ||
| } | ||
| conn, err := s.upgrader.Upgrade(w, r, nil) | ||
| if err != nil { | ||
| if limited { | ||
| s.websocketLimiter.release(ip, time.Now()) | ||
| } | ||
| http.Error(w, upgradeFailed+err.Error(), http.StatusServiceUnavailable) | ||
| return | ||
| } | ||
|
|
@@ -227,7 +355,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
| conn: conn, | ||
| out: make(chan *WsRes, outChannelSize), | ||
| pendingRequests: make(chan struct{}, maxWebsocketPendingRequests), | ||
| ip: getIP(r), | ||
| ip: ip, | ||
| requestHeader: r.Header, | ||
| alive: true, | ||
| } | ||
|
|
@@ -381,6 +509,9 @@ func (s *WebsocketServer) onDisconnect(c *websocketChannel) { | |
| s.unsubscribeNewTransaction(c) | ||
| s.unsubscribeAddresses(c) | ||
| s.unsubscribeFiatRates(c) | ||
| if s.websocketLimiter != nil { | ||
| s.websocketLimiter.release(c.ip, time.Now()) | ||
| } | ||
| glog.Info("Client disconnected ", c.id, ", ", c.ip) | ||
| s.metrics.WebsocketClients.Dec() | ||
| } | ||
|
|
@@ -689,9 +820,8 @@ func (s *WebsocketServer) getAccountInfo(req *WsAccountInfoReq) (res *api.Addres | |
| TokensToReturn: tokensToReturn, | ||
| Protocols: req.Protocols, | ||
| } | ||
| if req.PageSize == 0 { | ||
| req.PageSize = txsOnPage | ||
| } | ||
| req.Page, req.PageSize = sanitizePagingParams(req.Page, req.PageSize, txsOnPage, txsInAPI) | ||
| req.Gap = validateIntValue(req.Gap, 0, 0, maxGapValue) | ||
| a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap, strings.ToLower(req.SecondaryCurrency)) | ||
| if err != nil { | ||
| return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, strings.ToLower(req.SecondaryCurrency)) | ||
|
|
@@ -792,6 +922,9 @@ func (s *WebsocketServer) estimateFee(params []byte) (interface{}, error) { | |
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if len(r.Blocks) > maxWebsocketEstimateFeeBlocks { | ||
| return nil, api.NewAPIError("blocks max "+strconv.Itoa(maxWebsocketEstimateFeeBlocks), true) | ||
| } | ||
| res := make([]WsEstimateFeeRes, len(r.Blocks)) | ||
| if s.chainParser.GetChainType() == bchain.ChainEthereumType { | ||
| gas, err := s.chain.EthereumTypeEstimateGas(r.Specific) | ||
|
|
@@ -1017,6 +1150,13 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, err | |
| if err != nil { | ||
| return nil, false, api.NewAPIError("Invalid subscribeAddresses params", true) | ||
| } | ||
| limit := maxWebsocketSubscribeAddresses | ||
| if r.NewBlockTxs { | ||
| limit = maxWebsocketSubscribeAddressesWithNewBlockTxs | ||
| } | ||
| if len(r.Addresses) > limit { | ||
| return nil, false, api.NewAPIError("addresses max "+strconv.Itoa(limit), true) | ||
| } | ||
| rv := make([]string, len(r.Addresses)) | ||
| for i, a := range r.Addresses { | ||
| ad, err := s.chainParser.GetAddrDescFromAddress(a) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
websocketConnectionLimiter cleanup only runs on accept()/release() calls. If the server experiences a burst of many unique IPs and then becomes idle (no new connections), stale entries older than websocketConnectionLimiterTTL will never be evicted, so the clients map can retain memory indefinitely. Consider running cleanup on a background ticker, or otherwise ensuring eviction happens without requiring subsequent connection activity.