Skip to content

[WIP] NetworkDB performance improvements #2046

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions networkdb/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package networkdb

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

// TestAddNetworkNode verifies that addNetworkNode de-duplicates node
// entries within a network and tracks membership independently per network.
func TestAddNetworkNode(t *testing.T) {
	db := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})}

	// Insert each of 1000 node names twice into one network; the
	// duplicate insertions must be collapsed into a single entry.
	for pass := 0; pass < 2; pass++ {
		for i := 0; i < 1000; i++ {
			db.addNetworkNode("network", "node-"+strconv.Itoa(i))
		}
	}
	assert.Len(t, db.networkNodes["network"], 1000)

	// Spread 2000 distinct nodes across 1000 networks: network-k receives
	// node-k and node-(k+1000), so every network ends up with exactly two.
	for i := 0; i < 2000; i++ {
		db.addNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i))
	}
	for i := 0; i < 1000; i++ {
		assert.Len(t, db.networkNodes["network"+strconv.Itoa(i)], 2)
	}
}

// TestDeleteNetworkNode verifies that deleteNetworkNode removes entries,
// tolerates deleting nodes that are already gone, and keeps per-network
// membership independent.
func TestDeleteNetworkNode(t *testing.T) {
	db := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})}

	for i := 0; i < 1000; i++ {
		db.addNetworkNode("network", "node-"+strconv.Itoa(i))
	}
	assert.Len(t, db.networkNodes["network"], 1000)

	// Delete every node twice: the second pass exercises deletion of
	// entries that no longer exist and must be a harmless no-op.
	for pass := 0; pass < 2; pass++ {
		for i := 0; i < 1000; i++ {
			db.deleteNetworkNode("network", "node-"+strconv.Itoa(i))
		}
	}
	assert.Len(t, db.networkNodes["network"], 0)

	// Populate 1000 networks with two nodes each (node-k and node-(k+1000)).
	for i := 0; i < 2000; i++ {
		db.addNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i))
	}
	// Remove the first member of each network; counts must step 2 -> 1.
	for i := 0; i < 1000; i++ {
		netID := "network" + strconv.Itoa(i)
		assert.Len(t, db.networkNodes[netID], 2)
		db.deleteNetworkNode(netID, "node-"+strconv.Itoa(i))
		assert.Len(t, db.networkNodes[netID], 1)
	}
	// Remove the second member; every network must end up empty.
	for i := 1000; i < 2000; i++ {
		netID := "network" + strconv.Itoa(i%1000)
		db.deleteNetworkNode(netID, "node-"+strconv.Itoa(i))
		assert.Len(t, db.networkNodes[netID], 0)
	}
}

// TestRandomNodes verifies that mRandomNodes never selects the local node
// and that, over many draws, every other candidate is eventually selected.
func TestRandomNodes(t *testing.T) {
	n := &NetworkDB{config: &Config{NodeID: "node-0"}}
	nodes := make(map[string]struct{})
	for i := 0; i < 1000; i++ {
		nodes["node-"+strconv.Itoa(i)] = struct{}{}
	}

	// 5000 draws of 3 nodes each (15000 picks over 999 eligible nodes)
	// makes the probability of any node never being picked negligible.
	nodeHit := make(map[string]int)
	for i := 0; i < 5000; i++ {
		for _, chosen := range n.mRandomNodes(3, nodes) {
			if chosen == "node-0" {
				t.Fatal("should never hit itself")
			}
			nodeHit[chosen]++
		}
	}

	// Every eligible node (all candidates except the local node) must have
	// been selected at least once. Iterating over the candidate set rather
	// than over nodeHit is essential: a node that was never chosen has no
	// entry in nodeHit at all, so checking `hit == 0` while ranging over
	// nodeHit could never detect a missed node.
	for node := range nodes {
		if node == n.config.NodeID {
			continue
		}
		if nodeHit[node] == 0 {
			t.Fatalf("node %s was never selected", node)
		}
	}
}

// BenchmarkAddNetworkNode measures insertion into the per-network node set,
// cycling through 1000 distinct node names (so most iterations hit an
// already-present entry).
func BenchmarkAddNetworkNode(b *testing.B) {
	db := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})}
	for i := 0; i < b.N; i++ {
		db.addNetworkNode("network", "node-"+strconv.Itoa(i%1000))
	}
}

// BenchmarkDeleteNetworkNode measures deletion from the per-network node
// set. Names are precomputed outside the timed region so only
// deleteNetworkNode itself is measured.
func BenchmarkDeleteNetworkNode(b *testing.B) {
	db := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})}
	names := make([]string, 0, 1000)
	for i := 0; i < 1000; i++ {
		name := "node-" + strconv.Itoa(i)
		db.addNetworkNode("network", name)
		names = append(names, name)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		db.deleteNetworkNode("network", names[i%len(names)])
	}
}

// BenchmarkRandomNodes measures selecting 3 random peers out of a 1000-node
// candidate set; the set is built outside the timed region.
func BenchmarkRandomNodes(b *testing.B) {
	db := &NetworkDB{config: &Config{NodeID: "node-0"}}
	candidates := make(map[string]struct{}, 1000)
	for i := 0; i < 1000; i++ {
		candidates["node-"+strconv.Itoa(i)] = struct{}{}
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		db.mRandomNodes(3, candidates)
	}
}
70 changes: 15 additions & 55 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
@@ -2,11 +2,9 @@ package networkdb

import (
"bytes"
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"math/big"
rnd "math/rand"
"net"
"strings"
@@ -283,7 +281,7 @@ func (nDB *NetworkDB) reconnectNode() {
}
nDB.RUnlock()

node := nodes[randomOffset(len(nodes))]
node := nodes[rnd.Int31()%int32(len(nodes))]
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}

if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
@@ -295,7 +293,7 @@ func (nDB *NetworkDB) reconnectNode() {
}

logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
nDB.bulkSync([]string{node.Name})
}

// For timing the entry deletion in the repaer APIs that doesn't use monotonic clock
@@ -378,7 +376,7 @@ func (nDB *NetworkDB) gossip() {
nDB.RLock()
thisNodeNetworks := nDB.networks[nDB.config.NodeID]
for nid := range thisNodeNetworks {
networkNodes[nid] = nDB.networkNodes[nid]
networkNodes[nid] = nDB.mRandomNodes(3, nDB.networkNodes[nid])
}
printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod
printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod
@@ -393,7 +391,6 @@ func (nDB *NetworkDB) gossip() {
}

for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead

nDB.RLock()
@@ -432,7 +429,7 @@ func (nDB *NetworkDB) gossip() {
// Create a compound message
compound := makeCompoundMessage(msgs)

for _, node := range mNodes {
for _, node := range nodes {
nDB.RLock()
mnode := nDB.nodes[node]
nDB.RUnlock()
@@ -473,15 +470,15 @@ func (nDB *NetworkDB) bulkSyncTables() {
networks = networks[1:]

nDB.RLock()
nodes := nDB.networkNodes[nid]
nodes := nDB.mRandomNodes(2, nDB.networkNodes[nid])
nDB.RUnlock()

// No peer nodes on this network. Move on.
if len(nodes) == 0 {
continue
}

completed, err := nDB.bulkSync(nodes, false)
completed, err := nDB.bulkSync(nodes)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
@@ -508,13 +505,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
}
}

func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
if !all {
// Get 2 random nodes. 2nd node will be tried if the bulk sync to
// 1st node fails.
nodes = nDB.mRandomNodes(2, nodes)
}

func (nDB *NetworkDB) bulkSync(nodes []string) ([]string, error) {
if len(nodes) == 0 {
return nil, nil
}
@@ -527,12 +518,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
}
logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node)
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
// if its periodic bulksync stop after the first successful sync
if !all && err == nil {
break
}
if err != nil {
if err = nDB.bulkSyncNode(networks, node, true); err != nil {
err = fmt.Errorf("bulk sync to node %s failed: %v", node, err)
logrus.Warn(err.Error())
}
@@ -649,48 +635,22 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
return nil
}

// Returns a random offset between 0 and n
func randomOffset(n int) int {
if n == 0 {
return 0
}

val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
if err != nil {
logrus.Errorf("Failed to get a random offset: %v", err)
return 0
}

return int(val.Int64())
}

// mRandomNodes is used to select up to m random nodes. It is possible
// that less than m nodes are returned.
func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
n := len(nodes)
func (nDB *NetworkDB) mRandomNodes(m int, nodes map[string]struct{}) []string {
mNodes := make([]string, 0, m)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(mNodes) < m; i++ {
// Get random node
idx := randomOffset(n)
node := nodes[idx]

var i int
for node := range nodes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Go does not guarantee random map iteration order as part of the spec. Would there be any side effects if a particular implementation of Go returned a predictable order? Also, is the cost of calling randomOffset really that significant?

Copy link
Author

@fcrisciani fcrisciani Jan 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to check on that, I read that since go 1.0 the keys were randomized, but have to check is that is a common assumption for all the architectures, or at least the one that we support.
Yes, randomOffset is the bottleneck; that can be seen from the detailed flame graph: rand.Int accounts for 90% of it and big.NewInt for the remaining 10%.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. This thread has some details: https://groups.google.com/forum/#!topic/golang-nuts/zBcqMsDNt7Q . I guess it is safe to assume the randomness if there is a major perf gain and just have a comment about the assumption.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not cryptographically random and also the test highlighted some unbalance in the results, but the test also makes sure that there is no node that never comes up in the selection (min == 0 condition)
Another approach I was thinking about is to keep the original string slice plus an index saved with the network, and loop over the slice in a circular-buffer fashion. Considering that nodes do not change every second, that should guarantee fairness in the selection, and randomness is guaranteed at insertion time (based on when each node joins). The problem with the slice remains the linear scan for each insertion and deletion, which is a lame cost to pay.

if node == nDB.config.NodeID {
continue
}

// Check if we have this node already
for j := 0; j < len(mNodes); j++ {
if node == mNodes[j] {
continue OUTER
}
}

// Append the node
mNodes = append(mNodes, node)
i++
if i == m {
break
}
}

return mNodes
12 changes: 4 additions & 8 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
@@ -154,18 +154,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Ignore the table events for networks that are in the process of going away
nDB.RLock()
networks := nDB.networks[nDB.config.NodeID]
network, ok := networks[tEvent.NetworkID]
network, okNetwork := networks[tEvent.NetworkID]
// Check if the owner of the event is still part of the network
nodes := nDB.networkNodes[tEvent.NetworkID]
var nodePresent bool
for _, node := range nodes {
if node == tEvent.NodeName {
nodePresent = true
break
}
if nodes, ok := nDB.networkNodes[tEvent.NetworkID]; ok {
_, nodePresent = nodes[tEvent.NodeName]
}
nDB.RUnlock()
if !ok || network.leaving || !nodePresent {
if !okNetwork || network.leaving || !nodePresent {
// I'm out of the network OR the event owner is not anymore part of the network so do not propagate
return false
}
Loading