
Conversation

@KunWuLuan (Contributor) commented Dec 1, 2025

add the implementation of historyserver collector

Why are these changes needed?

Related issue number

#3966
#4274

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

update go.work go.mod

Signed-off-by: KunWuLuan <[email protected]>
@Future-Outlier self-assigned this Dec 1, 2025

// Categorize events
for _, event := range eventsToFlush {
    hourKey := event.Timestamp.Truncate(time.Hour).Format("2006-01-02-15")
@Future-Outlier (Member) commented Dec 3, 2025:

Should this be "2006-01-02-15" or "2006010215"? This is different from the design doc:
ray-project/enhancements#62
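For context, Go date layouts are written against the fixed reference time Mon Jan 2 15:04:05 MST 2006, so the two candidate layouts differ only in separators. A minimal, runnable sketch with illustrative values:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Go layouts are written against the reference time
    // Mon Jan 2 15:04:05 MST 2006.
    ts := time.Date(2025, time.December, 3, 9, 30, 0, 0, time.UTC)
    hour := ts.Truncate(time.Hour) // drop minutes and seconds

    fmt.Println(hour.Format("2006-01-02-15")) // 2025-12-03-09
    fmt.Println(hour.Format("2006010215"))    // 2025120309
}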

Contributor:

Fixed at KunWuLuan@0ed5bbb.

"github.com/sirupsen/logrus"
)

const runtimeClassConfigPath = "/var/collector-config/data"
Collaborator:

Seems like it's not being used?

Contributor:

Fixed at KunWuLuan@c5ba659.

Comment on lines +215 to +221
for i := 0; i < 12; i++ {
    rp, err := os.Readlink(session_latest_path)
    if err != nil {
        logrus.Errorf("read session_latest file error %v", err)
        time.Sleep(time.Second * 5)
        continue
    }
Collaborator:

Looks like we are doing the retry logic here. Could we extract maxAttempts into a constant or something configurable, and maybe use backoff here?

Also, curious why the max retry is set to 12 times here? That seems like too many retries; can we use 3?
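For context, a minimal sketch of what that suggestion could look like; maxReadlinkAttempts, readlinkBackoffBase, and resolveSessionDir are illustrative names, not identifiers from the PR:

package collector

import (
    "fmt"
    "os"
    "time"
)

// Illustrative constants; not from the PR.
const (
    maxReadlinkAttempts = 3
    readlinkBackoffBase = time.Second
)

// resolveSessionDir retries the readlink with exponential backoff
// instead of a fixed 12 attempts at 5-second intervals.
func resolveSessionDir(sessionLatestPath string) (string, error) {
    var lastErr error
    for attempt := 0; attempt < maxReadlinkAttempts; attempt++ {
        rp, err := os.Readlink(sessionLatestPath)
        if err == nil {
            return rp, nil
        }
        lastErr = err
        time.Sleep(readlinkBackoffBase << attempt) // 1s, 2s, 4s, ...
    }
    return "", fmt.Errorf("read session_latest failed after %d attempts: %w", maxReadlinkAttempts, lastErr)
}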

Contributor:

Fixed at KunWuLuan@fafae16.

Comment on lines +31 to +56
const (
RAY_SESSIONDIR_LOGDIR_NAME = "logs"
RAY_SESSIONDIR_METADIR_NAME = "meta"
)

const (
OssMetaFile_BasicInfo = "ack__basicinfo"

OssMetaFile_NodeSummaryKey = "restful__nodes_view_summary"
OssMetaFile_Node_Prefix = "restful__nodes_"
OssMetaFile_JOBTASK_DETAIL_Prefix = "restful__api__v0__tasks_detail_job_id_"
OssMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
OssMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
OssMetaFile_JOBDATASETS_Prefix = "restful__api__data__datasets_job_id_"
OssMetaFile_NodeLogs_Prefix = "restful__api__v0__logs_node_id_"
OssMetaFile_ClusterStatus = "restful__api__cluster_status"
OssMetaFile_LOGICAL_ACTORS = "restful__logical__actors"
OssMetaFile_ALLTASKS_DETAIL = "restful__api__v0__tasks_detail"
OssMetaFile_Events = "restful__events"
OssMetaFile_PlacementGroups = "restful__api__v0__placement_groups_detail"

OssMetaFile_ClusterSessionName = "static__api__cluster_session_name"

OssMetaFile_Jobs = "restful__api__jobs"
OssMetaFile_Applications = "restful__api__serve__applications"
)
Collaborator:

In KubeRay we usually put constants in a separate file; for example, ray-operator keeps them in ray-operator/controllers/ray/utils/constant.go.
Could we move these into a constant.go file?

Contributor:

Fixed at KunWuLuan@cef9755.

I'd like to discuss the naming convention with you if you're available. Thanks.

}

func GetSessionDir() (string, error) {
session_latest_path := "/tmp/ray/session_latest"
Collaborator:

Could we also move this to constant?

Contributor:

Fixed at KunWuLuan@cef9755.

Comment on lines 9 to 10
// StorageWriter is the interface for storage writer.
type StorageWritter interface {
Member:

Can StorageWritter be StorageWriter?

Contributor:

Fixed at 99fd96f.

Comment on lines +22 to +23
PushInterval time.Duration
}
Member:

Where is PushInterval used?

@JiangJiaWei1103 (Contributor) commented Dec 17, 2025:

Saw this line in NewCollector, but indeed never used:

Role string
RayClusterName string
RayClusterID string
LogBatching int
Member:

Where is this used?

Contributor:

Saw this line in NewCollector, but indeed never used:

ARG TARGETARCH

FROM --platform=$BUILDPLATFORM golang:1.25.1 as builder
ENV GOPROXY=https://goproxy.cn,direct
Collaborator:

Should it be the official proxy? ref: https://proxy.golang.org/

@JiangJiaWei1103 (Contributor) commented Dec 17, 2025:

Nice catch, makes sense!

Fixed at KunWuLuan@09697d5.


RUN apt-get update && apt-get upgrade -y && rm -rf /var/cache/apt/ && apt-get install -y ca-certificates

COPY --from=builder /historyserver/output/bin/historyserver /usr/local/bin/historyserver
Collaborator:

The historyserver is not ready in this PR. Should it be removed?

Member:

yes it should

Contributor:

Fixed at KunWuLuan@7d267d9.

Comment on lines 32 to 33
COPY --from=builder /historyserver/dashboard/v2.51.0/client/build /dashboard/v2.51.0/client/build
COPY --from=builder /historyserver/dashboard/homepage /dashboard/homepage No newline at end of file
Collaborator:

Should these COPY lines take BUILD_RAYSERVER_DASHBOARD into account? Otherwise, the build might fail.

rm -rf $(OUT_DIR)

.PHONY: build
build: buildcollector buildhistoryserver
Collaborator:

Should buildhistoryserver be removed, since it is not ready in this PR? The same goes for the rest of the historyserver-related content in this file.

Contributor:

Yes, we need to remove all snippets related to building the history server binary to keep this PR focused.

Contributor:

Fixed at KunWuLuan@06175f4.

Comment on lines +32 to +40
flag.StringVar(&role, "role", "Worker", "")
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
flag.StringVar(&rayClusterName, "ray-cluster-name", "", "")
flag.StringVar(&rayClusterId, "ray-cluster-id", "default", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.IntVar(&logBatching, "log-batching", 1000, "")
flag.IntVar(&eventsPort, "events-port", 8080, "")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
flag.DurationVar(&pushInterval, "push-interval", time.Minute, "")
Collaborator:

It might be better to add usage descriptions for these flags.
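For illustration, a sketch of a few of these flags with usage strings; the descriptions are assumptions about intent, not taken from the PR:

package main

import (
    "flag"
    "time"
)

func main() {
    var (
        role         string
        logBatching  int
        pushInterval time.Duration
    )
    // Usage strings appear in -help output and document each flag.
    flag.StringVar(&role, "role", "Worker", "Ray node role, e.g. Head or Worker")
    flag.IntVar(&logBatching, "log-batching", 1000, "number of log entries to batch before flushing")
    flag.DurationVar(&pushInterval, "push-interval", time.Minute, "interval between pushes to the storage backend")
    flag.Parse()
}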

eventServer := eventserver.NewEventServer(writter, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
eventServer.InitServer(eventsPort)

collector := runtime.NewCollector(&globalConfig, writter)
Collaborator:

The collector variable seems to collide with the package name.

@JiangJiaWei1103 (Contributor) commented Dec 17, 2025:

Should be renamed to logCollector to avoid confusion.

Fixed at KunWuLuan@2aa3eb1.

eventServer.InitServer(eventsPort)

collector := runtime.NewCollector(&globalConfig, writter)
_ = collector.Start(context.TODO().Done())
Collaborator:

If I didn't get it wrong, there seems to be no place where this context is cancelled. Did you mean graceful shutdown here?
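For context, a minimal sketch of wiring a signal-cancelled context into the stop channel, assuming collector.Start keeps its stop-channel signature; startCollector is an illustrative stand-in:

package main

import (
    "context"
    "os/signal"
    "syscall"
)

// startCollector stands in for collector.Start; an assumed
// signature for illustration only.
func startCollector(stop <-chan struct{}) error { return nil }

func main() {
    // NotifyContext cancels ctx on SIGINT/SIGTERM, so ctx.Done()
    // actually fires and the collector can shut down gracefully.
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    _ = startCollector(ctx.Done())
    <-ctx.Done()
}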


// watchNodeIDFile watches the content of /tmp/ray/raylet_node_id for changes
func (es *EventServer) watchNodeIDFile() {
    nodeIDFilePath := "/tmp/ray/raylet_node_id"
Collaborator:

Could /tmp/ray/raylet_node_id become a constant so it can be reused?

Contributor:

Fixed at KunWuLuan@9b2ca52. We reuse the const definition.

Comment on lines +65 to +72
if err != nil {
// Directory doesn't exist, create it
logrus.Infof("Begin to create s3 dir %s ...", objectDir)
_, err = r.S3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(r.S3Bucket),
Key: aws.String(objectDir),
Body: bytes.NewReader([]byte("")),
})
Collaborator:

Should it check whether the error indicates that the directory does not exist before creating it?

Contributor:

Nice suggestion! Fixed at KunWuLuan@7a28442, aiming to handle different error types explicitly, including the "NoSuchKey" error you mentioned. Thanks!
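For context, a sketch of how such an error check might look with AWS SDK for Go v1; isNotFound is an illustrative helper, not the PR's code:

package storage

import (
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/service/s3"
)

// isNotFound reports whether err indicates a missing object. SDK v1
// surfaces this as s3.ErrCodeNoSuchKey for GetObject and the
// "NotFound" code for HeadObject.
func isNotFound(err error) bool {
    if aerr, ok := err.(awserr.Error); ok {
        return aerr.Code() == s3.ErrCodeNoSuchKey || aerr.Code() == "NotFound"
    }
    return false
}

The PutObject call would then run only when isNotFound(err) is true; any other error would be logged or returned instead.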

Comment on lines 244 to 262
// Check whether the sessionName has changed
if es.currentSessionName != sessionNameStr {
    logrus.Infof("Session name changed from %s to %s, flushing events", es.currentSessionName, sessionNameStr)
    // Save the current events before flushing
    eventsToFlush := make([]Event, len(es.events))
    copy(eventsToFlush, es.events)

    // Clear the event list
    es.events = es.events[:0]

    // Update the current sessionName
    es.currentSessionName = sessionNameStr

    // Unlock before performing the flush
    es.mutex.Unlock()

    // Flush the previous events
    es.flushEventsInternal(eventsToFlush)
    return
Collaborator:

Should we send a response before returning? Right now this branch exits without writing any HTTP response, which will cause clients to hang.
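For illustration, a sketch of that early-return branch acknowledging the client before returning; the types are stubbed to keep the example self-contained, and the method assumes es.mutex is already held by the caller, as in the PR's handler:

package eventserver

import (
    "net/http"
    "sync"
)

// Stubs mirroring the PR's types, reduced for illustration.
type Event struct{}

type EventServer struct {
    mutex  sync.Mutex
    events []Event
}

func (es *EventServer) flushEventsInternal(events []Event) {}

// flushAndRespond unlocks (the caller must hold es.mutex), flushes,
// and then writes an HTTP response so the client does not hang.
func (es *EventServer) flushAndRespond(w http.ResponseWriter, eventsToFlush []Event) {
    es.mutex.Unlock()
    es.flushEventsInternal(eventsToFlush)

    w.WriteHeader(http.StatusOK)
    _, _ = w.Write([]byte("ok"))
}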

Contributor:

Fixed at KunWuLuan@0337030. Thanks!


RUN apt-get update && apt-get upgrade -y && rm -rf /var/cache/apt/ && apt-get install -y ca-certificates

COPY --from=builder /historyserver/output/bin/historyserver /usr/local/bin/historyserver
Member:

yes it should

Comment on lines 23 to 66
}

func (c *config) complete(rcc *types.RayCollectorConfig, jd map[string]interface{}) {
c.RayCollectorConfig = *rcc
c.S3ID = os.Getenv("AWS_S3ID")
c.S3Secret = os.Getenv("AWS_S3SECRET")
c.S3Token = os.Getenv("AWS_S3TOKEN")
if len(jd) == 0 {
c.S3Bucket = os.Getenv("S3_BUCKET")
c.S3Endpoint = os.Getenv("S3_ENDPOINT")
c.S3Region = os.Getenv("S3_REGION")
if os.Getenv("S3FORCE_PATH_STYPE") != "" {
c.S3ForcePathStyle = aws.Bool(os.Getenv("S3FORCE_PATH_STYPE") == "true")
}
if os.Getenv("S3DISABLE_SSL") != "" {
c.DisableSSL = aws.Bool(os.Getenv("S3DISABLE_SSL") == "true")
}
} else {
if bucket, ok := jd["s3Bucket"]; ok {
c.S3Bucket = bucket.(string)
}
if endpoint, ok := jd["s3Endpoint"]; ok {
c.S3Endpoint = endpoint.(string)
}
if region, ok := jd["s3Region"]; ok {
c.S3Region = region.(string)
}
if forcePathStyle, ok := jd["s3ForcePathStyle"]; ok {
c.S3ForcePathStyle = aws.Bool(forcePathStyle.(string) == "true")
}
if s3disableSSL, ok := jd["S3DISABLE_SSL"]; ok {
c.DisableSSL = aws.Bool(s3disableSSL.(string) == "true")
}
}
}

func (c *config) completeHSConfig(rcc *types.RayHistoryServerConfig, jd map[string]interface{}) {
c.RayCollectorConfig = types.RayCollectorConfig{
RootDir: rcc.RootDir,
}
c.S3ID = os.Getenv("AWS_S3ID")
c.S3Secret = os.Getenv("AWS_S3SECRET")
c.S3Token = os.Getenv("AWS_S3TOKEN")
if len(jd) == 0 {
Member:

Can we add a TODO noting that we should refactor complete and completeHSConfig into a single function?
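For illustration, a sketch of the shared helper such a refactor could extract; the config struct is stubbed, and the method name is an assumption:

package storage

import "os"

type config struct {
    S3ID, S3Secret, S3Token string
}

// loadS3Credentials centralizes the environment lookups that
// complete and completeHSConfig currently duplicate.
func (c *config) loadS3Credentials() {
    c.S3ID = os.Getenv("AWS_S3ID")
    c.S3Secret = os.Getenv("AWS_S3SECRET")
    c.S3Token = os.Getenv("AWS_S3TOKEN")
}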

Comment on lines +380 to +385
totalEvents := len(eventsToFlush)
logrus.Infof("Successfully flushed %d events to storage (%d node events, %d job events)",
totalEvents,
countEventsInMap(nodeEventsByHour),
countEventsInMap(jobEventsByHour))
}
Member:

For more accurate log output:

Suggested change
totalEvents := len(eventsToFlush)
logrus.Infof("Successfully flushed %d events to storage (%d node events, %d job events)",
totalEvents,
countEventsInMap(nodeEventsByHour),
countEventsInMap(jobEventsByHour))
}
totalEvents := len(eventsToFlush)
errorCount := len(errChan)
if errorCount > 0 {
logrus.Warnf("Attempted to flush %d events to storage, but %d upload tasks failed (%d node events, %d job events)",
totalEvents, errorCount,
countEventsInMap(nodeEventsByHour),
countEventsInMap(jobEventsByHour))
} else {
logrus.Infof("Successfully flushed %d events to storage (%d node events, %d job events)",
totalEvents,
countEventsInMap(nodeEventsByHour),
countEventsInMap(jobEventsByHour))
}

Contributor:

Fixed at KunWuLuan@1fbfae0 to warn upon errors.

Copilot AI left a comment:

Pull request overview

This pull request adds the implementation of a history server collector for Ray clusters. It introduces a comprehensive system for collecting, storing, and managing logs and metadata from Ray clusters using multiple storage backends (S3, Aliyun OSS, and local testing).

Key changes:

  • Collector service to gather logs and events from Ray nodes
  • Support for multiple storage backends (S3, Aliyun OSS, local test)
  • Event server for collecting and persisting Ray events
  • History server infrastructure for viewing collected data

Reviewed changes

Copilot reviewed 30 out of 34 changed files in this pull request and generated 21 comments.

Summary per file:

  • historyserver/go.mod: Declares the Go module with dependencies for the history server implementation
  • go.work: Workspace configuration for the multi-module Go project
  • historyserver/cmd/collector/main.go: Entry point for the collector service that runs as a sidecar
  • historyserver/pkg/collector/*: Core collector logic and storage interfaces
  • historyserver/pkg/utils/*: Utility functions for OSS operations and Ray cluster management
  • historyserver/Makefile: Build configuration for compiling binaries and Docker images
  • historyserver/Dockerfile: Multi-stage build for containerized deployment
  • historyserver/README.md: Documentation for building, configuring, and deploying the history server


OSSHistoryServerDir string
}

type RayMetaHanderConfig struct {
Copilot AI commented Dec 12, 2025:

The type name 'RayMetaHanderConfig' contains a typo. 'Hander' should be 'Handler'.

Suggested change
type RayMetaHanderConfig struct {
type RayMetaHandlerConfig struct {

Comment on lines 34 to 35
if os.Getenv("S3FORCE_PATH_STYPE") != "" {
c.S3ForcePathStyle = aws.Bool(os.Getenv("S3FORCE_PATH_STYPE") == "true")
Copilot AI commented Dec 12, 2025:

The environment variable name 'S3FORCE_PATH_STYPE' contains a typo. 'STYPE' should be 'STYLE'. This appears in multiple places (lines 34, 35, 70, 71) and needs to be corrected for proper environment variable reading.

@400Ping (Contributor) commented Dec 12, 2025:

+1 Agree

Contributor:

Fixed at 99fd96f.

Comment on lines 44 to 48
logFilePaths map[string]bool
filePathMu sync.Mutex

// Channel for signaling shutdown
ShutdownChan chan struct{}
Copilot AI commented Dec 12, 2025:

The comment uses Chinese characters. For consistency with the rest of the codebase and international collaboration, comments should be in English. Consider translating '用于跟踪当前sessionName' to 'Used to track the current sessionName' and '用于跟踪当前nodeId' to 'Used to track the current nodeId'.


clusterNameId = "a-s-sdf-sdfsdfsdfisfdf1A_B2safd-0sdf-sdf-000"
strs = strings.Split(clusterNameId, "_")
t.Logf("laster %s", strs[len(strs)-1])
Copilot AI commented Dec 12, 2025:

The variable name 'laster' is misspelled. It should be 'last' instead of 'laster'.

clusterNameId = "a-s-sdf-sdfsdfsdfisfdf1A_B2safd-0sdf-sdf-000"
strs = strings.Split(clusterNameId, "_")
t.Logf("laster %s", strs[len(strs)-1])
t.Logf("not laster all %s", strings.Join(strs[:len(strs)-1], "_"))
Copilot AI commented Dec 12, 2025:

The variable name 'laster' is misspelled. It should be 'last' instead of 'laster'.


func TestSlice(t *testing.T) {
strs := []string{"1", "2", "3", "4", "5"}
t.Logf("laster %s", strs[len(strs)-1])
Copilot AI commented Dec 12, 2025:

The variable name 'laster' is misspelled. It should be 'last' instead of 'laster'.

PushInterval time.Duration
}

// ValidateRayHanderConfig is
Copilot AI commented Dec 12, 2025:

The function name 'ValidateRayHanderConfig' contains a typo. 'Hander' should be 'Handler'.

Comment on lines 11 to 19
type WriterRegistry map[string]func(globalData *types.RayCollectorConfig, data map[string]interface{}) (storage.StorageWritter, error)

func GetWriterRegistry() WriterRegistry {
return writerRegistry
}

var writerRegistry = WriterRegistry{
"aliyunoss": ray.NewWritter,
"s3": s3.NewWritter,
Copilot AI commented Dec 12, 2025:

The type name 'WriterRegistry' maps to functions returning 'StorageWritter', but 'Writter' is misspelled. It should be 'Writer' throughout the codebase for consistency and correctness.

Suggested change
type WriterRegistry map[string]func(globalData *types.RayCollectorConfig, data map[string]interface{}) (storage.StorageWritter, error)
func GetWriterRegistry() WriterRegistry {
return writerRegistry
}
var writerRegistry = WriterRegistry{
"aliyunoss": ray.NewWritter,
"s3": s3.NewWritter,
type WriterRegistry map[string]func(globalData *types.RayCollectorConfig, data map[string]interface{}) (storage.StorageWriter, error)
func GetWriterRegistry() WriterRegistry {
return writerRegistry
}
var writerRegistry = WriterRegistry{
"aliyunoss": ray.NewWriter,
"s3": s3.NewWriter,

)

// StorageWriter is the interface for storage writer.
type StorageWritter interface {
Copilot AI commented Dec 12, 2025:

The interface name 'StorageWritter' is misspelled. It should be 'StorageWriter' with one 't'. This typo affects the entire codebase and should be corrected for consistency and professionalism.

Suggested change
type StorageWritter interface {
type StorageWriter interface {

Comment on lines 4 to 12
FROM --platform=$BUILDPLATFORM golang:1.25.1 as builder
ENV GOPROXY=https://goproxy.cn,direct
ARG BUILD_RAYSERVER_DASHBOARD

RUN if [ "$BUILD_RAYSERVER_DASHBOARD" = "yes" ] ; then \
curl -o install.sh https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh && chmod +x install.sh && ./install.sh && /bin/bash -c "source $HOME/.nvm/nvm.sh && nvm install 14 && nvm use 14" ;\
else \
echo "$BUILD_RAYSERVER_DASHBOARD not yes, no need install nvm"; \
fi
Copilot AI commented Dec 12, 2025:

The Dockerfile downloads and executes a remote installation script via curl from https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh without any integrity verification, which introduces a supply-chain risk: if the remote script or delivery channel is compromised, arbitrary code can run in your build environment and be baked into produced images. An attacker who can tamper with that script (e.g., via GitHub compromise, DNS/CA issues, or MITM) could inject backdoors into the historyserver/collector binaries or exfiltrate build secrets. To mitigate this, avoid executing remote scripts directly; instead vendor the script or use a package-manager-installed nvm, and if you must download it, pin to an immutable reference and verify a known checksum or signature before execution.

Comment on lines 695 to 726
// Walk through the logs directory and process all files
err := filepath.WalkDir(logsDir, func(path string, info fs.DirEntry, err error) error {
if err != nil {
logrus.Errorf("Error walking logs path %s: %v", path, err)
return nil
}

// Skip directories
if info.IsDir() {
return nil
}

// Process log file
if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil {
logrus.Errorf("Failed to process prev-log file %s: %v", path, err)
}

return nil
})

if err != nil {
logrus.Errorf("Error walking logs directory %s: %v", logsDir, err)
return
}

// After successfully processing all files, remove the node directory
logrus.Infof("Finished processing all logs for session: %s, node: %s. Removing node directory.", sessionID, nodeID)
if err := os.RemoveAll(sessionNodeDir); err != nil {
logrus.Errorf("Failed to remove node directory %s: %v", sessionNodeDir, err)
} else {
logrus.Infof("Successfully removed node directory: %s", sessionNodeDir)
}
@400Ping (Contributor) commented Dec 12, 2025:

We delete sessionNodeDir unconditionally after the walk, even if any file upload failed. To avoid losing logs on a transient storage error, consider: bubble up failures from the walk (return err from the callback), track a per-dir success flag, and only RemoveAll when all files were written/moved. Alternatively, add a retry path (e.g., leave the dir in place and mark it for retry) instead of deleting on first pass.

@JiangJiaWei1103 (Contributor) commented Dec 20, 2025:

Fixed at KunWuLuan@014a6ff.

For now, we use processingErrors to record file processing errors in the callback function. Then, those files can be processed again in the next retry. WDYT?
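For context, a minimal sketch of that approach: record per-file failures during the walk and remove the directory only when everything succeeded. processFile and the function name are illustrative stand-ins, not the PR's code:

package collector

import (
    "io/fs"
    "os"
    "path/filepath"

    "github.com/sirupsen/logrus"
)

// processFile stands in for processPrevLogFile; illustrative only.
func processFile(path string) error { return nil }

// processAndMaybeRemove removes sessionNodeDir only when every file
// was processed successfully, so failed files can be retried on the
// next pass instead of being lost.
func processAndMaybeRemove(logsDir, sessionNodeDir string) {
    var processingErrors []error
    _ = filepath.WalkDir(logsDir, func(path string, d fs.DirEntry, err error) error {
        if err != nil {
            processingErrors = append(processingErrors, err)
            return nil
        }
        if d.IsDir() {
            return nil
        }
        if err := processFile(path); err != nil {
            processingErrors = append(processingErrors, err)
        }
        return nil
    })

    if len(processingErrors) > 0 {
        logrus.Warnf("%d files failed to process; keeping %s for retry", len(processingErrors), sessionNodeDir)
        return
    }
    if err := os.RemoveAll(sessionNodeDir); err != nil {
        logrus.Errorf("Failed to remove node directory %s: %v", sessionNodeDir, err)
    }
}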

Comment on lines +80 to +93
select {
case <-sigChan:
    logrus.Info("Received SIGTERM, processing all logs...")
    // r.processAllLogs()
    r.processSessionLatestLogs()
    // r.processPrevLogsOnShutdown()
    close(r.ShutdownChan)
case <-stop:
    logrus.Info("Received stop signal, processing all logs...")
    // r.processAllLogs()
    r.processSessionLatestLogs()
    // r.processPrevLogsOnShutdown()
    close(r.ShutdownChan)
}
@400Ping (Contributor) commented Dec 12, 2025:

Can we coordinate shutdown so background uploads finish before exit? Suggest: add a context.WithCancel + WaitGroup, pass the ctx into WatchPrevLogsLoops/processPrevLogsDir, wg.Add/Done in each goroutine, then on stop call cancel() and wg.Wait() (optionally with a timeout) before closing ShutdownChan. That way uploads/moves complete (or time out) instead of being cut off mid-flight.
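For context, a minimal sketch of the suggested coordination; runUploader and the 30-second grace period are illustrative assumptions:

package collector

import (
    "context"
    "sync"
    "time"
)

// runUploader stands in for the background upload loops; illustrative.
func runUploader(ctx context.Context) { <-ctx.Done() }

// shutdownExample cancels the shared context, then waits (with a
// grace period) for in-flight uploads to finish before exiting.
func shutdownExample() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    wg.Add(1)
    go func() {
        defer wg.Done()
        runUploader(ctx)
    }()

    // On SIGTERM/stop:
    cancel()
    done := make(chan struct{})
    go func() { wg.Wait(); close(done) }()
    select {
    case <-done: // all uploads completed
    case <-time.After(30 * time.Second): // grace period expired
    }
}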

Comment on lines 34 to 35
if os.Getenv("S3FORCE_PATH_STYPE") != "" {
c.S3ForcePathStyle = aws.Bool(os.Getenv("S3FORCE_PATH_STYPE") == "true")
@400Ping (Contributor) commented Dec 12, 2025:

+1 Agree

const runtimeClassConfigPath = "/var/collector-config/data"

func main() {
role := ""
Member:

nit: Use var role string format, same for other lines

Contributor:

Fixed at KunWuLuan@55b9a92. Thanks!


sessionDir, err := utils.GetSessionDir()
if err != nil {
panic("Failed to get session dir: " + err.Error())
Member:

Instead of panic, log the error and call os.Exit(1)


Contributor:

Fixed at KunWuLuan@4e9b8bb.


rayNodeId, err := utils.GetRayNodeID()
if err != nil {
panic("Failed to get ray node id: " + err.Error())

Contributor:

Fixed at KunWuLuan@4e9b8bb.


var writerRegistry = WriterRegistry{
"aliyunoss": ray.NewWritter,
"s3": s3.NewWritter,
Member:

Does the s3 writer work with any S3-compatible storage solution? If not, please also add Google Cloud Storage (GCS).

}
}

func (c *config) completeHSConfig(rcc *types.RayHistoryServerConfig, jd map[string]interface{}) {
Member:

It's been a long time since I worked with AWS APIs / SDKs, but isn't there an AWS Go library that handles all the authentication checks? That seems like a better approach than trying to load environment variables ourselves.
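For context, with AWS SDK for Go v1 the default credential chain (environment variables, shared config/credentials files, IAM roles) can resolve credentials automatically; a minimal sketch, assuming the PR stays on SDK v1, with newS3Client as an illustrative helper:

package storage

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

// newS3Client lets the SDK's default credential chain resolve
// credentials instead of reading custom AWS_S3ID/AWS_S3SECRET
// variables by hand. Illustrative helper, not the PR's code.
func newS3Client(region, endpoint string) (*s3.S3, error) {
    sess, err := session.NewSessionWithOptions(session.Options{
        Config: aws.Config{
            Region:   aws.String(region),
            Endpoint: aws.String(endpoint),
        },
        SharedConfigState: session.SharedConfigEnable,
    })
    if err != nil {
        return nil, err
    }
    return s3.New(sess), nil
}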

},
}

data := map[string]map[string]string{
Member:

Instead of hardcoding data here, have NewMockReader receive the data as a parameter. Same thing for clusters.

Contributor:

Fixed at KunWuLuan@8b2d34a.

Name string `json:"name"`
Namespace string `json:"namespace"`
SessionName string `json:"sessionName"`
CreateTime string `json:"createTime"`
Member:

nit: CreationTime

Contributor:

Fixed at KunWuLuan@a81027f.

Namespace string `json:"namespace"`
SessionName string `json:"sessionName"`
CreateTime string `json:"createTime"`
CreateTimeStamp int64 `json:"createTimeStamp"`
Member:

nit: CreationTimestamp

Member:

Also, why do we need both CreateTime and CreateTimeStamp?

Contributor:

I think CreationTime is more human-friendly and can be displayed directly in the UI. In contrast, CreationTimestamp is more machine-oriented and better suited for sorting and comparisons, for example:

func (a ClusterInfoList) Less(i, j int) bool { return a[i].CreateTimeStamp > a[j].CreateTimeStamp } // sort in descending order

If this feels redundant, we can discuss which one to keep to improve maintainability. Thanks!
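For illustration, one way to keep a single source of truth would be to store only the timestamp and derive the display string on demand; a sketch where the field name mirrors the PR's struct but the method is hypothetical:

package types

import "time"

type ClusterInfo struct {
    CreateTimeStamp int64 `json:"createTimeStamp"`
}

// CreateTime derives the human-readable form from the stored Unix
// timestamp, so the two representations cannot drift apart.
func (c ClusterInfo) CreateTime() string {
    return time.Unix(c.CreateTimeStamp, 0).UTC().Format(time.RFC3339)
}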

OssMetaFile_Node_Prefix = "restful__nodes_"
OssMetaFile_JOBTASK_DETAIL_Prefix = "restful__api__v0__tasks_detail_job_id_"
OssMetaFile_JOBTASK_SUMMARIZE_BY_FUNC_NAME_Prefix = "restful__api__v0__tasks_summarize_by_func_name_job_id_"
OssMetaFile_JOBTASK_SUMMARIZE_BY_LINEAGE_Prefix = "restful__api__v0__tasks_summarize_by_lineage_job_id_"
Member:

Replace Oss with Ray in all these constant names.

fi

WORKDIR /historyserver
COPY . .
Member:

I would try to avoid COPY . ., please only add directories that contain source code needed for the build. See example here: https://github.com/ray-project/kuberay/blob/master/ray-operator/Dockerfile#L12-L19

@@ -0,0 +1,519 @@
package eventserver
Member:

Can we add eventserver in a separate PR? I think @chiayi started this here: #4253

Is there a dependency on eventserver required by the log collector?

@JiangJiaWei1103 (Contributor) commented Dec 18, 2025:

Fixed at KunWuLuan@1aea830. We've removed eventserver-related files to keep this PR focused on the log collector!

@Future-Outlier (Member) commented Dec 15, 2025

Hi, @andrewsykim

After discussion with @kevin85421, @rueian, and @KunWuLuan, we will address your comment in a follow-up PR and merge this ASAP. This will allow other contributors to iterate on the collector together, and we will not block other PRs, such as the event processor by Aaron.

@Future-Outlier (Member) left a comment:

let's merge this first so that

  1. other people can work on the improvements in parallel, see #4274
  2. other people can develop other components (like the event server, history server's web server, and Ray dashboard) more easily

@Future-Outlier (Member):

cc @JiangJiaWei1103, can you open a PR to address this PR's comments?

@Future-Outlier (Member):

let's merge this first so that

  1. other people can work on the improvements in parallel, see [Epic][Feature] history server collctor #4274
  2. other people can develop other components (like the event server, history server's web server, and Ray dashboard) more easily

update: let's wait for @andrewsykim to take a look

@JiangJiaWei1103 (Contributor):

cc @JiangJiaWei1103, can you open a PR to address this PR's comments?

No problem, let's go!

@JiangJiaWei1103 (Contributor):

Hi @andrewsykim and @Future-Outlier,

Since AWS SDK for Go v1 reached its end-of-support phase on July 31, 2025 [1], would it make sense to switch to the actively maintained v2 [2] in this PR? Thanks.

[1] Announcing end-of-support for AWS SDK for Go (v1) effective July 31, 2025
[2] AWS SDK for Go v2

@@ -0,0 +1,295 @@
// Package ray is
Member:

incomplete comment?

Member:

Suggest just removing it; we don't usually need package-level comments.

runtimeClassConfigPath := "/var/collector-config/data"

flag.StringVar(&role, "role", "Worker", "")
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
Member:

I think there was a previous comment, which was resolved, about naming this storage-provider instead of runtime-class; is that still the case?

@andrewsykim (Member) commented Dec 22, 2025

Since AWS SDK for Go v1 reached its end-of-support phase on July 31, 2025 [1], would it make sense to switch to the actively maintained v2 [2] in this PR? Thanks.

Using latest SDK makes sense to me unless there are technical issues with v2. Is the plan to update this PR to use the SDK or is that follow-up work?

fi

WORKDIR /historyserver
COPY . .
Member:

Avoid COPY on entire directories. Please only add packages containing the source code required to compile the binaries.

RootDir: rcc.RootDir,
}
c.S3ID = os.Getenv("AWS_S3ID")
c.S3Secret = os.Getenv("AWS_S3SECRET")
Member:

Are the S3 secret and token environment variables here still required if we use the official SDK? I imagine the SDK can handle that?

@andrewsykim (Member) commented Dec 22, 2025

Spoke with @Future-Outlier and @rueian offline. Let's merge this PR as-is and address remaining comments in a follow-up PR.

Can we fix the merge conflict to unblock merge?

@JiangJiaWei1103 (Contributor):

Spoke with @Future-Outlier and @rueian offline. Let's merge this PR as-is and address remaining comments in a follow-up PR.

Can we fix the merge conflict to unblock merge?

Thanks Andrew. We're actively addressing review comments at the moment [1] [2].

[1] KunWuLuan#1
[2] KunWuLuan#2

@andrewsykim andrewsykim merged commit 21a7348 into ray-project:master Dec 22, 2025
24 of 27 checks passed
JiangJiaWei1103 pushed a commit to JiangJiaWei1103/kuberay that referenced this pull request Dec 22, 2025
* add the implementation of historyserver collector

update go.work go.mod

Signed-off-by: KunWuLuan <[email protected]>

* update the func judging if the event is releated to the Nodes.

Signed-off-by: KunWuLuan <[email protected]>

* S3FORCE_PATH_STYPE -> S3FORCE_PATH_STYLE

Signed-off-by: Future-Outlier <[email protected]>

* S3DISABLE_SSL -> s3DisableSSL (camel case)

Signed-off-by: Future-Outlier <[email protected]>

* Add comments to explain WatchSessionLatestLoops

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: KunWuLuan <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Future-Outlier <[email protected]>
@Future-Outlier (Member):

cursor review

@cursor (cursor bot) left a comment:

✅ Bugbot reviewed your changes and found no bugs!

@Future-Outlier (Member):

cursor review

c.S3ForcePathStyle = aws.Bool(forcePathStyle.(string) == "true")
}
if s3disableSSL, ok := jd["s3DisableSSLs3DisableSSL"]; ok {
c.DisableSSL = aws.Bool(s3disableSSL.(string) == "true")
cursor bot:

Duplicated key name in JSON config parsing

High Severity

The JSON key "s3DisableSSLs3DisableSSL" in the complete function is malformed - it appears to be an accidental duplication of "s3DisableSSL". This typo prevents the DisableSSL configuration from being correctly read from JSON data, causing the SSL disable setting to be silently ignored when configured via JSON. The correct key would be "s3DisableSSL" to match the pattern used elsewhere in the file (line 87).
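For context, the corrected lookup would read the same key the rest of the file uses; a minimal sketch where completeDisableSSL is an illustrative helper, not the PR's code:

package storage

import "github.com/aws/aws-sdk-go/aws"

// completeDisableSSL reads the "s3DisableSSL" key, matching the
// pattern used elsewhere in the file, instead of the duplicated
// "s3DisableSSLs3DisableSSL".
func completeDisableSSL(jd map[string]interface{}) *bool {
    if s3disableSSL, ok := jd["s3DisableSSL"]; ok {
        return aws.Bool(s3disableSSL.(string) == "true")
    }
    return nil
}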


