[FLINK-37088][k8s] Support Flink application TaskManager log mounting #25944

Open
wants to merge 4 commits into
base: master
@@ -510,6 +510,23 @@ public class KubernetesConfigOptions {
"Whether to enable HostNetwork mode. "
+ "The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service.");



public static final ConfigOption<String> KUBERNETES_LOG_VOLUME_HOSTPATH =
key("kubernetes.log.volume.hostpath")
.stringType()
.defaultValue("/apps/log/flink")
.withDescription("The log path on the Flink host machine.");


public static final ConfigOption<String> KUBERNETES_LOG_VOLUMES_MOUNT_MOUNTPATH =
key("kubernetes.log.volumemounts.mountpath")
Contributor

@davidradl Jan 15, 2025

I am not a k8s expert, so I have some basic questions:

  • We should document these new config options.
  • Are there issues with supplying default log locations in this PR? Suddenly a lot of data would be written to locations that were not written to before. I assume /apps/log/flink and /opt/flink/log are folders that could fill up, so their size would need to be managed, perhaps with criteria for deleting or archiving older logs. If these logs require new management, the considerations should be documented.
  • It would be helpful to describe in the Jira in more detail what the problem is and how this change fixes it.
    • What is the life cycle of HostPathVolumeSource? Is it associated with the pod? If so, are we trying to keep
      logs that would have disappeared with the container in storage associated with the pod, so that we would
      still lose the logs if the pod dies?
  • I wonder what the impact is of enabling this for all containers after this PR.

Author

Thank you very much for your review. Yes, a container's PersistentVolume (PV) and PersistentVolumeClaim (PVC) can be configured to achieve log mounting.
However, when we started a Flink application programmatically (via code invocation), we found that the log mount did not take effect. We therefore modified this part of the code to implement log mounting and added a toggle, kubernetes.decorator.log-mount.enabled. Users who run into the same situation only need to enable this toggle to get log mounting; it is disabled by default.
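
As a rough illustration of the settings involved, the sketch below collects the three option keys from this diff into a plain map. It deliberately avoids Flink's own Configuration class so that it runs without Flink on the classpath; the class LogMountSettings and the helper enabledWith are hypothetical names used only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: models the three options from this PR as plain key/value pairs. */
final class LogMountSettings {

    // Keys copied from the KubernetesConfigOptions diff in this PR.
    static final String ENABLED_KEY = "kubernetes.decorator.log-mount.enabled";
    static final String HOST_PATH_KEY = "kubernetes.log.volume.hostpath";
    static final String MOUNT_PATH_KEY = "kubernetes.log.volumemounts.mountpath";

    /** Hypothetical helper: returns the entries a user would set to turn log mounting on. */
    static Map<String, String> enabledWith(String hostPath, String mountPath) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put(ENABLED_KEY, "true"); // the toggle defaults to false in the PR
        settings.put(HOST_PATH_KEY, hostPath);
        settings.put(MOUNT_PATH_KEY, mountPath);
        return settings;
    }

    public static void main(String[] args) {
        // The paths passed here are the default values declared in the diff.
        enabledWith("/apps/log/flink", "/opt/flink/log")
                .forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

With the toggle left at its default of false, neither path option has any effect, because the decorator is never added.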

.stringType()
.defaultValue("/opt/flink/log")
.withDescription("The log path inside the Flink container.");



public static final ConfigOption<String> KUBERNETES_CLIENT_USER_AGENT =
key("kubernetes.client.user-agent")
.stringType()
@@ -537,6 +554,13 @@ public class KubernetesConfigOptions {
+ "Flink. A typical use-case is when one uses Flink Kubernetes "
+ "Operator.");

public static final ConfigOption<Boolean> KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED =
key("kubernetes.decorator.log-mount.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable log mount decorator. ");

public static final ConfigOption<Boolean> LOCAL_UPLOAD_ENABLED =
ConfigOptions.key("kubernetes.artifacts.local-upload-enabled")
.booleanType()
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.decorators;

import static org.apache.flink.kubernetes.utils.Constants.FLINK_LOG;
import static org.apache.flink.util.Preconditions.checkNotNull;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.HostPathVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;

/** Supports mounting logs on the JobManager or TaskManager pod. */
public class FlinkLogDecorator extends AbstractKubernetesStepDecorator {

    private final AbstractKubernetesParameters kubernetesComponentConf;

    public FlinkLogDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
        this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
    }

    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final Pod podWithMount = decoratePod(flinkPod.getPodWithoutMainContainer());
        final Container containerWithMount = decorateMainContainer(flinkPod.getMainContainer());

        return new FlinkPod.Builder(flinkPod)
                .withPod(podWithMount)
                .withMainContainer(containerWithMount)
                .build();
    }

    // Mounts the shared log volume into the main container at the configured mount path.
    private Container decorateMainContainer(Container container) {
        VolumeMount volumeMount =
                new VolumeMountBuilder()
                        .withName(FLINK_LOG)
                        .withMountPath(kubernetesComponentConf.getVolumeMountPath())
                        .build();

        return new ContainerBuilder(container).addToVolumeMounts(volumeMount).build();
    }

    // Adds a hostPath volume to the pod spec; "DirectoryOrCreate" creates the host directory if it is missing.
    private Pod decoratePod(Pod pod) {
        Volume volume =
                new VolumeBuilder()
                        .withName(FLINK_LOG)
                        .withHostPath(
                                new HostPathVolumeSourceBuilder()
                                        .withPath(kubernetesComponentConf.getVolumeLogs())
                                        .withType("DirectoryOrCreate")
                                        .build())
                        .build();

        return new PodBuilder(pod).editOrNewSpec().addToVolumes(volume).endSpec().build();
    }
}
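
To make the effect of the decorator concrete, the stand-alone sketch below models the pairing it creates: a pod-level hostPath volume and a container-level mount that refer to each other through the shared name flink-log. LogMountModel, VolumeSpec, MountSpec, and resolves are hypothetical stand-ins introduced for this example; they are not fabric8 or Flink types.

```java
/** Illustrative model only; the nested types are hypothetical stand-ins, not fabric8 classes. */
final class LogMountModel {

    // Same value as Constants.FLINK_LOG in this PR.
    static final String FLINK_LOG = "flink-log";

    /** Hypothetical pod-level volume backed by a directory on the node. */
    static final class VolumeSpec {
        final String name;
        final String hostPath;
        final String type;

        VolumeSpec(String name, String hostPath, String type) {
            this.name = name;
            this.hostPath = hostPath;
            this.type = type;
        }
    }

    /** Hypothetical container-level mount that refers to a volume by name. */
    static final class MountSpec {
        final String volumeName;
        final String mountPath;

        MountSpec(String volumeName, String mountPath) {
            this.volumeName = volumeName;
            this.mountPath = mountPath;
        }
    }

    static VolumeSpec logVolume(String hostPath) {
        return new VolumeSpec(FLINK_LOG, hostPath, "DirectoryOrCreate");
    }

    static MountSpec logMount(String mountPath) {
        return new MountSpec(FLINK_LOG, mountPath);
    }

    /** A mount can only be resolved if the pod declares a volume with the same name. */
    static boolean resolves(MountSpec mount, VolumeSpec volume) {
        return mount.volumeName.equals(volume.name);
    }

    public static void main(String[] args) {
        VolumeSpec volume = logVolume("/apps/log/flink");
        MountSpec mount = logMount("/opt/flink/log");
        System.out.println(mount.mountPath + " -> " + volume.hostPath
                + " (paired: " + resolves(mount, volume) + ")");
    }
}
```

Because the volume is a hostPath, the directory lives on the node's filesystem rather than inside the pod, which is why both the volume and the mount must use the same name.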
@@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkLogDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
@@ -53,6 +54,7 @@

import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED;
import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED;
import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED;

/**
* Utility class for constructing all the Kubernetes components on the client-side. This can include
@@ -84,6 +86,11 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi
stepDecorators.add(new KerberosMountDecorator(kubernetesJobManagerParameters));
}

if (configuration.get(KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED)) {
stepDecorators.add(new FlinkLogDecorator(kubernetesJobManagerParameters));
}


stepDecorators.addAll(
Arrays.asList(
new FlinkConfMountDecorator(kubernetesJobManagerParameters),
@@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.kubeclient.decorators.CmdTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkLogDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
@@ -41,6 +42,7 @@

import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED;
import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED;
import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED;

/** Utility class for constructing the TaskManager Pod on the JobManager. */
public class KubernetesTaskManagerFactory {
@@ -65,6 +67,10 @@ public static KubernetesPod buildTaskManagerKubernetesPod(
stepDecorators.add(new KerberosMountDecorator(kubernetesTaskManagerParameters));
}

if (configuration.get(KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED)) {
stepDecorators.add(new FlinkLogDecorator(kubernetesTaskManagerParameters));
}

stepDecorators.add(new FlinkConfMountDecorator(kubernetesTaskManagerParameters));

for (KubernetesStepDecorator stepDecorator : stepDecorators) {
@@ -205,4 +205,15 @@ public List<Map<String, String>> getEnvironmentsFromSecrets() {
public boolean isHostNetworkEnabled() {
return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED);
}


public String getVolumeLogs() {
return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_LOG_VOLUME_HOSTPATH);

}

public String getVolumeMountPath() {
return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_LOG_VOLUMES_MOUNT_MOUNTPATH);

}
}
@@ -120,4 +120,6 @@ public class Constants {
public static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0";

public static final String USER_ARTIFACTS_VOLUME = "user-artifacts-volume";

public static final String FLINK_LOG = "flink-log";
}
@@ -0,0 +1,63 @@
package org.apache.flink.kubernetes.kubeclient.decorators;


import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class FlinkLogDecoratorTest {

    private static final String VOLUME_MOUNT_PATH = "/apps/log/flink";
    private static final String VOLUME_LOGS = "/opt/flink/log";

    @Mock
    private AbstractKubernetesParameters kubernetesComponentConf;

    @InjectMocks
    private FlinkLogDecorator flinkLogDecorator;

    @Before
    public void setUp() {
        when(kubernetesComponentConf.getVolumeMountPath()).thenReturn(VOLUME_MOUNT_PATH);
        when(kubernetesComponentConf.getVolumeLogs()).thenReturn(VOLUME_LOGS);
    }

    @Test
    public void decorateFlinkPod_ShouldAddVolumeAndVolumeMount() {
        FlinkPod originalFlinkPod =
                new FlinkPod.Builder()
                        .withPod(new PodBuilder().build())
                        .withMainContainer(new ContainerBuilder().build())
                        .build();

        FlinkPod decoratedFlinkPod = flinkLogDecorator.decorateFlinkPod(originalFlinkPod);

        Pod decoratedPod = decoratedFlinkPod.getPodWithoutMainContainer();
        Container decoratedContainer = decoratedFlinkPod.getMainContainer();

        assertEquals(1, decoratedPod.getSpec().getVolumes().size());
        Volume volume = decoratedPod.getSpec().getVolumes().get(0);
        assertEquals("flink-log", volume.getName());
        assertEquals(VOLUME_LOGS, volume.getHostPath().getPath());
        assertEquals("DirectoryOrCreate", volume.getHostPath().getType());

        assertEquals(1, decoratedContainer.getVolumeMounts().size());
        VolumeMount volumeMount = decoratedContainer.getVolumeMounts().get(0);
        assertEquals("flink-log", volumeMount.getName());
        assertEquals(VOLUME_MOUNT_PATH, volumeMount.getMountPath());
    }
}