Skip to content

Commit 98ab408

Browse files
authored
Allow minExecutors=0 in dynamic mode (#89)
Closes G-Research/spark#157 --------- Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
1 parent 271a002 commit 98ab408

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ import org.apache.spark.deploy.k8s.Config.{
8181
KUBERNETES_SUBMIT_GRACE_PERIOD
8282
}
8383
import org.apache.spark.{SecurityManager, SparkConf}
84-
import org.apache.spark.internal.config.{DRIVER_PORT, DRIVER_HOST_ADDRESS}
84+
import org.apache.spark.internal.config.{DRIVER_PORT, DRIVER_HOST_ADDRESS, DYN_ALLOCATION_ENABLED}
8585
import org.apache.spark.resource.ResourceProfile
8686
import org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder
8787
import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -363,12 +363,6 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
363363
driverJobId: String,
364364
executorCount: Int
365365
): Seq[String] = {
366-
if (executorCount <= 0) {
367-
throw new IllegalArgumentException(
368-
s"Executor count must be greater than 0, but got: $executorCount"
369-
)
370-
}
371-
372366
val driverData = buildDriverData(None, armadaJobConfig, conf)
373367

374368
val executorLabels = buildLabels(
@@ -443,9 +437,12 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
443437
): (String, Seq[String]) = {
444438
val modeHelper = DeploymentModeHelper(conf)
445439
val executorCount = modeHelper.getExecutorCount
446-
if (executorCount <= 0) {
440+
val isDynamic = conf.getBoolean(DYN_ALLOCATION_ENABLED.key, false)
441+
442+
// Allow minExecutors=0 for dynamic allocation
443+
if (!isDynamic && executorCount <= 0) {
447444
throw new IllegalArgumentException(
448-
s"Executor count must be greater than 0, but got: $executorCount"
445+
s"Executor count must be greater than 0 for static allocation, but got: $executorCount"
449446
)
450447
}
451448

@@ -1783,12 +1780,14 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
17831780
nodeUniformityLabel: Option[String],
17841781
conf: SparkConf
17851782
): Map[String, String] = {
1786-
val modeHelper = DeploymentModeHelper(conf)
1783+
val modeHelper = DeploymentModeHelper(conf)
1784+
val gangCardinality = modeHelper.getGangCardinality
17871785
configGenerator.getAnnotations ++ templateAnnotations ++ nodeUniformityLabel
1786+
.filter(_ => gangCardinality > 0) // Only add gang annotations if cardinality > 0
17881787
.map(label =>
17891788
GangSchedulingAnnotations(
17901789
gangId,
1791-
modeHelper.getGangCardinality,
1790+
gangCardinality,
17921791
label
17931792
)
17941793
)

src/test/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplicationSuite.scala

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1653,7 +1653,9 @@ class ArmadaClientApplicationSuite extends AnyFunSuite with BeforeAndAfter with
16531653
}
16541654
}
16551655

1656-
test("submitArmadaJob should validate executor count is greater than zero") {
1656+
test(
1657+
"submitArmadaJob should validate executor count is greater than zero for static allocation"
1658+
) {
16571659
val armadaJobConfig = armadaClientApp.ArmadaJobConfig(
16581660
queue = "test-queue",
16591661
jobSetId = "test-job-set",
@@ -1685,13 +1687,58 @@ class ArmadaClientApplicationSuite extends AnyFunSuite with BeforeAndAfter with
16851687
)
16861688

16871689
sparkConf.set("spark.executor.instances", "0")
1690+
sparkConf.set("spark.dynamicAllocation.enabled", "false")
16881691

16891692
val exception = intercept[IllegalArgumentException] {
16901693
armadaClientApp.submitArmadaJob(null, clientArguments, armadaJobConfig, sparkConf)
16911694
}
16921695
exception.getMessage should include("Executor count must be greater than 0")
16931696
}
16941697

1698+
test("submitArmadaJob should allow minExecutors=0 for dynamic allocation") {
1699+
val armadaJobConfig = armadaClientApp.ArmadaJobConfig(
1700+
queue = "test-queue",
1701+
jobSetId = "test-job-set",
1702+
jobTemplate = None,
1703+
driverJobItemTemplate = None,
1704+
executorJobItemTemplate = None,
1705+
cliConfig = armadaClientApp.CLIConfig(
1706+
queue = Some("test-queue"),
1707+
jobSetId = Some("test-job-set"),
1708+
namespace = Some("test-namespace"),
1709+
priority = Some(RUNTIME_PRIORITY),
1710+
containerImage = Some(DEFAULT_IMAGE_NAME),
1711+
podLabels = Map.empty,
1712+
driverLabels = Map.empty,
1713+
executorLabels = Map.empty,
1714+
armadaClusterUrl = Some("armada://localhost:50051"),
1715+
nodeSelectors = Map.empty,
1716+
nodeUniformityLabel = None,
1717+
executorConnectionTimeout = Some(300.seconds),
1718+
runAsUser = None,
1719+
driverResources = armadaClientApp.ResourceConfig(None, None, None, None),
1720+
executorResources = armadaClientApp.ResourceConfig(None, None, None, None)
1721+
),
1722+
applicationId = "armada-spark-app-id",
1723+
driverFeatureStepJobItem = None,
1724+
driverFeatureStepContainer = None,
1725+
executorFeatureStepJobItem = None,
1726+
executorFeatureStepContainer = None
1727+
)
1728+
1729+
sparkConf.set("spark.submit.deployMode", "cluster")
1730+
sparkConf.set("spark.dynamicAllocation.enabled", "true")
1731+
sparkConf.set("spark.dynamicAllocation.minExecutors", "0")
1732+
1733+
// Should not throw executor count validation exception
1734+
val exception = intercept[Throwable] {
1735+
armadaClientApp.submitArmadaJob(null, clientArguments, armadaJobConfig, sparkConf)
1736+
}
1737+
Option(exception.getMessage).getOrElse(
1738+
""
1739+
) should not include "Executor count must be greater than 0"
1740+
}
1741+
16951742
test("JobTemplateLoader should handle malformed YAML gracefully") {
16961743
val malformedYaml = "invalid: yaml: content: ]]]["
16971744
val templateFile = tempDir.resolve("malformed-template.yaml").toFile

0 commit comments

Comments
 (0)