
Commit

wip
vkorukanti committed Feb 26, 2025
1 parent 8cb315b commit c3407a2
Showing 17 changed files with 717 additions and 332 deletions.
File: DefaultEngine.java (package io.delta.kernel.defaults.engine)
@@ -15,17 +15,19 @@
*/
package io.delta.kernel.defaults.engine;

import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.engine.*;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

/** Default implementation of {@link Engine} based on Hadoop APIs. */
public class DefaultEngine implements Engine {
private final Configuration hadoopConf;
private final FileIO fileIO;

protected DefaultEngine(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
protected DefaultEngine(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
@@ -35,17 +37,17 @@ public ExpressionHandler getExpressionHandler() {

@Override
public JsonHandler getJsonHandler() {
return new DefaultJsonHandler(hadoopConf);
return new DefaultJsonHandler(fileIO);
}

@Override
public FileSystemClient getFileSystemClient() {
return new DefaultFileSystemClient(hadoopConf);
return new DefaultFileSystemClient(fileIO);
}

@Override
public ParquetHandler getParquetHandler() {
return new DefaultParquetHandler(hadoopConf);
return new DefaultParquetHandler(fileIO);
}

@Override
@@ -60,6 +62,17 @@ public List<MetricsReporter> getMetricsReporters() {
* @return an instance of {@link DefaultEngine}.
*/
public static DefaultEngine create(Configuration hadoopConf) {
return new DefaultEngine(hadoopConf);
return new DefaultEngine(new HadoopFileIO(hadoopConf));
}

/**
* Create an instance of {@link DefaultEngine}. It takes a {@link FileIO} argument, which is
* used for I/O-related operations.
*
* @param fileIO File IO implementation to use for reading and writing files.
* @return an instance of {@link DefaultEngine}.
*/
public static DefaultEngine create(FileIO fileIO) {
return new DefaultEngine(fileIO);
}
}
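
For orientation, here is a minimal sketch of how a caller constructs the engine after this change, using only the two create factories and the HadoopFileIO(Configuration) constructor that appear in the diff above; the wrapper class EngineCreationExample is just illustrative scaffolding.

import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
import io.delta.kernel.engine.Engine;
import org.apache.hadoop.conf.Configuration;

public class EngineCreationExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();

    // Existing factory: unchanged for callers; internally it now wraps the
    // Configuration in a HadoopFileIO.
    Engine fromConf = DefaultEngine.create(hadoopConf);

    // New factory introduced in this commit: accepts a FileIO implementation directly,
    // so a non-Hadoop-backed FileIO could be plugged in instead of HadoopFileIO.
    Engine fromFileIO = DefaultEngine.create(new HadoopFileIO(hadoopConf));
  }
}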
File: DefaultFileSystemClient.java (package io.delta.kernel.defaults.engine)
@@ -16,19 +16,15 @@
package io.delta.kernel.defaults.engine;

import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
import io.delta.kernel.engine.FileReadRequest;
import io.delta.kernel.engine.FileSystemClient;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.storage.LogStore;
import java.io.*;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* Default implementation of {@link FileSystemClient} based on Hadoop APIs. It takes a Hadoop {@link
File: DefaultJsonHandler.java (package io.delta.kernel.defaults.engine)
@@ -23,10 +23,10 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.delta.kernel.data.*;
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.defaults.engine.io.SeekableInputStream;
import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
import io.delta.kernel.defaults.internal.data.DefaultRowBasedColumnarBatch;
import io.delta.kernel.defaults.internal.json.JsonUtils;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
import io.delta.kernel.engine.JsonHandler;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.expressions.Predicate;
@@ -38,8 +38,6 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

/** Default implementation of {@link JsonHandler} based on Hadoop APIs. */
public class DefaultJsonHandler implements JsonHandler {
@@ -53,8 +51,10 @@ public class DefaultJsonHandler implements JsonHandler {

public DefaultJsonHandler(FileIO fileIO) {
this.fileIO = fileIO;
this.maxBatchSize = fileIO
.getConf("delta.kernel.default.json.reader.batch-size").map(Integer::valueOf)
this.maxBatchSize =
fileIO
.getConf("delta.kernel.default.json.reader.batch-size")
.map(Integer::valueOf)
.orElse(1024);
checkArgument(maxBatchSize > 0, "invalid JSON reader batch size: %d", maxBatchSize);
}
@@ -142,9 +142,9 @@ private boolean tryOpenNextFile() throws IOException {

if (scanFileIter.hasNext()) {
currentFile = scanFileIter.next();
InputStream stream = null;
SeekableInputStream stream = null;
try {
stream = fileIO.open(currentFile.getPath());
stream = fileIO.newInputFile(currentFile.getPath()).newStream();
currentFileReader =
new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
} catch (Exception e) {
@@ -168,28 +168,7 @@ private boolean tryOpenNextFile() throws IOException {
@Override
public void writeJsonFileAtomically(
String filePath, CloseableIterator<Row> data, boolean overwrite) throws IOException {
Path path = new Path(filePath);
fileIO.
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
try {
logStore.write(
path,
new Iterator<String>() {
@Override
public boolean hasNext() {
return data.hasNext();
}

@Override
public String next() {
return JsonUtils.rowToJson(data.next());
}
},
overwrite,
hadoopConf);
} finally {
Utils.closeCloseables(data);
}
fileIO.newOutputFile(filePath).writeAtomically(data.map(JsonUtils::rowToJson), overwrite);
}

private Row parseJson(String json, StructType readSchema) {
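
Taken together, the calls in this file (getConf, newInputFile(...).newStream(), newOutputFile(...).writeAtomically(...)) and in DefaultParquetHandler below (newOutputFile(...).create(...)) imply roughly the following FileIO surface. This is an inferred sketch only: the names FileIOSketch, InputFileSketch, and OutputFileSketch are placeholders, and the real interfaces under io.delta.kernel.defaults.engine.io may declare different members.

import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;

interface FileIOSketch {
  // Engine configuration lookup, e.g. "delta.kernel.default.json.reader.batch-size".
  Optional<String> getConf(String key);

  InputFileSketch newInputFile(String path);

  OutputFileSketch newOutputFile(String path);

  interface InputFileSketch {
    // Seekable stream over the file contents (SeekableInputStream in the diff);
    // plain InputStream is used here only to keep the sketch self-contained.
    InputStream newStream() throws IOException;
  }

  interface OutputFileSketch {
    // Positioned stream for streaming writers (PositionOutputStream in the diff).
    OutputStream create(boolean overwrite) throws IOException;

    // All-or-nothing write of line-oriented content, used for Delta log JSON files.
    void writeAtomically(CloseableIterator<String> lines, boolean overwrite) throws IOException;
  }
}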
File: DefaultParquetHandler.java (package io.delta.kernel.defaults.engine)
@@ -15,40 +15,35 @@
*/
package io.delta.kernel.defaults.engine;

import static io.delta.kernel.internal.util.Preconditions.checkState;
import static java.lang.String.format;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.defaults.engine.io.PositionOutputStream;
import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;
import io.delta.kernel.engine.ParquetHandler;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;
import io.delta.kernel.utils.FileStatus;
import io.delta.storage.LogStore;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

/** Default implementation of {@link ParquetHandler} based on Hadoop APIs. */
public class DefaultParquetHandler implements ParquetHandler {
private final Configuration hadoopConf;
private final FileIO fileIO;

/**
* Create an instance of default {@link ParquetHandler} implementation.
*
* @param hadoopConf Hadoop configuration to use.
* @param fileIO File IO implementation to use for reading and writing files.
*/
public DefaultParquetHandler(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
public DefaultParquetHandler(FileIO fileIO) {
this.fileIO = Objects.requireNonNull(fileIO, "fileIO is null");
}

@Override
@@ -58,7 +53,7 @@ public CloseableIterator<ColumnarBatch> readParquetFiles(
Optional<Predicate> predicate)
throws IOException {
return new CloseableIterator<ColumnarBatch>() {
private final ParquetFileReader batchReader = new ParquetFileReader(hadoopConf);
private final ParquetFileReader batchReader = new ParquetFileReader(fileIO);
private CloseableIterator<ColumnarBatch> currentFileReader;

@Override
@@ -115,50 +110,12 @@ public CloseableIterator<DataFileStatus> writeParquetFiles(
@Override
public void writeParquetFileAtomically(
String filePath, CloseableIterator<FilteredColumnarBatch> data) throws IOException {
try {
Path targetPath = new Path(filePath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, targetPath.toUri().getScheme());

boolean useRename = logStore.isPartialWriteVisible(targetPath, hadoopConf);

Path writePath = targetPath;
if (useRename) {
// In order to atomically write the file, write to a temp file and rename
// to target path
String tempFileName = format(".%s.%s.tmp", targetPath.getName(), UUID.randomUUID());
writePath = new Path(targetPath.getParent(), tempFileName);
}
PositionOutputStream outputStream = fileIO.newOutputFile(filePath).create(true);
try {
ParquetFileWriter fileWriter = new ParquetFileWriter(hadoopConf, writePath);

Optional<DataFileStatus> writtenFile;

try (CloseableIterator<DataFileStatus> statuses = fileWriter.write(data)) {
writtenFile = InternalUtils.getSingularElement(statuses);
} catch (UncheckedIOException uio) {
throw uio.getCause();
}

checkState(writtenFile.isPresent(), "expected to write one output file");
if (useRename) {
FileSystem fs = targetPath.getFileSystem(hadoopConf);
boolean renameDone = false;
try {
renameDone = fs.rename(writePath, targetPath);
if (!renameDone) {
if (fs.exists(targetPath)) {
throw new java.nio.file.FileAlreadyExistsException(
"target file already exists: " + targetPath);
}
throw new IOException("Failed to rename the file");
}
} finally {
if (!renameDone) {
fs.delete(writePath, false /* recursive */);
}
}
}
} finally {
Utils.closeCloseables(data);
Utils.closeCloseables(outputStream, data);
}
}
}
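
The removed block above achieved atomicity with the classic write-to-temp-file-then-rename pattern (via LogStore), while the new code so far only opens a PositionOutputStream; since this is a wip commit, that logic presumably moves behind the FileIO/OutputFile abstraction. Below is a standalone sketch of the rename pattern using only Hadoop FileSystem calls that appear in the removed code; the class and method names are illustrative, not part of the commit.

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class RenameBasedAtomicWrite {
  // Write contents to a hidden temp file next to the target, then rename onto the target.
  // Mirrors the removed writeParquetFileAtomically logic, minus the Parquet-specific writer.
  static void write(Configuration conf, String targetPathStr, byte[] contents) throws IOException {
    Path target = new Path(targetPathStr);
    FileSystem fs = target.getFileSystem(conf);
    Path temp =
        new Path(
            target.getParent(),
            String.format(".%s.%s.tmp", target.getName(), UUID.randomUUID()));

    try (FSDataOutputStream out = fs.create(temp, false /* overwrite */)) {
      out.write(contents);
    }

    boolean renamed = false;
    try {
      renamed = fs.rename(temp, target);
      if (!renamed) {
        if (fs.exists(target)) {
          throw new FileAlreadyExistsException("target file already exists: " + target);
        }
        throw new IOException("Failed to rename the file");
      }
    } finally {
      if (!renamed) {
        fs.delete(temp, false /* recursive */);
      }
    }
  }
}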
(Remaining changed files not shown.)
