From 7b95b19ed5b44d97bd2262184f60b7a1e16cece5 Mon Sep 17 00:00:00 2001 From: JiamingMai Date: Fri, 12 Apr 2024 01:01:46 +0800 Subject: [PATCH 1/3] Add Alluxio file system support --- lib/trino-filesystem-alluxio/pom.xml | 210 ++++++++++ .../alluxio/AlluxioFileIterator.java | 72 ++++ .../filesystem/alluxio/AlluxioFileSystem.java | 380 ++++++++++++++++++ .../alluxio/AlluxioFileSystemFactory.java | 52 +++ .../alluxio/AlluxioFileSystemInput.java | 87 ++++ .../alluxio/AlluxioFileSystemInputFile.java | 170 ++++++++ .../alluxio/AlluxioFileSystemModule.java | 29 ++ .../alluxio/AlluxioFileSystemOutputFile.java | 132 ++++++ .../alluxio/AlluxioTrinoInputStream.java | 136 +++++++ .../alluxio/AlluxioTrinoOutputStream.java | 67 +++ .../filesystem/alluxio/AlluxioUtils.java | 134 ++++++ .../AbstractTestAlluxioFileSystem.java | 126 ++++++ .../alluxio/TestAlluxioFileSystem.java | 95 +++++ .../filesystem/alluxio/TestAlluxioUtils.java | 30 ++ lib/trino-filesystem-cache-alluxio/pom.xml | 12 + lib/trino-filesystem-manager/pom.xml | 5 + .../filesystem/manager/FileSystemConfig.java | 13 + .../filesystem/manager/FileSystemModule.java | 7 + .../manager/TestFileSystemConfig.java | 3 + pom.xml | 92 +++-- 20 files changed, 1820 insertions(+), 32 deletions(-) create mode 100644 lib/trino-filesystem-alluxio/pom.xml create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java create mode 100644 lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java create mode 100644 lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java diff --git a/lib/trino-filesystem-alluxio/pom.xml b/lib/trino-filesystem-alluxio/pom.xml new file mode 100644 index 0000000000000..428560c7525f2 --- /dev/null +++ b/lib/trino-filesystem-alluxio/pom.xml @@ -0,0 +1,210 @@ + + + 4.0.0 + + + io.trino + trino-root + 459-SNAPSHOT + ../../pom.xml + + + trino-filesystem-alluxio + Trino Filesystem - Alluxio + + + ${project.parent.basedir} + true + + + + + com.google.inject + guice + + + + io.grpc + grpc-netty + + + + io.trino + trino-filesystem + + + + io.trino + trino-memory-context + + + + io.trino + trino-spi + + + + org.alluxio + alluxio-core-client-fs + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.rocksdb + rocksdbjni + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + + + + + + org.alluxio + alluxio-core-common + + + commons-cli + commons-cli + + + io.swagger + swagger-annotations + + + io.swagger + swagger-annotations + + + jakarta.servlet + jakarta.servlet-api + + + jakarta.servlet + jakarta.servlet-api + + + org.reflections + reflections + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + + + + + + org.alluxio + alluxio-core-transport + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-reload4j + + + + + + com.github.docker-java + docker-java-api + test + + + + io.airlift + junit-extensions + test + + + + io.trino + trino-filesystem + ${project.version} + tests + test + + + + io.trino + trino-spi + test-jar + test + + + + io.trino + trino-testing + test + + + + io.trino + trino-testing-containers + test + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + org.testcontainers + junit-jupiter + test + + + + org.testcontainers + testcontainers + test + + + + + diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java new file mode 100644 index 0000000000000..49139c0a2cf71 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.URIStatus; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToLocation; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileIterator + implements FileIterator +{ + private final List files; + private final String mountRoot; + private int index; + + public AlluxioFileIterator(List files, String mountRoot) + { + this.files = requireNonNull(files, "files is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + } + + @Override + public boolean hasNext() + throws IOException + { + if (files.isEmpty() || index >= files.size()) { + return false; + } + if (files.get(index) != null) { + return true; + } + else { + return false; + } + } + + @Override + public FileEntry next() + throws IOException + { + if (!hasNext()) { + return null; + } + URIStatus fileStatus = files.get(index++); + Location location = convertToLocation(fileStatus, mountRoot); + return new FileEntry( + location, + fileStatus.getLength(), + Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs()), + Optional.empty()); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java new file mode 100644 index 0000000000000..e1b5d8af47bee --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java @@ -0,0 +1,380 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.runtime.NotFoundRuntimeException; +import alluxio.grpc.CreateDirectoryPOptions; +import alluxio.grpc.DeletePOptions; +import alluxio.grpc.ListStatusPOptions; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoOutputFile; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.UUID.randomUUID; + +public class AlluxioFileSystem + implements TrinoFileSystem +{ + private final FileSystem fileSystem; + + private Location rootLocation; + + private final String mountRoot; + + public AlluxioFileSystem(FileSystem fileSystem) + { + this.fileSystem = fileSystem; + mountRoot = "/"; // default alluxio mount root + } + + public String getMountRoot() + { + return mountRoot; + } + + public void setRootLocation(Location rootLocation) + { + this.rootLocation = rootLocation; + } + + public Location getRootLocation() + { + return rootLocation; + } + + @Override + public TrinoInputFile newInputFile(Location location) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, null, fileSystem, mountRoot, Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.empty()); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.of(lastModified)); + } + + @Override + public TrinoOutputFile newOutputFile(Location location) + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + return new AlluxioFileSystemOutputFile(rootLocation, location, fileSystem, mountRoot); + } + + @Override + public void deleteFile(Location location) + throws IOException + { + ensureNotRootLocation(location); + ensureNotEndWithSlash(location); + try { + fileSystem.delete(convertToAlluxioURI(location, mountRoot)); + } + catch (FileDoesNotExistException e) { + } + catch (AlluxioException e) { + throw new IOException("Error deleteFile %s".formatted(location), e); + } + } + + @Override + public void deleteDirectory(Location location) + throws IOException + { + try { + AlluxioURI uri = convertToAlluxioURI(location, mountRoot); + URIStatus status = fileSystem.getStatus(uri); + if (status == null) { + return; + } + if (!status.isFolder()) { + throw new IOException("delete directory cannot be called on a file %s".formatted(location)); + } + DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build(); + // recursive delete on the root directory must be handled manually + if (location.path().isEmpty() || location.path().equals(mountRoot)) { + for (URIStatus uriStatus : fileSystem.listStatus(uri)) { + fileSystem.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); + } + } + else { + fileSystem.delete(uri, deletePOptions); + } + } + catch (FileDoesNotExistException | NotFoundRuntimeException e) { + } + catch (AlluxioException e) { + throw new IOException("Error deleteDirectory %s".formatted(location), e); + } + } + + @Override + public void renameFile(Location source, Location target) + throws IOException + { + try { + ensureNotRootLocation(source); + ensureNotEndWithSlash(source); + ensureNotRootLocation(target); + ensureNotEndWithSlash(target); + } + catch (IllegalStateException e) { + throw new IllegalStateException("Cannot rename file from %s to %s as one of them is root location" + .formatted(source, target), e); + } + AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot); + AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot); + + try { + if (!fileSystem.exists(sourceUri)) { + throw new IOException("Cannot rename file %s to %s as file %s doesn't exist" + .formatted(source, target, source)); + } + if (fileSystem.exists(targetUri)) { + throw new IOException("Cannot rename file %s to %s as file %s already exists" + .formatted(source, target, target)); + } + URIStatus status = fileSystem.getStatus(sourceUri); + if (status.isFolder()) { + throw new IOException("Cannot rename file %s to %s as %s is a directory" + .formatted(source, target, source)); + } + fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("Error renameFile from %s to %s".formatted(source, target), e); + } + } + + @Override + public FileIterator listFiles(Location location) + throws IOException + { + try { + URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status == null) { + new AlluxioFileIterator(Collections.emptyList(), mountRoot); + } + if (!status.isFolder()) { + throw new IOException("Location is not a directory: %s".formatted(location)); + } + } + catch (NotFoundRuntimeException | AlluxioException e) { + return new AlluxioFileIterator(Collections.emptyList(), mountRoot); + } + + try { + List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot), + ListStatusPOptions.newBuilder().setRecursive(true).build()); + return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot); + } + catch (AlluxioException e) { + throw new IOException("Error listFiles %s".formatted(location), e); + } + } + + @Override + public Optional directoryExists(Location location) + throws IOException + { + if (location.path().isEmpty()) { + return Optional.of(true); + } + try { + URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status != null && status.isFolder()) { + return Optional.of(true); + } + return Optional.of(false); + } + catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) { + return Optional.of(false); + } + catch (AlluxioException e) { + throw new IOException("Error directoryExists %s".formatted(location), e); + } + } + + @Override + public void createDirectory(Location location) + throws IOException + { + try { + AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot); + if (fileSystem.exists(locationUri)) { + URIStatus status = fileSystem.getStatus(locationUri); + if (!status.isFolder()) { + throw new IOException("Cannot create a directory for an existing file location %s" + .formatted(location)); + } + } + fileSystem.createDirectory( + locationUri, + CreateDirectoryPOptions.newBuilder() + .setAllowExists(true) + .setRecursive(true) + .build()); + } + catch (AlluxioException e) { + throw new IOException("Error createDirectory %s".formatted(location), e); + } + } + + @Override + public void renameDirectory(Location source, Location target) + throws IOException + { + try { + ensureNotRootLocation(source); + ensureNotRootLocation(target); + } + catch (IllegalStateException e) { + throw new IOException("Cannot rename directory from %s to %s as one of them is root location" + .formatted(source, target), e); + } + + try { + if (fileSystem.exists(convertToAlluxioURI(target, mountRoot))) { + throw new IOException("Cannot rename %s to %s as file %s already exists" + .formatted(source, target, target)); + } + fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e); + } + } + + @Override + public Set listDirectories(Location location) + throws IOException + { + try { + if (isFile(location)) { + throw new IOException("Cannot list directories for a file %s".formatted(location)); + } + List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot)); + return filesStatus.stream() + .filter(URIStatus::isFolder) + .map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot)) + .map(loc -> { + if (!loc.toString().endsWith("/")) { + return Location.of(loc + "/"); + } + else { + return loc; + } + }) + .collect(Collectors.toSet()); + } + catch (FileDoesNotExistException | FileNotFoundException | NotFoundRuntimeException e) { + return Collections.emptySet(); + } + catch (AlluxioException e) { + throw new IOException("Error listDirectories %s".formatted(location), e); + } + } + + @Override + public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) + throws IOException + { + // allow for absolute or relative temporary prefix + Location temporary; + if (temporaryPrefix.startsWith("/")) { + String prefix = temporaryPrefix; + while (prefix.startsWith("/")) { + prefix = prefix.substring(1); + } + temporary = targetPath.appendPath(prefix); + } + else { + temporary = targetPath.appendPath(temporaryPrefix); + } + + temporary = temporary.appendPath(randomUUID().toString()); + + createDirectory(temporary); + return Optional.of(temporary); + } + + private void ensureNotRootLocation(Location location) + { + String locationPath = location.path(); + while (locationPath.endsWith("/")) { + locationPath = locationPath.substring(0, locationPath.length() - 1); + } + + String rootLocationPath = rootLocation.path(); + while (rootLocationPath.endsWith("/")) { + rootLocationPath = rootLocationPath.substring(0, rootLocationPath.length() - 1); + } + + if (rootLocationPath.equals(locationPath)) { + throw new IllegalStateException("Illegal operation on %s".formatted(location)); + } + } + + private void ensureNotEndWithSlash(Location location) + { + String locationPath = location.path(); + if (locationPath.endsWith("/")) { + throw new IllegalStateException("Illegal operation on %s".formatted(location)); + } + } + + private boolean isFile(Location location) + { + try { + URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + if (status == null) { + return false; + } + return !status.isFolder(); + } + catch (NotFoundRuntimeException | AlluxioException | IOException e) { + return false; + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java new file mode 100644 index 0000000000000..e14b04fd1d6ca --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.Configuration; +import com.google.inject.Inject; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; + +public class AlluxioFileSystemFactory + implements TrinoFileSystemFactory +{ + private AlluxioConfiguration conf = Configuration.global(); + + @Inject + public AlluxioFileSystemFactory() + { + } + + public void setConf(AlluxioConfiguration conf) + { + this.conf = conf; + } + + public AlluxioConfiguration getConf() + { + return conf; + } + + @Override + public TrinoFileSystem create(ConnectorIdentity identity) + { + FileSystemContext fsContext = FileSystemContext.create(conf); + FileSystem fileSystem = FileSystem.Factory.create(fsContext); + return new AlluxioFileSystem(fileSystem); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java new file mode 100644 index 0000000000000..1aec9275e366f --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; + +import java.io.IOException; + +import static java.lang.Math.min; +import static java.util.Objects.checkFromIndexSize; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemInput + implements TrinoInput +{ + private final FileInStream stream; + + private final TrinoInputFile inputFile; + + private volatile boolean closed; + + public AlluxioFileSystemInput(FileInStream stream, TrinoInputFile inputFile) + { + this.stream = requireNonNull(stream, "stream is null"); + this.inputFile = requireNonNull(inputFile, "inputFile is null"); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + ensureOpen(); + checkFromIndexSize(bufferOffset, bufferLength, buffer.length); + if (position + bufferLength > inputFile.length()) { + throw new IOException("readFully position overflow %s. pos %d + buffer length %d > file size %d" + .formatted(inputFile.location(), position, bufferLength, inputFile.length())); + } + stream.positionedRead(position, buffer, bufferOffset, bufferLength); + } + + @Override + public int readTail(byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + ensureOpen(); + checkFromIndexSize(bufferOffset, bufferLength, buffer.length); + long fileSize = inputFile.length(); + int readSize = (int) min(fileSize, bufferLength); + readFully(fileSize - readSize, buffer, bufferOffset, readSize); + return readSize; + } + + @Override + public String toString() + { + return inputFile.toString(); + } + + @Override + public void close() + throws IOException + { + closed = true; + stream.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input stream closed: " + this); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java new file mode 100644 index 0000000000000..2faea40d4fa35 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java @@ -0,0 +1,170 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import alluxio.client.file.FileSystem; +import alluxio.client.file.URIStatus; +import alluxio.exception.AlluxioException; +import alluxio.exception.FileDoesNotExistException; +import alluxio.exception.runtime.NotFoundRuntimeException; +import alluxio.grpc.OpenFilePOptions; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Instant; +import java.util.Optional; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemInputFile + implements TrinoInputFile +{ + private final Location location; + + private final FileSystem fileSystem; + + private final String mountRoot; + private Optional lastModified; + + private Long length; + + private URIStatus status; + + public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional lastModified) + { + this.location = requireNonNull(location, "location is null"); + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + this.length = length; + this.lastModified = requireNonNull(lastModified, "lastModified is null"); + } + + @Override + public TrinoInput newInput() + throws IOException + { + try { + return new AlluxioFileSystemInput(openFile(), this); + } + catch (AlluxioException e) { + throw new IOException("Error newInput() file: %s".formatted(location), e); + } + } + + @Override + public TrinoInputStream newStream() + throws IOException + { + try { + return new AlluxioTrinoInputStream(location, openFile(), getStatus()); + } + catch (AlluxioException e) { + throw new IOException("Error newStream() file: %s".formatted(location), e); + } + } + + private FileInStream openFile() + throws IOException, AlluxioException + { + if (!exists()) { + throw new FileNotFoundException("File does not exist: " + location); + } + return fileSystem.openFile(getStatus(), OpenFilePOptions.getDefaultInstance()); + } + + private URIStatus getStatus(boolean lazy) + throws IOException + { + if (lazy) { + if (status == null) { + getStatus(); + } + return status; + } + return getStatus(); + } + + private URIStatus getStatus() + throws IOException + { + try { + //TODO: create a URIStatus object based on the location field + status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + } + catch (FileDoesNotExistException | NotFoundRuntimeException e) { + return null; + } + catch (AlluxioException | IOException e) { + throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e); + } + return status; + } + + @Override + public long length() + throws IOException + { + if (length == null) { + URIStatus status = getStatus(true); + if (status == null) { + throw new FileNotFoundException("File does not exist: %s".formatted(location)); + } + length = status.getLength(); + } + return length; + } + + @Override + public Instant lastModified() + throws IOException + { + if (lastModified.isEmpty()) { + URIStatus status = getStatus(true); + if (status == null) { + throw new FileNotFoundException("File does not exist: %s".formatted(location)); + } + lastModified = Optional.of(Instant.ofEpochMilli(status.getLastModificationTimeMs())); + } + return lastModified.orElseThrow(); + } + + @Override + public boolean exists() + throws IOException + { + URIStatus status = getStatus(); + if (status == null || !status.isCompleted()) { + return false; + } + return true; + } + + @Override + public Location location() + { + return location; + } + + @Override + public String toString() + { + return location().toString(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java new file mode 100644 index 0000000000000..7a61711762c5c --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemModule.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import com.google.inject.Binder; +import com.google.inject.Module; + +import static com.google.inject.Scopes.SINGLETON; + +public class AlluxioFileSystemModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(AlluxioFileSystemFactory.class).in(SINGLETON); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java new file mode 100644 index 0000000000000..19d880fb4c8b6 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileOutStream; +import alluxio.client.file.FileSystem; +import alluxio.exception.AlluxioException; +import alluxio.grpc.CreateFilePOptions; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.memory.context.AggregatedMemoryContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.FileAlreadyExistsException; + +import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; + +public class AlluxioFileSystemOutputFile + implements TrinoOutputFile +{ + private final Location rootLocation; + + private final Location location; + + private final FileSystem fileSystem; + + private final String mountRoot; + + public AlluxioFileSystemOutputFile(Location rootLocation, Location location, FileSystem fileSystem, String mountRoot) + { + this.rootLocation = rootLocation; + this.location = requireNonNull(location, "location is null"); + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); + } + + @Override + public void createExclusive(byte[] data) + throws IOException + { + throwIfAlreadyExists(); + try (FileOutStream outStream = + fileSystem.createFile(convertToAlluxioURI(location, mountRoot), CreateFilePOptions.newBuilder().setRecursive(true).setOverwrite(true).build())) { + outStream.write(data); + } + catch (AlluxioException e) { + throw new IOException("Error createOrOverwrite %s".formatted(location), e); + } + } + + @Override + public void createOrOverwrite(byte[] data) + throws IOException + { + ensureOutputFileNotOutsideOfRoot(location); + try (FileOutStream outStream = + fileSystem.createFile(convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder() + .setOverwrite(true) + .setRecursive(true) + .build())) { + outStream.write(data); + } + catch (AlluxioException e) { + throw new IOException("Error createOrOverwrite %s".formatted(location), e); + } + } + + @Override + public OutputStream create(AggregatedMemoryContext memoryContext) + throws IOException + { + throwIfAlreadyExists(); + try { + return new AlluxioTrinoOutputStream( + location, + fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setRecursive(true).build())); + } + catch (AlluxioException e) { + throw new IOException("Error create %s".formatted(location), e); + } + } + + @Override + public Location location() + { + return location; + } + + @Override + public String toString() + { + return location().toString(); + } + + private void ensureOutputFileNotOutsideOfRoot(Location location) + throws IOException + { + String path = AlluxioUtils.simplifyPath(location.path()); + if (rootLocation != null && !path.startsWith(rootLocation.path())) { + throw new IOException("Output file %s outside of root is not allowed".formatted(location)); + } + } + + private void throwIfAlreadyExists() + throws IOException + { + try { + if (fileSystem.exists(convertToAlluxioURI(location, mountRoot))) { + throw new FileAlreadyExistsException("File %s already exists".formatted(location)); + } + } + catch (AlluxioException e) { + throw new IOException("Error create %s".formatted(location), e); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java new file mode 100644 index 0000000000000..09dabdf950c2b --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.client.file.FileInStream; +import alluxio.client.file.URIStatus; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputStream; + +import java.io.IOException; + +import static java.util.Objects.checkFromIndexSize; +import static java.util.Objects.requireNonNull; + +public class AlluxioTrinoInputStream + extends TrinoInputStream +{ + private final Location location; + private final FileInStream stream; + private final URIStatus fileStatus; + private boolean closed; + + public AlluxioTrinoInputStream(Location location, FileInStream stream, URIStatus fileStatus) + { + this.location = requireNonNull(location, "location is null"); + this.stream = requireNonNull(stream, "stream is null"); + this.fileStatus = requireNonNull(fileStatus, "fileStatus is null"); + } + + @Override + public long getPosition() + throws IOException + { + ensureOpen(); + try { + return stream.getPos(); + } + catch (IOException e) { + throw new IOException("Get position for file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public void seek(long position) + throws IOException + { + ensureOpen(); + if (position < 0) { + throw new IOException("Negative seek offset"); + } + if (position > fileStatus.getLength()) { + throw new IOException("Cannot seek to %s. File size is %s: %s".formatted(position, fileStatus.getLength(), location)); + } + try { + stream.seek(position); + } + catch (IOException e) { + throw new IOException("Cannot seek to %s: %s".formatted(position, location)); + } + } + + @Override + public long skip(long n) + throws IOException + { + ensureOpen(); + try { + return stream.skip(n); + } + catch (IOException e) { + throw new IOException("Skipping %s bytes of file %s failed: %s".formatted(n, location, e.getMessage()), e); + } + } + + @Override + public int read() + throws IOException + { + ensureOpen(); + try { + return stream.read(); + } + catch (IOException e) { + throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public int read(byte[] b, int off, int len) + throws IOException + { + ensureOpen(); + checkFromIndexSize(off, len, b.length); + try { + return stream.read(b, off, len); + } + catch (IOException e) { + throw new IOException("Read of file %s failed: %s".formatted(location, e.getMessage()), e); + } + } + + @Override + public void close() + throws IOException + { + closed = true; + stream.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Input stream closed: " + location); + } + } + + @Override + public int available() + throws IOException + { + ensureOpen(); + return super.available(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java new file mode 100644 index 0000000000000..b32c45ec2e9c7 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import io.trino.filesystem.Location; + +import java.io.IOException; +import java.io.OutputStream; + +public class AlluxioTrinoOutputStream + extends OutputStream +{ + private final Location location; + + private final OutputStream delegatedStream; + + private volatile boolean closed; + + public AlluxioTrinoOutputStream(Location location, OutputStream stream) + { + this.location = location; + delegatedStream = stream; + } + + @Override + public void write(int b) + throws IOException + { + ensureOpen(); + delegatedStream.write(b); + } + + @Override + public void flush() + throws IOException + { + ensureOpen(); + delegatedStream.flush(); + } + + @Override + public void close() + throws IOException + { + closed = true; + delegatedStream.close(); + } + + private void ensureOpen() + throws IOException + { + if (closed) { + throw new IOException("Output stream for %s closed: ".formatted(location) + this); + } + } +} diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java new file mode 100644 index 0000000000000..74bfcf1a8ebc5 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.URIStatus; +import io.grpc.netty.UdsNameResolverProvider; +import io.trino.filesystem.Location; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Optional; + +public class AlluxioUtils +{ + private AlluxioUtils() {} + + public static Location convertToLocation(URIStatus fileStatus, String mountRoot) + { + if (fileStatus == null) { + return null; + } + String path = fileStatus.getPath(); + if (path == null) { + return null; + } + if (path.isEmpty()) { + return Location.of(""); + } + if (path.startsWith("alluxio://")) { + return Location.of(fileStatus.getPath()); + } + + String schema = "alluxio://"; + if (path.startsWith("/")) { + while (path.startsWith("/")) { + path = path.substring(1); + } + } + String mountRootWithSlash = mountRoot; + if (!mountRoot.endsWith("/")) { + mountRootWithSlash = mountRoot + "/"; + } + return Location.of(schema + mountRootWithSlash + path); + } + + public static String simplifyPath(String path) + { + // Use a deque to store the path components + Deque deque = new ArrayDeque<>(); + String[] segments = path.split("/"); + + for (String segment : segments) { + if (segment.isEmpty() || segment.equals(".")) { + // Ignore empty and current directory segments + continue; + } + if (segment.equals("..")) { + // If there's a valid directory to go back to, remove it + if (!deque.isEmpty()) { + deque.pollLast(); + } + } + else { + // Add the directory to the deque + deque.offerLast(segment); + } + } + + // Build the final simplified path from the deque + StringBuilder simplifiedPath = new StringBuilder(); + for (String dir : deque) { + simplifiedPath.append(dir).append("/"); + } + + // Retain trailing slash if it was in the original path + if (!path.endsWith("/") && simplifiedPath.length() > 0) { + simplifiedPath.setLength(simplifiedPath.length() - 1); + } + + return simplifiedPath.length() == 0 ? "" : simplifiedPath.toString(); + } + + public static AlluxioURI convertToAlluxioURIBak(Location location) + { + //TODO: check if this way builds a correct AlluxioURI + String path = location.path(); + if (!path.startsWith("/")) { + path = "/" + path; + } + return new AlluxioURI(path); + } + + public static AlluxioURI convertToAlluxioURI(Location location, String mountRoot) + { + Optional schema = location.scheme(); + if (schema.isPresent()) { + if (!schema.get().equals("alluxio")) { + return new AlluxioURI(location.toString()); + } + } + String path = location.path(); + while (path.startsWith("/")) { + path = path.substring(1); + } + if (!mountRoot.endsWith("/")) { + mountRoot = mountRoot + "/"; + } + return new AlluxioURI(mountRoot + path); + } + + public static AlluxioURI convertToAlluxioURI(URIStatus fileStatus) + { + //TODO: check if this way builds a correct AlluxioURI + return new AlluxioURI(fileStatus.getPath()); + } + + public static String getUdsDefaultSchema() + { + UdsNameResolverProvider provider = new UdsNameResolverProvider(); + return provider.getDefaultScheme(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java new file mode 100644 index 0000000000000..935b9f01b2002 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import alluxio.AlluxioURI; +import alluxio.client.file.FileSystem; +import alluxio.client.file.FileSystemContext; +import alluxio.client.file.URIStatus; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.exception.AlluxioException; +import alluxio.grpc.DeletePOptions; +import io.trino.filesystem.AbstractTestTrinoFileSystem; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.spi.security.ConnectorIdentity; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestInstance; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractTestAlluxioFileSystem + extends AbstractTestTrinoFileSystem +{ + private TrinoFileSystem fileSystem; + private Location rootLocation; + private FileSystem alluxioFs; + private AlluxioFileSystemFactory alluxioFileSystemFactory; + + protected void initialize() + throws IOException + { + this.rootLocation = Location.of("alluxio:///"); + InstancedConfiguration conf = Configuration.copyGlobal(); + FileSystemContext fsContext = FileSystemContext.create(conf); + this.alluxioFs = FileSystem.Factory.create(fsContext); + this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(); + this.alluxioFileSystemFactory.setConf(conf); + this.fileSystem = alluxioFileSystemFactory.create(ConnectorIdentity.ofUser("alluxio")); + ((AlluxioFileSystem) fileSystem).setRootLocation(rootLocation); + } + + @AfterAll + void tearDown() + { + fileSystem = null; + alluxioFs = null; + rootLocation = null; + alluxioFileSystemFactory = null; + } + + @AfterEach + void afterEach() + throws IOException, AlluxioException + { + AlluxioURI root = new AlluxioURI(getRootLocation().toString()); + + for (URIStatus status : alluxioFs.listStatus(root)) { + alluxioFs.delete(new AlluxioURI(status.getPath()), DeletePOptions.newBuilder().setRecursive(true).build()); + } + } + + @Override + protected boolean isHierarchical() + { + return true; + } + + @Override + protected TrinoFileSystem getFileSystem() + { + return fileSystem; + } + + @Override + protected Location getRootLocation() + { + return rootLocation; + } + + @Override + protected void verifyFileSystemIsEmpty() + { + AlluxioURI bucket = + AlluxioUtils.convertToAlluxioURI(rootLocation, ((AlluxioFileSystem) fileSystem).getMountRoot()); + try { + assertThat(alluxioFs.listStatus(bucket)).isEmpty(); + } + catch (IOException | AlluxioException e) { + throw new RuntimeException(e); + } + } + + @Override + protected final boolean supportsCreateExclusive() + { + return true; + } + + @Override + protected final boolean supportsRenameFile() + { + return true; + } + + @Override + protected boolean supportsIncompleteWriteNoClobber() + { + return false; + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java new file mode 100644 index 0000000000000..188d7e917cab6 --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioFileSystem.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +public class TestAlluxioFileSystem + extends AbstractTestAlluxioFileSystem +{ + private static final String IMAGE_NAME = "alluxio/alluxio:2.9.5"; + public static final DockerImageName ALLUXIO_IMAGE = DockerImageName.parse(IMAGE_NAME); + + @Container + private static final GenericContainer ALLUXIO_MASTER_CONTAINER = createAlluxioMasterContainer(); + + @Container + private static final GenericContainer ALLUXIO_WORKER_CONTAINER = createAlluxioWorkerContainer(); + + private static GenericContainer createAlluxioMasterContainer() + { + GenericContainer container = new GenericContainer<>(ALLUXIO_IMAGE); + container.withCommand("master-only") + .withEnv("ALLUXIO_JAVA_OPTS", + "-Dalluxio.security.authentication.type=NOSASL " + + "-Dalluxio.master.hostname=localhost " + + "-Dalluxio.worker.hostname=localhost " + + "-Dalluxio.master.mount.table.root.ufs=/opt/alluxio/underFSStorage " + + "-Dalluxio.master.journal.type=NOOP " + + "-Dalluxio.security.authorization.permission.enabled=false " + + "-Dalluxio.security.authorization.plugins.enabled=false ") + .withNetworkMode("host") + .withAccessToHost(true) + .waitingFor(Wait.forLogMessage(".*Primary started*\n", 1)); + return container; + } + + private static GenericContainer createAlluxioWorkerContainer() + { + GenericContainer container = new GenericContainer<>(ALLUXIO_IMAGE); + container.withCommand("worker-only") + .withNetworkMode("host") + .withEnv("ALLUXIO_JAVA_OPTS", + "-Dalluxio.security.authentication.type=NOSASL " + + "-Dalluxio.worker.ramdisk.size=128MB " + + "-Dalluxio.worker.hostname=localhost " + + "-Dalluxio.worker.tieredstore.level0.alias=HDD " + + "-Dalluxio.worker.tieredstore.level0.dirs.path=/tmp " + + "-Dalluxio.master.hostname=localhost " + + "-Dalluxio.security.authorization.permission.enabled=false " + + "-Dalluxio.security.authorization.plugins.enabled=false ") + .withAccessToHost(true) + .dependsOn(ALLUXIO_MASTER_CONTAINER) + .waitingFor(Wait.forLogMessage(".*Alluxio worker started.*\n", 1)); + return container; + } + + @BeforeAll + void setup() + throws IOException + { + initialize(); + // the SSHD container will be stopped by TestContainers on shutdown + // https://github.com/trinodb/trino/discussions/21969 + System.setProperty("ReportLeakedContainers.disabled", "true"); + } + + @Test + void testContainer() + { + assertThat(ALLUXIO_MASTER_CONTAINER.isRunning()).isTrue(); + assertThat(ALLUXIO_WORKER_CONTAINER.isRunning()).isTrue(); + } +} diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java new file mode 100644 index 0000000000000..5c42d7e8cd81c --- /dev/null +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/TestAlluxioUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.alluxio; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestAlluxioUtils +{ + @Test + public void test() + { + String path = "test/level0-file0"; + Assertions.assertEquals(path, AlluxioUtils.simplifyPath(path)); + + path = "a/./b/../../c/"; + Assertions.assertEquals("c/", AlluxioUtils.simplifyPath(path)); + } +} diff --git a/lib/trino-filesystem-cache-alluxio/pom.xml b/lib/trino-filesystem-cache-alluxio/pom.xml index a80e35640c6d5..56cc3462e6b79 100644 --- a/lib/trino-filesystem-cache-alluxio/pom.xml +++ b/lib/trino-filesystem-cache-alluxio/pom.xml @@ -82,6 +82,18 @@ org.alluxio alluxio-core-common + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + slf4j-api + org.slf4j slf4j-reload4j diff --git a/lib/trino-filesystem-manager/pom.xml b/lib/trino-filesystem-manager/pom.xml index bc8da0fef1f0b..cfd5eeb5980ec 100644 --- a/lib/trino-filesystem-manager/pom.xml +++ b/lib/trino-filesystem-manager/pom.xml @@ -42,6 +42,11 @@ trino-filesystem + + io.trino + trino-filesystem-alluxio + + io.trino trino-filesystem-azure diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java index 4c5b639dfff2e..7394476288d97 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java @@ -18,6 +18,7 @@ public class FileSystemConfig { private boolean hadoopEnabled; + private boolean alluxioEnabled; private boolean nativeAzureEnabled; private boolean nativeS3Enabled; private boolean nativeGcsEnabled; @@ -35,6 +36,18 @@ public FileSystemConfig setHadoopEnabled(boolean hadoopEnabled) return this; } + public boolean isAlluxioEnabled() + { + return alluxioEnabled; + } + + @Config("fs.alluxio.enabled") + public FileSystemConfig setAlluxioEnabled(boolean nativeAlluxioEnabled) + { + this.alluxioEnabled = nativeAlluxioEnabled; + return this; + } + public boolean isNativeAzureEnabled() { return nativeAzureEnabled; diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index c7cec86c8b3ad..8bccdb819c686 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -24,6 +24,8 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.alluxio.AlluxioFileSystemCacheModule; +import io.trino.filesystem.alluxio.AlluxioFileSystemFactory; +import io.trino.filesystem.alluxio.AlluxioFileSystemModule; import io.trino.filesystem.azure.AzureFileSystemFactory; import io.trino.filesystem.azure.AzureFileSystemModule; import io.trino.filesystem.cache.CacheFileSystemFactory; @@ -89,6 +91,11 @@ protected void setup(Binder binder) var factories = newMapBinder(binder, String.class, TrinoFileSystemFactory.class); + if (config.isAlluxioEnabled()) { + install(new AlluxioFileSystemModule()); + factories.addBinding("alluxio").to(AlluxioFileSystemFactory.class); + } + if (config.isNativeAzureEnabled()) { install(new AzureFileSystemModule()); factories.addBinding("abfs").to(AzureFileSystemFactory.class); diff --git a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java index 9609484c0a5bd..088d7653d53c9 100644 --- a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java @@ -29,6 +29,7 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(FileSystemConfig.class) .setHadoopEnabled(false) + .setAlluxioEnabled(false) .setNativeAzureEnabled(false) .setNativeS3Enabled(false) .setNativeGcsEnabled(false) @@ -40,6 +41,7 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("fs.hadoop.enabled", "true") + .put("fs.alluxio.enabled", "true") .put("fs.native-azure.enabled", "true") .put("fs.native-s3.enabled", "true") .put("fs.native-gcs.enabled", "true") @@ -48,6 +50,7 @@ public void testExplicitPropertyMappings() FileSystemConfig expected = new FileSystemConfig() .setHadoopEnabled(true) + .setAlluxioEnabled(true) .setNativeAzureEnabled(true) .setNativeS3Enabled(true) .setNativeGcsEnabled(true) diff --git a/pom.xml b/pom.xml index 9eb29d64875aa..3d42debcdd2b2 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,7 @@ lib/trino-array lib/trino-cache lib/trino-filesystem + lib/trino-filesystem-alluxio lib/trino-filesystem-azure lib/trino-filesystem-cache-alluxio lib/trino-filesystem-gcs @@ -1102,6 +1103,18 @@ test-jar + + io.trino + trino-filesystem-alluxio + ${project.version} + + + io.perfmark + perfmark-api + + + + io.trino trino-filesystem-azure @@ -1736,14 +1749,6 @@ alluxio-core-client-fs ${dep.alluxio.version} - - io.grpc - grpc-core - - - io.grpc - grpc-stub - org.alluxio alluxio-core-common @@ -1776,30 +1781,6 @@ io.etcd jetcd-core - - io.grpc - grpc-api - - - io.grpc - grpc-core - - - io.grpc - grpc-netty - - - io.grpc - grpc-services - - - io.grpc - grpc-stub - - - io.netty - netty-tcnative-boringssl-static - org.alluxio alluxio-core-common @@ -1827,6 +1808,30 @@ + + org.alluxio + alluxio-core-transport + ${dep.alluxio.version} + + + org.apache.logging.log4j + log4j-core + + + + + + org.alluxio + alluxio-underfs-local + ${dep.alluxio.version} + + + org.apache.logging.log4j + log4j-core + + + + org.antlr antlr4-runtime @@ -2574,6 +2579,10 @@ org.alluxio alluxio-core-transport + + org.alluxio + alluxio-underfs-local + git.properties @@ -2651,6 +2660,25 @@ opencensus/proto/trace/v1/trace_config.proto + + + + org.alluxio + alluxio-core-client-fs + + + org.alluxio + alluxio-core-common + + + org.alluxio + alluxio-core-transport + + + + git.properties + + From 7c06a04e9c2f812e870124d9d548844b0688f14b Mon Sep 17 00:00:00 2001 From: JiamingMai Date: Thu, 19 Sep 2024 18:55:41 +0800 Subject: [PATCH 2/3] update the codes according to the comments --- .../alluxio/AlluxioFileIterator.java | 18 +++++------------- .../filesystem/alluxio/AlluxioFileSystem.java | 2 +- .../alluxio/AlluxioTrinoInputStream.java | 2 +- .../alluxio/AlluxioTrinoOutputStream.java | 14 +++++++------- 4 files changed, 14 insertions(+), 22 deletions(-) diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java index 49139c0a2cf71..ba16de33df26b 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileIterator.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -29,13 +30,12 @@ public class AlluxioFileIterator implements FileIterator { - private final List files; + private final Iterator files; private final String mountRoot; - private int index; public AlluxioFileIterator(List files, String mountRoot) { - this.files = requireNonNull(files, "files is null"); + this.files = requireNonNull(files.iterator(), "files is null"); this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); } @@ -43,15 +43,7 @@ public AlluxioFileIterator(List files, String mountRoot) public boolean hasNext() throws IOException { - if (files.isEmpty() || index >= files.size()) { - return false; - } - if (files.get(index) != null) { - return true; - } - else { - return false; - } + return files.hasNext(); } @Override @@ -61,7 +53,7 @@ public FileEntry next() if (!hasNext()) { return null; } - URIStatus fileStatus = files.get(index++); + URIStatus fileStatus = files.next(); Location location = convertToLocation(fileStatus, mountRoot); return new FileEntry( location, diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java index e1b5d8af47bee..4efe5e60e40c5 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java @@ -51,7 +51,7 @@ public class AlluxioFileSystem public AlluxioFileSystem(FileSystem fileSystem) { - this.fileSystem = fileSystem; + this.fileSystem = requireNonNull(fileSystem, "filesystem is null"); mountRoot = "/"; // default alluxio mount root } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java index 09dabdf950c2b..8c7274df6b146 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java @@ -23,7 +23,7 @@ import static java.util.Objects.checkFromIndexSize; import static java.util.Objects.requireNonNull; -public class AlluxioTrinoInputStream +public final class AlluxioTrinoInputStream extends TrinoInputStream { private final Location location; diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java index b32c45ec2e9c7..3cd1e2ad0b1bd 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java @@ -18,19 +18,19 @@ import java.io.IOException; import java.io.OutputStream; -public class AlluxioTrinoOutputStream +public final class AlluxioTrinoOutputStream extends OutputStream { private final Location location; - private final OutputStream delegatedStream; + private final OutputStream delegate; private volatile boolean closed; - public AlluxioTrinoOutputStream(Location location, OutputStream stream) + public AlluxioTrinoOutputStream(Location location, OutputStream delegate) { this.location = location; - delegatedStream = stream; + this.delegate = delegate; } @Override @@ -38,7 +38,7 @@ public void write(int b) throws IOException { ensureOpen(); - delegatedStream.write(b); + delegate.write(b); } @Override @@ -46,7 +46,7 @@ public void flush() throws IOException { ensureOpen(); - delegatedStream.flush(); + delegate.flush(); } @Override @@ -54,7 +54,7 @@ public void close() throws IOException { closed = true; - delegatedStream.close(); + delegate.close(); } private void ensureOpen() From e71819769181db81059988fa0ba857e42702a017 Mon Sep 17 00:00:00 2001 From: Jianjian Date: Thu, 19 Sep 2024 11:46:45 -0700 Subject: [PATCH 3/3] update the codes according to the comments --- .../filesystem/alluxio/AlluxioFileSystem.java | 92 +++++++++---------- .../alluxio/AlluxioFileSystemFactory.java | 14 +-- .../alluxio/AlluxioFileSystemInput.java | 1 - .../alluxio/AlluxioFileSystemInputFile.java | 50 +++++----- .../alluxio/AlluxioFileSystemOutputFile.java | 28 +----- .../alluxio/AlluxioTrinoInputStream.java | 1 + .../alluxio/AlluxioTrinoOutputStream.java | 1 - .../filesystem/alluxio/AlluxioUtils.java | 6 +- .../AbstractTestAlluxioFileSystem.java | 11 +-- 9 files changed, 79 insertions(+), 125 deletions(-) diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java index 4efe5e60e40c5..d922b66aa4bc8 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java @@ -38,20 +38,20 @@ import java.util.stream.Collectors; import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; public class AlluxioFileSystem implements TrinoFileSystem { - private final FileSystem fileSystem; + private final FileSystem alluxioClient; + private final String mountRoot; private Location rootLocation; - private final String mountRoot; - - public AlluxioFileSystem(FileSystem fileSystem) + public AlluxioFileSystem(FileSystem alluxioClient) { - this.fileSystem = requireNonNull(fileSystem, "filesystem is null"); + this.alluxioClient = requireNonNull(alluxioClient, "filesystem is null"); mountRoot = "/"; // default alluxio mount root } @@ -65,17 +65,12 @@ public void setRootLocation(Location rootLocation) this.rootLocation = rootLocation; } - public Location getRootLocation() - { - return rootLocation; - } - @Override public TrinoInputFile newInputFile(Location location) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, null, fileSystem, mountRoot, Optional.empty()); + return new AlluxioFileSystemInputFile(location, null, alluxioClient, mountRoot, Optional.empty()); } @Override @@ -83,7 +78,7 @@ public TrinoInputFile newInputFile(Location location, long length) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.empty()); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.empty()); } @Override @@ -91,7 +86,7 @@ public TrinoInputFile newInputFile(Location location, long length, Instant lastM { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.of(lastModified)); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.of(lastModified)); } @Override @@ -99,7 +94,7 @@ public TrinoOutputFile newOutputFile(Location location) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemOutputFile(rootLocation, location, fileSystem, mountRoot); + return new AlluxioFileSystemOutputFile(rootLocation, location, alluxioClient, mountRoot); } @Override @@ -109,9 +104,9 @@ public void deleteFile(Location location) ensureNotRootLocation(location); ensureNotEndWithSlash(location); try { - fileSystem.delete(convertToAlluxioURI(location, mountRoot)); + alluxioClient.delete(convertToAlluxioURI(location, mountRoot)); } - catch (FileDoesNotExistException e) { + catch (FileDoesNotExistException _) { } catch (AlluxioException e) { throw new IOException("Error deleteFile %s".formatted(location), e); @@ -124,7 +119,7 @@ public void deleteDirectory(Location location) { try { AlluxioURI uri = convertToAlluxioURI(location, mountRoot); - URIStatus status = fileSystem.getStatus(uri); + URIStatus status = alluxioClient.getStatus(uri); if (status == null) { return; } @@ -134,12 +129,12 @@ public void deleteDirectory(Location location) DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build(); // recursive delete on the root directory must be handled manually if (location.path().isEmpty() || location.path().equals(mountRoot)) { - for (URIStatus uriStatus : fileSystem.listStatus(uri)) { - fileSystem.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); + for (URIStatus uriStatus : alluxioClient.listStatus(uri)) { + alluxioClient.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); } } else { - fileSystem.delete(uri, deletePOptions); + alluxioClient.delete(uri, deletePOptions); } } catch (FileDoesNotExistException | NotFoundRuntimeException e) { @@ -160,27 +155,27 @@ public void renameFile(Location source, Location target) ensureNotEndWithSlash(target); } catch (IllegalStateException e) { - throw new IllegalStateException("Cannot rename file from %s to %s as one of them is root location" - .formatted(source, target), e); + throw new IllegalStateException( + "Cannot rename file from %s to %s as one of them is root location".formatted(source, target), e); } AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot); AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot); try { - if (!fileSystem.exists(sourceUri)) { - throw new IOException("Cannot rename file %s to %s as file %s doesn't exist" - .formatted(source, target, source)); + if (!alluxioClient.exists(sourceUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s doesn't exist".formatted(source, target, source)); } - if (fileSystem.exists(targetUri)) { - throw new IOException("Cannot rename file %s to %s as file %s already exists" - .formatted(source, target, target)); + if (alluxioClient.exists(targetUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s already exists".formatted(source, target, target)); } - URIStatus status = fileSystem.getStatus(sourceUri); + URIStatus status = alluxioClient.getStatus(sourceUri); if (status.isFolder()) { - throw new IOException("Cannot rename file %s to %s as %s is a directory" - .formatted(source, target, source)); + throw new IOException( + "Cannot rename file %s to %s as %s is a directory".formatted(source, target, source)); } - fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); } catch (AlluxioException e) { throw new IOException("Error renameFile from %s to %s".formatted(source, target), e); @@ -192,7 +187,7 @@ public FileIterator listFiles(Location location) throws IOException { try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status == null) { new AlluxioFileIterator(Collections.emptyList(), mountRoot); } @@ -205,7 +200,7 @@ public FileIterator listFiles(Location location) } try { - List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot), + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot), ListStatusPOptions.newBuilder().setRecursive(true).build()); return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot); } @@ -222,7 +217,7 @@ public Optional directoryExists(Location location) return Optional.of(true); } try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status != null && status.isFolder()) { return Optional.of(true); } @@ -242,14 +237,14 @@ public void createDirectory(Location location) { try { AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot); - if (fileSystem.exists(locationUri)) { - URIStatus status = fileSystem.getStatus(locationUri); + if (alluxioClient.exists(locationUri)) { + URIStatus status = alluxioClient.getStatus(locationUri); if (!status.isFolder()) { - throw new IOException("Cannot create a directory for an existing file location %s" - .formatted(location)); + throw new IOException( + "Cannot create a directory for an existing file location %s".formatted(location)); } } - fileSystem.createDirectory( + alluxioClient.createDirectory( locationUri, CreateDirectoryPOptions.newBuilder() .setAllowExists(true) @@ -270,16 +265,15 @@ public void renameDirectory(Location source, Location target) ensureNotRootLocation(target); } catch (IllegalStateException e) { - throw new IOException("Cannot rename directory from %s to %s as one of them is root location" - .formatted(source, target), e); + throw new IOException( + "Cannot rename directory from %s to %s as one of them is root location".formatted(source, target), e); } - try { - if (fileSystem.exists(convertToAlluxioURI(target, mountRoot))) { - throw new IOException("Cannot rename %s to %s as file %s already exists" - .formatted(source, target, target)); + if (alluxioClient.exists(convertToAlluxioURI(target, mountRoot))) { + throw new IOException( + "Cannot rename %s to %s as file %s already exists".formatted(source, target, target)); } - fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); } catch (AlluxioException e) { throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e); @@ -294,7 +288,7 @@ public Set listDirectories(Location location) if (isFile(location)) { throw new IOException("Cannot list directories for a file %s".formatted(location)); } - List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot)); + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot)); return filesStatus.stream() .filter(URIStatus::isFolder) .map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot)) @@ -367,7 +361,7 @@ private void ensureNotEndWithSlash(Location location) private boolean isFile(Location location) { try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status == null) { return false; } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java index e14b04fd1d6ca..cc7b6303e061c 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java @@ -25,28 +25,24 @@ public class AlluxioFileSystemFactory implements TrinoFileSystemFactory { - private AlluxioConfiguration conf = Configuration.global(); + private final AlluxioConfiguration conf; @Inject public AlluxioFileSystemFactory() { + this(Configuration.global()); } - public void setConf(AlluxioConfiguration conf) + public AlluxioFileSystemFactory(AlluxioConfiguration conf) { this.conf = conf; } - public AlluxioConfiguration getConf() - { - return conf; - } - @Override public TrinoFileSystem create(ConnectorIdentity identity) { FileSystemContext fsContext = FileSystemContext.create(conf); - FileSystem fileSystem = FileSystem.Factory.create(fsContext); - return new AlluxioFileSystem(fileSystem); + FileSystem alluxioClient = FileSystem.Factory.create(fsContext); + return new AlluxioFileSystem(alluxioClient); } } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java index 1aec9275e366f..44843c5ad5875 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java @@ -27,7 +27,6 @@ public class AlluxioFileSystemInput implements TrinoInput { private final FileInStream stream; - private final TrinoInputFile inputFile; private volatile boolean closed; diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java index 2faea40d4fa35..ab9e59b5d8361 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java @@ -37,14 +37,11 @@ public class AlluxioFileSystemInputFile implements TrinoInputFile { private final Location location; - private final FileSystem fileSystem; - private final String mountRoot; - private Optional lastModified; + private Optional lastModified; private Long length; - private URIStatus status; public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional lastModified) @@ -73,7 +70,7 @@ public TrinoInputStream newStream() throws IOException { try { - return new AlluxioTrinoInputStream(location, openFile(), getStatus()); + return new AlluxioTrinoInputStream(location, openFile(), getURIStatus()); } catch (AlluxioException e) { throw new IOException("Error newStream() file: %s".formatted(location), e); @@ -86,22 +83,24 @@ private FileInStream openFile() if (!exists()) { throw new FileNotFoundException("File does not exist: " + location); } - return fileSystem.openFile(getStatus(), OpenFilePOptions.getDefaultInstance()); + return fileSystem.openFile(getURIStatus(), OpenFilePOptions.getDefaultInstance()); } - private URIStatus getStatus(boolean lazy) + private void loadFileStatus() throws IOException { - if (lazy) { - if (status == null) { - getStatus(); + if (status == null) { + URIStatus fileStatus = getURIStatus(); + if (length == null) { + length = fileStatus.getLength(); + } + if (lastModified.isEmpty()) { + lastModified = Optional.of(Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs())); } - return status; } - return getStatus(); } - private URIStatus getStatus() + private URIStatus getURIStatus() throws IOException { try { @@ -109,7 +108,7 @@ private URIStatus getStatus() status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); } catch (FileDoesNotExistException | NotFoundRuntimeException e) { - return null; + throw new FileNotFoundException("File does not exist: %s".formatted(location)); } catch (AlluxioException | IOException e) { throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e); @@ -122,13 +121,9 @@ public long length() throws IOException { if (length == null) { - URIStatus status = getStatus(true); - if (status == null) { - throw new FileNotFoundException("File does not exist: %s".formatted(location)); - } - length = status.getLength(); + loadFileStatus(); } - return length; + return requireNonNull(length, "length is null"); } @Override @@ -136,11 +131,7 @@ public Instant lastModified() throws IOException { if (lastModified.isEmpty()) { - URIStatus status = getStatus(true); - if (status == null) { - throw new FileNotFoundException("File does not exist: %s".formatted(location)); - } - lastModified = Optional.of(Instant.ofEpochMilli(status.getLastModificationTimeMs())); + loadFileStatus(); } return lastModified.orElseThrow(); } @@ -149,11 +140,12 @@ public Instant lastModified() public boolean exists() throws IOException { - URIStatus status = getStatus(); - if (status == null || !status.isCompleted()) { - return false; + try { + return fileSystem.exists(convertToAlluxioURI(location, mountRoot)); + } + catch (AlluxioException e) { + throw new IOException("fail to check file existence", e); } - return true; } @Override diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java index 19d880fb4c8b6..c632fe081d7f4 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java @@ -32,46 +32,26 @@ public class AlluxioFileSystemOutputFile implements TrinoOutputFile { private final Location rootLocation; - private final Location location; - private final FileSystem fileSystem; - private final String mountRoot; public AlluxioFileSystemOutputFile(Location rootLocation, Location location, FileSystem fileSystem, String mountRoot) { - this.rootLocation = rootLocation; + this.rootLocation = requireNonNull(rootLocation, "root location is null"); this.location = requireNonNull(location, "location is null"); this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); } - @Override - public void createExclusive(byte[] data) - throws IOException - { - throwIfAlreadyExists(); - try (FileOutStream outStream = - fileSystem.createFile(convertToAlluxioURI(location, mountRoot), CreateFilePOptions.newBuilder().setRecursive(true).setOverwrite(true).build())) { - outStream.write(data); - } - catch (AlluxioException e) { - throw new IOException("Error createOrOverwrite %s".formatted(location), e); - } - } - @Override public void createOrOverwrite(byte[] data) throws IOException { ensureOutputFileNotOutsideOfRoot(location); - try (FileOutStream outStream = - fileSystem.createFile(convertToAlluxioURI(location, mountRoot), - CreateFilePOptions.newBuilder() - .setOverwrite(true) - .setRecursive(true) - .build())) { + try (FileOutStream outStream = fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setOverwrite(true).setRecursive(true).build())) { outStream.write(data); } catch (AlluxioException e) { diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java index 8c7274df6b146..e23d61c5cf613 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java @@ -29,6 +29,7 @@ public final class AlluxioTrinoInputStream private final Location location; private final FileInStream stream; private final URIStatus fileStatus; + private boolean closed; public AlluxioTrinoInputStream(Location location, FileInStream stream, URIStatus fileStatus) diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java index 3cd1e2ad0b1bd..516268d6eee1c 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java @@ -22,7 +22,6 @@ public final class AlluxioTrinoOutputStream extends OutputStream { private final Location location; - private final OutputStream delegate; private volatile boolean closed; diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java index 74bfcf1a8ebc5..f7b50e5778a69 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java @@ -104,9 +104,9 @@ public static AlluxioURI convertToAlluxioURIBak(Location location) public static AlluxioURI convertToAlluxioURI(Location location, String mountRoot) { - Optional schema = location.scheme(); - if (schema.isPresent()) { - if (!schema.get().equals("alluxio")) { + Optional scheme = location.scheme(); + if (scheme.isPresent()) { + if (!scheme.get().equals("alluxio")) { return new AlluxioURI(location.toString()); } } diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java index 935b9f01b2002..dfd3526d52c38 100644 --- a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java @@ -49,8 +49,7 @@ protected void initialize() InstancedConfiguration conf = Configuration.copyGlobal(); FileSystemContext fsContext = FileSystemContext.create(conf); this.alluxioFs = FileSystem.Factory.create(fsContext); - this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(); - this.alluxioFileSystemFactory.setConf(conf); + this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(conf); this.fileSystem = alluxioFileSystemFactory.create(ConnectorIdentity.ofUser("alluxio")); ((AlluxioFileSystem) fileSystem).setRootLocation(rootLocation); } @@ -109,13 +108,7 @@ protected void verifyFileSystemIsEmpty() @Override protected final boolean supportsCreateExclusive() { - return true; - } - - @Override - protected final boolean supportsRenameFile() - { - return true; + return false; } @Override