From e575a7e253c4a27d9d9ad092aca9a8f48305690f Mon Sep 17 00:00:00 2001 From: yangfei Date: Fri, 10 Jan 2025 11:40:00 +0800 Subject: [PATCH 1/3] [FLINK-37088][k8s]support flink applicaton taskmanager log mounting --- .../KubernetesConfigOptions.java | 17 ++++ .../decorators/FlinkLogDecorator.java | 77 +++++++++++++++++++ .../factory/KubernetesJobManagerFactory.java | 4 +- .../factory/KubernetesTaskManagerFactory.java | 3 +- .../AbstractKubernetesParameters.java | 11 +++ .../flink/kubernetes/utils/Constants.java | 2 + .../decorators/FlinkLogDecoratorTest.java | 63 +++++++++++++++ 7 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecorator.java create mode 100644 flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecoratorTest.java diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 271e1479c2183..b95444cc42147 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -510,6 +510,23 @@ public class KubernetesConfigOptions { "Whether to enable HostNetwork mode. " + "The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service."); + + + public static final ConfigOption KUBERNETES_LOG_VOLUME_HOSTPATH = + key("kubernetes.log.volume.hostpath") + .stringType() + .defaultValue("/apps/log/flink") + .withDescription("flink host machine log path"); + + + public static final ConfigOption KUBERNETES_LOG_VOLUMES_MOUNT_MOUNTPATH = + key("kubernetes.log.volumemounts.mountpath") + .stringType() + .defaultValue("/opt/flink/log") + .withDescription("flink container log path ."); + + + public static final ConfigOption KUBERNETES_CLIENT_USER_AGENT = key("kubernetes.client.user-agent") .stringType() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecorator.java new file mode 100644 index 0000000000000..c1bb75afb8905 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecorator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import static org.apache.flink.kubernetes.utils.Constants.FLINK_LOG; +import static org.apache.flink.util.Preconditions.checkNotNull; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.HostPathVolumeSourceBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; + +/** Support mounting logs on the JobManager or TaskManager pod.. */ +public class FlinkLogDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkLogDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod podWithMount = decoratePod(flinkPod.getPodWithoutMainContainer()); + final Container containerWithMount = decorateMainContainer(flinkPod.getMainContainer()); + + return new FlinkPod.Builder(flinkPod) + .withPod(podWithMount) + .withMainContainer(containerWithMount) + .build(); + + + } + + private Container decorateMainContainer(Container container) { + + VolumeMount volumeMount = new VolumeMountBuilder().withName(FLINK_LOG) + .withMountPath(kubernetesComponentConf.getVolumeMountPath()).build(); + + return new ContainerBuilder(container).addToVolumeMounts(volumeMount).build(); + } + + private Pod decoratePod(Pod pod) { + + Volume volume = new VolumeBuilder().withName(FLINK_LOG).withHostPath( + new HostPathVolumeSourceBuilder().withPath(kubernetesComponentConf.getVolumeLogs()) + .withType("DirectoryOrCreate").build()).build(); + + + + return new PodBuilder(pod).editOrNewSpec().addToVolumes(volume).endSpec().build(); + } + +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java index fd8f50b6476ca..9c9366500bc34 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.FlinkLogDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator; @@ -74,7 +75,8 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi new MountSecretsDecorator(kubernetesJobManagerParameters), new CmdJobManagerDecorator(kubernetesJobManagerParameters), new InternalServiceDecorator(kubernetesJobManagerParameters), - new ExternalServiceDecorator(kubernetesJobManagerParameters))); + new ExternalServiceDecorator(kubernetesJobManagerParameters), + new FlinkLogDecorator(kubernetesJobManagerParameters))); Configuration configuration = kubernetesJobManagerParameters.getFlinkConfiguration(); if (configuration.get(KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED)) { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java index 368850d3ba81f..a6b2093184236 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.CmdTaskManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.FlinkLogDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.InitTaskManagerDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; @@ -66,7 +67,7 @@ public static KubernetesPod buildTaskManagerKubernetesPod( } stepDecorators.add(new FlinkConfMountDecorator(kubernetesTaskManagerParameters)); - + stepDecorators.add(new FlinkLogDecorator(kubernetesTaskManagerParameters)); for (KubernetesStepDecorator stepDecorator : stepDecorators) { flinkPod = stepDecorator.decorateFlinkPod(flinkPod); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java index da74e9d12585e..5d5bcd8f3364b 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java @@ -205,4 +205,15 @@ public List> getEnvironmentsFromSecrets() { public boolean isHostNetworkEnabled() { return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED); } + + + public String getVolumeLogs() { + return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_LOG_VOLUME_HOSTPATH); + + } + + public String getVolumeMountPath() { + return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_LOG_VOLUMES_MOUNT_MOUNTPATH); + + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java index ced7d73d5073f..5782dd4e08fbb 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java @@ -120,4 +120,6 @@ public class Constants { public static final String KUBERNETES_ZERO_RESOURCE_VERSION = "0"; public static final String USER_ARTIFACTS_VOLUME = "user-artifacts-volume"; + + public static final String FLINK_LOG = "flink-log"; } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecoratorTest.java new file mode 100644 index 0000000000000..e3f6e16fb1372 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkLogDecoratorTest.java @@ -0,0 +1,63 @@ +package org.apache.flink.kubernetes.kubeclient.decorators; + + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class FlinkLogDecoratorTest { + + private static final String VOLUME_MOUNT_PATH = "/apps/log/flink"; + private static final String VOLUME_LOGS = "/opt/flink/log"; + + @Mock + private AbstractKubernetesParameters kubernetesComponentConf; + + @InjectMocks + private FlinkLogDecorator flinkLogDecorator; + + @Before + public void setUp() { + when(kubernetesComponentConf.getVolumeMountPath()).thenReturn(VOLUME_MOUNT_PATH); + when(kubernetesComponentConf.getVolumeLogs()).thenReturn(VOLUME_LOGS); + } + + @Test + public void decorateFlinkPod_ShouldAddVolumeAndVolumeMount() { + FlinkPod originalFlinkPod = new FlinkPod.Builder() + .withPod(new PodBuilder().build()) + .withMainContainer(new ContainerBuilder().build()) + .build(); + + FlinkPod decoratedFlinkPod = flinkLogDecorator.decorateFlinkPod(originalFlinkPod); + + Pod decoratedPod = decoratedFlinkPod.getPodWithoutMainContainer(); + Container decoratedContainer = decoratedFlinkPod.getMainContainer(); + + assertEquals(1, decoratedPod.getSpec().getVolumes().size()); + Volume volume = decoratedPod.getSpec().getVolumes().get(0); + assertEquals("flink-log", volume.getName()); + assertEquals(VOLUME_LOGS, volume.getHostPath().getPath()); + assertEquals("DirectoryOrCreate", volume.getHostPath().getType()); + + assertEquals(1, decoratedContainer.getVolumeMounts().size()); + VolumeMount volumeMount = decoratedContainer.getVolumeMounts().get(0); + assertEquals("flink-log", volumeMount.getName()); + assertEquals(VOLUME_MOUNT_PATH, volumeMount.getMountPath()); + } +} From c1f2d949bdcfdb7eae25539e62dfcbf9d8492644 Mon Sep 17 00:00:00 2001 From: yangfei Date: Fri, 17 Jan 2025 16:03:05 +0800 Subject: [PATCH 2/3] [FLINK-37088][k8s]support flink applicaton taskmanager log mounting --- .../configuration/KubernetesConfigOptions.java | 7 +++++++ .../kubeclient/factory/KubernetesJobManagerFactory.java | 9 +++++++-- .../kubeclient/factory/KubernetesTaskManagerFactory.java | 7 ++++++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index b95444cc42147..9a2c87eff596d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -554,6 +554,13 @@ public class KubernetesConfigOptions { + "Flink. A typical use-case is when one uses Flink Kubernetes " + "Operator."); + public static final ConfigOption KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED = + key("kubernetes.decorator.log-mount.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to enable log mount decorator. "); + public static final ConfigOption LOCAL_UPLOAD_ENABLED = ConfigOptions.key("kubernetes.artifacts.local-upload-enabled") .booleanType() diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java index 9c9366500bc34..f339e1155dc6f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java @@ -54,6 +54,7 @@ import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED; +import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED; /** * Utility class for constructing all the Kubernetes components on the client-side. This can include @@ -75,8 +76,7 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi new MountSecretsDecorator(kubernetesJobManagerParameters), new CmdJobManagerDecorator(kubernetesJobManagerParameters), new InternalServiceDecorator(kubernetesJobManagerParameters), - new ExternalServiceDecorator(kubernetesJobManagerParameters), - new FlinkLogDecorator(kubernetesJobManagerParameters))); + new ExternalServiceDecorator(kubernetesJobManagerParameters))); Configuration configuration = kubernetesJobManagerParameters.getFlinkConfiguration(); if (configuration.get(KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED)) { @@ -86,6 +86,11 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi stepDecorators.add(new KerberosMountDecorator(kubernetesJobManagerParameters)); } + if (configuration.get(KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED)) { + stepDecorators.add(new FlinkLogDecorator(kubernetesJobManagerParameters)); + } + + stepDecorators.addAll( Arrays.asList( new FlinkConfMountDecorator(kubernetesJobManagerParameters), diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java index a6b2093184236..f211d0c169dca 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java @@ -42,6 +42,7 @@ import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_HADOOP_CONF_MOUNT_DECORATOR_ENABLED; import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_KERBEROS_MOUNT_DECORATOR_ENABLED; +import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED; /** Utility class for constructing the TaskManager Pod on the JobManager. */ public class KubernetesTaskManagerFactory { @@ -66,8 +67,12 @@ public static KubernetesPod buildTaskManagerKubernetesPod( stepDecorators.add(new KerberosMountDecorator(kubernetesTaskManagerParameters)); } + if (configuration.get(KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED)) { + stepDecorators.add(new FlinkLogDecorator(kubernetesTaskManagerParameters)); + } + stepDecorators.add(new FlinkConfMountDecorator(kubernetesTaskManagerParameters)); - stepDecorators.add(new FlinkLogDecorator(kubernetesTaskManagerParameters)); + for (KubernetesStepDecorator stepDecorator : stepDecorators) { flinkPod = stepDecorator.decorateFlinkPod(flinkPod); } From 1d09ce3ee1686da01b7678cc44a2ec8d4911ec62 Mon Sep 17 00:00:00 2001 From: yangfei Date: Fri, 17 Jan 2025 16:28:16 +0800 Subject: [PATCH 3/3] [FLINK-37088][k8s]support flink applicaton taskmanager log mounting --- .../flink/kubernetes/configuration/KubernetesConfigOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 9a2c87eff596d..7008cbd0b6dea 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -557,7 +557,7 @@ public class KubernetesConfigOptions { public static final ConfigOption KUBERNETES_LOG_MOUNT_DECORATOR_ENABLED = key("kubernetes.decorator.log-mount.enabled") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription( "Whether to enable log mount decorator. ");