-
Notifications
You must be signed in to change notification settings - Fork 715
Expand file tree
/
Copy pathvalidation.go
More file actions
306 lines (262 loc) · 14.4 KB
/
validation.go
File metadata and controls
306 lines (262 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package utils
import (
errstd "errors"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)
// ValidateRayClusterStatus checks the status conditions of a RayCluster for
// contradictions. It returns an error when the RayClusterSuspending and
// RayClusterSuspended conditions are both true, and nil otherwise.
func ValidateRayClusterStatus(instance *rayv1.RayCluster) error {
	conditions := instance.Status.Conditions
	isSuspending := meta.IsStatusConditionTrue(conditions, string(rayv1.RayClusterSuspending))
	isSuspended := meta.IsStatusConditionTrue(conditions, string(rayv1.RayClusterSuspended))

	// Only the combination of both conditions being true is rejected.
	if !isSuspending || !isSuspended {
		return nil
	}
	return errstd.New("invalid RayCluster State: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended conditions should not be both true")
}
// ValidateRayClusterMetadata validates the name in a RayCluster's object
// metadata. It returns an error when len(name) exceeds MaxRayClusterNameLength
// or when validation.IsDNS1035Label reports any problem with the name.
func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error {
	name := metadata.Name
	if len(name) > MaxRayClusterNameLength {
		return fmt.Errorf("RayCluster name should be no more than %d characters", MaxRayClusterNameLength)
	}

	violations := validation.IsDNS1035Label(name)
	if len(violations) == 0 {
		return nil
	}
	return fmt.Errorf("RayCluster name should be a valid DNS1035 label: %v", violations)
}
// ValidateRayClusterSpec rejects invalid RayCluster configurations and returns
// the first violation found, or nil when every check passes.
//
// The checks run in this order:
//   - the head group and every worker group declare at least one container;
//   - GCS fault tolerance is configured through GcsFaultToleranceOptions only,
//     not through the legacy annotation, rayStartParams or head env vars;
//   - the redis username is not supplied through rayStartParams or env vars;
//   - suspended worker groups require the RayJobDeletionPolicy feature gate
//     and are rejected when autoscaling is enabled;
//   - the autoscaler V2 env var and restart policies are consistent with the
//     autoscaling settings.
func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]string) error {
	headContainers := spec.HeadGroupSpec.Template.Spec.Containers
	if len(headContainers) == 0 {
		return fmt.Errorf("headGroupSpec should have at least one container")
	}
	for i := range spec.WorkerGroupSpecs {
		if len(spec.WorkerGroupSpecs[i].Template.Spec.Containers) == 0 {
			return fmt.Errorf("workerGroupSpec should have at least one container")
		}
	}

	ftOptionsSet := spec.GcsFaultToleranceOptions != nil
	if ftOptionsSet && annotations[RayFTEnabledAnnotationKey] != "" {
		return fmt.Errorf("%s annotation and GcsFaultToleranceOptions are both set. "+
			"Please use only GcsFaultToleranceOptions to configure GCS fault tolerance", RayFTEnabledAnnotationKey)
	}

	if !IsGCSFaultToleranceEnabled(spec, annotations) &&
		EnvVarExists(RAY_REDIS_ADDRESS, headContainers[RayContainerIndex].Env) {
		return fmt.Errorf("%s is set which implicitly enables GCS fault tolerance, "+
			"but GcsFaultToleranceOptions is not set. Please set GcsFaultToleranceOptions "+
			"to enable GCS fault tolerance", RAY_REDIS_ADDRESS)
	}

	// Environment of the Ray container in the head Pod, shared by the checks below.
	headEnv := headContainers[RayContainerIndex].Env

	if ftOptionsSet {
		// Each of these settings duplicates a field of GcsFaultToleranceOptions.
		switch {
		case spec.HeadGroupSpec.RayStartParams["redis-password"] != "":
			return fmt.Errorf("cannot set `redis-password` in rayStartParams when " +
				"GcsFaultToleranceOptions is enabled - use GcsFaultToleranceOptions.RedisPassword instead")
		case EnvVarExists(REDIS_PASSWORD, headEnv):
			return fmt.Errorf("cannot set `REDIS_PASSWORD` env var in head Pod when " +
				"GcsFaultToleranceOptions is enabled - use GcsFaultToleranceOptions.RedisPassword instead")
		case EnvVarExists(RAY_REDIS_ADDRESS, headEnv):
			return fmt.Errorf("cannot set `RAY_REDIS_ADDRESS` env var in head Pod when " +
				"GcsFaultToleranceOptions is enabled - use GcsFaultToleranceOptions.RedisAddress instead")
		case annotations[RayExternalStorageNSAnnotationKey] != "":
			return fmt.Errorf("cannot set `ray.io/external-storage-namespace` annotation when " +
				"GcsFaultToleranceOptions is enabled - use GcsFaultToleranceOptions.ExternalStorageNamespace instead")
		}
	}

	if spec.HeadGroupSpec.RayStartParams["redis-username"] != "" || EnvVarExists(REDIS_USERNAME, headEnv) {
		return fmt.Errorf("cannot set redis username in rayStartParams or environment variables" +
			" - use GcsFaultToleranceOptions.RedisUsername instead")
	}

	// suspendedGroupIndex reports the index of the first worker group whose
	// Suspend flag is set to true, or -1 when there is none.
	suspendedGroupIndex := func() int {
		for i := range spec.WorkerGroupSpecs {
			if flag := spec.WorkerGroupSpecs[i].Suspend; flag != nil && *flag {
				return i
			}
		}
		return -1
	}

	if !features.Enabled(features.RayJobDeletionPolicy) {
		if idx := suspendedGroupIndex(); idx >= 0 {
			return fmt.Errorf("worker group %s can be suspended only when the RayJobDeletionPolicy feature gate is enabled", spec.WorkerGroupSpecs[idx].GroupName)
		}
	}

	// Evaluate the autoscaling setting once and reuse it below.
	autoscaling := IsAutoscalingEnabled(spec)

	if !autoscaling {
		// With the autoscaler disabled, RAY_enable_autoscaler_v2 must not be
		// switched on ("1" or "true") in the head container.
		if v2Env, found := EnvVarByName(RAY_ENABLE_AUTOSCALER_V2, headEnv); found {
			switch v2Env.Value {
			case "1", "true":
				return fmt.Errorf("environment variable %s cannot be set to '%s' when enableInTreeAutoscaling is false. Please set enableInTreeAutoscaling: true to use autoscaler v2", RAY_ENABLE_AUTOSCALER_V2, v2Env.Value)
			}
		}
		// The remaining checks only apply when autoscaling is enabled.
		return nil
	}

	if idx := suspendedGroupIndex(); idx >= 0 {
		// TODO (rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
		return fmt.Errorf("worker group %s cannot be suspended with Autoscaler enabled", spec.WorkerGroupSpecs[idx].GroupName)
	}

	if spec.AutoscalerOptions != nil && spec.AutoscalerOptions.Version != nil && EnvVarExists(RAY_ENABLE_AUTOSCALER_V2, headEnv) {
		return fmt.Errorf("both .spec.autoscalerOptions.version and head Pod env var %s are set, please only use the former", RAY_ENABLE_AUTOSCALER_V2)
	}

	if !IsAutoscalingV2Enabled(spec) {
		return nil
	}

	// Autoscaler V2 accepts only an unset or Never restart policy.
	if policy := spec.HeadGroupSpec.Template.Spec.RestartPolicy; policy != "" && policy != corev1.RestartPolicyNever {
		return fmt.Errorf("restartPolicy for head Pod should be Never or unset when using autoscaler V2")
	}
	for i := range spec.WorkerGroupSpecs {
		policy := spec.WorkerGroupSpecs[i].Template.Spec.RestartPolicy
		if policy != "" && policy != corev1.RestartPolicyNever {
			return fmt.Errorf("restartPolicy for worker group %s should be Never or unset when using autoscaler V2", spec.WorkerGroupSpecs[i].GroupName)
		}
	}
	return nil
}
// ValidateRayJobStatus checks a RayJob's status against its spec. It returns
// an error when JobDeploymentStatus is Waiting while SubmissionMode is anything
// other than InteractiveMode, and nil otherwise.
func ValidateRayJobStatus(rayJob *rayv1.RayJob) error {
	isWaiting := rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting
	isInteractive := rayJob.Spec.SubmissionMode == rayv1.InteractiveMode

	if !isWaiting || isInteractive {
		return nil
	}
	return fmt.Errorf("The RayJob status is invalid: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode")
}
// ValidateRayJobMetadata validates the name in a RayJob's object metadata. It
// returns an error when len(name) exceeds MaxRayJobNameLength or when
// validation.IsDNS1035Label reports any problem with the name.
func ValidateRayJobMetadata(metadata metav1.ObjectMeta) error {
	name := metadata.Name
	if len(name) > MaxRayJobNameLength {
		return fmt.Errorf("The RayJob metadata is invalid: RayJob name should be no more than %d characters", MaxRayJobNameLength)
	}

	violations := validation.IsDNS1035Label(name)
	if len(violations) == 0 {
		return nil
	}
	return fmt.Errorf("The RayJob metadata is invalid: RayJob name should be a valid DNS1035 label: %v", violations)
}
// ValidateRayJobSpec validates the spec of a RayJob and returns the first
// violation found, or nil when every check passes.
//
// The checks cover, in order:
//   - suspend and TTLSecondsAfterFinished combined with ShutdownAfterJobFinishes;
//   - the choice between RayClusterSpec and ClusterSelector, and what the
//     ClusterSelector mode does not support (suspend, SidecarMode);
//   - SubmissionMode-specific limits for InteractiveMode and SidecarMode;
//   - the embedded RayClusterSpec, via ValidateRayClusterSpec;
//   - RuntimeEnvYAML being parseable as YAML;
//   - numeric bounds of ActiveDeadlineSeconds, WaitingTtlSeconds and BackoffLimit;
//   - DeletionStrategy, which requires the RayJobDeletionPolicy feature gate.
func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
	// KubeRay has some limitations for the suspend operation. The limitations are a subset of the limitations of
	// Kueue (https://kueue.sigs.k8s.io/docs/tasks/run_rayjobs/#c-limitations). For example, KubeRay allows users
	// to suspend a RayJob with autoscaling enabled, but Kueue doesn't.
	if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes {
		return fmt.Errorf("The RayJob spec is invalid: a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
	}
	if rayJob.Spec.TTLSecondsAfterFinished < 0 {
		return fmt.Errorf("The RayJob spec is invalid: TTLSecondsAfterFinished must be a non-negative integer")
	}
	if !rayJob.Spec.ShutdownAfterJobFinishes && rayJob.Spec.TTLSecondsAfterFinished > 0 {
		return fmt.Errorf("The RayJob spec is invalid: a RayJob with shutdownAfterJobFinishes set to false cannot have TTLSecondsAfterFinished")
	}

	isClusterSelectorMode := len(rayJob.Spec.ClusterSelector) != 0
	if rayJob.Spec.Suspend && isClusterSelectorMode {
		return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support the suspend operation")
	}
	if rayJob.Spec.RayClusterSpec == nil && !isClusterSelectorMode {
		return fmt.Errorf("The RayJob spec is invalid: one of RayClusterSpec or ClusterSelector must be set")
	}
	if isClusterSelectorMode {
		clusterName := rayJob.Spec.ClusterSelector[RayJobClusterSelectorKey]
		if len(clusterName) == 0 {
			return fmt.Errorf("cluster name in ClusterSelector should not be empty")
		}
		if rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
			return fmt.Errorf("ClusterSelector is not supported in SidecarMode")
		}
	}

	// InteractiveMode does not support backoffLimit > 0.
	// When a RayJob fails (e.g., due to a missing script) and retries,
	// spec.JobId remains set, causing the new job to incorrectly transition
	// to Running instead of Waiting or Failed.
	// After discussion, we decided to disallow retries in InteractiveMode
	// to avoid ambiguous state handling and unintended behavior.
	// https://github.com/ray-project/kuberay/issues/3525
	if rayJob.Spec.SubmissionMode == rayv1.InteractiveMode && rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit > 0 {
		return fmt.Errorf("The RayJob spec is invalid: BackoffLimit is incompatible with InteractiveMode")
	}

	if rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
		if rayJob.Spec.SubmitterPodTemplate != nil {
			return fmt.Errorf("Currently, SidecarMode doesn't support SubmitterPodTemplate")
		}
		if rayJob.Spec.SubmitterConfig != nil {
			return fmt.Errorf("Currently, SidecarMode doesn't support SubmitterConfig")
		}
		// RayClusterSpec is non-nil here: SidecarMode combined with ClusterSelector
		// and a nil RayClusterSpec without ClusterSelector were both rejected above.
		if rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
			return fmt.Errorf("restartPolicy for head Pod should be Never or unset when using SidecarMode")
		}
	}

	if rayJob.Spec.RayClusterSpec != nil {
		if err := ValidateRayClusterSpec(rayJob.Spec.RayClusterSpec, rayJob.Annotations); err != nil {
			return fmt.Errorf("The RayJob spec is invalid: %w", err)
		}
	}

	// Validate whether RuntimeEnvYAML is a valid YAML string. Note that this only checks its validity
	// as a YAML string, not its adherence to the runtime environment schema.
	if _, err := dashboardclient.UnmarshalRuntimeEnvYAML(rayJob.Spec.RuntimeEnvYAML); err != nil {
		return err
	}

	if rayJob.Spec.ActiveDeadlineSeconds != nil && *rayJob.Spec.ActiveDeadlineSeconds <= 0 {
		return fmt.Errorf("The RayJob spec is invalid: activeDeadlineSeconds must be a positive integer")
	}
	if rayJob.Spec.WaitingTtlSeconds != nil && *rayJob.Spec.WaitingTtlSeconds <= 0 {
		return fmt.Errorf("The RayJob spec is invalid: waitingTtlSeconds must be a positive integer")
	}
	// Zero is an accepted backoffLimit; only negative values are rejected.
	if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
		return fmt.Errorf("The RayJob spec is invalid: backoffLimit must be a non-negative integer")
	}

	if !features.Enabled(features.RayJobDeletionPolicy) && rayJob.Spec.DeletionStrategy != nil {
		return fmt.Errorf("The RayJob spec is invalid: RayJobDeletionPolicy feature gate must be enabled to use the DeletionStrategy feature")
	}

	if rayJob.Spec.DeletionStrategy != nil {
		onSuccessPolicy := rayJob.Spec.DeletionStrategy.OnSuccess
		onFailurePolicy := rayJob.Spec.DeletionStrategy.OnFailure

		// Both policies must be set before they are dereferenced below.
		if onSuccessPolicy.Policy == nil {
			return fmt.Errorf("The RayJob spec is invalid: the DeletionPolicyType field of DeletionStrategy.OnSuccess cannot be unset when DeletionStrategy is enabled")
		}
		if onFailurePolicy.Policy == nil {
			return fmt.Errorf("The RayJob spec is invalid: the DeletionPolicyType field of DeletionStrategy.OnFailure cannot be unset when DeletionStrategy is enabled")
		}

		if isClusterSelectorMode {
			switch *onSuccessPolicy.Policy {
			case rayv1.DeleteCluster:
				return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on success")
			case rayv1.DeleteWorkers:
				return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on success")
			}

			switch *onFailurePolicy.Policy {
			case rayv1.DeleteCluster:
				return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteCluster on failure")
			case rayv1.DeleteWorkers:
				return fmt.Errorf("The RayJob spec is invalid: the ClusterSelector mode doesn't support DeletionStrategy=DeleteWorkers on failure")
			}
		}

		// IsAutoscalingEnabled is only evaluated when a DeleteWorkers policy is set.
		// DeleteWorkers was rejected above for the ClusterSelector mode, so
		// RayClusterSpec is non-nil whenever this call is reached.
		if (*onSuccessPolicy.Policy == rayv1.DeleteWorkers || *onFailurePolicy.Policy == rayv1.DeleteWorkers) && IsAutoscalingEnabled(rayJob.Spec.RayClusterSpec) {
			// TODO (rueian): This can be supported in a future Ray version. We should check the RayVersion once we know it.
			return fmt.Errorf("The RayJob spec is invalid: DeletionStrategy=DeleteWorkers currently does not support RayCluster with autoscaling enabled")
		}

		if rayJob.Spec.ShutdownAfterJobFinishes && (*onSuccessPolicy.Policy == rayv1.DeleteNone || *onFailurePolicy.Policy == rayv1.DeleteNone) {
			return fmt.Errorf("The RayJob spec is invalid: shutdownAfterJobFinishes is set to 'true' while deletion policy is 'DeleteNone'")
		}
	}
	return nil
}
// ValidateRayServiceMetadata validates the name in a RayService's object
// metadata. It returns an error when len(name) exceeds MaxRayServiceNameLength
// or when validation.IsDNS1035Label reports any problem with the name.
func ValidateRayServiceMetadata(metadata metav1.ObjectMeta) error {
	name := metadata.Name
	if len(name) > MaxRayServiceNameLength {
		return fmt.Errorf("RayService name should be no more than %d characters", MaxRayServiceNameLength)
	}

	violations := validation.IsDNS1035Label(name)
	if len(violations) == 0 {
		return nil
	}
	return fmt.Errorf("RayService name should be a valid DNS1035 label: %v", violations)
}
// ValidateRayServiceSpec validates the spec of a RayService and returns the
// first violation found, or nil when every check passes. It validates the
// embedded RayClusterSpec with ValidateRayClusterSpec, rejects a user-supplied
// head service name, restricts UpgradeStrategy.Type to NewCluster or None, and
// requires RayClusterDeletionDelaySeconds to be non-negative when set.
func ValidateRayServiceSpec(rayService *rayv1.RayService) error {
	clusterSpec := &rayService.Spec.RayClusterSpec
	if err := ValidateRayClusterSpec(clusterSpec, rayService.Annotations); err != nil {
		return err
	}

	headSvc := clusterSpec.HeadGroupSpec.HeadService
	if headSvc != nil && headSvc.Name != "" {
		return fmt.Errorf("spec.rayClusterConfig.headGroupSpec.headService.metadata.name should not be set")
	}

	// When an upgrade type is given, it must be NewCluster or None.
	if strategy := rayService.Spec.UpgradeStrategy; strategy != nil && strategy.Type != nil {
		switch *strategy.Type {
		case rayv1.None, rayv1.NewCluster:
			// Accepted value.
		default:
			return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *strategy.Type, rayv1.NewCluster, rayv1.None)
		}
	}

	if delay := rayService.Spec.RayClusterDeletionDelaySeconds; delay != nil && *delay < 0 {
		return fmt.Errorf("Spec.RayClusterDeletionDelaySeconds should be a non-negative integer, got %d", *delay)
	}
	return nil
}