Skip to content

Token-aware routing (WIP) #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/datastax/go-cassandra-native-protocol v0.0.0-20211124104234-f6aea54fa801
github.com/hashicorp/golang-lru v0.5.4
github.com/stretchr/testify v1.7.0
github.com/twmb/murmur3 v1.1.6 // indirect
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
Expand Down
20 changes: 12 additions & 8 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func (p *Proxy) findSession(version primitive.ProtocolVersion, keyspace string)
}
}

func (p *Proxy) newQueryPlan() proxycore.QueryPlan {
return p.lb.NewQueryPlan()
func (p *Proxy) newQueryPlan(keyspace string, token proxycore.Token) proxycore.QueryPlan {
return p.lb.NewQueryPlan(keyspace, token)
}

var (
Expand Down Expand Up @@ -575,15 +575,16 @@ func (c *client) Receive(reader io.Reader) error {
case *partialQuery:
c.handleQuery(raw, msg, body.CustomPayload)
case *partialBatch:
c.execute(raw, notDetermined, c.keyspace, msg)
// FIXME: Calculate token for partition key
c.execute(raw, notDetermined, c.keyspace, nil, msg)
default:
c.send(raw.Header, &message.ProtocolError{ErrorMessage: "Unsupported operation"})
}

return nil
}

func (c *client) execute(raw *frame.RawFrame, state idempotentState, keyspace string, msg message.Message) {
func (c *client) execute(raw *frame.RawFrame, state idempotentState, keyspace string, token proxycore.Token, msg message.Message) {
if sess, err := c.proxy.findSession(raw.Header.Version, c.keyspace); err == nil {
req := &request{
client: c,
Expand All @@ -593,7 +594,7 @@ func (c *client) execute(raw *frame.RawFrame, state idempotentState, keyspace st
keyspace: keyspace,
done: false,
stream: raw.Header.StreamId,
qp: c.proxy.newQueryPlan(),
qp: c.proxy.newQueryPlan(keyspace, token),
raw: raw,
}
req.Execute(true)
Expand Down Expand Up @@ -649,7 +650,8 @@ func (c *client) handlePrepare(raw *frame.RawFrame, msg *message.Prepare) {
}

} else {
c.execute(raw, isIdempotent, keyspace, msg) // Prepared statements can be retried themselves
// FIXME: Calculate token for partition key
c.execute(raw, isIdempotent, keyspace, nil, msg) // Prepared statements can be retried themselves
}
}

Expand All @@ -658,7 +660,8 @@ func (c *client) handleExecute(raw *frame.RawFrame, msg *partialExecute, customP
if stmt, ok := c.preparedSystemQuery[id]; ok {
c.interceptSystemQuery(raw.Header, stmt)
} else {
c.execute(raw, c.getDefaultIdempotency(customPayload), "", msg)
// FIXME: Calculate token for partition key
c.execute(raw, notDetermined, "", nil, msg)
}
}

Expand All @@ -675,7 +678,8 @@ func (c *client) handleQuery(raw *frame.RawFrame, msg *partialQuery, customPaylo
c.interceptSystemQuery(raw.Header, stmt)
}
} else {
c.execute(raw, c.getDefaultIdempotency(customPayload), c.keyspace, msg)
// FIXME: Calculate token for partition key
c.execute(raw, notDetermined, c.keyspace, nil, msg)
}
}

Expand Down
15 changes: 11 additions & 4 deletions proxycore/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (a UpEvent) isEvent() {

type BootstrapEvent struct {
Hosts []*Host
Partitioner
Keyspaces map[string]ReplicationStrategy
}

func (b BootstrapEvent) isEvent() {
Expand Down Expand Up @@ -110,7 +112,7 @@ type ClusterConfig struct {
}

type ClusterInfo struct {
Partitioner string
Partitioner Partitioner
ReleaseVersion string
CQLVersion string
LocalDC string
Expand Down Expand Up @@ -301,7 +303,12 @@ func (c *Cluster) queryHosts(ctx context.Context, conn *ClientConn, version prim
row := rs.Row(0)
localDC := hosts[0].DC

partitioner, err := row.StringByName("partitioner")
partitionerName, err := row.StringByName("partitioner")
if err != nil {
return nil, ClusterInfo{}, err
}

partitioner, err := NewPartitionerFromName(partitionerName)
if err != nil {
return nil, ClusterInfo{}, err
}
Expand Down Expand Up @@ -348,7 +355,7 @@ func (c *Cluster) addHosts(hosts []*Host, rs *ResultSet) []*Host {
for i := 0; i < rs.RowCount(); i++ {
row := rs.Row(i)
if endpoint, err := c.config.Resolver.NewEndpoint(row); err == nil {
if host, err := NewHostFromRow(endpoint, row); err == nil {
if host, err := NewHostFromRow(endpoint, c.Info.Partitioner, row); err == nil {
hosts = append(hosts, host)
} else {
c.logger.Error("unable to create new host", zap.Stringer("endpoint", endpoint), zap.Error(err))
Expand Down Expand Up @@ -449,7 +456,7 @@ func (c *Cluster) stayConnected() {
continue
}
}
newListener.OnEvent(&BootstrapEvent{c.hosts})
newListener.OnEvent(&BootstrapEvent{c.hosts, c.Info.Partitioner, map[string]ReplicationStrategy{}}) // FIXME: Get keyspace info
c.listeners = append(c.listeners, newListener)
case <-refreshTimer.C:
c.refreshHosts()
Expand Down
34 changes: 21 additions & 13 deletions proxycore/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,31 @@

package proxycore

import "fmt"

type Host struct {
Endpoint
DC string
DC string
Rack string
Tokens []Token
}

func NewHostFromRow(endpoint Endpoint, row Row) (*Host, error) {
dc, err := row.StringByName("data_center")
func NewHostFromRow(endpoint Endpoint, partitioner Partitioner, row Row) (*Host, error) {
dc, err := row.ByName("data_center")
if err != nil {
return nil, err
return nil, fmt.Errorf("error attmpting to get 'data_center' column: %v", err)
}
return &Host{endpoint, dc}, nil
}

func (h *Host) Key() string {
return h.Endpoint.Key()
}

func (h *Host) String() string {
return h.Endpoint.String()
rack, err := row.ByName("rack")
if err != nil {
return nil, fmt.Errorf("error attmpting to get 'rack' column: %v", err)
}
tokensVal, err := row.ByName("tokens")
if err != nil {
return nil, fmt.Errorf("error attmpting to get 'tokens' column: %v", err)
}
tokens := make([]Token, 0, len(tokensVal.([]string)))
for _, token := range tokensVal.([]string) {
tokens = append(tokens, partitioner.FromString(token))
}
return &Host{endpoint, dc.(string), rack.(string), tokens}, nil
}
59 changes: 53 additions & 6 deletions proxycore/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,19 @@ type QueryPlan interface {

type LoadBalancer interface {
ClusterListener
NewQueryPlan() QueryPlan
NewQueryPlan(keyspace string, token Token) QueryPlan
}

func NewRoundRobinLoadBalancer() LoadBalancer {
lb := &roundRobinLoadBalancer{
mu: &sync.Mutex{},
}
lb := &roundRobinLoadBalancer{}
lb.hosts.Store(make([]*Host, 0))
return lb
}

type roundRobinLoadBalancer struct {
hosts atomic.Value
index uint32
mu *sync.Mutex
mu sync.Mutex
}

func (l *roundRobinLoadBalancer) OnEvent(event Event) {
Expand Down Expand Up @@ -69,7 +67,7 @@ func (l *roundRobinLoadBalancer) copy() []*Host {
return cpy
}

func (l *roundRobinLoadBalancer) NewQueryPlan() QueryPlan {
func (l *roundRobinLoadBalancer) NewQueryPlan(_ string, _ Token) QueryPlan {
return &roundRobinQueryPlan{
hosts: l.hosts.Load().([]*Host),
offset: atomic.AddUint32(&l.index, 1) - 1,
Expand All @@ -92,3 +90,52 @@ func (p *roundRobinQueryPlan) Next() *Host {
p.index++
return host
}

type tokenAwareLoadBalancer struct {
tokenMap *TokenMap
partitioner Partitioner
mu sync.Mutex
}

func (l *tokenAwareLoadBalancer) OnEvent(event Event) {
l.mu.Lock()
defer l.mu.Unlock()

switch evt := event.(type) {
case *BootstrapEvent:
l.tokenMap = NewTokenMap(evt.Hosts, evt.Keyspaces)
l.partitioner = evt.Partitioner
case *AddEvent:
l.tokenMap.AddHost(evt.Host)
case *RemoveEvent:
l.tokenMap.RemoveHost(evt.Host)
}
//TODO implement me
panic("implement me")
}

func (l *tokenAwareLoadBalancer) NewQueryPlan(keyspace string, token Token) QueryPlan {
if token != nil {
replicas, err := l.tokenMap.GetReplicas(keyspace, token)
if err != nil {
return &tokenAwareQueryPlan{replicas: replicas}
} else {
//TODO implement me
panic("implement me")
}
} else {
//TODO implement me
panic("implement me")
}
return nil
}

type tokenAwareQueryPlan struct {
replicas []*Host
index int
}

func (t tokenAwareQueryPlan) Next() *Host {
//TODO implement me
panic("implement me")
}
Loading