Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conf/spark-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ spark.jars.ivy /tmp/.ivy
spark.armada.pod.labels foo=bar
spark.armada.lookouturl http://localhost:30000
spark.armada.queue test
spark.armada.internalUrl unused
#spark.executor.instances 3
#spark.armada.scheduling.nodeUniformity armada-spark
#spark.armada.scheduling.nodeSelectors armada-spark=true
Expand Down
65 changes: 60 additions & 5 deletions scripts/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,39 @@ if [ -e "$scripts/config.sh" ]; then
source "$scripts/config.sh"
fi

# Fall back to cluster deploy / dynamic allocation unless the caller (or
# config.sh, sourced above) already chose otherwise.
DEPLOY_MODE="${DEPLOY_MODE:-cluster}"
ALLOCATION_MODE="${ALLOCATION_MODE:-dynamic}"

# Peel off the GNU-style long options before getopts runs (getopts only
# understands single-letter flags); every other argument is collected and
# handed back untouched via `set --`.
remaining=()
while [[ $# -gt 0 ]]; do
  case "$1" in
    --mode)       DEPLOY_MODE="$2";     shift 2 ;;
    --allocation) ALLOCATION_MODE="$2"; shift 2 ;;
    *)            remaining+=("$1");    shift   ;;
  esac
done

# Hand the unconsumed arguments back to the positional parameters for getopts.
set -- "${remaining[@]}"

print_usage () {
echo ' Usage:'
echo ' -h help'
echo ' -k "use kind cluster"'
echo ' -p "build image with python"'
echo ' -S "static mode (fixed executor count, no dynamic allocation)"'
echo ' -M, --mode <client|cluster> "deploy mode (default: cluster)"'
echo ' -A, --allocation <static|dynamic> "allocation type (default: dynamic)"'
echo ' -i <image-name>'
echo ' -m <armada-master-url>'
echo ' -q <armada-queue>'
Expand All @@ -23,25 +50,31 @@ print_usage () {
echo ' -c <class path to use>'
echo ' -e running e2e tests'
echo ''
echo 'Examples:'
echo ' --mode cluster --allocation dynamic'
echo ' --mode cluster --allocation static'
echo ' --mode client --allocation dynamic'
echo ' --mode client --allocation static'
echo ''
echo 'You also can specify those parameters in scripts/config.sh, like so:'
echo ' IMAGE_NAME=spark:armada'
echo ' ARMADA_MASTER=armada://localhost:30002'
echo ' ARMADA_QUEUE=test'
echo ' USE_KIND=true'
echo ' INCLUDE_PYTHON=true'
echo ' STATIC_MODE=true'
echo ' DEPLOY_MODE=cluster'
echo ' ALLOCATION_MODE=dynamic'
echo ' PYTHON_SCRIPT=/opt/spark/examples/src/main/python/pi.py'
echo ' SCALA_CLASS=org.apache.spark.examples.SparkPi'
echo ' CLASS_PATH=local:///opt/spark/extraFiles/spark-examples_2.12-3.5.3.jar'
exit 1
}

while getopts "hekpSi:a:m:P:s:c:q:" opt; do
while getopts "hekpi:a:m:P:s:c:q:M:A:e" opt; do
case "$opt" in
h) print_usage ;;
k) USE_KIND=true ;;
p) INCLUDE_PYTHON=true ;;
S) STATIC_MODE=true ;;
a) ARMADA_AUTH_TOKEN=$OPTARG ;;
i) IMAGE_NAME=$OPTARG ;;
m) ARMADA_MASTER=$OPTARG ;;
Expand All @@ -50,19 +83,41 @@ while getopts "hekpSi:a:m:P:s:c:q:" opt; do
s) SCALA_CLASS=$OPTARG ;;
c) CLASSPATH=$OPTARG ;;
e) RUNNING_E2E_TESTS=true ;;
M) DEPLOY_MODE=$OPTARG ;;
A) ALLOCATION_MODE=$OPTARG ;;
esac
done

# Apply defaults for anything not supplied via flags or config.sh, then
# publish everything to child processes (docker / spark-submit).
: "${INCLUDE_PYTHON:=false}"
: "${USE_KIND:=false}"
: "${STATIC_MODE:=false}"
: "${IMAGE_NAME:=spark:armada}"
: "${ARMADA_MASTER:=armada://localhost:30002}"
: "${ARMADA_QUEUE:=test}"
: "${ARMADA_AUTH_TOKEN:=}"
: "${SCALA_CLASS:=org.apache.spark.examples.SparkPi}"
: "${RUNNING_E2E_TESTS:=false}"
export INCLUDE_PYTHON USE_KIND STATIC_MODE IMAGE_NAME ARMADA_MASTER \
       ARMADA_QUEUE ARMADA_AUTH_TOKEN SCALA_CLASS RUNNING_E2E_TESTS

# --- Validation -------------------------------------------------------------

# Deploy mode must be one of the two spark-submit deploy modes.
case "$DEPLOY_MODE" in
  client|cluster) ;;
  *)
    echo "Error: --mode/-M must be either 'client' or 'cluster'"
    exit 1
    ;;
esac
export DEPLOY_MODE

# Allocation mode selects between fixed executor count and dynamic allocation.
case "$ALLOCATION_MODE" in
  static|dynamic) ;;
  *)
    echo "Error: --allocation/-A must be either 'static' or 'dynamic'"
    exit 1
    ;;
esac
export ALLOCATION_MODE

# NOTE(review): this unconditionally overwrites any STATIC_MODE set via
# config.sh (or a legacy -S flag), making ALLOCATION_MODE the single source of
# truth. Confirm that silently ignoring a configured STATIC_MODE is intended.
if [ "$ALLOCATION_MODE" = "static" ]; then STATIC_MODE=true; else STATIC_MODE=false; fi
export STATIC_MODE

if [ -z "${PYTHON_SCRIPT:-}" ]; then
PYTHON_SCRIPT="/opt/spark/examples/src/main/python/pi.py"
else
Expand Down
47 changes: 33 additions & 14 deletions scripts/submitArmadaSpark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ echo Submitting spark job to Armada.
# Resolve this script's own directory so init.sh can be sourced regardless of
# the caller's working directory.
scripts="$(cd "$(dirname "$0")"; pwd)"
# init.sh parses flags, validates, and exports DEPLOY_MODE / ALLOCATION_MODE
# (among others) used below.
source "$scripts/init.sh"

echo "Deploy Mode: $DEPLOY_MODE"
echo "Allocation Mode: $ALLOCATION_MODE"

if [ "${INCLUDE_PYTHON}" == "false" ]; then
NAME=spark-pi
CLASS_PROMPT="--class"
Expand Down Expand Up @@ -43,32 +46,33 @@ fi
# Disable config maps until this is fixed: https://github.com/G-Research/spark/issues/109
DISABLE_CONFIG_MAP=true

export ARMADA_INTERNAL_URL="${ARMADA_INTERNAL_URL:-armada://armada-server.armada:50051}"
# Set memory limits based on deploy mode
MEMORY_LIMIT="1Gi"

# Build configuration based on mode
# Build configuration based on allocation mode
if [ "$STATIC_MODE" = true ]; then
echo running static mode
# Static mode: fixed executor count (like submitArmadaSpark.sh)
# Static mode: fixed executor count
EXTRA_CONF=(
--conf spark.executor.instances=2
--conf spark.armada.executor.limit.memory=4Gi
--conf spark.armada.executor.request.memory=4Gi
--conf spark.armada.driver.limit.memory=4Gi
--conf spark.armada.driver.request.memory=4Gi
--conf spark.armada.executor.limit.memory=$MEMORY_LIMIT
--conf spark.armada.executor.request.memory=$MEMORY_LIMIT
--conf spark.armada.driver.limit.memory=$MEMORY_LIMIT
--conf spark.armada.driver.request.memory=$MEMORY_LIMIT
--conf spark.driver.extraJavaOptions="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--conf spark.executor.extraJavaOptions="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
)
else
echo running dynamic mode
# Dynamic mode: dynamic allocation with debug options (original submitArmadaSparkDynamic.sh)
# Dynamic mode: dynamic allocation with debug options
EXTRA_CONF=(
--conf spark.driver.extraJavaOptions="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--conf spark.executor.extraJavaOptions="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
--conf spark.armada.scheduling.namespace=default
--conf spark.armada.executor.limit.memory=4Gi
--conf spark.armada.executor.request.memory=4Gi
--conf spark.armada.driver.limit.memory=4Gi
--conf spark.armada.driver.request.memory=4Gi
--conf spark.armada.executor.limit.memory=$MEMORY_LIMIT
--conf spark.armada.executor.request.memory=$MEMORY_LIMIT
--conf spark.armada.driver.limit.memory=$MEMORY_LIMIT
--conf spark.armada.driver.request.memory=$MEMORY_LIMIT
--conf spark.default.parallelism=10
--conf spark.executor.instances=1
--conf spark.sql.shuffle.partitions=5
Expand All @@ -84,10 +88,25 @@ else
)
fi

# Build deploy-mode specific arguments array
DEPLOY_MODE_ARGS=()
if [ "$DEPLOY_MODE" = "client" ]; then
DEPLOY_MODE_ARGS=(
--conf spark.driver.host=$SPARK_DRIVER_HOST
--conf spark.driver.port=$SPARK_DRIVER_PORT
--conf spark.app.id=armada-spark-$(openssl rand -hex 3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd really prefer the rand to happen inside our scala code. The default should be random. If the user wants to override for some reason, he can, but I don't want us to have to do this here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made some changes to fix this.

)
else
export ARMADA_INTERNAL_URL="${ARMADA_INTERNAL_URL:-armada://armada-server.armada:50051}"
DEPLOY_MODE_ARGS=(
--conf spark.armada.internalUrl=$ARMADA_INTERNAL_URL
)
fi

# Run Armada Spark via docker image
docker run -e SPARK_PRINT_LAUNCH_COMMAND=true -v $scripts/../conf:/opt/spark/conf --rm --network host $IMAGE_NAME \
/opt/spark/bin/spark-submit \
--master $ARMADA_MASTER --deploy-mode cluster \
--master $ARMADA_MASTER --deploy-mode $DEPLOY_MODE \
--name $NAME \
$CLASS_PROMPT $CLASS_ARG \
$AUTH_ARG \
Expand All @@ -97,6 +116,6 @@ docker run -e SPARK_PRINT_LAUNCH_COMMAND=true -v $scripts/../conf:/opt/spark/con
--conf spark.kubernetes.file.upload.path=/tmp \
--conf spark.kubernetes.executor.disableConfigMap=$DISABLE_CONFIG_MAP \
--conf spark.local.dir=/tmp \
--conf spark.armada.internalUrl=$ARMADA_INTERNAL_URL \
"${DEPLOY_MODE_ARGS[@]}" \
"${EXTRA_CONF[@]}" \
$FIRST_ARG "${FINAL_ARGS[@]}"
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.deploy.armada

import org.apache.spark.SparkConf
import org.apache.spark.deploy.armada.Config._
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils

/** Helper trait that encapsulates deployment mode-specific behavior for Armada Spark jobs.
Expand Down Expand Up @@ -45,6 +46,35 @@ trait DeploymentModeHelper {
* The total number of pods in the gang (executors + driver if applicable)
*/
def getGangCardinality: Int

/** Returns whether the driver runs inside the cluster (cluster mode) or externally (client mode).
*
* @return
* true if driver runs in cluster, false if driver runs externally
*/
def isDriverInCluster: Boolean

/** Returns whether executors should be proactively requested at startup.
*
* This is typically true for client mode with static allocation, where executors need to be
* requested immediately since the driver is already running.
*
* @return
* true if executors should be proactively requested, false otherwise
*/
def shouldProactivelyRequestExecutors: Boolean

/** Returns the source for jobSetId based on deployment mode.
*
* In cluster mode, jobSetId comes from environment variable ARMADA_JOB_SET_ID. In client mode,
* jobSetId comes from config or falls back to application ID.
*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember the reason why these are different between client and cluster mode. Is there a reason why we shouldn't just make them both come from the config/appId?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the specific reason, but in general I don't see any reason for them to be handled differently based on mode. The jobSetId should come from the config regardless of deploy mode.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Please make it so.

* @param applicationId
* Spark application ID to use as fallback
* @return
* Optional jobSetId string
*/
def getJobSetIdSource(applicationId: String): Option[String]
}

/** Static allocation in cluster mode.
Expand All @@ -54,7 +84,7 @@ trait DeploymentModeHelper {
* - The driver runs as a pod inside the cluster
* - Gang cardinality includes both driver and executors
*/
class StaticCluster(conf: SparkConf) extends DeploymentModeHelper {
class StaticCluster(val conf: SparkConf) extends DeploymentModeHelper {
// Static allocation: the fixed executor count is the scheduler backend's
// initial target number, taken straight from the Spark configuration.
override def getExecutorCount: Int =
  SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
Expand All @@ -63,6 +93,14 @@ class StaticCluster(conf: SparkConf) extends DeploymentModeHelper {
// In cluster mode, include the driver pod in gang scheduling
getExecutorCount + 1
}

// Cluster mode: the driver is itself submitted as a pod (see class scaladoc),
// so it counts as running inside the cluster.
override def isDriverInCluster: Boolean = true

// Executors come up as part of the gang submission, so no proactive request
// is made at startup (contrast with StaticClient).
override def shouldProactivelyRequestExecutors: Boolean = false

/** Resolves the jobSetId. Per review, cluster and client modes should agree:
  * prefer the explicit config value, keep the ARMADA_JOB_SET_ID environment
  * variable as a backward-compatible fallback for existing cluster-mode
  * deployments, and finally fall back to the Spark application id, so the
  * result is never None.
  */
override def getJobSetIdSource(applicationId: String): Option[String] = {
  conf
    .get(ARMADA_JOB_SET_ID)
    .orElse(sys.env.get("ARMADA_JOB_SET_ID"))
    .orElse(Some(applicationId))
}
}

/** Static allocation in client mode.
Expand All @@ -72,7 +110,7 @@ class StaticCluster(conf: SparkConf) extends DeploymentModeHelper {
* - The driver runs on the client machine (outside the cluster)
* - Gang cardinality includes only executors
*/
class StaticClient(conf: SparkConf) extends DeploymentModeHelper {
class StaticClient(val conf: SparkConf) extends DeploymentModeHelper {
// Static allocation: the fixed executor count is the scheduler backend's
// initial target number, taken straight from the Spark configuration.
override def getExecutorCount: Int =
  SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
Expand All @@ -81,6 +119,14 @@ class StaticClient(conf: SparkConf) extends DeploymentModeHelper {
// In client mode, driver runs externally, so only count executors
getExecutorCount
}

// Client mode: the driver runs on the client machine, outside the cluster
// (see class scaladoc).
override def isDriverInCluster: Boolean = false

// Static allocation with an external driver: the driver is already running
// when the job starts, so executors must be requested immediately (see the
// trait scaladoc for shouldProactivelyRequestExecutors).
override def shouldProactivelyRequestExecutors: Boolean = true

// Always yields a value: the configured job-set id when present, otherwise
// the Spark application id.
override def getJobSetIdSource(applicationId: String): Option[String] =
  Some(conf.get(ARMADA_JOB_SET_ID).getOrElse(applicationId))
}

/** Dynamic allocation in cluster mode.
Expand All @@ -91,7 +137,7 @@ class StaticClient(conf: SparkConf) extends DeploymentModeHelper {
* - Initial allocation uses the configured minimum executor count
* - Gang cardinality includes both driver and minimum executors
*/
class DynamicCluster(conf: SparkConf) extends DeploymentModeHelper {
class DynamicCluster(val conf: SparkConf) extends DeploymentModeHelper {
override def getExecutorCount: Int = {
// For dynamic allocation, use minExecutors as the initial count
conf.getInt(
Expand All @@ -104,6 +150,14 @@ class DynamicCluster(conf: SparkConf) extends DeploymentModeHelper {
// In cluster mode, include the driver pod in gang scheduling
getExecutorCount + 1
}

// Cluster mode: the driver is itself submitted as a pod (see class scaladoc),
// so it counts as running inside the cluster.
override def isDriverInCluster: Boolean = true

// Dynamic allocation scales executors on demand and the gang submission
// covers the initial minimum, so nothing is proactively requested here.
override def shouldProactivelyRequestExecutors: Boolean = false

/** Resolves the jobSetId. Per review, cluster and client modes should agree:
  * prefer the explicit config value, keep the ARMADA_JOB_SET_ID environment
  * variable as a backward-compatible fallback for existing cluster-mode
  * deployments, and finally fall back to the Spark application id, so the
  * result is never None.
  */
override def getJobSetIdSource(applicationId: String): Option[String] = {
  conf
    .get(ARMADA_JOB_SET_ID)
    .orElse(sys.env.get("ARMADA_JOB_SET_ID"))
    .orElse(Some(applicationId))
}
}

/** Dynamic allocation in client mode.
Expand All @@ -114,7 +168,7 @@ class DynamicCluster(conf: SparkConf) extends DeploymentModeHelper {
* - Initial allocation uses the configured minimum executor count
* - Gang cardinality includes only minimum executors
*/
class DynamicClient(conf: SparkConf) extends DeploymentModeHelper {
class DynamicClient(val conf: SparkConf) extends DeploymentModeHelper {
override def getExecutorCount: Int = {
// For dynamic allocation, use minExecutors as the initial count
conf.getInt(
Expand All @@ -127,6 +181,14 @@ class DynamicClient(conf: SparkConf) extends DeploymentModeHelper {
// In client mode, driver runs externally, so only count executors
getExecutorCount
}

// Client mode: the driver runs on the client machine, outside the cluster
// (see class scaladoc).
override def isDriverInCluster: Boolean = false

// Dynamic allocation scales executors on demand, so even with an external
// driver no up-front executor request is made (contrast with StaticClient).
override def shouldProactivelyRequestExecutors: Boolean = false

// Always yields a value: the configured job-set id when present, otherwise
// the Spark application id.
override def getJobSetIdSource(applicationId: String): Option[String] =
  Some(conf.get(ARMADA_JOB_SET_ID).getOrElse(applicationId))
}

/** Factory for creating ModeHelper instances based on Spark configuration.
Expand Down
Loading
Loading