diff --git a/networkdb/benchmark_test.go b/networkdb/benchmark_test.go new file mode 100644 index 0000000000..8f20b7cd09 --- /dev/null +++ b/networkdb/benchmark_test.go @@ -0,0 +1,115 @@ +package networkdb + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddNetworkNode(t *testing.T) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})} + for i := 0; i < 2000; i++ { + n.addNetworkNode("network", "node-"+strconv.Itoa(i%1000)) + } + assert.Equal(t, 1000, len(n.networkNodes["network"])) + for i := 0; i < 2000; i++ { + n.addNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i)) + } + for i := 0; i < 1000; i++ { + assert.Equal(t, 2, len(n.networkNodes["network"+strconv.Itoa(i%1000)])) + } +} + +func TestDeleteNetworkNode(t *testing.T) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})} + for i := 0; i < 1000; i++ { + n.addNetworkNode("network", "node-"+strconv.Itoa(i%1000)) + } + assert.Equal(t, 1000, len(n.networkNodes["network"])) + for i := 0; i < 2000; i++ { + n.deleteNetworkNode("network", "node-"+strconv.Itoa(i%1000)) + } + assert.Equal(t, 0, len(n.networkNodes["network"])) + for i := 0; i < 2000; i++ { + n.addNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i)) + } + for i := 0; i < 1000; i++ { + assert.Equal(t, 2, len(n.networkNodes["network"+strconv.Itoa(i%1000)])) + n.deleteNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i)) + assert.Equal(t, 1, len(n.networkNodes["network"+strconv.Itoa(i%1000)])) + } + for i := 1000; i < 2000; i++ { + n.deleteNetworkNode("network"+strconv.Itoa(i%1000), "node-"+strconv.Itoa(i)) + assert.Equal(t, 0, len(n.networkNodes["network"+strconv.Itoa(i%1000)])) + } +} + +func TestRandomNodes(t *testing.T) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}} + nodes := make(map[string]struct{}) + for i := 0; i < 1000; i++ { + 
nodes["node-"+strconv.Itoa(i)] = struct{}{} + } + nodeHit := make(map[string]int) + for i := 0; i < 5000; i++ { + chosen := n.mRandomNodes(3, nodes) + for _, c := range chosen { + if c == "node-0" { + t.Fatal("should never hit itself") + } + nodeHit[c]++ + } + } + + // check results + var min, max int + for node, hit := range nodeHit { + if min == 0 { + min = hit + } + if hit == 0 && node != "node-0" { + t.Fatal("node never hit") + } + if hit > max { + max = hit + } + if hit < min { + min = hit + } + } + assert.NotEqual(t, 0, min) +} + +func BenchmarkAddNetworkNode(b *testing.B) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})} + for i := 0; i < b.N; i++ { + n.addNetworkNode("network", "node-"+strconv.Itoa(i%1000)) + } +} + +func BenchmarkDeleteNetworkNode(b *testing.B) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}, networkNodes: make(map[string]map[string]struct{})} + nodes := make([]string, 0, 1000) + for i := 0; i < 1000; i++ { + name := "node-" + strconv.Itoa(i) + n.addNetworkNode("network", name) + nodes = append(nodes, name) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + n.deleteNetworkNode("network", nodes[i%1000]) + } +} + +func BenchmarkRandomNodes(b *testing.B) { + n := &NetworkDB{config: &Config{NodeID: "node-0"}} + nodes := make(map[string]struct{}) + for i := 0; i < 1000; i++ { + nodes["node-"+strconv.Itoa(i)] = struct{}{} + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + n.mRandomNodes(3, nodes) + } +} diff --git a/networkdb/cluster.go b/networkdb/cluster.go index 198caceeb8..8f1c0ff135 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -2,11 +2,9 @@ package networkdb import ( "bytes" - "crypto/rand" "encoding/hex" "fmt" "log" - "math/big" rnd "math/rand" "net" "strings" @@ -283,7 +281,7 @@ func (nDB *NetworkDB) reconnectNode() { } nDB.RUnlock() - node := nodes[randomOffset(len(nodes))] + node := nodes[rnd.Int31()%int32(len(nodes))] addr := net.UDPAddr{IP: node.Addr, 
Port: int(node.Port)} if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil { @@ -295,7 +293,7 @@ func (nDB *NetworkDB) reconnectNode() { } logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name) - nDB.bulkSync([]string{node.Name}, true) + nDB.bulkSync([]string{node.Name}) } // For timing the entry deletion in the repaer APIs that doesn't use monotonic clock @@ -378,7 +376,7 @@ func (nDB *NetworkDB) gossip() { nDB.RLock() thisNodeNetworks := nDB.networks[nDB.config.NodeID] for nid := range thisNodeNetworks { - networkNodes[nid] = nDB.networkNodes[nid] + networkNodes[nid] = nDB.mRandomNodes(3, nDB.networkNodes[nid]) } printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod @@ -393,7 +391,6 @@ func (nDB *NetworkDB) gossip() { } for nid, nodes := range networkNodes { - mNodes := nDB.mRandomNodes(3, nodes) bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead nDB.RLock() @@ -432,7 +429,7 @@ func (nDB *NetworkDB) gossip() { // Create a compound message compound := makeCompoundMessage(msgs) - for _, node := range mNodes { + for _, node := range nodes { nDB.RLock() mnode := nDB.nodes[node] nDB.RUnlock() @@ -473,7 +470,7 @@ func (nDB *NetworkDB) bulkSyncTables() { networks = networks[1:] nDB.RLock() - nodes := nDB.networkNodes[nid] + nodes := nDB.mRandomNodes(2, nDB.networkNodes[nid]) nDB.RUnlock() // No peer nodes on this network. Move on. @@ -481,7 +478,7 @@ func (nDB *NetworkDB) bulkSyncTables() { continue } - completed, err := nDB.bulkSync(nodes, false) + completed, err := nDB.bulkSync(nodes) if err != nil { logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err) continue @@ -508,13 +505,7 @@ func (nDB *NetworkDB) bulkSyncTables() { } } -func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { - if !all { - // Get 2 random nodes. 
2nd node will be tried if the bulk sync to - // 1st node fails. - nodes = nDB.mRandomNodes(2, nodes) - } - +func (nDB *NetworkDB) bulkSync(nodes []string) ([]string, error) { if len(nodes) == 0 { return nil, nil } @@ -527,12 +518,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { } logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node) networks = nDB.findCommonNetworks(node) - err = nDB.bulkSyncNode(networks, node, true) - // if its periodic bulksync stop after the first successful sync - if !all && err == nil { - break - } - if err != nil { + if err = nDB.bulkSyncNode(networks, node, true); err != nil { err = fmt.Errorf("bulk sync to node %s failed: %v", node, err) logrus.Warn(err.Error()) } @@ -649,48 +635,22 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b return nil } -// Returns a random offset between 0 and n -func randomOffset(n int) int { - if n == 0 { - return 0 - } - - val, err := rand.Int(rand.Reader, big.NewInt(int64(n))) - if err != nil { - logrus.Errorf("Failed to get a random offset: %v", err) - return 0 - } - - return int(val.Int64()) -} - // mRandomNodes is used to select up to m random nodes. It is possible // that less than m nodes are returned. 
-func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string { - n := len(nodes) +func (nDB *NetworkDB) mRandomNodes(m int, nodes map[string]struct{}) []string { mNodes := make([]string, 0, m) -OUTER: - // Probe up to 3*n times, with large n this is not necessary - // since k << n, but with small n we want search to be - // exhaustive - for i := 0; i < 3*n && len(mNodes) < m; i++ { - // Get random node - idx := randomOffset(n) - node := nodes[idx] + var i int + for node := range nodes { if node == nDB.config.NodeID { continue } - // Check if we have this node already - for j := 0; j < len(mNodes); j++ { - if node == mNodes[j] { - continue OUTER - } - } - - // Append the node mNodes = append(mNodes, node) + i++ + if i == m { + break + } } return mNodes diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 072c6221e5..6dad71a252 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -154,18 +154,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { // Ignore the table events for networks that are in the process of going away nDB.RLock() networks := nDB.networks[nDB.config.NodeID] - network, ok := networks[tEvent.NetworkID] + network, okNetwork := networks[tEvent.NetworkID] // Check if the owner of the event is still part of the network - nodes := nDB.networkNodes[tEvent.NetworkID] var nodePresent bool - for _, node := range nodes { - if node == tEvent.NodeName { - nodePresent = true - break - } + if nodes, ok := nDB.networkNodes[tEvent.NetworkID]; ok { + _, nodePresent = nodes[tEvent.NodeName] } nDB.RUnlock() - if !ok || network.leaving || !nodePresent { + if !okNetwork || network.leaving || !nodePresent { // I'm out of the network OR the event owner is not anymore part of the network so do not propagate return false } diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index 9ec6beca35..409d5f1879 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -64,8 +64,8 @@ type NetworkDB struct { networks 
map[string]map[string]*network // A map of nodes which are participating in a given - // network. The key is a network ID. - networkNodes map[string][]string + // network. The first key is a network ID. The second key is a node name; + networkNodes map[string]map[string]struct{} // A table of ack channels for every node from which we are // waiting for an ack. @@ -246,7 +246,7 @@ func New(c *Config) (*NetworkDB, error) { nodes: make(map[string]*node), failedNodes: make(map[string]*node), leftNodes: make(map[string]*node), - networkNodes: make(map[string][]string), + networkNodes: make(map[string]map[string]struct{}), bulkSyncAckTbl: make(map[string]chan struct{}), broadcaster: events.NewBroadcaster(), } @@ -304,7 +304,7 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo { nDB.RLock() defer nDB.RUnlock() peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid])) - for _, nodeName := range nDB.networkNodes[nid] { + for nodeName := range nDB.networkNodes[nid] { if node, ok := nDB.nodes[nodeName]; ok { peers = append(peers, PeerInfo{ Name: node.Name, @@ -452,17 +452,12 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { } func (nDB *NetworkDB) deleteNodeFromNetworks(deletedNode string) { - for nid, nodes := range nDB.networkNodes { - updatedNodes := make([]string, 0, len(nodes)) - for _, node := range nodes { - if node == deletedNode { - continue - } - - updatedNodes = append(updatedNodes, node) - } - - nDB.networkNodes[nid] = updatedNodes + networks, ok := nDB.networks[deletedNode] + if !ok { + return + } + for nid := range networks { + nDB.deleteNetworkNode(nid, deletedNode) } delete(nDB.networks, deletedNode) @@ -615,7 +610,10 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { RetransmitMult: 4, } nDB.addNetworkNode(nid, nDB.config.NodeID) - networkNodes := nDB.networkNodes[nid] + networkNodes := make([]string, 0, len(nDB.networkNodes[nid])) + for k := range nDB.networkNodes[nid] { + networkNodes = append(networkNodes, k) + } nDB.Unlock() if err :=
nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil { @@ -623,7 +621,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { } logrus.Debugf("%v(%v): joined network %s", nDB.config.Hostname, nDB.config.NodeID, nid) - if _, err := nDB.bulkSync(networkNodes, true); err != nil { + if _, err := nDB.bulkSync(networkNodes); err != nil { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } @@ -672,32 +670,25 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { // in the passed network only if it is not already present. Caller // should hold the NetworkDB lock while calling this func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) { - nodes := nDB.networkNodes[nid] - for _, node := range nodes { - if node == nodeName { - return - } + nodesMap, ok := nDB.networkNodes[nid] + if !ok { + nodesMap = make(map[string]struct{}) + nDB.networkNodes[nid] = nodesMap + } + if _, ok := nodesMap[nodeName]; !ok { + nodesMap[nodeName] = struct{}{} } - - nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName) } // Deletes the node from the list of nodes which participate in the // passed network. 
Caller should hold the NetworkDB lock while calling // this func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) { - nodes, ok := nDB.networkNodes[nid] - if !ok || len(nodes) == 0 { + nodesNetwork, ok := nDB.networkNodes[nid] + if !ok { return } - newNodes := make([]string, 0, len(nodes)-1) - for _, name := range nodes { - if name == nodeName { - continue - } - newNodes = append(newNodes, name) - } - nDB.networkNodes[nid] = newNodes + delete(nodesNetwork, nodeName) } // findCommonnetworks find the networks that both this node and the @@ -706,7 +697,8 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { nDB.RLock() defer nDB.RUnlock() - var networks []string + // create a slice with max capacity, to avoid multiple reallocation + networks := make([]string, 0, len(nDB.networks[nDB.config.NodeID])) for nid := range nDB.networks[nDB.config.NodeID] { if n, ok := nDB.networks[nodeName][nid]; ok { if !n.leaving {