Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into SPARK-49700
Browse files Browse the repository at this point in the history
  • Loading branch information
hvanhovell committed Nov 13, 2024
2 parents 1128897 + a84ca5e commit bf60e2d
Show file tree
Hide file tree
Showing 84 changed files with 2,237 additions and 703 deletions.
16 changes: 6 additions & 10 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1045,16 +1045,6 @@
"The input of <functionName> can't be <dataType> type data."
]
},
"UNSUPPORTED_UDF_INPUT_TYPE" : {
"message" : [
"UDFs do not support '<dataType>' as an input data type."
]
},
"UNSUPPORTED_UDF_OUTPUT_TYPE" : {
"message" : [
"UDFs do not support '<dataType>' as an output data type."
]
},
"VALUE_OUT_OF_RANGE" : {
"message" : [
"The <exprName> must be between <valueRange> (current value = <currentValue>)."
Expand Down Expand Up @@ -3319,6 +3309,12 @@
],
"sqlState" : "22023"
},
"INVALID_VARIANT_SHREDDING_SCHEMA" : {
"message" : [
"The schema `<schema>` is not a valid variant shredding schema."
],
"sqlState" : "22023"
},
"INVALID_WHERE_CONDITION" : {
"message" : [
"The WHERE condition <condition> contains invalid expressions: <expressionList>.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@ public ObjectField getFieldAtIndex(int index) {
});
}

// Get the dictionary ID for the object field at the `index` slot. Throws malformedVariant if
// `index` is outside the range `[0, objectSize())`.
// It is only legal to call it when `getType()` is `Type.OBJECT`.
public int getDictionaryIdAtIndex(int index) {
  return handleObject(value, pos, (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> {
    // Reject any slot outside [0, size) before computing a read position.
    boolean inBounds = index >= 0 && index < size;
    if (!inBounds) {
      throw malformedVariant();
    }
    // Each ID entry is `idSize` bytes wide and the entries begin at `idStart`.
    int idPos = idStart + index * idSize;
    return readUnsigned(value, idPos, idSize);
  });
}

// Get the number of array elements in the variant.
// It is only legal to call it when `getType()` is `Type.ARRAY`.
public int arraySize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ public Variant result() {
return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata);
}

// Return the variant value only, without metadata.
// Used in shredding to produce a final value, where all shredded values refer to a common
// metadata. It is expected to be called instead of `result()`, although it is valid to call both
// methods, in any order.
public byte[] valueWithoutMetadata() {
  // Copy only the bytes in [0, writePos) of the write buffer; the caller gets its own array.
  final byte[] valueBytes = Arrays.copyOfRange(writeBuffer, 0, writePos);
  return valueBytes;
}

public void appendString(String str) {
byte[] text = str.getBytes(StandardCharsets.UTF_8);
boolean longStr = text.length > MAX_SHORT_STR_SIZE;
Expand Down Expand Up @@ -404,15 +412,26 @@ private void appendVariantImpl(byte[] value, byte[] metadata, int pos) {
});
break;
default:
int size = valueSize(value, pos);
checkIndex(pos + size - 1, value.length);
checkCapacity(size);
System.arraycopy(value, pos, writeBuffer, writePos, size);
writePos += size;
shallowAppendVariantImpl(value, pos);
break;
}
}

// Append the variant value without rewriting or creating any metadata. This is used when
// building an object during shredding, where there is a fixed pre-existing metadata that
// all shredded values will refer to.
public void shallowAppendVariant(Variant v) {
  // Only the value bytes and the start position are used; the variant's metadata is not read.
  final byte[] sourceBytes = v.value;
  final int sourcePos = v.pos;
  shallowAppendVariantImpl(sourceBytes, sourcePos);
}

private void shallowAppendVariantImpl(byte[] value, int pos) {
  // Size reported by valueSize for the value starting at `pos`; used below as the copy length.
  final int numBytes = valueSize(value, pos);
  // Index of the last byte to be copied; it is checked against `value.length` before copying.
  final int lastByte = pos + numBytes - 1;
  checkIndex(lastByte, value.length);
  checkCapacity(numBytes);
  // Copy the bytes verbatim to the current write position, then advance it.
  System.arraycopy(value, pos, writeBuffer, writePos, numBytes);
  writePos = writePos + numBytes;
}

private void checkCapacity(int additional) {
int required = writePos + additional;
if (required > writeBuffer.length) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.types.variant;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
* Defines a valid shredding schema, as described in
* https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
* A shredding schema contains a value and optional typed_value field.
* If a typed_value is an array or struct, it recursively contains its own shredding schema for
* elements and fields, respectively.
* The schema also contains a metadata field at the top level, but not in recursively shredded
* fields.
*/
public class VariantSchema {

  // Represents one field of an object in the shredding schema.
  public static final class ObjectField {
    // Name of the object field.
    public final String fieldName;
    // Shredding schema of this field.
    public final VariantSchema schema;

    public ObjectField(String fieldName, VariantSchema schema) {
      this.fieldName = fieldName;
      this.schema = schema;
    }

    @Override
    public String toString() {
      return "ObjectField{" +
          "fieldName=" + fieldName +
          ", schema=" + schema +
          '}';
    }
  }

  // Base class of all scalar types that can appear as a typed_value.
  public abstract static class ScalarType {
  }

  public static final class StringType extends ScalarType {
  }

  public enum IntegralSize {
    BYTE, SHORT, INT, LONG
  }

  // An integral scalar type, parameterized by its size.
  public static final class IntegralType extends ScalarType {
    public final IntegralSize size;

    public IntegralType(IntegralSize size) {
      this.size = size;
    }
  }

  public static final class FloatType extends ScalarType {
  }

  public static final class DoubleType extends ScalarType {
  }

  public static final class BooleanType extends ScalarType {
  }

  public static final class BinaryType extends ScalarType {
  }

  // A decimal scalar type, parameterized by precision and scale.
  public static final class DecimalType extends ScalarType {
    public final int precision;
    public final int scale;

    public DecimalType(int precision, int scale) {
      this.precision = precision;
      this.scale = scale;
    }
  }

  public static final class DateType extends ScalarType {
  }

  public static final class TimestampType extends ScalarType {
  }

  public static final class TimestampNTZType extends ScalarType {
  }

  // The index of the typed_value, value, and metadata fields in the schema, respectively. If a
  // given field is not in the schema, its value must be set to -1 to indicate that it is invalid.
  // The indices of valid fields should be contiguous and start from 0.
  public final int typedIdx;
  public final int variantIdx;
  // topLevelMetadataIdx must be non-negative in the top-level schema, and -1 at all other nesting
  // levels.
  public final int topLevelMetadataIdx;
  // The number of fields in the schema. I.e. a value between 1 and 3, depending on which of value,
  // typed_value and metadata are present.
  public final int numFields;

  public final ScalarType scalarSchema;
  public final ObjectField[] objectSchema;
  // Map for fast lookup of object fields by name. The values are an index into `objectSchema`.
  // It is null exactly when `objectSchema` is null.
  public final Map<String, Integer> objectSchemaMap;
  public final VariantSchema arraySchema;

  // Stores the given indices and sub-schemas as-is (no validation is performed here) and, when
  // `objectSchema` is non-null, builds the name -> index lookup map. If two fields share a name,
  // the map keeps the index of the later one.
  public VariantSchema(int typedIdx, int variantIdx, int topLevelMetadataIdx, int numFields,
      ScalarType scalarSchema, ObjectField[] objectSchema,
      VariantSchema arraySchema) {
    this.typedIdx = typedIdx;
    this.numFields = numFields;
    this.variantIdx = variantIdx;
    this.topLevelMetadataIdx = topLevelMetadataIdx;
    this.scalarSchema = scalarSchema;
    this.objectSchema = objectSchema;
    if (objectSchema != null) {
      objectSchemaMap = new HashMap<>();
      for (int i = 0; i < objectSchema.length; i++) {
        objectSchemaMap.put(objectSchema[i].fieldName, i);
      }
    } else {
      objectSchemaMap = null;
    }

    this.arraySchema = arraySchema;
  }

  @Override
  public String toString() {
    return "VariantSchema{" +
        "typedIdx=" + typedIdx +
        ", variantIdx=" + variantIdx +
        ", topLevelMetadataIdx=" + topLevelMetadataIdx +
        ", numFields=" + numFields +
        ", scalarSchema=" + scalarSchema +
        // Concatenating an array directly prints its type and identity hash, not its contents.
        // Arrays.toString renders each ObjectField and yields "null" for a null array.
        ", objectSchema=" + Arrays.toString(objectSchema) +
        ", arraySchema=" + arraySchema +
        '}';
  }
}
Loading

0 comments on commit bf60e2d

Please sign in to comment.