Skip to content

Commit

Permalink
test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Aug 29, 2024
1 parent b2dea76 commit 24f0742
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
7 changes: 5 additions & 2 deletions e2e-tests/test_sessionjob_operations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ if [ "$location" == "" ];then
exit 1
fi

echo "Starting sessionjob savepoint upgrade test"
# Testing savepoint mode upgrade
# Update the FlinkSessionJob and trigger the savepoint upgrade
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
Expand All @@ -67,15 +68,17 @@ assert_available_slots 1 $CLUSTER_ID

echo "Successfully run the sessionjob savepoint upgrade test"

echo "Starting sessionjob last-state upgrade test"
# Testing last-state mode upgrade
# Update the FlinkSessionJob and trigger the last-state upgrade
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": last-state } } }'
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }'

# Check the job was restarted with the new parallelism
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
assert_available_slots 0 $CLUSTER_ID

echo "Successfully run the sessionjob savepoint upgrade test"
echo "Successfully run the sessionjob last-state upgrade test"

# Test Operator restart
echo "Delete session job " + $SESSION_JOB_NAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
Expand All @@ -47,10 +48,10 @@
public class JobStatusObserverTest extends OperatorTestBase {

@Getter private KubernetesClient kubernetesClient;
private JobStatusObserver<FlinkDeployment> observer;
private JobStatusObserver<AbstractFlinkResource<?, ?>> observer;

@Override
public void setup() {
protected void setup() {
observer = new JobStatusObserver<>(eventRecorder);
}

Expand All @@ -70,10 +71,8 @@ void testCancellingToMissing(
.getJob()
.getState());
observer.observe(
(FlinkResourceContext)
getResourceContext(
job,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)));
getResourceContext(
job, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)));
assertEquals(
JobStatusObserver.JOB_NOT_FOUND_ERR,
flinkResourceEventCollector.events.poll().getMessage());
Expand All @@ -88,6 +87,7 @@ void testCancellingToMissing(
@ParameterizedTest
@EnumSource(value = JobStatus.class, mode = EnumSource.Mode.EXCLUDE, names = "CANCELED")
void testCancellingToTerminal(JobStatus fromStatus) throws Exception {
var observer = new JobStatusObserver<>(eventRecorder);
var deployment = initDeployment();
var status = deployment.getStatus();
var jobStatus = status.getJobStatus();
Expand All @@ -98,7 +98,7 @@ void testCancellingToTerminal(JobStatus fromStatus) throws Exception {
.deserializeLastReconciledSpec()
.getJob()
.getState());
var ctx = getResourceContext(deployment);
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
flinkService.submitApplicationCluster(
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
flinkService.cancelJob(JobID.fromHexString(jobStatus.getJobId()), false);
Expand Down

0 comments on commit 24f0742

Please sign in to comment.