Skip to content

Commit d9d051c

Browse files
authored
Add initiator field and parse url (#3558)
- Add initiator field to flow proto - Parse URL - Update a few trace logs
1 parent cb318b7 commit d9d051c

File tree

4 files changed

+89
-69
lines changed

4 files changed

+89
-69
lines changed

client/internal/netflow/manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (m *Manager) Update(update *nftypes.FlowConfig) error {
117117
return nil
118118
}
119119

120-
log.Tracef("updating flow configuration with new settings: %+v", update)
120+
log.Tracef("updating flow configuration with new settings: url -> %s, interval -> %s, enabled? %t", update.URL, update.Interval, update.Enabled)
121121

122122
m.mux.Lock()
123123
defer m.mux.Unlock()

flow/client/client.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"crypto/x509"
77
"errors"
88
"fmt"
9-
"strings"
9+
"net/url"
1010
"sync"
1111
"time"
1212

@@ -31,9 +31,12 @@ type GRPCClient struct {
3131
}
3232

3333
func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
34+
parsedURL, err := url.Parse(addr)
35+
if err != nil {
36+
return nil, fmt.Errorf("parsing url: %w", err)
37+
}
3438
var opts []grpc.DialOption
35-
36-
if strings.Contains(addr, "443") {
39+
if parsedURL.Scheme == "https" {
3740
certPool, err := x509.SystemCertPool()
3841
if err != nil || certPool == nil {
3942
log.Debugf("System cert pool not available; falling back to embedded cert, error: %v", err)
@@ -58,7 +61,7 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
5861
grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
5962
)
6063

61-
conn, err := grpc.NewClient(addr, opts...)
64+
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
6265
if err != nil {
6366
return nil, fmt.Errorf("creating new grpc client: %w", err)
6467
}
@@ -100,6 +103,11 @@ func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler f
100103
return fmt.Errorf("create event stream: %w", err)
101104
}
102105

106+
err = stream.Send(&proto.FlowEvent{IsInitiator: true})
107+
if err != nil {
108+
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
109+
}
110+
103111
if err = checkHeader(stream); err != nil {
104112
return fmt.Errorf("check header: %w", err)
105113
}

flow/proto/flow.pb.go

+74-64
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flow/proto/flow.proto

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ message FlowEvent {
2222
bytes public_key = 3;
2323

2424
FlowFields flow_fields = 4;
25+
26+
bool isInitiator = 5;
2527
}
2628

2729
message FlowEventAck {

0 commit comments

Comments
 (0)