diff --git a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go index e3a041a154f..e0df8eabc75 100644 --- a/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go +++ b/historyserver/pkg/collector/logcollector/runtime/logcollector/collector.go @@ -85,23 +85,6 @@ func (r *RayLogHandler) WaitForStop() <-chan struct{} { return r.ShutdownChan } -func (r *RayLogHandler) AddLogFile(absoluteLogPathName string) { - r.LogFiles <- absoluteLogPathName -} - -func (r *RayLogHandler) PushLog(absoluteLogPathName string) error { - // Simply store the file path for later processing - absoluteLogPathName = strings.TrimSpace(absoluteLogPathName) - absoluteLogPathName = filepath.Clean(absoluteLogPathName) - - r.filePathMu.Lock() - r.logFilePaths[absoluteLogPathName] = true - r.filePathMu.Unlock() - - logrus.Infof("Registered log file for later processing: %s", absoluteLogPathName) - return nil -} - // processSessionLatestLogs processes logs in /tmp/ray/session_latest/logs directory // on shutdown, using the real session ID and node ID func (r *RayLogHandler) processSessionLatestLogs() { @@ -231,68 +214,6 @@ func (r *RayLogHandler) processSessionLatestLogFile(absoluteLogPathName, session return nil } -func (r *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath string) { - // Watch current directory - if err := watcher.Add(walkPath); err != nil { - logrus.Fatalf("Watcher rootpath %s error %v", r.LogDir, err) - } - - err := filepath.WalkDir(walkPath, func(path string, info fs.DirEntry, err error) error { - if err != nil { - logrus.Errorf("Walk path %s error %v", walkPath, err) - return err // Return error - } - // Check if it's a file - if !info.IsDir() { - logrus.Infof("Walk find new file %s", path) // Log file path - go r.AddLogFile(path) - } else { - logrus.Infof("Walk find new dir %s", path) // Log directory path - if err := watcher.Add(path); err != nil { - logrus.Fatalf("Watcher add %s error %v", r.LogDir, err) - } - } - return nil - }) - if err != nil { - logrus.Errorf("Walk path %s error %++v", walkPath, err) - return - } - logrus.Infof("Walk path %s success", walkPath) - - for { - select { - case <-r.ShutdownChan: - logrus.Warnf("Receive shutdown signal, so return watchFileLoops") - return - case event, ok := <-watcher.Events: - if !ok { - logrus.Warnf("Receive watcher events not ok") - return - } - if event.Op == fsnotify.Create { - name := event.Name - info, _ := os.Stat(name) - - // Check if file or directory - if !info.IsDir() { - logrus.Infof("Watch find: create a new file %s", name) - r.AddLogFile(name) - } else { - if err := watcher.Add(name); err != nil { - logrus.Fatalf("Watch add file %s error %v", name, err) - } - } - } - case _, ok := <-watcher.Errors: - if !ok { - logrus.Warnf("Watcher error, so return watchFileLoops") - return - } - } - } -} - func (r *RayLogHandler) WatchPrevLogsLoops() { watchPath := r.prevLogsDir