single-connection GetSubscription flow#951
single-connection GetSubscription flow#951bitcoin-coder-bob wants to merge 12 commits intomasterfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughGetSubscription now supports creating an inline subscription when subscriptionId is empty (emits SubscriptionStartedEvent); adds unary UpdateSubscriptionScripts (modify/overwrite) and client/server support for mid‑stream script updates while retaining legacy Subscribe/Unsubscribe flows and heartbeat behavior. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Server as IndexerService
participant Manager as ListenerManager
rect rgba(135, 206, 235, 0.5)
Note over Client,Server: New single-connection flow
Client->>Server: GetSubscription(subscriptionId="", scripts=[...])
Server->>Server: subscriptionId empty? generate new ID
Server->>Manager: create & register listener (subscriptionId)
Server->>Client: SubscriptionStartedEvent(subscriptionId)
Client->>Client: cache subscriptionId
Server->>Client: IndexerSubscriptionEvent (events...)
end
rect rgba(144, 238, 144, 0.5)
Note over Client,Server: Update scripts mid-stream
Client->>Server: UpdateSubscriptionScripts(subscriptionId, modify/overwrite)
Server->>Manager: add/remove/replace topics for listener
Server->>Client: UpdateSubscriptionScriptsResponse(scriptsAdded, scriptsRemoved, allScripts)
Server->>Client: IndexerSubscriptionEvent (subsequent filtered events...)
end
rect rgba(211, 211, 211, 0.5)
Note over Client,Server: Legacy flow (still supported)
Client->>Server: SubscribeForScripts(...) -> unary subscriptionId
Client->>Server: GetSubscription(subscriptionId) -> stream
Client->>Server: UnsubscribeForScripts(...) -> unary
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/client-lib/indexer/grpc/client.go (1)
545-552:⚠️ Potential issue | 🟡 MinorClear cached subscription ID on explicit close.
Lines 545-552 close the stream but keep
subscriptionIdcached. SubsequentModifySubscriptionScripts/OverwriteSubscriptionScriptscalls can target stale state instead of failing fast locally.🔧 Proposed fix
closeFn := func() { + a.setSubscriptionID("") cancel() streamMu.Lock() defer streamMu.Unlock() if err := stream.CloseSend(); err != nil { log.Warnf("failed to close subscription stream: %v", err) } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/client-lib/indexer/grpc/client.go` around lines 545 - 552, The closeFn currently cancels context and closes the stream but leaves the cached subscriptionId intact, which can cause subsequent ModifySubscriptionScripts/OverwriteSubscriptionScripts to act on stale state; update closeFn (the closure that calls cancel(), locks streamMu, and invokes stream.CloseSend()) to also clear the cached subscriptionId while holding the same streamMu lock (e.g., set subscriptionId to the zero value) so the subscription state is reset atomically and future calls fail fast locally.
🧹 Nitpick comments (2)
api-spec/openapi/swagger/ark/v1/indexer.openapi.json (1)
449-455: Add an explicit upper bound forscriptsarray inputs.This query parameter is unbounded today. Define
maxItemshere (and enforce the same limit server-side) to reduce request-amplification risk.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api-spec/openapi/swagger/ark/v1/indexer.openapi.json` around lines 449 - 455, The OpenAPI schema for the "scripts" query parameter currently has no upper bound (only "minItems": 1), which allows unbounded arrays; add a "maxItems" (e.g., a small safe limit like 10 or whatever policy dictates) to the schema object alongside "minItems" and "type": "array" for the "scripts" parameter, and update the server-side validation in the request handler that processes the "scripts" parameter (the code that reads/validates the scripts array) to enforce the same maxItems limit and return a clear 4xx error when exceeded.internal/interface/grpc/handlers/indexer_test.go (1)
319-320: These time-based waits are likely flaky under CI jitter.Lines 319, 390, and 428 rely on fixed
time.Sleeptiming. Consider replacing these with event-driven synchronization orrequire.Eventuallychecks to stabilize the suite.Also applies to: 390-391, 428-429
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/indexer_test.go` around lines 319 - 320, The test uses fixed time.Sleep before sending on ch (and in other places) which is flaky; replace those sleeps by explicit synchronization: introduce a ready/done channel or use require.Eventually to wait for the expected condition instead of sleeping, and coordinate the goroutine that sends &arkv1.GetSubscriptionResponse on ch to wait for that signal (or poll the observable condition with require.Eventually in the test). Locate uses of ch and the &arkv1.GetSubscriptionResponse send in indexer_test.go and change the sleep-based timing to either a select on a ready channel/ctx with timeout or require.Eventually assertions to wait for the state the test depends on.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@api-spec/protobuf/ark/v1/indexer.proto`:
- Around line 367-371: The proto documents subscription_id may be empty but the
current HTTP mapping requires the path segment; update the RPC's google.api.http
options to add a second, additive POST binding without the path parameter (e.g.
add POST "/v1/indexer/script/subscription" in addition to the existing POST
"/v1/indexer/script/subscription/{subscription_id}") so REST/OpenAPI clients can
call the empty-subscription_id flow; keep the existing binding for backward
compatibility, ensure the body mapping still accepts the message (including
repeated string scripts and optional subscription_id) and regenerate/update HTTP
gateway/OpenAPI artifacts.
In `@internal/interface/grpc/handlers/indexer.go`:
- Around line 451-454: The handler currently swallows the "listener not found"
error from scriptSubsHandler.getListenerChannel (leaving scriptCh nil), so
change the error handling to return a NotFound gRPC error when the
getListenerChannel call indicates "listener not found" instead of ignoring it;
update both the initial retrieval
(scriptSubsHandler.getListenerChannel(subscriptionId)) and the later similar
check around the event receive/heartbeat path to return
status.Error(codes.NotFound, err.Error()) when strings.Contains(err.Error(),
"listener not found") so the RPC surfaces an invalid subscription instead of
emitting only heartbeats.
In `@internal/interface/grpc/permissions/permissions.go`:
- Around line 258-263: The RPC mapping for UpdateSubscriptionScripts incorrectly
uses Action: "read" even though the RPC mutates subscription topics; update the
permissions entry for the route keyed by UpdateSubscriptionScripts in the
permissions map to use Action: "write" (or the appropriate write-authorization
constant) instead of "read" and ensure the change is applied to the permissions
map where UpdateSubscriptionScripts is defined (so UpdateSubscriptionScripts now
requires write permission).
---
Outside diff comments:
In `@pkg/client-lib/indexer/grpc/client.go`:
- Around line 545-552: The closeFn currently cancels context and closes the
stream but leaves the cached subscriptionId intact, which can cause subsequent
ModifySubscriptionScripts/OverwriteSubscriptionScripts to act on stale state;
update closeFn (the closure that calls cancel(), locks streamMu, and invokes
stream.CloseSend()) to also clear the cached subscriptionId while holding the
same streamMu lock (e.g., set subscriptionId to the zero value) so the
subscription state is reset atomically and future calls fail fast locally.
---
Nitpick comments:
In `@api-spec/openapi/swagger/ark/v1/indexer.openapi.json`:
- Around line 449-455: The OpenAPI schema for the "scripts" query parameter
currently has no upper bound (only "minItems": 1), which allows unbounded
arrays; add a "maxItems" (e.g., a small safe limit like 10 or whatever policy
dictates) to the schema object alongside "minItems" and "type": "array" for the
"scripts" parameter, and update the server-side validation in the request
handler that processes the "scripts" parameter (the code that reads/validates
the scripts array) to enforce the same maxItems limit and return a clear 4xx
error when exceeded.
In `@internal/interface/grpc/handlers/indexer_test.go`:
- Around line 319-320: The test uses fixed time.Sleep before sending on ch (and
in other places) which is flaky; replace those sleeps by explicit
synchronization: introduce a ready/done channel or use require.Eventually to
wait for the expected condition instead of sleeping, and coordinate the
goroutine that sends &arkv1.GetSubscriptionResponse on ch to wait for that
signal (or poll the observable condition with require.Eventually in the test).
Locate uses of ch and the &arkv1.GetSubscriptionResponse send in indexer_test.go
and change the sleep-based timing to either a select on a ready channel/ctx with
timeout or require.Eventually assertions to wait for the state the test depends
on.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 236f3ec0-64f8-4fda-a981-a4d9ed030d67
⛔ Files ignored due to path filters (3)
api-spec/protobuf/gen/ark/v1/indexer.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.gois excluded by!**/gen/**api-spec/protobuf/gen/ark/v1/indexer_grpc.pb.gois excluded by!**/*.pb.go,!**/gen/**
📒 Files selected for processing (7)
api-spec/openapi/swagger/ark/v1/indexer.openapi.jsonapi-spec/protobuf/ark/v1/indexer.protointernal/interface/grpc/handlers/indexer.gointernal/interface/grpc/handlers/indexer_test.gointernal/interface/grpc/permissions/permissions.gopkg/client-lib/indexer/grpc/client.gopkg/client-lib/indexer/service.go
There was a problem hiding this comment.
🧹 Nitpick comments (1)
api-spec/openapi/swagger/ark/v1/indexer.openapi.json (1)
448-454: Consider addingmaxItemsconstraint to align with server-side validation.The handler enforces a maximum of 10 scripts (see
indexer.goline 412), but the OpenAPI schema only specifiesminItems: 1. AddingmaxItems: 10would document the server's limit and enable client-side validation.📝 Suggested schema update
"name": "scripts", "in": "query", "description": "Optional: scripts to subscribe to on stream creation.\nOnly used when subscription_id is empty; ignored otherwise.", "style": "simple", "schema": { "minItems": 1, + "maxItems": 10, "type": "array", "items": { "type": "string" } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@api-spec/openapi/swagger/ark/v1/indexer.openapi.json` around lines 448 - 454, The OpenAPI array schema currently has "minItems": 1 and "type": "array" with string items but omits the server-enforced upper bound; update the schema to add "maxItems": 10 next to "minItems" so the client spec matches the server-side handler that limits scripts to 10 (the indexer handler enforcing the max-10 scripts constraint).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@api-spec/openapi/swagger/ark/v1/indexer.openapi.json`:
- Around line 448-454: The OpenAPI array schema currently has "minItems": 1 and
"type": "array" with string items but omits the server-enforced upper bound;
update the schema to add "maxItems": 10 next to "minItems" so the client spec
matches the server-side handler that limits scripts to 10 (the indexer handler
enforcing the max-10 scripts constraint).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1cb60c94-99df-4b1a-835a-353adcaaa0ce
⛔ Files ignored due to path filters (2)
api-spec/protobuf/gen/ark/v1/indexer.pb.gois excluded by!**/*.pb.go,!**/gen/**api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.gois excluded by!**/gen/**
📒 Files selected for processing (6)
api-spec/openapi/swagger/ark/v1/indexer.openapi.jsonapi-spec/protobuf/ark/v1/indexer.protointernal/interface/grpc/handlers/indexer.gointernal/interface/grpc/handlers/indexer_test.gointernal/interface/grpc/permissions/permissions.gopkg/client-lib/indexer/grpc/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/interface/grpc/handlers/indexer_test.go
There was a problem hiding this comment.
🧹 Nitpick comments (1)
internal/interface/grpc/handlers/indexer.go (1)
549-554: Remove scripts should be validated for consistency.The
addScriptsare parsed and validated (Line 539), butremoveScriptsare passed directly toremoveTopicswithout validation. If a client sends malformed scripts for removal, they'll silently fail to match any stored topics rather than returning an error.For consistency with the add path, consider validating remove scripts:
♻️ Proposed fix
if len(modify.GetRemoveScripts()) > 0 { + removeScripts, err := parseScripts(modify.GetRemoveScripts()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } if err := h.scriptSubsHandler.removeTopics( - req.GetSubscriptionId(), modify.GetRemoveScripts(), + req.GetSubscriptionId(), removeScripts, ); err != nil { return nil, status.Error(codes.NotFound, err.Error()) } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/indexer.go` around lines 549 - 554, The removeScripts (modify.GetRemoveScripts()) must be parsed and validated using the same logic used for addScripts (the code that processes modify.GetAddScripts()) before calling h.scriptSubsHandler.removeTopics; reuse the parser/validator invoked for add scripts to parse modify.GetRemoveScripts(), return a gRPC error (e.g., status.Error with codes.InvalidArgument) when validation fails, and only pass the validated/normalized script list to h.scriptSubsHandler.removeTopics to ensure malformed removal scripts produce an error instead of silently no-oping.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/interface/grpc/handlers/indexer.go`:
- Around line 549-554: The removeScripts (modify.GetRemoveScripts()) must be
parsed and validated using the same logic used for addScripts (the code that
processes modify.GetAddScripts()) before calling
h.scriptSubsHandler.removeTopics; reuse the parser/validator invoked for add
scripts to parse modify.GetRemoveScripts(), return a gRPC error (e.g.,
status.Error with codes.InvalidArgument) when validation fails, and only pass
the validated/normalized script list to h.scriptSubsHandler.removeTopics to
ensure malformed removal scripts produce an error instead of silently no-oping.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8291a2bb-727f-4b57-8b37-d67ce13043cc
⛔ Files ignored due to path filters (1)
api-spec/protobuf/gen/ark/v1/indexer.pb.rgw.gois excluded by!**/gen/**
📒 Files selected for processing (1)
internal/interface/grpc/handlers/indexer.go
done in: 937f4a9 |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
internal/interface/grpc/handlers/indexer_test.go (1)
427-429: Brief sleep is acceptable, but consider an alternative.The 20ms sleep allows the handler to enter its select loop. While brief and commented, this pattern can occasionally cause flaky tests. Consider sending a heartbeat-triggering wait instead, or accepting this minor risk for test simplicity.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/indexer_test.go` around lines 427 - 429, Replace the fragile time.Sleep(20 * time.Millisecond) before calling cancel() with an explicit synchronization signal from the handler's select loop (e.g., a ready channel or WaitGroup): have the handler send on a readyChan once it has entered the select loop, then the test should block on receiving from readyChan before calling cancel(); update the test to wait for that ready signal instead of sleeping and adjust the handler under test to close/send on the readyChan when its select loop starts.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/interface/grpc/handlers/indexer.go`:
- Around line 538-558: The Modify handler can apply addTopics then fail while
parsing/removing scripts leaving partial state; to fix, validate all script
inputs up-front by calling parseScripts for both modify.GetAddScripts() and
modify.GetRemoveScripts() before calling h.scriptSubsHandler.addTopics or
h.scriptSubsHandler.removeTopics (using req.GetSubscriptionId()), and only apply
mutations after both parses succeed (or implement a compensating rollback if you
prefer transactional semantics) so the operation is all-or-nothing.
---
Nitpick comments:
In `@internal/interface/grpc/handlers/indexer_test.go`:
- Around line 427-429: Replace the fragile time.Sleep(20 * time.Millisecond)
before calling cancel() with an explicit synchronization signal from the
handler's select loop (e.g., a ready channel or WaitGroup): have the handler
send on a readyChan once it has entered the select loop, then the test should
block on receiving from readyChan before calling cancel(); update the test to
wait for that ready signal instead of sleeping and adjust the handler under test
to close/send on the readyChan when its select loop starts.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 708f7d75-a843-4332-a5bc-2c9fe7bf87a5
📒 Files selected for processing (2)
internal/interface/grpc/handlers/indexer.gointernal/interface/grpc/handlers/indexer_test.go
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/interface/grpc/handlers/indexer.go`:
- Around line 567-570: The response currently echoes the request payload
(modify.GetAddScripts/RemoveScripts) instead of the effective changes; update
the handler in indexer.go to compute the before-set using
h.scriptSubsHandler.getTopics(req.GetSubscriptionId()), apply the requested
adds/removes against that set (or call the broker/store method that returns
effective changes), then build UpdateSubscriptionScriptsResponse.ScriptsAdded
and ScriptsRemoved from the set differences between before and after (i.e., only
scripts actually inserted or deleted), and keep AllScripts as the resulting
topic set; reference modify.GetAddScripts(), modify.GetRemoveScripts(),
h.scriptSubsHandler.getTopics(), and UpdateSubscriptionScriptsResponse when
making this change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 59cac12f-1ef1-497b-8c6f-8c55d585c7e5
📒 Files selected for processing (1)
internal/interface/grpc/handlers/indexer.go
🔍 Review — Single-connection GetSubscription flow (d2c6527)Scope: New inline subscription creation via Protocol DesignThe dual-path approach is clean:
Security Review
Observations
Test Coverage826 lines of handler tests covering: new flow lifecycle (start event, receive events, cleanup on close, invalid scripts, heartbeat timing, mid-stream script updates, heartbeat reset after events), old flow (timeout-based cleanup, immediate cleanup with no scripts, existing subscription_id), and Cross-repo Impact
Solid work. Main action item: clarify the |
|
Removed the script cache code in: b2d81b7 Note the code specifically asked for its removal by referencing this PR to do so |
🔍 Follow-up review — scripts cache removal (b2d81b7)Addresses the What changed:
Assessment: ✅ Reconnect logic is correct for both flows. Old flow (non-empty
No blocking issues. LGTM on this commit. |
…onnection # Conflicts: # api-spec/protobuf/gen/ark/v1/indexer.pb.go
closes: #950
Aligns
IndexerServicewith the single-connection event stream pattern introduced for ArkService in #829, as proposed in #950. Old clients are fully backward-compatible.GetSubscriptionnow accepts an empty subscription_id — the server creates the subscription inline, sends aSubscriptionStartedEventas the first message, and cleans up on disconnect. Initial scriptscan be provided in the same call.
UpdateSubscriptionScriptsunary RPC — consolidates add/remove/overwrite into a single RPC with oneof semantics (ModifyScriptsorOverwriteScripts), mirroringUpdateStreamTopics.GetSubscriptionsupports the new flow, storing the subscription ID fromSubscriptionStartedEventand exposingModifySubscriptionScripts/OverwriteSubscriptionScriptshelpers. Stale IDs are cleared on reconnect.
Changes
Area:
indexer.protoWhat changed: Added scripts field to
GetSubscriptionRequest,SubscriptionStartedEventto response oneof, newUpdateSubscriptionScriptsRPC withModifyScripts/OverwriteScriptsmessages────────────────────────────────────────
Area:
handlers/indexer.goWhat changed:
GetSubscriptionbranches on empty vs provided subscription_id; newUpdateSubscriptionScriptshandler with modify/overwrite logic────────────────────────────────────────
Area:
handlers/indexer_test.goWhat changed: ~800 lines of new tests covering single-connection flow, initial scripts,
UpdateSubscriptionScripts(modify & overwrite), and error cases────────────────────────────────────────
Area:
pkg/client-lib/indexer/What changed:
GetSubscriptionaccepts variadic scripts, handlesSubscriptionStartedEvent, clears stale ID on reconnect; newModifySubscriptionScriptsandOverwriteSubscriptionScriptsmethods────────────────────────────────────────
Area:
permissions.goWhat changed: Added permission entry for the new RPC
Existing clients: No changes required.
SubscribeForScripts→GetSubscription(id)→UnsubscribeForScriptscontinues to work identically.New clients:
GetSubscription()(no ID, optional initial scripts) → receiveSubscriptionStartedEvent→UpdateSubscriptionScriptsto modify. Single connection, cleaner lifecycle.Summary by CodeRabbit
New Features
Tests