Skip to content

Commit

Permalink
Multi-threaded accelerometer
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed Feb 16, 2024
1 parent 0a61c1b commit 6503728
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 138 deletions.
14 changes: 8 additions & 6 deletions fog.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# FW_LISTEN_ADDR: IP and port to listen on.
listen_addr = ":12345"
listen_addr = ":9000"
# FW_FLUSH_INTERVAL: How old an item must be to flush it to disk.
flush_interval = "16s"
# FW_GROUP_INTERVAL: How often to scan everything for flushable files.
Expand All @@ -12,11 +12,13 @@ log_file = ""
log_file_mb = 0
# FW_LOG_FILES: How many logs files to keep around.
log_files = 0
# FW_BUFFER_UDP: This is the UDP socket buffer.
# FW_BUFFER_UDP: This is the UDP socket buffer in bytes.
buffer_udp = 1048576
# FW_BUFFER_PACKET: This is the size of the read buffer we use.
# FW_BUFFER_PACKET: This is the size of the read buffer per packet.
buffer_packet = 102400
# FW_BUFFER_CHANNEL: This is how many items fit in memory before we drop packets. maybe.
buffer_channel = 10240
# FW_LISTENERS: How many UDP socket listener threads to start.
listeners = 10
# FW_PROCESSORS: How many packet processor threads to start.
processors = 10
# FW_DEBUG: Prints 1 line per packet when enabled.
debug = false
debug = true
88 changes: 88 additions & 0 deletions pkg/buf/buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package buf provides a small file buffer that can be flushed to disk.
// Used by willow package.
package buf

import (
"bytes"
"os"
"path/filepath"
"sync"
"time"
)

const (
FileMode = 0o644
DirMode = 0o755
)

// FileBuffer holds a file before it gets flushed to disk.
type FileBuffer struct {
Logger
FirstWrite time.Time
writes uint
mu sync.Mutex
buf *bytes.Buffer
Path string
}

// Logger lets this sub module print messages.
type Logger interface {
Errorf(msg string, v ...interface{})
Printf(msg string, v ...interface{})
Debugf(msg string, v ...interface{})
}

// NewBuffer returns a new FileBuffer read to use.
func NewBuffer(path string, data []byte, logger Logger) *FileBuffer {
if logger == nil {
panic("NewBuffer() cannot take a nil logger")
}

return &FileBuffer{
Logger: logger,
FirstWrite: time.Now(),
writes: 1,
buf: bytes.NewBuffer(data),
Path: path,
}
}

// Write sends content to the file buffer and increments the write counter.
func (f *FileBuffer) Write(p []byte) (int, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.writes++

return f.buf.Write(p)
}

func (f *FileBuffer) Len() int {
return f.buf.Len()
}

// Flush writes the file buffer to disk.
func (f *FileBuffer) Flush() {
f.mu.Lock()
defer f.mu.Unlock()

if err := os.MkdirAll(filepath.Dir(f.Path), DirMode); err != nil {
// We could return here, but let's try to write the file anyway?
f.Errorf("Creating dir for %s: %v", f.Path, err)
}

file, err := os.OpenFile(f.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, FileMode)
if err != nil {
f.Errorf("Opening or creating file %s: %v", f.Path, err)
return
}
defer file.Close()

size, err := file.Write(f.buf.Bytes())
if err != nil {
// Since all we do is print an info message, we do not need to return here.
// Consider that if you add more logic after this stanza.
f.Errorf("Writing file '%s' content: %v", f.Path, err)
}

f.Printf("Wrote %d bytes (%d lines) to '%s'", size, f.writes, f.Path)
}
127 changes: 30 additions & 97 deletions pkg/fog/fog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"errors"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/Notifiarr/fogwillow/pkg/buf"
"github.com/Notifiarr/fogwillow/pkg/willow"
)

type packet struct {
Expand All @@ -19,18 +20,10 @@ type packet struct {
count uint
}

type fileBuffer struct {
*bytes.Buffer
path string
config *Config
firstWrite time.Time
writes uint
}

type fileMemMap map[string]*fileBuffer
// packetListener gets raw packets from the UDP socket and sends them to another go routine.
func (c *Config) packetListener(idx uint) {
c.Printf("Starting UDP packet listener %d.", idx)

// packetReader gets raw packets from the UDP socket and sends them to another go routine using a buffered channel.
func (c *Config) packetReader() {
var err error

for count := uint(0); ; count++ {
Expand All @@ -39,61 +32,37 @@ func (c *Config) packetReader() {
packet.size, packet.addr, err = c.sock.ReadFromUDP(packet.data)
if errors.Is(err, net.ErrClosed) {
// This happens on normal shutdown.
c.Printf("Closing UDP packet reader: %v", err)
c.Printf("Closing UDP packet listener %d: %v", idx, err)
return
} else if err != nil {
// This is probably rare.
c.Errorf("Reading UDP socket: %v", err)
c.Errorf("Reading UDP socket %d: %v", idx, err)
continue
}

c.Debugf("Got packet %d from %s at %d bytes; channel buffer: %d ",
packet.count, packet.addr, packet.size, len(c.packets))
c.Debugf("Thread %d got packet %d from %s at %d bytes; buffer: %d/%d",
idx, packet.count, packet.addr, packet.size, len(c.packets), cap(c.packets))

c.packets <- packet
}
}

// packetListener receives packets from packetReader using a buffered channel.
// This function also maintains the temporary in-memory buffer for all packets.
func (c *Config) packetListener() {
defer c.Printf("Closing UDP packet listener")
// How often do we scan all rows and check for expired items?
groups := time.NewTicker(c.GroupInterval.Duration)
defer groups.Stop()

memory := make(fileMemMap)

for {
select {
case now := <-groups.C:
c.cleanMemory(now, memory)
case packet, ok := <-c.packets:
if !ok {
return
}

packet.Handler(c, memory)
}
}
}

func (c *Config) cleanMemory(now time.Time, memory fileMemMap) {
for path, file := range memory {
if now.Sub(file.firstWrite) < c.FlushInterval.Duration {
continue
}
// packetProcessor receives packets from packetReader using a buffered channel.
// This procedure launches the packet handler.
func (c *Config) packetProcessor(idx uint) {
c.Printf("Starting UDP packet processor %d.", idx)
defer c.Printf("Closing UDP packet listener %d.", idx)

file.Flush()
delete(memory, path)
for packet := range c.packets {
packet.Handler(c, c.willow)
}
}

var ErrInvalidPacket = fmt.Errorf("invalid packet")

// Handler is invoked by packetListener for every received packet.
// This is where the packet is parsed and stored into memory for later flushing to disk.
func (p *packet) Handler(config *Config, memory fileMemMap) {
func (p *packet) Handler(config *Config, memory *willow.Willow) {
settings, body, err := p.parse()
if err != nil {
config.Errorf("%v", err)
Expand All @@ -102,32 +71,20 @@ func (p *packet) Handler(config *Config, memory fileMemMap) {

// Combine our base path with the filename path provided in the packet.
filePath := filepath.Join(config.OutputPath, settings["filepath"])

if settings["flush"] == "true" {
defer func() {
memory[filePath].Flush()
delete(memory, filePath)
}()
}

if memory[filePath] == nil {
// This creates the initial buffer, and never returns an error.
memory[filePath] = &fileBuffer{
Buffer: bytes.NewBuffer(body),
path: filePath,
config: config,
firstWrite: time.Now(),
writes: 1,
}

return
fileBuffer := memory.Get(filePath)

if fileBuffer == nil {
// Create a new fileBuffer.
fileBuffer = buf.NewBuffer(filePath, body, config)
// Save the new file buffer in memory.
memory.Set(fileBuffer)
} else if _, err := fileBuffer.Write(body); err != nil { // Append directly to existing buffer.
config.Errorf("Adding %d bytes to buffer (%d) for %s", p.size, fileBuffer.Len(), filePath)
}

// We can use write count to create metrics.
memory[filePath].writes++
// If a buffer already exists, this appends directly to it.
if _, err := memory[filePath].Write(body); err != nil {
config.Errorf("Adding %d bytes to buffer (%d) for %s", p.size, memory[filePath].Len(), filePath)
if settings["flush"] == "true" {
fileBuffer.Flush() // write to disk
memory.Delete(filePath) // remove from memory
}
}

Expand Down Expand Up @@ -172,27 +129,3 @@ func (p *packet) parse() (map[string]string, []byte, error) {

return settings, p.data[lastline:p.size], nil
}

// Flush writes the file buffer to disk.
func (f *fileBuffer) Flush() {
if err := os.MkdirAll(filepath.Dir(f.path), DefaultDirMode); err != nil {
// We could return here, but let's try to write the file anyway?
f.config.Errorf("Creating dir for %s: %v", f.path, err)
}

file, err := os.OpenFile(f.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, DefaultFileMode)
if err != nil {
f.config.Errorf("Opening or creating file %s: %v", f.path, err)
return
}
defer file.Close()

size, err := file.Write(f.Bytes())
if err != nil {
// Since all we do is print an info message, we do not need to return here.
// Consider that if you add more logic after this stanza.
f.config.Errorf("Writing file '%s' content: %v", f.path, err)
}

f.config.Printf("Wrote %d bytes (%d lines) to '%s'", size, f.writes, f.path)
}
14 changes: 9 additions & 5 deletions pkg/fog/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"golift.io/rotatorr/timerotator"
)

const (
logFileMode = 0o644
megabyte = 1024 * 1024
)

// SetupLogs starts the logs rotation and sets logger output to the configured file(s).
// You must call this before calling Start to setup logs, or things will panic.
//
//nolint:gomnd
func (c *Config) SetupLogs() {
if c.LogFile == "" {
c.log = log.New(os.Stderr, "", log.LstdFlags)
Expand All @@ -26,8 +29,8 @@ func (c *Config) SetupLogs() {

rotator = rotatorr.NewMust(&rotatorr.Config{
Filepath: c.LogFile,
FileSize: int64(c.LogFileMB * 1024 * 1024),
FileMode: 0o644,
FileSize: int64(c.LogFileMB * megabyte),
FileMode: logFileMode,
Rotatorr: &timerotator.Layout{
FileCount: int(c.LogFiles),
PostRotate: postRotate,
Expand Down Expand Up @@ -62,7 +65,8 @@ func (c *Config) PrintConfig() {
c.Printf("=> Listen Address: %s", c.ListenAddr)
c.Printf("=> Output Path: %s", c.OutputPath)
c.Printf("=> Flush Interval: %s", c.FlushInterval)
c.Printf("=> Buffers; UDP/Packet/Channel: %d/%d/%d", c.BufferUDP, c.BufferPacket, c.BufferChannel)
c.Printf("=> Buffers; UDP/Packet: %d/%d", c.BufferUDP, c.BufferPacket)
c.Printf("=> Threads; Listen/Process: %d/%d", c.Listeners, c.Processors)

if c.LogFile != "" {
c.Printf("=> Log File: %s (count: %d, size: %dMB)", c.LogFile, c.LogFiles, c.LogFileMB)
Expand Down
Loading

0 comments on commit 6503728

Please sign in to comment.