Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions src/mapper/pkg/clouduploader/cloud_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clouduploader
import (
"context"
"encoding/json"
"fmt"
"github.com/otterize/intents-operator/src/shared/errors"
"github.com/otterize/intents-operator/src/shared/serviceidresolver/serviceidentity"
"github.com/otterize/network-mapper/src/mapper/pkg/awsintentsholder"
Expand Down Expand Up @@ -126,20 +127,36 @@ func (c *CloudUploader) NotifyExternalTrafficIntents(ctx context.Context, intent
logrus.Debugf("Got external traffic notification, len %d", len(intents))

discoveredIntents := lo.Map(intents, func(intent externaltrafficholder.TimestampedExternalTrafficIntent, _ int) cloudclient.ExternalTrafficDiscoveredIntentInput {
output := cloudclient.ExternalTrafficDiscoveredIntentInput{
DiscoveredAt: intent.Timestamp,
Intent: cloudclient.ExternalTrafficIntentInput{
ClientName: intent.Intent.Client.Name,
Namespace: intent.Intent.Client.Namespace,
Target: cloudclient.DNSIPPairInput{
DnsName: lo.ToPtr(intent.Intent.DNSName),
switch typedIntent := intent.Intent.(type) {
case externaltrafficholder.DNSExternalTrafficIntent:
output := cloudclient.ExternalTrafficDiscoveredIntentInput{
DiscoveredAt: intent.Timestamp,
Intent: cloudclient.ExternalTrafficIntentInput{
ClientName: typedIntent.Client.Name,
Namespace: typedIntent.Client.Namespace,
Target: cloudclient.DNSIPPairInput{
DnsName: lo.ToPtr(typedIntent.DNSName),
},
},
},
}
for ip := range intent.Intent.IPs {
output.Intent.Target.Ips = append(output.Intent.Target.Ips, lo.ToPtr(string(ip)))
}
for ip := range typedIntent.IPs {
output.Intent.Target.Ips = append(output.Intent.Target.Ips, lo.ToPtr(string(ip)))
}
return output
case externaltrafficholder.IPExternalTrafficIntent:
output := cloudclient.ExternalTrafficDiscoveredIntentInput{
DiscoveredAt: intent.Timestamp,
Intent: cloudclient.ExternalTrafficIntentInput{
ClientName: typedIntent.Client.Name,
Namespace: typedIntent.Client.Namespace,
Target: cloudclient.DNSIPPairInput{
Ips: []*string{lo.ToPtr(string(typedIntent.IP))},
},
},
}
return output
}
return output
panic(fmt.Sprintf("Unexpected external traffic intent type: %T", intent))
})

exponentialBackoff := backoff.NewExponentialBackOff()
Expand Down
133 changes: 103 additions & 30 deletions src/mapper/pkg/externaltrafficholder/externaltrafficholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,94 @@ package externaltrafficholder

import (
"context"
"fmt"
"github.com/otterize/network-mapper/src/mapper/pkg/config"
"github.com/otterize/network-mapper/src/mapper/pkg/graph/model"
"github.com/sirupsen/logrus"
"sync"
"time"
)

type ExternalTrafficIntent interface {
GetClient() model.OtterizeServiceIdentity
GetKey() ExternalTrafficKey
GetLastSeen() time.Time
}

type IP string

type ExternalTrafficIntent struct {
type DNSExternalTrafficIntent struct {
Client model.OtterizeServiceIdentity `json:"client"`
LastSeen time.Time
DNSName string
IPs map[IP]struct{}
TTL time.Time
}

type IPExternalTrafficIntent struct {
Client model.OtterizeServiceIdentity `json:"client"`
LastSeen time.Time
IP IP
}

func (i IPExternalTrafficIntent) GetClient() model.OtterizeServiceIdentity {
return i.Client
}

func (i IPExternalTrafficIntent) GetKey() ExternalTrafficKey {
return ExternalTrafficKey{
ClientName: i.Client.Name,
ClientNamespace: i.Client.Namespace,
DestIP: i.IP,
}
}

func (i IPExternalTrafficIntent) GetLastSeen() time.Time {
return i.LastSeen
}

type TimestampedExternalTrafficIntent struct {
Timestamp time.Time
Intent ExternalTrafficIntent
}

func (i DNSExternalTrafficIntent) GetClient() model.OtterizeServiceIdentity {
return i.Client
}

func (i DNSExternalTrafficIntent) GetKey() ExternalTrafficKey {
return ExternalTrafficKey{
ClientName: i.Client.Name,
ClientNamespace: i.Client.Namespace,
DestDNSName: i.DNSName,
}
}

func (i DNSExternalTrafficIntent) GetLastSeen() time.Time {
return i.LastSeen
}

type ExternalTrafficKey struct {
ClientName string
ClientNamespace string
DestDNSName string
// One of...
DestDNSName string
DestIP IP
}

type ExternalTrafficIntentsHolder struct {
intents map[ExternalTrafficKey]TimestampedExternalTrafficIntent
lock sync.Mutex
callbacks []ExternalTrafficCallbackFunc
intentsNoDelay map[ExternalTrafficKey]TimestampedExternalTrafficIntent
delayedIPIntents map[ExternalTrafficKey]TimestampedExternalTrafficIntent
lock sync.Mutex
callbacks []ExternalTrafficCallbackFunc
}

type ExternalTrafficCallbackFunc func(context.Context, []TimestampedExternalTrafficIntent)

func NewExternalTrafficIntentsHolder() *ExternalTrafficIntentsHolder {
return &ExternalTrafficIntentsHolder{
intents: make(map[ExternalTrafficKey]TimestampedExternalTrafficIntent),
intentsNoDelay: make(map[ExternalTrafficKey]TimestampedExternalTrafficIntent),
delayedIPIntents: make(map[ExternalTrafficKey]TimestampedExternalTrafficIntent),
}
}

Expand Down Expand Up @@ -71,51 +121,74 @@ func (h *ExternalTrafficIntentsHolder) PeriodicIntentsUpload(ctx context.Context
}
}

// GetNewIntentsSinceLastGet returns the intents that were added since the last call to this function. It also rotates the intentsNoDelay, so that the next call will return the intentsNoDelay that were added in the next iteration.
func (h *ExternalTrafficIntentsHolder) GetNewIntentsSinceLastGet() []TimestampedExternalTrafficIntent {
h.lock.Lock()
defer h.lock.Unlock()

intents := make([]TimestampedExternalTrafficIntent, 0, len(h.intents))
intents := make([]TimestampedExternalTrafficIntent, 0, len(h.intentsNoDelay))

for _, intent := range h.intents {
for _, intent := range h.intentsNoDelay {
intents = append(intents, intent)
}

h.intents = make(map[ExternalTrafficKey]TimestampedExternalTrafficIntent)
// Rotate delayedIPIntents into intentsNoDelay
h.intentsNoDelay = h.delayedIPIntents
h.delayedIPIntents = make(map[ExternalTrafficKey]TimestampedExternalTrafficIntent)

return intents
}

// AddIntent adds a new external traffic intent to the holder. DNS intentsNoDelay are added to the current iteration, while IP intentsNoDelay are added to the next iteration. This is so that DNS traffic is reported first,
// to allow Otterize Cloud to cache the DNS name and IPs before the IP intent is sent.
func (h *ExternalTrafficIntentsHolder) AddIntent(intent ExternalTrafficIntent) {
if config.ExcludedNamespaces().Contains(intent.Client.Namespace) {
if config.ExcludedNamespaces().Contains(intent.GetClient().Namespace) {
return
}

h.lock.Lock()
defer h.lock.Unlock()

key := ExternalTrafficKey{
ClientName: intent.Client.Name,
ClientNamespace: intent.Client.Namespace,
DestDNSName: intent.DNSName,
}
_, ok := h.intents[key]
if !ok {
h.intents[key] = TimestampedExternalTrafficIntent{
Timestamp: intent.LastSeen,
Intent: intent,
key := intent.GetKey()

switch typedIntent := intent.(type) {
case DNSExternalTrafficIntent:
_, ok := h.intentsNoDelay[key]
if !ok {
h.intentsNoDelay[key] = TimestampedExternalTrafficIntent{
Timestamp: intent.GetLastSeen(),
Intent: intent,
}
return
}

mergedIntent := h.intentsNoDelay[key]
if intent.GetLastSeen().After(mergedIntent.Timestamp) {
mergedIntent.Timestamp = intent.GetLastSeen()
}
return
}

mergedIntent := h.intents[key]
for ip := range typedIntent.IPs {
mergedIntent.Intent.(DNSExternalTrafficIntent).IPs[ip] = struct{}{}
}
h.intentsNoDelay[key] = mergedIntent

case IPExternalTrafficIntent:
_, ok := h.delayedIPIntents[key]
if !ok {
h.delayedIPIntents[key] = TimestampedExternalTrafficIntent{
Timestamp: intent.GetLastSeen(),
Intent: intent,
}
return
}

for ip := range intent.IPs {
mergedIntent.Intent.IPs[ip] = struct{}{}
}
if intent.LastSeen.After(mergedIntent.Timestamp) {
mergedIntent.Timestamp = intent.LastSeen
}
mergedIntent := h.delayedIPIntents[key]
if intent.GetLastSeen().After(mergedIntent.Timestamp) {
mergedIntent.Timestamp = intent.GetLastSeen()
}
h.delayedIPIntents[key] = mergedIntent

h.intents[key] = mergedIntent
default:
panic(fmt.Sprintf("Unexpected external traffic intent type: %T", intent))
}
}
Loading