Skip to content

Commit

Permalink
Create a member list of the cluster using etcd (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins authored May 20, 2021
1 parent 0c2955b commit ba7b5f2
Show file tree
Hide file tree
Showing 23 changed files with 765 additions and 150 deletions.
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ go 1.16

require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.1 // indirect
github.com/golangci/golangci-lint v1.31.0
github.com/google/uuid v1.0.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/magiconair/properties v1.8.1
github.com/moby/locker v1.0.1
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_golang v1.10.0
github.com/rs/xid v1.2.1
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.6.2-0.20201103103935-92707c0b2d50
Expand All @@ -21,8 +20,7 @@ require (
github.com/xdg/stringprep v1.0.0 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20201125193152-8a03d2e9614b
go.mongodb.org/mongo-driver v1.1.2
go.uber.org/multierr v1.2.0 // indirect
go.uber.org/zap v1.11.0
go.uber.org/zap v1.13.0
golang.org/x/tools v0.0.0-20201014231627-1610a49f37af // indirect
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a
google.golang.org/grpc v1.26.0
Expand Down
180 changes: 157 additions & 23 deletions go.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions test/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func ByteCountIEC(b uint64) string {
}

// TestYorkie returns Yorkie instance for testing.
func TestYorkie() *yorkie.Yorkie {
func TestYorkie(portOffset int) *yorkie.Yorkie {
y, err := yorkie.New(&yorkie.Config{
RPC: &rpc.Config{
Port: RPCPort,
Port: RPCPort + portOffset,
},
Metrics: &prometheus.Config{
Port: MetricsPort,
Port: MetricsPort + portOffset,
},
Backend: &backend.Config{
SnapshotThreshold: SnapshotThreshold,
Expand Down
4 changes: 2 additions & 2 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

func TestClient(t *testing.T) {
t.Run("dial and close test", func(t *testing.T) {
cli, err := client.Dial(testYorkie.RPCAddr())
cli, err := client.Dial(defaultYorkie.RPCAddr())
assert.NoError(t, err)

defer func() {
Expand All @@ -39,7 +39,7 @@ func TestClient(t *testing.T) {
})

t.Run("activate/deactivate test", func(t *testing.T) {
cli, err := client.Dial(testYorkie.RPCAddr())
cli, err := client.Dial(defaultYorkie.RPCAddr())
assert.NoError(t, err)
defer func() {
err := cli.Close()
Expand Down
133 changes: 133 additions & 0 deletions test/integration/cluster_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// +build integration

/*
* Copyright 2021 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package integration

import (
"context"
"io"
"sort"
gosync "sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/client"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/proxy"
"github.com/yorkie-team/yorkie/test/helper"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
)

func keysFromAgents(m map[string]*sync.AgentInfo) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}

sort.Strings(keys)

return keys
}

func TestClusterMode(t *testing.T) {
t.Run("member list test", func(t *testing.T) {
agentA := helper.TestYorkie(1000)
agentB := helper.TestYorkie(2000)
assert.NoError(t, agentA.Start())
assert.NoError(t, agentB.Start())

time.Sleep(time.Second)

assert.Equal(t, keysFromAgents(agentA.Members()), keysFromAgents(agentB.Members()))
assert.Len(t, defaultYorkie.Members(), 3)

assert.NoError(t, agentA.Shutdown(true))
assert.Len(t, defaultYorkie.Members(), 2)

assert.NoError(t, agentB.Shutdown(true))
assert.Len(t, defaultYorkie.Members(), 1)
})

t.Run("watch document across agents test", func(t *testing.T) {
agentA := helper.TestYorkie(1000)
agentB := helper.TestYorkie(2000)
assert.NoError(t, agentA.Start())
assert.NoError(t, agentB.Start())

defer func() {
assert.NoError(t, agentA.Shutdown(true))
assert.NoError(t, agentB.Shutdown(true))
}()

ctx := context.Background()
clientA, err := client.Dial(agentA.RPCAddr())
assert.NoError(t, err)
clientB, err := client.Dial(agentB.RPCAddr())
assert.NoError(t, err)
assert.NoError(t, clientA.Activate(ctx))
assert.NoError(t, clientB.Activate(ctx))

docA := document.New(helper.Collection, t.Name())
docB := document.New(helper.Collection, t.Name())

assert.NoError(t, clientA.Attach(ctx, docA))
assert.NoError(t, clientB.Attach(ctx, docB))

wg := gosync.WaitGroup{}

wg.Add(1)
rch := clientA.Watch(ctx, docA)
go func() {
defer wg.Done()

select {
case resp := <-rch:
if resp.Err == io.EOF {
return
}
assert.NoError(t, resp.Err)

err := clientA.Sync(ctx, resp.Keys...)
assert.NoError(t, err)
case <-time.After(time.Second):
return
}
}()

err = docB.Update(func(root *proxy.ObjectProxy) error {
root.SetString("hello", "world")
return nil
})
assert.NoError(t, err)

wg.Wait()

// TODO(hackerwins): uncomment below test
// assert.Equal(t, docA.Marshal(), docB.Marshal())

defer func() {
assert.NoError(t, clientA.Deactivate(ctx))
assert.NoError(t, clientA.Close())

assert.NoError(t, clientB.Deactivate(ctx))
assert.NoError(t, clientB.Close())
}()
})
}
4 changes: 4 additions & 0 deletions test/integration/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ package integration
import (
"testing"

"github.com/rs/xid"
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/test/helper"
"github.com/yorkie-team/yorkie/yorkie/backend/sync"
"github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd"
)

func TestETCD(t *testing.T) {
t.Run("new and close test", func(t *testing.T) {
cli, err := etcd.Dial(&etcd.Config{
Endpoints: helper.ETCDEndpoints,
}, &sync.AgentInfo{
ID: xid.New().String(),
})
assert.NoError(t, err)

Expand Down
18 changes: 11 additions & 7 deletions test/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ import (
"github.com/yorkie-team/yorkie/yorkie"
)

var testYorkie *yorkie.Yorkie
var defaultYorkie *yorkie.Yorkie

func TestMain(m *testing.M) {
y := helper.TestYorkie()
y := helper.TestYorkie(0)
if err := y.Start(); err != nil {
log.Logger.Fatal(err)
}
testYorkie = y
defaultYorkie = y
code := m.Run()
if testYorkie != nil {
if err := testYorkie.Shutdown(true); err != nil {
if defaultYorkie != nil {
if err := defaultYorkie.Shutdown(true); err != nil {
log.Logger.Error(err)
}
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func syncClientsThenAssertEqual(t *testing.T, pairs []clientAndDocPair) {
}

func createConn() (*grpc.ClientConn, error) {
conn, err := grpc.Dial(testYorkie.RPCAddr(), grpc.WithInsecure())
conn, err := grpc.Dial(defaultYorkie.RPCAddr(), grpc.WithInsecure())
if err != nil {
return nil, err
}
Expand All @@ -95,23 +95,27 @@ func createConn() (*grpc.ClientConn, error) {
func createActivatedClients(t *testing.T, n int) (clients []*client.Client) {
for i := 0; i < n; i++ {
c, err := client.Dial(
testYorkie.RPCAddr(),
defaultYorkie.RPCAddr(),
client.Option{Metadata: map[string]string{
"name": fmt.Sprintf("name-%d", i),
}},
)
assert.NoError(t, err)

err = c.Activate(context.Background())
assert.NoError(t, err)

clients = append(clients, c)
}

return
}

func cleanupClients(t *testing.T, clients []*client.Client) {
for _, c := range clients {
err := c.Deactivate(context.Background())
assert.NoError(t, err)

err = c.Close()
assert.NoError(t, err)
}
Expand Down
3 changes: 3 additions & 0 deletions test/stress/etcd_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/rs/xid"
"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/pkg/log"
Expand All @@ -36,6 +37,8 @@ func TestETCDStress(t *testing.T) {
t.Run("lock/unlock stress test", func(t *testing.T) {
cli, err := etcd.Dial(&etcd.Config{
Endpoints: helper.ETCDEndpoints,
}, &sync.AgentInfo{
ID: xid.New().String(),
})
assert.NoError(t, err)
defer func() {
Expand Down
46 changes: 39 additions & 7 deletions yorkie/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package backend

import (
"os"
gosync "sync"
"time"

"github.com/rs/xid"

"github.com/yorkie-team/yorkie/pkg/log"
"github.com/yorkie-team/yorkie/yorkie/backend/db"
Expand All @@ -26,7 +30,6 @@ import (
"github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd"
"github.com/yorkie-team/yorkie/yorkie/backend/sync/memory"
"github.com/yorkie-team/yorkie/yorkie/metrics"
"github.com/yorkie-team/yorkie/yorkie/metrics/prometheus"
)

// Config is the configuration for creating a Backend instance.
Expand All @@ -42,7 +45,8 @@ type Config struct {
// Backend manages Yorkie's remote states such as data store, distributed lock
// and etc. And it has the server status like the configuration.
type Backend struct {
Config *Config
Config *Config
agentInfo *sync.AgentInfo

DB db.DB
LockerMap sync.LockerMap
Expand All @@ -65,31 +69,54 @@ func New(
conf *Config,
mongoConf *mongo.Config,
etcdConf *etcd.Config,
rpcAddr string,
met metrics.Metrics,
) (*Backend, error) {
met := prometheus.NewMetrics()
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

agentInfo := &sync.AgentInfo{
ID: xid.New().String(),
Hostname: hostname,
RPCAddr: rpcAddr,
UpdatedAt: time.Now(),
}

mongoClient, err := mongo.Dial(mongoConf)
if err != nil {
return nil, err
}

// TODO(hackerwins): Merge these instances into Coordinator.
var pubSub sync.PubSub
var lockerMap sync.LockerMap
if etcdConf != nil {
etcdClient, err := etcd.Dial(etcdConf)
etcdClient, err := etcd.Dial(etcdConf, agentInfo)
if err != nil {
return nil, err
}
if err := etcdClient.Initialize(); err != nil {
return nil, err
}

lockerMap = etcdClient
// TODO(hackerwins): We need to replace pubsub with etcdClient.
pubSub = memory.NewPubSub()
pubSub = etcdClient
} else {
lockerMap = memory.NewLockerMap()
pubSub = memory.NewPubSub()
pubSub = memory.NewPubSub(agentInfo)
}

log.Logger.Infof(
"backend created: id: %s, addr: %s",
agentInfo.ID,
agentInfo.RPCAddr,
)

return &Backend{
Config: conf,
agentInfo: agentInfo,
DB: mongoClient,
LockerMap: lockerMap,
PubSub: pubSub,
Expand Down Expand Up @@ -133,3 +160,8 @@ func (b *Backend) AttachGoroutine(f func()) {
f()
}()
}

// Members returns the members of this cluster.
func (b *Backend) Members() map[string]*sync.AgentInfo {
return b.PubSub.Members()
}
Loading

0 comments on commit ba7b5f2

Please sign in to comment.