-
Notifications
You must be signed in to change notification settings - Fork 0
/
a_gossipcache.go
116 lines (97 loc) · 2.85 KB
/
a_gossipcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package gossipcache
import (
"errors"
"fmt"
"github.com/charmbracelet/log"
"github.com/hashicorp/memberlist"
"net/http"
"os"
)
// Compile-time check that *GossipCache implements memberlist.EventDelegate.
var _ memberlist.EventDelegate = (*GossipCache)(nil)
// GossipCache ties a memberlist gossip membership to a groupcache HTTP pool.
// It receives memberlist join/leave events and pushes the resulting peer URL
// list into the pool.
type GossipCache struct {
	// GroupCachePool is the HTTP pool that serves requests and receives the
	// peer list on membership changes.
	GroupCachePool *HTTPPool
	// Memberlist is the gossip membership created by NewGossipHTTPPool.
	Memberlist *memberlist.Memberlist

	// host is the hostname used when building peer URLs.
	host string
	// httpPort is the HTTP port this node's pool URL is built from.
	httpPort int
	// hostGossipAddress is this node's own gossip address, which
	// JoinGossipCluster adds to the join list.
	// NOTE(review): nothing in the visible code assigns it — confirm.
	hostGossipAddress string
	// peers is the list of peer URLs maintained by NotifyJoin/NotifyLeave.
	peers []string
}
// NewGossipHTTPPool builds a GossipCache bound to 127.0.0.1. It creates a
// memberlist listening on gossipPort and then a groupcache HTTP pool whose
// own URL is built from httpPort.
//
// The memberlist node name is the decimal httpPort; the join/leave callbacks
// turn a node's name back into its pool URL via httpGroupCacheURL.
//
// It returns a wrapped error when the memberlist cannot be created.
func NewGossipHTTPPool(gossipPort int, httpPort int) (*GossipCache, error) {
	const loopback = "127.0.0.1"

	gc := &GossipCache{
		host:     loopback,
		httpPort: httpPort,
	}
	// Used both as the memberlist node name and as the port of our own URL.
	portName := fmt.Sprintf("%d", httpPort)

	// 1. create memberlist
	cfg := memberlist.DefaultLocalConfig()
	cfg.Events = gc
	cfg.BindAddr = loopback
	cfg.BindPort = gossipPort
	/* TODO: this is a hack
	   Later use,
	   - AdvertiseAddr string
	   - AdvertisePort int
	*/
	cfg.Name = portName
	cfg.LogOutput = NewMemberlistLogger()

	ml, err := memberlist.Create(cfg)
	if err != nil {
		return nil, fmt.Errorf("gossipcache: can't create memberlist: %w", err)
	}
	gc.Memberlist = ml

	// 2. create groupcache pool
	selfURL := gc.httpGroupCacheURL(portName)
	gc.GroupCachePool = newHTTPPool(selfURL)
	return gc, nil
}
// NotifyJoin is the memberlist callback for a node joining. It derives the
// node's pool URL from its name, records it in the peer list, logs the list,
// and hands the list to the groupcache pool when one is set.
func (ac *GossipCache) NotifyJoin(node *memberlist.Node) {
	peerURL := ac.httpGroupCacheURL(node.Name)

	// Remove any earlier entry first so the URL is listed once, at the end.
	ac.removePeer(peerURL)
	ac.peers = append(ac.peers, peerURL)
	log.Printf("GossipCache/NotifyJoin: %+v", ac.peers)

	// Without a pool the peer is still recorded above; there is just nothing
	// to update yet.
	pool := ac.GroupCachePool
	if pool == nil {
		return
	}
	pool.set(ac.peers...)
}
// NotifyLeave is the memberlist callback for a node leaving. It removes the
// node's pool URL (derived from its name) from the peer list and hands the
// remaining peers to the groupcache pool.
//
// Like NotifyJoin, it tolerates a missing pool: the peer list is still
// updated, and the pool update is skipped instead of calling through a nil
// pointer.
func (ac *GossipCache) NotifyLeave(node *memberlist.Node) {
	ac.removePeer(ac.httpGroupCacheURL(node.Name))

	if ac.GroupCachePool == nil {
		return
	}
	ac.GroupCachePool.set(ac.peers...)
}
// NotifyUpdate is the memberlist callback for a node update. It is
// intentionally a no-op and exists so that *GossipCache satisfies
// memberlist.EventDelegate.
func (ac *GossipCache) NotifyUpdate(node *memberlist.Node) {
}
// ServeHTTP makes GossipCache an http.Handler by delegating every request to
// the groupcache pool.
//
// If no pool is set, the request is answered with 503 Service Unavailable
// rather than panicking on a nil pool.
func (ac *GossipCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if ac.GroupCachePool == nil {
		http.Error(w, "gossipcache: groupcache pool is not initialized", http.StatusServiceUnavailable)
		return
	}
	ac.GroupCachePool.ServeHTTP(w, r)
}
// JoinGossipCluster joins the gossip cluster by contacting the addresses in
// existing, plus this node's own gossip address when one is set.
//
// It returns whatever Memberlist.Join returns for that address list, or an
// error if the memberlist has not been created.
func (ac *GossipCache) JoinGossipCluster(existing []string) (int, error) {
	if ac.Memberlist == nil {
		return 0, errors.New("memberlist cannot be nil")
	}

	// Build a fresh slice: appending to existing directly could write into
	// the caller's backing array when it has spare capacity.
	targets := make([]string, 0, len(existing)+1)
	targets = append(targets, existing...)

	// An empty self address is not a usable join target, so skip it rather
	// than handing "" to memberlist.
	if ac.hostGossipAddress != "" {
		targets = append(targets, ac.hostGossipAddress)
	}
	return ac.Memberlist.Join(targets)
}
//--------------------------------utils-------------------------------------
// httpGroupCacheURL returns the plain-HTTP URL for the given port on this
// cache's host, in the form "http://<host>:<port>".
func (ac *GossipCache) httpGroupCacheURL(port string) string {
	const urlFormat = "http://%s:%s"

	hostname := ac.host
	return fmt.Sprintf(urlFormat, hostname, port)
}
// removePeer drops every occurrence of uri from the peer list. The survivors
// are copied into a newly built slice, so the previous slice is left
// unmodified; when nothing survives the peer list becomes nil.
func (ac *GossipCache) removePeer(uri string) {
	var kept []string
	for i := 0; i < len(ac.peers); i++ {
		candidate := ac.peers[i]
		if candidate == uri {
			continue
		}
		kept = append(kept, candidate)
	}
	ac.peers = kept
}
// --------------------------------memberlist logger-------------------------
// MemberlistLogger wraps a Logger and exposes a Write method, so it can be
// used as the memberlist LogOutput (see NewGossipHTTPPool).
type MemberlistLogger struct {
	// Logger receives every chunk passed to Write.
	Logger *log.Logger
}
// NewMemberlistLogger returns a MemberlistLogger whose Logger writes to
// standard error with the prefix "memberlist".
func NewMemberlistLogger() MemberlistLogger {
	opts := log.Options{Prefix: "memberlist"}
	backend := log.NewWithOptions(os.Stderr, opts)
	return MemberlistLogger{Logger: backend}
}
// Write forwards p, converted to a string, to the wrapped Logger at Debug
// level. It always reports the whole of p as written and never returns an
// error.
func (l MemberlistLogger) Write(p []byte) (n int, err error) {
	message := string(p)
	l.Logger.Debug(message) // change it to `Info` to see the memberlist logs
	n = len(p)
	return n, nil
}