Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
1bae584
[RayJob] background job info poc
fscnick Oct 28, 2025
293937b
[RayJob] encapsulate the worker pool
fscnick Oct 29, 2025
b4326bc
[RayJob] replace concurrency map with lru cache
fscnick Oct 29, 2025
fef0c77
[RayJob] remove cache on stop and config flag
fscnick Oct 30, 2025
13070f0
[RayJob] remove delete cache from deleteClusterResources and add lock…
fscnick Dec 2, 2025
3d07403
[Helm] add argument for useBackgroundGoroutine
fscnick Dec 2, 2025
5db324b
[RayJob] remove unused function and background goroutine observability
fscnick Dec 9, 2025
026e9f0
[RayJob] rename useBackgroundGoroutine to asyncJobInfoQuery
fscnick Dec 14, 2025
645aaed
[RayJob] make cache immutable to avoid data race
fscnick Dec 19, 2025
bcb2a38
[RayJob] remove unused function
fscnick Dec 20, 2025
6dc8cf6
[RayJob] If error on fetching job info, it removes from loop
fscnick Dec 22, 2025
b0b2753
[RayJob] task queue is extendable
fscnick Dec 22, 2025
db5aa09
[RayJob] change slice to ring buffer
fscnick Dec 23, 2025
98a17d1
[RayJob] async job info query use feature gate instead
fscnick Dec 26, 2025
38a8602
[RayJob] add test for async job info query
fscnick Dec 29, 2025
adc8003
[Test][RayJob] fix e2e test and add unit test
fscnick Jan 12, 2026
31eea56
[RayJob] remove redundent code in async job query
fscnick Jan 12, 2026
8231132
Merge remote-tracking branch 'upstream/master' into feat/background-g…
fscnick Jan 13, 2026
82de99a
[Test][RayJob] fix cache key name in test
fscnick Jan 13, 2026
18d6c84
Merge remote-tracking branch 'upstream/master' into feat/background-g…
fscnick Jan 13, 2026
b64d60c
[Test] e2e RayJob test with AsyncJobInfoQuery and without AsyncJobInf…
fscnick Jan 15, 2026
f2f717f
[Test] rename to with AsyncJobInfoQuery for testing
fscnick Jan 15, 2026
720dee5
Merge remote-tracking branch 'upstream/master' into feat/background-g…
fscnick Jan 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .buildkite/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@
- KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 40m -v ./test/e2erayjob 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-log.tar -T - && exit 1)
- echo "--- END:RayJob E2E (nightly operator) tests finished"

- label: 'Test RayJob E2E with AsyncJobInfoQuery (nightly operator)'
instance_size: large
image: golang:1.25-bookworm
commands:
- source .buildkite/setup-env.sh
- kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml
- kubectl config set clusters.kind-kind.server https://docker:6443
# Build nightly KubeRay operator image
- pushd ray-operator
- IMG=kuberay/operator:nightly make docker-image && kind load docker-image kuberay/operator:nightly && echo "Deploying operator with test overrides with AsyncJobInfoQuery (feature gates via test-overrides-with-async-job-info-query overlay )"
- IMG=kuberay/operator:nightly make deploy-with-override-with-async-job-info-query
- kubectl wait --timeout=90s --for=condition=Available=true deployment kuberay-operator
# Run e2e tests and print KubeRay operator logs if tests fail
- echo "--- START:Running e2e (nightly operator) RayJob tests"
- if [ -n "$${KUBERAY_TEST_RAY_IMAGE}" ]; then echo "Using Ray Image $${KUBERAY_TEST_RAY_IMAGE}"; fi
- set -o pipefail
- mkdir -p "$(pwd)/tmp" && export KUBERAY_TEST_OUTPUT_DIR=$(pwd)/tmp
- echo "KUBERAY_TEST_OUTPUT_DIR=$$KUBERAY_TEST_OUTPUT_DIR"
- KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 40m -v ./test/e2erayjob 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-log.tar -T - && exit 1)
- echo "--- END:RayJob E2E (nightly operator) tests finished"

- label: 'Test E2E rayservice (nightly operator)'
instance_size: large
image: golang:1.25-bookworm
Expand Down
2 changes: 2 additions & 0 deletions .buildkite/values-kuberay-operator-override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ featureGates:
enabled: true
- name: RayMultiHostIndexing
enabled: true
- name: AsyncJobInfoQuery
enabled: true
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions ray-operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ deploy-with-override: manifests kustomize ## Deploy controller with test-only fe
cd config/default && $(KUSTOMIZE) edit set image kuberay/operator=${IMG}
$(KUSTOMIZE) build config/overlays/test-overrides | kubectl apply --server-side=true -f -

deploy-with-override-with-async-job-info-query: manifests kustomize ## Deploy controller with test-only feature gate overrides with async job info query (does NOT affect default chart).
cd config/default && $(KUSTOMIZE) edit set image kuberay/operator=${IMG}
$(KUSTOMIZE) build config/overlays/test-overrides-with-async-job-info-query | kubectl apply --server-side=true -f -

undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Strategic merge patch for kuberay-operator Deployment (test / CI only).
apiVersion: apps/v1
kind: Deployment
metadata:
name: kuberay-operator
spec:
template:
spec:
containers:
- name: kuberay-operator
args:
- --feature-gates=RayClusterStatusConditions=true,RayJobDeletionPolicy=true,RayMultiHostIndexing=true,AsyncJobInfoQuery=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## ============================================================================
## Kustomize overlay: test-overrides (CI / e2e only)
## ----------------------------------------------------------------------------
## Purpose: Enable alpha / experimental feature gates (currently RayJobDeletionPolicy)
## for end-to-end testing without modifying base manifests or Helm defaults.
## ============================================================================
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
- ../../default

patches:
- path: deployment-override.yaml
target:
kind: Deployment
name: kuberay-operator
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package dashboardclient

import (
"errors"
"testing"
"testing/synctest"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"k8s.io/apimachinery/pkg/types"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient/mocks"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

func TestAsyncJobInfoQuery(t *testing.T) {
var mockClient *mocks.MockRayDashboardClientInterface

ctrl := gomock.NewController(t)
mockClient = mocks.NewMockRayDashboardClientInterface(ctrl)

synctest.Test(t, func(t *testing.T) {
ctx := logr.NewContext(t.Context(), logr.Discard())

clusterName := types.NamespacedName{
Namespace: "test-namespace",
Name: "raycluster-async-job-info-query",
}
asyncJobInfoQueryClient := RayDashboardCacheClient{}
asyncJobInfoQueryClient.InitClient(ctx, clusterName, mockClient)
synctest.Wait()

jobId := "test-job-id"

// earlier set up the mock expectation for the second call to avoid flaky test.
mockJobInfo := &utiltypes.RayJobInfo{
JobId: jobId,
}
mockClient.EXPECT().GetJobInfo(ctx, jobId).Return(mockJobInfo, nil)

// First call, the job info is not in cache, so it should return ErrAgain
jobInfo, err := asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
assert.Nil(t, jobInfo)
assert.Equal(t, ErrAgain, err)

synctest.Wait()

// Second call, after GetJobInfo has called in background , the job info should be in cache now.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
require.NoError(t, err)
assert.Equal(t, mockJobInfo, jobInfo)

expectedError := errors.New("test error")
mockClient.EXPECT().GetJobInfo(ctx, jobId).Return(nil, expectedError)

// Wait for longer than queryInterval to ensure the task has been re-queued and executed.
time.Sleep(queryInterval + time.Millisecond)
synctest.Wait()

// Third call, after GetJobInfo has called in background and returned error, the error should be in cache now.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
assert.Nil(t, jobInfo)
assert.Equal(t, expectedError, err)

// After error has been consumed previously, the job info is not in cache so it should return ErrAgain.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
assert.Nil(t, jobInfo)
assert.Equal(t, ErrAgain, err)

mockJobInfo = &utiltypes.RayJobInfo{
JobId: jobId,
JobStatus: rayv1.JobStatusSucceeded,
}
mockClient.EXPECT().GetJobInfo(ctx, jobId).Return(mockJobInfo, nil)

// Wait for longer than queryInterval to ensure the task has been re-queued and executed.
time.Sleep(queryInterval + time.Millisecond)
synctest.Wait()

// Fourth call, the job has reached the terminal status.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
require.NoError(t, err)
assert.Equal(t, mockJobInfo, jobInfo)

// Wait for longer than queryInterval to ensure the task has been re-queued.
time.Sleep(queryInterval + time.Millisecond)
synctest.Wait()

// Fifth call, since the job has reached the terminal status, the task has removed from the worker.
// The GetJobInfo underneath should not be called again, and the cached job info should be returned.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, jobId)
require.NoError(t, err)
assert.Equal(t, mockJobInfo, jobInfo)

// Wait for longer than cacheExpiry to ensure the cache has been expired and removed.
time.Sleep(cacheExpiry + 10*queryInterval)
synctest.Wait()

cached, ok := cacheStorage.Get(cacheKey(clusterName, jobId))
assert.Nil(t, cached)
assert.False(t, ok)

// Test with getting a persistent error, the cache should be removed eventually.
nonExistedJobId := "not-existed-job-id"
Comment on lines +107 to +108
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another test in the same bubble. The goroutine is created by singleton and synctest is not allowed to access goroutine outside of bubble.


// earlier set up the mock expectation for the second call to avoid flaky test.
expectedError = errors.New("no such host")
mockClient.EXPECT().GetJobInfo(ctx, nonExistedJobId).Return(nil, expectedError).AnyTimes()

jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, nonExistedJobId)
assert.Nil(t, jobInfo)
assert.Equal(t, ErrAgain, err)

time.Sleep(queryInterval + time.Millisecond)
synctest.Wait()

jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, nonExistedJobId)
assert.Nil(t, jobInfo)
assert.Equal(t, expectedError, err)

// After error has been consumed previously, the job info is not in cache so it should return ErrAgain.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, nonExistedJobId)
assert.Nil(t, jobInfo)
assert.Equal(t, ErrAgain, err)

time.Sleep(queryInterval + time.Millisecond)
synctest.Wait()

// Get the same error again without continuing to requeue the task.
jobInfo, err = asyncJobInfoQueryClient.GetJobInfo(ctx, nonExistedJobId)
assert.Nil(t, jobInfo)
assert.Equal(t, expectedError, err)

// The cache should be removed after previous GetJobInfo.
cached, ok = cacheStorage.Get(cacheKey(clusterName, nonExistedJobId))
assert.Nil(t, cached)
assert.False(t, ok)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
JobPath = "/api/jobs/"
)

//go:generate mockgen -destination=mocks/dashboard_client_mock.go -package=mocks github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient RayDashboardClientInterface
type RayDashboardClientInterface interface {
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
Expand Down
Loading
Loading