diff --git a/.gitignore b/.gitignore index 1388a4c..600007c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ logs -metrics \ No newline at end of file +metrics +*.db diff --git a/cmd/sentinel/container-logs.go b/cmd/sentinel/container-logs.go index 256e301..7c97bec 100644 --- a/cmd/sentinel/container-logs.go +++ b/cmd/sentinel/container-logs.go @@ -52,12 +52,12 @@ func attachContainer(cont types.Container, ctx context.Context, apiClient *clien name = fmt.Sprintf("%s-pr-%s", name, cont.Labels["coolify.pullRequestId"]) } } - logFileName := fmt.Sprintf("%s/%s.txt", logsDir, name) - streamLogs(ctx, apiClient, cont, logFileName) + // logFileName := fmt.Sprintf("%s/%s.txt", logsDir, name) + streamLogs(ctx, apiClient, cont) }(cont) } -func streamLogs(ctx context.Context, apiClient *client.Client, cont types.Container, logFileName string) { +func streamLogs(ctx context.Context, apiClient *client.Client, cont types.Container) { out, err := apiClient.ContainerLogs(ctx, cont.ID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, @@ -71,17 +71,17 @@ func streamLogs(ctx context.Context, apiClient *client.Client, cont types.Contai } defer out.Close() - logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - fmt.Printf("Error opening log file %s: %s\n", logFileName, err) - return - } - defer logFile.Close() + // logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE, 0666) + // if err != nil { + // fmt.Printf("Error opening log file %s: %s\n", logFileName, err) + // return + // } + // defer logFile.Close() seenLines := make(map[string]bool) re := regexp.MustCompile(`\x1b\[[0-9;]*m`) - stdOutWriter := newRemovingWriter(logFile, seenLines, re) - stdErrWriter := newRemovingWriter(logFile, seenLines, re) + stdOutWriter := newRemovingWriter(os.Stdout, seenLines, re) + stdErrWriter := newRemovingWriter(os.Stderr, seenLines, re) if _, err := stdcopy.StdCopy(stdOutWriter, stdErrWriter, out); err != nil { fmt.Printf("Error saving logs for container %s: %s\n", cont.ID, err) diff --git a/cmd/sentinel/containers.go b/cmd/sentinel/containers.go index a0818bf..e3563af 100644 --- a/cmd/sentinel/containers.go +++ b/cmd/sentinel/containers.go @@ -6,10 +6,6 @@ import ( "fmt" "io" "log" - "os" - "strconv" - "strings" - "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -81,11 +77,12 @@ func getOneContainer(containerID string, csv bool) (string, error) { return string(jsonData), nil } -func getOneContainerMetrics(containerID string, csv bool) (string, error) { + +func getContainerMetrics(containerID string, csv bool) (*ContainerMetrics, error) { ctx := context.Background() apiClient, err := client.NewClientWithOpts() if err != nil { - return "", err + return nil, err } apiClient.NegotiateAPIVersion(ctx) defer apiClient.Close() @@ -98,11 +95,11 @@ func getOneContainerMetrics(containerID string, csv bool) (string, error) { } container, err := apiClient.ContainerInspect(ctx, containerID) if err != nil { - return "", err + return nil, err } stats, err := apiClient.ContainerStats(ctx, container.ID, false) if err != nil { - return "", err + return nil, err } var v types.StatsJSON dec := json.NewDecoder(stats.Body) @@ -129,77 +126,27 @@ func getOneContainerMetrics(containerID string, csv bool) (string, error) { MemoryAvailable: v.MemoryStats.Limit, NetworkUsage: metrics.NetworkUsage, } - jsonData, err := json.MarshalIndent(metrics, "", " ") - if err != nil { - return "", err - } - if csv { - return fmt.Sprintf("%s,%f,%d,%f\n", metrics.Time, metrics.CPUUsagePercentage, metrics.MemoryUsed, metrics.MemoryUsagePercentage), nil - } - return string(jsonData), nil + + return &metrics, nil } -func getHistoryContainerUsage(from string, to string, containerId string) (string, error) { - fileName := "container-" + containerId + ".csv" - containerFile := metricsDir + "/" + fileName - if from == "" && to == "" { - // return everything - file, err := os.ReadFile(containerFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - return string(file), nil - } - if from == "" { - from = "1970-01-01T00:00:00Z" - } - if to == "" { - to = time.Now().UTC().Format(time.RFC3339) - } - fromTime, err := time.Parse(time.RFC3339, from) - if err != nil { - fmt.Println("Failed to parse from time:", err) - return "", err - } - toTime, err := time.Parse(time.RFC3339, to) +func getOneContainerMetrics(containerID string, csv bool) (string, error) { + + metrics, err := getContainerMetrics(containerID, csv) if err != nil { - fmt.Println("Failed to parse to time:", err) return "", err } - fromTimeUnix := fromTime.UnixMilli() - toTimeUnix := toTime.UnixMilli() - file, err := os.ReadFile(containerFile) + jsonData, err := json.MarshalIndent(metrics, "", " ") if err != nil { - fmt.Println("Failed to read file:", err) return "", err } - lines := string(file) - var result string - lines = lines[strings.Index(lines, "\n")+1:] - for _, line := range strings.Split(lines, "\n") { - if line == "" { - continue - } - parts := strings.Split(line, ",") - if len(parts) != 4 { - fmt.Println("Invalid line:", line) - continue - } - time, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - fmt.Println("Failed to parse time:", err) - continue - } - if time >= fromTimeUnix && time <= toTimeUnix { - result += line + "\n" - } + if csv { + return fmt.Sprintf("%s,%f,%d,%f\n", metrics.Time, metrics.CPUUsagePercentage, metrics.MemoryUsed, metrics.MemoryUsagePercentage), nil } - result = containerMetricsCsvHeader + result - return result, nil - + return string(jsonData), nil } + func getAllContainers() (string, error) { ctx := context.Background() apiClient, err := client.NewClientWithOpts() diff --git a/cmd/sentinel/cpu.go b/cmd/sentinel/cpu.go index 87c5fce..9d2409b 100644 --- a/cmd/sentinel/cpu.go +++ b/cmd/sentinel/cpu.go @@ -3,9 +3,8 @@ package main import ( "encoding/json" "fmt" - "os" - "strconv" - "strings" + "log" + "sentinel/pkg/db" "time" "github.com/shirou/gopsutil/cpu" @@ -23,6 +22,20 @@ type CpuUsage struct { Percent string `json:"percent"` } +func CollectCpuUsage() { + queryTimeInUnixString := getUnixTimeInMilliUTC() + + overallPercentage, err := cpu.Percent(0, false) + if err != nil { + log.Printf("%v", err) + } + cpuUsage := CpuUsage{ + Time: queryTimeInUnixString, + Percent: fmt.Sprintf("%.2f", overallPercentage[0]), + } + db.Write("cpu", int(time.Now().Unix()), cpuUsage) +} + func getCpuUsage(csv bool) (string, error) { usages := make([]CpuUsage, 0) queryTimeInUnixString := getUnixTimeInMilliUTC() @@ -53,63 +66,3 @@ func getCpuUsage(csv bool) (string, error) { return string(jsonData), nil } - -func getHistoryCpuUsage(from string, to string) (string, error) { - if from == "" && to == "" { - // return everything - file, err := os.ReadFile(cpuMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - return string(file), nil - } - if from == "" { - from = "1970-01-01T00:00:00Z" - } - if to == "" { - to = time.Now().UTC().Format(time.RFC3339) - } - fromTime, err := time.Parse(time.RFC3339, from) - if err != nil { - fmt.Println("Failed to parse from time:", err) - return "", err - } - toTime, err := time.Parse(time.RFC3339, to) - if err != nil { - fmt.Println("Failed to parse to time:", err) - return "", err - } - - fromTimeUnix := fromTime.UnixMilli() - toTimeUnix := toTime.UnixMilli() - file, err := os.ReadFile(cpuMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - lines := string(file) - var result string - lines = lines[strings.Index(lines, "\n")+1:] - for _, line := range strings.Split(lines, "\n") { - if line == "" { - continue - } - parts := strings.Split(line, ",") - if len(parts) != 2 { - fmt.Println("Invalid line:", line) - continue - } - time, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - fmt.Println("Failed to parse time:", err) - continue - } - if time >= fromTimeUnix && time <= toTimeUnix { - result += line + "\n" - } - } - result = cpuCsvHeader + result - return result, nil - -} diff --git a/cmd/sentinel/disk.go b/cmd/sentinel/disk.go index 18f7de9..db4a7e8 100644 --- a/cmd/sentinel/disk.go +++ b/cmd/sentinel/disk.go @@ -3,9 +3,8 @@ package main import ( "encoding/json" "fmt" - "os" - "strconv" - "strings" + "log" + "sentinel/pkg/db" "time" "github.com/shirou/gopsutil/disk" @@ -23,6 +22,44 @@ type DiskUsage struct { var diskCsvHeader = "time,disk,mount_point,total,free,usage_percent\n" +func CollectDiskUsage() { + + partitions, err := disk.Partitions(true) + queryTimeInUnixString := getUnixTimeInMilliUTC() + if err != nil { + log.Printf("%v", err) + return + } + var usages []DiskUsage + for _, partition := range partitions { + if partition.Mountpoint == "" { + continue + } + if partition.Mountpoint != "/" { + if partition.Fstype != "ext4" && partition.Fstype != "xfs" && partition.Fstype != "btrfs" && partition.Fstype != "zfs" && partition.Fstype != "ext3" && partition.Fstype != "ext2" && partition.Fstype != "ntfs" && partition.Fstype != "fat32" && partition.Fstype != "exfat" { + continue + } + } + + usage, err := disk.Usage(partition.Mountpoint) + if err != nil { + fmt.Printf("Failed to get usage for partition %s: %s\n", partition.Mountpoint, err) + continue + } + usedPercentage := fmt.Sprintf("%.1f", usage.UsedPercent) + usages = append(usages, DiskUsage{ + Time: queryTimeInUnixString, + Disk: partition.Device, + MountPoint: partition.Mountpoint, + Total: usage.Total, + Free: usage.Free, + Used: usage.Used, + Usage: usedPercentage, + }) + db.Write("disk", int(time.Now().Unix()), usages) + } +} + func getDiskUsage(csv bool) (string, error) { partitions, err := disk.Partitions(true) queryTimeInUnixString := getUnixTimeInMilliUTC() @@ -35,8 +72,10 @@ func getDiskUsage(csv bool) (string, error) { if partition.Mountpoint == "" { continue } - if partition.Fstype != "ext4" && partition.Fstype != "xfs" && partition.Fstype != "btrfs" && partition.Fstype != "zfs" && partition.Fstype != "ext3" && partition.Fstype != "ext2" && partition.Fstype != "ntfs" && partition.Fstype != "fat32" && partition.Fstype != "exfat" { - continue + if partition.Mountpoint != "/" { + if partition.Fstype != "ext4" && partition.Fstype != "xfs" && partition.Fstype != "btrfs" && partition.Fstype != "zfs" && partition.Fstype != "ext3" && partition.Fstype != "ext2" && partition.Fstype != "ntfs" && partition.Fstype != "fat32" && partition.Fstype != "exfat" { + continue + } } usage, err := disk.Usage(partition.Mountpoint) @@ -54,7 +93,6 @@ func getDiskUsage(csv bool) (string, error) { Used: usage.Used, Usage: usedPercentage, }) - } jsonData, err := json.MarshalIndent(usages, "", " ") if err != nil { @@ -70,61 +108,3 @@ func getDiskUsage(csv bool) (string, error) { return string(jsonData), nil } -func getHistoryDiskUsage(from string, to string) (string, error) { - if from == "" && to == "" { - file, err := os.ReadFile(diskMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - return string(file), nil - } - if from == "" { - from = "1970-01-01T00:00:00Z" - } - if to == "" { - to = time.Now().UTC().Format(time.RFC3339) - } - fromTime, err := time.Parse(time.RFC3339, from) - if err != nil { - fmt.Println("Failed to parse from time:", err) - return "", err - } - toTime, err := time.Parse(time.RFC3339, to) - if err != nil { - fmt.Println("Failed to parse to time:", err) - return "", err - } - - fromTimeUnix := fromTime.UnixMilli() - toTimeUnix := toTime.UnixMilli() - file, err := os.ReadFile(diskMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - lines := string(file) - var result string - lines = lines[strings.Index(lines, "\n")+1:] - for _, line := range strings.Split(lines, "\n") { - if line == "" { - continue - } - parts := strings.Split(line, ",") - if len(parts) != 4 { - fmt.Println("Invalid line:", line) - continue - } - time, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - fmt.Println("Failed to parse time:", err) - continue - } - if time >= fromTimeUnix && time <= toTimeUnix { - result += line + "\n" - } - } - result = diskCsvHeader + result - return result, nil - -} diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index 3cc6403..a607e0b 100644 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -6,23 +6,22 @@ import ( "fmt" "log" "os" + "sentinel/pkg/db" "strconv" + "time" "github.com/gin-gonic/gin" ) -var version string = "0.0.13" -var logsDir string = "/app/logs" -var metricsDir string = "/app/metrics" -var cpuMetricsFile string = metricsDir + "/cpu.csv" -var memoryMetricsFile string = metricsDir + "/memory.csv" -var diskMetricsFile string = metricsDir + "/disk.csv" +var version string = "0.0.14" +var databaseDir string = "/app/database" // Arguments var token string var refreshRateSeconds int = 5 var metricsHistoryInDays int = 30 var startScheduler bool = false +var serverPort string = ":8888" func Token() gin.HandlerFunc { return func(c *gin.Context) { @@ -39,52 +38,25 @@ func Token() gin.HandlerFunc { } } func main() { + if gin.Mode() == gin.DebugMode { - logsDir = "./logs" - metricsDir = "./metrics" - cpuMetricsFile = metricsDir + "/cpu.csv" - memoryMetricsFile = metricsDir + "/memory.csv" - diskMetricsFile = metricsDir + "/disk.csv" - } - if err := os.MkdirAll(logsDir, 0700); err != nil { - log.Fatalf("Error creating logs directory: %v", err) - } - if err := os.MkdirAll(metricsDir, 0700); err != nil { - log.Fatalf("Error creating metrics directory: %v", err) - } - if _, err := os.Stat(cpuMetricsFile); os.IsNotExist(err) { - err := os.WriteFile(cpuMetricsFile, []byte(cpuCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - if _, err := os.Stat(memoryMetricsFile); os.IsNotExist(err) { - err := os.WriteFile(memoryMetricsFile, []byte(memoryCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - if _, err := os.Stat(diskMetricsFile); os.IsNotExist(err) { - err := os.WriteFile(diskMetricsFile, []byte(diskCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } + databaseDir = "./database" } - // go func() { - // if err := streamLogsToFile(); err != nil { - // log.Fatalf("Error listening to events: %v", err) - // } - // }() + MustCreateFolderIfNotExists(databaseDir) + + db.Init(databaseDir) + flag.StringVar(&token, "token", "", "Token to access the API. Default is empty, which means no token is required.") flag.IntVar(&refreshRateSeconds, "refresh", refreshRateSeconds, "Refresh rate in seconds. Default is 5 seconds") flag.IntVar(&metricsHistoryInDays, "metrics-history", metricsHistoryInDays, "Metrics history in days. Default is 30 days") flag.BoolVar(&startScheduler, "scheduler", false, "Start scheduler that collects metrics / data. Default is false.") flag.Parse() + if os.Getenv("PORT") != "" { + serverPort = os.Getenv("PORT") + } + if os.Getenv("TOKEN") != "" { tokenFromEnv := os.Getenv("TOKEN") if tokenFromEnv != "" { @@ -121,6 +93,7 @@ func main() { r.GET("/api/version", func(c *gin.Context) { c.String(200, version) }) + r.Use(gin.Recovery()) authorized := r.Group("/api") @@ -182,18 +155,22 @@ func main() { "container": json.RawMessage(metrics), }) }) + authorized.GET("/container/:containerId/metrics/history", func(c *gin.Context) { - from := c.Query("from") - to := c.Query("to") - usage, err := getHistoryContainerUsage(from, to, c.Param("containerId")) + from, to, err := ParseFromTo(c.Query("from"), c.Query("to")) if err != nil { c.JSON(500, gin.H{ - "error": err.Error(), + "error": "Invalid from or to", }) - return } - c.String(200, usage) + + c.Writer.WriteHeader(200) + c.Writer.WriteString(containerMetricsCsvHeader) + db.ReadRange("container-"+c.Param("containerId"), from, to, func(in ContainerMetrics) { + c.Writer.WriteString(fmt.Sprintf("%v,%v,%v,%v\n", in.Time, in.CPUUsagePercentage, in.MemoryUsed, in.MemoryUsagePercentage)) + }) }) + authorized.GET("/cpu", func(c *gin.Context) { usage, err := getCpuUsage(false) if err != nil { @@ -207,28 +184,27 @@ func main() { "cpu_usage": json.RawMessage(usage), }) }) + authorized.GET("/cpu/csv", func(c *gin.Context) { - usage, err := getCpuUsage(true) - if err != nil { - c.JSON(500, gin.H{ - "error": err.Error(), - }) - return - } - usage = cpuCsvHeader + usage - c.String(200, usage) + c.Writer.WriteHeader(200) + c.Writer.WriteString(cpuCsvHeader) + db.ReadRange("cpu", 0, int(time.Now().Unix()), func(in CpuUsage) { + c.Writer.WriteString(fmt.Sprintf("%s,%s\n", in.Time, in.Percent)) + }) }) authorized.GET("/cpu/history", func(c *gin.Context) { - from := c.Query("from") - to := c.Query("to") - usage, err := getHistoryCpuUsage(from, to) + from, to, err := ParseFromTo(c.Query("from"), c.Query("to")) if err != nil { c.JSON(500, gin.H{ - "error": err.Error(), + "error": "Invalid from or to", }) - return } - c.String(200, usage) + + c.Writer.WriteHeader(200) + c.Writer.WriteString(cpuCsvHeader) + db.ReadRange("cpu", from, to, func(in CpuUsage) { + c.Writer.WriteString(fmt.Sprintf("%s,%s\n", in.Time, in.Percent)) + }) }) authorized.GET("/memory", func(c *gin.Context) { usage, err := getMemUsage(false) @@ -244,28 +220,27 @@ func main() { }) }) authorized.GET("/memory/csv", func(c *gin.Context) { - usage, err := getMemUsage(true) - if err != nil { - c.JSON(500, gin.H{ - "error": err.Error(), - }) - return - } - usage = memoryCsvHeader + usage - c.String(200, usage) + c.Writer.WriteHeader(200) + c.Writer.WriteString(memoryCsvHeader) + db.ReadRange("memory", 0, int(time.Now().Unix()), func(in MemUsage) { + c.Writer.WriteString(fmt.Sprintf("%s,%d,%d,%.2f\n", in.Time, in.Used, in.Free, in.UsedPercent)) + }) }) authorized.GET("/memory/history", func(c *gin.Context) { - from := c.Query("from") - to := c.Query("to") - usage, err := getHistoryMemoryUsage(from, to) + from, to, err := ParseFromTo(c.Query("from"), c.Query("to")) if err != nil { c.JSON(500, gin.H{ - "error": err.Error(), + "error": "Invalid from or to", }) - return } - c.String(200, usage) + + c.Writer.WriteHeader(200) + c.Writer.WriteString(memoryCsvHeader) + db.ReadRange("memory", from, to, func(in MemUsage) { + c.Writer.WriteString(fmt.Sprintf("%s,%d,%d,%.2f\n", in.Time, in.Used, in.Free, in.UsedPercent)) + }) }) + authorized.GET("/disk", func(c *gin.Context) { usage, err := getDiskUsage(false) if err != nil { @@ -280,31 +255,42 @@ func main() { }) }) authorized.GET("/disk/csv", func(c *gin.Context) { - usage, err := getDiskUsage(true) - if err != nil { - c.JSON(500, gin.H{ - "error": err.Error(), - }) - return - } - usage = memoryCsvHeader + usage - c.String(200, usage) + c.Writer.WriteHeader(200) + c.Writer.WriteString(diskCsvHeader) + db.ReadRange("disk", 0, int(time.Now().Unix()), func(in []DiskUsage) { + for { + if len(in) == 0 { + break + } + c.Writer.WriteString(fmt.Sprintf("%s,%s,%s,%d,%d,%s\n", in[0].Time, in[0].Disk, in[0].MountPoint, in[0].Total, in[0].Free, in[0].Usage)) + in = in[1:] + } + }) + }) authorized.GET("/disk/history", func(c *gin.Context) { - from := c.Query("from") - to := c.Query("to") - usage, err := getHistoryDiskUsage(from, to) + from, to, err := ParseFromTo(c.Query("from"), c.Query("to")) if err != nil { c.JSON(500, gin.H{ - "error": err.Error(), + "error": "Invalid from or to", }) - return } - c.String(200, usage) + + c.Writer.WriteHeader(200) + c.Writer.WriteString(diskCsvHeader) + db.ReadRange("disk", from, to, func(in []DiskUsage) { + for { + if len(in) == 0 { + break + } + c.Writer.WriteString(fmt.Sprintf("%s,%s,%s,%d,%d,%s\n", in[0].Time, in[0].Disk, in[0].MountPoint, in[0].Total, in[0].Free, in[0].Usage)) + in = in[1:] + } + }) }) } fmt.Println("Starting API...") - r.Run("0.0.0.0:8888") + r.Run(serverPort) } diff --git a/cmd/sentinel/memory.go b/cmd/sentinel/memory.go index ef98281..b4e9442 100644 --- a/cmd/sentinel/memory.go +++ b/cmd/sentinel/memory.go @@ -3,9 +3,8 @@ package main import ( "encoding/json" "fmt" - "os" - "strconv" - "strings" + "log" + "sentinel/pkg/db" "time" "github.com/shirou/gopsutil/mem" @@ -22,6 +21,24 @@ type MemUsage struct { Free uint64 `json:"free"` } +func CollectMemoryUsage() { + memory, err := mem.VirtualMemory() + if err != nil { + log.Printf("%v", err) + return + } + queryTimeInUnixString := getUnixTimeInMilliUTC() + memUsage := MemUsage{ + Time: queryTimeInUnixString, + Total: memory.Total, + Available: memory.Available, + Used: memory.Used, + UsedPercent: memory.UsedPercent, + Free: memory.Free, + } + db.Write("memory", int(time.Now().Unix()), memUsage) +} + func getMemUsage(csv bool) (string, error) { memory, err := mem.VirtualMemory() if err != nil { @@ -48,63 +65,3 @@ func getMemUsage(csv bool) (string, error) { return string(jsonData), nil } - -func getHistoryMemoryUsage(from string, to string) (string, error) { - if from == "" && to == "" { - // return everything - file, err := os.ReadFile(memoryMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - return string(file), nil - } - if from == "" { - from = "1970-01-01T00:00:00Z" - } - if to == "" { - to = time.Now().UTC().Format(time.RFC3339) - } - fromTime, err := time.Parse(time.RFC3339, from) - if err != nil { - fmt.Println("Failed to parse from time:", err) - return "", err - } - toTime, err := time.Parse(time.RFC3339, to) - if err != nil { - fmt.Println("Failed to parse to time:", err) - return "", err - } - - fromTimeUnix := fromTime.UnixMilli() - toTimeUnix := toTime.UnixMilli() - file, err := os.ReadFile(memoryMetricsFile) - if err != nil { - fmt.Println("Failed to read file:", err) - return "", err - } - lines := string(file) - var result string - lines = lines[strings.Index(lines, "\n")+1:] - for _, line := range strings.Split(lines, "\n") { - if line == "" { - continue - } - parts := strings.Split(line, ",") - if len(parts) != 4 { - fmt.Println("Invalid line:", line) - continue - } - time, err := strconv.ParseInt(parts[0], 10, 64) - if err != nil { - fmt.Println("Failed to parse time:", err) - continue - } - if time >= fromTimeUnix && time <= toTimeUnix { - result += line + "\n" - } - } - result = memoryCsvHeader + result - return result, nil - -} diff --git a/cmd/sentinel/scheduler.go b/cmd/sentinel/scheduler.go index 3b29e3a..3927683 100644 --- a/cmd/sentinel/scheduler.go +++ b/cmd/sentinel/scheduler.go @@ -3,9 +3,8 @@ package main import ( "context" "fmt" - "os" - "strconv" - "strings" + "log" + "sentinel/pkg/db" "time" "github.com/docker/docker/api/types" @@ -32,9 +31,12 @@ func scheduler() { ), gocron.NewTask( func() { - cpuMetrics() - memoryMetrics() + CollectCpuUsage() + + CollectMemoryUsage() + cleanupMetricsData() + containerMetrics() }, ), @@ -49,7 +51,7 @@ func scheduler() { ), gocron.NewTask( func() { - diskMetrics() + CollectDiskUsage() }, ), ) @@ -61,92 +63,20 @@ func scheduler() { disk.Start() } -// func cleanupLogsData() { -// If the files are too big, we can remove old logs - -// currentTime := time.Now() -// minutesAgo := currentTime.Add(-1440 * time.Minute) -// files, err := os.ReadDir(logsDir) -// if err != nil { -// fmt.Printf("Error reading directory: %s", err) -// return -// } -// for _, file := range files { -// lines, err := os.ReadFile(fmt.Sprintf("%s/%s", logsDir, file.Name())) -// if err != nil { -// fmt.Printf("Error reading file: %s", err) -// return -// } -// for _, line := range strings.Split(string(lines), "\n") { -// stringTime := strings.Split(line, " ")[0] -// if stringTime == "" { -// continue -// } -// timeRfc, err := time.Parse(time.RFC3339, stringTime) -// if err != nil { -// fmt.Printf("Error parsing time: %s", err) -// continue -// } -// timeInt := timeRfc.UnixNano() / int64(time.Millisecond) -// if time.UnixMilli(timeInt).Before(minutesAgo) { -// lines = []byte(strings.ReplaceAll(string(lines), line, "")) -// lines = []byte(strings.ReplaceAll(string(lines), "\n\n", "\n")) -// err := os.WriteFile(fmt.Sprintf("%s/%s", logsDir, file.Name()), lines, 0644) -// if err != nil { -// fmt.Printf("Error writing file: %s", err) -// continue -// } -// } -// } -// } -// } - func cleanupMetricsData() { currentTime := time.Now() - minutesAgo := currentTime.Add(time.Duration(-metricsHistoryInDays) * time.Hour * 24) - files, err := os.ReadDir(metricsDir) - if err != nil { - fmt.Printf("Error reading directory: %s", err) - return - } - for _, file := range files { - lines, err := os.ReadFile(fmt.Sprintf("%s/%s", metricsDir, file.Name())) - if err != nil { - fmt.Printf("Error reading file: %s", err) - return - } - for _, line := range strings.Split(string(lines), "\n") { - if strings.Contains(line, "time") { - continue - } - if line == "" { - continue - } - timeString := strings.Split(line, ",")[0] - timeInt, err := strconv.ParseInt(timeString, 10, 64) - if err != nil { - fmt.Printf("Error parsing time: %s", err) - return - } - if time.UnixMilli(timeInt).Before(minutesAgo) { - // fmt.Println("removing line") - // fmt.Println(line) - lines = []byte(strings.ReplaceAll(string(lines), line, "")) - lines = []byte(strings.ReplaceAll(string(lines), "\n\n", "\n")) - err := os.WriteFile(fmt.Sprintf("%s/%s", metricsDir, file.Name()), lines, 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - } - } + minutesAgo := currentTime.Add(time.Duration(-metricsHistoryInDays) * time.Hour * 24).Unix() + + db.DeleteOlderThan("cpu", int(minutesAgo)) + db.DeleteOlderThan("memory", int(minutesAgo)) + db.DeleteOlderThan("disk", int(minutesAgo)) } func containerMetrics() { ctx := context.Background() apiClient, err := client.NewClientWithOpts() if err != nil { + log.Printf("%v", err) return } apiClient.NegotiateAPIVersion(ctx) @@ -159,6 +89,7 @@ func containerMetrics() { } for _, container := range containers { + if container.Image == "ghcr.io/coollabsio/coolify-helper:latest" { continue } @@ -169,124 +100,23 @@ func containerMetrics() { } }() - metrics, err := getOneContainerMetrics(container.ID, true) + metrics, err := getContainerMetrics(container.ID, true) if err != nil { fmt.Printf("Error getting container metrics: %s\n", err) return } - containerNameFromLabel := container.Labels["coolify.name"] - if containerNameFromLabel == "" { + containerNameFromLabel, ok := container.Labels["coolify.name"] + if !ok { containerNameFromLabel = container.Names[0][1:] } - containerName := "container-" + containerNameFromLabel - containerMetricsFile := fmt.Sprintf("%s/%s.csv", metricsDir, containerName) + _ = containerNameFromLabel - _, err = os.Stat(containerMetricsFile) - if err != nil && os.IsNotExist(err) { - err := os.WriteFile(containerMetricsFile, []byte(containerMetricsCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s\n", err) - return - } - } + containerName := "container-" + container.ID + // log.Printf("%v", containerName) + db.Write(containerName, int(time.Now().Unix()), metrics) - f, err := os.OpenFile(containerMetricsFile, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Error opening file: %s\n", err) - return - } - defer f.Close() - - _, err = f.WriteString(metrics) - if err != nil { - fmt.Printf("Error writing to file: %s\n", err) - return - } }(container) } } -func cpuMetrics() { - out, err := getCpuUsage(true) - if err != nil { - fmt.Printf("Error getting containers: %s", err) - return - } - _, err = os.Stat(cpuMetricsFile) - if err != nil { - err := os.WriteFile(cpuMetricsFile, []byte(cpuCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - - f, err := os.OpenFile(cpuMetricsFile, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Error opening file: %s", err) - return - } - defer f.Close() - _, err = f.WriteString(out) - if err != nil { - fmt.Printf("Error writing to file: %s", err) - return - } -} -func diskMetrics() { - out, err := getDiskUsage(true) - if err != nil { - fmt.Printf("Error getting filesystem usage: %s", err) - return - } - _, err = os.Stat(diskMetricsFile) - if err != nil { - err := os.WriteFile(diskMetricsFile, []byte(diskCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - - // open file in append mode and write out to it - f, err := os.OpenFile(diskMetricsFile, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Error opening file: %s", err) - return - } - defer f.Close() - _, err = f.WriteString(out) - if err != nil { - fmt.Printf("Error writing to file: %s", err) - return - } -} -func memoryMetrics() { - out, err := getMemUsage(true) - if err != nil { - fmt.Printf("Error getting memory usage: %s", err) - return - } - _, err = os.Stat(memoryMetricsFile) - if err != nil { - err := os.WriteFile(memoryMetricsFile, []byte(memoryCsvHeader), 0644) - if err != nil { - fmt.Printf("Error writing file: %s", err) - return - } - } - - // open file in append mode and write out to it - f, err := os.OpenFile(memoryMetricsFile, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Error opening file: %s", err) - return - } - defer f.Close() - _, err = f.WriteString(out) - if err != nil { - fmt.Printf("Error writing to file: %s", err) - return - } -} diff --git a/cmd/sentinel/shared.go b/cmd/sentinel/shared.go index 536d815..a45db05 100644 --- a/cmd/sentinel/shared.go +++ b/cmd/sentinel/shared.go @@ -2,6 +2,9 @@ package main import ( "fmt" + "log" + "os" + "strconv" "time" ) @@ -16,3 +19,30 @@ func getUnixTimeInMilliUTC() string { queryTimeInUnixString := fmt.Sprintf("%d", queryTimeInUnix) return queryTimeInUnixString } + +func ParseFromTo(tmpfrom, tmpto string) (int, int, error) { + + if tmpfrom == "" { + tmpfrom = "0" + } + if tmpto == "" { + tmpto = fmt.Sprintf("%d", time.Now().Unix()) + } + from, errFrom := strconv.Atoi(tmpfrom) + to, errTo := strconv.Atoi(tmpto) + if errFrom != nil || errTo != nil { + return 0, 0, fmt.Errorf("invalid from or to") + } + return from, to, nil +} + +func MustCreateFolderIfNotExists(folderPath string) error { + if _, err := os.Stat(folderPath); os.IsNotExist(err) { + err := os.Mkdir(folderPath, 0755) + if err != nil { + log.Fatalf("Error writing file: %s", err) + return err + } + } + return nil +} diff --git a/go.mod b/go.mod index 7b5204b..575e07c 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/bbolt v1.3.11 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect go.opentelemetry.io/otel v1.26.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect diff --git a/go.sum b/go.sum index 8d44ddf..92ae1e4 100644 --- a/go.sum +++ b/go.sum @@ -140,6 +140,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= +go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc= go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= diff --git a/pkg/bin/bin.go b/pkg/bin/bin.go new file mode 100644 index 0000000..e7872b3 --- /dev/null +++ b/pkg/bin/bin.go @@ -0,0 +1,15 @@ +package bin + +import "encoding/binary" + +func IntToBytes(n int) []byte { + endian := binary.BigEndian + bytes := make([]byte, 8) + endian.PutUint64(bytes, uint64(n)) + return bytes +} + +func BytesToInt(bytes []byte) int { + endian := binary.BigEndian + return int(endian.Uint64(bytes)) +} diff --git a/pkg/db/db.go b/pkg/db/db.go new file mode 100644 index 0000000..b1df5f3 --- /dev/null +++ b/pkg/db/db.go @@ -0,0 +1,79 @@ +package db + +import ( + "encoding/json" + "path/filepath" + "sentinel/pkg/bin" + + "go.etcd.io/bbolt" +) + +var db *bbolt.DB + +func Init(path string) { + var err error + db, err = bbolt.Open(filepath.Join(path, "sentinel.db"), 0600, nil) + if err != nil { + panic(err) + } +} + +func Write(bucket string, key int, value any) error { + return db.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + return err + } + jb, jerr := json.Marshal(value) + if jerr != nil { + return jerr + } + return b.Put(bin.IntToBytes(key), jb) + }) +} + +func ReadRange[T any](bucket string, from, to int, cb func(T)) error { + return db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + if b == nil { + return nil + } + + c := b.Cursor() + for k, v := c.Seek(bin.IntToBytes(from)); k != nil && bin.BytesToInt(k) <= to; k, v = c.Next() { + var value T + if err := json.Unmarshal(v, &value); err != nil { + return err + } + cb(value) + } + return nil + }) +} + +func DeleteOlderThan(bucket string, timestamp int) error { + return db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(bucket)) + if b == nil { + return nil + } + + c := b.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + if bin.BytesToInt(k) < timestamp { + if err := c.Delete(); err != nil { + return err + } + } + } + return nil + }) +} + +func Close() { + db.Close() +} + +func GetDB() *bbolt.DB { + return db +}