From e2907a26f1eb2c74568ffb124602c7c299ee19e3 Mon Sep 17 00:00:00 2001 From: kris Date: Tue, 20 Aug 2024 14:08:01 -0600 Subject: [PATCH 1/2] This PR makes a few changes to the network functionality: 1. Change the main Listen routine to use a single go routine with a buffered channel so that a blocked api.Writer won't build up an unlimited number of go routines eventually exhausting the stack and OOMing the process 2. Added an optional log.Logger member to the server so that the library isn't just spitting error messages at stdout. If the logger is not specified the old behavior is maintained. --- network/crypto.go | 6 ++--- network/fuzz.go | 1 + network/fuzz_test.go | 1 + network/main.go | 9 +++++++- network/network_x_test.go | 3 ++- network/parse.go | 12 +++++----- network/server.go | 48 ++++++++++++++++++++++++++++++--------- 7 files changed, 57 insertions(+), 23 deletions(-) diff --git a/network/crypto.go b/network/crypto.go index c98523c..c37e662 100644 --- a/network/crypto.go +++ b/network/crypto.go @@ -13,7 +13,6 @@ import ( "errors" "fmt" "io" - "log" "os" "strings" "sync" @@ -32,8 +31,8 @@ type PasswordLookup interface { // The file has a very simple syntax with one username / password mapping per // line, separated by a colon. 
For example: // -// alice: w0nderl4nd -// bob: bu1|der +// alice: w0nderl4nd +// bob: bu1|der type AuthFile struct { name string last time.Time @@ -178,7 +177,6 @@ func createCipher(password string, iv []byte) (cipher.Stream, error) { func encryptAES256(plaintext []byte, username, password string) ([]byte, error) { iv := make([]byte, 16) if _, err := rand.Read(iv); err != nil { - log.Printf("rand.Read: %v", err) return nil, err } diff --git a/network/fuzz.go b/network/fuzz.go index 8dbc225..462ee14 100644 --- a/network/fuzz.go +++ b/network/fuzz.go @@ -1,3 +1,4 @@ +//go:build gofuzz // +build gofuzz package network // import "collectd.org/network" diff --git a/network/fuzz_test.go b/network/fuzz_test.go index 367df29..2af97d5 100644 --- a/network/fuzz_test.go +++ b/network/fuzz_test.go @@ -1,3 +1,4 @@ +//go:build gofuzz // +build gofuzz package network // import "collectd.org/network" diff --git a/network/main.go b/network/main.go index 7cc76f4..46373ff 100644 --- a/network/main.go +++ b/network/main.go @@ -13,9 +13,16 @@ const ( // DefaultBufferSize is the default size of "Buffer". This is based on the // maximum bytes that fit into an Ethernet frame without fragmentation: -// - ( + ) = 1500 - (40 + 8) = 1452 +// +// - ( + ) = 1500 - (40 + 8) = 1452 const DefaultBufferSize = 1452 +// DefaultDispatcherBufferSize is the default depth on the dispatcher channel which +// asynchronously reades value sets from the main server Listener loop and writes them into the api.Writer +// This allows high throughput and asynchronous writes without creating the situation where an api.Writer +// that blocks can cause an infinite buildup go in flight messages and eventual OOM. +const DefaultDispatcherBufferSize = 1024 + // Numeric data source type identifiers. 
const ( dsTypeCounter = 0 diff --git a/network/network_x_test.go b/network/network_x_test.go index d6f4d41..2ce21f7 100644 --- a/network/network_x_test.go +++ b/network/network_x_test.go @@ -69,7 +69,8 @@ func TestNetwork(t *testing.T) { Password: password, }) if err != nil { - t.Fatal(err) + t.Error(err) + return } vl := &api.ValueList{ diff --git a/network/parse.go b/network/parse.go index 0779c58..6669eab 100644 --- a/network/parse.go +++ b/network/parse.go @@ -33,7 +33,7 @@ type ParseOpts struct { // a parse error is encountered, all ValueLists parsed to this point are // returned as well as the error. Unknown "parts" are silently ignored. func Parse(b []byte, opts ParseOpts) ([]*api.ValueList, error) { - return parse(b, None, opts) + return parse(b, None, opts, log.Default()) } func readUint16(buf *bytes.Buffer) (uint16, error) { @@ -44,7 +44,7 @@ func readUint16(buf *bytes.Buffer) (uint16, error) { return binary.BigEndian.Uint16(read), nil } -func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) { +func parse(b []byte, sl SecurityLevel, opts ParseOpts, lgr *log.Logger) ([]*api.ValueList, error) { var valueLists []*api.ValueList var state api.ValueList @@ -96,7 +96,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) if opts.TypesDB != nil { ds, ok := opts.TypesDB.DataSet(state.Type) if !ok { - log.Printf("unable to find %q in TypesDB", state.Type) + lgr.Printf("unable to find %q in TypesDB", state.Type) continue } @@ -110,7 +110,7 @@ func parse(b []byte, sl SecurityLevel, opts ParseOpts) ([]*api.ValueList, error) // Returns an error if the number of values is incorrect. v, err := ds.Values(ifValues...) 
if err != nil { - log.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err) + lgr.Printf("unable to convert metric %q, values %v according to %v in TypesDB: %v", state, ifValues, ds, err) continue } vl.Values = v @@ -243,7 +243,7 @@ func parseSignSHA256(pkg, payload []byte, opts ParseOpts) ([]*api.ValueList, err return nil, errors.New("SHA256 verification failure") } - return parse(payload, Sign, opts) + return parse(payload, Sign, opts, log.Default()) } func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error) { @@ -252,7 +252,7 @@ func parseEncryptAES256(payload []byte, opts ParseOpts) ([]*api.ValueList, error return nil, errors.New("AES256 decryption failure") } - return parse(plaintext, Encrypt, opts) + return parse(plaintext, Encrypt, opts, log.Default()) } func parseInt(b []byte) (uint64, error) { diff --git a/network/server.go b/network/server.go index 5bded61..bf4dff7 100644 --- a/network/server.go +++ b/network/server.go @@ -39,6 +39,15 @@ type Server struct { // Interface is the name of the interface to use when subscribing to a // multicast group. Has no effect when using unicast. Interface string + + // Channel buffer on dispatcher, this limits how many packets/value lists + // can be held in flight before we block on reading new value lists from the network. 
+ // Defaults to 1024 + DispatchBufferSize uint + + // Logger defines a log.Logger that can optionally be provided for handling log messages + // if none is provided a log.Default() is assigned + Logger *log.Logger } // ListenAndWrite listens on the provided UDP connection (or creates one using @@ -78,6 +87,13 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { if srv.BufferSize <= 0 { srv.BufferSize = DefaultBufferSize } + if srv.DispatchBufferSize == 0 { + srv.DispatchBufferSize = DefaultDispatcherBufferSize + } + + if srv.Logger == nil { + srv.Logger = log.Default() + } popts := ParseOpts{ PasswordLookup: srv.PasswordLookup, @@ -94,11 +110,17 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { }() var wg sync.WaitGroup + + valueListChan := make(chan []*api.ValueList, srv.DispatchBufferSize) + wg.Add(1) + go srv.dispatcher(ctx, &wg, valueListChan) + for { buf := make([]byte, srv.BufferSize) n, err := srv.Conn.Read(buf) if err != nil { srv.Conn.Close() + close(valueListChan) wg.Wait() if ctx.Err() != nil { return ctx.Err() @@ -108,22 +130,26 @@ func (srv *Server) ListenAndWrite(ctx context.Context) error { valueLists, err := Parse(buf[:n], popts) if err != nil { - log.Printf("error while parsing: %v", err) + srv.Logger.Printf("error while parsing: %v", err) continue } - - wg.Add(1) - go func() { - defer wg.Done() - dispatch(ctx, valueLists, srv.Writer) - }() + select { + case <-ctx.Done(): + //if the context closed, just continue, we will clean up on the next loop iteration + //when the srv.Conn.Read fails + case valueListChan <- valueLists: + //ALL good, we wrote to the channel + } } } -func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) { - for _, vl := range valueLists { - if err := d.Write(ctx, vl); err != nil { - log.Printf("error while dispatching: %v", err) +func (srv *Server) dispatcher(ctx context.Context, wg *sync.WaitGroup, valueListChan chan []*api.ValueList) { + defer wg.Done() + for vl := range
valueListChan { + for _, v := range vl { + if err := srv.Writer.Write(ctx, v); err != nil { + srv.Logger.Printf("error while dispatching: %v", err) + } } } } From 77f44d9949204518f0fb14c908b9d8afc6730666 Mon Sep 17 00:00:00 2001 From: kris Date: Tue, 20 Aug 2024 14:13:28 -0600 Subject: [PATCH 2/2] typo --- network/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/main.go b/network/main.go index 46373ff..0aeda13 100644 --- a/network/main.go +++ b/network/main.go @@ -18,7 +18,7 @@ const ( const DefaultBufferSize = 1452 // DefaultDispatcherBufferSize is the default depth on the dispatcher channel which -// asynchronously reades value sets from the main server Listener loop and writes them into the api.Writer +// asynchronously reads value sets from the main server Listener loop and writes them into the api.Writer // This allows high throughput and asynchronous writes without creating the situation where an api.Writer // that blocks can cause an infinite buildup go in flight messages and eventual OOM. const DefaultDispatcherBufferSize = 1024