-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.knot
More file actions
925 lines (848 loc) · 46.3 KB
/
Copy pathserver.knot
File metadata and controls
925 lines (848 loc) · 46.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
-- Skrepka Relay Server (maximally simplified)
-- Single-server implementation of the Skrepka protocol (v0.1).
-- Every collection is a flat list; rate limiting is delegated to the
-- compiler's per-endpoint `rateLimit` clauses on each route.
-- Token comparison is plain `==` (timing-attack vulnerable), retry backoff is linear,
-- and many helpers are inlined. Performance suffers; clarity is the only goal.
-- ============================================
-- Configuration
-- ============================================
-- Top-level constants: bind address, federation identity, capacity ceilings,
-- per-endpoint rate-limit budgets, and expiry/backoff/TTL durations.
-- Values written here are defaults; every scalar (Int/Float/Text/Bool/Maybe ...)
-- top-level constant can be overridden at startup via `--name=value` on the
-- command line (see Knot's `knot_override_lookup`). Run with `--help` for the
-- full list.
-- This server's canonical federation identity. Used as the `server` field in
-- presence/gossip events and for self-checks (`isLocalServer`). Not a bind
-- address — the listener accepts connections on
-- every interface regardless of this value. May also differ from the URL
-- clients use to reach the server (e.g. behind a reverse proxy or alternate
-- DNS name); peers must resolve this name to reach us for federation.
--
-- Must be unique per deployment. Every operator participating in federation
-- has to recompile with their own public hostname here — peers reject events
-- whose origin fails `isBadServerName` (which blocks the default "localhost"
-- and other private/reserved names), so two default-built servers cannot
-- federate with each other. The default is only suitable for a single-node,
-- non-federated deployment.
serverHost : ServerName
serverHost = "localhost"
-- Network interface the HTTP listener binds to. "0.0.0.0" accepts connections
-- on every IPv4 interface (the typical default behind a reverse proxy);
-- "127.0.0.1" restricts the listener to localhost-only; "::" / "::1" do the
-- IPv6 equivalents. Override with `--bindHost=...` at startup. Unrelated to
-- `serverHost` above — this is the kernel-level bind address; that is the
-- federation identity peers see.
bindHost : Text
bindHost = "0.0.0.0"
-- TCP port for incoming HTTP connections.
listenPort : Int
listenPort = 8080
-- URL scheme used when contacting peer federation servers.
federationScheme : Text
federationScheme = "https://"
-- Storage retention for messages and dedup-id records. Mailbox rows live this
-- long after `receivedAt`; expired rows are pruned by `pruneExpired` (30 days).
messageMaxAge : Int<Ms>
messageMaxAge = 2592000000
-- Max length of a hex-encoded message body / opaque blob (40 MiB hex ≈ 20 MiB bytes).
maxBlobLen : Int
maxBlobLen = 41943040
-- Cap on items in a batched send request.
maxBatchSize : Int
maxBatchSize = 100
-- Cap on gossip events accepted or emitted in a single federation exchange.
maxGossipEvents : Int
maxGossipEvents = 100
-- Cap on events returned by one long-poll response.
maxPollEvents : Int
maxPollEvents = 50
-- Per-peer ceiling on outstanding forward attempts.
maxForwardsPerServer : Int
maxForwardsPerServer = 2000
-- Max accepted length of a server hostname.
maxServerNameLen : Int
maxServerNameLen = 128
-- Ceiling on outstanding auth challenges held in memory.
maxChallenges : Int
maxChallenges = 10000
-- Ceiling on concurrent active sessions.
maxSessions : Int
maxSessions = 10000
-- Per-pubkey ceiling on outstanding challenges (prevents one key from exhausting the pool).
maxChallengesPerKey : Int
maxChallengesPerKey = 5
-- Per-pubkey ceiling on concurrent sessions.
maxSessionsPerKey : Int
maxSessionsPerKey = 10
-- Validity window for an auth challenge before it must be re-issued (1 min).
challengeExpiry : Int<Ms>
challengeExpiry = 60000
-- How long a session token remains valid after issue (1 hour).
sessionExpiry : Int<Ms>
sessionExpiry = 3600000
-- How long a received "online" presence gossip is considered fresh (90 min).
-- Receivers anchor the TTL to their own wall clock at receive time, so this
-- value isn't carried on the wire.
onlineGossipTtl : Int<Ms>
onlineGossipTtl = 5400000
-- Times to retry a federation forward before giving up.
maxForwardRetries : Int
maxForwardRetries = 10
-- Base delay for federation retry backoff (linear, not exponential — 30 s).
retryBackoffBase : Int<Ms>
retryBackoffBase = 30000
-- Cap on federation retry backoff (8 min).
retryBackoffMax : Int<Ms>
retryBackoffMax = 480000
-- How long a peer's failure record survives past its most recent failure (24 h).
-- After this, accumulated backoff resets — the next attempt is treated as fresh.
forwardFailureMaxAge : Int<Ms>
forwardFailureMaxAge = 86400000
-- Cadence of the background prune/cleanup task (30 s).
backgroundPruneInterval : Int<Ms>
backgroundPruneInterval = 30000
-- Long-poll hold timeout before returning empty to the client (25 s).
pollTimeoutMs : Int<Ms>
pollTimeoutMs = 25000
-- Rolling window for every per-endpoint rate-limit bucket (1 min). All routes
-- share this window; only the per-window request budget varies per endpoint.
rateLimitWindow : Int<Ms>
rateLimitWindow = 60000
-- Per-pubkey budget for /auth/challenge and /auth/verify per `rateLimitWindow`.
-- The normal handshake is one challenge + one verify, so a healthy flow uses 2.
authRateLimit : Int
authRateLimit = 10
-- Per-session (bearer token) budget for /poll per `rateLimitWindow`. Clients
-- reconnect after `pollTimeoutMs`, so steady state is ~3/min.
pollRateLimit : Int
pollRateLimit = 60
-- Per-session budget for /messages per `rateLimitWindow`. Each call may carry
-- up to `maxBatchSize` ciphertexts, so the effective message ceiling is higher.
sendRateLimit : Int
sendRateLimit = 60
-- Per-peer-IP budget for /federation/gossip per `rateLimitWindow`.
gossipRateLimit : Int
gossipRateLimit = 60
-- Per-peer-IP budget for /federation/forward per `rateLimitWindow`. Higher than
-- gossip because peers fan out one call per buffered ciphertext.
forwardRateLimit : Int
forwardRateLimit = 600
-- Master switch for federation; when False the server runs in single-node mode.
federationEnabled : Bool
federationEnabled = True {}
federationUrl : Text -> Text
federationUrl = \server -> federationScheme ++ server
-- ============================================
-- Types
-- ============================================
-- Wire/state shapes plus refinement aliases for hex/server-name validation.
-- Hex types are bounded length + lowercase-hex; server names are lowercased
-- and stripped of trailing dots. The SSRF deny-list (`isBadServerName`) is
-- applied at federation egress, not in the type predicate.
-- One event in a /poll response: the opaque cipher blob addressed to the
-- polling pubkey. The server cannot read the blob.
type PollEvent = {encryptedBlob: BlobHex}
-- One presence change exchanged between federated servers. Per-variant
-- payloads keep room for future event kinds that aren't tied to a pubkey.
-- `GossipUnknown` is the parser's fallback for forward-compatibility.
data GossipEventType
= GossipOnline {pubkey: PubkeyHex}
| GossipOffline {pubkey: PubkeyHex}
| GossipUnknown {}
impl ToJSON GossipEventType where
toJson et = case et of
GossipOnline {pubkey} -> toJson {eventType: "online", pubkey}
GossipOffline {pubkey} -> toJson {eventType: "offline", pubkey}
GossipUnknown {} -> toJson {eventType: "unknown"}
impl FromJSON GossipEventType where
parseJson t = (\raw -> if raw.eventType == "online" then GossipOnline {raw.pubkey} else if raw.eventType == "offline" then GossipOffline {raw.pubkey} else GossipUnknown {}) (parseJson t : {eventType: Text, pubkey: PubkeyHex})
-- /messages is all-or-nothing per call: either every message in the batch is
-- enqueued (delivered, federated, or buffered for federation), or none are
-- and the call returns a non-2xx HTTP status. The per-message rejection code
-- "self_send" surfaces in the HTTP error body. Per-session rate limiting is
-- enforced by the route's `rateLimit` clause, which short-circuits with 429
-- before the handler runs.
-- The sender's UI treats every success the same and learns "they got it"
-- only via the recipient's E2E ack.
-- True when `s` is non-empty lowercase hex of even length, with `maxLen` ceiling.
isValidBoundedHex : Int -> Text -> Bool
isValidBoundedHex = \maxLen s -> count (chars s) <= maxLen && isValidHex s
-- True when `s` is lowercase hex of exactly `exactLen` characters (even by definition).
isValidExactHex : Int -> Text -> Bool
isValidExactHex = \exactLen s -> count (chars s) == exactLen && isValidHex s
-- Hex-encoded Ed25519 public key (32 bytes → 64 chars exact).
type PubkeyHex = Text where isValidExactHex 64
-- Hex-encoded ciphertext or opaque blob; capped at `maxBlobLen` (~20 MiB).
type BlobHex = Text where isValidBoundedHex maxBlobLen
-- Hex-encoded Ed25519 signature (64 bytes → 128 chars exact).
type SignatureHex = Text where isValidExactHex 128
-- Normalized hostname (lowercased, no trailing dot, length-bounded). The SSRF
-- deny-list (`isBadServerName`) is enforced separately at federation egress
-- (`handleRecvGossip`); keeping it out of the type predicate lets the default
-- `serverHost = "localhost"` flow through presence rows in a non-federated
-- single-node deployment.
isValidServerName : Text -> Bool
isValidServerName = \s -> count (chars s) <= maxServerNameLen && s == toLower (stripTrailingDot s)
type ServerName = Text where isValidServerName
-- Wall-clock time in milliseconds since epoch.
type Timestamp = Int<Ms>
-- Outstanding nonce issued by /auth/challenge; consumed by /auth/verify.
type Challenge = {pubkey: PubkeyHex, challenge: Text, expiresAt: Timestamp, ip: Text}
-- An issued bearer token bound to (pubkey, ip, expiry).
type Session = {token: Text, pubkey: PubkeyHex, expiresAt: Timestamp, ip: Text}
-- Mailbox row: a stored message awaiting recipient pickup or federation forward.
-- Storage TTL is fixed at `messageMaxAge`, so expiry is derived from
-- `receivedAt`. Any send time the sender wants to convey lives inside
-- `encryptedBlob` (the server cannot read it). `deliveredTo` records peers
-- that have already accepted a federated forward of this blob, so a later
-- presence re-announce of `toKey` doesn't trigger a duplicate delivery.
type Message = {toKey: PubkeyHex, encryptedBlob: BlobHex, receivedAt: Timestamp, deliveredTo: [ServerName]}
-- Last-known location of a pubkey, either local (`server == serverHost`) or
-- learned from a peer via gossip. Multiple entries per pubkey are allowed.
type PresenceEntry = {pubkey: PubkeyHex, server: ServerName, expiresAt: Timestamp}
-- A message queued for cross-server delivery to `toServer`. `retries` is
-- incremented by `retryPendingForwards` until `maxForwardRetries` is hit.
-- Identity is derived from `encryptedBlob` (see `Message`).
type ForwardEntry = {toServer: ServerName, toKey: PubkeyHex, encryptedBlob: BlobHex, retries: Int}
-- Running failure counter for a peer; keyed by `server`. Drives `shouldRetryServer`.
type ForwardFailure = {server: Text, failedAt: Timestamp, failures: Int}
-- ============================================
-- Persisted State (all flat lists, no sharding)
-- ============================================
-- Mutable refs (`*x`) holding the entire server state in memory. Every list
-- scan is O(n); scaling is the first thing the production version replaces.
-- Grouped here: auth (challenges/sessions), mailbox (messages/forwards),
-- presence, and bookkeeping counters. Per-endpoint rate-limit buckets live
-- in the runtime's hidden `_knot_rate_limits` SQLite table and don't appear
-- here.
-- The Knot runtime persists every `*ref` declared here through restarts (see
-- the project memory note about SQLite-backed refs); nothing in this file is
-- truly volatile despite being typed as plain in-memory lists.
-- Outstanding /auth/challenge nonces awaiting /auth/verify. Capped overall and per-pubkey.
*challenges : [Challenge]
-- Live bearer tokens. Source of truth for `withSessionAuth`; trimmed by `pruneExpired`.
*sessions : [Session]
-- Mailbox: ciphertexts addressed to local recipients, awaiting poll or federation forward.
*messages : [Message]
-- Last-known location of each pubkey (local sessions and gossiped remote presence).
*presence : [PresenceEntry]
-- Cross-server outbox: messages queued for delivery to a peer, with retry counts.
*forwards : [ForwardEntry]
-- Per-peer failure counters; drives `shouldRetryServer`.
*forwardFailures : [ForwardFailure]
-- Strictly-monotonic sequence used as `Message.receivedAt`; guarantees poll-cursor ordering.
*seqCounter : Timestamp
-- ============================================
-- Rate limiting
-- ============================================
-- Per-endpoint token-bucket configuration. Auth endpoints key on the submitted
-- pubkey (so one keypair can't drain global capacity); authenticated client
-- endpoints key on the bearer token (per-session); federation endpoints key on
-- the connection IP. Each route has an independent bucket store, so endpoints
-- using the same key strategy don't share budget across routes. Buckets persist
-- in the runtime's `_knot_rate_limits` SQLite table; rejection responds 429
-- before the handler runs.
byPubkey = \{pubkey} _ -> Just {value: pubkey}
byAuth = \{authorization} _ -> Just {value: authorization}
byClientIp = \_ ctx -> Just {value: ctx.clientIp}
authLimit = {key: byPubkey, limit: {requests: authRateLimit, window: rateLimitWindow}}
pollLimit = {key: byAuth, limit: {requests: pollRateLimit, window: rateLimitWindow}}
sendLimit = {key: byAuth, limit: {requests: sendRateLimit, window: rateLimitWindow}}
gossipLimit = {key: byClientIp, limit: {requests: gossipRateLimit, window: rateLimitWindow}}
forwardLimit = {key: byClientIp, limit: {requests: forwardRateLimit, window: rateLimitWindow}}
-- ============================================
-- Routes
-- ============================================
-- Public HTTP surface, grouped by audience:
-- AuthApi -- challenge/verify (client identity)
-- ClientApi -- /poll, /messages (send)
-- FederationApi -- inter-server gossip, forward (single + batch)
-- Handlers return `Result HttpError T`: `Ok` produces a 200 with the
-- declared body shape, `Err` produces a non-2xx response whose body is
-- `{"error": "<code>"}`.
route AuthApi where
POST {pubkey: PubkeyHex} /auth/challenge headers {xForwardedFor: Maybe Text} -> {challenge: Text, expiresAt: Timestamp} rateLimit authLimit = ReqChallenge
POST {pubkey: PubkeyHex, challenge: Text, signature: SignatureHex, revokeOthers: Bool} /auth/verify headers {xForwardedFor: Maybe Text} -> {token: Text, expiresAt: Timestamp} rateLimit authLimit = VerifyAuth
route ClientApi where
POST {cursor: Timestamp} /poll headers {authorization: Text, xForwardedFor: Maybe Text} -> {events: [PollEvent], cursor: Timestamp} rateLimit pollLimit = Poll
POST {messages: [{to: PubkeyHex, encryptedBlob: BlobHex}]} /messages headers {authorization: Text, xForwardedFor: Maybe Text} -> {} rateLimit sendLimit = SendMessages
route FederationApi where
POST {events: [GossipEventType], fromServer: ServerName} /federation/gossip -> {} rateLimit gossipLimit = RecvGossip
POST {toKey: PubkeyHex, encryptedBlob: BlobHex} /federation/forward -> {} rateLimit forwardLimit = ForwardMessage
route Api = AuthApi | ClientApi | FederationApi
-- ============================================
-- Generic helpers
-- ============================================
-- List/predicate combinators (filter/all/findFirst/...), token generation,
-- and small text/server-name utilities used throughout the rest of the file.
-- Everything is intentionally O(n) over flat lists; performance is sacrificed
-- for clarity per the file header.
-- Cap to maxN by dropping the soonest-to-expire entries (LRU-style). Always
-- sorts by `expiresAt` so the oldest/most-stale entries are evicted first.
capList : [{expiresAt: Timestamp | r}] -> Int -> [{expiresAt: Timestamp | r}]
capList = \items maxN -> drop (max 0 (count items - maxN)) (sortBy (\x -> x.expiresAt) items)
-- Cap entries matching `pubkey` to maxN-1 (caller adds one more). Splits the
-- list into "different pubkey" (untouched) and "same pubkey" (capped +
-- LRU-evicted) so a single key can't exhaust a global pool — see
-- `maxChallengesPerKey` and `maxSessionsPerKey`.
capPerKey : [{pubkey: PubkeyHex, expiresAt: Timestamp | r}] -> PubkeyHex -> Int -> [{pubkey: PubkeyHex, expiresAt: Timestamp | r}]
capPerKey = \items pubkey maxN -> union (filter (\x -> not (x.pubkey == pubkey)) items) (capList (filter (\x -> x.pubkey == pubkey) items) (maxN - 1))
-- Random hex token: one random hex char per recursive call. `n` is the
-- desired length in characters (so n=48 yields 24 random bytes of entropy).
-- Session bearers, /auth challenges, and federation auth nonces all use n=48
-- (192 bits of entropy, well beyond birthday bound).
genToken : Int -> IO {random} Text
genToken = \n -> if n <= 0
then yield ""
else do
d <- randomInt 16
rest <- genToken (n - 1)
yield (take 1 (drop d "0123456789abcdef") ++ rest)
-- True when `s` is non-empty, even-length, and only lowercase hex digits.
isValidHex : Text -> Bool
isValidHex = \s -> do
let cs = chars s
let n = count cs
n > 0 && n / 2 * 2 == n && all (\c -> contains c "0123456789abcdef") cs
-- Recursively strip every trailing "." from `s` (DNS root form).
stripTrailingDot : Text -> Text
stripTrailingDot = \s -> if hasSuffix "." s
then stripTrailingDot (take (count (chars s) - 1) s)
else s
-- True when `s` refers to this server.
isLocalServer : Text -> Bool
isLocalServer = \s -> s == serverHost
-- True when `s` starts with `pre`.
hasPrefix : Text -> Text -> Bool
hasPrefix = \pre s -> take (count (chars pre)) s == pre
-- True when `s` ends with `suffix`.
hasSuffix : Text -> Text -> Bool
hasSuffix = \suffix s -> drop (max 0 (count (chars s) - count (chars suffix))) s == suffix
-- Block private/reserved/internal hostnames (SSRF defense). Skips obscure encoding
-- tricks (octal, hex, percent-encoding) but covers the high-value cases:
-- `localhost`, RFC1918 ranges (10/8, 192.168/16, 169.254/16), the IPv4 zero
-- net, bare IPv6 literals (any string with more than one `:` — covers loopback
-- `::`/`::1`, link-local `fe80::/10`, ULA `fc00::/7`, globals like `2001:db8::1`),
-- bracketed IPv6 literals, and the `.local` / `.internal` / `.arpa` / `.onion`
-- suffixes. Called at federation egress (so we never `fetch` a peer pubkey from
-- loopback) and on inbound requests (so a peer can't claim a bogus `fromServer`).
-- A legitimate `host:port` server name has exactly one `:`, so the multi-colon
-- check leaves that case intact.
isBadServerName : Text -> Bool
isBadServerName = \sn -> sn == "" || sn == "localhost" || any (\c -> not contains c "0123456789abcdefghijklmnopqrstuvwxyz-.:") (chars sn) || count (filter (\c -> c == ":") (chars sn)) > 1 || any (\p -> hasPrefix p sn) [
".",
":",
"[",
"127.",
"10.",
"192.168.",
"169.254.",
"0."
] || any (\s -> hasSuffix s sn) [".localhost", ".local", ".internal", ".arpa", ".onion"]
-- ============================================
-- Auth helpers
-- ============================================
-- Bearer token extraction and session lookup. Authenticated handlers funnel
-- through `authedPubkey`; per-endpoint rate limiting is enforced by the
-- compiler-generated `rateLimit` clauses on each route.
-- Federation requests are unauthenticated: anyone can hit /federation/*.
-- The `no_presence` check bounds federated forwards to currently-online keys.
-- Plain ip match (sessions with empty ip skip the check).
matchIp : Text -> Text -> Bool
matchIp = \clientIp sessionIp -> sessionIp == "" || clientIp == sessionIp
-- Unwrap an X-Forwarded-For header value to a plain IP string. Empty when the
-- header is absent (no reverse proxy in front).
clientIpOf : Maybe Text -> Text
clientIpOf = \h -> case h of
Just {value: ip} -> ip
Nothing {} -> ""
-- Resolve a Bearer token to its session pubkey if the session is unexpired and
-- the client IP matches. Returns Nothing on missing/invalid token.
authedPubkey : Text -> Int<Ms> -> Text -> IO {r *sessions} (Maybe PubkeyHex)
authedPubkey = \authorization t clientIp -> do
ses <- *sessions
if not hasPrefix "Bearer " authorization
then yield (Nothing {})
else do
let token = drop 7 authorization
yield (case findFirst ses (\s -> s.expiresAt > t && s.token == token && matchIp clientIp s.ip) of
Just {value: s} -> Just {value: s.pubkey}
_ -> Nothing {})
-- ============================================
-- Federation failure tracking (simple, no circuit breaker)
-- ============================================
-- Linear backoff per peer: each consecutive failure multiplies the delay by
-- `retryBackoffBase` (capped at `retryBackoffMax`). `shouldRetryServer` is the
-- single read-side gate; `recordFailure`/`clearFailure` are the write side.
-- There is no half-open state — a single success clears all accumulated failures.
-- True when enough time has passed since the last failure for `server` to be
-- worth retrying (or no failure recorded at all). Linear backoff:
-- `retryBackoffBase * failures`, capped at `retryBackoffMax`.
shouldRetryServer : Text -> Timestamp -> [ForwardFailure] -> Bool
shouldRetryServer = \server t failures -> not any (\f -> f.server == server && t - f.failedAt < min retryBackoffMax (retryBackoffBase * f.failures)) failures
-- Bump the failure counter for `server`.
recordFailure : Text -> IO {rw *forwardFailures, clock} {}
recordFailure = \server -> do
t <- now
atomic (do
failures <- *forwardFailures
let prevCount = case findFirst failures (\f -> f.server == server) of
Just {value: f} -> f.failures
_ -> 0
*forwardFailures = union (filter (\f -> f.server != server) failures) [
{server, failedAt: t, failures: prevCount + 1}
])
-- Drop the failure record for `server` (called after any successful peer call).
-- Single-success recovery — there is no half-open state.
clearFailure : Text -> IO {rw *forwardFailures} {}
clearFailure = \server -> atomic (do
forwardFailures <- *forwardFailures
*forwardFailures = filter (\f -> f.server != server) forwardFailures)
-- ============================================
-- Common helpers
-- ============================================
-- Helpers shared by send/poll/federation paths: monotonic sequence
-- generation, presence queries, and atomic mailbox/presence writes
-- (callers supply the surrounding `atomic` block).
-- True when `pk` has a live local presence row (i.e. is currently online here).
hasLocalPresence : [PresenceEntry] -> PubkeyHex -> Timestamp -> Bool
hasLocalPresence = \pres pk t -> any (\p -> p.pubkey == pk && isLocalServer p.server && p.expiresAt > t) pres
-- Project items through `getServer`, drop our own hostname, and dedupe.
-- Used to derive peer recipient lists from forwards/presence/etc.
collectPeers : [a] -> (a -> Text) -> [Text]
collectPeers = \items getServer -> filter (\s -> not isLocalServer s) (map getServer items)
-- Append one ciphertext to the local mailbox under a fresh monotonic
-- sequence. Row-level STM retry on `*messages` wakes any parked /poll
-- transaction whose filter matches the new row. Caller wraps in `atomic`.
appendMessage : PubkeyHex -> BlobHex -> Int<Ms> -> IO {rw *messages, rw *seqCounter} {}
appendMessage = \toKey encryptedBlob t -> do
msgs <- *messages
seq <- *seqCounter
let seqTs = max t (seq + 1)
*seqCounter = seqTs
*messages = union msgs [{toKey, encryptedBlob, receivedAt: seqTs, deliveredTo: []}]
-- True when `f` is the queued forward of `blob` to `server`. Used to dedupe
-- per-(blob, server) entries on the forwards outbox.
forwardMatches : BlobHex -> ServerName -> ForwardEntry -> Bool
forwardMatches = \blob server f -> f.encryptedBlob == blob && f.toServer == server
-- Replace any local presence rows for `pubkey` with one fresh row at
-- `expiry`, and return the post-update peer broadcast set. Caller wraps in
-- `atomic`.
refreshLocalPresence : PubkeyHex -> Timestamp -> IO {rw *presence} [Text]
refreshLocalPresence = \pubkey expiry -> do
pres <- *presence
*presence = union (filter (\p -> not (p.pubkey == pubkey && isLocalServer p.server)) pres) [
{pubkey, server: serverHost, expiresAt: expiry}
]
yield (collectPeers pres (\p -> p.server))
-- ============================================
-- Pruning
-- ============================================
-- Single sweep that drops expired entries from every state ref in one atomic
-- block. Returns the locally-expired presence rows whose pubkey has no other
-- live local session, so the caller can broadcast offline gossip for them.
-- One sweep over every state ref: drops expired rows in a single atomic block.
-- Returns the locally-expired presence rows whose pubkey has no other live
-- local session, so the caller can broadcast offline gossip for them.
pruneExpired : Int<Ms> -> IO {rw *challenges, rw *forwardFailures, rw *messages, rw *presence, rw *sessions} [PubkeyHex]
pruneExpired = \t -> atomic do
ch <- *challenges
*challenges = filter (\x -> x.expiresAt > t) ch
ses <- *sessions
*sessions = filter (\x -> x.expiresAt > t) ses
msgs <- *messages
*messages = filter (\m -> t - m.receivedAt < messageMaxAge) msgs
ff <- *forwardFailures
*forwardFailures = filter (\f -> t - f.failedAt < forwardFailureMaxAge) ff
-- presence: remember locally-expired keys for offline gossip
pres <- *presence
*presence = filter (\p -> p.expiresAt > t) pres
let liveLocalPubkeys = map (\p -> p.pubkey) (filter (\p -> isLocalServer p.server && p.expiresAt > t) pres)
yield (map (\p -> p.pubkey) (filter (\p -> p.expiresAt <= t && isLocalServer p.server && not elem p.pubkey liveLocalPubkeys) pres))
-- ============================================
-- Federation gossip
-- ============================================
-- Construction and outbound dispatch of presence gossip events. Failures from
-- a peer record into `forwardFailures` (shared with forward retries) so both
-- subsystems back off together.
-- Fan out one bundle of gossip events to each peer, skipping peers in backoff.
-- No-op when `events` is empty so callers don't need to guard. Records or
-- clears federation-failure counters per call.
sendBatchedGossip : [Text] -> [GossipEventType] -> IO {rw *forwardFailures, network, clock} {}
sendBatchedGossip = \peers events -> when (count events > 0) (do
t <- now
failures <- *forwardFailures
forEach peers (\server -> when (shouldRetryServer server t failures) (do
result <- fetch (federationUrl server) (RecvGossip {events, fromServer: serverHost})
case result of
Err _ -> recordFailure server
Ok _ -> clearFailure server)))
-- ============================================
-- Forward retries
-- ============================================
-- `retryPendingForwards` walks queued cross-server forwards in batches per
-- destination, increments retry counts, and gives up after `maxForwardRetries`.
-- `forwardQueuedToServer` is the catch-up path triggered when a remote key
-- comes online: any messages already buffered for that pubkey are enqueued
-- as forwards to the newly-known server.
-- Walk pending forwards grouped by destination, retry each (up to maxBatchSize
-- per peer) using `/federation/forward`. Drops successful entries and records
-- the peer on the message's `deliveredTo`; for failures bumps the retry count
-- and gives up after `maxForwardRetries`.
retryPendingForwards : IO {rw *forwardFailures, rw *forwards, rw *messages, network, clock} {}
retryPendingForwards = do
t <- now
fwds <- *forwards
failures <- *forwardFailures
forEach (filter (\s -> shouldRetryServer s t failures) (collectPeers fwds (\f -> f.toServer))) (\server -> forEach (take maxBatchSize (filter (\f -> f.toServer == server) fwds)) (\f -> do
result <- fetch (federationUrl server) (ForwardMessage {f.toKey, f.encryptedBlob})
success <- case result of
Err _ -> do
recordFailure server
yield (False {})
Ok _ -> do
clearFailure server
yield (True {})
let isMatch = forwardMatches f.encryptedBlob server
let isDone = \x -> isMatch x && (success || x.retries + 1 >= maxForwardRetries)
atomic (do
cur <- *forwards
*forwards = map (\x -> if isMatch x
then {x.toServer, x.toKey, x.encryptedBlob, retries: x.retries + 1}
else x) (filter (\x -> not isDone x) cur)
when success (do
msgs <- *messages
*messages = map (\m -> if m.toKey == f.toKey && m.encryptedBlob == f.encryptedBlob && not elem server m.deliveredTo
then {m.toKey, m.encryptedBlob, m.receivedAt, deliveredTo: union m.deliveredTo [server]}
else m) msgs))))
-- Forward queued messages to a server when a remote key comes online.
-- Skips messages already delivered to `server` (tracked via Message.deliveredTo)
-- so a presence re-announce after a TTL lapse doesn't duplicate delivery.
forwardQueuedToServer : PubkeyHex -> ServerName -> IO {rw *forwards, r *messages} {}
forwardQueuedToServer = \pubkey server -> atomic (do
msgs <- *messages
fwds <- *forwards
let existing = count (filter (\f -> f.toServer == server) fwds)
let budget = max 0 (maxForwardsPerServer - existing)
let candidates = filter (\m -> m.toKey == pubkey && not any (forwardMatches m.encryptedBlob server) fwds && not elem server m.deliveredTo) msgs
let newFwds = map (\m -> {toServer: server, m.toKey, m.encryptedBlob, retries: 0}) (take budget (sortBy (\m -> m.receivedAt) candidates))
*forwards = union fwds newFwds)
-- ============================================
-- Handler: challenge / verify
-- ============================================
-- Identity is a public key; there are no accounts. The flow is:
-- 1. Client requests a fresh nonce (`handleChallenge`).
-- 2. Client signs the nonce with its private key and submits the signature
-- together with the original challenge (`handleVerifyAuth`).
-- 3. Server returns a Bearer token bound to (pubkey, ip, expiry); the new
-- session also installs a local presence row and gossips an `online`
-- event to known peers.
-- Sessions expire on their own (`sessionExpiry`); there is no client-driven
-- revoke/logout.
-- POST /auth/challenge: issue a fresh signed-nonce challenge for `pubkey`.
-- Capped per-key (maxChallengesPerKey); per-pubkey rate limiting is enforced
-- by the route's `rateLimit` clause. The IP is recorded so /auth/verify can
-- require the same client.
handleChallenge : PubkeyHex -> Text -> IO {rw *challenges, clock, random} (Result HttpError {challenge: Text, expiresAt: Timestamp})
handleChallenge = \pubkey clientIp -> do
t <- now
challenge <- genToken 48
let expiresAt = t + challengeExpiry
atomic (do
ch <- *challenges
let cappedActive = capPerKey (filter (\c -> c.expiresAt > t) ch) pubkey maxChallengesPerKey
if count cappedActive >= maxChallenges
then yield (Err {error: {status: 503, message: "capacity"}})
else do
*challenges = union cappedActive [{pubkey, challenge, expiresAt, ip: clientIp}]
yield (Ok {value: {challenge, expiresAt}}))
-- POST /auth/verify: validate `signature` over `challenge` for `pubkey`.
-- On success, mints a session token, records local presence (`pubkey` is now
-- live on this server), and gossips an `online` event to known peers. If
-- `revokeOthers` is set, all other sessions for the same pubkey are dropped.
handleVerifyAuth : PubkeyHex -> Text -> Text -> Bool -> Text -> IO {rw *challenges, rw *forwardFailures, rw *presence, rw *sessions, network, clock, random} (Result HttpError {expiresAt: Timestamp, token: Text})
handleVerifyAuth = \pubkey challenge signature revokeOthers clientIp -> do
t <- now
let expiry = t + sessionExpiry
token <- genToken 48
let sigValid = case bytesFromHex pubkey of
Just {value: pk} -> case bytesFromHex signature of
Just {value: sigBytes} -> verify pk (textToBytes challenge) sigBytes
_ -> False {}
_ -> False {}
result <- atomic (do
ch <- *challenges
let same = \c -> c.pubkey == pubkey && c.challenge == challenge
let valid = any (\c -> same c && c.expiresAt > t && matchIp clientIp c.ip) ch && sigValid
if not valid
then yield (Nothing {})
else do
*challenges = filter (\c -> not same c) ch
ses <- *sessions
let pruned = if revokeOthers
then filter (\s -> s.pubkey != pubkey) ses
else capPerKey ses pubkey maxSessionsPerKey
let newSession = {token, pubkey, expiresAt: expiry, ip: clientIp}
*sessions = capList (union pruned [newSession]) maxSessions
peers <- refreshLocalPresence pubkey expiry
yield (Just {value: peers}))
case result of
Nothing {} -> yield (Err {error: {status: 401, message: "invalid"}})
Just {value: peers} -> do
when federationEnabled (fork (sendBatchedGossip peers [GossipOnline {pubkey}]))
yield (Ok {value: {token, expiresAt: expiry}})
-- ============================================
-- Handler: poll
-- ============================================
-- Long-poll. The client supplies a cursor (last-seen `receivedAt`). We:
-- * Ack-and-drop everything <= the cursor (treating poll progress as ack).
-- * Race a row-level STM wait on `*messages` against `pollTimeoutMs` and
-- return new events plus piggybacked sender-side expirations/deliveries.
-- POST /poll: long-poll body. Treats the cursor advance as an implicit ack
-- (`ackPolledMessages`), refreshes the caller's local presence row so peers
-- don't time us out mid-session, then runs `doPoll` to collect events (or
-- block until a matching message arrives or the timeout elapses) and
-- assembles the response.
handlePoll : Int -> Text -> Text -> IO {r *sessions, rw *forwardFailures, rw *forwards, rw *messages, rw *presence, network, clock} (Result HttpError {cursor: Timestamp, events: [PollEvent]})
handlePoll = \cursor authorization clientIp -> do
t <- now
setup <- atomic (do
auth <- authedPubkey authorization t clientIp
case auth of
Nothing {} -> yield (Nothing {})
Just {value: pubkey} -> do
let clampedCursor = max 0 (min t cursor)
when (clampedCursor > 0) (ackPolledMessages pubkey clampedCursor)
pres <- *presence
let stale = not any (\p -> p.pubkey == pubkey && isLocalServer p.server && p.expiresAt - t >= sessionExpiry / 2) pres
refreshPeers <- if stale then refreshLocalPresence pubkey (t + sessionExpiry) else yield []
yield (Just {value: {pubkey, clampedCursor, refreshPeers}}))
case setup of
Nothing {} -> yield (Err {error: {status: 401, message: "unauthorized"}})
Just {value: s} -> do
when federationEnabled (fork (sendBatchedGossip s.refreshPeers [GossipOnline {s.pubkey}]))
pollResult <- doPoll s t
yield (Ok {value: pollResult})
-- One pass: collect this pubkey's unexpired messages newer than the cursor,
-- sort by receivedAt, take up to `maxPollEvents`. The returned cursor is the
-- max `receivedAt` in the page (or the input cursor when the page is empty),
-- which the client echoes on the next /poll to advance — and which the
-- server treats as an implicit ack.
-- If the page is empty, race a row-level STM wait against `pollTimeoutMs`:
-- the wait transaction reads `*messages` and `retry`s until a row matching
-- this pubkey/cursor is committed; the timer wins on idle. Either way we
-- re-collect at the new wall time, so a timeout returns an empty page.
doPoll : {clampedCursor: Timestamp, pubkey: PubkeyHex | a} -> Timestamp -> IO {r *messages, clock} {cursor: Timestamp, events: [{encryptedBlob: BlobHex}]}
doPoll = \s t -> do
let collectAt = \tnow -> atomic (do
msgs <- *messages
let fresh = filter (\m -> m.toKey == s.pubkey && tnow - m.receivedAt < messageMaxAge && m.receivedAt > s.clampedCursor) msgs
let limited = take maxPollEvents (sortBy (\m -> m.receivedAt) fresh)
yield {
events: map (\m -> {m.encryptedBlob}) limited,
cursor: fold (\acc m -> max acc m.receivedAt) s.clampedCursor limited
})
initial <- collectAt t
if count initial.events > 0
then yield initial
else do
-- `race` requires both arms to share a single effect row, so each arm
-- carries the union {r *messages, clock} — the STM wait gets a `now`,
-- the timer gets a no-op `*messages` read.
race
(do
_ <- now
atomic (do
msgs <- *messages
let fresh = filter (\m -> m.toKey == s.pubkey && m.receivedAt > s.clampedCursor) msgs
if count fresh > 0 then yield {} else retry))
(do
atomic (do _ <- *messages; yield {})
sleep pollTimeoutMs)
t2 <- now
collectAt t2
-- Treat the supplied cursor as an ack: drop all messages with
-- receivedAt <= cursor, plus their pending forwards.
ackPolledMessages : PubkeyHex -> Int -> IO {rw *forwards, rw *messages} {}
ackPolledMessages = \pubkey cursor -> do
msgs <- *messages
let matches = \m -> m.toKey == pubkey && m.receivedAt <= cursor
let toAck = filter matches msgs
let blobs = map (\m -> m.encryptedBlob) toAck
*messages = filter (\m -> not matches m) msgs
fwds <- *forwards
*forwards = filter (\f -> not (f.toKey == pubkey && elem f.encryptedBlob blobs)) fwds
-- ============================================
-- Handler: send
-- ============================================
-- Accept a batch of messages for recipient pubkeys. Placement (live local
-- session, federated forward, or held-for-presence) is internal; the call
-- either succeeds (every message enqueued) or fails with a non-2xx HTTP
-- status carrying the rejection code ("self_send" | ...).
-- Duplicates are silently treated as success so retries are idempotent.
-- Per-session send rate is enforced by the route's `rateLimit` clause.
-- Federated forwards are kicked off in background forks after the atomic.
-- POST /messages: send a list of ciphertexts atomically. Self-send is
-- rejected up-front; the remaining batch is committed inside one atomic
-- block (all-or-nothing) which reads presence to pre-stage federation
-- forwards. Federation forwards are kicked per message after commit.
handleSendMessages : [{encryptedBlob: BlobHex, to: PubkeyHex | a}] -> Text -> Text -> IO {r *presence, r *sessions, rw *forwardFailures, rw *forwards, rw *messages, rw *seqCounter, network, clock} (Result HttpError {})
handleSendMessages = \messages authorization clientIp -> do
t <- now
auth <- authedPubkey authorization t clientIp
case auth of
Nothing {} -> yield (Err {error: {status: 401, message: "unauthorized"}})
Just {value: senderKey} -> do
if count messages > maxBatchSize
then yield (Err {error: {status: 413, message: "batch_too_large"}})
else if any (\m -> m.to == senderKey) messages
then yield (Err {error: {status: 400, message: "self_send"}})
else do
committed <- atomic (do
pres <- *presence
let staged = map (\m -> {
m.to,
m.encryptedBlob,
remoteServers: collectPeers (filter (\p -> p.pubkey == m.to && p.expiresAt > t) pres) (\p -> p.server)
}) messages
forEach staged (\c -> do
appendMessage c.to c.encryptedBlob t
when federationEnabled (do
fwds <- *forwards
let newFwds = map (\s -> {toServer: s, toKey: c.to, c.encryptedBlob, retries: 0}) (filter (\s -> not any (forwardMatches c.encryptedBlob s) fwds) c.remoteServers)
*forwards = union fwds newFwds))
yield staged)
when federationEnabled (forEach committed (\c -> fork (forkForwardsTo c.to c.encryptedBlob c.remoteServers)))
yield (Ok {value: {}})
-- Fire one /federation/forward call per pre-staged server. On success the
-- corresponding row in `forwards` is pruned and the peer is appended to the
-- message's `deliveredTo` set. The backing `*messages` row itself stays —
-- cleaned up by `ackPolledMessages` when the recipient polls here, or by
-- `pruneExpired` once `messageMaxAge` lapses. Keeping the row lets
-- `forwardQueuedToServer` re-forward to peers learned after send time (so a
-- multi-homed recipient who appears on a new peer post-send still receives
-- the message), and `deliveredTo` prevents re-forwards to peers that already
-- have the message. On failure the forwards row stays so
-- `retryPendingForwards` can re-attempt with backoff.
forkForwardsTo : PubkeyHex -> BlobHex -> [Text] -> IO {rw *forwardFailures, rw *forwards, rw *messages, network, clock} {}
forkForwardsTo = \to encryptedBlob servers -> forEach servers (\server -> do
result <- fetch (federationUrl server) (ForwardMessage {toKey: to, encryptedBlob})
let same = forwardMatches encryptedBlob server
case result of
Err _ -> recordFailure server
Ok _ -> do
clearFailure server
atomic (do
fwds <- *forwards
*forwards = filter (\x -> not same x) fwds
msgs <- *messages
*messages = map (\m -> if m.toKey == to && m.encryptedBlob == encryptedBlob && not elem server m.deliveredTo
then {m.toKey, m.encryptedBlob, m.receivedAt, deliveredTo: union m.deliveredTo [server]}
else m) msgs))
-- ============================================
-- Handler: federation receive
-- ============================================
-- Inbound peer-to-peer endpoints. Federation is open and unauthenticated:
-- the `no_presence` check in `handleForwardMessage` bounds forwards to
-- currently-online local keys.
-- POST /federation/gossip: ingest peer presence events. The transport-level
-- `fromServer` is the home server for every event. Drops every (pubkey,
-- fromServer) row touched by this batch, then re-adds fresh-TTL rows for
-- pubkeys with a `GossipOnline` event (so Online wins on collision). For keys
-- that transition offline → online, flushes any locally-buffered messages via
-- `forwardQueuedToServer`.
handleRecvGossip : [GossipEventType] -> Text -> IO {r *messages, rw *forwards, rw *presence, clock} (Result HttpError {})
handleRecvGossip = \events fromServer -> do
t <- now
if isLocalServer fromServer || isBadServerName fromServer || count events > maxGossipEvents
then yield (Err {error: {status: 400, message: "invalid_request"}})
else do
newlyOnline <- atomic (do
pres <- *presence
let collected = fold (\acc e -> case e of
GossipOnline {pubkey} -> {
touched: union acc.touched [pubkey],
added: union acc.added [{pubkey, server: fromServer, expiresAt: t + onlineGossipTtl}]
}
GossipOffline {pubkey} -> {acc.added, touched: union acc.touched [pubkey]}
_ -> acc) {touched: [], added: []} events
let kept = filter (\p -> p.server != fromServer || not elem p.pubkey collected.touched) pres
*presence = union kept collected.added
yield (filter (\pk -> not any (\p -> p.pubkey == pk && p.server == fromServer && p.expiresAt > t) pres) (map (\p -> p.pubkey) collected.added)))
fork (forEach newlyOnline (\pk -> forwardQueuedToServer pk fromServer))
yield (Ok {value: {}})
-- POST /federation/forward: receive a single federated message destined for
-- a local key. The presence gate requires the recipient to be **currently
-- locally online** (not just visible to gossip somewhere). Without that, an
-- A → B forward based on stale gossip about R could land on B with no
-- actively-polling client and get stuck. If B refuses, A retries via
-- `retryPendingForwards` against the next presence target.
handleForwardMessage : PubkeyHex -> BlobHex -> IO {rw *messages, r *presence, rw *seqCounter, clock} (Result HttpError {})
handleForwardMessage = \toKey encryptedBlob -> do
t <- now
atomic (do
pres <- *presence
if not hasLocalPresence pres toKey t
then yield (Err {error: {status: 404, message: "no_presence"}})
else do
appendMessage toKey encryptedBlob t
yield (Ok {value: {}}))
-- ============================================
-- Dispatch
-- ============================================
-- Single match from generated request constructor (one per route entry above)
-- to the corresponding `handleXxx` function. Adding a new route means adding
-- a `route ... = NewReq` line, a `handleNewReq` definition, and a clause here.
-- Top-level request dispatcher: maps each generated request constructor to
-- its `handleXxx` function. Every route entry above must have a clause here.
api : Server Api _
api = serve Api where
ReqChallenge = \{pubkey, xForwardedFor} -> handleChallenge pubkey (clientIpOf xForwardedFor)
VerifyAuth = \{pubkey, challenge, signature, revokeOthers, xForwardedFor} -> handleVerifyAuth pubkey challenge signature revokeOthers (clientIpOf xForwardedFor)
Poll = \{cursor, authorization, xForwardedFor} -> handlePoll cursor authorization (clientIpOf xForwardedFor)
SendMessages = \{messages, authorization, xForwardedFor} -> handleSendMessages messages authorization (clientIpOf xForwardedFor)
RecvGossip = \{events, fromServer} -> handleRecvGossip events fromServer
ForwardMessage = \{toKey, encryptedBlob} -> handleForwardMessage toKey encryptedBlob
-- ============================================
-- Background loops
-- ============================================
-- `backgroundPrune` runs the prune sweep every `backgroundPruneInterval` ms.
-- `backgroundRetryForwards` runs the forward-retry sweep on the same cadence
-- but as its own sequential loop so two retries can't race.
-- Endless tick: prune expired state and gossip offline for locally-expired
-- keys. First sweep happens immediately so stale rows persisted by the
-- previous process clear before the next interval elapses.
backgroundPrune : IO {rw *challenges, rw *forwardFailures, rw *messages, rw *presence, rw *sessions, network, clock} a
backgroundPrune = do
t <- now
expiredLocal <- pruneExpired t
when federationEnabled (fork (do
pres <- *presence
sendBatchedGossip (collectPeers pres (\p -> p.server)) (map (\pk -> GossipOffline {pubkey: pk}) expiredLocal)))
sleep backgroundPruneInterval
backgroundPrune
-- Endless tick: retry pending cross-server forwards. Calls
-- `retryPendingForwards` inline (not via `fork`) so a slow run blocks the
-- next iteration instead of racing with itself — two concurrent runs would
-- see the same `*forwards` snapshot and double-count `recordFailure`,
-- inflating the per-peer backoff multiplier.
backgroundRetryForwards : IO {rw *forwardFailures, rw *forwards, rw *messages, network, clock} a
backgroundRetryForwards = do
retryPendingForwards
sleep backgroundPruneInterval
backgroundRetryForwards
-- ============================================
-- Entry Point
-- ============================================
-- Fork the background prune loop (which runs an immediate sweep on its
-- first tick) and start the HTTP listener. The forward-retry loop only
-- runs when federation is enabled.
main : IO _ {}
main = do
logInfo ("Skrepka relay server starting on " ++ bindHost ++ ":" ++ show listenPort ++ " ...")
logInfo (" federation: " ++ (if federationEnabled then "enabled" else "disabled"))
fork backgroundPrune
when federationEnabled (fork backgroundRetryForwards)
listenOn bindHost listenPort api