2 changes: 1 addition & 1 deletion be/src/runtime/snapshot_loader.cpp
@@ -756,7 +756,7 @@ BaseSnapshotLoader::BaseSnapshotLoader(ExecEnv* env, int64_t job_id, int64_t tas
}

Status BaseSnapshotLoader::init(TStorageBackendType::type type, const std::string& location) {
if (TStorageBackendType::type::S3 == type) {
if (TStorageBackendType::type::S3 == type || TStorageBackendType::type::AZURE == type) {
S3Conf s3_conf;
S3URI s3_uri(location);
RETURN_IF_ERROR(s3_uri.parse());
4 changes: 4 additions & 0 deletions fe/be-java-extensions/paimon-scanner/pom.xml
@@ -40,6 +40,10 @@ under the License.
<artifactId>java-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
@@ -220,6 +220,7 @@ public void createRepository(CreateRepositoryCommand command) throws DdlExceptio
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to create repository: " + st.getErrMsg());
}
        // FIXME: why ping again? It has already been pinged in addAndInitRepoIfNotExist.
if (!repo.ping()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Failed to create repository: failed to connect to the repo");
@@ -87,8 +87,9 @@ protected static void pingAzure(String bucketName, String rootPath,
Map<String, String> newProperties) throws DdlException {

Long timestamp = System.currentTimeMillis();
String testObj = "azure://" + bucketName + "/" + rootPath
+ "/doris-test-object-valid-" + timestamp.toString() + ".txt";
        // TODO(@zyk): add a dedicated Azure connection test
String testObj = "s3://" + bucketName + "/" + rootPath
+ "/doris-test-object-valid-" + timestamp + ".txt";

byte[] contentData = new byte[2 * ObjStorage.CHUNK_SIZE];
Arrays.fill(contentData, (byte) 'A');
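The TODO above calls for a dedicated Azure connectivity check. A minimal sketch of what it could look like, reusing only the Azure SDK calls this PR already relies on in AzureObjStorage; the helper name pingAzureDirect, the way the BlobServiceClient is obtained, and the 1 MB payload size are assumptions rather than the PR's code:

// Hypothetical sketch, not part of this PR.
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;

import java.io.ByteArrayInputStream;
import java.util.Arrays;

static void pingAzureDirect(BlobServiceClient client, String container, String rootPath) throws Exception {
    // Same key layout as pingAzure above: a throwaway object under the repo root path.
    String key = rootPath + "/doris-test-object-valid-" + System.currentTimeMillis() + ".txt";
    byte[] contentData = new byte[1024 * 1024]; // pingAzure itself uses 2 * ObjStorage.CHUNK_SIZE
    Arrays.fill(contentData, (byte) 'A');

    BlobClient blob = client.getBlobContainerClient(container).getBlobClient(key);
    blob.upload(new ByteArrayInputStream(contentData), contentData.length); // write
    boolean visible = blob.exists();                                        // read back
    blob.delete();                                                          // clean up
    if (!visible) {
        throw new IllegalStateException("Azure connectivity check failed: " + key + " not visible after upload");
    }
}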
@@ -59,14 +59,11 @@ public static String validateAndNormalizeUri(String path) throws UserException {
if (StringUtils.isBlank(path)) {
throw new StoragePropertiesException("Path cannot be null or empty");
}

String lower = path.toLowerCase();

// Only accept Azure Blob Storage-related URI schemes
if (!(lower.startsWith("wasb://") || lower.startsWith("wasbs://")
|| lower.startsWith("abfs://") || lower.startsWith("abfss://")
|| lower.startsWith("https://") || lower.startsWith("http://")
|| lower.startsWith("s3://"))) {
if (!(path.startsWith("wasb://") || path.startsWith("wasbs://")
|| path.startsWith("abfs://") || path.startsWith("abfss://")
|| path.startsWith("https://") || path.startsWith("http://")
|| path.startsWith("s3://"))) {
throw new StoragePropertiesException("Unsupported Azure URI scheme: " + path);
}

@@ -92,14 +89,12 @@ private static String convertToS3Style(String uri) {
if (StringUtils.isBlank(uri)) {
throw new StoragePropertiesException("URI is blank");
}

String lowerUri = uri.toLowerCase();
if (lowerUri.startsWith("s3://")) {
return lowerUri;
if (uri.startsWith("s3://")) {
return uri;
}
// Handle Azure HDFS-style URIs (wasb://, wasbs://, abfs://, abfss://)
if (lowerUri.startsWith("wasb://") || lowerUri.startsWith("wasbs://")
|| lowerUri.startsWith("abfs://") || lowerUri.startsWith("abfss://")) {
if (uri.startsWith("wasb://") || uri.startsWith("wasbs://")
|| uri.startsWith("abfs://") || uri.startsWith("abfss://")) {

// Example: wasbs://[email protected]/path/file.txt
String schemeRemoved = uri.replaceFirst("^[a-z]+s?://", "");
@@ -125,7 +120,7 @@ private static String convertToS3Style(String uri) {
}

// ② Handle HTTPS/HTTP Azure Blob Storage URLs
if (lowerUri.startsWith("https://") || lowerUri.startsWith("http://")) {
if (uri.startsWith("https://") || uri.startsWith("http://")) {
try {
URI parsed = new URI(uri);
String host = parsed.getHost();
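For reference, the translation convertToS3Style performs on Azure HDFS-style URIs can be shown with plain java.net.URI parsing. A standalone sketch, assuming the intended mapping is container to bucket and blob path to key, so the example from the comment above, wasbs://container@account.blob.core.windows.net/path/file.txt, becomes s3://container/path/file.txt; this is illustrative, not the PR's implementation:

// Illustrative sketch, not the PR's implementation.
import java.net.URI;
import java.net.URISyntaxException;

static String toS3Style(String uri) throws URISyntaxException {
    URI parsed = new URI(uri);
    String container = parsed.getUserInfo(); // "container" in container@account.<blob|dfs>.core.windows.net
    String path = parsed.getPath();          // "/path/file.txt"
    if (container == null || container.isEmpty()) {
        throw new URISyntaxException(uri, "missing container before '@'");
    }
    return "s3://" + container + (path == null ? "" : path);
}

// toS3Style("wasbs://container@account.blob.core.windows.net/path/file.txt") returns "s3://container/path/file.txt"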
@@ -23,6 +23,7 @@
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.property.storage.AzureProperties;
import org.apache.doris.datasource.property.storage.AzurePropertyUtils;
import org.apache.doris.fs.remote.RemoteFile;

import com.azure.core.http.rest.PagedIterable;
@@ -120,6 +121,7 @@ public Triple<String, String, String> getStsToken() throws DdlException {
@Override
public Status headObject(String remotePath) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
if (LOG.isDebugEnabled()) {
@@ -144,6 +146,7 @@ public Status headObject(String remotePath) {
@Override
public Status getObject(String remoteFilePath, File localFile) {
try {
remoteFilePath = AzurePropertyUtils.validateAndNormalizeUri(remoteFilePath);
S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, forceParsingByStandardUri);
BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
BlobProperties properties = blobClient.downloadToFile(localFile.getAbsolutePath());
@@ -164,6 +167,7 @@ public Status getObject(String remoteFilePath, File localFile) {
@Override
public Status putObject(String remotePath, @Nullable InputStream content, long contentLength) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
blobClient.upload(content, contentLength);
@@ -181,6 +185,7 @@ public Status putObject(String remotePath, @Nullable InputStream content, long c
@Override
public Status deleteObject(String remotePath) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
blobClient.delete();
@@ -204,6 +209,7 @@ public Status deleteObject(String remotePath) {
@Override
public Status deleteObjects(String remotePath) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
BlobContainerClient blobClient = getClient().getBlobContainerClient(uri.getBucket());
String containerUrl = blobClient.getBlobContainerUrl();
@@ -285,6 +291,7 @@ public void completeMultipartUpload(String bucket, String key, Map<Integer, Stri
@Override
public RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(uri.getKey());
PagedIterable<BlobItem> pagedBlobs = getClient().getBlobContainerClient(uri.getBucket())
@@ -320,6 +327,7 @@ private String constructS3Path(String fileName, String bucket) {

public Status listDirectories(String remotePath, Set<String> result) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String bucket = uri.getBucket();
String key = uri.getKey();
@@ -347,6 +355,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
long startTime = System.nanoTime();
Status st = Status.OK;
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String globPath = S3Util.extendGlobs(uri.getKey());
String bucket = uri.getBucket();
@@ -429,6 +438,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN

public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
String bucket = uri.getBucket();
String key = uri.getKey();
@@ -482,6 +492,7 @@ public Status multipartUpload(String remotePath, @Nullable InputStream inputStre


try {
remotePath = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
blockBlobClient = getClient().getBlobContainerClient(uri.getBucket())
.getBlobClient(uri.getKey()).getBlockBlobClient();
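Each entry point above now starts with the same normalize-then-parse prologue. If that repetition becomes a maintenance burden, it could be folded into one private helper; a sketch under the assumption that S3URI.create keeps the signature used throughout this file and, like validateAndNormalizeUri, declares UserException:

// Hypothetical refactor, not part of this PR.
private S3URI toNormalizedS3Uri(String remotePath) throws UserException {
    // e.g. abfss://container@account.dfs.core.windows.net/backup/meta.json -> s3://container/backup/meta.json
    String normalized = AzurePropertyUtils.validateAndNormalizeUri(remotePath);
    // S3URI then yields bucket = "container" and key = "backup/meta.json".
    return S3URI.create(normalized, isUsePathStyle, forceParsingByStandardUri);
}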
@@ -0,0 +1,140 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import static groovy.test.GroovyAssert.shouldFail;
import java.util.concurrent.ThreadLocalRandom

suite("azure_blob_all_test", "p2,external,new_catalog_property") {


String abfsAzureAccountName = context.config.otherConfigs.get("abfsAccountName")
String abfsAzureAccountKey = context.config.otherConfigs.get("abfsAccountKey")
String abfsContainer = context.config.otherConfigs.get("abfsContainer")
String abfsEndpoint = context.config.otherConfigs.get("abfsEndpoint")
def abfs_azure_config_props = """
"provider" = "azure",
"azure.endpoint"="${abfsEndpoint}",
"azure.account_name" = "${abfsAzureAccountName}",
"azure.account_key" = "${abfsAzureAccountKey}"
"""

// Iceberg FS

    def testIcebergTest = { String storage_props, String iceberg_fs_catalog_name, String protocol, String hdfsLocationType ->

sql """
drop catalog if exists ${iceberg_fs_catalog_name};
"""
sql"""
create catalog ${iceberg_fs_catalog_name} properties(
'type'='iceberg',
'iceberg.catalog.type'='hadoop',
'warehouse'='${protocol}://${abfsContainer}@${abfsAzureAccountName}.${hdfsLocationType}.core.windows.net/regression/external/azure/${protocol}/iceberg_fs_warehouse/',
${storage_props}
);
"""

sql """
switch ${iceberg_fs_catalog_name}
"""

sql """
drop database if exists ${iceberg_fs_catalog_name}_db_test;
"""
sql """
create database ${iceberg_fs_catalog_name}_db_test;
"""
sql """
use ${iceberg_fs_catalog_name}_db_test;
"""
sql """
create table ${iceberg_fs_catalog_name}_table_test (id int, name string)
"""
sql """
insert into ${iceberg_fs_catalog_name}_table_test values(1, 'iceberg_fs_abfs_test');
"""
def query_result = sql """
select count(1) from ${iceberg_fs_catalog_name}_table_test;
"""

assert query_result[0][0] == 1

sql """
drop table if exists ${iceberg_fs_catalog_name}_table_test;
"""
sql """
drop database if exists ${iceberg_fs_catalog_name}_db_test;
"""
sql """
drop catalog if exists ${iceberg_fs_catalog_name};
"""
}


    // abfs and abfss
    testIcebergTest(abfs_azure_config_props, "iceberg_fs_abfs_catalog", "abfs", "dfs")
    testIcebergTest(abfs_azure_config_props, "iceberg_fs_abfss_catalog", "abfss", "dfs")



    // Paimon FS test closure
    def testPaimonTest = { String storage_props, String paimon_catalog_name, String protocol, String hdfsLocationType, String queryTbl ->
sql """
drop catalog if exists ${paimon_catalog_name};
"""
sql"""
create catalog ${paimon_catalog_name} properties(
'type'='paimon',
'paimon.catalog.type'='filesystem',
'warehouse'='${protocol}://${abfsContainer}@${abfsAzureAccountName}.${hdfsLocationType}.core.windows.net/regression/azure/${protocol}/paimon_fs_warehouse/',
            ${storage_props}
);
"""

sql """
switch ${paimon_catalog_name}
"""

    def query_result = sql """
select * from ${paimon_catalog_name}.${queryTbl}
"""
println query_result

sql """
drop catalog if exists ${paimon_catalog_name};
"""
}

// Paimon FS
sql """
set force_jni_scanner=false;
"""

def paimon_fs_abfss_db_tbl = "paimon_fs_abfss_test_db.external_test_table"
def paimon_fs_abfs_db_tbl = "paimon_fs_abfs_test_db.external_test_table"
testPaimonTest(abfs_azure_config_props, "paimon_fs_abfs_catalog", "abfs","dfs",paimon_fs_abfs_db_tbl)
testPaimonTest(abfs_azure_config_props, "paimon_fs_abfss_catalog", "abfss","dfs",paimon_fs_abfss_db_tbl)

// TODO: Enable this once BE's HDFS dependency management is fully ready.
// This module requires higher-version JARs to support JDK 17 access.
/* sql """
set force_jni_scanner=true;
"""
testPaimonTest(abfs_azure_config_props, "paimon_fs_abfs_catalog", "abfs","dfs",paimon_fs_abfs_db_tbl)
testPaimonTest(abfs_azure_config_props, "paimon_fs_abfss_catalog", "abfss","dfs",paimon_fs_abfss_db_tbl)*/


}