[FLINK-35414] Rework last-state upgrade mode to support job cancellation as suspend mechanism #871

Merged 8 commits on Oct 4, 2024
18 changes: 7 additions & 11 deletions docs/content/docs/custom-resource/job-management.md
@@ -84,13 +84,13 @@ Supported values: `stateless`, `savepoint`, `last-state`

The `upgradeMode` setting controls both the stop and restore mechanisms as detailed in the following table:

| | Stateless | Last State | Savepoint |
|------------------------|-------------------------|--------------------------------------------|----------------------------------------|
| Config Requirement | None | Checkpointing & HA Enabled | Checkpoint/Savepoint directory defined |
| Job Status Requirement | None | HA metadata available | Job Running* |
| Suspend Mechanism | Cancel / Delete | Delete Flink deployment (keep HA metadata) | Cancel with savepoint |
| Restore Mechanism | Deploy from empty state | Recover last state using HA metadata | Restore From savepoint |
| Production Use | Not recommended | Recommended | Recommended |
| | Stateless | Last State | Savepoint |
|------------------------|-----------------|------------------------------------|----------------------------------------|
| Config Requirement | None | Checkpointing Enabled | Checkpoint/Savepoint directory defined |
| Job Status Requirement | None | Job or HA metadata accessible | Job Running* |
| Suspend Mechanism | Cancel / Delete | Cancel / Delete (keep HA metadata) | Cancel with savepoint |
| Restore Mechanism | Empty state | Use HA metadata or last cp/sp | Restore From savepoint |
| Production Use | Not recommended | Recommended | Recommended |


*\* When HA is enabled the `savepoint` upgrade mode may fall back to the `last-state` behaviour in cases where the job is in an unhealthy state.*
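The suspend column of the table above can be read as a small decision rule. Below is a hedged Java sketch (illustrative names only, not operator code) of how the cancel-based suspend introduced by FLINK-35414 could slot in, gated by the new `job-cancel.enabled` flag:

```java
import java.util.Objects;

public class SuspendMechanismSketch {
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    // With FLINK-35414, last-state upgrades may cancel the job (keeping HA
    // metadata) instead of deleting the whole Flink deployment.
    static String suspendAction(UpgradeMode mode, boolean lastStateCancelEnabled) {
        switch (Objects.requireNonNull(mode)) {
            case STATELESS:
                return "cancel-or-delete";
            case LAST_STATE:
                return lastStateCancelEnabled
                        ? "cancel-job-keep-ha-metadata"
                        : "delete-deployment-keep-ha-metadata";
            case SAVEPOINT:
                return "cancel-with-savepoint";
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(suspendAction(UpgradeMode.LAST_STATE, true));
    }
}
```

The `suspendAction` return values are made-up labels for the behaviours named in the table; they are not operator states or config values.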
@@ -149,10 +149,6 @@ spec:
state: running
```

{{< hint warning >}}
Last state upgrade mode is currently only supported for `FlinkDeployments`.
{{< /hint >}}

### Application restarts without spec change

There are cases when users would like to restart a Flink deployment to deal with a transient problem.
4 changes: 0 additions & 4 deletions docs/content/docs/custom-resource/overview.md
@@ -215,10 +215,6 @@ COPY flink-hadoop-fs-1.19-SNAPSHOT.jar $FLINK_PLUGINS_DIR/hadoop-fs/

Alternatively, if you use Helm to install the flink-kubernetes-operator, you can specify a postStart hook to download the required plugins.

### Limitations

- Last-state upgradeMode is currently not supported for FlinkSessionJobs

## Further information

- [Snapshots]({{< ref "docs/custom-resource/snapshots" >}})
2 changes: 1 addition & 1 deletion docs/content/docs/custom-resource/reference.md
@@ -452,7 +452,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
| triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
| triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
@@ -122,6 +122,12 @@
<td>Boolean</td>
<td>Enables last-state fallback for the savepoint upgrade mode. When the flag is enabled and the job is not running (so a savepoint cannot be triggered) but HA metadata is available for last-state restore, the operator can still initiate the upgrade process.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Cancel jobs during last-state upgrade. This config is ignored for session jobs, where cancelling is the only mechanism available for this type of upgrade.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -212,6 +212,12 @@
<td>Boolean</td>
<td>Enables last-state fallback for the savepoint upgrade mode. When the flag is enabled and the job is not running (so a savepoint cannot be triggered) but HA metadata is available for last-state restore, the operator can still initiate the upgrade process.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Cancel jobs during last-state upgrade. This config is ignored for session jobs, where cancelling is the only mechanism available for this type of upgrade.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
<td style="word-wrap: break-word;">(none)</td>
19 changes: 19 additions & 0 deletions e2e-tests/test_sessionjob_operations.sh
@@ -54,6 +54,7 @@ if [ "$location" == "" ];then
exit 1
fi

echo "Starting sessionjob savepoint upgrade test"
# Testing savepoint mode upgrade
# Update the FlinkSessionJob and trigger the savepoint upgrade
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
@@ -67,6 +68,24 @@ assert_available_slots 1 $CLUSTER_ID

echo "Successfully ran the sessionjob savepoint upgrade test"

flink_version=$(kubectl get $SESSION_CLUSTER_IDENTIFIER -o yaml | yq '.spec.flinkVersion')

if [ "$flink_version" != "v1_16" ]; then
echo "Starting sessionjob last-state upgrade test"
# Testing last-state mode upgrade
# Update the FlinkSessionJob and trigger the last-state upgrade
kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"parallelism": 2, "upgradeMode": "last-state" } } }'

# Check the job was restarted with the new parallelism
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' CANCELLING ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
assert_available_slots 0 $CLUSTER_ID

echo "Successfully ran the sessionjob last-state upgrade test"
else
echo "Skipping last-state test for flink version 1.16"
fi

# Test Operator restart
echo "Delete session job $SESSION_JOB_NAME"
kubectl delete flinksessionjob $SESSION_JOB_NAME
@@ -219,6 +219,9 @@ public Void scan(Element e, Integer depth) {
}
break;
case FIELD:
if (e.getModifiers().contains(Modifier.STATIC)) {
return null;
}
out.println(
"| "
+ getNameOrJsonPropValue(e)
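The check added above makes the docs generator skip static members, so constants such as the newly introduced Logger never show up as documented CRD fields. A hedged sketch of the same filtering idea using plain reflection (the real generator uses the `javax.lang.model` API; names here are illustrative):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class FieldDocSketch {
    // Stand-in for a status class: one static constant, one real CRD field.
    static class Example {
        static final String LOG = "logger";   // skipped: static member
        String lastSavepoint;                 // documented: instance field
    }

    static List<String> documentedFields(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue; // static members (constants, loggers) are not CRD fields
            }
            names.add(f.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(documentedFields(Example.class)); // [lastSavepoint]
    }
}
```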
@@ -18,12 +18,10 @@
package org.apache.flink.kubernetes.operator.api.status;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.fabric8.crd.generator.annotation.PrinterColumn;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -98,10 +96,4 @@ public ResourceLifecycleState getLifecycleState() {

return ResourceLifecycleState.DEPLOYED;
}

/**
* Internal flag to signal that due to some condition we need to schedule a new reconciliation
* loop immediately. For example autoscaler overrides have changed and we need to apply them.
*/
@JsonIgnore @Internal private boolean immediateReconciliationNeeded = false;
}
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;

@@ -100,22 +99,4 @@ public boolean isLastReconciledSpecStable() {
public boolean isBeforeFirstDeployment() {
return lastReconciledSpec == null;
}

/**
* This method is only here for backward compatibility reasons. The current version of the
* operator does not leave the resources in UPGRADING state during in-place scaling therefore
* this method will always return false.
*
* @return True if in-place scaling is in progress.
*/
@JsonIgnore
@Deprecated
public boolean scalingInProgress() {
if (isBeforeFirstDeployment() || state != ReconciliationState.UPGRADING) {
return false;
}
var job = deserializeLastReconciledSpec().getJob();
// For regular full upgrades the jobstate is suspended in UPGRADING state
return job != null && job.getState() == JobState.RUNNING;
}
}
@@ -24,6 +24,8 @@
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
@@ -40,6 +42,9 @@ public class SavepointInfo implements SnapshotInfo {
* Last completed savepoint by the operator for manual and periodic snapshots. Only used if
* FlinkStateSnapshot resources are disabled.
*/
private static final Logger LOG = LoggerFactory.getLogger(SavepointInfo.class);

/** Last completed savepoint by the operator. */
private Savepoint lastSavepoint;

/** Trigger id of a pending savepoint operation. */
@@ -82,7 +87,11 @@ public void resetTrigger() {
* @param savepoint Savepoint to be added.
*/
public void updateLastSavepoint(Savepoint savepoint) {
if (lastSavepoint == null || !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
if (savepoint == null) {
lastSavepoint = null;
} else if (lastSavepoint == null
|| !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
LOG.debug("Updating last savepoint to {}", savepoint);
lastSavepoint = savepoint;
savepointHistory.add(savepoint);
if (savepoint.getTriggerType() == SnapshotTriggerType.PERIODIC) {
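The reworked `updateLastSavepoint` above now treats a null argument as an explicit reset and only appends to the history when the savepoint location actually changes. A simplified, hedged stand-in for the operator's `Savepoint`/`SavepointInfo` classes showing that contract:

```java
import java.util.ArrayList;
import java.util.List;

public class SavepointInfoSketch {
    record Savepoint(String location) {}

    private Savepoint lastSavepoint;
    private final List<Savepoint> savepointHistory = new ArrayList<>();

    public void updateLastSavepoint(Savepoint savepoint) {
        if (savepoint == null) {
            lastSavepoint = null; // explicit reset, history untouched
        } else if (lastSavepoint == null
                || !lastSavepoint.location().equals(savepoint.location())) {
            lastSavepoint = savepoint; // a genuinely new savepoint location
            savepointHistory.add(savepoint);
        } // same location as before: no-op, avoids duplicate history entries
    }

    public Savepoint getLastSavepoint() { return lastSavepoint; }

    public int historySize() { return savepointHistory.size(); }

    public static void main(String[] args) {
        var info = new SavepointInfoSketch();
        info.updateLastSavepoint(new Savepoint("s3://bucket/sp-1"));
        info.updateLastSavepoint(new Savepoint("s3://bucket/sp-1")); // deduped
        info.updateLastSavepoint(null); // clears lastSavepoint only
        System.out.println(info.historySize()); // 1
    }
}
```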
@@ -148,33 +148,41 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
// Set 'web.cancel.enable' to false to avoid users accidentally cancelling jobs.
setDefaultConf(CANCEL_ENABLE, false);
effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
return this;
}

if (spec.getJob() != null) {
// Set 'pipeline.name' to resource name by default for application deployments.
setDefaultConf(PipelineOptions.NAME, clusterId);

// With last-state upgrade mode, set the default value of
// 'execution.checkpointing.interval'
// to 5 minutes when HA is enabled.
if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
setDefaultConf(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
DEFAULT_CHECKPOINTING_INTERVAL);
}

// We need to keep the application clusters around for proper operator behaviour
effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
}
protected static void applyJobConfig(String name, Configuration conf, JobSpec jobSpec) {
// Set 'pipeline.name' to resource name by default for application deployments.
setDefaultConf(conf, PipelineOptions.NAME, name);

// With last-state upgrade mode, set the default value of
// 'execution.checkpointing.interval'
// to 5 minutes when HA is enabled.
if (jobSpec.getUpgradeMode() == UpgradeMode.LAST_STATE) {
setDefaultConf(
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
conf,
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
DEFAULT_CHECKPOINTING_INTERVAL);
}
setDefaultConf(
conf,
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
return this;
if (jobSpec.getAllowNonRestoredState() != null) {
conf.set(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
jobSpec.getAllowNonRestoredState());
}

if (jobSpec.getEntryClass() != null) {
conf.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
}

if (jobSpec.getArgs() != null) {
conf.set(ApplicationConfiguration.APPLICATION_ARGS, Arrays.asList(jobSpec.getArgs()));
}
}

protected FlinkConfigBuilder applyLogConfiguration() throws IOException {
@@ -304,29 +312,18 @@ protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());

if (jobSpec.getJarURI() != null) {
final URI uri = new URI(jobSpec.getJarURI());
effectiveConfig.set(
PipelineOptions.JARS, Collections.singletonList(uri.toString()));
PipelineOptions.JARS,
Collections.singletonList(new URI(jobSpec.getJarURI()).toString()));
}

effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());

if (jobSpec.getAllowNonRestoredState() != null) {
effectiveConfig.set(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
jobSpec.getAllowNonRestoredState());
}

if (jobSpec.getEntryClass() != null) {
effectiveConfig.set(
ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
}
// We need to keep the application clusters around for proper operator behaviour
effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);

if (jobSpec.getArgs() != null) {
effectiveConfig.set(
ApplicationConfiguration.APPLICATION_ARGS,
Arrays.asList(jobSpec.getArgs()));
}
// Generic shared job config logic
applyJobConfig(clusterId, effectiveConfig, jobSpec);
} else {
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
@@ -423,8 +420,12 @@ public static Configuration buildFrom(
}

private <T> void setDefaultConf(ConfigOption<T> option, T value) {
if (!effectiveConfig.contains(option)) {
effectiveConfig.set(option, value);
setDefaultConf(effectiveConfig, option, value);
}

private static <T> void setDefaultConf(Configuration conf, ConfigOption<T> option, T value) {
if (!conf.contains(option)) {
conf.set(option, value);
}
}

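The `setDefaultConf` helpers above only apply a value when the user has not already set the option, which is why user-provided checkpointing settings always win over operator defaults. A map-backed sketch of that contract (Flink's `Configuration` replaced by a plain `Map` for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultConfSketch {
    // Mirrors setDefaultConf: apply the default only if the key is absent.
    static <T> void setDefaultConf(Map<String, T> conf, String key, T value) {
        conf.putIfAbsent(key, value); // user-provided values always win
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("execution.checkpointing.interval", "30s"); // set by the user
        setDefaultConf(conf, "execution.checkpointing.interval", "5min");
        setDefaultConf(conf, "web.cancel.enable", "false");
        System.out.println(conf.get("execution.checkpointing.interval")); // 30s
        System.out.println(conf.get("web.cancel.enable"));                // false
    }
}
```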
@@ -59,6 +59,7 @@
import java.util.function.Consumer;

import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.NAMESPACE_CONF_PREFIX;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
@@ -292,14 +293,15 @@ private void applyConfigsFromCurrentSpec(
* @return Session job config
*/
public Configuration getSessionJobConfig(
FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
String name, FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
Configuration sessionJobConfig = getObserveConfig(deployment);

// merge session job specific config
var sessionJobFlinkConfiguration = sessionJobSpec.getFlinkConfiguration();
if (sessionJobFlinkConfiguration != null) {
sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);
}
applyJobConfig(name, sessionJobConfig, sessionJobSpec.getJob());
return sessionJobConfig;
}

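The updated `getSessionJobConfig` layers configuration: the session cluster's observe config is the base, the job's `flinkConfiguration` overrides it, and the shared `applyJobConfig` defaults fill any remaining gaps. A hedged, map-based sketch of that layering (illustrative keys and method names, not the real operator API):

```java
import java.util.HashMap;
import java.util.Map;

public class SessionJobConfigSketch {
    static Map<String, String> sessionJobConfig(
            Map<String, String> clusterConfig,
            Map<String, String> jobOverrides,
            String jobName) {
        Map<String, String> conf = new HashMap<>(clusterConfig); // base layer
        if (jobOverrides != null) {
            conf.putAll(jobOverrides);            // job-specific overrides win
        }
        conf.putIfAbsent("pipeline.name", jobName); // shared job default
        return conf;
    }

    public static void main(String[] args) {
        var conf = sessionJobConfig(
                Map.of("rest.port", "8081"),
                Map.of("parallelism.default", "2"),
                "my-session-job");
        System.out.println(conf.get("pipeline.name")); // my-session-job
    }
}
```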
@@ -465,6 +465,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Max allowed checkpoint age for initiating last-state upgrades on running jobs. If no checkpoint is available within the desired age (and none is in progress), a savepoint will be triggered.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB =
operatorConfig("job.upgrade.last-state.job-cancel.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Cancel jobs during last-state upgrade. This config is ignored for session jobs, where cancelling is the only mechanism available for this type of upgrade.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
operatorConfig("health.probe.enabled")