diff --git a/disc/disc.go b/disc/disc.go deleted file mode 100644 index 4c31cea1..00000000 --- a/disc/disc.go +++ /dev/null @@ -1,230 +0,0 @@ -//: ---------------------------------------------------------------------------- -//: Copyright (C) 2017 Verizon. All Rights Reserved. -//: All Rights Reserved -//: -//: file: disc.go -//: details: discovery vFlow nodes by multicasting -//: author: Mehrdad Arshad Rad -//: date: 04/17/2017 -//: -//: Licensed under the Apache License, Version 2.0 (the "License"); -//: you may not use this file except in compliance with the License. -//: You may obtain a copy of the License at -//: -//: http://www.apache.org/licenses/LICENSE-2.0 -//: -//: Unless required by applicable law or agreed to in writing, software -//: distributed under the License is distributed on an "AS IS" BASIS, -//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//: See the License for the specific language governing permissions and -//: limitations under the License. -//: ---------------------------------------------------------------------------- - -// Package discovery handles finding vFlow nodes through multicasting -package discovery - -import ( - "errors" - "log" - "net" - "strconv" - "sync" - "time" - - "golang.org/x/net/ipv4" - "golang.org/x/net/ipv6" -) - -type vFlowServer struct { - timestamp int64 -} - -// Discovery represents vflow discovery -type Discovery struct { - vFlowServers map[string]vFlowServer - mu sync.RWMutex -} - -var errMCInterfaceNotAvail = errors.New("multicast interface not available") - -// Run starts sending multicast hello packet -func Run(ip, port string) error { - tick := time.NewTicker(1 * time.Second) - - p, err := strconv.Atoi(port) - if err != nil { - return err - } - - c, err := net.DialUDP("udp", nil, &net.UDPAddr{ - IP: net.ParseIP(ip), - Port: p, - }) - - b := []byte("hello vflow") - - if err != nil { - return err - } - - for { - <-tick.C - c.Write(b) - } -} - -// Listen receives discovery hello packet -func Listen(ip, port string) (*Discovery, error) { - var ( - conn interface{} - buff = make([]byte, 1500) - disc = &Discovery{ - vFlowServers: make(map[string]vFlowServer, 10), - } - ) - - c, err := net.ListenPacket("udp", net.JoinHostPort( - ip, - port, - )) - - if err != nil { - return nil, err - } - - ifs, err := getMulticastIfs() - if err != nil { - return nil, err - } - - if net.ParseIP(ip).To4() != nil { - conn = ipv4.NewPacketConn(c) - for _, i := range ifs { - err = conn.(*ipv4.PacketConn).JoinGroup( - &i, - &net.UDPAddr{IP: net.ParseIP(ip)}, - ) - if err != nil { - return nil, err - } - } - } else { - conn = ipv6.NewPacketConn(c) - for _, i := range ifs { - err = conn.(*ipv4.PacketConn).JoinGroup( - &i, - &net.UDPAddr{IP: net.ParseIP(ip)}, - ) - if err != nil { - return nil, err - } - } - } - - laddrs, err := getLocalIPs() - if err != nil { - log.Fatal(err) - } - - go func() { - - var ( - addr net.Addr - err error - ) - - for { - - if net.ParseIP(ip).To4() != nil { - _, _, addr, err = conn.(*ipv4.PacketConn).ReadFrom(buff) - } else { - _, _, addr, err = conn.(*ipv6.PacketConn).ReadFrom(buff) - } - - if err != nil { - continue - } - - host, _, err := net.SplitHostPort(addr.String()) - if err != nil { - continue - } - - if _, ok := laddrs[host]; ok { - continue - } - - disc.mu.Lock() - disc.vFlowServers[host] = vFlowServer{time.Now().Unix()} - disc.mu.Unlock() - - } - - }() - - return disc, nil -} - -// Nodes returns a slice of available vFlow nodes -func (d *Discovery) Nodes() []string { - var servers []string - - now := time.Now().Unix() - - d.mu.Lock() - - for ip, server := range d.vFlowServers { - if now-server.timestamp < 300 { - servers = append(servers, ip) - } else { - delete(d.vFlowServers, ip) - } - } - - d.mu.Unlock() - - return servers -} - -func getMulticastIfs() ([]net.Interface, error) { - var out []net.Interface - - ifs, err := net.Interfaces() - if err != nil { - return nil, err - } - - for _, i := range ifs { - if i.Flags == 19 { - out = append(out, i) - } - } - - if len(out) < 1 { - return nil, errMCInterfaceNotAvail - } - - return out, nil -} - -func getLocalIPs() (map[string]struct{}, error) { - ips := make(map[string]struct{}) - - ifs, err := net.Interfaces() - if err != nil { - return nil, err - } - - for _, i := range ifs { - addrs, err := i.Addrs() - if err != nil || i.Flags != 19 { - continue - } - for _, addr := range addrs { - ip, _, _ := net.ParseCIDR(addr.String()) - ips[ip.String()] = struct{}{} - } - } - - return ips, nil -} diff --git a/disc/discovery.go b/disc/discovery.go new file mode 100644 index 00000000..d0bd7740 --- /dev/null +++ b/disc/discovery.go @@ -0,0 +1,132 @@ +package disc + +import ( + "errors" + "io/ioutil" + "log" + "net" + "time" + + "gopkg.in/yaml.v2" +) + +var ( + errMCInterfaceNotAvail = errors.New("multicast interface not available") +) + +type vFlowServer struct { + timestamp int64 +} + +type DiscoveryConfig struct { + DiscoveryStrategy string + Params map[string]string `yaml:"params,omitempty"` + Logger *log.Logger +} + +func (c *DiscoveryConfig) LoadConfig(fileName string) { + b, err := ioutil.ReadFile(fileName) + if err != nil { + return + } + yaml.Unmarshal(b, &c.Params) +} + +func (c *DiscoveryConfig) GetConfigItem(key string, defaultValue string) string { + var v, found = c.Params[key] + if !found { + return defaultValue + } + return v +} + +// Discovery represents Discovery interface +type Discovery interface { + Setup(config *DiscoveryConfig) error + GetvFlowServers() map[string]vFlowServer + GetRPCServers() []string +} + +func NewDiscovery(config *DiscoveryConfig) Discovery { + d := &_Discovery{} + d.vflowServers = make(map[string]vFlowServer, 10) + return d +} + +type _Discovery struct { + vflowServers map[string]vFlowServer +} + +func (d *_Discovery) GetvFlowServers() map[string]vFlowServer { + return d.vflowServers +} + +func (d *_Discovery) GetRPCServers() []string { + return BuildRpcServersList(d.GetvFlowServers()) +} + +func (d *_Discovery) Setup(config *DiscoveryConfig) error { + d.vflowServers = make(map[string]vFlowServer, 10) + return nil +} + +// Utility method to manage vflow servers list +func BuildRpcServersList(vFlowServers map[string]vFlowServer) []string { + var servers []string + + now := time.Now().Unix() + + // Add locks + + for ip, server := range vFlowServers { + if now-server.timestamp < 300 { + servers = append(servers, ip) + } else { + delete(vFlowServers, ip) + } + } + + return servers +} + +func getLocalIPs() (map[string]struct{}, error) { + ips := make(map[string]struct{}) + + ifs, err := net.Interfaces() + if err != nil { + return nil, err + } + + for _, i := range ifs { + addrs, err := i.Addrs() + if err != nil || i.Flags != 19 { + continue + } + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + ips[ip.String()] = struct{}{} + } + } + + return ips, nil +} + +// Simple factory to initialize discovery based on configuration +func BuildDiscovery(config *DiscoveryConfig) (Discovery, error) { + + var discRegistered = map[string]Discovery{ + "vFlowDiscovery": new(MulticastDiscovery), + "k8sDiscovery.rest": new(K8SDiscovery), + //"k8sDiscovery.rest": DNSDiscovery + } + + disc, ok := discRegistered[config.DiscoveryStrategy] + if !ok { + return nil, errors.New("Discovery strategy not found") + } + setup_err := disc.Setup(config) + if setup_err != nil { + return nil, setup_err + } + return disc, nil +} diff --git a/disc/k8sdiscovery.go b/disc/k8sdiscovery.go new file mode 100644 index 00000000..1519e7f8 --- /dev/null +++ b/disc/k8sdiscovery.go @@ -0,0 +1,114 @@ +package disc + +import ( + "fmt" + "io/ioutil" + "log" + "net/url" + "strconv" + "time" + + "github.com/go-resty/resty/v2" + "github.com/tidwall/gjson" +) + +var ( + logger *log.Logger +) + +// K8SDiscovery represents k8s configuration for discovery of vflow nodes +type K8SDiscovery struct { + discovery Discovery + k8sApiServer string + k8sCertificatePath string + k8sNamespace string + k8sServiceName string + k8sToken string + pollInterval int + restClient *resty.Client + GetPodsEndpoint string +} + +func (d *K8SDiscovery) GetvFlowServers() map[string]vFlowServer { + return d.discovery.GetvFlowServers() +} + +func (d *K8SDiscovery) GetRPCServers() []string { + return BuildRpcServersList(d.GetvFlowServers()) +} + +// NewK8SDiscovery constructs and initializes K8S discovery. +func (k *K8SDiscovery) Setup(config *DiscoveryConfig) error { + + logger = config.Logger + + k.discovery = NewDiscovery(config) + k.k8sApiServer = "kubernetes.default.svc" + k.k8sServiceName = config.Params["k8s-service-name"] + k.k8sCertificatePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + + b, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") + if err != nil { + logger.Printf("k8s discovery failed: %s\n", err) + return err + } + k.k8sToken = string(b) + + b, err = ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + if err != nil { + logger.Printf("k8s discovery failed: %s\n", err) + return err + } + k.k8sNamespace = string(b) + + // Start with default pollinterval of 10s + k.pollInterval = 10 + if val, ok := config.Params["k8s-discovery-poll-interval"]; ok { + k.pollInterval, _ = strconv.Atoi(val) + } + + go k.runDiscovery() + + return nil +} + +func (d *K8SDiscovery) runDiscovery() { + logger.Printf("Starting k8s pod discovery using REST API (%d)\n", d.pollInterval) + tick := time.NewTicker(time.Duration(d.pollInterval) * time.Second) + + q := url.QueryEscape(fmt.Sprintf("app.kubernetes.io/instance=%s", d.k8sServiceName)) + + d.GetPodsEndpoint = fmt.Sprintf("https://%s/api/v1/namespaces/%s/pods?labelSelector=%s", + d.k8sApiServer, d.k8sNamespace, q) + + d.restClient = resty.New() + d.restClient.SetRootCertificate(d.k8sCertificatePath) + for { + <-tick.C + d.pollForServers() + } +} + +func (d *K8SDiscovery) pollForServers() { + resp, err := d.restClient.R(). + EnableTrace(). + SetAuthToken(d.k8sToken). + SetHeader("Accept", "application/json"). + Get(d.GetPodsEndpoint) + + laddrs, err := getLocalIPs() + + if err != nil { + logger.Printf("Error %s\n", err) + } else { + srch_result := gjson.Get(resp.String(), "items.#.status.podIP") + pod_ips := srch_result.Array() + for _, s := range pod_ips { + if _, ok := laddrs[s.Str]; ok { + continue + } + d.GetvFlowServers()[s.Str] = vFlowServer{time.Now().Unix()} + } + } + //logger.Printf("Discovered servers %+v", d.GetvFlowServers()) +} diff --git a/disc/multicastdiscovery.go b/disc/multicastdiscovery.go new file mode 100644 index 00000000..a3ab9a53 --- /dev/null +++ b/disc/multicastdiscovery.go @@ -0,0 +1,222 @@ +//: ---------------------------------------------------------------------------- +//: Copyright (C) 2017 Verizon. All Rights Reserved. +//: All Rights Reserved +//: +//: file: memcache_rpc.go +//: details: TODO +//: author: Mehrdad Arshad Rad +//: date: 02/01/2017 +//: +//: Licensed under the Apache License, Version 2.0 (the "License"); +//: you may not use this file except in compliance with the License. +//: You may obtain a copy of the License at +//: +//: http://www.apache.org/licenses/LICENSE-2.0 +//: +//: Unless required by applicable law or agreed to in writing, software +//: distributed under the License is distributed on an "AS IS" BASIS, +//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//: See the License for the specific language governing permissions and +//: limitations under the License. +//: ---------------------------------------------------------------------------- + +package disc + +import ( + "log" + "net" + "strconv" + "sync" + "time" + + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" +) + +// Discovery represents vflow discovery +type MulticastDiscovery struct { + discovery Discovery + conn interface{} + group net.IP + port int + rcvdMsg chan net.IP + mu sync.RWMutex +} + +func (d *MulticastDiscovery) GetvFlowServers() map[string]vFlowServer { + return d.discovery.GetvFlowServers() +} + +func (d *MulticastDiscovery) GetRPCServers() []string { + return BuildRpcServersList(d.GetvFlowServers()) +} + +func (disc *MulticastDiscovery) Setup(config *DiscoveryConfig) error { + + logger = config.Logger + disc.discovery = NewDiscovery(config) + disc.group = net.ParseIP("224.0.0.55") + disc.port = 1024 + + if err := disc.mConn(); err != nil { + return err + } + + if disc.group.To4() != nil { + go disc.startV4() + } else { + go disc.startV6() + } + + logger.Println("Multicast discovery started") + + return nil +} + +func (d *MulticastDiscovery) mConn() error { + addr := net.JoinHostPort("", strconv.Itoa(d.port)) + c, err := net.ListenPacket("udp", addr) + if err != nil { + return err + } + + ifs, err := getMulticastIfs() + if err != nil { + return err + } + + if d.group.To4() != nil { + d.conn = ipv4.NewPacketConn(c) + for _, i := range ifs { + err = d.conn.(*ipv4.PacketConn).JoinGroup( + &i, + &net.UDPAddr{IP: d.group}, + ) + if err != nil { + return err + } + } + } else { + d.conn = ipv6.NewPacketConn(c) + for _, i := range ifs { + err = d.conn.(*ipv6.PacketConn).JoinGroup( + &i, + &net.UDPAddr{IP: d.group}, + ) + if err != nil { + return err + } + } + } + + return nil +} + +func (d *MulticastDiscovery) receiverV4() { + var b = make([]byte, 1500) + + conn := d.conn.(*ipv4.PacketConn) + laddrs, err := getLocalIPs() + if err != nil { + log.Fatal(err) + } + + for { + _, _, addr, err := conn.ReadFrom(b) + if err != nil { + continue + } + + host, _, err := net.SplitHostPort(addr.String()) + if err != nil { + continue + } + + if _, ok := laddrs[host]; ok { + continue + } + + d.mu.Lock() + d.GetvFlowServers()[host] = vFlowServer{time.Now().Unix()} + d.mu.Unlock() + } +} + +func (d *MulticastDiscovery) startV4() { + tick := time.NewTicker(1 * time.Second) + + b := []byte("Hello vFlow") + conn := d.conn.(*ipv4.PacketConn) + conn.SetTTL(2) + go d.receiverV4() + + for { + <-tick.C + conn.WriteTo(b, nil, &net.UDPAddr{IP: d.group, Port: d.port}) + } +} + +func (d *MulticastDiscovery) receiverV6() { + var b = make([]byte, 1500) + + conn := d.conn.(*ipv6.PacketConn) + laddrs, err := getLocalIPs() + if err != nil { + log.Fatal(err) + } + + for { + _, _, addr, err := conn.ReadFrom(b) + if err != nil { + continue + } + + host, _, err := net.SplitHostPort(addr.String()) + if err != nil { + continue + } + + if _, ok := laddrs[host]; ok { + continue + } + + d.mu.Lock() + d.GetvFlowServers()[host] = vFlowServer{time.Now().Unix()} + d.mu.Unlock() + } +} + +func (d *MulticastDiscovery) startV6() { + tick := time.NewTicker(1 * time.Second) + + b := []byte("Hello vFlow") + conn := d.conn.(*ipv6.PacketConn) + conn.SetHopLimit(2) + go d.receiverV6() + + for { + <-tick.C + conn.WriteTo(b, nil, &net.UDPAddr{IP: d.group, Port: d.port}) + } +} + +func getMulticastIfs() ([]net.Interface, error) { + var out []net.Interface + + ifs, err := net.Interfaces() + if err != nil { + return nil, err + } + + for _, i := range ifs { + if i.Flags == 19 { + out = append(out, i) + } + } + + if len(out) < 1 { + return nil, errMCInterfaceNotAvail + } + + return out, nil +} diff --git a/go.mod b/go.mod index ebe32916..b758bcb8 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ClickHouse/clickhouse-go v1.4.3 github.com/Shopify/sarama v1.26.3 github.com/bsm/sarama-cluster v2.1.15+incompatible + github.com/go-resty/resty/v2 v2.4.0 github.com/nats-io/nats-server/v2 v2.1.8 // indirect github.com/nats-io/nats.go v1.10.0 github.com/nsqio/go-nsq v1.0.8 @@ -13,6 +14,7 @@ require ( github.com/onsi/gomega v1.10.3 // indirect github.com/prometheus/client_golang v1.6.0 github.com/segmentio/kafka-go v0.4.7 - golang.org/x/net v0.0.0-20201021035429-f5854403a974 + github.com/tidwall/gjson v1.6.8 + golang.org/x/net v0.0.0-20201224014010-6772e930b67b gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index ad3e1d94..31b78b8f 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-resty/resty/v2 v2.4.0 h1:s6TItTLejEI+2mn98oijC5w/Rk2YU+OA6x0mnZN6r6k= +github.com/go-resty/resty/v2 v2.4.0/go.mod h1:B88+xCTEwvfD94NOuE6GS1wMlnoKNY8eEiNizfNwOwA= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -146,6 +148,12 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/tidwall/gjson v1.6.8 h1:CTmXMClGYPAmln7652e69B7OLXfTi5ABcPPwjIWUv7w= +github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= +github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE= +github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU= +github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= @@ -168,6 +176,8 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -190,6 +200,9 @@ golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= diff --git a/ipfix/memcache_rpc.go b/ipfix/memcache_rpc.go index 26de0e4b..567ec9fe 100644 --- a/ipfix/memcache_rpc.go +++ b/ipfix/memcache_rpc.go @@ -27,12 +27,9 @@ import ( "log" "net" "net/rpc" - "strconv" - "sync" "time" - "golang.org/x/net/ipv4" - "golang.org/x/net/ipv6" + "github.com/VerizonDigital/vflow/disc" ) // IRPC represents IPFIX RPC @@ -48,10 +45,12 @@ type RPCClient struct { // RPCConfig represents RPC config type RPCConfig struct { - Enabled bool - Port int - Addr net.IP - Logger *log.Logger + Enabled bool + Port int + Addr net.IP + DiscoveryStrategy string + DiscoveryStrategyConfigFile string + Logger *log.Logger } // RPCRequest represents RPC request @@ -60,23 +59,8 @@ type RPCRequest struct { IP net.IP } -type vFlowServer struct { - timestamp int64 -} - -// Discovery represents vflow discovery -type Discovery struct { - conn interface{} - group net.IP - port int - rcvdMsg chan net.IP - vFlowServers map[string]vFlowServer - mu sync.RWMutex -} - var ( - errNotAvail = errors.New("the template is not available") - errMCInterfaceNotAvail = errors.New("multicast interface not available") + errNotAvail = errors.New("the template is not available") ) // NewRPC constructs RPC @@ -139,7 +123,13 @@ func RPC(m MemCache, config *RPCConfig) { return } - disc, err := vFlowDiscovery() + var disc_config disc.DiscoveryConfig + disc_config.DiscoveryStrategy = config.DiscoveryStrategy + disc_config.LoadConfig(config.DiscoveryStrategyConfigFile) + disc_config.Logger = config.Logger + + disc, err := disc.BuildDiscovery(&disc_config) + if err != nil { config.Logger.Println(err) config.Logger.Println("RPC has been disabled") @@ -154,7 +144,7 @@ func RPC(m MemCache, config *RPCConfig) { for { req := <-rpcChan - for _, rpcServer := range disc.rpcServers() { + for _, rpcServer := range disc.GetRPCServers() { r, err := NewRPCClient(rpcServer) if err != nil { config.Logger.Println(err) @@ -175,214 +165,3 @@ func RPC(m MemCache, config *RPCConfig) { <-throttle } } -func vFlowDiscovery() (*Discovery, error) { - // TODO - disc := &Discovery{ - group: net.ParseIP("224.0.0.55"), - port: 1024, - } - - if err := disc.mConn(); err != nil { - return nil, err - } - - if disc.group.To4() != nil { - go disc.startV4() - } else { - go disc.startV6() - } - - return disc, nil -} - -func (d *Discovery) mConn() error { - addr := net.JoinHostPort("", strconv.Itoa(d.port)) - c, err := net.ListenPacket("udp", addr) - if err != nil { - return err - } - - ifs, err := getMulticastIfs() - if err != nil { - return err - } - - if d.group.To4() != nil { - d.conn = ipv4.NewPacketConn(c) - for _, i := range ifs { - err = d.conn.(*ipv4.PacketConn).JoinGroup( - &i, - &net.UDPAddr{IP: d.group}, - ) - if err != nil { - return err - } - } - } else { - d.conn = ipv6.NewPacketConn(c) - for _, i := range ifs { - err = d.conn.(*ipv6.PacketConn).JoinGroup( - &i, - &net.UDPAddr{IP: d.group}, - ) - if err != nil { - return err - } - } - } - - return nil -} - -func (d *Discovery) receiverV4() { - var b = make([]byte, 1500) - - d.vFlowServers = make(map[string]vFlowServer, 10) - conn := d.conn.(*ipv4.PacketConn) - laddrs, err := getLocalIPs() - if err != nil { - log.Fatal(err) - } - - for { - _, _, addr, err := conn.ReadFrom(b) - if err != nil { - continue - } - - host, _, err := net.SplitHostPort(addr.String()) - if err != nil { - continue - } - - if _, ok := laddrs[host]; ok { - continue - } - - d.mu.Lock() - d.vFlowServers[host] = vFlowServer{time.Now().Unix()} - d.mu.Unlock() - } -} - -func (d *Discovery) startV4() { - tick := time.NewTicker(1 * time.Second) - - b := []byte("Hello vFlow") - conn := d.conn.(*ipv4.PacketConn) - conn.SetTTL(2) - go d.receiverV4() - - for { - <-tick.C - conn.WriteTo(b, nil, &net.UDPAddr{IP: d.group, Port: d.port}) - } -} - -func (d *Discovery) receiverV6() { - var b = make([]byte, 1500) - - d.vFlowServers = make(map[string]vFlowServer, 10) - conn := d.conn.(*ipv6.PacketConn) - laddrs, err := getLocalIPs() - if err != nil { - log.Fatal(err) - } - - for { - _, _, addr, err := conn.ReadFrom(b) - if err != nil { - continue - } - - host, _, err := net.SplitHostPort(addr.String()) - if err != nil { - continue - } - - if _, ok := laddrs[host]; ok { - continue - } - - d.mu.Lock() - d.vFlowServers[host] = vFlowServer{time.Now().Unix()} - d.mu.Unlock() - } -} - -func (d *Discovery) startV6() { - tick := time.NewTicker(1 * time.Second) - - b := []byte("Hello vFlow") - conn := d.conn.(*ipv6.PacketConn) - conn.SetHopLimit(2) - go d.receiverV6() - - for { - <-tick.C - conn.WriteTo(b, nil, &net.UDPAddr{IP: d.group, Port: d.port}) - } -} - -func (d *Discovery) rpcServers() []string { - var servers []string - - now := time.Now().Unix() - - d.mu.Lock() - - for ip, server := range d.vFlowServers { - if now-server.timestamp < 300 { - servers = append(servers, ip) - } else { - delete(d.vFlowServers, ip) - } - } - - d.mu.Unlock() - - return servers -} - -func getMulticastIfs() ([]net.Interface, error) { - var out []net.Interface - - ifs, err := net.Interfaces() - if err != nil { - return nil, err - } - - for _, i := range ifs { - if i.Flags == 19 { - out = append(out, i) - } - } - - if len(out) < 1 { - return nil, errMCInterfaceNotAvail - } - - return out, nil -} - -func getLocalIPs() (map[string]struct{}, error) { - ips := make(map[string]struct{}) - - ifs, err := net.Interfaces() - if err != nil { - return nil, err - } - - for _, i := range ifs { - addrs, err := i.Addrs() - if err != nil || i.Flags != 19 { - continue - } - for _, addr := range addrs { - ip, _, _ := net.ParseCIDR(addr.String()) - ips[ip.String()] = struct{}{} - } - } - - return ips, nil -} diff --git a/vflow/ipfix.go b/vflow/ipfix.go index 298ffb11..879321d9 100644 --- a/vflow/ipfix.go +++ b/vflow/ipfix.go @@ -115,6 +115,8 @@ func (i *IPFIX) run() { logger.Printf("ipfix is running (UDP: listening on [::]:%d workers#: %d)", i.port, i.workers) + var config_file_path = path.Join(opts.VFlowConfigPath, opts.DiscoveryStrategyConfigFile) + err = ipfix.LoadExtElements(opts.VFlowConfigPath) if err != nil { logger.Println("load.ext.elements:", err) @@ -122,8 +124,10 @@ func (i *IPFIX) run() { mCache = ipfix.GetCache(opts.IPFIXTplCacheFile) go ipfix.RPC(mCache, &ipfix.RPCConfig{ - Enabled: opts.IPFIXRPCEnabled, - Logger: logger, + Enabled: opts.IPFIXRPCEnabled, + DiscoveryStrategy: opts.DiscoveryStrategy, + DiscoveryStrategyConfigFile: config_file_path, + Logger: logger, }) go mirrorIPFIXDispatcher(ipfixMCh) diff --git a/vflow/options.go b/vflow/options.go index a04a1cf4..4f5c9bfb 100644 --- a/vflow/options.go +++ b/vflow/options.go @@ -105,6 +105,10 @@ type Options struct { MQName string `yaml:"mq-name"` MQConfigFile string `yaml:"mq-config-file"` + // Discovery options + DiscoveryStrategy string `yaml:"discovery-strategy"` + DiscoveryStrategyConfigFile string `yaml:"discovery-strategy-config-file"` + VFlowConfigPath string } @@ -183,6 +187,9 @@ func NewOptions() *Options { MQName: "kafka", MQConfigFile: "mq.conf", + DiscoveryStrategy: "vFlowDiscovery", + DiscoveryStrategyConfigFile: "discovery.conf", + VFlowConfigPath: "/etc/vflow", } } @@ -362,6 +369,10 @@ func (opts *Options) flagSet() { flag.StringVar(&opts.MQName, "mqueue", opts.MQName, "producer message queue name") flag.StringVar(&opts.MQConfigFile, "mqueue-conf", opts.MQConfigFile, "producer message queue configuration file") + // discovery strategy options + flag.StringVar(&opts.DiscoveryStrategy, "discovery-strategy", opts.DiscoveryStrategy, "discovery strategy name") + flag.StringVar(&opts.DiscoveryStrategyConfigFile, "discovery-strategy-conf", opts.DiscoveryStrategyConfigFile, "discovery strategy configuration file") + flag.Usage = func() { flag.PrintDefaults() fmt.Fprintf(os.Stderr, `