Description
Describe the enhancement requested
Part 4 in the Avro series, following on from #731. This will allow reading and writing whole files in the Avro container format as a series of batches. Each batch will correspond to one Avro file block and fill a single VSR. The VSR can be recycled between batches. Input and output can be to Avro encoder / decoder (set up externally) or to Java's native byte channels (which are set up with default binary encoder / decoder). To cater for async scenarios, the reader API should know how many bytes are required for a block before attempting to read it.
I'd like to propose the following API - hopefully this is going in the right direction. I've taken some inspiration from ArrowFileReader / Writer and the JSON Reader / Writer, but it's not identical (and they're not identical to each other). If there is a desire to line up on specific naming / conventions then I'm certainly happy to do that, in which case I'll need a steer on exactly what it should be. Otherwise, if anyone has radically different ideas of what it should look like, please do share!
class AvroFileWriter {

    // Writer owns a channel / encoder and will close them
    // VSR and optional dictionaries are not owned and will not be closed
    // VSR can be recycled or supplied as a stream
    // Avro encoder configured externally
    public AvroFileWriter(
        Encoder encoder,
        VectorSchemaRoot firstBatch,
        DictionaryProvider dictionaries);

    // Sets up a default binary encoder for the channel
    public AvroFileWriter(
        WritableByteChannel channel,
        VectorSchemaRoot firstBatch,
        DictionaryProvider dictionaries);

    // Write the Avro header (throws if already written)
    void writeHeader();

    // Write the contents of the VSR as an Avro data block
    // Writes the header if not yet written
    // Expects new data to be in the batch (i.e. VSR can be recycled)
    void writeBatch();

    // Reset vectors in all the producers
    // Supports a stream of VSRs if the source VSR is not recycled
    void resetBatch(VectorSchemaRoot batch);

    // Closes the encoder and / or channel
    // Does not close the VSR or dictionary vectors
    void close();
}
Now writing data looks like this:
void writeAvro(MyApp app) {
    var root = app.prepareVsr();
    var dictionaries = app.prepareDictionaries();
    try (var writer = new AvroFileWriter(app.openChannel(), root, dictionaries)) {
        writer.writeHeader();
        // Assume recycling, loadBatch() puts fresh data into root
        while (app.loadBatch()) {
            writer.writeBatch();
        }
    }
}
And then for the reader:
class AvroFileReader implements DictionaryProvider {

    // Reader owns a channel / decoder and will close them
    // Schema / VSR / dictionaries are created when the header is read
    // VSR / dictionaries are cleaned up on close
    // Dictionaries accessible through the DictionaryProvider iface
    // Avro decoder configured externally
    public AvroFileReader(
        Decoder decoder,
        BufferAllocator allocator);

    // Sets up a default binary decoder for the channel
    // Avro is read sequentially, so a seekable channel is not needed
    public AvroFileReader(
        ReadableByteChannel channel,
        BufferAllocator allocator);

    // Read the Avro header and set up schema / VSR / dictionaries
    void readHeader();

    // Schema and VSR available after readHeader()
    Schema getSchema();
    VectorSchemaRoot getVectorSchemaRoot();

    // Read the next Avro block and load it into the VSR
    // Return true if successful, false if EOS
    // Also false in non-blocking mode if more data is needed
    boolean readBatch();

    // Check the position and size of the next Avro data block
    // Provides a mechanism for non-blocking / reactive styles
    boolean hasNextBatch();
    long nextBatchPosition();
    long nextBatchSize();

    // Closes the decoder and / or channel
    // Also closes the VSR and dictionary vectors
    void close();
}
So reading looks like this:
// Blocking style
void readAvro(MyApp app) {
    try (var reader = new AvroFileReader(app.openChannel(), app.allocator())) {
        reader.readHeader();
        app.setSchema(reader.getSchema());
        app.setVsr(reader.getVectorSchemaRoot());
        app.setDictionaries(reader);
        while (reader.readBatch()) {
            app.saveBatch();
        }
    }
}
// Non-blocking stage to process one batch
CompletionStage<Boolean> readAvroAsync(AvroFileReader reader) {
    if (reader.hasNextBatch()) {
        var position = reader.nextBatchPosition();
        var size = reader.nextBatchSize();
        return app.ensureBytesAvailable(position, size)
            .thenApply(x -> {
                if (reader.readBatch()) {
                    app.saveBatch();
                }
                return reader;
            })
            .thenCompose(this::readAvroAsync);
    }
    else {
        return CompletableFuture.completedFuture(true);
    }
}
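Since the proposed reader doesn't exist yet, the recursive drive loop above can be exercised today with a stand-in. A minimal runnable sketch, where StubReader, ensureBytes() and the saved counter are all hypothetical stand-ins for the proposed reader, app.ensureBytesAvailable() and app.saveBatch():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the recursive non-blocking drive loop, with a stub
// standing in for the proposed AvroFileReader.
public class AsyncLoopSketch {

    // Stub: pretends to hold `remaining` batches.
    static class StubReader {
        final AtomicInteger remaining;
        StubReader(int batches) { this.remaining = new AtomicInteger(batches); }
        boolean hasNextBatch() { return remaining.get() > 0; }
        // Stands in for app.ensureBytesAvailable(position, size)
        CompletionStage<Void> ensureBytes() { return CompletableFuture.completedFuture(null); }
        boolean readBatch() { return remaining.getAndDecrement() > 0; }
    }

    // Stands in for app.saveBatch(): counts batches processed.
    static final AtomicInteger saved = new AtomicInteger();

    // One stage per batch; recurses via thenCompose until EOS.
    static CompletionStage<Boolean> readAsync(StubReader reader) {
        if (!reader.hasNextBatch()) {
            return CompletableFuture.completedFuture(true);
        }
        return reader.ensureBytes()
            .thenApply(x -> {
                if (reader.readBatch()) {
                    saved.incrementAndGet();
                }
                return reader;
            })
            .thenCompose(AsyncLoopSketch::readAsync);
    }

    public static void main(String[] args) {
        readAsync(new StubReader(3)).toCompletableFuture().join();
        System.out.println("batches saved: " + saved.get());
    }
}
```

The key point the stub demonstrates is that thenCompose keeps the recursion flat: each batch is one stage, and the chain completes only when hasNextBatch() reports EOS.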
The non-blocking read is quite important for me, as I have a web service that receives bytes in a stream. There is a slight gotcha: we need the first few bytes of the next block before we know its size, but we can implement hasNextBatch() without them, and then probably expose the block prefix size as a constant.
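For reference, per the Avro spec each data block is prefixed by two longs (object count, then size in bytes of the serialized objects), and Avro longs use a zig-zag variable-length encoding, so the prefix is variable-length (up to 10 bytes per long) rather than a fixed width. A minimal sketch of decoding that prefix; readLong / writeLong are illustrative helpers, not part of the proposed API:

```java
import java.nio.ByteBuffer;

// Sketch: decoding the two varint longs (object count, block size)
// that prefix each Avro data block. Avro encodes longs as zig-zag
// values, 7 bits per byte, high bit = continuation.
public class BlockHeaderSketch {

    // Read one Avro-encoded long from the buffer.
    static long readLong(ByteBuffer buf) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xff;
            n |= (long) (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1); // undo zig-zag
    }

    // Inverse, used here to build test input.
    static void writeLong(ByteBuffer buf, long v) {
        long n = (v << 1) ^ (v >> 63); // zig-zag
        while ((n & ~0x7fL) != 0) {
            buf.put((byte) ((n & 0x7f) | 0x80));
            n >>>= 7;
        }
        buf.put((byte) n);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(20);
        writeLong(buf, 1000);  // object count
        writeLong(buf, 65536); // block size in bytes
        buf.flip();
        System.out.println("count=" + readLong(buf) + " size=" + readLong(buf));
    }
}
```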
Compression is probably worth thinking about now - each block is compressed individually, so the implementation needs to treat the contents of each block as a separate chunk that can be fed through a codec. My guess is this is fairly straightforward for codecs that are already available, so we might as well include it now rather than reworking later.
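To illustrate the per-block shape of this: each block body is an independent chunk through the codec, with no state shared between blocks. A minimal round-trip sketch using the JDK's built-in deflate (per the Avro spec, the "deflate" codec is raw RFC 1951 deflate with no zlib wrapper, hence nowrap = true); compress / decompress are illustrative helpers, not proposed API:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of per-block compression: each Avro block body is fed
// through the codec as a self-contained chunk.
public class BlockCodecSketch {

    static byte[] compress(byte[] block) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // raw deflate
        deflater.setInput(block);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] block) {
        Inflater inflater = new Inflater(true); // raw deflate
        inflater.setInput(block);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        try {
            while (!inflater.finished()) {
                out.write(buf, 0, inflater.inflate(buf));
            }
        } catch (DataFormatException e) {
            throw new RuntimeException("corrupt block", e);
        }
        inflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] block = "one block's worth of serialized rows".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decompress(compress(block));
        System.out.println("round trip ok: "
            + new String(roundTrip, StandardCharsets.UTF_8).equals(
                new String(block, StandardCharsets.UTF_8)));
    }
}
```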
If this looks broadly right, I'll make a start on top of #779.