Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import org.apache.spark.deploy.k8s.Config.{
KUBERNETES_SUBMIT_GRACE_PERIOD
}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.config.{DRIVER_PORT, DRIVER_HOST_ADDRESS}
import org.apache.spark.internal.config.{DRIVER_PORT, DRIVER_HOST_ADDRESS, DYN_ALLOCATION_ENABLED}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
Expand Down Expand Up @@ -363,12 +363,6 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
driverJobId: String,
executorCount: Int
): Seq[String] = {
if (executorCount <= 0) {
throw new IllegalArgumentException(
s"Executor count must be greater than 0, but got: $executorCount"
)
}

val driverData = buildDriverData(None, armadaJobConfig, conf)

val executorLabels = buildLabels(
Expand Down Expand Up @@ -443,9 +437,12 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
): (String, Seq[String]) = {
val modeHelper = DeploymentModeHelper(conf)
val executorCount = modeHelper.getExecutorCount
if (executorCount <= 0) {
val isDynamic = conf.getBoolean(DYN_ALLOCATION_ENABLED.key, false)

// Allow minExecutors=0 for dynamic allocation
if (!isDynamic && executorCount <= 0) {
throw new IllegalArgumentException(
s"Executor count must be greater than 0, but got: $executorCount"
s"Executor count must be greater than 0 for static allocation, but got: $executorCount"
)
}

Expand Down Expand Up @@ -1783,12 +1780,14 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
nodeUniformityLabel: Option[String],
conf: SparkConf
): Map[String, String] = {
val modeHelper = DeploymentModeHelper(conf)
val modeHelper = DeploymentModeHelper(conf)
val gangCardinality = modeHelper.getGangCardinality
configGenerator.getAnnotations ++ templateAnnotations ++ nodeUniformityLabel
.filter(_ => gangCardinality > 0) // Only add gang annotations if cardinality > 0
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Armada does not accept gangCardinality = 0; it rejects the job with a "gang cardinality 0 is non-positive" error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cardinality is driver + num(executors), so even if you submit driver only, cardinality will be 1 in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that true for client mode as well? I guess that in client mode the executor count alone is the gang cardinality.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.map(label =>
GangSchedulingAnnotations(
gangId,
modeHelper.getGangCardinality,
gangCardinality,
label
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1653,7 +1653,9 @@ class ArmadaClientApplicationSuite extends AnyFunSuite with BeforeAndAfter with
}
}

test("submitArmadaJob should validate executor count is greater than zero") {
test(
"submitArmadaJob should validate executor count is greater than zero for static allocation"
) {
val armadaJobConfig = armadaClientApp.ArmadaJobConfig(
queue = "test-queue",
jobSetId = "test-job-set",
Expand Down Expand Up @@ -1685,13 +1687,58 @@ class ArmadaClientApplicationSuite extends AnyFunSuite with BeforeAndAfter with
)

sparkConf.set("spark.executor.instances", "0")
sparkConf.set("spark.dynamicAllocation.enabled", "false")

val exception = intercept[IllegalArgumentException] {
armadaClientApp.submitArmadaJob(null, clientArguments, armadaJobConfig, sparkConf)
}
exception.getMessage should include("Executor count must be greater than 0")
}

test("submitArmadaJob should allow minExecutors=0 for dynamic allocation") {
  import scala.util.Try

  // Bare-bones job configuration: no job/item templates and no feature-step
  // overrides, so only the CLI values below are supplied.
  val armadaJobConfig = armadaClientApp.ArmadaJobConfig(
    queue = "test-queue",
    jobSetId = "test-job-set",
    jobTemplate = None,
    driverJobItemTemplate = None,
    executorJobItemTemplate = None,
    cliConfig = armadaClientApp.CLIConfig(
      queue = Some("test-queue"),
      jobSetId = Some("test-job-set"),
      namespace = Some("test-namespace"),
      priority = Some(RUNTIME_PRIORITY),
      containerImage = Some(DEFAULT_IMAGE_NAME),
      podLabels = Map.empty,
      driverLabels = Map.empty,
      executorLabels = Map.empty,
      armadaClusterUrl = Some("armada://localhost:50051"),
      nodeSelectors = Map.empty,
      nodeUniformityLabel = None,
      executorConnectionTimeout = Some(300.seconds),
      runAsUser = None,
      driverResources = armadaClientApp.ResourceConfig(None, None, None, None),
      executorResources = armadaClientApp.ResourceConfig(None, None, None, None)
    ),
    applicationId = "armada-spark-app-id",
    driverFeatureStepJobItem = None,
    driverFeatureStepContainer = None,
    executorFeatureStepJobItem = None,
    executorFeatureStepContainer = None
  )

  // Dynamic allocation with a floor of zero executors, submitted in cluster mode.
  sparkConf.set("spark.submit.deployMode", "cluster")
  sparkConf.set("spark.dynamicAllocation.enabled", "true")
  sparkConf.set("spark.dynamicAllocation.minExecutors", "0")

  // The first argument is null, so the call may still fail further along for
  // reasons unrelated to executor-count validation. The test must not *require*
  // a failure, though: a clean return also means the validation let the
  // configuration through. Try captures only non-fatal throwables, so fatal
  // JVM errors still surface instead of being inspected for their message.
  val submission = Try {
    armadaClientApp.submitArmadaJob(null, clientArguments, armadaJobConfig, sparkConf)
  }

  // Only when the call failed do we check that the failure is not the
  // executor-count validation error. getMessage may be null, hence the Option.
  submission.failed.foreach { error =>
    Option(error.getMessage).getOrElse(
      ""
    ) should not include "Executor count must be greater than 0"
  }
}

test("JobTemplateLoader should handle malformed YAML gracefully") {
val malformedYaml = "invalid: yaml: content: ]]]["
val templateFile = tempDir.resolve("malformed-template.yaml").toFile
Expand Down