diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh index 80a6bd050..c2d8fdcc8 100755 --- a/e2e-tests/test_sessionjob_operations.sh +++ b/e2e-tests/test_sessionjob_operations.sh @@ -68,17 +68,23 @@ assert_available_slots 1 $CLUSTER_ID echo "Successfully run the sessionjob savepoint upgrade test" -echo "Starting sessionjob last-state upgrade test" -# Testing last-state mode upgrade -# Update the FlinkSessionJob and trigger the last-state upgrade -kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }' +flink_version=$(kubectl get $SESSION_CLUSTER_IDENTIFIER -o yaml | yq '.spec.flinkVersion') -# Check the job was restarted with the new parallelism -wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1 -wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 -assert_available_slots 0 $CLUSTER_ID +if [ "$flink_version" != "v1_16" ]; then + echo "Starting sessionjob last-state upgrade test" + # Testing last-state mode upgrade + # Update the FlinkSessionJob and trigger the last-state upgrade + kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }' -echo "Successfully run the sessionjob last-state upgrade test" + # Check the job was restarted with the new parallelism + wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1 + wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1 + assert_available_slots 0 $CLUSTER_ID + + echo "Successfully run the sessionjob last-state upgrade test" +else + echo "Skipping last-state test for flink version 1.16" +fi # Test Operator restart echo "Delete session job " + $SESSION_JOB_NAME