Skip to content

Commit 8572d7f

Browse files
committed
newlog: add filter, dedup and counter functions
Newlog will have 3 ways to reduce the amount of logs: - filter: filter logs out based on the source code line that produced them - counter: count the number of logs produced by a specific source code line. Add that number to the first occurance of the log and remove the rest - deduplicator (for errors only): record the last X errors in a sliding window and remove duplicates The benchmarking of the newlogd with the new features is in the dedup_test.go file. It shows that CPU and RAM usage increase by a factor of 3 when the features are enabled. So they can be disabled by setting the deduplication window size to 0 and not providing anything to the filter and counter functions. Signed-off-by: Paul Gaiduk <paulg@zededa.com>
1 parent c690975 commit 8572d7f

8 files changed

Lines changed: 589 additions & 16 deletions

File tree

pkg/newlog/cmd/counter.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) 2025 Zededa, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"bufio"
8+
"encoding/json"
9+
"fmt"
10+
"os"
11+
"sync/atomic"
12+
13+
"github.com/lf-edge/eve-api/go/logs"
14+
)
15+
16+
var (
17+
// logsToCount is a list of Filename fields (src file + line number) of log entries that should be counted.
18+
logsToCount atomic.Value
19+
20+
counterSuppressedLogs = 0
21+
)
22+
23+
func countLogsInFile(file *os.File) map[string]int {
24+
logCounter := make(map[string]int)
25+
for _, logSrcLine := range logsToCount.Load().([]string) {
26+
logCounter[logSrcLine] = 0
27+
}
28+
preScanner := bufio.NewScanner(file)
29+
for preScanner.Scan() {
30+
var logEntry logs.LogEntry
31+
// we ignore the errors here, they might be coming from non-json lines like the metadata line
32+
_ = json.Unmarshal(preScanner.Bytes(), &logEntry)
33+
34+
if currentCount, ok := logCounter[logEntry.Filename]; ok {
35+
logCounter[logEntry.Filename] = currentCount + 1
36+
}
37+
}
38+
if err := preScanner.Err(); err != nil {
39+
log.Errorf("Error scanning file for log occurrence count: %v", err)
40+
}
41+
if _, err := file.Seek(0, 0); err != nil {
42+
log.Errorf("Failed to reset file pointer: %v", err)
43+
}
44+
return logCounter
45+
}
46+
47+
// addLogCount updates the log entry with a count tag based on the occurrence count
48+
// provided in the filterMap, and manages suppression of duplicate log entries.
49+
// It returns true if the log entry should be included in the output and false if it should be suppressed.
50+
func addLogCount(logEntry *logs.LogEntry, filterMap map[string]int) bool {
51+
if count, ok := filterMap[logEntry.Filename]; !ok {
52+
return true
53+
} else {
54+
if count == 0 {
55+
// the count was already included in another entry
56+
counterSuppressedLogs++
57+
return false
58+
}
59+
60+
if count == 1 {
61+
// no need to add additional count field if there is only one occurrence
62+
return true
63+
}
64+
65+
if logEntry.Tags == nil {
66+
logEntry.Tags = make(map[string]string)
67+
}
68+
logEntry.Tags["count"] = fmt.Sprint(count)
69+
70+
// mark the log entry as counted
71+
filterMap[logEntry.Filename] = 0
72+
73+
return true
74+
}
75+
}

pkg/newlog/cmd/dedup.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (c) 2025 Zededa, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package main
5+
6+
import (
7+
"container/ring"
8+
"encoding/json"
9+
"sync/atomic"
10+
11+
"github.com/lf-edge/eve-api/go/logs"
12+
)
13+
14+
var dedupWindowSize atomic.Uint32
15+
16+
func init() {
17+
dedupWindowSize.Store(100)
18+
}
19+
20+
type containsMsg struct {
21+
Msg string `json:"msg"`
22+
}
23+
24+
var numDedupedLogs = 0
25+
26+
// deduplicateLogs can be used to deduplicate logs on the fly reading from a channel
27+
// and writing to another channel
28+
func deduplicateLogs(in <-chan inputEntry, out chan<- inputEntry) {
29+
// 'seen' counts occurrences of each file in the current window.
30+
seen := make(map[string]string)
31+
// 'queue' holds the file fields of the last bufferSize logs.
32+
// TODO changing dedupWindowSize will have no effect
33+
// since the ring buffer is created with the initial value of dedupWindowSize
34+
// Need to find another solution here
35+
queue := ring.New(int(dedupWindowSize.Load()))
36+
37+
for logEntry := range in {
38+
// No queue means no deduplication.
39+
if queue == nil {
40+
out <- logEntry
41+
continue
42+
}
43+
44+
dedupField := ""
45+
// If logEntry.content is a valid JSON, extract the field "msg" from it.
46+
if logEntry.content != "" {
47+
var content containsMsg
48+
err := json.Unmarshal([]byte(logEntry.content), &content)
49+
if err == nil && content.Msg != "" {
50+
dedupField = content.Msg
51+
} else {
52+
dedupField = logEntry.content
53+
}
54+
}
55+
56+
// If the file hasn't appeared in the last bufferSize logs, forward it.
57+
if _, ok := seen[dedupField]; ok && logEntry.severity == "error" {
58+
log.Tracef("Deduped log at %s because of the log at %s\n", logEntry.timestamp, seen[dedupField])
59+
numDedupedLogs++
60+
} else {
61+
out <- logEntry
62+
}
63+
64+
// Remove the oldest log from the window.
65+
if oldest := queue.Value; oldest != nil {
66+
delete(seen, oldest.(string))
67+
}
68+
69+
// Add the current log to the window.
70+
queue.Value = dedupField
71+
seen[dedupField] = logEntry.timestamp
72+
73+
// Move the window.
74+
queue = queue.Next()
75+
}
76+
77+
close(out)
78+
}
79+
80+
// dedupLogEntry returns a boolean indicating whether the log entry should be used and the updated queue.
81+
// It can be used to deduplicate logs based on the content of a file
82+
func dedupLogEntry(logEntry *logs.LogEntry, seen map[string]uint64, queue *ring.Ring) (bool, *ring.Ring) {
83+
useEntry := true
84+
85+
// No queue means no deduplication.
86+
if queue == nil {
87+
return useEntry, queue
88+
}
89+
90+
dedupField := ""
91+
// If logEntry.content is a valid JSON, extract the field "msg" from it.
92+
if logEntry.Content != "" {
93+
var content containsMsg
94+
err := json.Unmarshal([]byte(logEntry.Content), &content)
95+
if err == nil && content.Msg != "" {
96+
dedupField = content.Msg
97+
} else {
98+
dedupField = logEntry.Content
99+
}
100+
}
101+
102+
// If the file hasn't appeared in the last bufferSize logs, forward it.
103+
if _, ok := seen[dedupField]; ok && logEntry.Severity == "error" {
104+
useEntry = false
105+
log.Tracef("Deduped log id %d because of the log id %d\n", logEntry.Msgid, seen[dedupField])
106+
numDedupedLogs++
107+
} else {
108+
useEntry = true
109+
}
110+
111+
// Remove the oldest log from the window.
112+
if oldest := queue.Value; oldest != nil {
113+
delete(seen, oldest.(string))
114+
}
115+
116+
// Add the current log to the window.
117+
queue.Value = dedupField
118+
seen[dedupField] = logEntry.Msgid
119+
120+
return useEntry, queue.Next()
121+
}

0 commit comments

Comments
 (0)