Skip to content

Commit

Permalink
update the codes according to the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Sep 19, 2024
1 parent 7c06a04 commit 379d776
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@
import java.util.stream.Collectors;

import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;

public class AlluxioFileSystem
implements TrinoFileSystem
{
private final FileSystem fileSystem;
private final FileSystem alluxioClient;
private final String mountRoot;

private Location rootLocation;

private final String mountRoot;

public AlluxioFileSystem(FileSystem fileSystem)
public AlluxioFileSystem(FileSystem alluxioClient)
{
this.fileSystem = requireNonNull(fileSystem, "filesystem is null");
this.alluxioClient = requireNonNull(alluxioClient, "filesystem is null");
mountRoot = "/"; // default alluxio mount root
}

Expand All @@ -65,41 +65,36 @@ public void setRootLocation(Location rootLocation)
this.rootLocation = rootLocation;
}

public Location getRootLocation()
{
return rootLocation;
}

@Override
public TrinoInputFile newInputFile(Location location)
{
ensureNotRootLocation(location);
ensureNotEndWithSlash(location);
return new AlluxioFileSystemInputFile(location, null, fileSystem, mountRoot, Optional.empty());
return new AlluxioFileSystemInputFile(location, null, alluxioClient, mountRoot, Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
ensureNotRootLocation(location);
ensureNotEndWithSlash(location);
return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.empty());
return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
ensureNotRootLocation(location);
ensureNotEndWithSlash(location);
return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.of(lastModified));
return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.of(lastModified));
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
ensureNotRootLocation(location);
ensureNotEndWithSlash(location);
return new AlluxioFileSystemOutputFile(rootLocation, location, fileSystem, mountRoot);
return new AlluxioFileSystemOutputFile(rootLocation, location, alluxioClient, mountRoot);
}

@Override
Expand All @@ -109,9 +104,9 @@ public void deleteFile(Location location)
ensureNotRootLocation(location);
ensureNotEndWithSlash(location);
try {
fileSystem.delete(convertToAlluxioURI(location, mountRoot));
alluxioClient.delete(convertToAlluxioURI(location, mountRoot));
}
catch (FileDoesNotExistException e) {
catch (FileDoesNotExistException _) {
}
catch (AlluxioException e) {
throw new IOException("Error deleteFile %s".formatted(location), e);
Expand All @@ -124,7 +119,7 @@ public void deleteDirectory(Location location)
{
try {
AlluxioURI uri = convertToAlluxioURI(location, mountRoot);
URIStatus status = fileSystem.getStatus(uri);
URIStatus status = alluxioClient.getStatus(uri);
if (status == null) {
return;
}
Expand All @@ -134,12 +129,12 @@ public void deleteDirectory(Location location)
DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build();
// recursive delete on the root directory must be handled manually
if (location.path().isEmpty() || location.path().equals(mountRoot)) {
for (URIStatus uriStatus : fileSystem.listStatus(uri)) {
fileSystem.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions);
for (URIStatus uriStatus : alluxioClient.listStatus(uri)) {
alluxioClient.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions);
}
}
else {
fileSystem.delete(uri, deletePOptions);
alluxioClient.delete(uri, deletePOptions);
}
}
catch (FileDoesNotExistException | NotFoundRuntimeException e) {
Expand All @@ -160,27 +155,27 @@ public void renameFile(Location source, Location target)
ensureNotEndWithSlash(target);
}
catch (IllegalStateException e) {
throw new IllegalStateException("Cannot rename file from %s to %s as one of them is root location"
.formatted(source, target), e);
throw new IllegalStateException(
"Cannot rename file from %s to %s as one of them is root location".formatted(source, target), e);
}
AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot);
AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot);

try {
if (!fileSystem.exists(sourceUri)) {
throw new IOException("Cannot rename file %s to %s as file %s doesn't exist"
.formatted(source, target, source));
if (!alluxioClient.exists(sourceUri)) {
throw new IOException(
"Cannot rename file %s to %s as file %s doesn't exist".formatted(source, target, source));
}
if (fileSystem.exists(targetUri)) {
throw new IOException("Cannot rename file %s to %s as file %s already exists"
.formatted(source, target, target));
if (alluxioClient.exists(targetUri)) {
throw new IOException(
"Cannot rename file %s to %s as file %s already exists".formatted(source, target, target));
}
URIStatus status = fileSystem.getStatus(sourceUri);
URIStatus status = alluxioClient.getStatus(sourceUri);
if (status.isFolder()) {
throw new IOException("Cannot rename file %s to %s as %s is a directory"
.formatted(source, target, source));
throw new IOException(
"Cannot rename file %s to %s as %s is a directory".formatted(source, target, source));
}
fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
}
catch (AlluxioException e) {
throw new IOException("Error renameFile from %s to %s".formatted(source, target), e);
Expand All @@ -192,7 +187,7 @@ public FileIterator listFiles(Location location)
throws IOException
{
try {
URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot));
URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
if (status == null) {
new AlluxioFileIterator(Collections.emptyList(), mountRoot);
}
Expand All @@ -205,7 +200,7 @@ public FileIterator listFiles(Location location)
}

try {
List<URIStatus> filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot),
List<URIStatus> filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot),
ListStatusPOptions.newBuilder().setRecursive(true).build());
return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot);
}
Expand All @@ -222,7 +217,7 @@ public Optional<Boolean> directoryExists(Location location)
return Optional.of(true);
}
try {
URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot));
URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
if (status != null && status.isFolder()) {
return Optional.of(true);
}
Expand All @@ -242,14 +237,14 @@ public void createDirectory(Location location)
{
try {
AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot);
if (fileSystem.exists(locationUri)) {
URIStatus status = fileSystem.getStatus(locationUri);
if (alluxioClient.exists(locationUri)) {
URIStatus status = alluxioClient.getStatus(locationUri);
if (!status.isFolder()) {
throw new IOException("Cannot create a directory for an existing file location %s"
.formatted(location));
throw new IOException(
"Cannot create a directory for an existing file location %s".formatted(location));
}
}
fileSystem.createDirectory(
alluxioClient.createDirectory(
locationUri,
CreateDirectoryPOptions.newBuilder()
.setAllowExists(true)
Expand All @@ -270,16 +265,15 @@ public void renameDirectory(Location source, Location target)
ensureNotRootLocation(target);
}
catch (IllegalStateException e) {
throw new IOException("Cannot rename directory from %s to %s as one of them is root location"
.formatted(source, target), e);
throw new IOException(
"Cannot rename directory from %s to %s as one of them is root location".formatted(source, target), e);
}

try {
if (fileSystem.exists(convertToAlluxioURI(target, mountRoot))) {
throw new IOException("Cannot rename %s to %s as file %s already exists"
.formatted(source, target, target));
if (alluxioClient.exists(convertToAlluxioURI(target, mountRoot))) {
throw new IOException(
"Cannot rename %s to %s as file %s already exists".formatted(source, target, target));
}
fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot));
}
catch (AlluxioException e) {
throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e);
Expand All @@ -294,7 +288,7 @@ public Set<Location> listDirectories(Location location)
if (isFile(location)) {
throw new IOException("Cannot list directories for a file %s".formatted(location));
}
List<URIStatus> filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot));
List<URIStatus> filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot));
return filesStatus.stream()
.filter(URIStatus::isFolder)
.map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot))
Expand Down Expand Up @@ -367,7 +361,7 @@ private void ensureNotEndWithSlash(Location location)
private boolean isFile(Location location)
{
try {
URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot));
URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
if (status == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,24 @@
public class AlluxioFileSystemFactory
implements TrinoFileSystemFactory
{
private AlluxioConfiguration conf = Configuration.global();
private final AlluxioConfiguration conf;

@Inject
public AlluxioFileSystemFactory()
{
this(Configuration.global());
}

public void setConf(AlluxioConfiguration conf)
public AlluxioFileSystemFactory(AlluxioConfiguration conf)
{
this.conf = conf;
}

public AlluxioConfiguration getConf()
{
return conf;
}

@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
FileSystemContext fsContext = FileSystemContext.create(conf);
FileSystem fileSystem = FileSystem.Factory.create(fsContext);
return new AlluxioFileSystem(fileSystem);
FileSystem alluxioClient = FileSystem.Factory.create(fsContext);
return new AlluxioFileSystem(alluxioClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class AlluxioFileSystemInput
implements TrinoInput
{
private final FileInStream stream;

private final TrinoInputFile inputFile;

private volatile boolean closed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ public class AlluxioFileSystemInputFile
implements TrinoInputFile
{
private final Location location;

private final FileSystem fileSystem;

private final String mountRoot;
private Optional<Instant> lastModified;

private Optional<Instant> lastModified;
private Long length;

private URIStatus status;

public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional<Instant> lastModified)
Expand Down Expand Up @@ -73,7 +70,7 @@ public TrinoInputStream newStream()
throws IOException
{
try {
return new AlluxioTrinoInputStream(location, openFile(), getStatus());
return new AlluxioTrinoInputStream(location, openFile(), getURIStatus());
}
catch (AlluxioException e) {
throw new IOException("Error newStream() file: %s".formatted(location), e);
Expand All @@ -86,30 +83,28 @@ private FileInStream openFile()
if (!exists()) {
throw new FileNotFoundException("File does not exist: " + location);
}
return fileSystem.openFile(getStatus(), OpenFilePOptions.getDefaultInstance());
return fileSystem.openFile(getURIStatus(), OpenFilePOptions.getDefaultInstance());
}

private URIStatus getStatus(boolean lazy)
private void loadFileStatus()
throws IOException
{
if (lazy) {
if (status == null) {
getStatus();
}
return status;
if (status == null) {
URIStatus fileStatus = getURIStatus();
length = fileStatus.getLength();
lastModified = Optional.of(Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs()));
}
return getStatus();
}

private URIStatus getStatus()
private URIStatus getURIStatus()
throws IOException
{
try {
//TODO: create a URIStatus object based on the location field
status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot));
}
catch (FileDoesNotExistException | NotFoundRuntimeException e) {
return null;
throw new FileNotFoundException("File does not exist: %s".formatted(location));
}
catch (AlluxioException | IOException e) {
throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e);
Expand All @@ -122,7 +117,7 @@ public long length()
throws IOException
{
if (length == null) {
URIStatus status = getStatus(true);
loadFileStatus();
if (status == null) {
throw new FileNotFoundException("File does not exist: %s".formatted(location));
}
Expand All @@ -136,7 +131,7 @@ public Instant lastModified()
throws IOException
{
if (lastModified.isEmpty()) {
URIStatus status = getStatus(true);
loadFileStatus();
if (status == null) {
throw new FileNotFoundException("File does not exist: %s".formatted(location));
}
Expand All @@ -149,7 +144,7 @@ public Instant lastModified()
public boolean exists()
throws IOException
{
URIStatus status = getStatus();
URIStatus status = getURIStatus();
if (status == null || !status.isCompleted()) {
return false;
}
Expand Down
Loading

0 comments on commit 379d776

Please sign in to comment.