[FLINK-35266][snapshot] Add E2E tests for FlinkStateSnapshot
mateczagany committed Aug 10, 2024
1 parent 8479f22 commit 7627691
Showing 7 changed files with 215 additions and 0 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/ci.yml
@@ -87,6 +87,8 @@ jobs:
- test_multi_sessionjob.sh
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_application_kubernetes_ha.sh
- test_snapshot.sh
include:
  - namespace: flink
    extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
@@ -111,26 +113,36 @@ jobs:
  test: test_autoscaler.sh
- mode: standalone
  test: test_dynamic_config.sh
- mode: standalone
  test: test_snapshot.sh
- version: v1_15
  test: test_autoscaler.sh
- version: v1_15
  test: test_dynamic_config.sh
- version: v1_15
  test: test_flink_operator_ha.sh
- version: v1_15
  test: test_snapshot.sh
- version: v1_16
  test: test_autoscaler.sh
- version: v1_16
  test: test_dynamic_config.sh
- version: v1_16
  test: test_flink_operator_ha.sh
- version: v1_16
  test: test_snapshot.sh
- version: v1_17
  test: test_dynamic_config.sh
- version: v1_17
  test: test_flink_operator_ha.sh
- version: v1_17
  test: test_snapshot.sh
- version: v1_18
  test: test_dynamic_config.sh
- version: v1_18
  test: test_flink_operator_ha.sh
- version: v1_18
  test: test_snapshot.sh
- version: v1_15
  java-version: 17
- version: v1_16
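
Note: the workflow changes above only wire the new scripts into the existing e2e test matrix. Assuming a test cluster with the operator installed and the example job image available (the setup the CI job performs before running the matrix entries), the new test can in principle also be run directly:

    ./e2e-tests/test_snapshot.sh
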
32 changes: 32 additions & 0 deletions e2e-tests/data/savepoint.yaml
@@ -0,0 +1,32 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 0
  jobReference:
    kind: FlinkDeployment
    name: flink-example-statemachine
  savepoint:
    alreadyExists: false
    disposeOnDelete: true
    formatType: CANONICAL
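
For reference, this is the manifest that test_snapshot.sh applies in its first step. A minimal sketch of the same interaction outside the test harness (assuming the flink-example-statemachine deployment from the e2e data files is already running) would be:

    kubectl apply -f e2e-tests/data/savepoint.yaml
    # Wait for the operator to trigger and complete the savepoint
    kubectl wait --timeout=300s --for=jsonpath='{.status.state}'=COMPLETED flinksnp/example-savepoint
    # The savepoint location is reported in the CR status
    kubectl get flinksnp/example-savepoint -o yaml | yq '.status.path'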

Empty file modified e2e-tests/test_autoscaler.sh (mode 100644 → 100755)
Empty file modified e2e-tests/test_dynamic_config.sh (mode 100644 → 100755)
Empty file modified e2e-tests/test_flink_operator_ha.sh (mode 100644 → 100755)
150 changes: 150 additions & 0 deletions e2e-tests/test_snapshot.sh
@@ -0,0 +1,150 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This script tests the FlinkStateSnapshot CR as follows:
# 1. Trigger savepoint by creating a new FlinkStateSnapshot savepoint CR
# 2. Trigger savepoint by using savepoint trigger nonce
# 3. Trigger checkpoint by using trigger nonce
# 4. Test periodic savepoints triggered by the operator
# 5. Change the job upgrade mode to savepoint, suspend the job and assert that a new upgrade FlinkStateSnapshot CR is created

SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
source "${SCRIPT_DIR}/utils.sh"

CLUSTER_ID="flink-example-statemachine"
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml"
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"

SAVEPOINT_YAML="${SCRIPT_DIR}/data/savepoint.yaml"
SAVEPOINT_IDENTIFIER="flinksnp/example-savepoint"

TIMEOUT=300

on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)

wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"upgradeMode": "savepoint"},"flinkConfiguration":{"kubernetes.operator.snapshot.resource.enabled":"true"}}}'

job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
echo "Found job ID $job_id"



# Testing manual savepoint via CR
echo "Creating manual savepoint..."
retry_times 5 30 "kubectl apply -f $SAVEPOINT_YAML" || exit 1
wait_for_status $SAVEPOINT_IDENTIFIER '.status.state' "COMPLETED" $TIMEOUT || exit 1

location=$(kubectl get $SAVEPOINT_IDENTIFIER -o yaml | yq '.status.path')
if [ "$location" == "" ]; then
  echo "Manual savepoint location was empty"
  exit 1
fi

echo "Disposing manual savepoint..."
kubectl delete $SAVEPOINT_IDENTIFIER
wait_for_logs $jm_pod_name "Disposing savepoint $location" ${TIMEOUT} || exit 1



# Testing manual savepoint via trigger nonce
kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }'

echo "Waiting for manual savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "manual" ${TIMEOUT})
echo "Found snapshot with name $snapshot"

wait_for_status flinksnp/$snapshot '.status.spec.checkpoint' null $TIMEOUT || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' null $TIMEOUT || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' null $TIMEOUT || exit 1
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then
  echo "Manual savepoint location was empty"
  exit 1
fi



# Testing manual checkpoint via trigger nonce
kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"checkpointTriggerNonce": 123456 } } }'

echo "Waiting for manual checkpoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "checkpoint" "manual" ${TIMEOUT})
echo "Found checkpoint with name $snapshot"

wait_for_status flinksnp/$snapshot '.status.spec.savepoint' null $TIMEOUT || exit 1
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then
  echo "Manual checkpoint location was empty"
  exit 1
fi


# Test periodic savepoints
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":"5s"}}}'

echo "Waiting for periodic savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "periodic" ${TIMEOUT})
echo "Found periodic savepoint: $snapshot"
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then
  echo "Periodic savepoint location was empty"
  exit 1
fi
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":""}}}'


# Test upgrade savepoint
echo "Suspending deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "suspended" } } }'
wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1

echo "Waiting for upgrade savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
echo "Found upgrade snapshot: $snapshot"
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1

location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
if [ "$location" == "" ]; then
  echo "Upgrade savepoint location was empty"
  exit 1
fi



echo "Restarting deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }'
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)

# Check that the new JobManager restores from the upgrade savepoint and then resumes checkpointing
wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "execution.savepoint.path, ${location}" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1

echo "Successfully ran the FlinkStateSnapshot test"
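
The script relies on several helpers that already exist in e2e-tests/utils.sh and are therefore not part of this diff (wait_for_status, wait_for_logs, wait_for_jobmanager_running, get_jm_pod_name, retry_times, on_exit, cleanup_and_exit). Their implementations are not shown here; purely as a hypothetical sketch inferred from how the script calls it, wait_for_status could look roughly like this:

    # Hypothetical sketch only -- the real helper lives in e2e-tests/utils.sh.
    function wait_for_status {
      local resource=$1 yq_path=$2 expected=$3 timeout=$4
      for i in $(seq 1 "${timeout}"); do
        # Poll the resource until the given yq path reports the expected value.
        if [ "$(kubectl get ${resource} -o yaml | yq ${yq_path})" == "${expected}" ]; then
          return 0
        fi
        sleep 1
      done
      echo "Timed out waiting for ${resource} ${yq_path} to reach ${expected}"
      exit 1
    }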

21 changes: 21 additions & 0 deletions e2e-tests/utils.sh
@@ -97,6 +97,27 @@ function wait_for_event {
  exit 1
}

function wait_for_snapshot {
  local job_name=$1
  local snapshot_type=$2
  local trigger_type=$3
  local timeout=$4
  local prefix="$job_name-$snapshot_type-$trigger_type"

  for i in $(seq 1 ${timeout}); do
    # Pick the newest FlinkStateSnapshot whose name matches the expected <job>-<type>-<trigger> prefix.
    snapshot_name=$(kubectl get flinksnp | grep $prefix | awk '{print $1}' | sort -r | head -n 1)
    if [ "$snapshot_name" ]; then
      # Wait until the snapshot reaches the COMPLETED state, then return its name.
      kubectl wait --timeout=${timeout}s --for=jsonpath='{.status.state}'=COMPLETED flinksnp/$snapshot_name > /dev/null || exit 1
      echo "$snapshot_name"
      return 0
    fi

    sleep 1
  done
  echo "Could not find snapshot with prefix $prefix within the timeout of ${timeout} seconds."
  exit 1
}

function assert_available_slots() {
  expected=$1
  CLUSTER_ID=$2
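
Usage of the new wait_for_snapshot helper, as seen in test_snapshot.sh above:

    snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "manual" ${TIMEOUT})
    echo "Found snapshot with name $snapshot"

It returns the name of the newest COMPLETED FlinkStateSnapshot whose name starts with <job>-<snapshot type>-<trigger type>, or fails the test if none appears within the timeout.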
