diff --git a/.gitignore b/.gitignore index a66d04304..8af30f71f 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,9 @@ examples/openstreetmap/tiles/ examples/transformation/*.pbf +# Benchmarking +baremaps-benchmarking/data/ + # Docs .jekyll-cache/ _site/ diff --git a/baremaps-benchmarking/pom.xml b/baremaps-benchmarking/pom.xml new file mode 100644 index 000000000..53f2db9ce --- /dev/null +++ b/baremaps-benchmarking/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + org.apache.baremaps + baremaps + 0.7.4-SNAPSHOT + + + baremaps-benchmarking + + + 1.37 + true + + + + + org.apache.baremaps + baremaps-geoparquet + + + org.apache.baremaps + baremaps-testing + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.0 + + + + shade + + package + + benchmarks + + + org.openjdk.jmh.Main + + + + + + + + + + diff --git a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java new file mode 100644 index 000000000..a230e41da --- /dev/null +++ b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.benchmarking.geoparquet; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import org.apache.baremaps.geoparquet.GeoParquetReader; +import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Warmup(iterations = 0) +@Measurement(iterations = 1) +public class OvertureMapsBenchmark { + + private static Path directory = Path.of("baremaps-benchmarking/data/overturemaps"); + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(OvertureMapsBenchmark.class.getSimpleName()) + .forks(1) + .build(); + new Runner(opt).run(); + } + + /** + * Downloads the objects stored under the addresses-theme prefix of the bucket into + * {@code directory} when that directory does not exist yet. Objects that are already present + * locally are not fetched again. 
+ * + * @throws IOException if the local directory cannot be created + */ + @Setup + public void setup() throws IOException { + if (!Files.exists(directory)) { + // NOTE(review): region taken from the bucket name ("...-us-west-2") - confirm. + try (var client = S3Client.builder() + .region(Region.US_WEST_2) + .credentialsProvider(new AnonymousAWSCredentialsProvider()) + .build()) { + + var listRequest = ListObjectsV2Request.builder() + 
.bucket("overturemaps-us-west-2") + .prefix("release/2024-09-18.0/theme=addresses/") + .build(); + // The paginator follows continuation tokens, so no key is dropped when the listing + // spans several pages. + var objects = client.listObjectsV2Paginator(listRequest).contents(); + for (var object : objects) { + var key = object.key(); + var name = key.substring(key.lastIndexOf("/") + 1); + var file = directory.resolve(name); + Files.createDirectories(file.getParent()); + if (!Files.exists(file)) { + var getRequest = GetObjectRequest.builder() + .bucket("overturemaps-us-west-2") + .key(key) + .build(); + client.getObject(getRequest, file); + } + } + } + } + } + + @SuppressWarnings({"squid:S1481", "squid:S2201"}) + @Benchmark + public void read() { + GeoParquetReader reader = new GeoParquetReader(directory.toUri()); + reader.read().count(); + } + + @SuppressWarnings({"squid:S1481", "squid:S2201"}) + @Benchmark + public void readParallel() { + GeoParquetReader reader = new GeoParquetReader(directory.toUri()); + reader.readParallel().count(); + } +} diff --git a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java new file mode 100644 index 000000000..11f468f00 --- /dev/null +++ b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.benchmarking.geoparquet; + + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import org.apache.baremaps.geoparquet.GeoParquetReader; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Warmup(iterations = 0) +@Measurement(iterations = 1) +public class SmallFileBenchmark { + + private Path source = Path.of("baremaps-testing/data/samples/example.parquet").toAbsolutePath(); + private Path directory = Path.of("baremaps-benchmarking/data/small").toAbsolutePath(); + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(SmallFileBenchmark.class.getSimpleName()) + .forks(1) + .build(); + new Runner(opt).run(); + } + + /** + * Ensures that {@code directory} holds the 1000 copies of {@code source} read by the + * benchmarks, named {@code 0.parquet} to {@code 999.parquet}. + * + * @throws IOException if the directory cannot be created or a copy fails + */ + @Setup + public void setup() throws IOException { + Files.createDirectories(directory); + for (int i = 0; i < 1000; i++) { + Path target = directory.resolve(i + ".parquet"); + // Checked per file so that an interrupted earlier run is completed instead of being + // mistaken for a finished one. 
+ if (!Files.exists(target)) { + Files.copy(source, target); + } + } + } + + @SuppressWarnings({"squid:S1481", "squid:S2201"}) + @Benchmark + public void read() { + GeoParquetReader reader = + new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri()); + reader.read().count(); + } + + 
@SuppressWarnings({"squid:S1481", "squid:S2201"}) + @Benchmark + public void readParallel() { + GeoParquetReader reader = + new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri()); + reader.readParallel().count(); + } +} diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java index 225ce61ce..90c78d1a7 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java @@ -35,18 +35,12 @@ public class GeoParquetDataTable implements DataTable { public GeoParquetDataTable(URI path) { this.path = path; - } - - private GeoParquetReader reader() { - if (reader == null) { - reader = new GeoParquetReader(path); - } - return reader; + this.reader = new GeoParquetReader(path); } @Override public long size() { - return reader().size(); + return reader.size(); } @Override @@ -66,8 +60,8 @@ public Stream stream() { @Override public Stream parallelStream() { - return reader().readParallel().map(group -> new DataRowImpl( - GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()), + return reader.readParallel().map(group -> new DataRowImpl( + GeoParquetTypeConversion.asSchema(path.toString(), group.getGeoParquetSchema()), GeoParquetTypeConversion.asRowValues(group))); } @@ -76,7 +70,6 @@ public void clear() { if (reader != null) { reader = null; } - if (schema != null) { schema = null; } @@ -87,7 +80,7 @@ public DataSchema schema() { if (schema == null) { this.schema = GeoParquetTypeConversion.asSchema( path.toString(), - reader().getGeoParquetSchema()); + reader.getGeoParquetSchema()); return this.schema; } return schema; @@ -95,7 +88,7 @@ public DataSchema schema() { public int srid(String column) { try { - return reader().getGeoParquetMetadata().getSrid(column); + 
return reader.getGeoParquetMetadata().getSrid(column); } catch (Exception e) { throw new GeoParquetException("Fail to read the SRID from the GeoParquet metadata", e); } diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java index 435effa5e..246a10a42 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java @@ -24,21 +24,22 @@ import org.apache.baremaps.data.storage.*; import org.apache.baremaps.data.storage.DataColumn.Cardinality; import org.apache.baremaps.data.storage.DataColumn.Type; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.GroupField; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; +import org.apache.baremaps.geoparquet.GeoParquetGroup; +import org.apache.baremaps.geoparquet.GeoParquetSchema; +import org.apache.baremaps.geoparquet.GeoParquetSchema.Field; +import org.apache.baremaps.geoparquet.GeoParquetSchema.GroupField; +import org.apache.parquet.io.api.Binary; public class GeoParquetTypeConversion { private GeoParquetTypeConversion() {} - public static DataSchema asSchema(String table, Schema schema) { + public static DataSchema asSchema(String table, GeoParquetSchema schema) { List columns = asDataColumns(schema); return new DataSchemaImpl(table, columns); } - private static List asDataColumns(Schema field) { + private static List asDataColumns(GeoParquetSchema field) { return field.fields().stream() .map(GeoParquetTypeConversion::asDataColumn) .toList(); @@ -66,41 +67,48 @@ private static DataColumn asDataColumn(Field field) { } public static List asRowValues(GeoParquetGroup group) { - 
List values = new ArrayList<>(); - Schema schema = group.getSchema(); + GeoParquetSchema schema = group.getGeoParquetSchema(); List fields = schema.fields(); + List values = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { - if (group.getValues(i).isEmpty()) { - values.add(null); - continue; - } Field field = fields.get(i); - switch (field.type()) { - case BINARY -> values.add(group.getBinaryValue(i).getBytes()); - case BOOLEAN -> values.add(group.getBooleanValue(i)); - case INTEGER -> values.add(group.getIntegerValue(i)); - case INT96, LONG -> values.add(group.getLongValue(i)); - case FLOAT -> values.add(group.getFloatValue(i)); - case DOUBLE -> values.add(group.getDoubleValue(i)); - case STRING -> values.add(group.getStringValue(i)); - case GEOMETRY -> values.add(group.getGeometryValue(i)); - case ENVELOPE -> values.add(group.getEnvelopeValue(i)); - case GROUP -> values.add(asNested(group.getGroupValue(i))); - } + values.add(asValue(field, group, i)); } return values; } public static Map asNested(GeoParquetGroup group) { Map nested = new HashMap<>(); - Schema schema = group.getSchema(); + GeoParquetSchema schema = group.getGeoParquetSchema(); List fields = schema.fields(); for (int i = 0; i < fields.size(); i++) { if (group.getValues(i).isEmpty()) { continue; } Field field = fields.get(i); - nested.put(field.name(), switch (field.type()) { + Object value = asValue(field, group, i); + nested.put(field.name(), value); + } + return nested; + } + + public static Object asValue(Field field, GeoParquetGroup group, int i) { + if (field.cardinality() == GeoParquetSchema.Cardinality.REPEATED) { + return switch (field.type()) { + case BINARY -> group.getBinaryValues(i).stream().map(Binary::getBytes).toList(); + case BOOLEAN -> group.getBooleanValues(i); + case INTEGER -> group.getIntegerValues(i); + case INT96, LONG -> group.getLongValues(i); + case FLOAT -> group.getFloatValues(i); + case DOUBLE -> group.getDoubleValues(i); + case STRING -> 
group.getStringValues(i); + case GEOMETRY -> group.getGeometryValues(i); + case ENVELOPE -> group.getEnvelopeValues(i); + case GROUP -> group.getGroupValues(i).stream().map(GeoParquetTypeConversion::asNested) + .toList(); + }; + } else { + return switch (field.type()) { case BINARY -> group.getBinaryValue(i).getBytes(); case BOOLEAN -> group.getBooleanValue(i); case INTEGER -> group.getIntegerValue(i); @@ -111,9 +119,7 @@ public static Map asNested(GeoParquetGroup group) { case GEOMETRY -> group.getGeometryValue(i); case ENVELOPE -> group.getEnvelopeValue(i); case GROUP -> asNested(group.getGroupValue(i)); - }); + }; } - return nested; } - } diff --git a/baremaps-geoparquet/pom.xml b/baremaps-geoparquet/pom.xml index 6323d2076..bd687aa3a 100644 --- a/baremaps-geoparquet/pom.xml +++ b/baremaps-geoparquet/pom.xml @@ -49,5 +49,9 @@ software.amazon.awssdk s3 + + software.amazon.awssdk + s3-transfer-manager + diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetException.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetException.java index 2de65da36..6b5f22d05 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetException.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetException.java @@ -17,11 +17,26 @@ package org.apache.baremaps.geoparquet; +/** + * Exception thrown when an error occurs during the processing of GeoParquet files. + */ public class GeoParquetException extends RuntimeException { + + /** + * Constructs a new GeoParquetException with the specified message. + * + * @param message the message + */ public GeoParquetException(String message) { super(message); } + /** + * Constructs a new GeoParquetException with the specified detail message and cause. 
+ * + * @param message the message + * @param cause the cause + */ public GeoParquetException(String message, Throwable cause) { super(message, cause); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java new file mode 100644 index 000000000..38c6dd746 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.baremaps.geoparquet; + +import java.util.ArrayList; +import java.util.List; +import org.apache.baremaps.geoparquet.GeoParquetSchema.EnvelopeField; +import org.apache.baremaps.geoparquet.GeoParquetSchema.Field; +import org.apache.baremaps.geoparquet.GeoParquetSchema.GroupField; +import org.apache.baremaps.geoparquet.GeoParquetSchema.Type; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; + +/** + * A group of fields in a GeoParquet file. + */ +public class GeoParquetGroup { + + private final GroupType parquetSchema; + private final GeoParquetMetadata geoParquetMetadata; + private final GeoParquetSchema geoParquetSchema; + private final Object[] data; + + /** + * Constructs a new GeoParquetGroup with the specified schema, metadata and GeoParquet schema. 
+ * + * @param parquetSchema + * @param geoParquetMetadata + * @param geoParquetSchema + */ + public GeoParquetGroup(GroupType parquetSchema, GeoParquetMetadata geoParquetMetadata, + GeoParquetSchema geoParquetSchema) { + this.parquetSchema = parquetSchema; + this.geoParquetMetadata = geoParquetMetadata; + this.geoParquetSchema = geoParquetSchema; + this.data = new Object[parquetSchema.getFields().size()]; + for (int i = 0; i < parquetSchema.getFieldCount(); i++) { + Field field = geoParquetSchema.fields().get(i); + if (field.cardinality() == GeoParquetSchema.Cardinality.REPEATED) { + this.data[i] = new ArrayList<>(); + } else { + this.data[i] = null; // For REQUIRED or OPTIONAL fields + } + } + } + + public GeoParquetGroup addGroup(int fieldIndex) { + GeoParquetGroup group = createGroup(fieldIndex); + add(fieldIndex, group); + return group; + } + + public GeoParquetGroup addGroup(String field) { + return addGroup(getParquetSchema().getFieldIndex(field)); + } + + private GeoParquetGroup createGroup(int fieldIndex) { + Field field = geoParquetSchema.fields().get(fieldIndex); + if (field instanceof EnvelopeField envelopeField) { + return new GeoParquetGroup(parquetSchema.getType(fieldIndex).asGroupType(), + geoParquetMetadata, + envelopeField.schema()); + } else if (field instanceof GroupField groupField) { + return new GeoParquetGroup(parquetSchema.getType(fieldIndex).asGroupType(), + geoParquetMetadata, + groupField.schema()); + } + throw new GeoParquetException("Field at index " + fieldIndex + " is not a group"); + } + + public GeoParquetGroup getGroup(int fieldIndex, int index) { + return (GeoParquetGroup) getValue(fieldIndex, index); + } + + public GeoParquetGroup getGroup(String field, int index) { + return getGroup(getParquetSchema().getFieldIndex(field), index); + } + + public int getFieldRepetitionCount(int fieldIndex) { + Object value = data[fieldIndex]; + if (value instanceof Listlist) { + return list.size(); + } else { + return value == null ? 
0 : 1; + } + } + + private Object getValue(int fieldIndex, int index) { + Object value = data[fieldIndex]; + if (value instanceof List<?> list) { + return list.get(index); + } else if (index == 0) { + return value; + } else { + throw createGeoParquetException(fieldIndex, "element number " + index); + } + } + + private Object getValue(int fieldIndex) { + Field field = geoParquetSchema.fields().get(fieldIndex); + if (field.cardinality() == GeoParquetSchema.Cardinality.REPEATED) { + throw new IllegalStateException("Field " + fieldIndex + " (" + field.name() + + ") is repeated. Use getValues() instead."); + } + return data[fieldIndex]; + } + + /** + * Returns the values stored for the field at the specified index. + * + * For a repeated field the backing list is returned. For any other field the result holds + * the single value, or is empty when no value has been set. + * + * @param fieldIndex the index of the field + * @return the values of the field, never {@code null} + */ + public List getValues(int fieldIndex) { + Field field = geoParquetSchema.fields().get(fieldIndex); + if (field.cardinality() != GeoParquetSchema.Cardinality.REPEATED) { + // List.of rejects null elements, so an unset field must map to the empty list. + Object value = getValue(fieldIndex); + return value == null ? List.of() : List.of(value); + } + return (List) data[fieldIndex]; + } + + private void addValue(int fieldIndex, Object value) { + Object currentValue = data[fieldIndex]; + if (currentValue instanceof List) { + ((List) currentValue).add(value); + } else { + data[fieldIndex] = value; + } + } + + public void add(int fieldIndex, int value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, long value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, String value) { + addValue(fieldIndex, Binary.fromString(value)); + } + + public void add(int fieldIndex, boolean value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, Binary value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, float value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, double value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, GeoParquetGroup value) { + addValue(fieldIndex, value); + } + + public void add(int fieldIndex, Geometry geometry) { + byte[] bytes = new WKBWriter().write(geometry); + add(fieldIndex, Binary.fromConstantByteArray(bytes)); + 
} + + public void add(String field, int value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, long value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, float value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, double value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, String value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, boolean value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, Binary value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, GeoParquetGroup value) { + add(getParquetSchema().getFieldIndex(field), value); + } + + public void add(String field, Geometry geometry) { + add(getParquetSchema().getFieldIndex(field), geometry); + } + + private GeoParquetException createGeoParquetException(int fieldIndex, String elementText) { + String msg = String.format("Not found %d (%s) %s in group%n%s", fieldIndex, + parquetSchema.getFieldName(fieldIndex), elementText, this); + return new GeoParquetException(msg); + } + + public GeoParquetSchema getGeoParquetSchema() { + return geoParquetSchema; + } + + public GroupType getParquetSchema() { + return parquetSchema; + } + + public GeoParquetMetadata getGeoParquetMetadata() { + return geoParquetMetadata; + } + + // Getter methods for different data types + public String getStringValue(int fieldIndex, int index) { + return getBinaryValue(fieldIndex, index).toStringUsingUTF8(); + } + + public Binary getBinaryValue(int fieldIndex, int index) { + return (Binary) getValue(fieldIndex, index); + } + + public Geometry getGeometry(int fieldIndex, int index) { + byte[] bytes = getBinaryValue(fieldIndex, index).getBytes(); + try { + return new WKBReader().read(bytes); + } catch (ParseException 
e) { + throw new GeoParquetException("WKBReader failed to parse", e); + } + } + + // Simplify getter methods for single values + public Binary getBinaryValue(int fieldIndex) { + return (Binary) getValue(fieldIndex); + } + + public Boolean getBooleanValue(int fieldIndex) { + return (Boolean) getValue(fieldIndex); + } + + public Double getDoubleValue(int fieldIndex) { + return (Double) getValue(fieldIndex); + } + + public Float getFloatValue(int fieldIndex) { + return (Float) getValue(fieldIndex); + } + + public Integer getIntegerValue(int fieldIndex) { + return (Integer) getValue(fieldIndex); + } + + public Long getLongValue(int fieldIndex) { + return (Long) getValue(fieldIndex); + } + + public String getStringValue(int fieldIndex) { + return getStringValue(fieldIndex, 0); + } + + public Geometry getGeometryValue(int fieldIndex) { + return getGeometry(fieldIndex, 0); + } + + public GeoParquetGroup getGroupValue(int fieldIndex) { + return (GeoParquetGroup) getValue(fieldIndex); + } + + // Simplify getter methods for list of values + private List getValuesOfType(int fieldIndex, Class clazz) { + return getValues(fieldIndex).stream().map(clazz::cast).toList(); + } + + public List getBinaryValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Binary.class); + } + + public List getBooleanValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Boolean.class); + } + + public List getDoubleValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Double.class); + } + + public List getFloatValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Float.class); + } + + public List getIntegerValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Integer.class); + } + + public List getLongValues(int fieldIndex) { + return getValuesOfType(fieldIndex, Long.class); + } + + public List getStringValues(int fieldIndex) { + return getValues(fieldIndex).stream() + .map(value -> ((Binary) value).toStringUsingUTF8()) + .toList(); + } + + public List 
getGeometryValues(int fieldIndex) { + return getValues(fieldIndex).stream() + .map(value -> { + try { + return new WKBReader().read(((Binary) value).getBytes()); + } catch (ParseException e) { + throw new GeoParquetException("WKBReader failed to parse.", e); + } + }) + .toList(); + } + + /** + * Returns the groups stored for the field at the specified index. + * + * @param fieldIndex the index of the field + * @return the backing list when the field holds a list, otherwise a list with the single + * group, or an empty list when no group has been set + */ + public List<GeoParquetGroup> getGroupValues(int fieldIndex) { + Object value = data[fieldIndex]; + if (value instanceof List<?> list) { + return (List<GeoParquetGroup>) list; + } + // List.of rejects null elements, so an unset field must map to the empty list. + return value == null ? List.of() : List.of((GeoParquetGroup) value); + } + + // Helper method to get numeric value (float or double) + private double getNumericValue(GeoParquetGroup group, int fieldIndex) { + Type fieldType = group.getGeoParquetSchema().fields().get(fieldIndex).type(); + return switch (fieldType) { + case FLOAT -> group.getFloatValue(fieldIndex); + case DOUBLE -> group.getDoubleValue(fieldIndex); + default -> throw new GeoParquetException("Expected numeric field at index " + fieldIndex); + }; + } + + /** + * Returns the first envelope stored for the field at the specified index. + * + * @param fieldIndex the index of the field + * @return the envelope, or {@code null} when the field holds no group + */ + public Envelope getEnvelopeValue(int fieldIndex) { + List<Envelope> envelopes = getEnvelopeValues(fieldIndex); + return envelopes.isEmpty() ? null : envelopes.get(0); + } + + public List getEnvelopeValues(int fieldIndex) { + return getGroupValues(fieldIndex).stream().map(group -> { + double xMin = getNumericValue(group, 0); + double yMin = getNumericValue(group, 1); + double xMax = getNumericValue(group, 2); + double yMax = getNumericValue(group, 3); + return new Envelope(xMin, xMax, yMin, yMax); + }).toList(); + } + + // Methods to access fields by name + public Binary getBinaryValue(String fieldName) { + return getBinaryValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getBinaryValues(String fieldName) { + return getBinaryValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Boolean getBooleanValue(String fieldName) { + return getBooleanValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getBooleanValues(String fieldName) { + return getBooleanValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Double getDoubleValue(String fieldName) { + return 
getDoubleValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getDoubleValues(String fieldName) { + return getDoubleValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Float getFloatValue(String fieldName) { + return getFloatValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getFloatValues(String fieldName) { + return getFloatValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Integer getIntegerValue(String fieldName) { + return getIntegerValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getIntegerValues(String fieldName) { + return getIntegerValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Long getLongValue(String fieldName) { + return getLongValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getLongValues(String fieldName) { + return getLongValues(parquetSchema.getFieldIndex(fieldName)); + } + + public String getStringValue(String fieldName) { + return getStringValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getStringValues(String fieldName) { + return getStringValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Geometry getGeometryValue(String fieldName) { + return getGeometryValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getGeometryValues(String fieldName) { + return getGeometryValues(parquetSchema.getFieldIndex(fieldName)); + } + + public GeoParquetGroup getGroupValue(String fieldName) { + return getGroupValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getGroupValues(String fieldName) { + return getGroupValues(parquetSchema.getFieldIndex(fieldName)); + } + + public Envelope getEnvelopeValue(String fieldName) { + return getEnvelopeValue(parquetSchema.getFieldIndex(fieldName)); + } + + public List getEnvelopeValues(String fieldName) { + return getEnvelopeValues(parquetSchema.getFieldIndex(fieldName)); + } + + public String toString() { + return toString(""); + } + + private String toString(String 
indent) { + StringBuilder builder = new StringBuilder(); + int fieldCount = parquetSchema.getFields().size(); + + for (int i = 0; i < fieldCount; i++) { + String fieldName = parquetSchema.getFieldName(i); + Object fieldValue = data[i]; + if (fieldValue != null) { + appendFieldToString(builder, indent, fieldName, fieldValue); + } + } + + return builder.toString(); + } + + private void appendFieldToString(StringBuilder builder, String indent, String fieldName, + Object fieldValue) { + if (fieldValue instanceof Listvalues) { + for (Object value : values) { + appendValueToString(builder, indent, fieldName, value); + } + } else { + appendValueToString(builder, indent, fieldName, fieldValue); + } + } + + private void appendValueToString(StringBuilder builder, String indent, String fieldName, + Object value) { + builder.append(indent).append(fieldName); + if (value == null) { + builder.append(": NULL\n"); + } else if (value instanceof GeoParquetGroup group) { + builder.append("\n").append(group.toString(indent + " ")); + } else { + String valueString = getValueAsString(value); + builder.append(": ").append(valueString).append("\n"); + } + } + + private String getValueAsString(Object value) { + if (value instanceof Binary binary) { + return binary.toStringUsingUTF8(); + } else { + return value.toString(); + } + } + +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java similarity index 74% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java index 6649aebde..fd8bd94fc 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java +++ 
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java @@ -15,21 +15,33 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.Type; +/** + * A {@link GroupConverter} for {@link GeoParquetGroup}s. + */ class GeoParquetGroupConverter extends GroupConverter { private final GeoParquetGroupConverter parent; private final int index; - protected GeoParquetGroupImpl current; + protected GeoParquetGroup current; private final Converter[] converters; - GeoParquetGroupConverter(GeoParquetGroupConverter parent, int index, + /** + * Constructs a new {@code GeoParquetGroupConverter} with the specified parent, index and schema. + * + * @param parent the parent + * @param index the index + * @param schema the schema + */ + GeoParquetGroupConverter( + GeoParquetGroupConverter parent, + int index, GroupType schema) { this.parent = parent; this.index = index; @@ -43,26 +55,39 @@ class GeoParquetGroupConverter extends GroupConverter { } else { converters[i] = new GeoParquetGroupConverter(this, i, type.asGroupType()); } - } } + /** + * {@inheritDoc} + */ @Override public void start() { current = parent.getCurrentRecord().addGroup(index); } + /** + * {@inheritDoc} + */ @Override public Converter getConverter(int fieldIndex) { return converters[fieldIndex]; } + /** + * {@inheritDoc} + */ @Override public void end() { current = null; } - public GeoParquetGroupImpl getCurrentRecord() { + /** + * Returns the current record. 
+ * + * @return the current record + */ + public GeoParquetGroup getCurrentRecord() { return current; } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java similarity index 51% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java index 5abe77642..a88f3b13e 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java @@ -15,29 +15,45 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import java.util.List; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field; +import org.apache.baremaps.geoparquet.GeoParquetSchema.*; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -public class GeoParquetGroupFactory { +/** + * A factory for creating {@link GeoParquetGroup}s. + */ +class GeoParquetGroupFactory { private final GroupType schema; private final GeoParquetMetadata metadata; - private final GeoParquetGroup.Schema geoParquetSchema; + private final GeoParquetSchema geoParquetSchema; + /** + * Constructs a new {@code GeoParquetGroupFactory} with the specified schema and metadata. 
+ * + * @param schema the schema + * @param metadata the metadata + */ public GeoParquetGroupFactory(GroupType schema, GeoParquetMetadata metadata) { this.schema = schema; this.metadata = metadata; this.geoParquetSchema = createGeoParquetSchema(schema, metadata); } - public static GeoParquetGroup.Schema createGeoParquetSchema( + /** + * Creates a {@link GeoParquetSchema} from a {@link GroupType} and a {@link GeoParquetMetadata}. + * + * @param schema the schema + * @param metadata the metadata + * @return the schema + */ + public static GeoParquetSchema createGeoParquetSchema( GroupType schema, GeoParquetMetadata metadata) { @@ -45,29 +61,29 @@ public static GeoParquetGroup.Schema createGeoParquetSchema( List fields = schema.getFields().stream().map(field -> { // Map the column cardinality - GeoParquetGroup.Cardinality cardinality = switch (field.getRepetition()) { - case REQUIRED -> GeoParquetGroup.Cardinality.REQUIRED; - case OPTIONAL -> GeoParquetGroup.Cardinality.OPTIONAL; - case REPEATED -> GeoParquetGroup.Cardinality.REPEATED; + Cardinality cardinality = switch (field.getRepetition()) { + case REQUIRED -> GeoParquetSchema.Cardinality.REQUIRED; + case OPTIONAL -> GeoParquetSchema.Cardinality.OPTIONAL; + case REPEATED -> GeoParquetSchema.Cardinality.REPEATED; }; // Handle geometry columns - if (field.isPrimitive() && metadata.isGeometryColumn(field.getName())) { - return new GeoParquetGroup.GeometryField(field.getName(), cardinality); + if (field.isPrimitive() && metadata.columns().containsKey(field.getName())) { + return new GeometryField(field.getName(), cardinality); } // Handle envelope columns else if (!field.isPrimitive() && field.getName().equals("bbox")) { GroupType groupType = field.asGroupType(); - GeoParquetGroup.Schema geoParquetSchema = createGeoParquetSchema(groupType, metadata); - return new GeoParquetGroup.EnvelopeField(field.getName(), cardinality, geoParquetSchema); + GeoParquetSchema geoParquetSchema = createGeoParquetSchema(groupType, 
metadata); + return new EnvelopeField(field.getName(), cardinality, geoParquetSchema); } // Handle group columns else if (!field.isPrimitive()) { GroupType groupType = field.asGroupType(); - GeoParquetGroup.Schema geoParquetSchema = createGeoParquetSchema(groupType, metadata); - return (Field) new GeoParquetGroup.GroupField( + GeoParquetSchema geoParquetSchema = createGeoParquetSchema(groupType, metadata); + return (Field) new GroupField( groupType.getName(), cardinality, geoParquetSchema); @@ -79,23 +95,27 @@ else if (!field.isPrimitive()) { PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName(); String columnName = primitiveType.getName(); return switch (primitiveTypeName) { - case INT32 -> new GeoParquetGroup.IntegerField(columnName, cardinality); - case INT64 -> new GeoParquetGroup.LongField(columnName, cardinality); - case INT96 -> new GeoParquetGroup.Int96Field(columnName, cardinality); - case FLOAT -> new GeoParquetGroup.FloatField(columnName, cardinality); - case DOUBLE -> new GeoParquetGroup.DoubleField(columnName, cardinality); - case BOOLEAN -> new GeoParquetGroup.BooleanField(columnName, cardinality); - case BINARY -> new GeoParquetGroup.BinaryField(columnName, cardinality); - case FIXED_LEN_BYTE_ARRAY -> new GeoParquetGroup.BinaryField(columnName, cardinality); + case INT32 -> new IntegerField(columnName, cardinality); + case INT64 -> new LongField(columnName, cardinality); + case INT96 -> new Int96Field(columnName, cardinality); + case FLOAT -> new FloatField(columnName, cardinality); + case DOUBLE -> new DoubleField(columnName, cardinality); + case BOOLEAN -> new BooleanField(columnName, cardinality); + case BINARY -> new BinaryField(columnName, cardinality); + case FIXED_LEN_BYTE_ARRAY -> new BinaryField(columnName, cardinality); }; } }).toList(); - return new GeoParquetGroup.Schema(schema.getName(), fields); + return new GeoParquetSchema(schema.getName(), fields); } - public GeoParquetGroupImpl newGroup() { - return new 
GeoParquetGroupImpl(schema, metadata, geoParquetSchema); + /** + * Creates a new {@link GeoParquetGroup}. + * + * @return the group + */ + public GeoParquetGroup newGroup() { + return new GeoParquetGroup(schema, metadata, geoParquetSchema); } - } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java similarity index 79% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java index 01cfc871d..cb84bb375 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; /* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license @@ -36,13 +36,23 @@ import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetGroupRecordConverter extends RecordMaterializer { +/** + * A {@link RecordMaterializer} for {@link GeoParquetGroup}s. + */ +class GeoParquetGroupRecordMaterializer extends RecordMaterializer { private final GeoParquetGroupFactory groupFactory; private final GeoParquetGroupConverter root; - public GeoParquetGroupRecordConverter(MessageType schema, GeoParquetMetadata metadata) { + /** + * Constructs a new {@code GeoParquetGroupRecordMaterializer} with the specified schema and + * metadata. 
+ * + * @param schema the schema + * @param metadata the metadata + */ + public GeoParquetGroupRecordMaterializer(MessageType schema, GeoParquetMetadata metadata) { this.groupFactory = new GeoParquetGroupFactory(schema, metadata); this.root = new GeoParquetGroupConverter(null, 0, schema) { @Override @@ -57,11 +67,17 @@ public void end() { }; } + /** + * {@inheritDoc} + */ @Override - public GeoParquetGroupImpl getCurrentRecord() { + public GeoParquetGroup getCurrentRecord() { return root.getCurrentRecord(); } + /** + * {@inheritDoc} + */ @Override public GroupConverter getRootConverter() { return root; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java deleted file mode 100644 index 7a86a3869..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet; - -import java.io.IOException; -import java.util.*; -import java.util.function.Consumer; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; -import org.apache.hadoop.fs.FileStatus; -import org.apache.parquet.hadoop.ParquetReader; - -class GeoParquetGroupSpliterator implements Spliterator { - - private final GeoParquetReader geoParquetReader; - private final List fileStatuses; - private ParquetReader reader; - - GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List files) { - this.geoParquetReader = geoParquetReader; - this.fileStatuses = Collections.synchronizedList(files); - setupReaderForNextFile(); - } - - private void setupReaderForNextFile() { - FileStatus fileStatus = fileStatuses.remove(0); - try { - reader = createParquetReader(fileStatus); - } catch (IOException e) { - throw new GeoParquetException("Failed to create reader for " + fileStatus, e); - } - } - - @Override - public boolean tryAdvance(Consumer action) { - try { - // Read the next group - GeoParquetGroup group = reader.read(); - - // If the group is null, try to get the one from the next file. 
- while (group == null) { - synchronized (fileStatuses) { - if (fileStatuses.isEmpty()) { - reader.close(); - return false; - } - setupReaderForNextFile(); - } - group = reader.read(); - } - - // Accept the group and tell the caller that there are more groups to read - action.accept(group); - return true; - - } catch (IOException e) { - // If an exception occurs, try to close the resources and throw a runtime exception - try { - reader.close(); - } catch (IOException e2) { - // Ignore the exception as the original exception is more important - } - throw new GeoParquetException("IOException caught while trying to read the next file.", e); - } - } - - private ParquetReader createParquetReader(FileStatus file) - throws IOException { - return ParquetReader - .builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(geoParquetReader.configuration) - .build(); - } - - @Override - public Spliterator trySplit() { - List sublist; - synchronized (fileStatuses) { - if (fileStatuses.size() < 2) { - // There is nothing left to split - return null; - } - - sublist = fileStatuses.subList(0, fileStatuses.size() / 2); - } - List secondList = new ArrayList<>(sublist); - sublist.clear(); - - // Return a new spliterator with the sublist - return new GeoParquetGroupSpliterator(geoParquetReader, secondList); - } - - @Override - public long estimateSize() { - return geoParquetReader.size(); - } - - @Override - public int characteristics() { - // The spliterator is not ordered, or sorted - return NONNULL | CONCURRENT | DISTINCT; - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java new file mode 100644 index 000000000..35f32d63d --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import java.util.Map; + +/** + * A representation of the metadata of a GeoParquet file encoded as JSON. + * + * @param version + * @param primaryColumn + * @param columns + * @param encoding + * @param geometryTypes + * @param crs + * @param edges + * @param bbox + * @param epoch + * @param covering + */ +public record GeoParquetMetadata( + @JsonProperty("version") String version, + @JsonProperty("primary_column") String primaryColumn, + @JsonProperty("columns") Map columns, + @JsonProperty("encoding") String encoding, + @JsonProperty("geometry_types") List geometryTypes, + @JsonProperty("crs") Object crs, + @JsonProperty("edges") String edges, + @JsonProperty("bbox") List bbox, + @JsonProperty("epoch") String epoch, + @JsonProperty("covering") Object covering) { + + /** + * A representation of the metadata of a column encoded as JSON. 
+ * + * @param encoding + * @param geometryTypes + * @param crs + * @param orientation + * @param edges + * @param bbox + */ + public record Column( + @JsonProperty("encoding") String encoding, + @JsonProperty("geometry_types") List geometryTypes, + @JsonProperty("crs") JsonNode crs, + @JsonProperty("orientation") String orientation, + @JsonProperty("edges") String edges, + @JsonProperty("bbox") List bbox) { + } + + /** + * Returns the SRID of the specified column. + * + * @param column the column + * @return the SRID, 4326 if the CRS is missing or unknown, or 0 if its code is invalid + */ + public int getSrid(String column) { + // Retrieve the column metadata, return default SRID if not present + Column columnMetadata = columns.get(column); + if (columnMetadata == null || columnMetadata.crs() == null) { + return 4326; // Default to 4326 if no CRS is present + } + + // Extract the CRS JsonNode + JsonNode crsNode = columnMetadata.crs(); + JsonNode idNode = crsNode.get("id"); + + // Return default SRID if "id" field is not present + if (idNode == null || idNode.get("authority") == null || idNode.get("code") == null) { + return 4326; + } + + // Extract authority and code values + String authority = idNode.get("authority").asText(); + String code = idNode.get("code").asText(); + + // Determine SRID based on authority and code + if (authority.equals("EPSG")) { + return getEpsgCode(code); + } else if (authority.equals("OGC")) { + return getOgcSrid(code); + } else { + return 4326; // Default SRID if authority is unrecognized + } + } + + /** + * Handle OGC-specific SRIDs. + * + * @param code the OGC code + * @return the SRID, or 0 if the code is unrecognized + */ + private int getOgcSrid(String code) { + if ("CRS84".equals(code)) { + return 4326; + } else { + return 0; // Unrecognized OGC code + } + } + + /** + * Parse EPSG SRID code as an integer.
+ * + * @param code the EPSG code + * @return the SRID as an integer, or 0 if the code is invalid + */ + private int getEpsgCode(String code) { + try { + return Integer.parseInt(code); + } catch (NumberFormatException e) { + return 0; // Return 0 if the code is not a valid integer + } + } + +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java similarity index 89% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java index d75fccbc7..b001991c2 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java @@ -15,16 +15,25 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; +/** + * A {@link PrimitiveConverter} for {@link GeoParquetGroup}s. + */ class GeoParquetPrimitiveConverter extends PrimitiveConverter { private final GeoParquetGroupConverter parent; private final int index; + /** + * Constructs a new {@code GeoParquetPrimitiveConverter} with the specified parent and index. 
+ * + * @param parent the parent + * @param index the index + */ GeoParquetPrimitiveConverter(GeoParquetGroupConverter parent, int index) { this.parent = parent; this.index = index; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 03a07e68d..8b4697aa1 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -22,149 +22,188 @@ import java.io.IOException; import java.net.URI; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory; -import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; -import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; - +import org.locationtech.jts.geom.Envelope; /** - * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*. + * This reader enables reading of GeoParquet files from a specified URI with the stream API. 
The + * schema of the Parquet files and the corresponding geoparquet schema and metadata are + * automatically inferred from the files. The reader can be used to read the records in a sequential + * or parallel manner. It is also capable of filtering records based on an envelope. */ public class GeoParquetReader { - private final URI uri; - final Configuration configuration; - private List files; - private Long groupCount; + protected final Configuration configuration; + protected final List files; + private final AtomicLong groupCount = new AtomicLong(-1); + private final Envelope envelope; - record FileInfo(FileStatus file, Long recordCount, Map keyValueMetadata, - MessageType messageType, GeoParquetMetadata metadata, - GeoParquetGroup.Schema geoParquetSchema) { + /** + * Constructs a new {@code GeoParquetReader}. + * + * @param uri the URI to read from + */ + public GeoParquetReader(URI uri) { + this(uri, null, new Configuration()); } - public GeoParquetReader(URI uri) { - this(uri, createConfiguration()); + /** + * Constructs a new {@code GeoParquetReader}. + * + * @param uri the URI to read from + * @param envelope the envelope to filter records + */ + public GeoParquetReader(URI uri, Envelope envelope) { + this(uri, envelope, new Configuration()); } - public GeoParquetReader(URI uri, Configuration configuration) { - this.uri = uri; + /** + * Constructs a new {@code GeoParquetReader}. 
+ * + * @param uri the URI to read from + * @param configuration the configuration + */ + public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) { this.configuration = configuration; + this.files = initializeFiles(uri, configuration); + this.envelope = envelope; } public MessageType getParquetSchema() { - return files().stream() + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() - .messageType(); + .orElseThrow( + () -> new GeoParquetException("No files available to read schema.")).messageType; } - private FileInfo getFileInfo(FileStatus fileStatus) { - try { - return buildFileInfo(fileStatus); - } catch (IOException e) { - throw new GeoParquetException("Failed to build Info", e); - } - } + public GeoParquetMetadata getGeoParquetMetadata() { - return files().stream() + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() + .orElseThrow(this::noParquetFilesAvailable) .metadata(); } - public Schema getGeoParquetSchema() { - return files().stream() + public GeoParquetSchema getGeoParquetSchema() { + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() + .orElseThrow(this::noParquetFilesAvailable) .geoParquetSchema(); } + public GeoParquetException noParquetFilesAvailable() { + return new GeoParquetException("No parquet files available."); + } + public boolean validateSchemasAreIdentical() { - // Verify that the files all have the same schema - final int messageTypeCount = files().stream().parallel().map(this::getFileInfo) - .map(FileInfo::messageType).collect(Collectors.toSet()).size(); - return messageTypeCount == 1; + // Verify that all files have the same schema + Set schemas = files.parallelStream() + .map(this::getFileInfo) + .map(fileInfo -> fileInfo.messageType) + .collect(Collectors.toSet()); + return schemas.size() == 1; } public long size() { - if (groupCount == null) { - groupCount = files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount) 
- .reduce(0L, Long::sum); + if (groupCount.get() == -1) { + long totalCount = files.parallelStream() + .map(this::getFileInfo) + .mapToLong(fileInfo -> fileInfo.recordCount) + .sum(); + groupCount.set(totalCount); } - return groupCount; + return groupCount.get(); } - private synchronized List files() { + private FileInfo getFileInfo(FileStatus fileStatus) { try { - if (files == null) { - Path globPath = new Path(uri.getPath()); - FileSystem fileSystem = FileSystem.get(uri, configuration); - - files = new ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath))); + ParquetMetadata parquetMetadata = + ParquetFileReader.readFooter(configuration, fileStatus.getPath()); + + long recordCount = parquetMetadata.getBlocks().stream() + .mapToLong(BlockMetaData::getRowCount) + .sum(); + + FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + Map keyValueMetadata = fileMetaData.getKeyValueMetaData(); + MessageType messageType = fileMetaData.getSchema(); + + GeoParquetMetadata geoParquetMetadata = null; + GeoParquetSchema geoParquetSchema = null; + if (keyValueMetadata.containsKey("geo")) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + geoParquetMetadata = + objectMapper.readValue(keyValueMetadata.get("geo"), GeoParquetMetadata.class); + geoParquetSchema = + GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); } + + return new FileInfo( + fileStatus, + recordCount, + keyValueMetadata, + messageType, + geoParquetMetadata, + geoParquetSchema); + } catch (IOException e) { - throw new GeoParquetException("IOException while attempting to list files.", e); + throw new GeoParquetException("Failed to build FileInfo for file: " + fileStatus, e); } - return files; } - private FileInfo buildFileInfo(FileStatus file) throws IOException { - long recordCount; - MessageType messageType; - Map keyValueMetadata; - try (ParquetFileReader reader = 
ParquetFileReader.open(configuration, file.getPath())) { - recordCount = reader.getRecordCount(); - messageType = reader.getFileMetaData().getSchema(); - keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData(); - } - GeoParquetMetadata geoParquetMetadata = null; - Schema geoParquetSchema = null; - if (keyValueMetadata.containsKey("geo")) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - geoParquetMetadata = - objectMapper.readValue(keyValueMetadata.get("geo"), GeoParquetMetadata.class); - geoParquetSchema = - GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); + private static List initializeFiles(URI uri, Configuration configuration) { + try { + Path globPath = new Path(uri.getPath()); + FileSystem fileSystem = FileSystem.get(uri, configuration); + FileStatus[] fileStatuses = fileSystem.globStatus(globPath); + if (fileStatuses == null) { + throw new GeoParquetException("No files found at the specified URI."); + } + return Collections.unmodifiableList(Arrays.asList(fileStatuses)); + } catch (IOException e) { + throw new GeoParquetException("IOException while attempting to list files.", e); } - return new FileInfo(file, recordCount, keyValueMetadata, messageType, - geoParquetMetadata, geoParquetSchema); } - public Stream readParallel() { - return retrieveGeoParquetGroups(true); + private Stream streamGeoParquetGroups(boolean inParallel) { + Spliterator spliterator = + new GeoParquetSpliterator(files, envelope, configuration, 0, files.size()); + return StreamSupport.stream(spliterator, inParallel); } - private Stream retrieveGeoParquetGroups(boolean inParallel) { - return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), inParallel); + public Stream read() { + return streamGeoParquetGroups(false); } - public Stream read() { - return retrieveGeoParquetGroups(false); + public Stream readParallel() { + return 
streamGeoParquetGroups(true); } - private static Configuration createConfiguration() { - Configuration conf = new Configuration(); - conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com"); - conf.set("fs.s3a.aws.credentials.provider", AnonymousAWSCredentialsProvider.class.getName()); - conf.set("fs.s3a.impl", S3AFileSystem.class.getName()); - conf.set("fs.s3a.path.style.access", "true"); - return conf; + private record FileInfo( + FileStatus file, + long recordCount, + Map keyValueMetadata, + MessageType messageType, + GeoParquetMetadata metadata, + GeoParquetSchema geoParquetSchema) { + } + } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSchema.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSchema.java new file mode 100644 index 000000000..f7c1b5454 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSchema.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import java.util.List; + +/** + * A GeoParquet schema that describes the fields of a group and can easily be introspected. 
+ * + * @param name + * @param fields the fields of the schema + */ +public record GeoParquetSchema(String name, List fields) { + + /** + * The type of a GeoParquet field. + */ + public enum Type { + BINARY, + BOOLEAN, + DOUBLE, + FLOAT, + INTEGER, + INT96, + LONG, + STRING, + GEOMETRY, + ENVELOPE, + GROUP + } + + /** + * The cardinality of a GeoParquet field. + */ + public enum Cardinality { + REQUIRED, + OPTIONAL, + REPEATED + } + + /** + * A sealed interface for the fields of a GeoParquet schema. + *

+ * Sealed interfaces were introduced in Java 17 and can be used with pattern matching since Java + * 21. + */ + public sealed + interface Field { + String name(); + + Type type(); + + Cardinality cardinality(); + } + + record BinaryField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.BINARY; + } + } + + record BooleanField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.BOOLEAN; + } + } + + record DoubleField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.DOUBLE; + } + } + + record FloatField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.FLOAT; + } + } + + record IntegerField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.INTEGER; + } + } + + record Int96Field(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.INT96; + } + } + + record LongField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.LONG; + } + } + + record StringField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.STRING; + } + } + + record GeometryField(String name, Cardinality cardinality) implements Field { + + @Override + public Type type() { + return Type.GEOMETRY; + } + } + + record EnvelopeField(String name, Cardinality cardinality, + GeoParquetSchema schema) implements Field { + + @Override + public Type type() { + return Type.ENVELOPE; + } + } + + public record GroupField(String name, Cardinality cardinality, + GeoParquetSchema schema) implements Field { + + @Override + public Type type() { + return Type.GROUP; + } + } +} diff --git 
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java new file mode 100644 index 000000000..12d4a2662 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.baremaps.geoparquet; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.List; +import java.util.Spliterator; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.locationtech.jts.geom.Envelope; + +/** + * A {@link Spliterator} for {@link GeoParquetGroup}s stored in Parquet files. The envelope is used + * to filter the records based on their bounding box. 
+ */ +class GeoParquetSpliterator implements Spliterator { + + private final List files; + private final Configuration configuration; + private final Envelope envelope; + + private ParquetFileReader fileReader; + private int fileStartIndex; + private int fileEndIndex; + private MessageType schema; + private GeoParquetMetadata metadata; + private MessageColumnIO columnIO; + private RecordReader recordReader; + private int currentRowGroup; + private long rowsReadInGroup; + private long rowsInCurrentGroup; + + /** + * Constructs a new {@code GeoParquetSpliterator} with the specified files, envelope, + * configuration, file start index and file end index. + * + * @param files the files + * @param envelope the envelope + * @param configuration the configuration + * @param fileStartIndex the file start index + * @param fileEndIndex the file end index + */ + GeoParquetSpliterator( + List files, + Envelope envelope, + Configuration configuration, + int fileStartIndex, + int fileEndIndex) { + this.files = files; + this.configuration = configuration; + this.envelope = envelope; + this.fileStartIndex = fileStartIndex; + this.fileEndIndex = fileEndIndex; + setupReaderForNextFile(); + } + + private void setupReaderForNextFile() { + closeCurrentReader(); + + while (fileStartIndex < fileEndIndex) { + FileStatus fileStatus = files.get(fileStartIndex++); + try { + InputFile inputFile = HadoopInputFile.fromPath(fileStatus.getPath(), configuration); + fileReader = ParquetFileReader.open(inputFile); + + FileMetaData fileMetaData = fileReader.getFooter().getFileMetaData(); + + schema = fileMetaData.getSchema(); + metadata = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .readValue(fileMetaData.getKeyValueMetaData().get("geo"), GeoParquetMetadata.class); + + // Check if file's bbox overlaps with the envelope + if (envelope != null && metadata != null && metadata.bbox() != null) { + List fileBBox = metadata.bbox(); + if (fileBBox.size() == 4) 
{ + Envelope fileEnvelope = new Envelope( + fileBBox.get(0), fileBBox.get(2), fileBBox.get(1), fileBBox.get(3)); + if (!fileEnvelope.intersects(envelope)) { + // Skip this file and continue to the next one + fileReader.close(); + fileReader = null; + continue; + } + } + } + + columnIO = new ColumnIOFactory().getColumnIO(schema); + currentRowGroup = 0; + rowsReadInGroup = 0; + rowsInCurrentGroup = 0; + advanceToNextRowGroup(); + return; + } catch (IOException e) { + throw new GeoParquetException("Failed to create reader for " + fileStatus, e); + } + } + + // No more files to process + fileReader = null; + } + + private void advanceToNextRowGroup() throws IOException { + if (currentRowGroup >= fileReader.getRowGroups().size()) { + setupReaderForNextFile(); + return; + } + + PageReadStore pages = fileReader.readNextFilteredRowGroup(); + if (pages == null) { + setupReaderForNextFile(); + return; + } + + rowsInCurrentGroup = pages.getRowCount(); + rowsReadInGroup = 0; + + GeoParquetGroupRecordMaterializer materializer = + new GeoParquetGroupRecordMaterializer(schema, metadata); + + FilterPredicate envelopeFilter = createEnvelopeFilter(schema, envelope); + Filter filter = envelopeFilter == null ? 
FilterCompat.NOOP : FilterCompat.get(envelopeFilter); + + recordReader = columnIO.getRecordReader(pages, materializer, filter); + currentRowGroup++; + } + + private FilterPredicate createEnvelopeFilter(MessageType schema, Envelope envelope) { + // Check whether the envelope is null or the world + if (envelope == null + || envelope.isNull() + || envelope.equals(new Envelope(-180, 180, -90, 90))) { + return null; + } + + // Check whether the schema has a bbox field + Type type = schema.getType("bbox"); + if (type == null) { + return null; + } + + // Check whether the bbox has the xmin, ymin, xmax, ymax fields + GroupType bbox = type.asGroupType(); + if (bbox.getFieldCount() != 4 + || !bbox.containsField("xmin") + || !bbox.containsField("ymin") + || !bbox.containsField("xmax") + || !bbox.containsField("ymax")) { + return null; + } + + // Check whether all fields are primitive types + List types = bbox.getFields(); + if (types.stream().anyMatch(t -> !t.isPrimitive())) { + return null; + } + + // Check whether all fields are of the same type + List typeNames = types.stream() + .map(t -> t.asPrimitiveType().getPrimitiveTypeName()) + .toList(); + PrimitiveTypeName typeName = typeNames.get(0); + if (!typeNames.stream().allMatch(typeName::equals)) { + return null; + } + + // Check whether the type is a float or a double + if (typeName != PrimitiveTypeName.DOUBLE && typeName != PrimitiveTypeName.FLOAT) { + return null; + } + + // Initialize the filter predicate creator for the given type + BiFunction filterPredicateCreator = + (column, value) -> switch (typeName) { + case DOUBLE -> FilterApi.gtEq(FilterApi.doubleColumn(column), value.doubleValue()); + case FLOAT -> FilterApi.gtEq(FilterApi.floatColumn(column), value.floatValue()); + default -> throw new IllegalStateException("Unexpected value: " + typeName); + }; + + // Create the filter predicate + return FilterApi.and( + FilterApi.and( + filterPredicateCreator.apply("bbox.xmin", envelope.getMinX()), + 
filterPredicateCreator.apply("bbox.xmax", envelope.getMaxX())), + FilterApi.and( + filterPredicateCreator.apply("bbox.ymin", envelope.getMinY()), + filterPredicateCreator.apply("bbox.ymax", envelope.getMaxY()))); + } + + @Override + public boolean tryAdvance(Consumer action) { + try { + while (true) { + if (fileReader == null) { + return false; + } + + if (rowsReadInGroup >= rowsInCurrentGroup) { + advanceToNextRowGroup(); + continue; + } + + GeoParquetGroup group = recordReader.read(); + rowsReadInGroup++; + if (group != null) { + action.accept(group); + } + + return true; + } + } catch (IOException e) { + closeCurrentReader(); + throw new GeoParquetException("IOException caught while trying to read the next record.", e); + } + } + + private void closeCurrentReader() { + if (fileReader != null) { + try { + fileReader.close(); + } catch (IOException e) { + throw new GeoParquetException("Failed to close ParquetFileReader.", e); + } finally { + fileReader = null; + } + } + } + + @Override + public Spliterator trySplit() { + int remainingFiles = fileEndIndex - fileStartIndex; + if (remainingFiles <= 1) { + return null; + } + int mid = fileStartIndex + remainingFiles / 2; + GeoParquetSpliterator split = + new GeoParquetSpliterator(files, envelope, configuration, mid, fileEndIndex); + this.fileEndIndex = mid; + return split; + } + + @Override + public long estimateSize() { + // Return Long.MAX_VALUE as the actual number of elements is unknown + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return NONNULL | IMMUTABLE; + } +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/OvertureMaps.java similarity index 55% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/OvertureMaps.java index 4537e10d8..78c53fbb1 
100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/OvertureMaps.java @@ -15,30 +15,25 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; -import org.apache.parquet.io.api.RecordConsumer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; +import org.apache.hadoop.fs.s3a.S3AFileSystem; -public class DoubleValue extends Primitive { +public class OvertureMaps { - private final double value; - - public DoubleValue(double value) { - this.value = value; - } - - @Override - public double getDouble() { - return value; + private OvertureMaps() { + // Prevent instantiation } - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addDouble(value); + public static Configuration configuration() { + Configuration conf = new Configuration(); + conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com"); + conf.set("fs.s3a.aws.credentials.provider", AnonymousAWSCredentialsProvider.class.getName()); + conf.set("fs.s3a.impl", S3AFileSystem.class.getName()); + conf.set("fs.s3a.path.style.access", "true"); + return conf; } - @Override - public String toString() { - return String.valueOf(value); - } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java deleted file mode 100644 index 09fc039be..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.common; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.Type; - -public abstract class GroupWriter { - protected final RecordConsumer recordConsumer; - protected final GroupType schema; - - protected GroupWriter(RecordConsumer recordConsumer, GroupType schema) { - this.recordConsumer = recordConsumer; - this.schema = schema; - } - - public final void write(GeoParquetGroupImpl group) { - recordConsumer.startMessage(); - writeGroup(group, schema); - recordConsumer.endMessage(); - } - - private void writeGroup(GeoParquetGroupImpl group, GroupType type) { - int fieldCount = type.getFieldCount(); - for (int field = 0; field < fieldCount; ++field) { - int valueCount = group.getFieldRepetitionCount(field); - if (valueCount > 0) { - Type fieldType = type.getType(field); - String fieldName = fieldType.getName(); - recordConsumer.startField(fieldName, field); - for (int index = 0; index < valueCount; ++index) { - if (fieldType.isPrimitive()) { - group.writeValue(field, index, recordConsumer); - } else { - recordConsumer.startGroup(); - writeGroup(group.getGroup(field, index), fieldType.asGroupType()); - recordConsumer.endGroup(); - } - } - recordConsumer.endField(fieldName, field); - } - } - } -} diff --git 
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java deleted file mode 100644 index b02678c50..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; - - -public class BinaryValue extends Primitive { - - private final Binary binary; - - public BinaryValue(Binary binary) { - this.binary = binary; - } - - @Override - public Binary getBinary() { - return binary; - } - - @Override - public String getString() { - return binary.toStringUsingUTF8(); - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addBinary(binary); - } - - @Override - public String toString() { - return getString(); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java deleted file mode 100644 index 50b49497e..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.RecordConsumer; - -public class BooleanValue extends Primitive { - - private final boolean bool; - - public BooleanValue(boolean bool) { - this.bool = bool; - } - - @Override - public String toString() { - return String.valueOf(bool); - } - - @Override - public boolean getBoolean() { - return bool; - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addBoolean(bool); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java deleted file mode 100644 index 35aa03229..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.RecordConsumer; - -public class FloatValue extends Primitive { - - private final float value; - - public FloatValue(float value) { - this.value = value; - } - - @Override - public float getFloat() { - return value; - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addFloat(value); - } - - @Override - public String toString() { - return String.valueOf(value); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java deleted file mode 100644 index 1638d287f..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Objects; -import java.util.List; - -public class GeoParquetColumnMetadata { - - @JsonProperty("encoding") - private String encoding; - - @JsonProperty("geometry_types") - private List geometryTypes; - - @JsonProperty("crs") - private JsonNode crs; - - @JsonProperty("orientation") - private String orientation; - - @JsonProperty("edges") - private String edges; - - @JsonProperty("bbox") - private Double[] bbox; - - public String getEncoding() { - return encoding; - } - - public void setEncoding(String encoding) { - this.encoding = encoding; - } - - public List getGeometryTypes() { - return geometryTypes; - } - - public void setGeometryTypes(List geometryTypes) { - this.geometryTypes = geometryTypes; - } - - public JsonNode getCrs() { - return crs; - } - - public void setCrs(JsonNode crs) { - this.crs = crs; - } - - public String getOrientation() { - return orientation; - } - - public void setOrientation(String orientation) { - this.orientation = orientation; - } - - public String getEdges() { - return edges; - } - - public void setEdges(String edges) { - this.edges = edges; - } - - public Double[] getBbox() { - return bbox; - } - - public void setBbox(Double[] bbox) { - this.bbox = bbox; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GeoParquetColumnMetadata that = (GeoParquetColumnMetadata) o; - return Objects.equal(encoding, that.encoding) - && Objects.equal(geometryTypes, that.geometryTypes) - && Objects.equal(crs, that.crs) - && Objects.equal(orientation, that.orientation) - && Objects.equal(edges, that.edges) - && Objects.equal(bbox, that.bbox); - } - - @Override - public int hashCode() { - return Objects.hashCode(encoding, geometryTypes, crs, orientation, 
edges, bbox); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java deleted file mode 100644 index 46e57bb3d..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import java.util.List; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.GroupType; -import org.locationtech.jts.geom.Envelope; -import org.locationtech.jts.geom.Geometry; - -/** - * A group of fields in a GeoParquet file. - * - */ -public interface GeoParquetGroup { - - /** - * Returns the GeoParquet schema of the group built upon the Parquet schema and the GeoParquet - * metadata. - * - * @return the GeoParquet schema - */ - Schema getSchema(); - - /** - * Returns the Parquet schema of the group. - * - * @return the Parquet schema - */ - GroupType getParquetSchema(); - - /** - * Returns the GeoParquet metadata of the group. 
- * - * @return the Parquet metadata - */ - GeoParquetMetadata getGeoParquetMetadata(); - - /** - * Creates a new empty group in the group at the specified field index. - * - * @param fieldIndex the field index - * @return the new group - */ - GeoParquetGroup createGroup(int fieldIndex); - - List getValues(int fieldIndex); - - Binary getBinaryValue(int fieldIndex); - - List getBinaryValues(int fieldIndex); - - Boolean getBooleanValue(int fieldIndex); - - List getBooleanValues(int fieldIndex); - - Double getDoubleValue(int fieldIndex); - - List getDoubleValues(int fieldIndex); - - Float getFloatValue(int fieldIndex); - - List getFloatValues(int fieldIndex); - - Integer getIntegerValue(int fieldIndex); - - List getIntegerValues(int fieldIndex); - - Binary getInt96Value(int fieldIndex); - - List getInt96Values(int fieldIndex); - - Binary getNanoTimeValue(int fieldIndex); - - List getNanoTimeValues(int fieldIndex); - - Long getLongValue(int fieldIndex); - - List getLongValues(int fieldIndex); - - String getStringValue(int fieldIndex); - - List getStringValues(int fieldIndex); - - Geometry getGeometryValue(int fieldIndex); - - List getGeometryValues(int fieldIndex); - - Envelope getEnvelopeValue(int fieldIndex); - - List getEnvelopeValues(int fieldIndex); - - GeoParquetGroup getGroupValue(int fieldIndex); - - List getGroupValues(int fieldIndex); - - Binary getBinaryValue(String fieldName); - - List getBinaryValues(String fieldName); - - Boolean getBooleanValue(String fieldName); - - List getBooleanValues(String fieldName); - - Double getDoubleValue(String fieldName); - - List getDoubleValues(String fieldName); - - Float getFloatValue(String fieldName); - - List getFloatValues(String fieldName); - - Integer getIntegerValue(String fieldName); - - List getIntegerValues(String fieldName); - - Binary getInt96Value(String fieldName); - - List getInt96Values(String fieldName); - - Binary getNanoTimeValue(String fieldName); - - List getNanoTimeValues(String fieldName); - - Long 
getLongValue(String fieldName); - - List getLongValues(String fieldName); - - String getStringValue(String fieldName); - - List getStringValues(String fieldName); - - Geometry getGeometryValue(String fieldName); - - List getGeometryValues(String fieldName); - - Envelope getEnvelopeValue(String fieldName); - - List getEnvelopeValues(String fieldName); - - GeoParquetGroup getGroupValue(String fieldName); - - List getGroupValues(String fieldName); - - void setBinaryValue(int fieldIndex, Binary binaryValue); - - void setBinaryValues(int fieldIndex, List binaryValues); - - void setBooleanValue(int fieldIndex, Boolean booleanValue); - - void setBooleanValues(int fieldIndex, List booleanValues); - - void setDoubleValue(int fieldIndex, Double doubleValue); - - void setDoubleValues(int fieldIndex, List doubleValues); - - void setFloatValue(int fieldIndex, Float floatValue); - - void setFloatValues(int fieldIndex, List floatValues); - - void setIntegerValue(int fieldIndex, Integer integerValue); - - void setIntegerValues(int fieldIndex, List integerValues); - - void setInt96Value(int fieldIndex, Binary int96Value); - - void setInt96Values(int fieldIndex, List int96Values); - - void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue); - - void setNanoTimeValues(int fieldIndex, List nanoTimeValues); - - void setLongValue(int fieldIndex, Long longValue); - - void setLongValues(int fieldIndex, List longValues); - - void setStringValue(int fieldIndex, String stringValue); - - void setStringValues(int fieldIndex, List stringValues); - - void setGeometryValue(int fieldIndex, Geometry geometryValue); - - void setGeometryValues(int fieldIndex, List geometryValues); - - void setEnvelopeValue(int fieldIndex, Envelope envelopeValue); - - void setEnvelopeValues(int fieldIndex, List envelopeValues); - - void setGroupValue(int fieldIndex, GeoParquetGroup groupValue); - - void setGroupValues(int fieldIndex, List groupValues); - - void setBinaryValue(String fieldName, Binary binaryValue); 
- - void setBinaryValues(String fieldName, List binaryValues); - - void setBooleanValue(String fieldName, Boolean booleanValue); - - void setBooleanValues(String fieldName, List booleanValues); - - void setDoubleValue(String fieldName, Double doubleValue); - - void setDoubleValues(String fieldName, List doubleValues); - - void setFloatValue(String fieldName, Float floatValue); - - void setFloatValues(String fieldName, List floatValues); - - void setIntegerValue(String fieldName, Integer integerValue); - - void setIntegerValues(String fieldName, List integerValues); - - void setInt96Value(String fieldName, Binary int96Value); - - void setInt96Values(String fieldName, List int96Values); - - void setNanoTimeValue(String fieldName, Binary nanoTimeValue); - - void setNanoTimeValues(String fieldName, List nanoTimeValues); - - void setLongValue(String fieldName, Long longValue); - - void setLongValues(String fieldName, List longValues); - - void setStringValue(String fieldName, String stringValue); - - void setStringValues(String fieldName, List stringValues); - - void setGeometryValue(String fieldName, Geometry geometryValue); - - void setGeometryValues(String fieldName, List geometryValues); - - void setEnvelopeValue(String fieldName, Envelope envelopeValue); - - void setEnvelopeValues(String fieldName, List envelopeValues); - - void setGroupValue(String fieldName, GeoParquetGroup groupValue); - - void setGroupValues(String fieldName, List groupValues); - - /** - * A GeoParquet schema that describes the fields of a group and can easily be introspected. - * - * @param name - * @param fields the fields of the schema - */ - record Schema(String name, List fields) { - - } - - /** - * A sealed inteface for the fields of a GeoParquet schema. - *

- * Sealed interfaces were introduced in Java 17 and can be used with pattern matching since Java - * 21. - */ - sealed - interface Field { - String name(); - - Type type(); - - Cardinality cardinality(); - } - - record BinaryField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.BINARY; - } - } - - record BooleanField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.BOOLEAN; - } - } - - record DoubleField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.DOUBLE; - } - } - - record FloatField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.FLOAT; - } - } - - record IntegerField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.INTEGER; - } - } - - record Int96Field(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.INT96; - } - } - - record LongField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.LONG; - } - } - - record StringField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.STRING; - } - } - - record GeometryField(String name, Cardinality cardinality) implements Field { - - @Override - public Type type() { - return Type.GEOMETRY; - } - } - - record EnvelopeField(String name, Cardinality cardinality, Schema schema) implements Field { - - @Override - public Type type() { - return Type.ENVELOPE; - } - } - - record GroupField(String name, Cardinality cardinality, Schema schema) implements Field { - - @Override - public Type type() { - return Type.GROUP; - } - } - - /** - * The type of a GeoParquet field. 
- */ - enum Type { - BINARY, - BOOLEAN, - DOUBLE, - FLOAT, - INTEGER, - INT96, - LONG, - STRING, - GEOMETRY, - ENVELOPE, - GROUP - } - - /** - * The cardinality of a GeoParquet field. - */ - enum Cardinality { - REQUIRED, - OPTIONAL, - REPEATED - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java deleted file mode 100644 index a108e383f..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java +++ /dev/null @@ -1,826 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import java.util.ArrayList; -import java.util.List; -import org.apache.baremaps.geoparquet.GeoParquetException; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; -import org.locationtech.jts.geom.Envelope; -import org.locationtech.jts.geom.Geometry; -import org.locationtech.jts.io.ParseException; -import org.locationtech.jts.io.WKBReader; -import org.locationtech.jts.io.WKBWriter; - -public class GeoParquetGroupImpl implements GeoParquetGroup { - - private final GroupType schema; - - private final GeoParquetMetadata metadata; - - private final Schema geoParquetSchema; - - private final List[] data; - - public GeoParquetGroupImpl( - GroupType schema, - GeoParquetMetadata metadata, - Schema geoParquetSchema) { - this.schema = schema; - this.metadata = metadata; - this.geoParquetSchema = geoParquetSchema; - this.data = new List[schema.getFields().size()]; - for (int i = 0; i < schema.getFieldCount(); i++) { - this.data[i] = new ArrayList<>(); - } - } - - public GeoParquetGroupImpl addGroup(int fieldIndex) { - GeoParquetGroupImpl group = createGroup(fieldIndex); - add(fieldIndex, group); - return group; - } - - public GeoParquetGroupImpl addGroup(String field) { - return addGroup(getParquetSchema().getFieldIndex(field)); - } - - public GeoParquetGroupImpl getGroup(int fieldIndex, int index) { - return (GeoParquetGroupImpl) getValue(fieldIndex, index); - } - - public GeoParquetGroupImpl getGroup(String field, int index) { - return getGroup(getParquetSchema().getFieldIndex(field), index); - } - - public int getFieldRepetitionCount(int fieldIndex) { - List list = data[fieldIndex]; - return list == null ? 
0 : list.size(); - } - - public String getValueToString(int fieldIndex, int index) { - return String.valueOf(getValue(fieldIndex, index)); - } - - public String getString(int fieldIndex, int index) { - return ((BinaryValue) getValue(fieldIndex, index)).getString(); - } - - public int getInteger(int fieldIndex, int index) { - return ((IntegerValue) getValue(fieldIndex, index)).getInteger(); - } - - public long getLong(int fieldIndex, int index) { - return ((LongValue) getValue(fieldIndex, index)).getLong(); - } - - public double getDouble(int fieldIndex, int index) { - return ((DoubleValue) getValue(fieldIndex, index)).getDouble(); - } - - public float getFloat(int fieldIndex, int index) { - return ((FloatValue) getValue(fieldIndex, index)).getFloat(); - } - - public boolean getBoolean(int fieldIndex, int index) { - return ((BooleanValue) getValue(fieldIndex, index)).getBoolean(); - } - - public Binary getBinary(int fieldIndex, int index) { - return ((BinaryValue) getValue(fieldIndex, index)).getBinary(); - } - - public Binary getInt96(int fieldIndex, int index) { - return ((Int96Value) getValue(fieldIndex, index)).getInt96(); - } - - public Geometry getGeometry(int fieldIndex, int index) { - byte[] bytes = ((BinaryValue) getValue(fieldIndex, index)).getBinary().getBytes(); - try { - return new WKBReader().read(bytes); - } catch (ParseException e) { - throw new GeoParquetException("WKBReader failed to parse", e); - } - } - - private Object getValue(int fieldIndex, int index) { - List list = getObjects(fieldIndex); - try { - return list.get(index); - } catch (IndexOutOfBoundsException e) { - String elementText = String.format(" element number %d ", index); - throw createGeoParquetException(fieldIndex, elementText); - } - } - - private List getObjects(int fieldIndex) { - List list; - if (fieldIndex < 0 || fieldIndex >= data.length) { - throw createGeoParquetException(fieldIndex, ""); - } - list = data[fieldIndex]; - return list; - } - - private GeoParquetException 
createGeoParquetException(int fieldIndex, String elementText) { - String msg = String.format("Not found %d (%s)%s in group%n%s", fieldIndex, - schema.getFieldName(fieldIndex), elementText, this); - return new GeoParquetException(msg); - } - - private void add(int fieldIndex, Primitive value) { - org.apache.parquet.schema.Type type = schema.getType(fieldIndex); - List list = getObjects(fieldIndex); - if (!type.isRepetition(org.apache.parquet.schema.Type.Repetition.REPEATED) - && !list.isEmpty()) { - throw new IllegalStateException("field " + fieldIndex + " (" + type.getName() - + ") can not have more than one value: " + list); - } - list.add(value); - } - - public void add(int fieldIndex, int value) { - add(fieldIndex, new IntegerValue(value)); - } - - public void add(int fieldIndex, long value) { - add(fieldIndex, new LongValue(value)); - } - - public void add(int fieldIndex, String value) { - add(fieldIndex, new BinaryValue(Binary.fromString(value))); - } - - public void add(int fieldIndex, boolean value) { - add(fieldIndex, new BooleanValue(value)); - } - - public void add(int fieldIndex, Binary value) { - switch (getParquetSchema().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { - case BINARY, FIXED_LEN_BYTE_ARRAY: - add(fieldIndex, new BinaryValue(value)); - break; - case INT96: - add(fieldIndex, new Int96Value(value)); - break; - default: - throw new UnsupportedOperationException( - getParquetSchema().asPrimitiveType().getName() + " not supported for Binary"); - } - } - - public void add(int fieldIndex, float value) { - add(fieldIndex, new FloatValue(value)); - } - - public void add(int fieldIndex, double value) { - add(fieldIndex, new DoubleValue(value)); - } - - public void add(int fieldIndex, GeoParquetGroupImpl value) { - List list = data[fieldIndex]; - list.add(value); - } - - public void add(int fieldIndex, Geometry geometry) { - byte[] bytes = new WKBWriter().write(geometry); - add(fieldIndex, Binary.fromConstantByteArray(bytes)); - } - 
- public void add(String field, int value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, long value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, float value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, double value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, String value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, boolean value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, Binary value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, GeoParquetGroupImpl value) { - add(getParquetSchema().getFieldIndex(field), value); - } - - public void add(String field, Geometry geometry) { - byte[] bytes = new WKBWriter().write(geometry); - add(getParquetSchema().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); - } - - public void writeValue(int field, int index, RecordConsumer recordConsumer) { - ((Primitive) getValue(field, index)).writeValue(recordConsumer); - } - - @Override - public String toString() { - return toString(""); - } - - public String toString(String indent) { - StringBuilder builder = new StringBuilder(); - appendToString(builder, indent); - return builder.toString(); - } - - private void appendToString(StringBuilder builder, String indent) { - int i = 0; - for (org.apache.parquet.schema.Type field : schema.getFields()) { - String name = field.getName(); - List values = data[i]; - ++i; - if (values != null && !values.isEmpty()) { - for (Object value : values) { - builder.append(indent).append(name); - if (value == null) { - builder.append(": NULL\n"); - } else if (value instanceof GeoParquetGroupImpl geoParquetGroupImpl) { - builder.append('\n'); - geoParquetGroupImpl.appendToString(builder, indent + " "); 
- } else { - builder.append(": ").append(value).append('\n'); - } - } - } - } - } - - @Override - public List getValues(int fieldIndex) { - return (List) data[fieldIndex]; - } - - private List getGroups(int fieldIndex) { - return (List) data[fieldIndex]; - } - - @Override - public Schema getSchema() { - return geoParquetSchema; - } - - @Override - public GroupType getParquetSchema() { - return schema; - } - - @Override - public GeoParquetMetadata getGeoParquetMetadata() { - return metadata; - } - - @Override - public GeoParquetGroupImpl createGroup(int fieldIndex) { - if (geoParquetSchema.fields().get(fieldIndex) instanceof EnvelopeField envelopeField) { - return new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata, - envelopeField.schema()); - } - - if (geoParquetSchema.fields().get(fieldIndex) instanceof GroupField groupField) { - return new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata, - groupField.schema()); - } - - GroupField field = ((GroupField) geoParquetSchema.fields().get(fieldIndex)); - return new GeoParquetGroupImpl( - schema.getType(fieldIndex).asGroupType(), - metadata, - field.schema()); - } - - @Override - public Binary getBinaryValue(int fieldIndex) { - return getBinaryValues(fieldIndex).get(0); - } - - @Override - public List getBinaryValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); - } - - @Override - public Boolean getBooleanValue(int fieldIndex) { - return getBooleanValues(fieldIndex).get(0); - } - - @Override - public List getBooleanValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getBoolean).toList(); - } - - @Override - public Double getDoubleValue(int fieldIndex) { - return getDoubleValues(fieldIndex).get(0); - } - - @Override - public List getDoubleValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getDouble).toList(); - } - - @Override - public Float getFloatValue(int fieldIndex) { 
- return getFloatValues(fieldIndex).get(0); - } - - @Override - public List getFloatValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getFloat).toList(); - } - - @Override - public Integer getIntegerValue(int fieldIndex) { - return getIntegerValues(fieldIndex).get(0); - } - - @Override - public List getIntegerValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getInteger).toList(); - } - - @Override - public Binary getInt96Value(int fieldIndex) { - return getBinaryValues(fieldIndex).get(0); - } - - @Override - public List getInt96Values(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); - } - - @Override - public Binary getNanoTimeValue(int fieldIndex) { - return getBinaryValues(fieldIndex).get(0); - } - - @Override - public List getNanoTimeValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getBinary).toList(); - } - - @Override - public Long getLongValue(int fieldIndex) { - return getLongValues(fieldIndex).get(0); - } - - @Override - public List getLongValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getLong).toList(); - } - - @Override - public String getStringValue(int fieldIndex) { - return getStringValues(fieldIndex).get(0); - } - - @Override - public List getStringValues(int fieldIndex) { - return getValues(fieldIndex).stream().map(Primitive::getString).toList(); - } - - @Override - public Geometry getGeometryValue(int fieldIndex) { - return getGeometryValues(fieldIndex).get(0); - } - - @Override - public List getGeometryValues(int fieldIndex) { - List geometries = new ArrayList<>(); - for (Binary binary : getBinaryValues(fieldIndex)) { - try { - geometries.add(new WKBReader().read(binary.getBytes())); - } catch (ParseException e) { - throw new GeoParquetException("WKBReader failed to parse.", e); - } - } - return geometries; - } - - @Override - public Envelope getEnvelopeValue(int fieldIndex) { - 
return getEnvelopeValues(fieldIndex).get(0); - } - - @Override - public List getEnvelopeValues(int fieldIndex) { - return getGroupValues(fieldIndex).stream().map(group -> { - double xMin = group.getSchema().fields().get(0).type().equals(Type.FLOAT) - ? (double) group.getFloatValue(0) - : group.getDoubleValue(0); - double yMin = group.getSchema().fields().get(1).type().equals(Type.FLOAT) - ? (double) group.getFloatValue(1) - : group.getDoubleValue(1); - double xMax = group.getSchema().fields().get(2).type().equals(Type.FLOAT) - ? (double) group.getFloatValue(2) - : group.getDoubleValue(2); - double yMax = group.getSchema().fields().get(0).type().equals(Type.FLOAT) - ? (double) group.getFloatValue(3) - : group.getDoubleValue(3); - return new Envelope(xMin, xMax, yMin, yMax); - }).toList(); - } - - @Override - public GeoParquetGroup getGroupValue(int fieldIndex) { - return getGroupValues(fieldIndex).get(0); - } - - @Override - public List getGroupValues(int fieldIndex) { - return getGroups(fieldIndex); - } - - @Override - public Binary getBinaryValue(String fieldName) { - return getBinaryValues(fieldName).get(0); - } - - @Override - public List getBinaryValues(String fieldName) { - return getBinaryValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Boolean getBooleanValue(String fieldName) { - return getBooleanValues(fieldName).get(0); - } - - @Override - public List getBooleanValues(String fieldName) { - return getBooleanValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Double getDoubleValue(String fieldName) { - return getDoubleValues(fieldName).get(0); - } - - @Override - public List getDoubleValues(String fieldName) { - return getDoubleValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Float getFloatValue(String fieldName) { - return getFloatValues(fieldName).get(0); - } - - @Override - public List getFloatValues(String fieldName) { - return getFloatValues(schema.getFieldIndex(fieldName)); - } - - @Override - 
public Integer getIntegerValue(String fieldName) { - return getIntegerValues(fieldName).get(0); - } - - @Override - public List getIntegerValues(String fieldName) { - return getIntegerValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Binary getInt96Value(String fieldName) { - return getBinaryValues(fieldName).get(0); - } - - @Override - public List getInt96Values(String fieldName) { - return getBinaryValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Binary getNanoTimeValue(String fieldName) { - return getBinaryValues(fieldName).get(0); - } - - @Override - public List getNanoTimeValues(String fieldName) { - return getBinaryValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Long getLongValue(String fieldName) { - return getLongValues(fieldName).get(0); - } - - @Override - public List getLongValues(String fieldName) { - return getLongValues(schema.getFieldIndex(fieldName)); - } - - @Override - public String getStringValue(String fieldName) { - return getStringValues(fieldName).get(0); - } - - @Override - public List getStringValues(String fieldName) { - return getStringValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Geometry getGeometryValue(String fieldName) { - return getGeometryValues(fieldName).get(0); - } - - @Override - public List getGeometryValues(String fieldName) { - return getGeometryValues(schema.getFieldIndex(fieldName)); - } - - @Override - public Envelope getEnvelopeValue(String fieldName) { - return getEnvelopeValues(fieldName).get(0); - } - - @Override - public List getEnvelopeValues(String fieldName) { - return getEnvelopeValues(schema.getFieldIndex(fieldName)); - } - - @Override - public GeoParquetGroup getGroupValue(String fieldName) { - return getGroupValues(fieldName).get(0); - } - - @Override - public List getGroupValues(String fieldName) { - return getGroupValues(schema.getFieldIndex(fieldName)); - } - - @Override - public void setBinaryValue(int fieldIndex, Binary 
binaryValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBinaryValues(int fieldIndex, List binaryValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBooleanValue(int fieldIndex, Boolean booleanValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBooleanValues(int fieldIndex, List booleanValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setDoubleValue(int fieldIndex, Double doubleValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setDoubleValues(int fieldIndex, List doubleValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setFloatValue(int fieldIndex, Float floatValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setFloatValues(int fieldIndex, List floatValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setIntegerValue(int fieldIndex, Integer integerValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setIntegerValues(int fieldIndex, List integerValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setInt96Value(int fieldIndex, Binary int96Value) { - throw new UnsupportedOperationException(); - } - - @Override - public void setInt96Values(int fieldIndex, List int96Values) { - throw new UnsupportedOperationException(); - } - - @Override - public void setNanoTimeValue(int fieldIndex, Binary nanoTimeValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setNanoTimeValues(int fieldIndex, List nanoTimeValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setLongValue(int fieldIndex, Long longValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setLongValues(int fieldIndex, List longValues) { - throw new 
UnsupportedOperationException(); - } - - @Override - public void setStringValue(int fieldIndex, String stringValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setStringValues(int fieldIndex, List stringValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGeometryValue(int fieldIndex, Geometry geometryValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGeometryValues(int fieldIndex, List geometryValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setEnvelopeValue(int fieldIndex, Envelope envelopeValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setEnvelopeValues(int fieldIndex, List envelopeValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGroupValue(int fieldIndex, GeoParquetGroup groupValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGroupValues(int fieldIndex, List groupValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBinaryValue(String fieldName, Binary binaryValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBinaryValues(String fieldName, List binaryValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBooleanValue(String fieldName, Boolean booleanValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setBooleanValues(String fieldName, List booleanValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setDoubleValue(String fieldName, Double doubleValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setDoubleValues(String fieldName, List doubleValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setFloatValue(String fieldName, Float floatValue) { - throw new 
UnsupportedOperationException(); - } - - @Override - public void setFloatValues(String fieldName, List floatValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setIntegerValue(String fieldName, Integer integerValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setIntegerValues(String fieldName, List integerValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setInt96Value(String fieldName, Binary int96Value) { - throw new UnsupportedOperationException(); - } - - @Override - public void setInt96Values(String fieldName, List int96Values) { - throw new UnsupportedOperationException(); - } - - @Override - public void setNanoTimeValue(String fieldName, Binary nanoTimeValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setNanoTimeValues(String fieldName, List nanoTimeValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setLongValue(String fieldName, Long longValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setLongValues(String fieldName, List longValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setStringValue(String fieldName, String stringValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setStringValues(String fieldName, List stringValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGeometryValue(String fieldName, Geometry geometryValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGeometryValues(String fieldName, List geometryValues) { - throw new UnsupportedOperationException(); - } - - @Override - public void setEnvelopeValue(String fieldName, Envelope envelopeValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setEnvelopeValues(String fieldName, List envelopeValues) { - throw new 
UnsupportedOperationException(); - } - - @Override - public void setGroupValue(String fieldName, GeoParquetGroup groupValue) { - throw new UnsupportedOperationException(); - } - - @Override - public void setGroupValues(String fieldName, List groupValues) { - throw new UnsupportedOperationException(); - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java deleted file mode 100644 index 911a3dda4..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.baremaps.geoparquet.common.GroupWriter; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; - -public class GeoParquetGroupWriter extends GroupWriter { - public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType schema) { - super(recordConsumer, schema); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java deleted file mode 100644 index a8cc46a06..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetMaterializer extends RecordMaterializer { - - private final GeoParquetGroupFactory groupFactory; - - private final GeoParquetGroupConverter root; - - public GeoParquetMaterializer(MessageType schema, GeoParquetMetadata metadata) { - this.groupFactory = new GeoParquetGroupFactory(schema, metadata); - this.root = new GeoParquetGroupConverter(null, 0, schema) { - @Override - public void start() { - this.current = groupFactory.newGroup(); - } - }; - } - - @Override - public GeoParquetGroupImpl getCurrentRecord() { - return root.getCurrentRecord(); - } - - @Override - public GroupConverter getRootConverter() { - return root; - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java deleted file mode 100644 index fe3955d7f..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Objects; -import java.util.Map; -import java.util.Optional; - -public class GeoParquetMetadata { - - @JsonProperty("version") - private String version; - - @JsonProperty("primary_column") - private String primaryColumn; - - @JsonProperty("columns") - private Map columns; - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getPrimaryColumn() { - return primaryColumn; - } - - public void setPrimaryColumn(String primaryColumn) { - this.primaryColumn = primaryColumn; - } - - public Map getColumns() { - return columns; - } - - public void setColumns(Map columns) { - this.columns = columns; - } - - public int getSrid(String column) { - return Optional.ofNullable(getColumns().get(column).getCrs()).map(crs -> { - JsonNode id = crs.get("id"); - return switch (id.get("authority").asText()) { - case "OGC" -> switch (id.get("code").asText()) { - case "CRS84" -> 4326; - default -> 0; - }; - case "EPSG" -> id.get("code").asInt(); - default -> 0; - }; - }).orElse(4326); - } - - public boolean isGeometryColumn(String column) { - return columns.containsKey(column); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GeoParquetMetadata that = (GeoParquetMetadata) o; - return Objects.equal(version, that.version) - && Objects.equal(primaryColumn, that.primaryColumn) - && Objects.equal(columns, that.columns); - } - - @Override - public int hashCode() { - return Objects.hashCode(version, primaryColumn, columns); - } -} diff --git 
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java deleted file mode 100644 index 33576e5cd..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; - -public class Int96Value extends Primitive { - - private final Binary value; - - public Int96Value(Binary value) { - this.value = value; - } - - @Override - public Binary getInt96() { - return value; - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addBinary(value); - } - - @Override - public String toString() { - return "Int96Value{" + value + "}"; - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java deleted file mode 100644 index 6b3086e9c..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.RecordConsumer; - - -public class IntegerValue extends Primitive { - - private final int value; - - public IntegerValue(int value) { - this.value = value; - } - - @Override - public String toString() { - return String.valueOf(value); - } - - @Override - public int getInteger() { - return value; - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addInteger(value); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java deleted file mode 100644 index 34483b68f..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.RecordConsumer; - -public class LongValue extends Primitive { - - private final long value; - - public LongValue(long value) { - this.value = value; - } - - @Override - public String toString() { - return String.valueOf(value); - } - - @Override - public long getLong() { - return value; - } - - @Override - public void writeValue(RecordConsumer recordConsumer) { - recordConsumer.addLong(value); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java deleted file mode 100644 index 79bd9517c..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; -import org.locationtech.jts.geom.Geometry; - -public abstract class Primitive { - - public String getString() { - throw new UnsupportedOperationException(); - } - - public int getInteger() { - throw new UnsupportedOperationException(); - } - - public long getLong() { - throw new UnsupportedOperationException(); - } - - public boolean getBoolean() { - throw new UnsupportedOperationException(); - } - - public Binary getBinary() { - throw new UnsupportedOperationException(); - } - - public Binary getInt96() { - throw new UnsupportedOperationException(); - } - - public float getFloat() { - throw new UnsupportedOperationException(); - } - - public double getDouble() { - throw new UnsupportedOperationException(); - } - - public Geometry getGeometry() { - throw new UnsupportedOperationException(); - } - - public abstract void writeValue(RecordConsumer recordConsumer); -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java deleted file mode 100644 index 21dfc3767..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Map; -import org.apache.baremaps.geoparquet.GeoParquetException; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordConverter; -import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetGroupReadSupport extends ReadSupport { - - @Override - public ReadContext init( - Configuration configuration, Map keyValueMetaData, - MessageType fileSchema) { - String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); - MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); - return new ReadContext(requestedProjection); - } - - @Override - public RecordMaterializer prepareForRead(Configuration configuration, - Map keyValueMetaData, MessageType fileSchema, - ReadContext readContext) { - - // Read the GeoParquet metadata of the Parquet file - try { - String json = keyValueMetaData.get("geo"); - GeoParquetMetadata metadata = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .readValue(json, GeoParquetMetadata.class); - return new 
GeoParquetGroupRecordConverter(readContext.getRequestedSchema(), metadata); - } catch (JsonProcessingException e) { - throw new GeoParquetException("Failed to read GeoParquet's metadata of the Parquet file", e); - } - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java deleted file mode 100644 index b28607d61..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.hadoop; - -import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetGroupWriteSupport extends WriteSupport { - - public static final String PARQUET_EXAMPLE_SCHEMA = "parquet.example.schema"; - - public static void setSchema(MessageType schema, Configuration configuration) { - configuration.set(PARQUET_EXAMPLE_SCHEMA, schema.toString()); - } - - public static MessageType getSchema(Configuration configuration) { - return parseMessageType( - Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA)); - } - - private MessageType schema; - private GeoParquetGroupWriter groupWriter; - private final Map extraMetaData; - - public GeoParquetGroupWriteSupport() { - this(null, new HashMap<>()); - } - - GeoParquetGroupWriteSupport(MessageType schema) { - this(schema, new HashMap<>()); - } - - GeoParquetGroupWriteSupport(MessageType schema, Map extraMetaData) { - this.schema = schema; - this.extraMetaData = extraMetaData; - } - - @Override - public String getName() { - return "example"; - } - - @Override - public WriteContext init(Configuration configuration) { - // if present, prefer the schema passed to the constructor - if (schema == null) { - schema = getSchema(configuration); - } - return new WriteContext(schema, this.extraMetaData); - } - - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - groupWriter = new GeoParquetGroupWriter(recordConsumer, schema); - } - - @Override - public void write(GeoParquetGroupImpl group) { - groupWriter.write(group); - } - -} diff --git 
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java deleted file mode 100644 index 6e579ca9c..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.common.GroupWriter; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; - -public class GeoParquetGroupWriter extends GroupWriter { - - public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType schema) { - super(recordConsumer, schema); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java deleted file mode 100644 index 1c2bc8ff3..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.parquet.hadoop.ParquetInputFormat; - -/** - * Example input format to read Parquet files - * - * This Input format uses a rather inefficient data model but works independently of higher level - * abstractions. 
- */ -public class GeoParquetInputFormat extends ParquetInputFormat { - - public GeoParquetInputFormat() { - super(GeoParquetGroupReadSupport.class); - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java deleted file mode 100644 index a4d733203..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.schema.MessageType; - -/** - * An example output format - * - * must be provided the schema up front - * - * @see GeoParquetOutputFormat#setSchema(Job, MessageType) - * @see GeoParquetGroupWriteSupport#PARQUET_EXAMPLE_SCHEMA - */ -public class GeoParquetOutputFormat extends ParquetOutputFormat { - - /** - * set the schema being written to the job conf - * - * @param job a job - * @param schema the schema of the data - */ - public static void setSchema(Job job, MessageType schema) { - GeoParquetGroupWriteSupport.setSchema(schema, ContextUtil.getConfiguration(job)); - } - - /** - * retrieve the schema from the conf - * - * @param job a job - * @return the schema - */ - public static MessageType getSchema(Job job) { - return GeoParquetGroupWriteSupport.getSchema(ContextUtil.getConfiguration(job)); - } - - public GeoParquetOutputFormat() { - super(new GeoParquetGroupWriteSupport()); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java deleted file mode 100644 index 4395d5e65..000000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.io.OutputFile; -import org.apache.parquet.schema.MessageType; - -/** - * An example file writer class. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE. - */ -public class GeoParquetWriter extends ParquetWriter { - - /** - * Creates a Builder for configuring ParquetWriter with the example object model. THIS IS AN - * EXAMPLE ONLY AND NOT INTENDED FOR USE. - * - * @param file the output file to create - * @return a {@link Builder} to create a {@link ParquetWriter} - */ - public static Builder builder(Path file) { - return new Builder(file); - } - - /** - * Creates a Builder for configuring ParquetWriter with the example object model. THIS IS AN - * EXAMPLE ONLY AND NOT INTENDED FOR USE. - * - * @param file the output file to create - * @return a {@link Builder} to create a {@link ParquetWriter} - */ - public static Builder builder(OutputFile file) { - return new Builder(file); - } - - /** - * Create a new {@link GeoParquetWriter}. - * - * @param file The file name to write to. - * @param writeSupport The schema to write with. 
- * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED - * @param blockSize the block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other - * purposes. - * @param enableDictionary Whether to use a dictionary to compress columns. - * @param conf The Configuration to use. - * @throws IOException - */ - @SuppressWarnings("squid:S107") - GeoParquetWriter( - Path file, - WriteSupport writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean enableValidation, - ParquetProperties.WriterVersion writerVersion, - Configuration conf) throws IOException { - super(file, writeSupport, compressionCodecName, blockSize, pageSize, - pageSize, enableDictionary, enableValidation, writerVersion, conf); - } - - public static class Builder extends ParquetWriter.Builder { - - private MessageType type = null; - private Map extraMetaData = new HashMap(); - - private Builder(Path file) { - super(file); - } - - private Builder(OutputFile file) { - super(file); - } - - public Builder withType(MessageType type) { - this.type = type; - return this; - } - - public Builder withExtraMetaData(Map extraMetaData) { - this.extraMetaData = extraMetaData; - return this; - } - - @Override - protected Builder self() { - return this; - } - - @Override - protected WriteSupport getWriteSupport(Configuration conf) { - return new GeoParquetGroupWriteSupport(type, extraMetaData); - } - - } -} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index 6d942a557..849720a3e 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -20,76 +20,45 @@ import 
static org.junit.jupiter.api.Assertions.*; import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Stream; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.testing.TestFiles; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Envelope; class GeoParquetReaderTest { @Test void read() { URI geoParquet = TestFiles.GEOPARQUET.toUri(); - final boolean isParallel = false; - final int expectedGroupCount = 5; - - readGroups(geoParquet, isParallel, expectedGroupCount); - } - - private static void readGroups(URI geoParquet, boolean parallel, - int expectedGroupCount) { GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); - final AtomicInteger groupCount = new AtomicInteger(); - Stream geoParquetGroupStream; - if (parallel) { - geoParquetGroupStream = geoParquetReader.readParallel(); - } else { - geoParquetGroupStream = geoParquetReader.read(); - } - geoParquetGroupStream.forEach(group -> groupCount.getAndIncrement()); - - assertEquals(expectedGroupCount, groupCount.get()); + assertEquals(5, geoParquetReader.read().count()); } - @Disabled("Requires access to the Internet") @Test - void readExternal() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet"); - final boolean isParallel = true; - final int expectedGroupCount = 974708; - - readGroups(geoParquet, isParallel, expectedGroupCount); + void readFiltered() { + URI geoParquet = TestFiles.GEOPARQUET.toUri(); + GeoParquetReader geoParquetReader = + new GeoParquetReader(geoParquet, new Envelope(-172, -65, 18, 72)); + assertEquals(1, geoParquetReader.read().count()); } - @Disabled("Requires access to the Internet") @Test - void validateSchemas() throws URISyntaxException { - URI geoParquet = new URI( - 
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet"); - + void size() { + URI geoParquet = TestFiles.GEOPARQUET.toUri(); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); - assertTrue(geoParquetReader.validateSchemasAreIdentical(), "Schemas are identical"); + assertEquals(5, geoParquetReader.size()); } - @Disabled("Requires access to the Internet") @Test - void sizeForLocalities() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet"); + void count() { + URI geoParquet = TestFiles.GEOPARQUET.toUri(); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); - assertEquals(974708L, geoParquetReader.size()); + assertEquals(5, geoParquetReader.read().count()); } - @Disabled("Requires access to the Internet") @Test - void sizeForBuildings() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet"); + void validateSchemas() { + URI geoParquet = TestFiles.GEOPARQUET.toUri(); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); - assertEquals(2352441548L, geoParquetReader.size()); + assertTrue(geoParquetReader.validateSchemasAreIdentical()); } } diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java new file mode 100644 index 000000000..0fb5b9205 --- /dev/null +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.net.URISyntaxException; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Envelope; + +class OvertureMapsTest { + + @Disabled("Requires access to the Internet") + @Test + void countAddresses() throws URISyntaxException { + URI geoParquet = new URI( + "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + Envelope switzerland = new Envelope(6.02260949059, 10.4427014502, 45.7769477403, 47.8308275417); + GeoParquetReader geoParquetReader = + new GeoParquetReader(geoParquet, switzerland, OvertureMaps.configuration()); + assertEquals(10397434, geoParquetReader.readParallel().count()); + } + + @Disabled("Requires access to the Internet") + @Test + void countAddressesInSwitzerland() throws URISyntaxException { + URI geoParquet = new URI( + "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + Envelope switzerland = new Envelope(6.02260949059, 10.4427014502, 45.7769477403, 47.8308275417); + GeoParquetReader geoParquetReader = + new GeoParquetReader(geoParquet, switzerland, OvertureMaps.configuration()); + assertEquals(10397434, geoParquetReader.readParallel().count()); + } 
+ + @Disabled("Requires access to the Internet") + @Test + void validateSchemas() throws URISyntaxException { + URI geoParquet = new URI( + "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + GeoParquetReader geoParquetReader = + new GeoParquetReader(geoParquet, null, OvertureMaps.configuration()); + assertTrue(geoParquetReader.validateSchemasAreIdentical(), "Schemas are identical"); + } + + @Disabled("Requires access to the Internet") + @Test + void size() throws URISyntaxException { + URI geoParquet = new URI( + "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + GeoParquetReader geoParquetReader = + new GeoParquetReader(geoParquet, null, OvertureMaps.configuration()); + assertEquals(213535887L, geoParquetReader.size()); + } + +} diff --git a/pom.xml b/pom.xml index 3afb754d0..07acbfcdf 100644 --- a/pom.xml +++ b/pom.xml @@ -47,6 +47,7 @@ limitations under the License. + baremaps-benchmarking baremaps-cli baremaps-core baremaps-data @@ -522,6 +523,11 @@ limitations under the License. s3 ${version.lib.awssdk} + + software.amazon.awssdk + s3-transfer-manager + ${version.lib.awssdk} +