background goroutine get job info #4160
Conversation
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
```go
if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
	logger.Error(err, "Failed to stop job for RayJob")
}
```
Is it okay to call StopJob to remove the cache placeholder before deleting the RayCluster, given that the retrying status path calls deleteClusterResources?
We probably should not do this. Just let the cache client figure out how to deal with old entries by itself.
Fixed at 9f87da6. Once a job reaches a terminal status, its JobInfo update is removed from the updating loop. Eventually, the cached JobInfo is also removed from the cache once the expiration time elapses. If this is not acceptable, kindly let me know.
```go
)

type RayDashboardClientInterface interface {
	InitClient(client *http.Client, dashboardURL string)
```
Remove this method from the interface because different implementations might take different input arguments.
```go
keys := cacheStorage.Keys()
expiredThreshold := time.Now().Add(-cacheExpiry)
for _, key := range keys {
	if cached, ok := cacheStorage.Peek(key); ok {
```
Peek doesn't update the recency of the cache entry.
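For context on why Peek matters here, a minimal sketch of the cleanup pass, assuming hashicorp/golang-lru/v2 (the actual cache type and entry struct in the PR may differ):

```go
package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
)

// cachedJobInfo is a hypothetical entry type carrying the last update time.
type cachedJobInfo struct {
	updatedAt time.Time
}

func main() {
	cacheStorage, _ := lru.New[string, cachedJobInfo](128)
	cacheStorage.Add("ns/raycluster/job-1", cachedJobInfo{updatedAt: time.Now().Add(-15 * time.Minute)})

	cacheExpiry := 10 * time.Minute
	expiredThreshold := time.Now().Add(-cacheExpiry)
	for _, key := range cacheStorage.Keys() {
		// Peek reads the entry without promoting it in the LRU order, so the
		// cleanup pass itself never marks a stale entry as recently used.
		if cached, ok := cacheStorage.Peek(key); ok && cached.updatedAt.Before(expiredThreshold) {
			cacheStorage.Remove(key)
		}
	}
	fmt.Println("entries left:", cacheStorage.Len()) // 0
}
```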
Signed-off-by: fscnick <[email protected]>
… for cache Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
…oroutine-get-job-info
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
…oroutine-get-job-info
Future-Outlier
left a comment
This is not an easy one, and I am very worried that this could cause issues—for example, goroutine leaks.
cc @seanlaii @win5923 @JiangJiaWei1103 @machichima @CheyuWu to review this
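To make the leak concern concrete, here is a sketch (hypothetical names, not the PR's code) of the lifecycle pattern that avoids it: every background goroutine is tracked and bound to the operator context, and shutdown waits for them to exit.

```go
package dashboardclient

import (
	"context"
	"sync"
	"time"
)

// backgroundRunner is illustrative only: it tracks every goroutine it starts.
type backgroundRunner struct {
	wg sync.WaitGroup
}

// run starts fn in a tracked goroutine; fn must honor ctx cancellation.
func (r *backgroundRunner) run(ctx context.Context, fn func(context.Context)) {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		fn(ctx)
	}()
}

// shutdown waits for every tracked goroutine after the caller cancels ctx.
// A timeout expiring here points at a goroutine that ignores ctx, i.e. a leak.
func (r *backgroundRunner) shutdown(timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		r.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}
```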
```diff
 type ClientProvider interface {
-	GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
+	GetDashboardClient(ctx context.Context, mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
```
Add the context to pass the logger and for graceful shutdown.
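As a small sketch of what the added context is used for (the function name is hypothetical):

```go
package dashboardclient

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
)

// initCachedClientSketch shows the two uses of the new ctx parameter: the
// logger is derived from it, and its cancellation drives graceful shutdown.
func initCachedClientSketch(ctx context.Context) {
	logger := ctrl.LoggerFrom(ctx).WithName("RayDashboardCacheClient")
	go func() {
		<-ctx.Done() // the operator is shutting down
		logger.Info("stopping background job-info workers")
	}()
}
```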
JiangJiaWei1103
left a comment
This argument is used in many places. Rather than marking each one inline, I’d suggest applying the same change consistently across all usages. The marked examples show the intended direction. Thanks!
```diff
 g.Expect(err).ToNot(HaveOccurred())
 url := fmt.Sprintf("127.0.0.1:%d", localPort)
-rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
+rayDashboardClientFunc := utils.GetRayDashboardClientFunc(t.Ctx(), nil, false, false)
```
Just curious. Do we need to test the following case in which non-blocking query is enabled?
```go
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(t.Ctx(), nil, false, true)
```
The test might be added in a follow-up PR.
```go
logger := ctrl.LoggerFrom(ctx).WithName("RayDashboardCacheClient")

cacheLock.RLock()
if cached, ok := cacheStorage.Get(jobId); ok {
```
Would it be reasonable to add error handling for cache operations, including Get(), PeekOrAdd(), and Add(), to avoid panic if cache initialization failed?
I don't think it's needed, as we always init the cache before calling this. If there's an error, we should indeed panic, since nothing else will function properly either.
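For illustration, a sketch of the "initialize once, fail fast" approach described here, assuming hashicorp/golang-lru/v2; the variable and function names are hypothetical:

```go
package dashboardclient

import (
	"sync"

	lru "github.com/hashicorp/golang-lru/v2"
)

var (
	cacheInitOnce sync.Once
	cacheStorage  *lru.Cache[string, any]
)

// initCacheStorage panics on failure: if the cache cannot be created, none of
// the later Get/PeekOrAdd/Add calls can work, so failing at startup is clearer
// than threading error handling through every cache operation.
func initCacheStorage(size int) {
	cacheInitOnce.Do(func() {
		c, err := lru.New[string, any](size)
		if err != nil {
			panic(err)
		}
		cacheStorage = c
	})
}
```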
```go
// expiry cache cleanup
go func() {
	ticker := time.NewTicker(queryInterval * 10)
```
Curious: why `queryInterval * 10` rather than just `queryInterval`?
It seems reasonable to perform cache cleanup less frequently than the main query tasks, but we still need to discuss how to configure these parameters for different use cases.
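If the multiplier ends up configurable, the cleanup loop might look like this sketch (constant names and values are hypothetical):

```go
package dashboardclient

import (
	"context"
	"time"
)

const (
	// queryInterval drives the per-job refresh tasks.
	queryInterval = 30 * time.Second
	// cacheCleanupInterval runs the expiry sweep less often than the queries;
	// today this is queryInterval * 10, but it could become configurable.
	cacheCleanupInterval = 10 * queryInterval
)

// startCacheCleanup runs the sweep on its own ticker until ctx is cancelled.
func startCacheCleanup(ctx context.Context, sweep func()) {
	go func() {
		ticker := time.NewTicker(cacheCleanupInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				sweep()
			}
		}
	}()
}
```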
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
…rd client Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
```go
		return
	case w.taskQueue.In <- task:
	}
})
```
Requeue timer may panic sending to closed channel
Medium Severity
The time.AfterFunc callback at line 79 schedules a task requeue after a delay. When this callback executes, if the context has been cancelled and chanx has closed the In channel, the select statement faces a race condition. Both ctx.Done() (closed) and the send to taskQueue.In (closed channel) are ready. If Go's runtime selects the send case, sending to a closed channel causes a panic. This is a distinct shutdown race from the Out channel issue, occurring when delayed requeue attempts happen after cleanup begins.
chanx doesn't close the In channel, and the task only executes a read, so dropping it seems acceptable.
| logger.Info("worker exiting from a closed channel", "workerID", workerID) | ||
| return | ||
| } | ||
| shouldRequeue := task(ctx) |
Inverted channel receive condition breaks worker pool
High Severity
The channel receive condition is inverted. In Go, when receiving from a channel with value, ok := <-channel, ok is true when a value is successfully received and false when the channel is closed. The current code checks if ok and returns, meaning workers exit immediately after receiving a valid task. Additionally, when ok is false (channel closed), the code continues and calls task(ctx) where task is nil, which would cause a panic. The condition needs to be if !ok to correctly handle channel closure.
Fixed at ae8bf77
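For reference, the corrected receive looks roughly like this (a sketch with simplified types, not the exact code from ae8bf77):

```go
package dashboardclient

import "context"

// queryTask returns true when it should be requeued for another attempt.
type queryTask func(ctx context.Context) (shouldRequeue bool)

func workerLoop(ctx context.Context, out <-chan queryTask, requeue func(queryTask)) {
	for {
		select {
		case <-ctx.Done():
			return
		case task, ok := <-out:
			if !ok {
				// ok is false only when the channel is closed; exit here
				// instead of falling through and calling a nil task.
				return
			}
			if task(ctx) {
				requeue(task)
			}
		}
	}
}
```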
```go
}
dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{}
dashboardCachedClient.InitClient(ctx, namespacedName, dashboardClient)
return dashboardCachedClient, nil
```
Missing cluster identifier in cache key causes cross-cluster collisions
Medium Severity
When rayCluster is nil (as consistently happens in the apiserver), the namespacedName remains empty, causing all cache keys to share the same prefix. The apiserver calls GetJobInfo with nil rayCluster for different clusters. If two clusters have jobs with the same submission ID, the cache returns job info from the wrong cluster. The cache key formula namespacedName.String() + "/" + jobId produces identical keys like //job-123 for different clusters, leading to incorrect data being returned when the AsyncJobInfoQuery feature is enabled.
fixed at 5c7a5bb
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
```go
case w.taskQueue.In <- task:
	return nil
default:
	return ErrTaskQueueTemporarilyUnavailable
```
Will this happen with the UnboundedChan?
It might happen at the beginning. #4160 (comment)
Shouldn’t we wait a bit for its internal buffer to be enlarged?
fixed at 7502bc8
```go
)

func (w *workerPool) start(ctx context.Context, numWorkers int, requeueDelay time.Duration) {
	logger := ctrl.LoggerFrom(ctx).WithName("RayDashboardCacheClient").WithName("WorkerPool")
```
```diff
-	logger := ctrl.LoggerFrom(ctx).WithName("RayDashboardCacheClient").WithName("WorkerPool")
+	logger := ctrl.LoggerFrom(ctx).WithName("WorkerPool")
```
The worker pool may be shared beyond RayDashboardCacheClients in the future.
Fixed at 8964c93
Signed-off-by: fscnick <[email protected]>
```go
namespacedName := types.NamespacedName{
	Name:      CheckName(rayCluster.Name),
	Namespace: rayCluster.Namespace,
}
```
Cache key uses truncated name causing potential collisions
High Severity
The cache key for job info is created using CheckName(rayCluster.Name) instead of the raw cluster name. CheckName truncates names longer than 50 characters by removing characters from the beginning, and replaces the first character with 'r' if it's a digit or punctuation. This means different RayCluster names could produce identical cache keys - for example, two clusters with long names that differ only in their first 6+ characters would collide after truncation. This could cause one cluster's job info to be incorrectly returned for a different cluster, leading to wrong status reporting and potentially incorrect job state management.
Fixed at 90d2c30
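A sketch of the key construction after this fix: use the raw namespace and name (no CheckName truncation) plus the job id, so distinct clusters cannot collide. The helper name is hypothetical:

```go
package dashboardclient

import "k8s.io/apimachinery/pkg/types"

// jobInfoCacheKey builds the cache key from the untruncated cluster identity
// and the job id, e.g. "my-namespace/my-raycluster/raysubmit_abc123".
func jobInfoCacheKey(cluster types.NamespacedName, jobID string) string {
	return cluster.Namespace + "/" + cluster.Name + "/" + jobID
}
```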
```go
	default:
		return ErrTaskQueueTemporarilyUnavailable
	}
}
```
Non-blocking send to unbuffered channel often fails
Medium Severity
The AddTask function uses a non-blocking select with a default case to send tasks. However, the task queue is initialized with chanx.NewUnboundedChanSize(ctx, 0, 0, initBufferSize) where the first parameter (0) makes the In channel unbuffered. A non-blocking send to an unbuffered channel only succeeds if a receiver is actively waiting at that exact moment. Since the chanx internal goroutine may not always be ready to receive, the send will frequently fall through to the default case and return ErrTaskQueueTemporarilyUnavailable, causing unnecessary task drops and repeated retries even when the system is under low load.
Additional Locations (1)
fixed at 7502bc8
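A standalone illustration of the finding, using plain channels rather than chanx: a non-blocking send to an unbuffered channel drops the value unless a receiver is already waiting, whereas even a small buffer absorbs it.

```go
package main

import "fmt"

func main() {
	unbuffered := make(chan int) // like an In channel created with capacity 0
	select {
	case unbuffered <- 1:
		fmt.Println("sent")
	default:
		fmt.Println("dropped: no receiver was ready") // taken almost always
	}

	buffered := make(chan int, 1) // a small buffer absorbs the send
	select {
	case buffered <- 1:
		fmt.Println("sent via buffer") // taken
	default:
		fmt.Println("dropped")
	}
}
```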
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
Signed-off-by: fscnick <[email protected]>
```go
}
dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{}
dashboardCachedClient.InitClient(ctx, namespacedName, dashboardClient)
return dashboardCachedClient, nil
```
Cache key mismatch when RayCluster deleted before RayJob
Low Severity
When deleting a RayJob after its RayCluster is already deleted, the deletion path creates a RayDashboardCacheClient with an empty namespacedName. The condition rayCluster != nil evaluates to true for an empty struct pointer (created via &rayv1.RayCluster{}), so the cache client is initialized with empty Name and Namespace fields. When StopJob is called, it attempts to remove a cache key like "/jobId" instead of the original "namespace/clustername/jobId". This causes the original cache entry to remain stale until it expires (10 minutes) rather than being properly cleaned up.
Additional Locations (1)
Fixed at 339adf3
Signed-off-by: fscnick <[email protected]>
background goroutine get job info (ray-project#4160)
Why are these changes needed?
Currently this PR is a draft to demonstrate the design. If it looks okay, we can continue to polish it.
The operator queries the job info with a blocking call, which can hurt the efficiency of reconciliation. This PR introduces a background goroutine that fetches the JobInfo and caches it.
Implementation design:
- When the dashboard client is initialized, it also initializes the singleton worker pool and cache storage, along with a cache cleanup goroutine.
- When GetJobInfo is called, it returns the cached value on a hit. Otherwise, it puts a placeholder in the cache and adds a task for the background goroutine to update the JobInfo periodically.
- The placeholder is removed by calling StopJob, which may happen when retrying or deleting the RayJob.
Additionally, this PR takes the feedback in #4043 into account.
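A minimal sketch of the GetJobInfo flow described above, with simplified hypothetical types (the real client wraps the kuberay dashboard client, LRU cache, and worker pool):

```go
package dashboardclient

import "context"

// All names below are simplified stand-ins, not the actual kuberay types.

type JobInfo struct {
	JobStatus string
}

type jobInfoCache interface {
	Get(key string) (*JobInfo, bool)
	Add(key string, info *JobInfo)
}

type taskQueue interface {
	AddTask(task func(ctx context.Context) (requeue bool)) error
}

type innerClient interface {
	GetJobInfo(ctx context.Context, jobID string) (*JobInfo, error)
}

type cacheClient struct {
	clusterKey string // e.g. "namespace/raycluster-name"
	storage    jobInfoCache
	workers    taskQueue
	inner      innerClient
}

func isTerminal(status string) bool {
	return status == "SUCCEEDED" || status == "FAILED" || status == "STOPPED"
}

// GetJobInfo returns the cached JobInfo on a hit; on a miss it stores a
// placeholder and enqueues a background task that refreshes the entry until
// the job reaches a terminal status.
func (c *cacheClient) GetJobInfo(ctx context.Context, jobID string) (*JobInfo, error) {
	key := c.clusterKey + "/" + jobID
	if cached, ok := c.storage.Get(key); ok {
		return cached, nil
	}
	c.storage.Add(key, &JobInfo{}) // placeholder: marks the entry as being fetched
	err := c.workers.AddTask(func(taskCtx context.Context) bool {
		info, err := c.inner.GetJobInfo(taskCtx, jobID)
		if err != nil {
			return true // transient failure: requeue and try again later
		}
		c.storage.Add(key, info)
		return !isTerminal(info.JobStatus) // stop refreshing once terminal
	})
	return nil, err // a miss means "not available yet"; the next reconcile re-reads the cache
}
```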
Related issue number
Closes #4087
Checks