From 762769104d8547639fafd71bf91329c503dc789f Mon Sep 17 00:00:00 2001 From: Mate Czagany Date: Sat, 10 Aug 2024 17:07:33 +0200 Subject: [PATCH] [FLINK-35266][snapshot] Add E2E tests for FlinkStateSnapshot --- .github/workflows/ci.yml | 12 +++ e2e-tests/data/savepoint.yaml | 32 ++++++ e2e-tests/test_autoscaler.sh | 0 e2e-tests/test_dynamic_config.sh | 0 e2e-tests/test_flink_operator_ha.sh | 0 e2e-tests/test_snapshot.sh | 150 ++++++++++++++++++++++++++++ e2e-tests/utils.sh | 21 ++++ 7 files changed, 215 insertions(+) create mode 100644 e2e-tests/data/savepoint.yaml mode change 100644 => 100755 e2e-tests/test_autoscaler.sh mode change 100644 => 100755 e2e-tests/test_dynamic_config.sh mode change 100644 => 100755 e2e-tests/test_flink_operator_ha.sh create mode 100755 e2e-tests/test_snapshot.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f317fc5801..928ad76e75 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,6 +87,8 @@ jobs: - test_multi_sessionjob.sh - test_autoscaler.sh - test_flink_operator_ha.sh + - test_application_kubernetes_ha.sh + - test_snapshot.sh include: - namespace: flink extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"' @@ -111,26 +113,36 @@ jobs: test: test_autoscaler.sh - mode: standalone test: test_dynamic_config.sh + - mode: standalone + test: test_snapshot.sh - version: v1_15 test: test_autoscaler.sh - version: v1_15 test: test_dynamic_config.sh - version: v1_15 test: test_flink_operator_ha.sh + - version: v1_15 + test: test_snapshot.sh - version: v1_16 test: test_autoscaler.sh - version: v1_16 test: test_dynamic_config.sh - version: v1_16 test: test_flink_operator_ha.sh + - version: v1_16 + test: test_snapshot.sh - version: v1_17 test: test_dynamic_config.sh - version: v1_17 test: test_flink_operator_ha.sh + - version: v1_17 + test: test_snapshot.sh - version: v1_18 test: test_dynamic_config.sh - version: v1_18 test: test_flink_operator_ha.sh + - version: v1_18 + test: test_snapshot.sh - version: v1_15 java-version: 17 - version: v1_16 diff --git a/e2e-tests/data/savepoint.yaml b/e2e-tests/data/savepoint.yaml new file mode 100644 index 0000000000..e32e929148 --- /dev/null +++ b/e2e-tests/data/savepoint.yaml @@ -0,0 +1,32 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkStateSnapshot +metadata: + name: example-savepoint +spec: + backoffLimit: 0 + jobReference: + kind: FlinkDeployment + name: flink-example-statemachine + savepoint: + alreadyExists: false + disposeOnDelete: true + formatType: CANONICAL + diff --git a/e2e-tests/test_autoscaler.sh b/e2e-tests/test_autoscaler.sh old mode 100644 new mode 100755 diff --git a/e2e-tests/test_dynamic_config.sh b/e2e-tests/test_dynamic_config.sh old mode 100644 new mode 100755 diff --git a/e2e-tests/test_flink_operator_ha.sh b/e2e-tests/test_flink_operator_ha.sh old mode 100644 new mode 100755 diff --git a/e2e-tests/test_snapshot.sh b/e2e-tests/test_snapshot.sh new file mode 100755 index 0000000000..f321df6b9a --- /dev/null +++ b/e2e-tests/test_snapshot.sh @@ -0,0 +1,150 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This script tests the FlinkStateSnapshot CR as follows: +# 1. Trigger savepoint by creating a new FlinkStateSnapshot savepoint CR +# 2. Trigger savepoint by using savepoint trigger nonce +# 3. Trigger checkpoint by using trigger nonce +# 4. Test periodic savepoints triggered by the operator +# 5. Change job to upgrade mode, suspend job and assert new FlinkStateSnapshot CR created + +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") +source "${SCRIPT_DIR}/utils.sh" + +CLUSTER_ID="flink-example-statemachine" +APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml" +APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID" + +SAVEPOINT_YAML="${SCRIPT_DIR}/data/savepoint.yaml" +SAVEPOINT_IDENTIFIER="flinksnp/example-savepoint" + +TIMEOUT=300 + +on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"upgradeMode": "savepoint"},"flinkConfiguration":{"kubernetes.operator.snapshot.resource.enabled":"true"}}}' + +job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') +echo "Found job ID $job_id" + + + +# Testing manual savepoint via CR +echo "Creating manual savepoint..." +retry_times 5 30 "kubectl apply -f $SAVEPOINT_YAML" || exit 1 +wait_for_status $SAVEPOINT_IDENTIFIER '.status.state' "COMPLETED" $TIMEOUT || exit 1 + +location=$(kubectl get $SAVEPOINT_IDENTIFIER -o yaml | yq '.status.path') +if [ "$location" == "" ];then + echo "Manual savepoint location was empty" + exit 1 +fi + +echo "Disposing manual savepoint..." +kubectl delete $SAVEPOINT_IDENTIFIER +wait_for_logs $jm_pod_name "Disposing savepoint $location" ${TIMEOUT} || exit 1 + + + +# Testing manual savepoint via trigger nonce +kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }' + +echo "Waiting for manual savepoint..." +snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "manual" ${TIMEOUT}) +echo "Found snapshot with name $snapshot" + +wait_for_status flinksnp/$snapshot '.status.spec.checkpoint' null $TIMEOUT || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' null $TIMEOUT || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' null $TIMEOUT || exit 1 +if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ];then + echo "Manual savepoint location was empty" + exit 1 +fi + + + +# Testing manual checkpoint via trigger nonce +kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"checkpointTriggerNonce": 123456 } } }' + +echo "Waiting for manual checkpoint..." +snapshot=$(wait_for_snapshot $CLUSTER_ID "checkpoint" "manual" ${TIMEOUT}) +echo "Found checkpoint with name $snapshot" + +wait_for_status flinksnp/$snapshot '.status.spec.savepoint' null $TIMEOUT || exit 1 +if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ];then + echo "Manual checkpoint location was empty" + exit 1 +fi + + +# Test periodic savepoints +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":"5s"}}}' + +echo "Waiting for periodic savepoint..." +snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "periodic" ${TIMEOUT}) +echo "Found periodic savepoint: $snapshot" +if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ];then + echo "Periodic savepoint location was empty" + exit 1 +fi +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":""}}}' + + +# Test upgrade savepoint +echo "Suspending deployment..." +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "suspended" } } }' +wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1 + +echo "Waiting for upgrade savepoint..." +snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT}) +echo "Found upgrade snapshot: $snapshot" +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1 + +location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path') +if [ "$location" == "" ];then + echo "Upgrade savepoint location was empty" + exit 1 +fi + + + +echo "Restarting deployment..." +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }' +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT +wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) + +# Check the new JobManager recovering from latest successful checkpoint +wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1 +wait_for_logs $jm_pod_name "execution.savepoint.path, ${location}" ${TIMEOUT} || exit 1 +wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1 + +echo "Successfully run the FlinkStateSnapshot test" + diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh index fe48acca0a..dcd6d67027 100755 --- a/e2e-tests/utils.sh +++ b/e2e-tests/utils.sh @@ -97,6 +97,27 @@ function wait_for_event { exit 1 } +function wait_for_snapshot { + local job_name=$1 + local snapshot_type=$2 + local trigger_type=$3 + local timeout=$4 + local prefix="$job_name-$snapshot_type-$trigger_type" + + for i in $(seq 1 ${timeout}); do + snapshot_name=$(kubectl get flinksnp | grep $prefix | awk '{print $1}' | sort -r | head -n 1) + if [ "$snapshot_name" ]; then + kubectl wait --timeout=${timeout}s --for=jsonpath='{.status.state}'=COMPLETED flinksnp/$snapshot_name > /dev/null || exit 1 + echo "$snapshot_name" + return 0 + fi + + sleep 1 + done + echo "Could not find snapshot with prefix $prefix with timeout of ${timeout}." + exit 1 +} + function assert_available_slots() { expected=$1 CLUSTER_ID=$2