Skip to content

Commit

Permalink
add file truncate setting
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed Feb 17, 2024
1 parent 6503728 commit a38d41a
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
6 changes: 3 additions & 3 deletions fog.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ buffer_udp = 1048576
# FW_BUFFER_PACKET: This is the size of the read buffer per packet.
buffer_packet = 102400
# FW_LISTENERS: How many UDP socket listener threads to start.
listeners = 10
listeners = 1
# FW_PROCESSORS: How many packet processor threads to start.
processors = 10
processors = 1
# FW_DEBUG: Prints 1 line per packet when enabled.
debug = true
debug = false
20 changes: 16 additions & 4 deletions pkg/buf/buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

const (
FileMode = 0o644
FileMode = 0o664
DirMode = 0o755
)

Expand Down Expand Up @@ -48,20 +48,27 @@ func NewBuffer(path string, data []byte, logger Logger) *FileBuffer {
}

// Write sends content to the file buffer and increments the write counter.
// We added a mutex that makes this thread safe.
func (f *FileBuffer) Write(p []byte) (int, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.writes++

return f.buf.Write(p)
return f.buf.Write(p) //nolint:wrapcheck
}

func (f *FileBuffer) Len() int {
return f.buf.Len()
}

// FlusOpts allows passing data into the file flusher.
type FlusOpts struct {
// Delete the file contents before writing?
Truncate bool
}

// Flush writes the file buffer to disk.
func (f *FileBuffer) Flush() {
func (f *FileBuffer) Flush(opts FlusOpts) {
f.mu.Lock()
defer f.mu.Unlock()

Expand All @@ -70,7 +77,12 @@ func (f *FileBuffer) Flush() {
f.Errorf("Creating dir for %s: %v", f.Path, err)
}

file, err := os.OpenFile(f.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, FileMode)
fileFlag := os.O_APPEND | os.O_CREATE | os.O_WRONLY
if opts.Truncate {
fileFlag = os.O_TRUNC | os.O_CREATE | os.O_WRONLY
}

file, err := os.OpenFile(f.Path, fileFlag, FileMode)
if err != nil {
f.Errorf("Opening or creating file %s: %v", f.Path, err)
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/fog/fog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type packet struct {
data []byte
size int
addr *net.UDPAddr
count uint
count uint64
}

// packetListener gets raw packets from the UDP socket and sends them to another go routine.
Expand All @@ -26,7 +26,7 @@ func (c *Config) packetListener(idx uint) {

var err error

for count := uint(0); ; count++ {
for count := uint64(0); ; count++ {
packet := &packet{data: make([]byte, c.BufferPacket), count: count}

packet.size, packet.addr, err = c.sock.ReadFromUDP(packet.data)
Expand Down Expand Up @@ -82,9 +82,9 @@ func (p *packet) Handler(config *Config, memory *willow.Willow) {
config.Errorf("Adding %d bytes to buffer (%d) for %s", p.size, fileBuffer.Len(), filePath)
}

if settings["flush"] == "true" {
fileBuffer.Flush() // write to disk
memory.Delete(filePath) // remove from memory
if trunc := settings["truncate"] == "true"; trunc || settings["flush"] == "true" {
fileBuffer.Flush(buf.FlusOpts{Truncate: trunc}) // write to disk
memory.Delete(filePath) // remove from memory
}
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/fog/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ const (

type Config struct {
*willow.Config
OutputPath string `toml:"output_path" xml:"output_path"`
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
LogFile string `toml:"log_file" xml:"log_file"`
LogFileMB uint `toml:"log_file_mb" xml:"log_file_mb"`
LogFiles uint `toml:"log_files" xml:"log_files"`
BufferUDP uint `toml:"buffer_udp" xml:"buffer_udp"`
BufferPacket uint `toml:"buffer_packet" xml:"buffer_packet"`
Listeners uint `toml:"listeners" xml:"listeners"`
Processors uint `toml:"processors" xml:"processors"`
Debug bool `toml:"debug" xml:"debug"`
OutputPath string `toml:"output_path" xml:"output_path"`
ListenAddr string `toml:"listen_addr" xml:"listen_addr"`
LogFile string `toml:"log_file" xml:"log_file"`
LogFileMB uint `toml:"log_file_mb" xml:"log_file_mb"`
LogFiles uint `toml:"log_files" xml:"log_files"`
BufferUDP uint `toml:"buffer_udp" xml:"buffer_udp"`
BufferPacket uint `toml:"buffer_packet" xml:"buffer_packet"`
Listeners uint `toml:"listeners" xml:"listeners"`
Processors uint `toml:"processors" xml:"processors"`
Debug bool `toml:"debug" xml:"debug"`
log *log.Logger
packets chan *packet
sock *net.UDPConn
Expand Down
2 changes: 1 addition & 1 deletion pkg/willow/willow.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (w *Willow) washer(now time.Time) {
continue
}

go file.Flush()
go file.Flush(buf.FlusOpts{})
delete(w.memory, path)
}
}

0 comments on commit a38d41a

Please sign in to comment.