Skip to content

Commit

Permalink
add delete and password settings
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed Feb 18, 2024
1 parent 97afe9a commit 0895c5e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 17 deletions.
10 changes: 8 additions & 2 deletions fog.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Example Fog Willow config file.
## The values below are the defaults.
#####################################

# FW_PASSWORD: If a password is set then clients must send it as a setting.
password = ""
# FW_LISTEN_ADDR: IP and port to listen on.
listen_addr = ":9000"
# FW_FLUSH_INTERVAL: How old an item must be to flush it to disk.
Expand All @@ -15,9 +21,9 @@ log_files = 0
# FW_BUFFER_UDP: This is the UDP socket buffer in bytes.
buffer_udp = 1048576
# FW_BUFFER_PACKET: This is the size of the read buffer per packet.
buffer_packet = 102400
buffer_packet = 8192
# FW_BUFFER_CHAN: This is the channel buffer between the listeners and processors.
buffer_chan = 1024
buffer_chan = 0
# FW_LISTENERS: How many UDP socket listener threads to start.
listeners = 1
# FW_PROCESSORS: How many packet processor threads to start.
Expand Down
17 changes: 17 additions & 0 deletions pkg/buf/buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ type FlusOpts struct {
Truncate bool
}

// RmRfDir deletes the path in the fileBuffer. This is dangerous and destructive.
func (f *FileBuffer) RmRfDir() {
f.mu.Lock()
defer f.mu.Unlock()

f.Debugf("Deleting recursively: %s", f.Path)

start := time.Now()

if err := os.RemoveAll(f.Path); err != nil {
f.Errorf("Deleting path %s: %v", f.Path, err)
return
}

f.Printf("Deleted %s in %s", f.Path, time.Since(start))
}

// Flush writes the file buffer to disk.
func (f *FileBuffer) Flush(opts FlusOpts) {
f.mu.Lock()
Expand Down
82 changes: 68 additions & 14 deletions pkg/fog/fog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ type packet struct {
count uint64
}

// List of settings we recognize from packet parsing.
const (
setPassword = "password"
setTruncate = "truncate"
setFilepath = "filepath"
setDelete = "delete"
setFlush = "flush"
)

var (
ErrInvalidPacket = fmt.Errorf("invalid packet")
ErrBadPassword = fmt.Errorf("bad password")
)

// packetListener gets raw packets from the UDP socket and sends them to another go routine.
func (c *Config) packetListener(idx uint) {
c.Printf("Starting UDP packet listener %d.", idx)
Expand Down Expand Up @@ -58,8 +72,6 @@ func (c *Config) packetProcessor(idx uint) {
}
}

var ErrInvalidPacket = fmt.Errorf("invalid packet")

// Handler is invoked by packetListener for every received packet.
// This is where the packet is parsed and stored into memory for later flushing to disk.
func (p *packet) Handler(config *Config, memory *willow.Willow) {
Expand All @@ -69,8 +81,13 @@ func (p *packet) Handler(config *Config, memory *willow.Willow) {
return
}

if err := p.check(settings, config.Password); err != nil {
config.Errorf("%v", err)
return
}

// Combine our base path with the filename path provided in the packet.
filePath := filepath.Join(config.OutputPath, strings.TrimPrefix(settings["filepath"], config.OutputPath))
filePath := settings[setFilepath].PrefixPath(config.OutputPath)
fileBuffer := memory.Get(filePath)

if fileBuffer == nil {
Expand All @@ -82,14 +99,18 @@ func (p *packet) Handler(config *Config, memory *willow.Willow) {
config.Errorf("Adding %d bytes to buffer (%d) for %s", p.size, fileBuffer.Len(), filePath)
}

if trunc := settings["truncate"] == "true"; trunc || settings["flush"] == "true" {
if settings[setDelete].True() {
go fileBuffer.RmRfDir()
} else if trunc := settings[setTruncate].True(); trunc || settings[setFlush].True() {
fileBuffer.Flush(buf.FlusOpts{Truncate: trunc}) // write to disk
memory.Delete(filePath) // remove from memory
}
}

// parse the packet into structured data.
//
//nolint:gomnd
func (p *packet) parse() (map[string]string, []byte, error) {
func (p *packet) parse() (map[string]setting, []byte, error) {
newline := bytes.IndexByte(p.data, '\n')
if newline < 0 {
return nil, nil, fmt.Errorf("%w from %s (first newline at %d)", ErrInvalidPacket, p.addr.IP, newline)
Expand All @@ -102,7 +123,7 @@ func (p *packet) parse() (map[string]string, []byte, error) {
err, p.addr.IP, newline, string(p.data[0:newline]))
}

settings := make(map[string]string, settingCount)
settings := make(map[string]setting, settingCount)
lastline := newline + 1 // +1 to remove the \n
// Parse each line 1 at a time and add them to the settings map.
for ; settingCount > 0; settingCount-- {
Expand All @@ -112,20 +133,53 @@ func (p *packet) parse() (map[string]string, []byte, error) {
ErrInvalidPacket, settingCount+len(settings), p.addr.IP, newline, lastline)
}
// Split the setting line on = to get name and value.
setting := strings.SplitN(string(p.data[lastline:newline+lastline]), "=", 2)
if len(setting) != 2 {
settingVal := strings.SplitN(string(p.data[lastline:newline+lastline]), "=", 2)
if len(settingVal) != 2 {
return nil, nil, fmt.Errorf("%w with %d settings from %s (newline/lastline: %d/%d): setting '%s' missing equal",
ErrInvalidPacket, settingCount+len(settings), p.addr.IP, newline, lastline, setting[0])
ErrInvalidPacket, settingCount+len(settings), p.addr.IP, newline, lastline, settingVal[0])
}
// Set the name and value, increment lastline and repeat.
settings[setting[0]] = setting[1]
settings[settingVal[0]] = setting(settingVal[1])
lastline += newline + 1 // +1 to remove the \n
}

if settings["filepath"] == "" {
return nil, nil, fmt.Errorf("%w from %s with %d settings and no filepath (newline/lastline: %d/%d) %s",
ErrInvalidPacket, p.addr.IP, len(settings), newline, lastline, settings)
return settings, p.data[lastline:p.size], nil
}

// check the packet for valid settings.
func (p *packet) check(settings map[string]setting, password string) error {
if settings[setFilepath].Empty() {
return fmt.Errorf("%w from %s with %d settings and no filepath",
ErrInvalidPacket, p.addr.IP, len(settings))
}

if password != "" && !settings[setPassword].Equals(password) {
return fmt.Errorf("%w from %s with %d settings", ErrBadPassword, p.addr.IP, len(settings))
}

return settings, p.data[lastline:p.size], nil
return nil
}

// setting lets us bind cool methods to our string settings.
type setting string

// PrefixPath trims and appends a root path to a setting path.
// Only really useful for the 'filepath' setting.
func (s setting) PrefixPath(path string) string {
return filepath.Join(path, strings.TrimPrefix(string(s), path))
}

// Equals returns true if the setting is equal to this value.
func (s setting) Equals(value string) bool {
return string(s) == value
}

// True returns true if the setting is a "true" string.
func (s setting) True() bool {
return string(s) == "true"
}

// Empty returns true if the setting is blank or nonexistent.
func (s setting) Empty() bool {
return string(s) == ""
}
3 changes: 2 additions & 1 deletion pkg/fog/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ const (
DefaultListenAddr = ":9000"
DefaultOutputPath = "/tmp"
DefaultUDPBuffer = 1024 * 1024
DefaultPacketBuffer = 1024 * 100
DefaultPacketBuffer = 1024 * 8
)

type Config struct {
*willow.Config
Password string `toml:"password" xml:"password"`
OutputPath string `toml:"output_path" xml:"output_path"`
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
LogFile string `toml:"log_file" xml:"log_file"`
Expand Down

0 comments on commit 0895c5e

Please sign in to comment.