-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Configurable support for rewriting consistency levels on writes #142
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am somewhat worried about the short-term/long-term costs of this kind of change discussed in the thread. That said, this PR seems like a reasonable way to address the issue without resorting to absurdity. Given the current situation my inclination is to say let's go with it.
Can we make the merged into branch not |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't merge to main
Added a PR for only modifying the consistency when it's invalid and only for writes: #143 |
|
* Only override write consistency * Elimnate handled parser signature change * Ingore error when returning default statement * Make option more generalized and add a test
proxy/proxy.go
Outdated
@@ -880,15 +880,15 @@ func (c *client) maybeOverrideAstraWriteConsistency(isSelect bool, raw *frame.Ra | |||
switch m := msg.(type) { | |||
case *partialExecute: | |||
if c.isInvalidAstraWriteConsistency(m.consistency) { | |||
_ = parser.PatchExecuteConsistency(raw.Body, overrideConsistency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made that change locally but never commited... sigh
proxy/frame_patch_test.go
Outdated
) | ||
|
||
func TestPatchQueryConsistency(t *testing.T) { | ||
t.Run("valid QUERY body", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For valid cases, one suggestion is to use the built-in messages and codecs (esp. for batch). Not a big deal.
diff --git a/proxy/frame_patch_test.go b/proxy/frame_patch_test.go
index 52481f7..efc7e33 100644
--- a/proxy/frame_patch_test.go
+++ b/proxy/frame_patch_test.go
@@ -1,28 +1,46 @@
package proxy
import (
+ "bytes"
"encoding/binary"
+ "testing"
+
+ "github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
"github.com/stretchr/testify/assert"
- "testing"
+ "github.com/stretchr/testify/require"
)
func TestPatchQueryConsistency(t *testing.T) {
- t.Run("valid QUERY body", func(t *testing.T) {
- query := []byte("SELECT * FROM users;")
- queryLen := uint32(len(query))
+ const version = primitive.ProtocolVersion4
- body := make([]byte, 4+len(query)+2)
- binary.BigEndian.PutUint32(body[0:4], queryLen)
- copy(body[4:], query)
- binary.BigEndian.PutUint16(body[4+len(query):], uint16(primitive.ConsistencyLevelOne))
+ var queryCodec message.Codec
- err := patchQueryConsistency(body, primitive.ConsistencyLevelQuorum)
+ for _, c := range message.DefaultMessageCodecs {
+ if c.GetOpCode() == primitive.OpCodeQuery {
+ queryCodec = c
+ }
+ }
+ assert.NotNil(t, queryCodec)
+
+ t.Run("valid QUERY body", func(t *testing.T) {
+ var buf bytes.Buffer
+ err := queryCodec.Encode(&message.Query{
+ Query: "SELECT * FROM test",
+ Options: &message.QueryOptions{
+ Consistency: primitive.ConsistencyLevelOne,
+ },
+ }, &buf, version)
assert.NoError(t, err)
- offset := 4 + len(query)
- got := binary.BigEndian.Uint16(body[offset : offset+2])
- assert.Equal(t, uint16(primitive.ConsistencyLevelQuorum), got)
+ body := buf.Bytes()
+ err = patchQueryConsistency(body, primitive.ConsistencyLevelQuorum)
+ assert.NoError(t, err)
+
+ msg, err := queryCodec.Decode(bytes.NewBuffer(body), version)
+ require.NoError(t, err)
+
+ assert.Equal(t, primitive.ConsistencyLevelQuorum, msg.(*message.Query).Options.Consistency)
})
t.Run("too short body", func(t *testing.T) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great Suggestion , it led to discovering a bug - 712ae04
* Add batch test and a fix * typo
@@ -42,7 +42,7 @@ func TestLookupEndpoint_Invalid(t *testing.T) { | |||
err string | |||
}{ | |||
{"localhost", "missing port in address"}, | |||
{"dne:1234", ""}, // Errors for DNS can vary per system | |||
{"test:1234", ""}, // Errors for DNS can vary per system |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dne is getting resolved to 143.244.220.150
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to investigate this further, but to unblock things for now, I’ve changed the host to test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Use a randomized unresolvable domain to avoid DNS caching or accidental resolution
* Add logging for consistency override * Fix and add more logging * Rename
as per @mpenick's suggestion
Implemented
patchQueryConsistency
,patchExecuteConsistency
, andpatchBatchConsistency
to update the consistency level directly in the raw frame by identifying its byte position.These implementations may be refactored in the future to use the encoder for a cleaner solution.
fixes : #139