From d9282263baad879a6a00f9614d985a0e1a47740c Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Wed, 28 Aug 2024 11:22:30 +0200 Subject: [PATCH] [FLINK-36162] Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference --- docs/content/docs/concepts/controller-flow.md | 2 +- .../docs/custom-resource/job-management.md | 9 +- .../content/docs/custom-resource/reference.md | 17 +-- .../content/docs/custom-resource/snapshots.md | 10 +- docs/content/docs/operations/upgrade.md | 7 +- e2e-tests/test_snapshot.sh | 10 +- examples/snapshot/job-from-savepoint.yaml | 4 +- .../api/spec/FlinkStateSnapshotReference.java | 58 -------- .../operator/api/spec/JobReference.java | 9 +- .../kubernetes/operator/api/spec/JobSpec.java | 13 +- .../operator/api/status/JobStatus.java | 3 +- .../operator/observer/SnapshotObserver.java | 6 +- .../AbstractFlinkResourceReconciler.java | 24 +--- .../deployment/AbstractJobReconciler.java | 68 +++------ .../deployment/ApplicationReconciler.java | 7 +- .../sessionjob/SessionJobReconciler.java | 2 +- .../operator/utils/EventSourceUtils.java | 6 +- .../utils/FlinkStateSnapshotUtils.java | 88 ++---------- .../operator/utils/SnapshotUtils.java | 30 ++-- .../operator/validation/DefaultValidator.java | 11 +- .../FlinkDeploymentControllerTest.java | 15 +- .../FlinkSessionJobControllerTest.java | 18 +-- .../FlinkStateSnapshotControllerTest.java | 12 +- .../deployment/ApplicationObserverTest.java | 4 +- .../deployment/ApplicationReconcilerTest.java | 23 ++-- .../ApplicationReconcilerUpgradeModeTest.java | 72 +++------- .../sessionjob/SessionJobReconcilerTest.java | 24 ++-- .../service/AbstractFlinkServiceTest.java | 4 +- .../utils/FlinkStateSnapshotUtilsTest.java | 129 ++---------------- .../operator/utils/SnapshotUtilsTest.java | 13 +- .../validation/DefaultValidatorTest.java | 22 +-- .../flinkdeployments.flink.apache.org-v1.yml | 20 +-- .../flinksessionjobs.flink.apache.org-v1.yml | 20 +-- ...linkstatesnapshots.flink.apache.org-v1.yml 
| 2 - 34 files changed, 149 insertions(+), 613 deletions(-) delete mode 100644 flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java diff --git a/docs/content/docs/concepts/controller-flow.md b/docs/content/docs/concepts/controller-flow.md index 8e8d84c00a..b6d876baab 100644 --- a/docs/content/docs/concepts/controller-flow.md +++ b/docs/content/docs/concepts/controller-flow.md @@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase records a point-in-t The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation flow for all Flink resource types. Let’s take a look at the high level flow before we go into specifics for session, application and session job resources. 1. Check if the resource is ready for reconciliation or if there are any pending operations that should not be interrupted (manual savepoints for example) -2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `flinkStateSnapshotReference` provided in the spec. +2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `initialSavepointPath` provided in the spec. 3. Next we determine if the desired spec changed and the type of change: `IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to execute further reconciliation logic. 4. If we have upgrade/scale spec changes we execute the upgrade logic specific for the resource type 5. 
If we did not receive any spec change we still have to ensure that the currently deployed resources are fully reconciled: diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md index 790c825aa7..231f92a873 100644 --- a/docs/content/docs/custom-resource/job-management.md +++ b/docs/content/docs/custom-resource/job-management.md @@ -247,17 +247,16 @@ Users have two options to restore a job from a target savepoint / checkpoint ### Redeploy using the savepointRedeployNonce -It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `flinkStateSnapshotReference` in the job spec: +It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec: ```yaml job: - flinkStateSnapshotReference: - path: file://redeploy-target-savepoint + initialSavepointPath: file://redeploy-target-savepoint # If not set previously, set to 1, otherwise increment, e.g. 2 savepointRedeployNonce: 1 ``` -When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint path must not be empty. +When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty. {{< hint warning >}} Rollbacks are not supported after redeployments. @@ -271,7 +270,7 @@ However, this also means that savepoint history is lost and the operator won't c 1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory. 2. Delete the `FlinkDeployment` resource for your application 3. Check that you have the current savepoint, and that your `FlinkDeployment` is deleted completely - 4. 
Modify your `FlinkDeployment` JobSpec and set `flinkStateSnapshotReference.path` to your last checkpoint location + 4. Modify your `FlinkDeployment` JobSpec and set `initialSavepointPath` to your last checkpoint location 5. Recreate the deployment These steps ensure that the operator will start completely fresh from the user defined savepoint path and can hopefully fully recover. diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 3348f46e57..12e1d88c7d 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -90,17 +90,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | flinkConfiguration | java.util.Map | Flink configuration overrides for the Flink deployment or Flink session job. | | deploymentName | java.lang.String | The name of the target session cluster deployment. | -### FlinkStateSnapshotReference -**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference - -**Description**: Reference for a FlinkStateSnapshot. - -| Parameter | Type | Docs | -| ----------| ---- | ---- | -| namespace | java.lang.String | Namespace of the snapshot resource. | -| name | java.lang.String | Name of the snapshot resource. | -| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. | - ### FlinkStateSnapshotSpec **Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec @@ -172,7 +161,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | ----------| ---- | ---- | | kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. | | name | java.lang.String | Name of the Flink resource. | -| namespace | java.lang.String | Namespace of the Flink resource. 
If empty, the operator will use the namespace of the snapshot. | ### JobSpec **Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec @@ -188,11 +176,10 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. | | savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. | | initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). | -| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). | | checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. | | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. | | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. | -| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. | +| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. 
In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. | ### JobState **Class**: org.apache.flink.kubernetes.operator.api.spec.JobState @@ -418,7 +405,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | state | java.lang.String | Last observed state of the job. | | startTime | java.lang.String | Start time of the job. | | updateTime | java.lang.String | Update time of the job. | -| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | | +| upgradeSavepointPath | java.lang.String | | | savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. | | checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. | diff --git a/docs/content/docs/custom-resource/snapshots.md b/docs/content/docs/custom-resource/snapshots.md index e0d3983586..3c1771bc36 100644 --- a/docs/content/docs/custom-resource/snapshots.md +++ b/docs/content/docs/custom-resource/snapshots.md @@ -38,7 +38,7 @@ If you set this to false, the operator will keep using the deprecated status fie To create a savepoint or checkpoint, exactly one of the spec fields `savepoint` or `checkpoint` must present. Furthermore, in case of a savepoint you can signal to the operator that the savepoint already exists using the `alreadyExists` field, and the operator will mark it as a successful snapshot in the next reconciliation phase. -You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using `flinkStateSnapshotReference` in the job spec. +You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot by using `initialSavepointPath` in the job spec. 
## Examples @@ -78,11 +78,11 @@ spec: ### Start job from existing snapshot +To start a job from an existing snapshot, take the savepoint path from the snapshot (the `status.path` field of the FlinkStateSnapshot resource) and set it as `initialSavepointPath` in the job spec: + ```yaml job: - flinkStateSnapshotReference: - namespace: flink # not required if it's in the same namespace - name: example-savepoint + initialSavepointPath: [savepoint_path] ``` {{< hint warning >}} @@ -131,7 +131,7 @@ This feature is not available for checkpoints. ## Triggering snapshots Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections. -In this case, the savepoint path will also be recorded in the `upgradeSnapshotReference` job status field, which the operator will use when restarting the job. +In this case, the savepoint path will also be recorded in the `upgradeSavepointPath` job status field, which the operator will use when restarting the job. For backup, job forking and other purposes savepoint and checkpoints can be triggered manually or periodically by the operator, however generally speaking these will not be used during upgrades and are not required for the correct operation. diff --git a/docs/content/docs/operations/upgrade.md b/docs/content/docs/operations/upgrade.md index 65f188b48b..e9b7ec922d 100644 --- a/docs/content/docs/operations/upgrade.md +++ b/docs/content/docs/operations/upgrade.md @@ -148,20 +148,19 @@ Here is a reference example of upgrading a `basic-checkpoint-ha-example` deploym ``` 5. Restore the job: - Deploy the previously deleted job using this [FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` to the savepoint location obtained from the step 1. 
+ Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.initialSavepointPath` to the savepoint location obtained from the step 1. ``` spec: ... job: - flinkStateSnapshotReference: - path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata + initialSavepointPath: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata upgradeMode: savepoint ... ``` Alternatively, we may use this command to edit and deploy the manifest: ```sh - wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.flinkStateSnapshotReference.path" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f - + wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.initialSavepointPath" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f - ``` Finally, verify that `deploy/basic-checkpoint-ha-example` log has: ``` diff --git a/e2e-tests/test_snapshot.sh b/e2e-tests/test_snapshot.sh index 8d08046592..32006ddbb2 100755 --- a/e2e-tests/test_snapshot.sh +++ b/e2e-tests/test_snapshot.sh @@ -56,10 +56,10 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIME kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"state": "suspended"}}}' wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1 -location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSnapshotReference.path') +location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSavepointPath') if [ "$location" == "" ]; then echo "Legacy savepoint location was empty"; exit 1; fi -echo "Removing upgradeSnapshotReference and 
setting lastSavepoint" -kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}' +echo "Removing upgradeSavepointPath and setting lastSavepoint" +kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSavepointPath":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}' # Delete operator Pod to clear CR state cache kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name) @@ -151,13 +151,11 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${T echo "Waiting for upgrade savepoint..." snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT}) if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi -echo "Found upgrade snapshot: $snapshot" -wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1 location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path') if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty"; exit 1; fi - +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSavepointPath' "$location" ${TIMEOUT} || exit 1 echo "Restarting deployment..." 
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }' diff --git a/examples/snapshot/job-from-savepoint.yaml b/examples/snapshot/job-from-savepoint.yaml index 698ad2204b..cc6627a0bc 100644 --- a/examples/snapshot/job-from-savepoint.yaml +++ b/examples/snapshot/job-from-savepoint.yaml @@ -64,6 +64,4 @@ spec: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: savepoint - flinkStateSnapshotReference: - name: example-savepoint - namespace: flink + initialSavepointPath: file:///flink-data/savepoints/savepoint-45305c-d867504446e0 diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java deleted file mode 100644 index 548692c997..0000000000 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkStateSnapshotReference.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.kubernetes.operator.api.spec; - -import org.apache.flink.annotation.Experimental; -import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** Reference for a FlinkStateSnapshot. */ -@Experimental -@Data -@NoArgsConstructor -@AllArgsConstructor -@Builder -@JsonIgnoreProperties(ignoreUnknown = true) -public class FlinkStateSnapshotReference { - - /** Namespace of the snapshot resource. */ - private String namespace; - - /** Name of the snapshot resource. */ - private String name; - - /** - * If a path is given, all other fields will be ignored, and this will be used as the initial - * savepoint path. - */ - private String path; - - public static FlinkStateSnapshotReference fromPath(String path) { - return new FlinkStateSnapshotReference(null, null, path); - } - - public static FlinkStateSnapshotReference fromResource(FlinkStateSnapshot resource) { - return new FlinkStateSnapshotReference( - resource.getMetadata().getNamespace(), resource.getMetadata().getName(), null); - } -} diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java index 068b0a64d3..856e457941 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobReference.java @@ -44,16 +44,9 @@ public class JobReference { /** Name of the Flink resource. */ private String name; - /** - * Namespace of the Flink resource. If empty, the operator will use the namespace of the - * snapshot. 
- */ - private String namespace; - public static JobReference fromFlinkResource(AbstractFlinkResource flinkResource) { var result = new JobReference(); result.setName(flinkResource.getMetadata().getName()); - result.setNamespace(flinkResource.getMetadata().getNamespace()); if (flinkResource instanceof FlinkDeployment) { result.setKind(JobKind.FLINK_DEPLOYMENT); @@ -71,6 +64,6 @@ public String toString() { } else if (kind == JobKind.FLINK_SESSION_JOB) { kindString = CrdConstants.KIND_SESSION_JOB; } - return String.format("%s/%s (%s)", namespace, name, kindString); + return String.format("%s (%s)", name, kindString); } } diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java index 1e845b13fb..a6c582cc23 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java @@ -72,16 +72,8 @@ public class JobSpec implements Diffable { * redeployments (triggered by changing the savepointRedeployNonce). */ @SpecDiff(DiffType.IGNORE) - @Deprecated private String initialSavepointPath; - /** - * Snapshot reference used by the job the first time it is deployed or during savepoint - * redeployments (triggered by changing the savepointRedeployNonce). - */ - @SpecDiff(DiffType.IGNORE) - private FlinkStateSnapshotReference flinkStateSnapshotReference; - /** * Nonce used to manually trigger checkpoint for the running job. In order to trigger a * checkpoint, change the number to a different non-null value. @@ -100,9 +92,8 @@ public class JobSpec implements Diffable { /** * Nonce used to trigger a full redeployment of the job from the savepoint path specified in - * initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. 
- * In order to trigger redeployment, change the number to a different non-null value. Rollback - * is not possible after redeployment. + * initialSavepointPath. In order to trigger redeployment, change the number to a different + * non-null value. Rollback is not possible after redeployment. */ @SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true) private Long savepointRedeployNonce; diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java index 28cfde8761..6adef53cd3 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.api.status; import org.apache.flink.annotation.Experimental; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.fabric8.crd.generator.annotation.PrinterColumn; @@ -51,7 +50,7 @@ public class JobStatus { /** Update time of the job. */ private String updateTime; - private FlinkStateSnapshotReference upgradeSnapshotReference; + private String upgradeSavepointPath; /** Information about pending and last savepoint for the job. 
*/ @Deprecated private SavepointInfo savepointInfo = new SavepointInfo(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java index 862a5dd6a4..59272af6ff 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.status.Checkpoint; import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; @@ -454,10 +453,7 @@ private void observeLatestCheckpoint( flinkService .getLastCheckpoint(JobID.fromHexString(jobID), observeConfig) .ifPresent( - snapshot -> - jobStatus.setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath( - snapshot.getLocation()))); + snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation())); } catch (Exception e) { LOG.error("Could not observe latest checkpoint information.", e); throw new ReconciliationException(e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 8f2c1a7e86..cc57889bb7 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.api.diff.DiffType; import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; @@ -41,7 +40,6 @@ import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult; import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder; import org.apache.flink.kubernetes.operator.utils.EventRecorder; -import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -122,7 +120,7 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { updateStatusBeforeFirstDeployment( cr, spec, deployConfig, status, ctx.getKubernetesClient()); - deploy(ctx, spec, deployConfig, getInitialSnapshotPath(ctx, spec), false); + deploy(ctx, spec, deployConfig, getInitialSnapshotPath(spec), false); ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock); return; @@ -178,19 +176,11 @@ public void reconcile(FlinkResourceContext ctx) throws Exception { } } - private Optional getInitialSnapshotPath( - FlinkResourceContext ctx, AbstractFlinkSpec spec) { + private Optional getInitialSnapshotPath(AbstractFlinkSpec spec) { if (spec.getJob() == null) { 
return Optional.empty(); } - if (spec.getJob().getFlinkStateSnapshotReference() != null) { - return Optional.of( - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath( - ctx.getKubernetesClient(), - spec.getJob().getFlinkStateSnapshotReference())); - } - return Optional.ofNullable(spec.getJob().getInitialSavepointPath()); } @@ -233,16 +223,10 @@ private void updateStatusBeforeFirstDeployment( CR cr, SPEC spec, Configuration deployConfig, STATUS status, KubernetesClient client) { if (spec.getJob() != null) { var initialUpgradeMode = UpgradeMode.STATELESS; - var snapshotRef = spec.getJob().getFlinkStateSnapshotReference(); var initialSp = spec.getJob().getInitialSavepointPath(); - if (snapshotRef != null) { - status.getJobStatus().setUpgradeSnapshotReference(snapshotRef); - initialUpgradeMode = UpgradeMode.SAVEPOINT; - } else if (initialSp != null) { - status.getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath(initialSp)); + if (initialSp != null) { + status.getJobStatus().setUpgradeSavepointPath(initialSp); initialUpgradeMode = UpgradeMode.SAVEPOINT; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 7d8a15b00c..babb05f88c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.diff.DiffType; import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import 
org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; @@ -48,7 +47,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import lombok.Value; -import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,15 +286,7 @@ protected void restoreJob( if (spec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT) { savepointOpt = Optional.ofNullable( - ctx.getResource() - .getStatus() - .getJobStatus() - .getUpgradeSnapshotReference()) - .map( - ref -> - FlinkStateSnapshotUtils - .getValidatedFlinkStateSnapshotPath( - ctx.getKubernetesClient(), ref)); + ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath()); if (savepointOpt.isEmpty()) { savepointOpt = Optional.ofNullable( @@ -313,28 +303,26 @@ protected void restoreJob( } /** - * Updates the upgrade snapshot reference field in the JobSpec of the current Flink resource. If - * snapshot resources are enabled, a new FlinkStateSnapshot will be created, else it will only - * set the path field of the snapshot reference. + * Updates the upgrade savepoint field in the JobStatus of the current Flink resource and if + * snapshot resources are enabled, a new FlinkStateSnapshot will be created. 
* * @param ctx context * @param savepointLocation location of savepoint taken */ - protected void setUpgradeSnapshotReferenceFromSavepoint( - FlinkResourceContext ctx, String savepointLocation) { - var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new Configuration()); + protected void setUpgradeSavepointPath(FlinkResourceContext ctx, String savepointLocation) { + var conf = ctx.getObserveConfig(); var savepointFormatType = - conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); - - var snapshotRef = - FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint( - conf, - ctx.getOperatorConfig(), - ctx.getKubernetesClient(), - ctx.getResource(), - SavepointFormatType.valueOf(savepointFormatType.name()), - savepointLocation); - ctx.getResource().getStatus().getJobStatus().setUpgradeSnapshotReference(snapshotRef); + ctx.getObserveConfig() + .get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE); + + FlinkStateSnapshotUtils.createUpgradeSnapshotResource( + conf, + ctx.getOperatorConfig(), + ctx.getKubernetesClient(), + ctx.getResource(), + SavepointFormatType.valueOf(savepointFormatType.name()), + savepointLocation); + ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation); } /** @@ -471,11 +459,11 @@ protected void resubmitJob(FlinkResourceContext ctx, boolean requireHaMetada LOG.info("Resubmitting Flink job..."); SPEC specToRecover = ReconciliationUtils.getDeployedSpec(ctx.getResource()); - var upgradeSnapshotRef = - ctx.getResource().getStatus().getJobStatus().getUpgradeSnapshotReference(); + var upgradeStatePath = + ctx.getResource().getStatus().getJobStatus().getUpgradeSavepointPath(); var savepointLegacy = ctx.getResource().getStatus().getJobStatus().getSavepointInfo().getLastSavepoint(); - var lastSavepointKnown = upgradeSnapshotRef != null || savepointLegacy != null; + var lastSavepointKnown = upgradeStatePath != null || savepointLegacy != null; if (requireHaMetadata) { 
specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE); @@ -498,21 +486,9 @@ private void redeployWithSavepoint( cancelJob(ctx, UpgradeMode.STATELESS); currentDeploySpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); - var snapshotRef = currentDeploySpec.getJob().getFlinkStateSnapshotReference(); - var initialSavepointPath = currentDeploySpec.getJob().getInitialSavepointPath(); - - if (snapshotRef == null && initialSavepointPath != null) { - snapshotRef = FlinkStateSnapshotReference.fromPath(initialSavepointPath); - } - - Optional savepointPath = Optional.empty(); - if (snapshotRef != null) { - savepointPath = - Optional.of( - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath( - ctx.getKubernetesClient(), snapshotRef)); - status.getJobStatus().setUpgradeSnapshotReference(snapshotRef); - } + Optional savepointPath = + Optional.ofNullable(currentDeploySpec.getJob().getInitialSavepointPath()); + status.getJobStatus().setUpgradeSavepointPath(savepointPath.orElse(null)); if (desiredJobState == JobState.RUNNING) { deploy( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 34406b9d50..ac6c69e937 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; 
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; @@ -167,9 +166,7 @@ public void deploy( // Last state deployment, explicitly set a dummy savepoint path to avoid accidental // incorrect state restore in case the HA metadata is deleted by the user deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH); - status.getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath(LAST_STATE_DUMMY_SP_PATH)); + status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH); } else { // Stateless deployment, remove any user configured savepoint path deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH); @@ -242,7 +239,7 @@ protected void cancelJob(FlinkResourceContext ctx, UpgradeMode var conf = ObjectUtils.firstNonNull(ctx.getObserveConfig(), new Configuration()); ctx.getFlinkService() .cancelJob(ctx.getResource(), upgradeMode, conf) - .ifPresent(location -> setUpgradeSnapshotReferenceFromSavepoint(ctx, location)); + .ifPresent(location -> setUpgradeSavepointPath(ctx, location)); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java index dcf781c373..3f54a8c73f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java @@ -104,7 +104,7 @@ protected void cancelJob(FlinkResourceContext ctx, UpgradeMode ctx.getFlinkService() .cancelSessionJob(ctx.getResource(), upgradeMode, conf) - .ifPresent(location -> setUpgradeSnapshotReferenceFromSavepoint(ctx, location)); + 
.ifPresent(location -> setUpgradeSavepointPath(ctx, location)); ctx.getResource().getStatus().getJobStatus().setJobId(null); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 018a080be1..ecc6c5c96d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -41,7 +41,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -72,13 +71,10 @@ public class EventSourceUtils { if (jobRef == null || jobRef.getName() == null) { return Collections.emptySet(); } - var namespace = - Optional.ofNullable(jobRef.getNamespace()) - .orElse(snapshot.getMetadata().getNamespace()); return Set.of( new ResourceID( snapshot.getSpec().getJobReference().getName(), - namespace)); + snapshot.getMetadata().getNamespace())); }) .withNamespacesInheritedFromController(context) .followNamespaceChanges(true) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java index fa54247867..5cfbcb1e02 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import 
org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec; import org.apache.flink.kubernetes.operator.api.spec.JobReference; import org.apache.flink.kubernetes.operator.api.spec.SavepointSpec; @@ -39,7 +38,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; import javax.annotation.Nullable; @@ -59,71 +57,6 @@ /** Utilities class for FlinkStateSnapshot resources. */ public class FlinkStateSnapshotUtils { - /** - * From a snapshot reference, return its snapshot path. If a {@link FlinkStateSnapshot} is - * referenced, it will be retrieved from Kubernetes. - * - * @param kubernetesClient kubernetes client - * @param snapshotRef snapshot reference - * @return found savepoint path - */ - public static String getValidatedFlinkStateSnapshotPath( - KubernetesClient kubernetesClient, FlinkStateSnapshotReference snapshotRef) { - if (StringUtils.isNotBlank(snapshotRef.getPath())) { - return snapshotRef.getPath(); - } - - if (StringUtils.isBlank(snapshotRef.getName())) { - throw new IllegalArgumentException( - String.format("Invalid snapshot name: %s", snapshotRef.getName())); - } - - var result = - snapshotRef.getNamespace() == null - ? 
kubernetesClient - .resources(FlinkStateSnapshot.class) - .withName(snapshotRef.getName()) - .get() - : kubernetesClient - .resources(FlinkStateSnapshot.class) - .inNamespace(snapshotRef.getNamespace()) - .withName(snapshotRef.getName()) - .get(); - - if (result == null) { - throw new IllegalStateException( - String.format( - "Cannot find snapshot %s in namespace %s.", - snapshotRef.getNamespace(), snapshotRef.getName())); - } - - // We can return the savepoint path if it's marked as completed without waiting for the - // reconciler to update its status. - if (result.getSpec().isSavepoint() && result.getSpec().getSavepoint().getAlreadyExists()) { - var path = result.getSpec().getSavepoint().getPath(); - if (!StringUtils.isBlank(path)) { - return path; - } - } - - if (COMPLETED != result.getStatus().getState()) { - throw new IllegalStateException( - String.format( - "Snapshot %s/%s is not complete yet.", - snapshotRef.getNamespace(), snapshotRef.getName())); - } - - var path = result.getStatus().getPath(); - if (StringUtils.isBlank(path)) { - throw new IllegalStateException( - String.format( - "Snapshot %s/%s path is incorrect: %s.", - snapshotRef.getNamespace(), snapshotRef.getName(), path)); - } - - return path; - } - protected static FlinkStateSnapshot createFlinkStateSnapshot( KubernetesClient kubernetesClient, String namespace, @@ -263,9 +196,7 @@ public static String getFlinkStateSnapshotName( } /** - * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the Kubernetes cluster and - * return its reference if snapshot resources are enabled. In other case return a reference - * containing only the path. + * For an upgrade savepoint, create a {@link FlinkStateSnapshot} on the Kubernetes cluster. 
* * @param conf job configuration * @param operatorConf operator configuration @@ -273,9 +204,9 @@ public static String getFlinkStateSnapshotName( * @param flinkResource referenced Flink resource * @param savepointFormatType savepoint format type * @param savepointPath path of savepoint - * @return reference for snapshot + * @return State snapshot resource */ - public static FlinkStateSnapshotReference createReferenceForUpgradeSavepoint( + public static Optional createUpgradeSnapshotResource( Configuration conf, FlinkOperatorConfiguration operatorConf, KubernetesClient kubernetesClient, @@ -283,12 +214,12 @@ public static FlinkStateSnapshotReference createReferenceForUpgradeSavepoint( SavepointFormatType savepointFormatType, String savepointPath) { if (!isSnapshotResourceEnabled(operatorConf, conf)) { - return FlinkStateSnapshotReference.fromPath(savepointPath); + return Optional.empty(); } var disposeOnDelete = conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE); - var snapshot = + var savepointResource = createSavepointResource( kubernetesClient, flinkResource, @@ -296,7 +227,7 @@ public static FlinkStateSnapshotReference createReferenceForUpgradeSavepoint( SnapshotTriggerType.UPGRADE, savepointFormatType, disposeOnDelete); - return FlinkStateSnapshotReference.fromResource(snapshot); + return Optional.of(savepointResource); } /** @@ -439,9 +370,8 @@ public static void snapshotTriggerPending(FlinkStateSnapshot snapshot) { * @return namespace with the job reference to be found in */ public static ResourceID getSnapshotJobReferenceResourceId(FlinkStateSnapshot snapshot) { - var namespace = - Optional.ofNullable(snapshot.getSpec().getJobReference().getNamespace()) - .orElse(snapshot.getMetadata().getNamespace()); - return new ResourceID(snapshot.getSpec().getJobReference().getName(), namespace); + return new ResourceID( + snapshot.getSpec().getJobReference().getName(), + snapshot.getMetadata().getNamespace()); } } diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java index b3d4dbde46..f8ce07f44c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; import org.apache.flink.kubernetes.operator.api.status.JobStatus; +import org.apache.flink.kubernetes.operator.api.status.Savepoint; import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo; import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; @@ -32,6 +33,7 @@ import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler; import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.core.util.CronExpression; import org.slf4j.Logger; @@ -360,25 +362,17 @@ public static void resetSnapshotTriggers( * @return True if last savepoint is known */ public static boolean lastSavepointKnown(CommonStatus status) { - var lastSavepoint = status.getJobStatus().getUpgradeSnapshotReference(); - - if (lastSavepoint != null) { - if (StringUtils.isNotBlank(lastSavepoint.getName())) { - return true; - } - - var location = lastSavepoint.getPath(); - return location != null - && !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH); - } - - // Check legacy savepoint field too - var lastSavepointLegacy = status.getJobStatus().getSavepointInfo().getLastSavepoint(); - if (lastSavepointLegacy == null) { + var location = + 
ObjectUtils.firstNonNull( + status.getJobStatus().getUpgradeSavepointPath(), + Optional.ofNullable( + status.getJobStatus().getSavepointInfo().getLastSavepoint()) + .map(Savepoint::getLocation) + .orElse(null)); + + if (location == null) { return true; } - return !lastSavepointLegacy - .getLocation() - .equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH); + return !location.equals(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index 994cce8dc0..b326e96b4b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -294,12 +294,6 @@ private Optional validateJobSpec( return Optional.of("Job parallelism must be larger than 0"); } - if (!StringUtils.isNullOrWhitespaceOnly(job.getInitialSavepointPath()) - && job.getFlinkStateSnapshotReference() != null) { - return Optional.of( - "Cannot set both initialSavepointPath and flinkStateSnapshotReference in the job spec"); - } - return Optional.empty(); } @@ -474,10 +468,9 @@ private Optional validateSpecChange( if (newJob.getSavepointRedeployNonce() != null && !newJob.getSavepointRedeployNonce() .equals(oldJob.getSavepointRedeployNonce())) { - if (StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath()) - && newJob.getFlinkStateSnapshotReference() == null) { + if (StringUtils.isNullOrWhitespaceOnly(newJob.getInitialSavepointPath())) { return Optional.of( - "InitialSavepointPath and flinkStateSnapshotReference must not be empty for savepoint redeployment"); + "InitialSavepointPath must not be empty for savepoint redeployment"); } } } diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 1bb04216ca..32e49502a8 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.IngressSpec; import org.apache.flink.kubernetes.operator.api.spec.JobState; @@ -397,9 +396,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro new TaskManagerInfo( "component=taskmanager,app=" + appCluster.getMetadata().getName(), 1), appCluster.getStatus().getTaskManager()); - assertEquals( - FlinkStateSnapshotReference.fromPath("s0"), - appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertEquals("s0", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); var previousJobs = new ArrayList<>(jobs); appCluster.getSpec().getJob().setInitialSavepointPath("s1"); @@ -407,9 +404,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro // Send in a no-op change testController.reconcile(appCluster, context); assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs())); - assertEquals( - FlinkStateSnapshotReference.fromPath("s0"), - appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference()); 
+ assertEquals("s0", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); // Upgrade job appCluster.getSpec().getJob().setParallelism(100); @@ -425,8 +420,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro .getState()); assertEquals(new TaskManagerInfo("", 0), appCluster.getStatus().getTaskManager()); assertEquals( - FlinkStateSnapshotReference.fromPath("savepoint_0"), - appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_0", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); testController.reconcile(appCluster, context); jobs = flinkService.listJobs(); @@ -441,8 +435,7 @@ public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) thro JobManagerDeploymentStatus.READY, appCluster.getStatus().getJobManagerDeploymentStatus()); assertEquals( - FlinkStateSnapshotReference.fromPath("savepoint_1"), - appCluster.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_1", appCluster.getStatus().getJobStatus().getUpgradeSavepointPath()); // Resume from last savepoint appCluster.getSpec().getJob().setState(JobState.RUNNING); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index fb916b6658..ba6af301fb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; -import 
org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; @@ -193,9 +192,7 @@ public void verifyUpgradeFromSavepointLegacy() throws Exception { var jobs = flinkService.listJobs(); assertEquals(1, jobs.size()); assertEquals("s0", jobs.get(0).f0); - assertEquals( - FlinkStateSnapshotReference.fromPath("s0"), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertEquals("s0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); var previousJobs = new ArrayList<>(jobs); sessionJob.getSpec().getJob().setInitialSavepointPath("s1"); @@ -203,16 +200,13 @@ public void verifyUpgradeFromSavepointLegacy() throws Exception { // Send in a no-op change testController.reconcile(sessionJob, context); assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs())); - assertEquals( - FlinkStateSnapshotReference.fromPath("s0"), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertEquals("s0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); // Upgrade job sessionJob.getSpec().getJob().setParallelism(100); updateControl = testController.reconcile(sessionJob, context); assertEquals( - FlinkStateSnapshotReference.fromPath("savepoint_0"), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); assertEquals(0L, updateControl.getScheduleDelay().get()); assertEquals( @@ -232,8 +226,7 @@ public void verifyUpgradeFromSavepointLegacy() throws Exception { assertEquals("savepoint_0", jobs.get(0).f0); testController.reconcile(sessionJob, context); assertEquals( - FlinkStateSnapshotReference.fromPath("savepoint_0"), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_0", 
sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); // Suspend job sessionJob.getSpec().getJob().setState(JobState.SUSPENDED); @@ -247,8 +240,7 @@ public void verifyUpgradeFromSavepointLegacy() throws Exception { assertEquals(1, jobs.size()); assertEquals("savepoint_1", jobs.get(0).f0); assertEquals( - FlinkStateSnapshotReference.fromPath("savepoint_1"), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_1", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); testController.reconcile(sessionJob, context); testController.cleanup(sessionJob, context); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java index 6c4bd57aa6..81cf121b27 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java @@ -514,10 +514,8 @@ public void testReconcileJobNotFound() { var snapshot = createSavepoint(deployment); var errorMessage = String.format( - "Secondary resource %s/%s (%s) for savepoint snapshot-test was not found", - deployment.getMetadata().getNamespace(), - deployment.getMetadata().getName(), - CrdConstants.KIND_FLINK_DEPLOYMENT); + "Secondary resource %s (%s) for savepoint snapshot-test was not found", + deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT); // First reconcile will trigger the snapshot. 
controller.reconcile(snapshot, TestUtils.createSnapshotContext(client, deployment)); @@ -558,10 +556,8 @@ public void testReconcileJobNotRunning() { var snapshot = createSavepoint(deployment); var errorMessage = String.format( - "Secondary resource %s/%s (%s) for savepoint snapshot-test is not running", - deployment.getMetadata().getNamespace(), - deployment.getMetadata().getName(), - CrdConstants.KIND_FLINK_DEPLOYMENT); + "Secondary resource %s (%s) for savepoint snapshot-test is not running", + deployment.getMetadata().getName(), CrdConstants.KIND_FLINK_DEPLOYMENT); controller.reconcile(snapshot, context); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 7cf1b74972..9f79c407fc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -493,9 +493,7 @@ public void observeSavepoint() throws Exception { assertEquals( org.apache.flink.api.common.JobStatus.FAILED.name(), deployment.getStatus().getJobStatus().getState()); - assertEquals( - "last-SP", - deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + assertEquals("last-SP", deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus())); observer.observe(deployment, readyContext); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 
51f90fb81d..6a5f60ad90 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobReference; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; @@ -174,7 +173,7 @@ public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) throws // clean up assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()); - assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); reconciler.cleanup( deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); @@ -185,11 +184,11 @@ public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) throws assertThat(snapshot.getSpec().getSavepoint().getPath()) .isEqualTo("savepoint_0"); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshot), + "savepoint_0", deployment .getStatus() .getJobStatus() - .getUpgradeSnapshotReference()); + .getUpgradeSavepointPath()); }); } @@ -209,7 +208,7 @@ public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersio verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); // clean up - assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); 
reconciler.cleanup( deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); @@ -220,11 +219,11 @@ public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersio assertThat(snapshot.getSpec().getSavepoint().getPath()) .isEqualTo("savepoint_0"); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshot), + "savepoint_0", deployment .getStatus() .getJobStatus() - .getUpgradeSnapshotReference()); + .getUpgradeSavepointPath()); }); } @@ -554,7 +553,7 @@ private void testSnapshotLegacy(FlinkDeployment deployment, SnapshotType snapsho verifyAndSetRunningJobsToStatus(deployment, runningJobs); assertFalse(isSnapshotInProgress.test(getJobStatus(deployment))); assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot()); - assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertNull(getLastSnapshotStatus(deployment, snapshotType)); FlinkDeployment snDeployment = ReconciliationUtils.clone(deployment); @@ -563,7 +562,7 @@ private void testSnapshotLegacy(FlinkDeployment deployment, SnapshotType snapsho reconciler.reconcile(snDeployment, context); assertFalse(isSnapshotInProgress.test((getJobStatus(snDeployment)))); assertNull(getSnapshotInfo.apply(deployment).getLastSnapshot()); - assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertNull(getLastSnapshotStatus(snDeployment, snapshotType)); // trigger when nonce is defined @@ -1199,11 +1198,11 @@ public void testReconcileIfUpgradeModeNotAvailable() throws Exception { assertThat(snapshot.getSpec().getSavepoint().getPath()) .isEqualTo("savepoint_0"); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshot), + "savepoint_0", deployment .getStatus() .getJobStatus() - .getUpgradeSnapshotReference()); + .getUpgradeSavepointPath()); }); } @@ -1376,7 +1375,7 
@@ private void verifySavepointRedeploy( status.getJobManagerDeploymentStatus()); // Verify that savepoint and upgrade mode is recorded correctly in reconciled spec - assertEquals(savepoint, status.getJobStatus().getUpgradeSnapshotReference().getPath()); + assertEquals(savepoint, status.getJobStatus().getUpgradeSavepointPath()); assertEquals( UpgradeMode.SAVEPOINT, status.getReconciliationStatus() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java index e4117659c1..465c3cf61d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java @@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; @@ -45,7 +44,6 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter; import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper; -import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.client.KubernetesClient; @@ -178,38 +176,25 @@ private void 
testUpgradeToSavepoint(FlinkVersion flinkVersion, UpgradeMode fromU snapshots.get(0).getMetadata().getLabels().get(CrdConstants.LABEL_SNAPSHOT_TYPE)); } - private static Stream flinkVersionsWithLegacyMode() { - return TestUtils.flinkVersions() - .flatMap( - arg -> - Stream.of( - Arguments.of(arg.get()[0], true), - Arguments.of(arg.get()[0], false))); - } - @ParameterizedTest - @MethodSource("flinkVersionsWithLegacyMode") - public void testUpgradeFromStatelessToLastState( - FlinkVersion flinkVersion, boolean legacySnapshots) throws Exception { - testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS, legacySnapshots); + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void testUpgradeFromStatelessToLastState(FlinkVersion flinkVersion) throws Exception { + testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS); } @ParameterizedTest - @MethodSource("flinkVersionsWithLegacyMode") - public void testUpgradeFromSavepointToLastState( - FlinkVersion flinkVersion, boolean legacySnapshots) throws Exception { - testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT, legacySnapshots); + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void testUpgradeFromSavepointToLastState(FlinkVersion flinkVersion) throws Exception { + testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT); } @ParameterizedTest - @MethodSource("flinkVersionsWithLegacyMode") - public void testUpgradeFromLastStateToLastState( - FlinkVersion flinkVersion, boolean legacySnapshots) throws Exception { - testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE, legacySnapshots); + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void testUpgradeFromLastStateToLastState(FlinkVersion flinkVersion) throws Exception { + testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE); } - private void testUpgradeToLastState( - FlinkVersion flinkVersion, UpgradeMode fromUpgradeMode, boolean 
legacySnapshots) + private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode fromUpgradeMode) throws Exception { FlinkDeployment deployment = buildApplicationCluster(flinkVersion, fromUpgradeMode); @@ -250,27 +235,7 @@ private void testUpgradeToLastState( deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); deployment.getSpec().setRestartNonce(200L); flinkService.setHaDataAvailable(false); - if (legacySnapshots) { - deployment - .getStatus() - .getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath("finished_sp")); - } else { - var snapshot = - FlinkStateSnapshotUtils.createSavepointResource( - kubernetesClient, - deployment, - "finished_sp", - SnapshotTriggerType.UPGRADE, - SavepointFormatType.CANONICAL, - false); - deployment - .getStatus() - .getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromResource(snapshot)); - } + deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp"); deployment.getStatus().getJobStatus().setState("FINISHED"); deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); deployment @@ -308,11 +273,7 @@ public void testUpgradeUsesLatestSnapshot(boolean useLegacyFields) throws Except SavepointFormatType.CANONICAL, 0L)); } else { - deployment - .getStatus() - .getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath(savepointPath)); + deployment.getStatus().getJobStatus().setUpgradeSavepointPath(savepointPath); deployment .getStatus() .getJobStatus() @@ -461,8 +422,7 @@ public void testInitialJmDeployCannotStartLegacy(UpgradeMode upgradeMode, boolea if (initSavepoint) { assertEquals("init-sp", flinkService.listJobs().get(0).f0); assertEquals( - "init-sp", - deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + "init-sp", deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertEquals(UpgradeMode.SAVEPOINT, 
lastReconciledSpec.getJob().getUpgradeMode()); } else { assertNull(flinkService.listJobs().get(0).f0); @@ -676,12 +636,12 @@ public void testLastStateDummySpPath(boolean checkpointAvailable) throws Excepti if (checkpointAvailable) { assertEquals( ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, - deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertEquals( ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, flinkService.listJobs().get(0).f0); } else { - assertNull(deployment.getStatus().getJobStatus().getUpgradeSnapshotReference()); + assertNull(deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertNull(flinkService.listJobs().get(0).f0); } } @@ -798,7 +758,7 @@ public void testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEna // trigger a savepoint assertEquals( ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, - deployment.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + deployment.getStatus().getJobStatus().getUpgradeSavepointPath()); assertEquals( ApplicationReconciler.LAST_STATE_DUMMY_SP_PATH, flinkService.listJobs().get(0).f0); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java index 88b4cf5b38..6ab77705b6 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; 
-import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus; @@ -130,8 +129,7 @@ public void testSubmitAndCleanUpWithSavepoint(boolean legacySnapshots) throws Ex if (legacySnapshots) { assertEquals( - "savepoint_0", - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + "savepoint_0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } else { var snapshots = TestUtils.getFlinkStateSnapshotsForResource( @@ -139,8 +137,8 @@ public void testSubmitAndCleanUpWithSavepoint(boolean legacySnapshots) throws Ex assertThat(snapshots).isNotEmpty(); assertEquals("savepoint_0", snapshots.get(0).getSpec().getSavepoint().getPath()); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshots.get(0)), - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + snapshots.get(0).getSpec().getSavepoint().getPath(), + sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } } @@ -173,8 +171,7 @@ public void testSubmitAndCleanUpWithSavepointOnResource(boolean legacySnapshots) sessionJob, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient)); if (legacySnapshots) { assertEquals( - "savepoint_0", - sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference().getPath()); + "savepoint_0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } else { var snapshots = TestUtils.getFlinkStateSnapshotsForResource( @@ -182,8 +179,7 @@ public void testSubmitAndCleanUpWithSavepointOnResource(boolean legacySnapshots) assertThat(snapshots).isNotEmpty(); assertEquals("savepoint_0", snapshots.get(0).getSpec().getSavepoint().getPath()); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshots.get(0)), - 
sessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + "savepoint_0", sessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } } @@ -412,11 +408,7 @@ public void testSavepointUpgrade(boolean legacySnapshots) throws Exception { if (legacySnapshots) { assertEquals( "savepoint_0", - statefulSessionJob - .getStatus() - .getJobStatus() - .getUpgradeSnapshotReference() - .getPath()); + statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } else { var snapshots = TestUtils.getFlinkStateSnapshotsForResource( @@ -430,8 +422,8 @@ public void testSavepointUpgrade(boolean legacySnapshots) throws Exception { .getLabels() .get(CrdConstants.LABEL_SNAPSHOT_TYPE)); assertEquals( - FlinkStateSnapshotReference.fromResource(snapshots.get(0)), - statefulSessionJob.getStatus().getJobStatus().getUpgradeSnapshotReference()); + snapshots.get(0).getSpec().getSavepoint().getPath(), + statefulSessionJob.getStatus().getJobStatus().getUpgradeSavepointPath()); } flinkService.clear(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index f9b4781c75..3cfa562b21 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -302,7 +302,7 @@ public void cancelJobWithStatelessUpgradeModeTest() throws Exception { assertTrue(cancelFuture.isDone()); assertEquals(jobID, cancelFuture.get()); assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); - assertNull(jobStatus.getUpgradeSnapshotReference()); + assertNull(jobStatus.getUpgradeSavepointPath()); } @ParameterizedTest @@ -541,7 +541,7 @@ public void cancelJobWithLastStateUpgradeModeTest() throws Exception { 
configManager.getObserveConfig(deployment), false); assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); - assertNull(jobStatus.getUpgradeSnapshotReference()); + assertNull(jobStatus.getUpgradeSavepointPath()); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java index fd136969c6..1a3dc8e69f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java @@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobKind; import org.apache.flink.kubernetes.operator.api.spec.JobReference; @@ -31,7 +30,6 @@ import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType; import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import io.fabric8.kubernetes.client.KubernetesClient; @@ -47,14 +45,10 @@ import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec; import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED; import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS; -import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.TRIGGER_PENDING; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for {@link FlinkStateSnapshotUtils}. */ @@ -62,8 +56,6 @@ public class FlinkStateSnapshotUtilsTest { private KubernetesClient client; - - private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration()); private static final String NAMESPACE = "test"; private static final String SAVEPOINT_NAME = "savepoint-01"; private static final String SAVEPOINT_PATH = "/tmp/savepoint-01"; @@ -98,87 +90,6 @@ public void testGetSnapshotTriggerType() { .isEqualTo(SnapshotTriggerType.PERIODIC); } - @Test - public void testGetValidatedFlinkStateSnapshotPathPathGiven() { - var snapshotRef = FlinkStateSnapshotReference.builder().path(SAVEPOINT_PATH).build(); - var snapshotResult = - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef); - assertEquals(SAVEPOINT_PATH, snapshotResult); - } - - @Test - public void testGetValidatedFlinkStateSnapshotPathFoundResource() { - var snapshot = initSavepoint(COMPLETED, null); - client.resource(snapshot).create(); - - var snapshotRef = - FlinkStateSnapshotReference.builder() - .namespace(NAMESPACE) - .name(SAVEPOINT_NAME) - .build(); - var snapshotResult = - 
FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef); - assertEquals(SAVEPOINT_PATH, snapshotResult); - } - - @Test - public void testGetValidatedFlinkStateSnapshotPathInvalidName() { - var snapshotRef = - FlinkStateSnapshotReference.builder().namespace(NAMESPACE).name(" ").build(); - assertThrows( - IllegalArgumentException.class, - () -> - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath( - client, snapshotRef)); - } - - @Test - public void testGetValidatedFlinkStateSnapshotPathNotFound() { - var snapshotRef = - FlinkStateSnapshotReference.builder() - .namespace("not-exists") - .name("not-exists") - .build(); - assertThrows( - IllegalStateException.class, - () -> - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath( - client, snapshotRef)); - } - - @Test - public void testGetAndValidateFlinkStateSnapshotAlreadyExists() { - var snapshot = initSavepoint(TRIGGER_PENDING, null); - snapshot.getSpec().getSavepoint().setAlreadyExists(true); - client.resource(snapshot).create(); - - var snapshotRef = - FlinkStateSnapshotReference.builder() - .namespace(NAMESPACE) - .name(SAVEPOINT_NAME) - .build(); - var snapshotResult = - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath(client, snapshotRef); - assertEquals(SAVEPOINT_PATH, snapshotResult); - } - - @Test - public void testGetValidatedFlinkStateSnapshotPathNotCompleted() { - var snapshot = initSavepoint(IN_PROGRESS, null); - client.resource(snapshot).create(); - - var snapshotRef = - FlinkStateSnapshotReference.builder() - .namespace(NAMESPACE) - .name(SAVEPOINT_NAME) - .build(); - assertThrows( - IllegalStateException.class, - () -> - FlinkStateSnapshotUtils.getValidatedFlinkStateSnapshotPath( - client, snapshotRef)); - } - @Test public void testGetFlinkStateSnapshotsForResource() { var deployment = initDeployment(); @@ -298,45 +209,25 @@ public void testCreateReferenceForUpgradeSavepointWithResource(boolean disposeOn var conf = new Configuration(); 
conf.set(OPERATOR_JOB_SAVEPOINT_DISPOSE_ON_DELETE, disposeOnDelete); var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf); - var result = - FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint( - conf, - operatorConf, - client, - deployment, - SavepointFormatType.CANONICAL, - SAVEPOINT_PATH); + FlinkStateSnapshotUtils.createUpgradeSnapshotResource( + conf, + operatorConf, + client, + deployment, + SavepointFormatType.CANONICAL, + SAVEPOINT_PATH); var snapshots = TestUtils.getFlinkStateSnapshotsForResource(client, deployment); assertThat(snapshots) .hasSize(1) .allSatisfy( snapshot -> { - assertEquals(snapshot.getMetadata().getName(), result.getName()); - assertEquals( - snapshot.getMetadata().getNamespace(), result.getNamespace()); assertEquals( disposeOnDelete, snapshot.getSpec().getSavepoint().getDisposeOnDelete()); + assertEquals( + SAVEPOINT_PATH, snapshot.getSpec().getSavepoint().getPath()); + assertTrue(snapshot.getSpec().getSavepoint().getAlreadyExists()); }); - assertNull(result.getPath()); - } - - @Test - public void testCreateReferenceForUpgradeSavepointWithPath() { - var deployment = initDeployment(); - var conf = new Configuration().set(SNAPSHOT_RESOURCE_ENABLED, false); - var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf); - var result = - FlinkStateSnapshotUtils.createReferenceForUpgradeSavepoint( - conf, - operatorConf, - client, - deployment, - SavepointFormatType.CANONICAL, - SAVEPOINT_PATH); - assertEquals(SAVEPOINT_PATH, result.getPath()); - assertNull(result.getNamespace()); - assertNull(result.getName()); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java index 23720df7d1..37f05309e5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java +++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; @@ -319,20 +318,12 @@ public void testLastSavepointKnown() { sp.setLocation(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH); assertFalse(SnapshotUtils.lastSavepointKnown(status)); - status.getJobStatus() - .setUpgradeSnapshotReference(FlinkStateSnapshotReference.fromPath("sp1")); + status.getJobStatus().setUpgradeSavepointPath("sp1"); assertTrue(SnapshotUtils.lastSavepointKnown(status)); status.getJobStatus() - .setUpgradeSnapshotReference( - FlinkStateSnapshotReference.fromPath( - AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH)); + .setUpgradeSavepointPath(AbstractJobReconciler.LAST_STATE_DUMMY_SP_PATH); assertFalse(SnapshotUtils.lastSavepointKnown(status)); - - status.getJobStatus() - .setUpgradeSnapshotReference( - new FlinkStateSnapshotReference("namespace", "name", null)); - assertTrue(SnapshotUtils.lastSavepointKnown(status)); } private static void resetTrigger(FlinkDeployment deployment, SnapshotType snapshotType) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index 69ce55ed8c..f5d0712f10 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import org.apache.flink.kubernetes.operator.api.spec.CheckpointSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.IngressSpec; import org.apache.flink.kubernetes.operator.api.spec.JobKind; @@ -527,16 +526,6 @@ public void testValidationWithoutDefaultConfig() { dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc"); }, "TaskManager resource ephemeral storage parse error: Character a is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark."); - - testError( - dep -> { - dep.getSpec() - .getJob() - .setFlinkStateSnapshotReference( - FlinkStateSnapshotReference.builder().name("snapshot").build()); - dep.getSpec().getJob().setInitialSavepointPath("s0"); - }, - "Cannot set both initialSavepointPath and flinkStateSnapshotReference in the job spec"); } @Test @@ -615,9 +604,8 @@ public void testSavepointRedeployValidation() { .serializeAndSetLastReconciledSpec(dep.getSpec(), dep); job.setSavepointRedeployNonce(1L); job.setInitialSavepointPath(null); - job.setFlinkStateSnapshotReference(null); }, - "InitialSavepointPath and flinkStateSnapshotReference must not be empty for savepoint redeployment"); + "InitialSavepointPath must not be empty for savepoint redeployment"); testError( dep -> { @@ -628,7 +616,7 @@ public void testSavepointRedeployValidation() { job.setSavepointRedeployNonce(1L); job.setInitialSavepointPath(" "); }, - "InitialSavepointPath and flinkStateSnapshotReference must not be empty for savepoint redeploymen"); + "InitialSavepointPath must not be empty for savepoint redeploymen"); 
testError( dep -> { @@ -642,7 +630,7 @@ public void testSavepointRedeployValidation() { job.setSavepointRedeployNonce(2L); job.setInitialSavepointPath(null); }, - "InitialSavepointPath and flinkStateSnapshotReference must not be empty for savepoint redeploymen"); + "InitialSavepointPath must not be empty for savepoint redeploymen"); } @ParameterizedTest @@ -1072,19 +1060,17 @@ public void testFlinkStateSnapshotValidator() { null); var refName = "does-not-exist"; - var namespace = "default"; var snapshot = TestUtils.buildFlinkStateSnapshotSavepoint( false, JobReference.builder() .kind(JobKind.FLINK_DEPLOYMENT) .name(refName) - .namespace(namespace) .build()); testStateSnapshotValidate( snapshot, Optional.empty(), - String.format("Target for snapshot %s/%s was not found", namespace, refName)); + String.format("Target for snapshot test/%s was not found", refName)); } private void testStateSnapshotValidateWithModifier( diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 6aabf7262c..b4c2d4c0f4 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -88,15 +88,6 @@ spec: type: integer entryClass: type: string - flinkStateSnapshotReference: - properties: - name: - type: string - namespace: - type: string - path: - type: string - type: object initialSavepointPath: type: string jarURI: @@ -10367,15 +10358,8 @@ spec: type: string updateTime: type: string - upgradeSnapshotReference: - properties: - name: - type: string - namespace: - type: string - path: - type: string - type: object + upgradeSavepointPath: + type: string type: object lifecycleState: enum: diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml index 
0df1aa8791..b7ea567d5b 100644 --- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml @@ -49,15 +49,6 @@ spec: type: integer entryClass: type: string - flinkStateSnapshotReference: - properties: - name: - type: string - namespace: - type: string - path: - type: string - type: object initialSavepointPath: type: string jarURI: @@ -209,15 +200,8 @@ spec: type: string updateTime: type: string - upgradeSnapshotReference: - properties: - name: - type: string - namespace: - type: string - path: - type: string - type: object + upgradeSavepointPath: + type: string type: object lifecycleState: enum: diff --git a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml index 96ba2ca133..3f2f3475de 100644 --- a/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkstatesnapshots.flink.apache.org-v1.yml @@ -48,8 +48,6 @@ spec: type: string name: type: string - namespace: - type: string type: object savepoint: properties: