Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33634] Add Conditions to Flink CRD's Status field #749

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/content/docs/custom-resource/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ spec:
...
status:
clusterInfo:
...
conditions:

...
jobManagerDeploymentStatus: READY
jobStatus:
Expand Down
2 changes: 2 additions & 0 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| jobManagerDeploymentStatus | org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus | Last observed status of the JobManager deployment. |
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus | Status of the last reconcile operation. |
| taskManager | org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo | Information about the TaskManagers for the scale subresource. |
| conditions | java.util.List<io.fabric8.kubernetes.api.model.Condition> | The conditions array is a set of typed conditions, each with a status, that together make up the computed state of a FlinkDeployment at any time. |

### FlinkSessionJobReconciliationStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus
Expand All @@ -272,6 +273,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| error | java.lang.String | Error information about the FlinkDeployment/FlinkSessionJob. |
| lifecycleState | org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState | Lifecycle state of the Flink resource (including being rolled back, failed etc.). |
| reconciliationStatus | org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus | Status of the last reconcile operation. |
| conditions | java.util.List<io.fabric8.kubernetes.api.model.Condition> | The conditions array is a set of typed conditions, each with a status, that together make up the computed state of a FlinkSessionJob at any time. |

### JobManagerDeploymentStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.utils.ConditionUtils;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.fabric8.kubernetes.api.model.Condition;
import io.fabric8.kubernetes.model.annotation.PrinterColumn;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;

/** Last observed common status of the Flink deployment/Flink SessionJob. */
@Experimental
@Data
Expand All @@ -51,6 +56,8 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
// column.
private ResourceLifecycleState lifecycleState;

private List<Condition> conditions = new ArrayList<>();

/**
* Current reconciliation status of this resource.
*
Expand Down Expand Up @@ -101,4 +108,67 @@ public ResourceLifecycleState getLifecycleState() {
* loop immediately. For example autoscaler overrides have changed and we need to apply them.
*/
@JsonIgnore @Internal private boolean immediateReconciliationNeeded = false;

public List<Condition> getConditions() {
switch (getLifecycleState()) {
case CREATED:
updateConditionIfNotExist(
conditions,
ConditionUtils.notReady(
"The resource was created in Kubernetes but not yet handled by the operator"));
break;
case SUSPENDED:
updateConditionIfNotExist(
conditions,
ConditionUtils.notReady("The resource (job) has been suspended"));
break;
case UPGRADING:
updateConditionIfNotExist(
conditions, ConditionUtils.notReady("The resource is being upgraded"));
break;
case DEPLOYED:
updateConditionIfNotExist(
conditions,
ConditionUtils.ready(
"The resource is deployed, but it’s not yet considered to be stable and might be rolled back in the future"));
break;
case ROLLING_BACK:
updateConditionIfNotExist(
conditions,
ConditionUtils.notReady(
"The resource is being rolled back to the last stable spec"));
break;
case ROLLED_BACK:
updateConditionIfNotExist(
conditions,
ConditionUtils.ready("The resource is deployed with the last stable spec"));
Comment on lines +115 to +144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a lot of duplicated code and copy-pasted strings from ResourceLifecycleState. I think we should add this conversion to the enum (or to a utility class that uses the enum directly).

break;
case FAILED:
updateConditionIfNotExist(conditions, ConditionUtils.error("failed"));
break;
case STABLE:
updateConditionIfNotExist(
conditions,
ConditionUtils.ready(
"The resource deployment is considered to be stable and won’t be rolled back"));
break;
}

return conditions;
}

private void updateConditionIfNotExist(List<Condition> conditions, Condition newCondition) {
if (conditions.isEmpty()) {
conditions.add(newCondition);
}
if (conditions.stream()
.noneMatch(condition -> condition.getType().equals(newCondition.getType()))) {
conditions.add(newCondition);
} else if (conditions.removeIf(
condition ->
!(condition.getReason().equals(newCondition.getReason())
&& condition.getMessage().equals(newCondition.getMessage())))) {
conditions.add(newCondition);
}
Comment on lines +164 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain what this is supposed to do exactly (and also add some docs to the code)? I am a bit confused by the logic

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add docs to the code. The logic above ensures that, rather than blindly replacing any existing conditions with the new one, we check for an existing condition of the same type and replace it only if that condition has a different message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I wrote in the last comment, I think we need a FLIP for this instead of fixing up this PR

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.api.utils;

import io.fabric8.kubernetes.api.model.Condition;
import io.fabric8.kubernetes.api.model.ConditionBuilder;

import java.text.SimpleDateFormat;
import java.util.Date;

/** Utility methods for building {@link Condition} objects for the status of a Flink custom resource. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect javadoc

public class ConditionUtils {

/**
 * Creates a condition of type {@code "Ready"} with status {@code "True"} and reason
 * {@code "Ready"}.
 *
 * @param message text stored in the message field of the condition
 * @return the condition built by {@link #crCondition(String, String, String, String)}
 */
public static Condition ready(final String message) {
return crCondition("Ready", "True", message, "Ready");
}

public static Condition notReady(final String message) {
return crCondition("Ready", "False", message, "Progressing");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we arbitrarily chose to have Ready and Error conditions? Why not simply use the ResourceLifecycleState name as the type and description as the message?

}

public static Condition error(final String message) {
return crCondition("Error", "True", message, "The job terminally failed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think error means that the job terminally failed in all cases. There are also operator side errors like reconciliation problems etc.

}

public static Condition crCondition(
final String type, final String status, final String message, final String reason) {
return new ConditionBuilder()
.withType(type)
.withStatus(status)
.withMessage(message)
.withReason(reason)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a reason, I suggest let's just remove it.

Copy link
Author

@lajith2006 lajith2006 Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the doc at https://maelvls.dev/kubernetes-conditions/, it looks like we can keep the reason as a simple category describing the cause of the current status.

Reason is intended to be used in concise output, such as one-line kubectl get output, and in summarizing occurrences of causes, whereas Message is intended to be presented to users in detailed status explanations, such as kubectl describe output.

I was thinking we could keep the reason for any not-ready and error conditions, and leave it out for the ready condition. What do you think?

.withLastTransitionTime(
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't Instant.now().toString() work?

.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.apache.flink.kubernetes.operator.TestUtils.MAX_RECONCILE_TIMES;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -131,6 +132,11 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
appCluster.getStatus().getJobStatus().getState());
assertEquals(7, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");

FlinkDeploymentReconciliationStatus reconciliationStatus =
appCluster.getStatus().getReconciliationStatus();
Expand Down Expand Up @@ -1170,6 +1176,11 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
assertNull(appCluster.getStatus().getError());
assertEquals(appCluster.getSpec(), reconciliationStatus.deserializeLastReconciledSpec());
assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource is deployed, but it’s not yet considered to be stable and might be rolled back in the future");

updateControl = testController.reconcile(appCluster, context);
assertEquals(
Expand All @@ -1179,6 +1190,11 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
appCluster.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource is deployed, but it’s not yet considered to be stable and might be rolled back in the future");
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
Expand All @@ -1193,6 +1209,11 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
org.apache.flink.api.common.JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
Expand All @@ -1207,13 +1228,24 @@ private void verifyReconcileNormalLifecycle(FlinkDeployment appCluster) throws E
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
updateControl.getScheduleDelay());

assertThat(appCluster.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");

// Validate job status
JobStatus jobStatus = appCluster.getStatus().getJobStatus();
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.apache.flink.kubernetes.operator.TestUtils.MAX_RECONCILE_TIMES;
import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -118,6 +119,11 @@ public void verifyBasicReconcileLoop() throws Exception {
assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertThat(sessionJob.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");

FlinkSessionJobReconciliationStatus reconciliationStatus =
sessionJob.getStatus().getReconciliationStatus();
Expand Down Expand Up @@ -617,5 +623,10 @@ private void verifyNormalBasicReconcileLoop(FlinkSessionJob sessionJob) throws E
assertEquals(
sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
assertThat(sessionJob.getStatus().getConditions())
.hasSize(1)
.extracting("message")
.contains(
"The resource deployment is considered to be stable and won’t be rolled back");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9843,6 +9843,23 @@ spec:
- ROLLED_BACK
- FAILED
type: string
conditions:
items:
properties:
lastTransitionTime:
type: string
message:
type: string
observedGeneration:
type: integer
reason:
type: string
status:
type: string
type:
type: string
type: object
type: array
type: object
type: object
served: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,23 @@ spec:
- ROLLED_BACK
- FAILED
type: string
conditions:
items:
properties:
lastTransitionTime:
type: string
message:
type: string
observedGeneration:
type: integer
reason:
type: string
status:
type: string
type:
type: string
type: object
type: array
type: object
type: object
served: true
Expand Down
Loading