Skip to content

Commit

Permalink
Implement a geoparquet writer
Browse files Browse the repository at this point in the history
  • Loading branch information
bchapuis committed Oct 19, 2024
1 parent b622606 commit 59ec9fc
Show file tree
Hide file tree
Showing 10 changed files with 364 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
  // Benchmarks a sequential scan of the whole directory of parquet files.
  var path = new org.apache.hadoop.fs.Path(directory.toUri());
  GeoParquetReader reader = new GeoParquetReader(path);
  reader.read().count(); // count() forces full consumption of the stream
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
  // Benchmarks a parallel scan of the whole directory of parquet files.
  var path = new org.apache.hadoop.fs.Path(directory.toUri());
  GeoParquetReader reader = new GeoParquetReader(path);
  reader.readParallel().count(); // count() forces full consumption of the stream
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
  // Glob pattern matching every parquet file of the small data set.
  var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
  GeoParquetReader reader = new GeoParquetReader(path);
  reader.read().count(); // count() forces full consumption of the stream
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
  // Glob pattern matching every parquet file of the small data set.
  var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
  GeoParquetReader reader = new GeoParquetReader(path);
  reader.readParallel().count(); // count() forces full consumption of the stream
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.geoparquet.GeoParquetException;
import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.hadoop.fs.Path;

public class GeoParquetDataTable implements DataTable {

Expand All @@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable {

/**
 * Constructs a new {@code GeoParquetDataTable} backed by a {@link GeoParquetReader}.
 *
 * @param path the URI of the GeoParquet file(s) to read from
 */
public GeoParquetDataTable(URI path) {
  this.path = path;
  // The reader now takes a Hadoop Path; wrap the URI accordingly.
  this.reader = new GeoParquetReader(new Path(path));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) {
}
}

private Object getValue(int fieldIndex, int index) {
Object getValue(int fieldIndex, int index) {
Object value = data[fieldIndex];
if (value instanceof List<?>list) {
return list.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,31 +52,31 @@ public class GeoParquetReader {
/**
 * Constructs a new {@code GeoParquetReader} with no envelope filter and a default Hadoop
 * configuration.
 *
 * @param path the path to read from (may contain a glob pattern)
 */
public GeoParquetReader(Path path) {
  this(path, null, new Configuration());
}

/**
 * Constructs a new {@code GeoParquetReader} with a default Hadoop configuration.
 *
 * @param path the path to read from (may contain a glob pattern)
 * @param envelope the envelope to filter records
 */
public GeoParquetReader(Path path, Envelope envelope) {
  this(path, envelope, new Configuration());
}

/**
 * Constructs a new {@code GeoParquetReader}.
 *
 * @param path the path to read from (may contain a glob pattern)
 * @param envelope the envelope to filter records
 * @param configuration the Hadoop configuration
 */
public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) {
  this.configuration = configuration;
  // Resolve the (possibly globbed) path into the concrete list of files up front.
  this.files = initializeFiles(path, configuration);
  this.envelope = envelope;
}

Expand Down Expand Up @@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) {
}
}

private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
private static List<FileStatus> initializeFiles(Path path, Configuration configuration) {
try {
Path globPath = new Path(uri.getPath());
FileSystem fileSystem = FileSystem.get(uri, configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(path);
if (fileStatuses == null) {
throw new GeoParquetException("No files found at the specified URI.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.*;

/**
* WriteSupport implementation for writing GeoParquetGroup instances to Parquet.
*/
public class GeoParquetWriteSupport extends WriteSupport<GeoParquetGroup> {

  private RecordConsumer recordConsumer;
  private final MessageType schema;
  private final GeoParquetMetadata metadata;
  private final ObjectMapper objectMapper = new ObjectMapper();

  /**
   * Constructs a new GeoParquetWriteSupport.
   *
   * @param schema the Parquet schema
   * @param metadata the GeoParquet metadata
   */
  public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) {
    this.schema = schema;
    this.metadata = metadata;
  }

  @Override
  public WriteContext init(Configuration configuration) {
    // The GeoParquet specification stores its metadata as a JSON document
    // under the "geo" key of the Parquet file footer.
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put("geo", serializeMetadata(metadata));
    return new WriteContext(schema, extraMetadata);
  }

  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  @Override
  public void write(GeoParquetGroup group) {
    // startMessage/endMessage delimit the top-level record only; nested groups
    // are delimited with startGroup/endGroup inside writeGroupFields.
    recordConsumer.startMessage();
    writeGroupFields(group, schema);
    recordConsumer.endMessage();
  }

  /**
   * Writes the fields of a group (top-level or nested) to the record consumer.
   *
   * @param group the group whose fields are written
   * @param groupType the Parquet type describing the group
   */
  private void writeGroupFields(GeoParquetGroup group, GroupType groupType) {
    for (int i = 0; i < groupType.getFieldCount(); i++) {
      Type fieldType = groupType.getType(i);
      String fieldName = fieldType.getName();
      int repetitionCount = group.getFieldRepetitionCount(i);
      if (repetitionCount == 0) {
        continue; // Skip if no values are present
      }
      for (int j = 0; j < repetitionCount; j++) {
        recordConsumer.startField(fieldName, i);
        if (fieldType.isPrimitive()) {
          writePrimitive(group.getValue(i, j), fieldType.asPrimitiveType());
        } else {
          // Nested groups must be wrapped in startGroup/endGroup, not
          // startMessage/endMessage, which would corrupt the record structure.
          recordConsumer.startGroup();
          writeGroupFields(group.getGroup(i, j), fieldType.asGroupType());
          recordConsumer.endGroup();
        }
        recordConsumer.endField(fieldName, i);
      }
    }
  }

  /**
   * Writes a single primitive value to the record consumer.
   *
   * @param value the value to write, possibly null
   * @param primitiveType the Parquet primitive type of the value
   */
  private void writePrimitive(Object value, PrimitiveType primitiveType) {
    if (value == null) {
      // The Parquet format does not support writing null values directly.
      // If the field is optional and the value is null, we simply do not write it.
      return;
    }
    switch (primitiveType.getPrimitiveTypeName()) {
      case INT32:
        recordConsumer.addInteger((Integer) value);
        break;
      case INT64:
        recordConsumer.addLong((Long) value);
        break;
      case FLOAT:
        recordConsumer.addFloat((Float) value);
        break;
      case DOUBLE:
        recordConsumer.addDouble((Double) value);
        break;
      case BOOLEAN:
        recordConsumer.addBoolean((Boolean) value);
        break;
      case BINARY, FIXED_LEN_BYTE_ARRAY:
        recordConsumer.addBinary((Binary) value);
        break;
      default:
        throw new GeoParquetException(
            "Unsupported type: " + primitiveType.getPrimitiveTypeName());
    }
  }

  /**
   * Serializes the GeoParquet metadata to its JSON representation.
   *
   * @param metadata the metadata to serialize
   * @return the JSON string stored under the "geo" footer key
   */
  private String serializeMetadata(GeoParquetMetadata metadata) {
    try {
      return objectMapper.writeValueAsString(metadata);
    } catch (JsonProcessingException e) {
      // Use the package exception for consistency with the rest of the module.
      // NOTE(review): assumes GeoParquetException has a (String, Throwable) constructor — confirm.
      throw new GeoParquetException("Failed to serialize GeoParquet metadata", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

/**
* A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file.
*/
public class GeoParquetWriter implements AutoCloseable {

private final ParquetWriter<GeoParquetGroup> parquetWriter;

/**
* Constructs a new GeoParquetWriter.
*
* @param outputFile the output file
* @param schema the Parquet schema
* @param metadata the GeoParquet metadata
* @throws IOException if an I/O error occurs
*/
public GeoParquetWriter(Path outputFile, MessageType schema, GeoParquetMetadata metadata)
throws IOException {
this.parquetWriter = new ParquetWriter<>(
outputFile,
new GeoParquetWriteSupport(schema, metadata),
CompressionCodecName.UNCOMPRESSED,
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE,
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
WriterVersion.PARQUET_2_0,
new Configuration());

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
ParquetWriter.ParquetWriter
should be avoided because it has been deprecated.
}

/**
* Writes a GeoParquetGroup to the Parquet file.
*
* @param group the GeoParquetGroup to write
* @throws IOException if an I/O error occurs
*/
public void write(GeoParquetGroup group) throws IOException {
parquetWriter.write(group);
}

/**
* Closes the writer and releases any system resources associated with it.
*
* @throws IOException if an I/O error occurs
*/
public void close() throws IOException {

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AutoCloseable.close
; it is advisable to add an Override annotation.
parquetWriter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,45 @@

import static org.junit.jupiter.api.Assertions.*;

import java.net.URI;
import org.apache.baremaps.testing.TestFiles;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.locationtech.jts.geom.Envelope;

class GeoParquetReaderTest {

@Test
void read() {
  // The test fixture contains exactly 5 records.
  Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
  GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
  assertEquals(5, geoParquetReader.read().count());
}

@Test
void readFiltered() {
  // Only 1 of the 5 fixture records falls inside this envelope.
  Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
  GeoParquetReader geoParquetReader =
      new GeoParquetReader(geoParquet, new Envelope(-172, -65, 18, 72));
  assertEquals(1, geoParquetReader.read().count());
}

@Test
void size() {
  // size() reports the record count from the file metadata.
  Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
  GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
  assertEquals(5, geoParquetReader.size());
}

@Test
void count() {
  // Counting the stream must agree with the metadata-based size() above.
  Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
  GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
  assertEquals(5, geoParquetReader.read().count());
}

@Test
void validateSchemas() {
  // All files matched by the path must share an identical schema.
  Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri());
  GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
  assertTrue(geoParquetReader.validateSchemasAreIdentical());
}
Expand Down
Loading

0 comments on commit 59ec9fc

Please sign in to comment.