@@ -68,17 +68,23 @@ assert_available_slots 1 $CLUSTER_ID
68
68
69
69
echo " Successfully run the sessionjob savepoint upgrade test"
70
70
71
- echo " Starting sessionjob last-state upgrade test"
72
- # Testing last-state mode upgrade
73
- # Update the FlinkSessionJob and trigger the last-state upgrade
74
- kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch ' {"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }'
71
+ flink_version=$( kubectl get $SESSION_CLUSTER_IDENTIFIER -o yaml | yq ' .spec.flinkVersion' )
75
72
76
- # Check the job was restarted with the new parallelism
77
- wait_for_status $SESSION_JOB_IDENTIFIER ' .status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1
78
- wait_for_status $SESSION_JOB_IDENTIFIER ' .status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
79
- assert_available_slots 0 $CLUSTER_ID
73
+ if [ " $flink_version " != " v1_16" ]; then
74
+ echo " Starting sessionjob last-state upgrade test"
75
+ # Testing last-state mode upgrade
76
+ # Update the FlinkSessionJob and trigger the last-state upgrade
77
+ kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch ' {"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }'
80
78
81
- echo " Successfully run the sessionjob last-state upgrade test"
79
+ # Check the job was restarted with the new parallelism
80
+ wait_for_status $SESSION_JOB_IDENTIFIER ' .status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1
81
+ wait_for_status $SESSION_JOB_IDENTIFIER ' .status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
82
+ assert_available_slots 0 $CLUSTER_ID
83
+
84
+ echo " Successfully run the sessionjob last-state upgrade test"
85
+ else
86
+ echo " Skipping last-state test for flink version 1.16"
87
+ fi
82
88
83
89
# Test Operator restart
84
90
echo " Delete session job " + $SESSION_JOB_NAME
0 commit comments