Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a geoparquet writer #899

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
var path = new org.apache.hadoop.fs.Path(directory.toUri());
GeoParquetReader reader = new GeoParquetReader(path);
reader.read().count();
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
GeoParquetReader reader = new GeoParquetReader(directory.toUri());
var path = new org.apache.hadoop.fs.Path(directory.toUri());
GeoParquetReader reader = new GeoParquetReader(path);
reader.readParallel().count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ public void setup() throws IOException {
@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void read() {
GeoParquetReader reader =
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
GeoParquetReader reader = new GeoParquetReader(path);
reader.read().count();
}

@SuppressWarnings({"squid:S1481", "squid:S2201"})
@Benchmark
public void readParallel() {
var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet");
GeoParquetReader reader =
new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri());
new GeoParquetReader(path);
reader.readParallel().count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.baremaps.data.storage.*;
import org.apache.baremaps.geoparquet.GeoParquetException;
import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.hadoop.fs.Path;

public class GeoParquetDataTable implements DataTable {

Expand All @@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable {

public GeoParquetDataTable(URI path) {
this.path = path;
this.reader = new GeoParquetReader(path);
this.reader = new GeoParquetReader(new Path(path));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) {
}
}

private Object getValue(int fieldIndex, int index) {
Object getValue(int fieldIndex, int index) {
Object value = data[fieldIndex];
if (value instanceof List<?>list) {
return list.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,31 +52,31 @@ public class GeoParquetReader {
/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
*/
public GeoParquetReader(URI uri) {
this(uri, null, new Configuration());
public GeoParquetReader(Path path) {
this(path, null, new Configuration());
}

/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
* @param envelope the envelope to filter records
*/
public GeoParquetReader(URI uri, Envelope envelope) {
this(uri, envelope, new Configuration());
public GeoParquetReader(Path path, Envelope envelope) {
this(path, envelope, new Configuration());
}

/**
* Constructs a new {@code GeoParquetReader}.
*
* @param uri the URI to read from
* @param path the path to read from
* @param configuration the configuration
*/
public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) {
this.configuration = configuration;
this.files = initializeFiles(uri, configuration);
this.files = initializeFiles(path, configuration);
this.envelope = envelope;
}

Expand Down Expand Up @@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) {
}
}

private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
private static List<FileStatus> initializeFiles(Path path, Configuration configuration) {
try {
Path globPath = new Path(uri.getPath());
FileSystem fileSystem = FileSystem.get(uri, configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
FileStatus[] fileStatuses = fileSystem.globStatus(path);
if (fileStatuses == null) {
throw new GeoParquetException("No files found at the specified URI.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.*;

/**
 * {@link WriteSupport} implementation for writing {@link GeoParquetGroup} instances to Parquet.
 *
 * <p>
 * The GeoParquet metadata is serialized to JSON and registered as extra metadata of the write
 * context under the {@code "geo"} key.
 */
public class GeoParquetWriteSupport extends WriteSupport<GeoParquetGroup> {

  /** Key under which the serialized GeoParquet metadata is registered. */
  private static final String GEO_METADATA_KEY = "geo";

  /** Shared JSON mapper used to serialize the GeoParquet metadata. */
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private final MessageType schema;
  private final GeoParquetMetadata metadata;
  private RecordConsumer recordConsumer;

  /**
   * Constructs a new GeoParquetWriteSupport.
   *
   * @param schema the Parquet schema
   * @param metadata the GeoParquet metadata
   */
  public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) {
    this.schema = schema;
    this.metadata = metadata;
  }

  /**
   * Creates the write context from the schema and the JSON-serialized GeoParquet metadata.
   *
   * @param configuration the hadoop configuration (currently unused)
   * @return the write context holding the schema and the {@code "geo"} metadata entry
   * @throws GeoParquetException if the metadata cannot be serialized to JSON
   */
  @Override
  public WriteContext init(Configuration configuration) {
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put(GEO_METADATA_KEY, serializeMetadata(metadata));
    return new WriteContext(schema, extraMetadata);
  }

  /**
   * Stores the record consumer that receives the records written afterwards.
   *
   * @param recordConsumer the consumer to write records to
   */
  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  /**
   * Writes one record as a message delimited by {@code startMessage} and {@code endMessage}.
   *
   * @param group the record to write
   */
  @Override
  public void write(GeoParquetGroup group) {
    recordConsumer.startMessage();
    writeGroup(group, schema, true);
    recordConsumer.endMessage();
  }

  /**
   * Writes the fields of a group, recursing into nested groups.
   *
   * @param group the group holding the values
   * @param groupType the Parquet type describing the fields of the group
   * @param isRoot {@code true} for the top-level message, which is not wrapped in
   *        {@code startGroup}/{@code endGroup}
   */
  private void writeGroup(GeoParquetGroup group, GroupType groupType, boolean isRoot) {
    if (!isRoot) {
      recordConsumer.startGroup();
    }
    for (int i = 0; i < groupType.getFieldCount(); i++) {
      Type fieldType = groupType.getType(i);
      String fieldName = fieldType.getName();
      int repetitionCount = group.getFieldRepetitionCount(i);

      // All the values of a field are emitted inside a single startField/endField pair.
      // The field is opened lazily, on the first value actually written, so that a field
      // holding only null values is omitted instead of being written as an empty field.
      boolean fieldStarted = false;
      for (int j = 0; j < repetitionCount; j++) {
        if (fieldType.isPrimitive()) {
          Object value = group.getValue(i, j);
          if (value == null) {
            // Null values are represented by the absence of a value.
            continue;
          }
          if (!fieldStarted) {
            recordConsumer.startField(fieldName, i);
            fieldStarted = true;
          }
          writePrimitive(value, fieldType.asPrimitiveType());
        } else {
          GeoParquetGroup childGroup = group.getGroup(i, j);
          if (!fieldStarted) {
            recordConsumer.startField(fieldName, i);
            fieldStarted = true;
          }
          writeGroup(childGroup, fieldType.asGroupType(), false);
        }
      }
      if (fieldStarted) {
        recordConsumer.endField(fieldName, i);
      }
    }
    if (!isRoot) {
      recordConsumer.endGroup();
    }
  }

  /**
   * Writes a single non-null primitive value using the consumer method matching its Parquet type.
   *
   * @param value the non-null value; its Java class must match the primitive type
   * @param primitiveType the Parquet primitive type of the field
   * @throws GeoParquetException if the primitive type is not supported
   */
  private void writePrimitive(Object value, PrimitiveType primitiveType) {
    switch (primitiveType.getPrimitiveTypeName()) {
      case INT32 -> recordConsumer.addInteger((Integer) value);
      case INT64 -> recordConsumer.addLong((Long) value);
      case FLOAT -> recordConsumer.addFloat((Float) value);
      case DOUBLE -> recordConsumer.addDouble((Double) value);
      case BOOLEAN -> recordConsumer.addBoolean((Boolean) value);
      case BINARY, FIXED_LEN_BYTE_ARRAY -> recordConsumer.addBinary((Binary) value);
      default -> throw new GeoParquetException(
          "Unsupported type: " + primitiveType.getPrimitiveTypeName());
    }
  }

  /**
   * Serializes the GeoParquet metadata to a JSON string.
   *
   * @param metadata the metadata to serialize
   * @return the JSON representation of the metadata
   * @throws GeoParquetException if the serialization fails
   */
  private String serializeMetadata(GeoParquetMetadata metadata) {
    try {
      return OBJECT_MAPPER.writeValueAsString(metadata);
    } catch (JsonProcessingException e) {
      throw new GeoParquetException("Failed to serialize GeoParquet metadata", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.baremaps.geoparquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;

/**
* A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file.
*/
public class GeoParquetWriter {

private GeoParquetWriter() {
// Prevent instantiation
}

public static Builder builder(Path file) {
return new Builder(file);
}

public static class Builder
extends ParquetWriter.Builder<GeoParquetGroup, GeoParquetWriter.Builder> {

private MessageType type = null;

private GeoParquetMetadata metadata = null;

private Builder(Path file) {
super(file);
}

/**
* Replace the message type with the specified one.
*
* @param type the message type
* @return the builder
*/
public GeoParquetWriter.Builder withType(MessageType type) {
this.type = type;
bchapuis marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/**
* Replace the metadata with the specified one.
*
* @param metadata the metadata
* @return the builder
*/
public GeoParquetWriter.Builder withGeoParquetMetadata(GeoParquetMetadata metadata) {
this.metadata = metadata;
bchapuis marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/**
* {@inheritDoc}
*/
@Override
protected WriteSupport<GeoParquetGroup> getWriteSupport(Configuration conf) {
// We don't need access to the hadoop configuration for now
return getWriteSupport((ParquetConfiguration) null);
}

/**
* {@inheritDoc}
*/
@Override
protected WriteSupport<GeoParquetGroup> getWriteSupport(ParquetConfiguration conf) {
return new GeoParquetWriteSupport(type, metadata);
bchapuis marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* {@inheritDoc}
*/
@Override
protected GeoParquetWriter.Builder self() {
return this;
}
}
}
Loading
Loading