diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
index bc6a7a24756..94e16521283 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
@@ -15,6 +15,8 @@
  */
 package io.delta.kernel.defaults.engine;
 
+import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
+import io.delta.kernel.defaults.engine.io.FileIO;
 import io.delta.kernel.engine.*;
 import java.util.Collections;
 import java.util.List;
@@ -22,10 +24,10 @@
 
 /** Default implementation of {@link Engine} based on Hadoop APIs. */
 public class DefaultEngine implements Engine {
-  private final Configuration hadoopConf;
+  private final FileIO fileIO;
 
-  protected DefaultEngine(Configuration hadoopConf) {
-    this.hadoopConf = hadoopConf;
+  protected DefaultEngine(FileIO fileIO) {
+    this.fileIO = fileIO;
   }
 
   @Override
@@ -35,17 +37,17 @@ public ExpressionHandler getExpressionHandler() {
 
   @Override
   public JsonHandler getJsonHandler() {
-    return new DefaultJsonHandler(hadoopConf);
+    return new DefaultJsonHandler(fileIO);
   }
 
   @Override
   public FileSystemClient getFileSystemClient() {
-    return new DefaultFileSystemClient(hadoopConf);
+    return new DefaultFileSystemClient(fileIO);
   }
 
   @Override
   public ParquetHandler getParquetHandler() {
-    return new DefaultParquetHandler(hadoopConf);
+    return new DefaultParquetHandler(fileIO);
   }
 
   @Override
@@ -60,6 +62,17 @@ public List<MetricsReporter> getMetricsReporters() {
    * @return an instance of {@link DefaultEngine}.
    */
   public static DefaultEngine create(Configuration hadoopConf) {
-    return new DefaultEngine(hadoopConf);
+    return new DefaultEngine(new HadoopFileIO(hadoopConf));
+  }
+
+  /**
+   * Create an instance of {@link DefaultEngine}. It takes a {@link FileIO} as an argument, which
+   * is used for I/O related operations.
+   *
+   * @param fileIO File IO implementation to use for reading and writing files.
+   * @return an instance of {@link DefaultEngine}.
+   */
+  public static DefaultEngine create(FileIO fileIO) {
+    return new DefaultEngine(fileIO);
   }
 }
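The `Configuration`-based factory now simply wraps the conf in a `HadoopFileIO`, so the two overloads are interchangeable for Hadoop-backed storage. A minimal usage sketch (illustrative wiring, not part of the diff):

```java
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
import io.delta.kernel.defaults.engine.io.FileIO;
import org.apache.hadoop.conf.Configuration;

public class EngineFactorySketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Equivalent engines: create(Configuration) delegates to create(FileIO).
    DefaultEngine viaConf = DefaultEngine.create(hadoopConf);
    DefaultEngine viaFileIO = DefaultEngine.create(new HadoopFileIO(hadoopConf));
    // A custom FileIO implementation can be passed instead of HadoopFileIO
    // to run Kernel against a non-Hadoop storage layer.
    FileIO fileIO = new HadoopFileIO(hadoopConf);
    DefaultEngine custom = DefaultEngine.create(fileIO);
  }
}
```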
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java
index 211264c1084..159a0927ad0 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java
@@ -16,19 +16,15 @@
 package io.delta.kernel.defaults.engine;
 
 import io.delta.kernel.defaults.engine.io.FileIO;
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
 import io.delta.kernel.engine.FileReadRequest;
 import io.delta.kernel.engine.FileSystemClient;
-import io.delta.kernel.internal.util.Utils;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 import io.delta.storage.LogStore;
 import java.io.*;
 import java.util.Objects;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * Default implementation of {@link FileSystemClient} based on Hadoop APIs. It takes a Hadoop {@link
  * Configuration} object to interact with the file system.
  */
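With the `Configuration` field replaced by `FileIO`, the client's methods (whose bodies fall outside this hunk) reduce to thin delegations. A plausible sketch of the pattern, under that assumption:

```java
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;

// Hypothetical stand-in showing the delegation shape; not the actual class body.
class FileSystemClientDelegationSketch {
  private final FileIO fileIO;

  FileSystemClientDelegationSketch(FileIO fileIO) {
    this.fileIO = fileIO;
  }

  CloseableIterator<FileStatus> listFrom(String filePath) throws IOException {
    return fileIO.listFrom(filePath); // directory listing now goes through the FileIO seam
  }

  String resolvePath(String path) throws IOException {
    return fileIO.resolvePath(path); // likewise for path qualification
  }
}
```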
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java
index 9574afdcc27..f45f45ffa89 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultJsonHandler.java
@@ -23,10 +23,10 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.delta.kernel.data.*;
 import io.delta.kernel.defaults.engine.io.FileIO;
+import io.delta.kernel.defaults.engine.io.SeekableInputStream;
 import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
 import io.delta.kernel.defaults.internal.data.DefaultRowBasedColumnarBatch;
 import io.delta.kernel.defaults.internal.json.JsonUtils;
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
 import io.delta.kernel.engine.JsonHandler;
 import io.delta.kernel.exceptions.KernelEngineException;
 import io.delta.kernel.expressions.Predicate;
@@ -38,8 +38,6 @@
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
 
 /** Default implementation of {@link JsonHandler} based on Hadoop APIs. */
 public class DefaultJsonHandler implements JsonHandler {
@@ -53,8 +51,10 @@ public class DefaultJsonHandler implements JsonHandler {
 
   public DefaultJsonHandler(FileIO fileIO) {
     this.fileIO = fileIO;
-    this.maxBatchSize = fileIO
-        .getConf("delta.kernel.default.json.reader.batch-size").map(Integer::valueOf)
+    this.maxBatchSize =
+        fileIO
+            .getConf("delta.kernel.default.json.reader.batch-size")
+            .map(Integer::valueOf)
             .orElse(1024);
     checkArgument(maxBatchSize > 0, "invalid JSON reader batch size: %d", maxBatchSize);
   }
@@ -142,9 +142,9 @@ private boolean tryOpenNextFile() throws IOException {
     if (scanFileIter.hasNext()) {
       currentFile = scanFileIter.next();
 
-      InputStream stream = null;
+      SeekableInputStream stream = null;
       try {
-        stream = fileIO.open(currentFile.getPath());
+        stream = fileIO.newInputFile(currentFile.getPath()).newStream();
         currentFileReader =
             new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
       } catch (Exception e) {
@@ -168,28 +168,7 @@
   @Override
   public void writeJsonFileAtomically(
       String filePath, CloseableIterator<Row> data, boolean overwrite) throws IOException {
-    Path path = new Path(filePath);
-    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
-    try {
-      logStore.write(
-          path,
-          new Iterator<String>() {
-            @Override
-            public boolean hasNext() {
-              return data.hasNext();
-            }
-
-            @Override
-            public String next() {
-              return JsonUtils.rowToJson(data.next());
-            }
-          },
-          overwrite,
-          hadoopConf);
-    } finally {
-      Utils.closeCloseables(data);
-    }
+    fileIO.newOutputFile(filePath).writeAtomically(data.map(JsonUtils::rowToJson), overwrite);
   }
 
   private Row parseJson(String json, StructType readSchema) {
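The LogStore block collapses into one call because `CloseableIterator.map` lifts the `Row`-to-JSON conversion lazily. A usage sketch (the path and rows below are hypothetical):

```java
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.defaults.internal.json.JsonUtils;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;

class JsonWriteSketch {
  // Each Row becomes one JSON line; the OutputFile implementation decides how
  // to make the multi-line write atomic (e.g. via a LogStore in HadoopOutputFile).
  static void writeLogFile(FileIO fileIO, String path, CloseableIterator<Row> rows)
      throws IOException {
    fileIO.newOutputFile(path).writeAtomically(rows.map(JsonUtils::rowToJson), false);
  }
}
```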
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java
index c3eaf75ead7..f174147cc9e 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java
@@ -15,40 +15,35 @@
  */
 package io.delta.kernel.defaults.engine;
 
-import static io.delta.kernel.internal.util.Preconditions.checkState;
-import static java.lang.String.format;
-
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
+import io.delta.kernel.defaults.engine.io.FileIO;
+import io.delta.kernel.defaults.engine.io.PositionOutputStream;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;
 import io.delta.kernel.engine.ParquetHandler;
 import io.delta.kernel.expressions.Column;
 import io.delta.kernel.expressions.Predicate;
-import io.delta.kernel.internal.util.InternalUtils;
 import io.delta.kernel.internal.util.Utils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.*;
 import io.delta.kernel.utils.FileStatus;
 import io.delta.storage.LogStore;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.*;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 
 /** Default implementation of {@link ParquetHandler} based on Hadoop APIs. */
 public class DefaultParquetHandler implements ParquetHandler {
-  private final Configuration hadoopConf;
+  private final FileIO fileIO;
 
   /**
    * Create an instance of default {@link ParquetHandler} implementation.
    *
-   * @param hadoopConf Hadoop configuration to use.
+   * @param fileIO File IO implementation to use for reading and writing files.
    */
-  public DefaultParquetHandler(Configuration hadoopConf) {
-    this.hadoopConf = hadoopConf;
+  public DefaultParquetHandler(FileIO fileIO) {
+    this.fileIO = Objects.requireNonNull(fileIO, "fileIO is null");
   }
 
   @Override
@@ -58,7 +53,7 @@ public CloseableIterator<ColumnarBatch> readParquetFiles(
       Optional<Predicate> predicate)
       throws IOException {
     return new CloseableIterator<ColumnarBatch>() {
-      private final ParquetFileReader batchReader = new ParquetFileReader(hadoopConf);
+      private final ParquetFileReader batchReader = new ParquetFileReader(fileIO);
       private CloseableIterator<ColumnarBatch> currentFileReader;
 
       @Override
@@ -115,50 +110,12 @@ public CloseableIterator<DataFileStatus> writeParquetFiles(
   @Override
   public void writeParquetFileAtomically(
       String filePath, CloseableIterator<FilteredColumnarBatch> data) throws IOException {
-    try {
-      Path targetPath = new Path(filePath);
-      LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, targetPath.toUri().getScheme());
-      boolean useRename = logStore.isPartialWriteVisible(targetPath, hadoopConf);
-
-      Path writePath = targetPath;
-      if (useRename) {
-        // In order to atomically write the file, write to a temp file and rename
-        // to target path
-        String tempFileName = format(".%s.%s.tmp", targetPath.getName(), UUID.randomUUID());
-        writePath = new Path(targetPath.getParent(), tempFileName);
-      }
-      ParquetFileWriter fileWriter = new ParquetFileWriter(hadoopConf, writePath);
-
-      Optional<DataFileStatus> writtenFile;
-
-      try (CloseableIterator<DataFileStatus> statuses = fileWriter.write(data)) {
-        writtenFile = InternalUtils.getSingularElement(statuses);
-      } catch (UncheckedIOException uio) {
-        throw uio.getCause();
-      }
-
-      checkState(writtenFile.isPresent(), "expected to write one output file");
-      if (useRename) {
-        FileSystem fs = targetPath.getFileSystem(hadoopConf);
-        boolean renameDone = false;
-        try {
-          renameDone = fs.rename(writePath, targetPath);
-          if (!renameDone) {
-            if (fs.exists(targetPath)) {
-              throw new java.nio.file.FileAlreadyExistsException(
                  "target file already exists: " + targetPath);
-            }
-            throw new IOException("Failed to rename the file");
-          }
-        } finally {
-          if (!renameDone) {
-            fs.delete(writePath, false /* recursive */);
-          }
-        }
-      }
+    PositionOutputStream outputStream = fileIO.newOutputFile(filePath).create(true);
+    try {
+      ParquetFileWriter fileWriter = new ParquetFileWriter(outputStream, filePath);
     } finally {
-      Utils.closeCloseables(data);
+      Utils.closeCloseables(outputStream, data);
     }
   }
 }
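Atomicity now lives behind `OutputFile.create(true)`: the handler writes to the returned `PositionOutputStream` and the `FileIO` implementation decides whether a temp-file-plus-rename dance is needed. A sketch of what a caller can rely on (hypothetical path; the catch models Delta's first-writer-wins commit races):

```java
import io.delta.kernel.defaults.engine.io.FileIO;
import io.delta.kernel.defaults.engine.io.PositionOutputStream;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;

class AtomicCreateSketch {
  static void writeOnce(FileIO fileIO, String path, byte[] payload) throws IOException {
    try (PositionOutputStream out = fileIO.newOutputFile(path).create(true /* putIfAbsent */)) {
      out.write(payload);
      // close() either publishes the complete file or throws
      // FileAlreadyExistsException if another writer won the race.
    } catch (FileAlreadyExistsException e) {
      // Lost the commit race; the competing file is complete, so this is not fatal.
    }
  }
}
```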
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopFileIO.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopFileIO.java
index a7b132106ab..c030ac9f7fc 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopFileIO.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopFileIO.java
@@ -16,128 +16,106 @@
 package io.delta.kernel.defaults.engine.hadoopio;
 
 import io.delta.kernel.defaults.engine.io.FileIO;
-import io.delta.kernel.defaults.engine.io.SeekableInputStream;
+import io.delta.kernel.defaults.engine.io.InputFile;
+import io.delta.kernel.defaults.engine.io.OutputFile;
 import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
 import io.delta.kernel.engine.FileReadRequest;
 import io.delta.kernel.internal.util.Utils;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 import io.delta.storage.LogStore;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Objects;
 import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-/**
- * Implementation of {@link FileIO} based on Hadoop APIs.
- */
-public class HadoopFileIO
-    implements FileIO
-{
-  private final Configuration hadoopConf;
+/** Implementation of {@link FileIO} based on Hadoop APIs. */
+public class HadoopFileIO implements FileIO {
+  private final Configuration hadoopConf;
 
-  public HadoopFileIO(Configuration hadoopConf)
-  {
-    this.hadoopConf = Objects.requireNonNull(hadoopConf, "hadoopConf is null");
-  }
+  public HadoopFileIO(Configuration hadoopConf) {
+    this.hadoopConf = Objects.requireNonNull(hadoopConf, "hadoopConf is null");
+  }
 
-  @Override
-  public CloseableIterator<FileStatus> listFrom(String filePath)
-      throws IOException
-  {
-    Path path = new Path(filePath);
-    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
+  @Override
+  public CloseableIterator<FileStatus> listFrom(String filePath) throws IOException {
+    Path path = new Path(filePath);
+    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
 
-    return Utils.toCloseableIterator(logStore.listFrom(path, hadoopConf))
-        .map(
-            hadoopFileStatus ->
-                FileStatus.of(
-                    hadoopFileStatus.getPath().toString(),
-                    hadoopFileStatus.getLen(),
-                    hadoopFileStatus.getModificationTime()));
-  }
+    return Utils.toCloseableIterator(logStore.listFrom(path, hadoopConf))
+        .map(
+            hadoopFileStatus ->
+                FileStatus.of(
+                    hadoopFileStatus.getPath().toString(),
+                    hadoopFileStatus.getLen(),
+                    hadoopFileStatus.getModificationTime()));
+  }
 
-  @Override
-  public String resolvePath(String path)
-      throws IOException
-  {
-    Path pathObject = new Path(path);
-    FileSystem fs = pathObject.getFileSystem(hadoopConf);
-    return fs.makeQualified(pathObject).toString();
-  }
+  @Override
+  public String resolvePath(String path) throws IOException {
+    Path pathObject = new Path(path);
+    FileSystem fs = pathObject.getFileSystem(hadoopConf);
+    return fs.makeQualified(pathObject).toString();
+  }
 
-  @Override
-  public CloseableIterator<ByteArrayInputStream> readFiles(
-      CloseableIterator<FileReadRequest> readRequests)
-      throws IOException
-  {
-    return readRequests.map(
-        elem -> getStream(elem.getPath(), elem.getStartOffset(), elem.getReadLength()));
-  }
+  @Override
+  public CloseableIterator<ByteArrayInputStream> readFiles(
+      CloseableIterator<FileReadRequest> readRequests) throws IOException {
+    return readRequests.map(
+        elem -> getStream(elem.getPath(), elem.getStartOffset(), elem.getReadLength()));
+  }
 
-  @Override
-  public boolean mkdirs(String path)
-      throws IOException
-  {
-    Path pathObject = new Path(path);
-    FileSystem fs = pathObject.getFileSystem(hadoopConf);
-    return fs.mkdirs(pathObject);
-  }
+  @Override
+  public boolean mkdirs(String path) throws IOException {
+    Path pathObject = new Path(path);
+    FileSystem fs = pathObject.getFileSystem(hadoopConf);
+    return fs.mkdirs(pathObject);
+  }
 
-  @Override
-  public SeekableInputStream open(String path)
-      throws IOException
-  {
-    Path pathObj = new Path(path);
-    FileSystem fs = pathObj.getFileSystem(hadoopConf);
-    return fs.open(pathObj);
-  }
+  @Override
+  public InputFile newInputFile(String path) throws IOException {
+    Path pathObject = new Path(path);
+    return new HadoopInputFile(pathObject.getFileSystem(hadoopConf), pathObject);
+  }
 
-  @Override
-  public OutputStream create(String path, boolean putIfAbsent)
-      throws IOException
-  {
-    return null;
-  }
+  @Override
+  public OutputFile newOutputFile(String path) throws IOException {
+    return new HadoopOutputFile(hadoopConf, path);
+  }
 
-  @Override
-  public boolean delete(String path)
-      throws IOException
-  {
-    Path pathObject = new Path(path);
-    FileSystem fs = pathObject.getFileSystem(hadoopConf);
-    return fs.delete(pathObject, false);
-  }
+  @Override
+  public boolean delete(String path) throws IOException {
+    Path pathObject = new Path(path);
+    FileSystem fs = pathObject.getFileSystem(hadoopConf);
+    return fs.delete(pathObject, false);
+  }
 
-  @Override
-  public Optional<String> getConf(String confKey)
-  {
-    return Optional.ofNullable(hadoopConf.get(confKey));
-  }
+  @Override
+  public Optional<String> getConf(String confKey) {
+    return Optional.ofNullable(hadoopConf.get(confKey));
+  }
 
-  private ByteArrayInputStream getStream(String filePath, int offset, int size) {
-    Path path = new Path(filePath);
-    try {
-      FileSystem fs = path.getFileSystem(hadoopConf);
-      try (DataInputStream stream = fs.open(path)) {
-        stream.skipBytes(offset);
-        byte[] buff = new byte[size];
-        stream.readFully(buff);
-        return new ByteArrayInputStream(buff);
-      } catch (IOException ex) {
-        throw new RuntimeException(
-            String.format(
-                "IOException reading from file %s at offset %s size %s", filePath, offset, size),
-            ex);
-      }
-    } catch (IOException ex) {
-      throw new RuntimeException(
-          String.format("Could not resolve the FileSystem for path %s", filePath), ex);
-    }
+  private ByteArrayInputStream getStream(String filePath, int offset, int size) {
+    Path path = new Path(filePath);
+    try {
+      FileSystem fs = path.getFileSystem(hadoopConf);
+      try (DataInputStream stream = fs.open(path)) {
+        stream.skipBytes(offset);
+        byte[] buff = new byte[size];
+        stream.readFully(buff);
+        return new ByteArrayInputStream(buff);
+      } catch (IOException ex) {
+        throw new RuntimeException(
+            String.format(
+                "IOException reading from file %s at offset %s size %s", filePath, offset, size),
+            ex);
+      }
+    } catch (IOException ex) {
+      throw new RuntimeException(
+          String.format("Could not resolve the FileSystem for path %s", filePath), ex);
    }
  }
}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopInputFile.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopInputFile.java
new file mode 100644
index 00000000000..7d54aa8ede1
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopInputFile.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.delta.kernel.defaults.engine.hadoopio; + +import io.delta.kernel.defaults.engine.io.InputFile; +import io.delta.kernel.defaults.engine.io.SeekableInputStream; +import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class HadoopInputFile implements InputFile { + private final FileSystem fs; + private final Path path; + + public HadoopInputFile(FileSystem fs, Path path) { + this.fs = fs; + this.path = path; + } + + @Override + public long length() throws IOException { + return fs.getFileStatus(path).getLen(); + } + + @Override + public String path() { + return path.toString(); + } + + @Override + public SeekableInputStream newStream() throws IOException { + return new HadoopSeekableInputStream(fs.open(path)); + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopOutputFile.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopOutputFile.java new file mode 100644 index 00000000000..58b36510bf3 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopOutputFile.java @@ -0,0 +1,104 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.delta.kernel.defaults.engine.hadoopio;
+
+import static java.lang.String.format;
+
+import io.delta.kernel.defaults.engine.io.OutputFile;
+import io.delta.kernel.defaults.engine.io.PositionOutputStream;
+import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.storage.LogStore;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HadoopOutputFile implements OutputFile {
+  private final Configuration hadoopConf;
+  private final String path;
+
+  public HadoopOutputFile(Configuration hadoopConf, String path) {
+    this.hadoopConf = Objects.requireNonNull(hadoopConf, "hadoopConf is null");
+    this.path = Objects.requireNonNull(path, "path is null");
+  }
+
+  @Override
+  public String path() {
+    return path;
+  }
+
+  @Override
+  public PositionOutputStream create(boolean putIfAbsent) throws IOException {
+    Path targetPath = new Path(path);
+    FileSystem fs = targetPath.getFileSystem(hadoopConf);
+    if (!putIfAbsent) {
+      return new HadoopPositionOutputStream(fs.create(targetPath));
+    }
+    LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, targetPath.toUri().getScheme());
+
+    boolean useRename = logStore.isPartialWriteVisible(targetPath, hadoopConf);
+
+    final Path writePath;
+    if (useRename) {
+      // In order to atomically write the file, write to a temp file and rename
+      // to target path
+      String tempFileName = format(".%s.%s.tmp", targetPath.getName(), UUID.randomUUID());
+      writePath = new Path(targetPath.getParent(), tempFileName);
+    } else {
+      writePath = targetPath;
+    }
+
+    return new HadoopPositionOutputStream(fs.create(writePath)) {
+      @Override
+      public void close() throws IOException {
+        super.close();
+        if (useRename) {
+          boolean renameDone = false;
+          try {
+            renameDone = fs.rename(writePath, targetPath);
+            if (!renameDone) {
+              if (fs.exists(targetPath)) {
+                throw new java.nio.file.FileAlreadyExistsException(
+                    "target file already exists: " + targetPath);
+              }
+              throw new IOException("Failed to rename the file");
+            }
+          } finally {
+            if (!renameDone) {
+              fs.delete(writePath, false /* recursive */);
+            }
+          }
+        }
+      }
+    };
+  }
+
+  @Override
+  public void writeAtomically(CloseableIterator<String> data, boolean overwrite)
+      throws IOException {
+    Path pathObj = new Path(path);
+    try {
+      LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, pathObj.toUri().getScheme());
+      logStore.write(pathObj, data, overwrite, hadoopConf);
+    } finally {
+      Utils.closeCloseables(data);
+    }
+  }
+}
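The anonymous `close()` override above is where atomicity happens on stores with visible partial writes: bytes stream into a hidden `.<name>.<uuid>.tmp` file and the final `rename` publishes them in one step. Usage sketch (path hypothetical):

```java
import io.delta.kernel.defaults.engine.hadoopio.HadoopOutputFile;
import io.delta.kernel.defaults.engine.io.PositionOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;

class RenameOnCloseSketch {
  static void demo() throws IOException {
    HadoopOutputFile out =
        new HadoopOutputFile(new Configuration(), "file:/tmp/delta-demo/00001.json");
    try (PositionOutputStream stream = out.create(true /* putIfAbsent */)) {
      stream.write("hello".getBytes(StandardCharsets.UTF_8));
      // On close: rename tmp -> target, or FileAlreadyExistsException if the
      // target appeared in the meantime; the tmp file is deleted on failure.
    }
  }
}
```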
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopPositionOutputStream.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopPositionOutputStream.java
new file mode 100644
index 00000000000..234bb524158
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopPositionOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.engine.hadoopio;
+
+import io.delta.kernel.defaults.engine.io.PositionOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+public class HadoopPositionOutputStream extends PositionOutputStream {
+  private final FSDataOutputStream delegateStream;
+
+  public HadoopPositionOutputStream(FSDataOutputStream delegateStream) {
+    this.delegateStream = delegateStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    delegateStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    delegateStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    delegateStream.write(b, off, len);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    delegateStream.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegateStream.close();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return delegateStream.getPos();
+  }
+}
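`getPos()` is the one addition over `java.io.OutputStream`, and it is what lets the Parquet writer roll files by size without stat-ing the file system. Minimal sketch:

```java
import io.delta.kernel.defaults.engine.hadoopio.HadoopPositionOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;

class PositionTrackingSketch {
  static long writeAndMeasure(FSDataOutputStream hadoopStream, byte[] payload)
      throws IOException {
    HadoopPositionOutputStream out = new HadoopPositionOutputStream(hadoopStream);
    out.write(payload);
    return out.getPos(); // bytes written so far, e.g. compared against targetMaxFileSize
  }
}
```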
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopSeekableInputStream.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopSeekableInputStream.java
new file mode 100644
index 00000000000..17e6bc2bb81
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/hadoopio/HadoopSeekableInputStream.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.engine.hadoopio;
+
+import io.delta.kernel.defaults.engine.io.SeekableInputStream;
+import java.util.Objects;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HadoopSeekableInputStream extends SeekableInputStream {
+  private final FSDataInputStream delegateStream;
+
+  public HadoopSeekableInputStream(FSDataInputStream delegateStream) {
+    this.delegateStream = Objects.requireNonNull(delegateStream, "delegateStream is null");
+  }
+
+  @Override
+  public int read() throws java.io.IOException {
+    return delegateStream.read();
+  }
+
+  @Override
+  public int read(byte[] b) throws java.io.IOException {
+    return delegateStream.read(b);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws java.io.IOException {
+    return delegateStream.read(b, off, len);
+  }
+
+  @Override
+  public long skip(long n) throws java.io.IOException {
+    return delegateStream.skip(n);
+  }
+
+  @Override
+  public int available() throws java.io.IOException {
+    return delegateStream.available();
+  }
+
+  @Override
+  public void close() throws java.io.IOException {
+    delegateStream.close();
+  }
+
+  @Override
+  public void seek(long pos) throws java.io.IOException {
+    delegateStream.seek(pos);
+  }
+
+  @Override
+  public long getPos() throws java.io.IOException {
+    return delegateStream.getPos();
+  }
+}
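`seek`/`getPos` are exactly the primitives Parquet needs to jump to the footer at the end of a file. Illustrative read pattern (the file and offsets below are hypothetical):

```java
import io.delta.kernel.defaults.engine.io.InputFile;
import io.delta.kernel.defaults.engine.io.SeekableInputStream;
import java.io.IOException;

class FooterSeekSketch {
  static byte[] readLastBytes(InputFile file, int n) throws IOException {
    try (SeekableInputStream in = file.newStream()) {
      in.seek(file.length() - n); // e.g. Parquet keeps footer length + magic in the tail
      byte[] tail = new byte[n];
      int read = in.read(tail, 0, n);
      return tail;
    }
  }
}
```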
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/FileIO.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/FileIO.java
index d4be9755468..ed22848ad99 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/FileIO.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/FileIO.java
@@ -18,95 +18,91 @@
 import io.delta.kernel.engine.FileReadRequest;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
-
 import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Optional;
 
-public interface FileIO
-{
-  /**
-   * List the paths in the same directory that are lexicographically greater or equal to (UTF-8
-   * sorting) the given `path`. The result should also be sorted by the file name.
-   *
-   * @param filePath Fully qualified path to a file
-   * @return Closeable iterator of files. It is the responsibility of the caller to close the
-   *     iterator.
-   * @throws FileNotFoundException if the file at the given path is not found
-   * @throws IOException for any other IO error.
-   */
-  CloseableIterator<FileStatus> listFrom(String filePath) throws IOException;
-
-  /**
-   * Resolve the given path to a fully qualified path.
-   *
-   * @param path Input path
-   * @return Fully qualified path.
-   * @throws FileNotFoundException If the given path doesn't exist.
-   * @throws IOException for any other IO error.
-   */
-  String resolvePath(String path) throws IOException;
+public interface FileIO {
+  /**
+   * List the paths in the same directory that are lexicographically greater or equal to (UTF-8
+   * sorting) the given `path`. The result should also be sorted by the file name.
+   *
+   * @param filePath Fully qualified path to a file
+   * @return Closeable iterator of files. It is the responsibility of the caller to close the
+   *     iterator.
+   * @throws FileNotFoundException if the file at the given path is not found
+   * @throws IOException for any other IO error.
+   */
+  CloseableIterator<FileStatus> listFrom(String filePath) throws IOException;
 
-  /**
-   * Return an iterator of byte streams one for each read request in {@code readRequests}. The
-   * returned streams are in the same order as the given {@link FileReadRequest}s. It is the
-   * responsibility of the caller to close each returned stream.
-   *
-   * @param readRequests Iterator of read requests
-   * @return Data for each request as one {@link ByteArrayInputStream}.
-   * @throws IOException
-   */
-  CloseableIterator<ByteArrayInputStream> readFiles(
-      CloseableIterator<FileReadRequest> readRequests)
-      throws IOException;
+  /**
+   * Resolve the given path to a fully qualified path.
+   *
+   * @param path Input path
+   * @return Fully qualified path.
+   * @throws FileNotFoundException If the given path doesn't exist.
+   * @throws IOException for any other IO error.
+   */
+  String resolvePath(String path) throws IOException;
 
-  /**
-   * Create a directory at the given path including parent directories. This mimics the behavior of
-   * `mkdir -p` in Unix.
-   *
-   * @param path Full qualified path to create a directory at.
-   * @return true if the directory was created successfully, false otherwise.
-   * @throws IOException for any IO error.
-   */
-  boolean mkdirs(String path) throws IOException;
+  /**
+   * Return an iterator of byte streams one for each read request in {@code readRequests}. The
+   * returned streams are in the same order as the given {@link FileReadRequest}s. It is the
+   * responsibility of the caller to close each returned stream.
+   *
+   * @param readRequests Iterator of read requests
+   * @return Data for each request as one {@link ByteArrayInputStream}.
+   * @throws IOException
+   */
+  CloseableIterator<ByteArrayInputStream> readFiles(
+      CloseableIterator<FileReadRequest> readRequests) throws IOException;
 
-  /**
-   * Open a {@link SeekableInputStream} at the given path which can be used to read
-   * the file from any arbitrary position in the file.
-   *
-   * @param path Fully qualified path to the file.
-   * @return {@link SeekableInputStream} instance.
-   * @throws IOException for any IO error.
-   */
-  SeekableInputStream open(String path) throws IOException;
+  /**
+   * Create a directory at the given path including parent directories. This mimics the behavior of
+   * `mkdir -p` in Unix.
+   *
+   * @param path Full qualified path to create a directory at.
+   * @return true if the directory was created successfully, false otherwise.
+   * @throws IOException for any IO error.
+   */
+  boolean mkdirs(String path) throws IOException;
 
-  /**
-   * Create a new file at the given path. If the file already exists, either at
-   * the time of creating the {@link OutputStream} or at the time of closing it,
-   * and {@code putIfAbsent} is true, then the file will not be created and an
-   * {@link java.nio.file.FileAlreadyExistsException} will be thrown.
-   *
-   * @param path Fully qualified path to the file.
-   * @return {@link OutputStream} instance to write to the file.
-   * @throws IOException for any IO error.
-   */
-  OutputStream create(String path, boolean putIfAbsent) throws IOException;
+  /**
+   * Get an {@link InputFile} for the file at the given path which can be used to read the file
+   * from any arbitrary position in the file.
+   *
+   * @param path Fully qualified path to the file.
+   * @return {@link InputFile} instance.
+   * @throws IOException for any IO error.
+   */
+  InputFile newInputFile(String path) throws IOException;
 
+  /**
+   * Create a {@link OutputFile} to write a new file at the given path.
+   *
+   * @param path Fully qualified path to the file.
+   * @return {@link OutputFile} instance which can be used to write to the file.
+   * @throws IOException for any IO error.
+   */
+  OutputFile newOutputFile(String path) throws IOException;
 
-  /**
-   * Delete the file at given path.
-   *
-   * @param path the path to delete. If path is a directory throws an exception.
-   * @return true if delete is successful else false.
-   * @throws IOException for any IO error.
-   */
-  boolean delete(String path) throws IOException;
+  /**
+   * Delete the file at given path.
+   *
+   * @param path the path to delete. If path is a directory throws an exception.
+   * @return true if delete is successful else false.
+   * @throws IOException for any IO error.
+   */
+  boolean delete(String path) throws IOException;
 
-  /**
-   * Get the configuration value for the given key.
-   * @param confKey configuration key name
-   * @return If no such value is present, an {@link Optional#empty()} is returned.
-   */
-  Optional<String> getConf(String confKey);
+  /**
+   * Get the configuration value for the given key.
+   *
+   * <p>TODO: should be in a separate interface? may be called ConfigurationProvider?
+   *
+   * @param confKey configuration key name
+   * @return If no such value is present, an {@link Optional#empty()} is returned.
+   */
+  Optional<String> getConf(String confKey);
 }
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/InputFile.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/InputFile.java
new file mode 100644
index 00000000000..58118c92a57
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/InputFile.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.engine.io;
+
+import java.io.IOException;
+
+/** Interface for reading a file and getting metadata about it. */
+public interface InputFile {
+  /**
+   * Get the size of the file.
+   *
+   * @return the size of the file.
+   */
+  long length() throws IOException;
+
+  /**
+   * Get the path of the file.
+   *
+   * @return the path of the file.
+   */
+  String path();
+
+  /**
+   * Get the input stream to read the file.
+   *
+   * @return the input stream to read the file. It is the responsibility of the caller to close
+   *     the stream.
+   */
+  SeekableInputStream newStream() throws IOException;
+}
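To see what this seam demands of a non-Hadoop backend, here is a minimal in-memory `InputFile` (illustrative only; `InMemoryInputFile` is not part of the change):

```java
import io.delta.kernel.defaults.engine.io.InputFile;
import io.delta.kernel.defaults.engine.io.SeekableInputStream;

class InMemoryInputFile implements InputFile {
  private final String path;
  private final byte[] data;

  InMemoryInputFile(String path, byte[] data) {
    this.path = path;
    this.data = data;
  }

  @Override
  public long length() {
    return data.length;
  }

  @Override
  public String path() {
    return path;
  }

  @Override
  public SeekableInputStream newStream() {
    return new SeekableInputStream() {
      private int pos = 0;

      @Override
      public int read() {
        return pos < data.length ? (data[pos++] & 0xFF) : -1;
      }

      @Override
      public void seek(long newPos) {
        pos = (int) newPos;
      }

      @Override
      public long getPos() {
        return pos;
      }
    };
  }
}
```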
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/OutputFile.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/OutputFile.java
new file mode 100644
index 00000000000..69bd3e9d760
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/OutputFile.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.engine.io;
+
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.IOException;
+
+/** Interface for writing to a file and getting metadata about it. */
+public interface OutputFile {
+  /**
+   * Get the path of the file.
+   *
+   * @return the path of the file.
+   */
+  String path();
+
+  /**
+   * Get the output stream to write to the file.
+   *
+   * <ul>
+   *   <li>If the file already exists (either at the time of creating the {@link
+   *       PositionOutputStream} or at the time of closing it):
+   *       <ul>
+   *         <li>If {@code putIfAbsent} is true, then the file will not be created and an {@link
+   *             java.nio.file.FileAlreadyExistsException} will be thrown.
+   *         <li>If {@code putIfAbsent} is false, then the existing file will be overwritten.
+   *       </ul>
+   *   <li>If the file does not exist, then a new file will be created with the complete contents.
+   * </ul>
+   *
+   * @return the output stream to write to the file. It is the responsibility of the caller to
+   *     close the stream.
+   * @throws java.nio.file.FileAlreadyExistsException if the file already exists and {@code
+   *     putIfAbsent} is true.
+   * @throws IOException if an I/O error occurs.
+   */
+  PositionOutputStream create(boolean putIfAbsent) throws IOException;
+
+  /**
+   * Atomically write the data to a file at the given path: either the write completes in full, or
+   * the file is never left with partial content. If the file already exists and {@code overwrite}
+   * is false, do not replace it; if {@code overwrite} is true, replace the file with the new
+   * data.
+   *
+   * <p>TODO: the semantics are very loose here, see if there is a better API name. One of the
+   * reasons why the data is passed as an iterator is the existing LogStore interface, which is
+   * used in the Hadoop implementation of {@link FileIO}.
+   *
+   * @param data the data to write. Each element in the iterator is a line in the file.
+   * @param overwrite if true, overwrite the file with the new data. If false, do not overwrite
+   *     the file.
+   * @throws java.nio.file.FileAlreadyExistsException if the file already exists and {@code
+   *     overwrite} is false.
+   * @throws IOException for any IO error.
+   */
+  void writeAtomically(CloseableIterator<String> data, boolean overwrite) throws IOException;
+}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/PositionOutputStream.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/PositionOutputStream.java
new file mode 100644
index 00000000000..6d0bc3c49a7
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/PositionOutputStream.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.engine.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionOutputStream extends OutputStream {
+  /**
+   * Get the current position in the stream.
+   *
+   * @return the current position in bytes from the start of the stream
+   */
+  public abstract long getPos() throws IOException;
+}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/SeekableInputStream.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/SeekableInputStream.java
index 898b23409cf..c2063dc4dbd 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/SeekableInputStream.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/io/SeekableInputStream.java
@@ -18,21 +18,20 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-public abstract class SeekableInputStream extends InputStream
-{
-  /**
-   * Get the current position in the stream.
-   *
-   * @return the current position in bytes from the start of the stream
-   * @throws IOException if the underlying stream throws an IOException
-   */
-  public abstract long getPos() throws IOException;
+public abstract class SeekableInputStream extends InputStream {
+  /**
+   * Get the current position in the stream.
+   *
+   * @return the current position in bytes from the start of the stream
+   * @throws IOException if the underlying stream throws an IOException
+   */
+  public abstract long getPos() throws IOException;
 
-  /**
-   * Seek to a new position in the stream.
-   *
-   * @param newPos the new position to seek to
-   * @throws IOException if the underlying stream throws an IOException
-   */
-  public abstract void seek(long newPos) throws IOException;
+  /**
+   * Seek to a new position in the stream.
+ * + * @param newPos the new position to seek to + * @throws IOException if the underlying stream throws an IOException + */ + public abstract void seek(long newPos) throws IOException; } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java index ed77a65d2c4..168b35e596a 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java @@ -18,9 +18,11 @@ import static io.delta.kernel.defaults.internal.parquet.ParquetFilterUtils.toParquetFilter; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -import static org.apache.parquet.hadoop.ParquetInputFormat.*; import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.defaults.engine.io.FileIO; +import io.delta.kernel.defaults.engine.io.InputFile; +import io.delta.kernel.defaults.engine.io.SeekableInputStream; import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.util.Utils; @@ -30,24 +32,30 @@ import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetRecordReaderWrapper; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.DelegatingSeekableInputStream; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; public class ParquetFileReader { - private final Configuration configuration; + private final FileIO fileIO; private final int maxBatchSize; - public ParquetFileReader(Configuration configuration) { - this.configuration = requireNonNull(configuration, "configuration is null"); + public ParquetFileReader(FileIO fileIO) { + this.fileIO = requireNonNull(fileIO, "fileIO is null"); this.maxBatchSize = - configuration.getInt("delta.kernel.default.parquet.reader.batch-size", 1024); + fileIO + .getConf("delta.kernel.default.parquet.reader.batch-size") + .map(Integer::valueOf) + .orElse(1024); checkArgument(maxBatchSize > 0, "invalid Parquet reader batch size: %s", maxBatchSize); } @@ -105,38 +113,62 @@ private void initParquetReaderIfRequired() { if (reader == null) { org.apache.parquet.hadoop.ParquetFileReader fileReader = null; try { - Configuration confCopy = configuration; - Path filePath = new Path(path); + InputFile inputFile = fileIO.newInputFile(path); // We need physical schema in order to construct a filter that can be // pushed into the `parquet-mr` reader. For that reason read the footer // in advance. 
+        org.apache.parquet.io.InputFile parquetInputFile =
+            new org.apache.parquet.io.InputFile() {
+              @Override
+              public long getLength() throws IOException {
+                return inputFile.length();
+              }
+
+              @Override
+              public org.apache.parquet.io.SeekableInputStream newStream() throws IOException {
+                // TODO: fix the closing of inputStream in case of errors
+                SeekableInputStream inputStream = inputFile.newStream();
+                return new DelegatingSeekableInputStream(inputStream) {
+                  @Override
+                  public long getPos() throws IOException {
+                    return inputStream.getPos();
+                  }
+
+                  @Override
+                  public void seek(long newPos) throws IOException {
+                    inputStream.seek(newPos);
+                  }
+                };
+              }
+            };
+
         ParquetMetadata footer =
-            org.apache.parquet.hadoop.ParquetFileReader.readFooter(confCopy, filePath);
+            org.apache.parquet.hadoop.ParquetFileReader.readFooter(
+                parquetInputFile, ParquetMetadataConverter.NO_FILTER);
         MessageType parquetSchema = footer.getFileMetaData().getSchema();
         Optional<FilterPredicate> parquetPredicate =
             predicate.flatMap(predicate -> toParquetFilter(parquetSchema, predicate));
-        if (parquetPredicate.isPresent()) {
-          // clone the configuration to avoid modifying the original one
-          confCopy = new Configuration(confCopy);
-
-          setFilterPredicate(confCopy, parquetPredicate.get());
-          // Disable the record level filtering as the `parquet-mr` evaluates
-          // the filter once the entire record has been materialized. Instead,
-          // we use the predicate to prune the row groups which is more efficient.
-          // In the future, we can consider using the record level filtering if a
-          // native Parquet reader is implemented in Kernel default module.
-          confCopy.set(RECORD_FILTERING_ENABLED, "false");
-          confCopy.set(DICTIONARY_FILTERING_ENABLED, "false");
-          confCopy.set(COLUMN_INDEX_FILTERING_ENABLED, "false");
-        }
+        // Disable the record level filtering as the `parquet-mr` evaluates
+        // the filter once the entire record has been materialized. Instead,
+        // we use the predicate to prune the row groups which is more efficient.
+        // In the future, we can consider using the record level filtering if a
+        // native Parquet reader is implemented in Kernel default module.
+        ParquetReadOptions readOptions =
+            ParquetReadOptions.builder()
+                .useRecordFilter(false)
+                .useDictionaryFilter(false)
+                .useStatsFilter(false)
+                .withRecordFilter(
+                    parquetPredicate.map(FilterCompat::get).orElse(FilterCompat.NOOP))
+                .build();
 
         // Pass the already read footer to the reader to avoid reading it again.
-        fileReader = new ParquetFileReaderWithFooter(filePath, confCopy, footer);
+        fileReader = new ParquetFileReaderWithFooter(parquetInputFile, readOptions, footer);
         reader = new ParquetRecordReaderWrapper<>(readSupport);
-        reader.initialize(fileReader, confCopy);
+        reader.initialize(fileReader, readOptions);
       } catch (IOException e) {
         Utils.closeCloseablesSilently(fileReader, reader);
         throw new KernelEngineException("Error reading Parquet file: " + path, e);
@@ -243,9 +275,12 @@ private static class ParquetFileReaderWithFooter
       extends org.apache.parquet.hadoop.ParquetFileReader {
     private final ParquetMetadata footer;
 
-    ParquetFileReaderWithFooter(Path filePath, Configuration configuration, ParquetMetadata footer)
+    ParquetFileReaderWithFooter(
+        org.apache.parquet.io.InputFile inputFile,
+        ParquetReadOptions readOptions,
+        ParquetMetadata footer)
         throws IOException {
-      super(configuration, filePath, footer);
+      super(inputFile, readOptions);
       this.footer = requireNonNull(footer, "footer is null");
     }
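The inline anonymous class above is the whole Hadoop-decoupling trick for reads: Kernel's `InputFile` is adapted to `org.apache.parquet.io.InputFile`, with `DelegatingSeekableInputStream` filling in parquet-mr's many `read` variants on top of `seek`/`getPos`. The same adapter, extracted as a named class for readability (equivalent sketch):

```java
import io.delta.kernel.defaults.engine.io.InputFile;
import io.delta.kernel.defaults.engine.io.SeekableInputStream;
import java.io.IOException;
import org.apache.parquet.io.DelegatingSeekableInputStream;

class KernelParquetInputFile implements org.apache.parquet.io.InputFile {
  private final InputFile kernelFile;

  KernelParquetInputFile(InputFile kernelFile) {
    this.kernelFile = kernelFile;
  }

  @Override
  public long getLength() throws IOException {
    return kernelFile.length();
  }

  @Override
  public org.apache.parquet.io.SeekableInputStream newStream() throws IOException {
    SeekableInputStream in = kernelFile.newStream();
    return new DelegatingSeekableInputStream(in) {
      @Override
      public long getPos() throws IOException {
        return in.getPos();
      }

      @Override
      public void seek(long newPos) throws IOException {
        in.seek(newPos);
      }
    };
  }
}
```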
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java
index 3e9e0290355..1a10faf464f 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileWriter.java
@@ -23,8 +23,11 @@
 import io.delta.kernel.Meta;
 import io.delta.kernel.data.*;
+import io.delta.kernel.defaults.engine.io.FileIO;
+import io.delta.kernel.defaults.engine.io.PositionOutputStream;
 import io.delta.kernel.defaults.internal.parquet.ParquetColumnWriters.ColumnWriter;
 import io.delta.kernel.expressions.Column;
+import io.delta.kernel.internal.fs.Path;
 import io.delta.kernel.internal.util.Utils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.*;
@@ -33,7 +36,6 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -56,9 +58,11 @@ public class ParquetFileWriter {
       "delta.kernel.default.parquet.writer.targetMaxFileSize";
   public static final long DEFAULT_TARGET_FILE_SIZE = 128 * 1024 * 1024; // 128MB
 
-  private final Configuration configuration;
+  private final FileIO fileIO;
   private final boolean writeAsSingleFile;
-  private final Path location;
+  private final String location;
+  private final Path locationPath;
+  private final PositionOutputStream outputStream;
   private final long targetMaxFileSize;
   private final List<Column> statsColumns;
 
@@ -68,23 +72,28 @@
   /**
    * Create writer to write data into one or more files depending upon the {@code
    * delta.kernel.default.parquet.writer.targetMaxFileSize} value and the given data.
    */
-  public ParquetFileWriter(Configuration configuration, Path location, List<Column> statsColumns) {
-    this.configuration = requireNonNull(configuration, "configuration is null");
+  public ParquetFileWriter(FileIO fileIO, String location, List<Column> statsColumns) {
+    this.fileIO = requireNonNull(fileIO, "fileIO is null");
     this.location = requireNonNull(location, "directory is null");
+    this.locationPath = new Path(location);
     // Default target file size is 128 MB.
-    this.targetMaxFileSize = configuration.getLong(TARGET_FILE_SIZE_CONF, DEFAULT_TARGET_FILE_SIZE);
+    this.targetMaxFileSize =
+        fileIO.getConf(TARGET_FILE_SIZE_CONF).map(Long::valueOf).orElse(DEFAULT_TARGET_FILE_SIZE);
     checkArgument(targetMaxFileSize > 0, "Invalid target Parquet file size: %s", targetMaxFileSize);
     this.statsColumns = requireNonNull(statsColumns, "statsColumns is null");
     this.writeAsSingleFile = false;
+    this.outputStream = null;
   }
 
   /** Create writer to write the data exactly into one file. */
-  public ParquetFileWriter(Configuration configuration, Path destPath) {
-    this.configuration = requireNonNull(configuration, "configuration is null");
+  public ParquetFileWriter(PositionOutputStream outputStream, String destPath) {
     this.writeAsSingleFile = true;
     this.location = requireNonNull(destPath, "destPath is null");
+    this.locationPath = new Path(destPath);
     this.targetMaxFileSize = Long.MAX_VALUE;
     this.statsColumns = Collections.emptyList();
+    this.outputStream = requireNonNull(outputStream, "outputStream is null");
+    this.fileIO = null;
   }
 
   /**
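The two constructors now encode two distinct modes: the `FileIO` constructor rolls multiple data files by `targetMaxFileSize`, while the `PositionOutputStream` constructor writes exactly one file to a stream the caller already opened (which is how `writeParquetFileAtomically` gets its atomicity). Hypothetical call sites, with `fileIO`, `tableDataDir`, `statsColumns`, `outputStream`, and `checkpointFilePath` assumed in scope:

```java
// Mode 1: size-rolled data files under a directory, with per-column stats.
ParquetFileWriter dataFiles = new ParquetFileWriter(fileIO, tableDataDir, statsColumns);

// Mode 2: exactly one file on a caller-provided (possibly atomic) stream.
ParquetFileWriter oneFile = new ParquetFileWriter(outputStream, checkpointFilePath);
```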
diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java
index f84c2caf3fe..95c4f549a18 100644
--- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java
+++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java
@@ -22,6 +22,8 @@
 import io.delta.kernel.data.*;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.defaults.engine.DefaultParquetHandler;
+import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
+import io.delta.kernel.defaults.engine.io.FileIO;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.engine.ParquetHandler;
@@ -143,15 +145,15 @@ public static void main(String[] args) throws Exception {
   }
 
   private static Engine createEngine(int numberOfParallelThreads) {
-    Configuration hadoopConf = new Configuration();
+    FileIO fileIO = new HadoopFileIO(new Configuration());
 
     if (numberOfParallelThreads <= 0) {
-      return DefaultEngine.create(hadoopConf);
+      return DefaultEngine.create(fileIO);
     }
 
-    return new DefaultEngine(hadoopConf) {
+    return new DefaultEngine(fileIO) {
       @Override
       public ParquetHandler getParquetHandler() {
-        return new ParallelParquetHandler(hadoopConf, numberOfParallelThreads);
+        return new ParallelParquetHandler(fileIO, numberOfParallelThreads);
       }
     };
   }
@@ -165,12 +167,12 @@ public ParquetHandler getParquetHandler() {
    * default implementation.
    */
   static class ParallelParquetHandler extends DefaultParquetHandler {
-    private final Configuration hadoopConf;
+    private final FileIO fileIO;
     private final int numberOfParallelThreads;
 
-    ParallelParquetHandler(Configuration hadoopConf, int numberOfParallelThreads) {
-      super(hadoopConf);
-      this.hadoopConf = hadoopConf;
+    ParallelParquetHandler(FileIO fileIO, int numberOfParallelThreads) {
+      super(fileIO);
+      this.fileIO = fileIO;
       this.numberOfParallelThreads = numberOfParallelThreads;
     }
 
@@ -230,7 +232,7 @@ private void submitReadRequestsIfNotDone() {
     }
 
     List<ColumnarBatch> parquetFileReader(String filePath, StructType readSchema) {
-      ParquetFileReader reader = new ParquetFileReader(hadoopConf);
+      ParquetFileReader reader = new ParquetFileReader(fileIO);
       try (CloseableIterator<ColumnarBatch> batchIter =
           reader.read(filePath, readSchema, Optional.empty())) {
         List<ColumnarBatch> batches = new ArrayList<>();
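Taken together, the refactor makes `FileIO` the single injection point for storage. A closing sketch of what that enables: a decorator that counts file opens, usable with `DefaultEngine.create(FileIO)` (illustrative only; `CountingFileIO` is not part of the change):

```java
import io.delta.kernel.defaults.engine.io.*;
import io.delta.kernel.engine.FileReadRequest;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

class CountingFileIO implements FileIO {
  private final FileIO delegate;
  final AtomicLong opens = new AtomicLong();

  CountingFileIO(FileIO delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputFile newInputFile(String path) throws IOException {
    opens.incrementAndGet(); // observe every file open without touching Kernel
    return delegate.newInputFile(path);
  }

  // Remaining methods forward unchanged.
  @Override
  public CloseableIterator<FileStatus> listFrom(String p) throws IOException {
    return delegate.listFrom(p);
  }

  @Override
  public String resolvePath(String p) throws IOException {
    return delegate.resolvePath(p);
  }

  @Override
  public CloseableIterator<ByteArrayInputStream> readFiles(
      CloseableIterator<FileReadRequest> requests) throws IOException {
    return delegate.readFiles(requests);
  }

  @Override
  public boolean mkdirs(String p) throws IOException {
    return delegate.mkdirs(p);
  }

  @Override
  public OutputFile newOutputFile(String p) throws IOException {
    return delegate.newOutputFile(p);
  }

  @Override
  public boolean delete(String p) throws IOException {
    return delegate.delete(p);
  }

  @Override
  public Optional<String> getConf(String k) {
    return delegate.getConf(k);
  }
}
```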