Commit

Merge remote-tracking branch 'apache/master' into size_in_bytes_api
SemyonSinchenko committed Aug 29, 2024
2 parents 40c3d00 + fec6867 commit 71fe59c
Showing 26 changed files with 1,193 additions and 469 deletions.
6 changes: 2 additions & 4 deletions common/variant/README.md
@@ -322,8 +322,6 @@ Each `array_val` and `object_val` must contain exactly `num_elements + 1` values

The "short string" basic type may be used as an optimization to fold string length into the type byte for strings less than 64 bytes. It is semantically identical to the "string" primitive type.
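
As a rough illustration (not part of the spec, and assuming the layout used elsewhere in this document: the basic type in the low 2 bits of the value byte, short string as basic type 1, and the length in the upper 6 bits), a writer could emit a short string like this:

```scala
// Sketch only: folds the byte length of a short UTF-8 string into the type byte.
def writeShortString(s: String): Array[Byte] = {
  val utf8 = s.getBytes("UTF-8")
  require(utf8.length < 64, "the short string form only covers strings under 64 bytes")
  val typeByte = ((utf8.length << 2) | 1).toByte // length in the upper 6 bits, basic type 1 in the lower 2
  typeByte +: utf8
}
```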

String and binary values may also be represented as an index into the metadata dictionary. (See “string from metadata” and “binary from metadata” in the “Primitive Types” table.) Writers may choose to use this mechanism to avoid repeating identical string values in a Variant object.

The Decimal type contains a scale, but no precision. The implied precision of a decimal value is `floor(log_10(val)) + 1`.
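
For example, reading `val` as the unscaled integer value, an unscaled value of 12345 with scale 2 represents 123.45 and has an implied precision of `floor(log_10(12345)) + 1 = 5`. A throwaway check of the arithmetic (illustrative only):

```scala
// floor(log10(12345)) + 1 = 4 + 1 = 5 significant digits
val unscaled = 12345L
val impliedPrecision = math.floor(math.log10(unscaled.toDouble)).toInt + 1 // 5
```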

# Encoding types
@@ -354,8 +352,6 @@ The Decimal type contains a scale, but no precision. The implied precision of a
| float | `14` | FLOAT | IEEE little-endian |
| binary | `15` | BINARY | 4 byte little-endian size, followed by bytes |
| string | `16` | STRING | 4 byte little-endian size, followed by UTF-8 encoded bytes |
| binary from metadata | `17` | BINARY | Little-endian index into the metadata dictionary. Number of bytes is equal to the metadata `offset_size`. |
| string from metadata | `18` | STRING | Little-endian index into the metadata dictionary. Number of bytes is equal to the metadata `offset_size`. |
| year-month interval | `19` | INT(32, signed)<sup>1</sup> | 1 byte denoting start field (1 bit) and end field (1 bit) starting at LSB followed by 4-byte little-endian value. |
| day-time interval | `20` | INT(64, signed)<sup>1</sup> | 1 byte denoting start field (2 bits) and end field (2 bits) starting at LSB followed by 8-byte little-endian value. |

@@ -368,6 +364,8 @@ The Decimal type contains a scale, but no precision. The implied precision of a

The year-month and day-time interval types have one byte at the beginning indicating the start and end fields. In the case of the year-month interval, the least significant bit denotes the start field and the next least significant bit denotes the end field. The remaining 6 bits are unused. A field value of 0 represents YEAR and 1 represents MONTH. In the case of the day-time interval, the least significant 2 bits denote the start field and the next least significant 2 bits denote the end field. The remaining 4 bits are unused. A field value of 0 represents DAY, 1 represents HOUR, 2 represents MINUTE, and 3 represents SECOND.
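
As a sketch of the arithmetic (illustrative, not normative), the leading field byte works out as follows:

```scala
// Year-month: start field in bit 0, end field in bit 1 (0 = YEAR, 1 = MONTH).
// A YEAR TO MONTH interval therefore gets the header byte 0b10 = 2.
val yearMonthHeader = ((1 << 1) | 0).toByte

// Day-time: start field in bits 0-1, end field in bits 2-3
// (0 = DAY, 1 = HOUR, 2 = MINUTE, 3 = SECOND).
// An HOUR TO SECOND interval therefore gets (3 << 2) | 1 = 13.
val dayTimeHeader = ((3 << 2) | 1).toByte
```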

Type IDs 17 and 18 were originally reserved for a prototype feature (string-from-metadata) that was never implemented. These IDs are available for use by new types.

[1] The parquet format does not have pure equivalents for the year-month and day-time interval types. Year-month intervals are usually represented using int32 values and the day-time intervals are usually represented using int64 values. However, these values don't include the start and end fields of these types. Therefore, Spark stores them in the column metadata.

# Field ID order and uniqueness
AvroFunctionsSuite.scala
@@ -22,16 +22,18 @@ import java.io.ByteArrayOutputStream
import scala.jdk.CollectionConverters._

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.avro.{functions => Fns}
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{BinaryType, StructType}

class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
import testImplicits._
@@ -371,4 +373,218 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
stop = 138)))
}
}

// Encodes a GenericRecord to Avro binary using the given writer schema.
private def serialize(record: GenericRecord, avroSchema: String): Array[Byte] = {
val schema = new Schema.Parser().parse(avroSchema)
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
var outputStream: ByteArrayOutputStream = null
var bytes: Array[Byte] = null
try {
outputStream = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
datumWriter.write(record, encoder)
encoder.flush()
bytes = outputStream.toByteArray
} finally {
if (outputStream != null) {
outputStream.close()
}
}
bytes
}

// Decodes Avro binary back into a GenericRecord using the given reader schema.
private def deserialize(bytes: Array[Byte], avroSchema: String): GenericRecord = {
val schema = new Schema.Parser().parse(avroSchema)
val datumReader = new GenericDatumReader[GenericRecord](schema)
var inputStream: SeekableByteArrayInput = null
var record: GenericRecord = null
try {
inputStream = new SeekableByteArrayInput(bytes)
val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
record = datumReader.read(null, decoder)
} finally {
if (inputStream != null) {
inputStream.close()
}
}
record
}

// write: `GenericRecord` -> binary (by `serialize`) -> dataframe
// read: dataframe -> binary -> `GenericRecord` (by `deserialize`)
test("roundtrip in serialize and deserialize - GenericRecord") {
val avroSchema =
"""
|{
| "type": "record",
| "name": "person",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "age", "type": "int"},
| {"name": "country", "type": "string"}
| ]
|}
|""".stripMargin
val testTable = "test_avro"
withTable(testTable) {
val schema = new Schema.Parser().parse(avroSchema)
val person1 = new GenericRecordBuilder(schema)
.set("name", "sparkA")
.set("age", 18)
.set("country", "usa")
.build()
val person2 = new GenericRecordBuilder(schema)
.set("name", "sparkB")
.set("age", 19)
.set("country", "usb")
.build()
Seq(person1, person2)
.map(p => serialize(p, avroSchema))
.toDF("data")
.repartition(1)
.writeTo(testTable)
.create()

val expectedSchema = new StructType().add("data", BinaryType)
assert(spark.table(testTable).schema === expectedSchema)

// Note that what is returned here is `Row[Array[Byte]]`
val avroDF = sql(s"SELECT data FROM $testTable")
val readbacks = avroDF
.collect()
.map(row => deserialize(row.get(0).asInstanceOf[Array[Byte]], avroSchema))

val readbackPerson1 = readbacks.head
assert(readbackPerson1.get(0).toString === person1.get(0))
assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int])
assert(readbackPerson1.get(2).toString === person1.get(2))

val readbackPerson2 = readbacks(1)
assert(readbackPerson2.get(0).toString === person2.get(0))
assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int])
assert(readbackPerson2.get(2).toString === person2.get(2))
}
}

// write: `GenericRecord` -> binary (by `serialize`) -> dataframe
// read: dataframe -> binary -> struct (by `from_avro`) -> `GenericRecord`
test("use `serialize` to write GenericRecord and `from_avro` to read GenericRecord") {
val avroSchema =
"""
|{
| "type": "record",
| "name": "person",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "age", "type": "int"},
| {"name": "country", "type": "string"}
| ]
|}
|""".stripMargin
val testTable = "test_avro"
withTable(testTable) {
val schema = new Schema.Parser().parse(avroSchema)
val person1 = new GenericRecordBuilder(schema)
.set("name", "sparkA")
.set("age", 18)
.set("country", "usa")
.build()
val person2 = new GenericRecordBuilder(schema)
.set("name", "sparkB")
.set("age", 19)
.set("country", "usb")
.build()
Seq(person1, person2)
.map(p => serialize(p, avroSchema))
.toDF("data")
.repartition(1)
.writeTo(testTable)
.create()

val expectedSchema = new StructType().add("data", BinaryType)
assert(spark.table(testTable).schema === expectedSchema)

// Note that what is returned here is `Row[Struct]`
val avroDF = sql(s"SELECT from_avro(data, '$avroSchema', map()) FROM $testTable")
val readbacks = avroDF
.collect()
.map(row =>
new GenericRecordBuilder(schema)
.set("name", row.getStruct(0).getString(0))
.set("age", row.getStruct(0).getInt(1))
.set("country", row.getStruct(0).getString(2))
.build())

val readbackPerson1 = readbacks.head
assert(readbackPerson1.get(0) === person1.get(0))
assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int])
assert(readbackPerson1.get(2) === person1.get(2))

val readbackPerson2 = readbacks(1)
assert(readbackPerson2.get(0) === person2.get(0))
assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int])
assert(readbackPerson2.get(2) === person2.get(2))
}
}

// write: `GenericRecord` (to `struct`) -> binary (by `to_avro`) -> dataframe
// read: dataframe -> binary -> `GenericRecord` (by `deserialize`)
test("use `to_avro` to write GenericRecord and `deserialize` to read GenericRecord") {
val avroSchema =
"""
|{
| "type": "record",
| "name": "person",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "age", "type": "int"},
| {"name": "country", "type": "string"}
| ]
|}
|""".stripMargin
val testTable = "test_avro"
withTable(testTable) {
val schema = new Schema.Parser().parse(avroSchema)
val person1 = new GenericRecordBuilder(schema)
.set("name", "sparkA")
.set("age", 18)
.set("country", "usa")
.build()
val person2 = new GenericRecordBuilder(schema)
.set("name", "sparkB")
.set("age", 19)
.set("country", "usb")
.build()
Seq(person1, person2)
.map(p => (
p.get(0).asInstanceOf[String],
p.get(1).asInstanceOf[Int],
p.get(2).asInstanceOf[String]))
.toDF("name", "age", "country")
.select(Fns.to_avro(struct($"name", $"age", $"country"), avroSchema).as("data"))
.repartition(1)
.writeTo(testTable)
.create()

val expectedSchema = new StructType().add("data", BinaryType)
assert(spark.table(testTable).schema === expectedSchema)

// Note that what is returned here is `Row[Array[Byte]]`
val avroDF = sql(s"select data from $testTable")
val readbacks = avroDF
.collect()
.map(row => row.get(0).asInstanceOf[Array[Byte]])
.map(bytes => deserialize(bytes, avroSchema))

val readbackPerson1 = readbacks.head
assert(readbackPerson1.get(0).toString === person1.get(0))
assert(readbackPerson1.get(1).asInstanceOf[Int] === person1.get(1).asInstanceOf[Int])
assert(readbackPerson1.get(2).toString === person1.get(2))

val readbackPerson2 = readbacks(1)
assert(readbackPerson2.get(0).toString === person2.get(0))
assert(readbackPerson2.get(1).asInstanceOf[Int] === person2.get(1).asInstanceOf[Int])
assert(readbackPerson2.get(2).toString === person2.get(2))
}
}
}