[Feature-16396][storage-plugin] Tencent Cloud COS Storage Plugin #16565

Status: Open. Wants to merge 4 commits into base `dev`.
31 changes: 30 additions & 1 deletion docs/docs/en/guide/resource/configuration.md
@@ -1,7 +1,7 @@
# Resource Center Configuration

- You could use `Resource Center` to upload text files and other task-related files.
- You could configure `Resource Center` to use distributed file system like [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+), [MinIO](https://github.com/minio/minio) cluster or remote storage products like [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), [Tencent Cloud COS](https://cloud.tencent.com/product/cos), etc.
- You could configure `Resource Center` to use local file system. If you deploy `DolphinScheduler` in `Standalone` mode, you could configure it to use local file system for `Resource Center` without the need of an external `HDFS` system or `S3`.
- Furthermore, if you deploy `DolphinScheduler` in `Cluster` mode, you could use [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) to mount `S3` or [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html) to mount `OSS` to your machines and use the local file system for `Resource Center`. In this way, you could operate remote files as if on your local machines.

@@ -96,3 +96,32 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
> * If you want to use the resource upload function, the deployment user in [installation and deployment](../installation/standalone.md) must have relevant operation authority.
> * If you are using a Hadoop cluster with HA, you need to enable HDFS resource upload and copy the `core-site.xml` and `hdfs-site.xml` from the Hadoop cluster to `worker-server/conf` and `api-server/conf`; otherwise, skip this copy step.

## Connect COS

If you want to upload resources to a `Resource Center` backed by `COS`, you need to configure `api-server/conf/resource-center.yaml` and `worker-server/conf/resource-center.yaml`. Configure the following fields:

```yaml
resource:
  # Tencent Cloud Storage (COS) setup, required if you set resource.storage.type=COS
  tencent:
    cloud:
      access:
        key:
          id: <your-access-key-id>
          secret: <your-access-key-secret>
      cos:
        # predefined region code: https://cloud.tencent.com/document/product/436/6224
        region: ap-nanjing
        bucket:
          name: dolphinscheduler
```

To enable COS, you also need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:

```properties
resource.storage.type=COS
```
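
Since these YAML files are read through DolphinScheduler's flattened-key property mechanism (see the `PropertyUtils` change below), the same settings can be expressed as dotted keys. A hedged sketch; the exact key names are an assumption inferred from the YAML nesting above:

```properties
# Hypothetical flattened form of the resource-center.yaml keys above.
resource.tencent.cloud.access.key.id=<your-access-key-id>
resource.tencent.cloud.access.key.secret=<your-access-key-secret>
resource.tencent.cloud.cos.region=ap-nanjing
resource.tencent.cloud.cos.bucket.name=dolphinscheduler
```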

29 changes: 28 additions & 1 deletion docs/docs/zh/guide/resource/configuration.md
@@ -1,7 +1,7 @@
# Resource Center Configuration Details

- The Resource Center is typically used for uploading files, managing task groups, and similar operations.
- The Resource Center can connect to distributed file storage systems such as [Hadoop](https://hadoop.apache.org/docs/r2.7.0/) (2.6+) or a [MinIO](https://github.com/minio/minio) cluster, as well as remote object storage such as [AWS S3](https://aws.amazon.com/s3/), [Alibaba Cloud OSS](https://www.aliyun.com/product/oss), [Huawei Cloud OBS](https://support.huaweicloud.com/obs/index.html), [Tencent Cloud COS](https://cloud.tencent.com/product/cos), etc.
- The Resource Center can also connect directly to the local file system. In standalone mode, you can conveniently use the local file system without depending on an external storage system such as `Hadoop` or `S3`.
- In addition, for cluster-mode deployments, you can mount `S3` locally with [S3FS-FUSE](https://github.com/s3fs-fuse/s3fs-fuse) or mount `OSS` locally with [JINDO-FUSE](https://help.aliyun.com/document_detail/187410.html), and then have the Resource Center operate on files in remote object storage through the local file system.

@@ -90,3 +90,30 @@ resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com
> * If you use the resource upload feature, the deployment user in [installation and deployment](../installation/standalone.md) must have the relevant operation permissions.
> * If the NameNode of the Hadoop cluster is configured with HA, you need to enable HDFS resource upload and copy `core-site.xml` and `hdfs-site.xml` from the Hadoop cluster to `worker-server/conf` and `api-server/conf`; skip this step if NameNode HA is not enabled.

## Connect Tencent Cloud COS

If you need to upload resources to COS through the Resource Center, you need to configure the following files: `api-server/conf/resource-center.yaml` and `worker-server/conf/resource-center.yaml`. For reference:

```yaml
resource:
  # Tencent Cloud COS configuration
  tencent:
    cloud:
      access:
        key:
          id: <your-access-key-id>
          secret: <your-access-key-secret>
      cos:
        # COS region codes: https://cloud.tencent.com/document/product/436/6224
        region: ap-nanjing
        bucket:
          name: dolphinscheduler
```

To enable Tencent Cloud COS, you also need to configure the following files: `api-server/conf/common.properties` and `worker-server/conf/common.properties`. For reference:

```properties
resource.storage.type=COS
```

17 changes: 17 additions & 0 deletions dolphinscheduler-bom/pom.xml
@@ -117,6 +117,7 @@
<azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
<protobuf.version>3.17.2</protobuf.version>
<esdk-obs.version>3.23.3</esdk-obs.version>
<qcloud-cos.version>5.6.231</qcloud-cos.version>
<system-lambda.version>1.2.1</system-lambda.version>
<zeppelin-client.version>0.10.1</zeppelin-client.version>
<testcontainer.version>1.19.3</testcontainer.version>
@@ -934,6 +935,22 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
<version>${qcloud-cos.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
5 changes: 5 additions & 0 deletions dolphinscheduler-common/pom.xml
@@ -102,6 +102,11 @@
<artifactId>esdk-obs-java-bundle</artifactId>
</dependency>

<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api</artifactId>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
@@ -36,6 +36,7 @@ public final class Constants {

public static final String REMOTE_LOGGING_YAML_PATH = "/remote-logging.yaml";
public static final String AWS_YAML_PATH = "/aws.yaml";
public static final String RESOURCE_CENTER_YAML_PATH = "/resource-center.yaml";

public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
@@ -681,6 +682,17 @@ public final class Constants {
public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key";
public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name";

/**
* remote logging for COS
*/
public static final String REMOTE_LOGGING_COS_ACCESS_KEY_ID = "remote.logging.cos.access.key.id";
public static final String REMOTE_LOGGING_COS_ACCESS_KEY_SECRET = "remote.logging.cos.access.key.secret";
public static final String REMOTE_LOGGING_COS_BUCKET_NAME = "remote.logging.cos.bucket.name";
public static final String REMOTE_LOGGING_COS_REGION = "remote.logging.cos.region";

/**
* data quality
*/
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.log.remote;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

import org.apache.commons.lang3.StringUtils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.GetObjectRequest;
import com.qcloud.cos.model.PutObjectRequest;
import com.qcloud.cos.region.Region;

@Slf4j
public class CosRemoteLogHandler implements RemoteLogHandler, Closeable {

private final COSClient cosClient;

private final String bucketName;

private static CosRemoteLogHandler instance;

private CosRemoteLogHandler() {
String secretId = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_ACCESS_KEY_ID);
String secretKey = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_ACCESS_KEY_SECRET);
COSCredentials cosCredentials = new BasicCOSCredentials(secretId, secretKey);
String regionName = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_REGION);
ClientConfig clientConfig = new ClientConfig(new Region(regionName));
clientConfig.setHttpProtocol(HttpProtocol.https);
this.cosClient = new COSClient(cosCredentials, clientConfig);
this.bucketName = PropertyUtils.getString(Constants.REMOTE_LOGGING_COS_BUCKET_NAME);
checkBucketNameExists(this.bucketName);
}

public static synchronized CosRemoteLogHandler getInstance() {
if (instance == null) {
instance = new CosRemoteLogHandler();
}
return instance;
}

@Override
public void sendRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("send remote log {} to COS {}", logPath, objectName);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, new File(logPath));
cosClient.putObject(putObjectRequest);
} catch (Exception e) {
log.error("error while sending remote log {} to COS {}", logPath, objectName, e);
}
}

@Override
public void getRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);

try {
log.info("get remote log on COS {} to {}", objectName, logPath);
cosClient.getObject(new GetObjectRequest(bucketName, objectName), new File(logPath));
} catch (Exception e) {
log.error("error while getting remote log on COS {} to {}", objectName, logPath, e);
}
}

@Override
public void close() throws IOException {
if (cosClient != null) {
cosClient.shutdown();
}
}

private void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException(Constants.REMOTE_LOGGING_COS_BUCKET_NAME + " is empty");
}
boolean existsBucket = cosClient.doesBucketExist(bucketName);
if (!existsBucket) {
throw new IllegalArgumentException(
"bucketName: " + bucketName + " is not exists, you need to create them by yourself");
}

log.info("Tencent COS bucket '{}' has been found for remote logging", bucketName);
}
}
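
A minimal usage sketch of the handler above, assuming the `remote.logging.cos.*` properties are configured and the bucket already exists; the log path below is purely illustrative:

```java
import org.apache.dolphinscheduler.common.log.remote.CosRemoteLogHandler;

public class CosRemoteLogDemo {

    public static void main(String[] args) {
        // Lazily creates the singleton, which builds the COSClient from the
        // remote.logging.cos.* properties and verifies that the bucket exists.
        CosRemoteLogHandler handler = CosRemoteLogHandler.getInstance();

        // Upload a finished task log; the COS object name is derived from the log path.
        handler.sendRemoteLog("/opt/dolphinscheduler/logs/20240101/1-1-1.log");

        // Later, e.g. on another node, fetch the same log back to the local path.
        handler.getRemoteLog("/opt/dolphinscheduler/logs/20240101/1-1-1.log");
    }
}
```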
@@ -41,6 +41,8 @@ public RemoteLogHandler getRemoteLogHandler() {
return GcsRemoteLogHandler.getInstance();
} else if ("ABS".equals(target)) {
return AbsRemoteLogHandler.getInstance();
} else if ("COS".equals(target)) {
return CosRemoteLogHandler.getInstance();
}

log.error("No suitable remote logging target for {}", target);
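With this factory branch in place, routing remote logs to COS is purely a configuration change. A hedged sketch of the relevant `common.properties` entries, using the key names defined in `Constants` in this PR:

```properties
# Enable remote logging and select the COS handler added above.
remote.logging.enable=true
remote.logging.target=COS
# COS credentials, bucket, and region (key names as defined in Constants).
remote.logging.cos.access.key.id=<your-secret-id>
remote.logging.cos.access.key.secret=<your-secret-key>
remote.logging.cos.bucket.name=dolphinscheduler
remote.logging.cos.region=ap-nanjing
```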
@@ -20,6 +20,7 @@
import static org.apache.dolphinscheduler.common.constants.Constants.AWS_YAML_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.REMOTE_LOGGING_YAML_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_CENTER_YAML_PATH;

import org.apache.dolphinscheduler.common.config.ImmutablePriorityPropertyDelegate;
import org.apache.dolphinscheduler.common.config.ImmutablePropertyDelegate;
@@ -43,7 +44,7 @@ public class PropertyUtils {
private final ImmutablePriorityPropertyDelegate propertyDelegate =
new ImmutablePriorityPropertyDelegate(
new ImmutablePropertyDelegate(COMMON_PROPERTIES_PATH),
new ImmutableYamlDelegate(REMOTE_LOGGING_YAML_PATH, AWS_YAML_PATH));
new ImmutableYamlDelegate(REMOTE_LOGGING_YAML_PATH, AWS_YAML_PATH, RESOURCE_CENTER_YAML_PATH));

public static String getString(String key) {
return propertyDelegate.get(key.trim());
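Because `resource-center.yaml` is now registered as a property source, its nested keys resolve through the same flattened dotted-key lookup as `.properties` entries. A small sketch under that assumption; the key names mirror the YAML nesting shown in the docs above:

```java
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

public class ResourceCenterConfigDemo {

    public static void main(String[] args) {
        // Resolved through the delegate chain above: common.properties first,
        // then the registered YAML files, including resource-center.yaml.
        String region = PropertyUtils.getString("resource.tencent.cloud.cos.region");
        String bucket = PropertyUtils.getString("resource.tencent.cloud.cos.bucket.name");
        System.out.println("COS bucket " + bucket + " in region " + region);
    }
}
```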
70 changes: 1 addition & 69 deletions dolphinscheduler-common/src/main/resources/common.properties
@@ -21,58 +21,15 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js

# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, COS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# please notice that LOCAL mode does not support reading and writing in distributed mode, which means you can only use your resources on one machine, unless
# use shared file mount point
resource.storage.type=LOCAL
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/tmp/dolphinscheduler

# The Azure client ID (Azure Application (client) ID)
resource.azure.client.id=minioadmin
# The Azure client secret in the Azure application
resource.azure.client.secret=minioadmin
# The Azure data factory subscription ID
resource.azure.subId=minioadmin
# The Azure tenant id in the Azure Active Directory
resource.azure.tenant.id=minioadmin
# The query interval
resource.query.interval=10000

# alibaba cloud access key id, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.id=<your-access-key-id>
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.secret=<your-access-key-secret>
# alibaba cloud region, required if you set resource.storage.type=OSS
resource.alibaba.cloud.region=cn-hangzhou
# oss bucket name, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com

# the location of the google cloud credential, required if you set resource.storage.type=GCS
resource.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set resource.storage.type=GCS
resource.google.cloud.storage.bucket.name=<your-bucket>

# abs container name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.container.name=<your-container>
# abs account name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>


# huawei cloud access key id, required if you set resource.storage.type=OBS
resource.huawei.cloud.access.key.id=<your-access-key-id>
# huawei cloud access key secret, required if you set resource.storage.type=OBS
resource.huawei.cloud.access.key.secret=<your-access-key-secret>
# obs bucket name, required if you set resource.storage.type=OBS
resource.huawei.cloud.obs.bucket.name=dolphinscheduler
# obs bucket endpoint, required if you set resource.storage.type=OBS
resource.huawei.cloud.obs.endpoint=obs.cn-southwest-2.huaweicloud.com


# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
@@ -163,28 +120,3 @@ shell.interceptor.type=bash

# Whether to enable remote logging
remote.logging.enable=false
# if remote.logging.enable = true, set the target of remote logging
remote.logging.target=OSS
# if remote.logging.enable = true, set the log base directory
remote.logging.base.dir=logs
# if remote.logging.enable = true, set the number of threads to send logs to remote storage
remote.logging.thread.pool.size=10
# oss access key id, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.id=<access.key.id>
# oss access key secret, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.secret=<access.key.secret>
# oss bucket name, required if you set remote.logging.target=OSS
remote.logging.oss.bucket.name=<bucket.name>
# oss endpoint, required if you set remote.logging.target=OSS
remote.logging.oss.endpoint=<endpoint>

# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>
# abs account name, required if you set remote.logging.target=ABS
remote.logging.abs.account.name=<your-account-name>
# abs account key, required if you set remote.logging.target=ABS
remote.logging.abs.account.key=<your-account-key>
# abs container name, required if you set remote.logging.target=ABS
remote.logging.abs.container.name=<your-container-name>