Extended I/O Framework: Readers/Writers for Parquet #2229

Status: Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions src/main/java/org/apache/sysds/common/Types.java
@@ -845,6 +845,7 @@ public enum FileFormat {
FEDERATED, // A federated matrix
PROTO, // protocol buffer representation
HDF5, // Hierarchical Data Format (HDF)
PARQUET, // Parquet format for columnar data storage
UNKNOWN;

public boolean isIJV() {
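With the new constant in place, PARQUET resolves like any other format name; a minimal sketch using only the standard enum API (how the MTD format string is parsed elsewhere in SystemDS is not shown or assumed here):

import org.apache.sysds.common.Types.FileFormat;

public class FormatCheck {
	public static void main(String[] args) {
		// Resolve the new format from its (upper-case) enum name
		FileFormat fmt = FileFormat.valueOf("PARQUET");
		System.out.println("format: " + fmt);
	}
}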
157 changes: 157 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderParquet.java
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.util.HDFSTool;

/**
* Single-threaded Parquet frame reader.
*/
public class FrameReaderParquet extends FrameReader {

/**
* Reads a Parquet file from HDFS and converts it into a FrameBlock.
*
* @param fname The HDFS file path to the Parquet file.
* @param schema The expected data types of the columns.
* @param names The names of the columns.
* @param rlen The expected number of rows.
* @param clen The expected number of columns.
* @return A FrameBlock containing the data read from the Parquet file.
* @throws IOException if the file does not exist or an I/O error occurs.
* @throws DMLRuntimeException if a runtime error occurs.
*/
@Override
public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException {
// Prepare file access
Configuration conf = ConfigurationManager.getCachedJobConf();
Path path = new Path(fname);

// Check existence of the input file
if (!HDFSTool.existsFileOnHDFS(path.toString())) {
throw new IOException("File does not exist on HDFS: " + fname);
}

// Allocate output frame block
ValueType[] lschema = createOutputSchema(schema, clen);
String[] lnames = createOutputNames(names, clen);
FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);

// Read Parquet file
readParquetFrameFromHDFS(path, conf, ret, lschema, rlen, clen);

return ret;
}

/**
* Reads data from a Parquet file on HDFS and fills the provided FrameBlock.
* The method retrieves the Parquet schema from the file footer, maps the required column names
* to their corresponding indices, and then uses a ParquetReader to iterate over each row.
* Data is extracted based on the column type and set into the output FrameBlock.
*
* @param path The HDFS path to the Parquet file.
* @param conf The Hadoop configuration.
* @param dest The FrameBlock to populate with data.
* @param schema The expected value types for the output columns.
* @param rlen The expected number of rows.
* @param clen The expected number of columns.
*/
protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException {
// Retrieve schema from the Parquet footer (close the reader to avoid leaking the file handle)
MessageType parquetSchema;
try (ParquetFileReader metaReader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
ParquetMetadata metadata = metaReader.getFooter();
parquetSchema = metadata.getFileMetaData().getSchema();
}

// Map column names to Parquet schema indices
String[] columnNames = dest.getColumnNames();
int[] columnIndices = new int[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
columnIndices[i] = parquetSchema.getFieldIndex(columnNames[i]);
}

// Read data using ParquetReader
try (ParquetReader<Group> rowReader = ParquetReader.builder(new GroupReadSupport(), path)
.withConf(conf)
.build()) {

// Resolve each column's primitive type once, rather than per cell
PrimitiveType.PrimitiveTypeName[] colTypes = new PrimitiveType.PrimitiveTypeName[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
colTypes[i] = parquetSchema.getType(columnNames[i]).asPrimitiveType().getPrimitiveTypeName();
}

Group group;
int row = 0;
while ((group = rowReader.read()) != null) {
for (int col = 0; col < clen; col++) {
int colIndex = columnIndices[col];
if (group.getFieldRepetitionCount(colIndex) > 0) {
switch (colTypes[col]) {
case INT32:
dest.set(row, col, group.getInteger(colIndex, 0));
break;
case INT64:
dest.set(row, col, group.getLong(colIndex, 0));
break;
case FLOAT:
dest.set(row, col, group.getFloat(colIndex, 0));
break;
case DOUBLE:
dest.set(row, col, group.getDouble(colIndex, 0));
break;
case BOOLEAN:
dest.set(row, col, group.getBoolean(colIndex, 0));
break;
case BINARY:
dest.set(row, col, group.getBinary(colIndex, 0).toStringUsingUTF8());
break;
default:
throw new IOException("Unsupported data type: " + type);
}
} else {
dest.set(row, col, null);
}
}
row++;
}

// Validate that the number of rows read matches the expected dimension
if (row != rlen) {
throw new IOException("Mismatch in row count: expected " + rlen + ", but got " + row);
}
}
}

// not implemented: reading Parquet requires seekable input to access the footer
@Override
public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException, DMLRuntimeException {
throw new UnsupportedOperationException("Reading Parquet frames from an InputStream is not supported.");
}
}
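To illustrate the footer lookup that readParquetFrameFromHDFS performs, here is a hedged standalone sketch that prints each column's name and primitive type. The file path is a made-up placeholder, the schema is assumed to be flat (primitive columns only, as the reader above also assumes), and only parquet-hadoop API calls are used:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class FooterInspect {
	public static void main(String[] args) throws Exception {
		// Hypothetical input file; any flat Parquet file works
		Path path = new Path("hdfs:/tmp/example.parquet");
		try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
			MessageType schema = reader.getFooter().getFileMetaData().getSchema();
			for (int i = 0; i < schema.getFieldCount(); i++) {
				// asPrimitiveType() throws for nested (group) columns
				System.out.println(i + ": " + schema.getFieldName(i)
					+ " -> " + schema.getType(i).asPrimitiveType().getPrimitiveTypeName());
			}
		}
	}
}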
117 changes: 117 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderParquetParallel.java
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;

/**
* Multi-threaded Parquet frame reader.
*/
public class FrameReaderParquetParallel extends FrameReaderParquet {

/**
* Reads a Parquet frame in parallel and populates the provided FrameBlock with the data.
* The method retrieves all part-file paths at the given location, determines the number of
* threads to use from the number of files and the configured parallelism setting, and computes
* each file's starting row offset so that concurrent tasks write disjoint row ranges.
* A thread pool is then created to run a reading task for each file concurrently.
*
* @param path The HDFS path to the Parquet file or the directory containing sequence files.
* @param conf The Hadoop configuration.
* @param dest The FrameBlock to be updated with the data read from the files.
* @param schema The expected value types for the frame columns.
* @param rlen The expected number of rows.
* @param clen The expected number of columns.
*/
@Override
protected void readParquetFrameFromHDFS(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long rlen, long clen) throws IOException, DMLRuntimeException {
FileSystem fs = IOUtilFunctions.getFileSystem(path);
Path[] files = IOUtilFunctions.getSequenceFilePaths(fs, path);
int numThreads = Math.min(OptimizerUtils.getParallelBinaryReadParallelism(), files.length);

// Compute each file's starting row offset from the footer row counts,
// so that concurrent tasks write into disjoint row ranges of the shared FrameBlock
long[] offsets = new long[files.length];
long offset = 0;
for (int i = 0; i < files.length; i++) {
offsets[i] = offset;
try (ParquetFileReader metaReader = ParquetFileReader.open(HadoopInputFile.fromPath(files[i], conf))) {
offset += metaReader.getRecordCount();
}
}

// Create and execute read tasks
ExecutorService pool = CommonThreadPool.get(numThreads);
try {
List<ReadFileTask> tasks = new ArrayList<>();
for (int i = 0; i < files.length; i++) {
tasks.add(new ReadFileTask(files[i], conf, dest, schema, clen, offsets[i]));
}

for (Future<Object> task : pool.invokeAll(tasks)) {
task.get();
}
} catch (Exception e) {
throw new IOException("Failed parallel read of Parquet frame.", e);
} finally {
pool.shutdown();
}
}

private class ReadFileTask implements Callable<Object> {
private Path path;
private Configuration conf;
private FrameBlock dest;
private ValueType[] schema;
private long clen;
private long rowOffset;

public ReadFileTask(Path path, Configuration conf, FrameBlock dest, ValueType[] schema, long clen, long rowOffset) {
this.path = path;
this.conf = conf;
this.dest = dest;
this.schema = schema;
this.clen = clen;
this.rowOffset = rowOffset;
}

// Opens a ParquetReader for the assigned file and copies each row into the
// destination FrameBlock, starting at this file's precomputed row offset
@Override
public Object call() throws Exception {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) {
Group group;
int row = (int) rowOffset;
while ((group = reader.read()) != null) {
for (int col = 0; col < clen; col++) {
if (group.getFieldRepetitionCount(col) > 0) {
dest.set(row, col, group.getValueToString(col, 0));
} else {
dest.set(row, col, null);
}
}
row++;
}
}
return null;
}
}
}
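For completeness, a hedged end-to-end sketch of reading a frame with the parallel reader above. The path, schema, and dimensions are placeholders; FrameReaderParquet can be substituted for single-threaded reads, and in SystemDS proper the reader would normally be obtained through the I/O factory once PARQUET is wired in, rather than constructed directly:

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.io.FrameReader;
import org.apache.sysds.runtime.io.FrameReaderParquetParallel;

public class ParquetReadExample {
	public static void main(String[] args) throws Exception {
		// Placeholder schema and path; column names must match the Parquet schema
		ValueType[] schema = new ValueType[]{ValueType.INT64, ValueType.STRING};
		String[] names = new String[]{"id", "name"};
		FrameReader reader = new FrameReaderParquetParallel();
		FrameBlock fb = reader.readFrameFromHDFS("hdfs:/tmp/example.parquet", schema, names, 1000, 2);
		System.out.println("rows=" + fb.getNumRows() + ", cols=" + fb.getNumColumns());
	}
}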