-
Notifications
You must be signed in to change notification settings - Fork 3
feat: [DEVEX-304] supports new caught up and fell behind events. #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements support for new "caught up" and "fell behind" events for subscriptions by updating both the protocol buffers definitions and the corresponding event handling logic in Go. Key changes include:
- Enhancements to protos/streams.proto with detailed definitions for "CaughtUp" and "FellBehind" messages.
- Updates in kurrentdb/subscriptions.go to handle the new event fields.
- Adjustments in kurrentdb/subscription_event.go to reflect the new caught up and fell behind event types.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
protos/streams.proto | Added detailed definitions for CaughtUp and FellBehind messages. |
kurrentdb/subscriptions.go | Updated event handling logic to populate caught up and fell behind. |
kurrentdb/subscription_event.go | Updated subscription event types to separate caught up and fell behind. |
caughtUp = new(CaughtUp) | ||
caughtUp.Date = wire.GetTimestamp().AsTime() | ||
|
||
if wire.StreamRevision != nil { | ||
caughtUp.StreamRevision = new(uint64) | ||
*caughtUp.StreamRevision = uint64(wire.GetStreamRevision()) | ||
} else { | ||
caughtUp.Position = new(Position) | ||
*caughtUp.Position = Position{ | ||
Commit: wire.GetPosition().CommitPosition, | ||
Prepare: wire.GetPosition().PreparePosition, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event handling code in the CaughtUp case expects 'stream_revision' and 'position' fields, but the updated proto definition for CaughtUp only includes a 'timestamp' field. Consider aligning the CaughtUp proto message with the client-side expectations or updating the client logic accordingly.
caughtUp = new(CaughtUp) | |
caughtUp.Date = wire.GetTimestamp().AsTime() | |
if wire.StreamRevision != nil { | |
caughtUp.StreamRevision = new(uint64) | |
*caughtUp.StreamRevision = uint64(wire.GetStreamRevision()) | |
} else { | |
caughtUp.Position = new(Position) | |
*caughtUp.Position = Position{ | |
Commit: wire.GetPosition().CommitPosition, | |
Prepare: wire.GetPosition().PreparePosition, | |
} | |
} | |
caughtUp = &CaughtUp{ | |
Date: wire.GetTimestamp().AsTime(), |
Copilot uses AI. Check for mistakes.
Related Jira DEVEX-304 |
No description provided.