diff --git a/historyserver/docs/set_up_collector.md b/historyserver/docs/set_up_collector.md
index d17b62c3065..a59a5588431 100644
--- a/historyserver/docs/set_up_collector.md
+++ b/historyserver/docs/set_up_collector.md
@@ -141,3 +141,26 @@ kubectl delete -f historyserver/config/raycluster.yaml
 You're supposed to see the uploaded logs and events in the minio UI as below:
 
 ![write_logs_and_events](https://github.com/ray-project/kuberay/blob/db7cb864061518ed4cfa7bf48cf05cfbfeb49f95/historyserver/docs/assets/write_logs_and_events.png)
+
+## Troubleshooting
+
+### "too many open files" error
+
+If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs,
+the inotify limits on the Kubernetes nodes are likely too low.
+
+To fix this, increase the limits on the **host nodes** (not inside the container):
+
+```bash
+# Apply changes immediately
+sudo sysctl -w fs.inotify.max_user_instances=8192
+sudo sysctl -w fs.inotify.max_user_watches=524288
+```
+
+To make these changes persist across reboots, run the following commands:
+
+```bash
+echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf
+echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf
+sudo sysctl -p
+```
diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go
index 55a0e3d0b15..d845f2e94d9 100644
--- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go
+++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go
@@ -22,23 +22,24 @@ import (
 )
 
 type RayLogHandler struct {
-	Writer         storage.StorageWriter
-	LogFiles       chan string
-	HttpClient     *http.Client
-	ShutdownChan   chan struct{}
-	logFilePaths   map[string]bool
-	MetaDir        string
-	RayClusterName string
-	LogDir         string
-	RayNodeName    string
-	RayClusterID   string
-	RootDir        string
-	SessionDir     string
-	prevLogsDir    string
-	PushInterval   time.Duration
-	LogBatching    int
-	filePathMu     sync.Mutex
-	EnableMeta     bool
+	Writer                 storage.StorageWriter
+	LogFiles               chan string
+	HttpClient             *http.Client
+	ShutdownChan           chan struct{}
+	logFilePaths           map[string]bool
+	MetaDir                string
+	RayClusterName         string
+	LogDir                 string
+	RayNodeName            string
+	RayClusterID           string
+	RootDir                string
+	SessionDir             string
+	prevLogsDir            string
+	persistCompleteLogsDir string
+	PushInterval           time.Duration
+	LogBatching            int
+	filePathMu             sync.Mutex
+	EnableMeta             bool
 }
 
 func (r *RayLogHandler) Start(stop <-chan struct{}) error {
@@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error {
 func (r *RayLogHandler) Run(stop <-chan struct{}) error {
 	// watchPath := r.LogDir
 	r.prevLogsDir = "/tmp/ray/prev-logs"
+	r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs"
 
 	// Initialize log file paths storage
 	r.logFilePaths = make(map[string]bool)
@@ -62,6 +64,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
 	// Setup signal handling for SIGTERM
 	sigChan := make(chan os.Signal, 1)
 	signal.Notify(sigChan, syscall.SIGTERM)
+
+	// WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup
+	// to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories.
+	// After scanning, it watches for new directories and files. This ensures incomplete
+	// uploads from previous runs are resumed.
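+	//
+	// A sketch of the on-disk layout this recovery path assumes (root directories as
+	// assigned above; "raylet.out" is an illustrative file name):
+	//
+	//   /tmp/ray/prev-logs/{sessionID}/{nodeID}/logs/raylet.out             -> pending upload
+	//   /tmp/ray/persist-complete-logs/{sessionID}/{nodeID}/logs/raylet.out -> already uploaded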
go r.WatchPrevLogsLoops() if r.EnableMeta { go r.WatchSessionLatestLoops() // Watch session_latest symlink changes @@ -231,7 +238,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() { } // Also check and create persist-complete-logs directory - completeLogsDir := "/tmp/ray/persist-complete-logs" + completeLogsDir := r.persistCompleteLogsDir if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) { logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir) if err := os.MkdirAll(completeLogsDir, 0o777); err != nil { @@ -492,6 +499,38 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) { } } +// isFileAlreadyPersisted checks if a log file has already been uploaded to storage and moved to +// the persist-complete-logs directory. This prevents duplicate uploads during collector restarts. +// +// When a log file is successfully uploaded, it is moved from prev-logs to persist-complete-logs +// to mark it as processed. This function checks if the equivalent file path exists in the +// persist-complete-logs directory. +// +// Example: +// +// Given absoluteLogPath = "/tmp/ray/prev-logs/session_123/node_456/logs/raylet.out" +// This function checks if "/tmp/ray/persist-complete-logs/session_123/node_456/logs/raylet.out" exists +// - If exists: returns true (file was already uploaded, skip it) +// - If not exists: returns false (file needs to be uploaded) +func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool { + // Calculate the relative path within the logs directory + logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs") + relativeLogPath, err := filepath.Rel(logsDir, absoluteLogPath) + if err != nil { + logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err) + return false + } + + // Construct the path in persist-complete-logs + persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativeLogPath) + + // Check if the file exists + if _, err := os.Stat(persistedPath); err == nil { + return true + } + return false +} + // processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { // Extract session ID and node ID from the path @@ -513,13 +552,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { return } - // Check if this directory has already been processed by checking in persist-complete-logs - completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs") - if _, err := os.Stat(completeDir); err == nil { - logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID) - return - } - logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID) logsDir := filepath.Join(sessionNodeDir, "logs") @@ -550,6 +582,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) { return nil } + // Check if this file has already been persisted + if r.isFileAlreadyPersisted(path, sessionID, nodeID) { + logrus.Debugf("File %s already persisted, skipping", path) + return nil + } + // Process log file if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil { logrus.Errorf("Failed to process prev-log file %s: %v", path, err) @@ -616,7 +654,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content)) // Move the processed file 
to persist-complete-logs directory to avoid re-uploading
-	completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID)
+	completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID)
 	completeDir := filepath.Join(completeBaseDir, "logs")
 
 	if _, err := os.Stat(completeDir); os.IsNotExist(err) {
diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go
new file mode 100644
index 00000000000..c3f76fadf41
--- /dev/null
+++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector_test.go
@@ -0,0 +1,197 @@
+package logcollector
+
+import (
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
+)
+
+// MockStorageWriter is a mock implementation of storage.StorageWriter for testing
+type MockStorageWriter struct {
+	mu           sync.Mutex
+	createdDirs  []string
+	writtenFiles map[string]string // path -> content
+}
+
+func NewMockStorageWriter() *MockStorageWriter {
+	return &MockStorageWriter{
+		createdDirs:  make([]string, 0),
+		writtenFiles: make(map[string]string),
+	}
+}
+
+func (m *MockStorageWriter) CreateDirectory(path string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.createdDirs = append(m.createdDirs, path)
+	return nil
+}
+
+func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error {
+	content, err := io.ReadAll(reader)
+	if err != nil {
+		return err
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.writtenFiles[file] = string(content)
+	return nil
+}
+
+// setupRayTestEnvironment creates an isolated test directory tree under /tmp that mirrors
+// the prev-logs and persist-complete-logs layout the logcollector uses under /tmp/ray
+func setupRayTestEnvironment(t *testing.T) (string, func()) {
+	baseDir := filepath.Join("/tmp", "ray-test-"+t.Name())
+
+	// Create base directory
+	if err := os.MkdirAll(baseDir, 0755); err != nil {
+		t.Fatalf("Failed to create base dir: %v", err)
+	}
+
+	// Create prev-logs and persist-complete-logs directories
+	prevLogsDir := filepath.Join(baseDir, "prev-logs")
+	persistLogsDir := filepath.Join(baseDir, "persist-complete-logs")
+
+	if err := os.MkdirAll(prevLogsDir, 0755); err != nil {
+		t.Fatalf("Failed to create prev-logs dir: %v", err)
+	}
+	if err := os.MkdirAll(persistLogsDir, 0755); err != nil {
+		t.Fatalf("Failed to create persist-complete-logs dir: %v", err)
+	}
+
+	cleanup := func() {
+		os.RemoveAll(baseDir)
+	}
+
+	return baseDir, cleanup
+}
+
+// createTestLogFile creates a test log file with given content
+func createTestLogFile(t *testing.T, path string, content string) {
+	dir := filepath.Dir(path)
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		t.Fatalf("Failed to create directory %s: %v", dir, err)
+	}
+
+	if err := os.WriteFile(path, []byte(content), 0644); err != nil {
+		t.Fatalf("Failed to write file %s: %v", path, err)
+	}
+}
+
+// TestIsFileAlreadyPersisted tests the file-level persistence check
+func TestIsFileAlreadyPersisted(t *testing.T) {
+	baseDir, cleanup := setupRayTestEnvironment(t)
+	defer cleanup()
+
+	// Use the same prev-logs directory structure as production, rooted at the test baseDir
+	handler := &RayLogHandler{
+		prevLogsDir:            filepath.Join(baseDir, "prev-logs"),
+		persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
+	}
+
+	sessionID := "session-123"
+	nodeID := "node-456"
+
+	// Create prev-logs structure
+	prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log")
+	createTestLogFile(t,
prevLogsPath, "test log content")
+
+	// Test case 1: File not yet persisted
+	if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
+		t.Error("Expected file to not be persisted yet")
+	}
+
+	// Create the persisted file in persist-complete-logs
+	persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log")
+	createTestLogFile(t, persistedPath, "test log content")
+
+	// Test case 2: File already persisted
+	if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
+		t.Error("Expected file to be detected as persisted")
+	}
+}
+
+// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan.
+//
+// This test simulates a crash recovery scenario:
+// 1. Two log files exist in prev-logs
+// 2. Only file1 is processed (simulating partial success before crash)
+// 3. File1 is restored to prev-logs (simulating a move interrupted before the prev-logs copy was removed)
+// 4. WatchPrevLogsLoops is started (simulating collector restart)
+// 5. Verify that file1 is NOT re-uploaded (idempotency) and file2 is uploaded
+// 6. Verify that the node directory is cleaned up after all files are processed
+func TestScanAndProcess(t *testing.T) {
+	g := NewWithT(t)
+
+	baseDir, cleanup := setupRayTestEnvironment(t)
+	defer cleanup()
+
+	mockWriter := NewMockStorageWriter()
+	handler := &RayLogHandler{
+		Writer:                 mockWriter,
+		RootDir:                "/test-root",
+		prevLogsDir:            filepath.Join(baseDir, "prev-logs"),
+		persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
+		ShutdownChan:           make(chan struct{}),
+		RayClusterName:         "test-cluster",
+		RayClusterID:           "cluster-123",
+	}
+
+	sessionID := "session-lifecycle"
+	nodeID := "node-1"
+	logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs")
+
+	// Prepare two log files in prev-logs directory
+	f1 := filepath.Join(logsDir, "file1.log")
+	f2 := filepath.Join(logsDir, "file2.log")
+	createTestLogFile(t, f1, "content1")
+	createTestLogFile(t, f2, "content2")
+
+	// --- Step 1: Process file1 only (simulating partial success before crash) ---
+	err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID)
+	if err != nil {
+		t.Fatalf("Failed to process file1: %v", err)
+	}
+
+	// Verify file1 is uploaded to storage
+	if len(mockWriter.writtenFiles) != 1 {
+		t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles))
+	}
+
+	// Manually restore file1 to prev-logs to simulate a move interrupted by a crash:
+	// the upload succeeded and the persist-complete-logs copy exists, but the original
+	// in prev-logs was never removed
+	createTestLogFile(t, f1, "content1")
+
+	// --- Step 2: Start the startup scan in background (simulating collector restart) ---
+	go handler.WatchPrevLogsLoops()
+
+	// --- Step 3: Use Eventually to wait for async processing ---
+	sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID)
+
+	// Wait until storage has exactly 2 files.
+	// file1 should NOT be re-uploaded because it already exists in persist-complete-logs.
+	// Only file2 should be newly uploaded.
+	g.Eventually(func() int {
+		mockWriter.mu.Lock()
+		defer mockWriter.mu.Unlock()
+		return len(mockWriter.writtenFiles)
+	}, 5*time.Second, 100*time.Millisecond).Should(Equal(2),
+		"Storage should have 2 unique files (file1 should NOT be re-uploaded due to idempotency check)")
+
+	// Wait until the node directory in prev-logs is removed.
+	// After all files are processed and moved to persist-complete-logs,
+	// the node directory should be cleaned up.
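+	// Note: os.IsNotExist(err) is false while the directory still exists (os.Stat returns
+	// err == nil), so this poll only succeeds once the cleanup has actually removed it.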
+	g.Eventually(func() bool {
+		_, err := os.Stat(sessionNodeDir)
+		return os.IsNotExist(err)
+	}, 5*time.Second, 100*time.Millisecond).Should(BeTrue(),
+		"Node directory should be removed after all files are processed and moved to persist-complete-logs")
+
+	// Signal the background goroutine to exit gracefully
+	close(handler.ShutdownChan)
+}
diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go
index 3fe16341405..0139fbd04bf 100644
--- a/historyserver/test/e2e/collector_test.go
+++ b/historyserver/test/e2e/collector_test.go
@@ -53,6 +53,10 @@ func TestCollector(t *testing.T) {
 			name:     "Simulate OOMKilled behavior: Single session single node logs and events should be uploaded to S3 after the ray-head container is restarted",
 			testFunc: testCollectorSeparatesFilesBySession,
 		},
+		{
+			name:     "Collector restart: should scan prev-logs and resume uploads left by a crash",
+			testFunc: testCollectorResumesUploadsOnRestart,
+		},
 	}
 
 	for _, tt := range tests {
@@ -104,7 +108,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
 	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
 
 	// Verify logs and node_events are successfully uploaded to S3.
-	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
+	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false)
 
 	// Delete S3 bucket to ensure test isolation.
 	deleteS3Bucket(test, g, s3Client)
@@ -179,7 +183,135 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
 	}
 
 	// Verify logs and node_events are successfully uploaded to S3.
-	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
+	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false)
+
+	deleteS3Bucket(test, g, s3Client)
+}
+
+// testCollectorResumesUploadsOnRestart verifies that the Collector scans and resumes uploads from
+// the prev-logs directory on startup.
+//
+// The test case follows these steps:
+// 1. Prepare test environment by applying a Ray cluster with the collector and ensuring an empty S3 bucket.
+// 2. Inject leftover logs before killing the collector:
+//    - file1.log -> /tmp/ray/persist-complete-logs/{sessionID}/{nodeID}/logs/ (already uploaded)
+//    - file2.log -> /tmp/ray/prev-logs/{sessionID}/{nodeID}/logs/ (pending upload)
+//    Note: node_events are not injected or verified here; they are handled by the EventServer via a separate path,
+//    and prev-logs processing only covers the logs directory.
+//
+// 3. Kill the collector sidecar container to trigger a container restart.
+// 4. Wait for the collector container to restart and become Ready.
+// 5. Verify S3 uploads: recovered log objects exist under log/{clusterName}_{clusterID}/{sessionID}/logs/.
+// 6. Verify local state: the node directory is present under persist-complete-logs and removed from prev-logs.
+// 7. Clean up the S3 bucket to ensure test isolation.
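+//
+// Assumptions behind step 3: killing PID 1 of the collector container restarts only that
+// container (per the pod's restart policy), and /tmp/ray is shared between the ray-head
+// and collector containers (e.g. via a shared volume), so the injected files survive the
+// restart.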
+func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
+	rayCluster := prepareTestEnv(test, g, namespace, s3Client)
+
+	// Directory variables for easier maintenance
+	prevLogsBaseDir := "/tmp/ray/prev-logs"
+	persistCompleteBaseDir := "/tmp/ray/persist-complete-logs"
+
+	// Use namespace name to ensure test isolation (avoid conflicts from previous test runs)
+	dummySessionID := fmt.Sprintf("test-recovery-session-%s", namespace.Name)
+	dummyNodeID := fmt.Sprintf("head-node-%s", namespace.Name)
+	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID)
+	sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, dummySessionID)
+
+	// Inject "leftover" logs BEFORE killing collector.
+	// This ensures files exist when collector restarts and performs its initial scan.
+	headPod, err := GetHeadPod(test, rayCluster)
+	g.Expect(err).NotTo(HaveOccurred())
+	LogWithTimestamp(test.T(), "Injecting logs into %s before killing collector", prevLogsBaseDir)
+	sessionDir := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID)
+	persistDir := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID)
+	injectCmd := fmt.Sprintf(
+		"mkdir -p %s/logs && "+
+			"echo 'file1 content' > %s/logs/file1.log && "+
+			"mkdir -p %s/logs && "+
+			"echo 'file2 content' > %s/logs/file2.log",
+		persistDir,
+		persistDir,
+		sessionDir,
+		sessionDir,
+	)
+	_, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", injectCmd})
+	g.Expect(stderr.String()).To(BeEmpty())
+
+	// Kill the collector container to trigger a restart.
+	// When collector restarts, WatchPrevLogsLoops() will scan prev-logs and find the injected files.
+	LogWithTimestamp(test.T(), "Killing collector container to test startup scanning of prev-logs")
+	_, stderrKill := ExecPodCmd(test, headPod, "collector", []string{"kill", "1"})
+	g.Expect(stderrKill.String()).To(BeEmpty())
+
+	// Wait for collector container to restart and become ready.
+	LogWithTimestamp(test.T(), "Waiting for collector container to restart and become ready")
+	g.Eventually(func(gg Gomega) {
+		updatedPod, err := GetHeadPod(test, rayCluster)
+		gg.Expect(err).NotTo(HaveOccurred())
+		cs, err := getContainerStatusByName(updatedPod, "collector")
+		gg.Expect(err).NotTo(HaveOccurred())
+		gg.Expect(cs.RestartCount).To(BeNumerically(">", 0))
+		gg.Expect(cs.Ready).To(BeTrue())
+	}, TestTimeoutMedium).Should(Succeed())
+
+	// Verify the recovered files in S3 by listing the session's logs prefix directly;
+	// node_events are not verified since prev-logs processing only handles the logs directory.
+	LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files")
+
+	// Verify that file2.log was actually uploaded to S3.
+	// file1.log should NOT be uploaded because it was already marked as "completed" in persist-complete-logs.
+	// file2.log should be uploaded because it was in prev-logs (pending upload).
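+	// The single ListObjectsV2 call below is assumed sufficient here: S3 returns at most
+	// 1,000 keys per page, and this test injects only two files.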
+	LogWithTimestamp(test.T(), "Verifying file2.log was uploaded to S3 (idempotency check)")
+	g.Eventually(func(gg Gomega) {
+		// List all objects under the session logs prefix
+		logsPrefix := sessionPrefix + "logs/"
+		objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
+			Bucket: aws.String(s3BucketName),
+			Prefix: aws.String(logsPrefix),
+		})
+		gg.Expect(err).NotTo(HaveOccurred())
+
+		// Collect all uploaded file keys
+		var uploadedKeys []string
+		for _, obj := range objects.Contents {
+			uploadedKeys = append(uploadedKeys, aws.StringValue(obj.Key))
+		}
+		LogWithTimestamp(test.T(), "Found uploaded objects: %v", uploadedKeys)
+
+		// Verify file2.log exists in S3 (it was in prev-logs, so it should be uploaded)
+		hasFile2 := false
+		for _, key := range uploadedKeys {
+			if strings.HasSuffix(key, "file2.log") {
+				hasFile2 = true
+				break
+			}
+		}
+		gg.Expect(hasFile2).To(BeTrue(), "file2.log should be uploaded to S3 because it was in prev-logs")
+
+		// Note: file1.log was only placed in persist-complete-logs (local marker);
+		// it was never actually uploaded to S3 in this test scenario.
+		// The persist-complete-logs directory is just a local marker to prevent re-upload.
+	}, TestTimeoutMedium).Should(Succeed())
+
+	// Verify local state: the node directory should be moved from prev-logs to persist-complete-logs.
+	LogWithTimestamp(test.T(), "Verifying local state: node directory should be moved to %s", persistCompleteBaseDir)
+	g.Eventually(func(gg Gomega) {
+		currentHeadPod, err := GetHeadPod(test, rayCluster)
+		gg.Expect(err).NotTo(HaveOccurred())
+		// Check that the node directory exists in persist-complete-logs
+		persistPath := filepath.Join(persistCompleteBaseDir, dummySessionID, dummyNodeID)
+		checkCmd := fmt.Sprintf("test -d %s && echo 'exists'", persistPath)
+		stdout, stderrCheck := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkCmd})
+		gg.Expect(stderrCheck.String()).To(BeEmpty())
+		gg.Expect(strings.TrimSpace(stdout.String())).To(Equal("exists"), "Node directory should be in persist-complete-logs")
+
+		// Check that the node directory is gone from prev-logs
+		prevPath := filepath.Join(prevLogsBaseDir, dummySessionID, dummyNodeID)
+		checkGoneCmd := fmt.Sprintf("test ! -d %s && echo 'gone'", prevPath)
+		stdoutGone, stderrGone := ExecPodCmd(test, currentHeadPod, "ray-head", []string{"sh", "-c", checkGoneCmd})
+		gg.Expect(stderrGone.String()).To(BeEmpty())
+		gg.Expect(strings.TrimSpace(stdoutGone.String())).To(Equal("gone"), "Node directory should be cleaned from prev-logs")
+	}, TestTimeoutMedium).Should(Succeed())
 
 	deleteS3Bucket(test, g, s3Client)
 }
@@ -374,8 +506,12 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC
 // Additionally, it verifies that specific files have content:
 // - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
 // - node_events/<nodeID>_<suffix> must exist and have content > 0 bytes (suffix can be ignored for verification)
-func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) {
-	dirs := []string{"logs", "node_events"}
+// If skipNodeEvents is true, node_events directory verification will be skipped.
+func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string, skipNodeEvents bool) {
+	dirs := []string{"logs"}
+	if !skipNodeEvents {
+		dirs = append(dirs, "node_events")
+	}
 	for _, dir := range dirs {
 		dirPrefix := sessionPrefix + dir + "/"