Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,6 @@ func (r *RayLogHandler) WaitForStop() <-chan struct{} {
return r.ShutdownChan
}

func (r *RayLogHandler) AddLogFile(absoluteLogPathName string) {
r.LogFiles <- absoluteLogPathName
}

func (r *RayLogHandler) PushLog(absoluteLogPathName string) error {
// Simply store the file path for later processing
absoluteLogPathName = strings.TrimSpace(absoluteLogPathName)
absoluteLogPathName = filepath.Clean(absoluteLogPathName)

r.filePathMu.Lock()
r.logFilePaths[absoluteLogPathName] = true
r.filePathMu.Unlock()

logrus.Infof("Registered log file for later processing: %s", absoluteLogPathName)
return nil
}

// processSessionLatestLogs processes logs in /tmp/ray/session_latest/logs directory
// on shutdown, using the real session ID and node ID
func (r *RayLogHandler) processSessionLatestLogs() {
Expand Down Expand Up @@ -231,68 +214,6 @@ func (r *RayLogHandler) processSessionLatestLogFile(absoluteLogPathName, session
return nil
}

func (r *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath string) {
// Watch current directory
if err := watcher.Add(walkPath); err != nil {
logrus.Fatalf("Watcher rootpath %s error %v", r.LogDir, err)
}

err := filepath.WalkDir(walkPath, func(path string, info fs.DirEntry, err error) error {
if err != nil {
logrus.Errorf("Walk path %s error %v", walkPath, err)
return err // Return error
}
// Check if it's a file
if !info.IsDir() {
logrus.Infof("Walk find new file %s", path) // Log file path
go r.AddLogFile(path)
} else {
logrus.Infof("Walk find new dir %s", path) // Log directory path
if err := watcher.Add(path); err != nil {
logrus.Fatalf("Watcher add %s error %v", r.LogDir, err)
}
}
return nil
})
if err != nil {
logrus.Errorf("Walk path %s error %++v", walkPath, err)
return
}
logrus.Infof("Walk path %s success", walkPath)

for {
select {
case <-r.ShutdownChan:
logrus.Warnf("Receive shutdown signal, so return watchFileLoops")
return
case event, ok := <-watcher.Events:
if !ok {
logrus.Warnf("Receive watcher events not ok")
return
}
if event.Op == fsnotify.Create {
name := event.Name
info, _ := os.Stat(name)

// Check if file or directory
if !info.IsDir() {
logrus.Infof("Watch find: create a new file %s", name)
r.AddLogFile(name)
} else {
if err := watcher.Add(name); err != nil {
logrus.Fatalf("Watch add file %s error %v", name, err)
}
}
}
case _, ok := <-watcher.Errors:
if !ok {
logrus.Warnf("Watcher error, so return watchFileLoops")
return
}
}
}
}

func (r *RayLogHandler) WatchPrevLogsLoops() {
watchPath := r.prevLogsDir

Expand Down
Loading