Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* <p>
* it is basically a copy from org.apache.avro.hadoop.file.SortedKeyValueFile with changes so every block in the avro
* file is a b-tree node, and every row has one additional "long" field that points to this record's child node if
* if is not a leaf record
* it is not a leaf record
*/
public class AvroBtreeFile {
public static final String DATA_SIZE_KEY = "data_bytes";
Expand Down Expand Up @@ -59,6 +59,14 @@ public Schema getValueSchema() {
return mValueSchema;
}

/**
 * Returns the current value of the {@code fileHeaderEnd} field.
 * NOTE(review): presumably the byte offset at which the Avro file header ends; the
 * assignment is not visible here -- confirm where the field is set.
 *
 * @return the value of {@code fileHeaderEnd}
 */
public long getFileHeaderEnd() {
return fileHeaderEnd;
}

/**
 * Exposes the {@code mFileReader} field directly (no copy is made), so callers operate
 * on the same reader instance this object holds.
 * NOTE(review): repositioning the returned reader presumably affects subsequent reads
 * made through this object -- confirm before using outside of tests.
 *
 * @return the {@code mFileReader} field
 */
public DataFileReader<GenericRecord> getmFileReader() {
return mFileReader;
}

/**
* A class to encapsulate the options of a Reader.
*/
Expand Down Expand Up @@ -303,22 +311,25 @@ public static class Writer implements Closeable {
private final int mHeight;

private GenericRecord mPreviousKey;
private Node curNode = new Node();
private final Node root = curNode;
private Node curNode;
private final Node root;

private class Node {
private static class Node {
List<GenericRecord> records;
List<Node> childs;
Node prev;
int height;

public Node() {
public Node(int mInterval) {
records = new ArrayList<>(mInterval);
childs = new ArrayList<>(mInterval);
}

public Node(Node prevNode) {
records = new ArrayList<>(mInterval);
public Node(Node prevNode, int mInterval) {
this(mInterval);
prev = prevNode;
height = prevNode.height + 1;
prevNode.childs.add(this);
}

public GenericRecord getCurRecord() {
Expand Down Expand Up @@ -483,10 +494,12 @@ public Writer(Options options) throws IOException {
}
logger.debug("Created directory " + options.getPath());

curNode = new Node(mInterval);
root = curNode;

// Open a writer for the data file.
Path dataFilePath = options.getPath();
logger.debug("Creating writer for avro data file: " + dataFilePath);
List<Schema.Field> schemaFields = new ArrayList<>();
mRecordSchema = createSchema(mKeySchema, mValueSchema);
String keys = String.join(",", mKeySchema.getFields().stream().map(Schema.Field::name).toArray(String[]::new));
String values = String.join(",", mValueSchema.getFields().stream().map(Schema.Field::name).toArray(String[]::new));
Expand Down Expand Up @@ -517,14 +530,14 @@ public void append(GenericRecord key, GenericRecord value) throws IOException {
if (curNode.height == 0 || curNode.records.size() < mInterval) {
curNode.addRecord(dataRecord);
if (curNode.height < mHeight) {
curNode = new Node(curNode);
curNode = new Node(curNode, mInterval);
}
} else {
while (curNode.records.size() == mInterval && curNode.height > 0) {
flush();
curNode = curNode.prev;
}
curNode.addRecord(dataRecord);
curNode = new Node(curNode);
curNode = new Node(curNode, mInterval);
}

}
Expand All @@ -534,26 +547,32 @@ public void append(GenericRecord key, GenericRecord value) throws IOException {
*/
@Override
public void close() throws IOException {
while (curNode != root) {
flush();
}
flush();
writeNodesReversePreOrder(root);
bufferedWriter.reverseAndClose(fileSystem.create(filename));
}

private void flush() throws IOException {
/**
 * Recursively writes the subtree rooted at {@code node}: children are visited from the
 * last to the first, and only afterwards are the node's own records written.
 * For every child whose write position is positive, that position is stored in the
 * {@code METADATA_COL_NAME} field of the record at the same index in {@code node}.
 *
 * @param node root of the subtree to write
 * @return the position returned by {@code writeNodeRecordsInMemory} for {@code node}
 * @throws IOException if writing a node fails
 */
private long writeNodesReversePreOrder(Writer.Node node) throws IOException {
    int childIdx = node.childs.size();
    while (--childIdx >= 0) {
        long childPosition = writeNodesReversePreOrder(node.childs.get(childIdx));
        // first block is the root block
        if (childPosition > 0) {
            node.records.get(childIdx).put(METADATA_COL_NAME, childPosition);
        }
    }
    return writeNodeRecordsInMemory(node);
}

private long writeNodeRecordsInMemory(Writer.Node curNode) throws IOException {
int numRecordsWriting = curNode.records.size();
logger.debug("writing {} records in height {}, records: {}", numRecordsWriting, curNode.height, curNode.records);
for (GenericRecord record : curNode.records) {
bufferedWriter.append(record);
}

// the reader will see the blocks backwards, so need to take the sync marker AFTER the block is written:
long position = bufferedWriter.sync();
curNode = curNode.prev;
if (curNode != null) {
if (numRecordsWriting > 0)
curNode.getCurRecord().put(METADATA_COL_NAME, position);
}
// remove data from memory as it was already written to buffer
curNode.records = null;
return position;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.paypal.dione.avro.utils.AvroExtensions
import org.apache.avro.SchemaBuilder
import org.junit.jupiter.api.{Assertions, Test}

import scala.collection.mutable.ArrayBuffer

class TestAvroBtreeStorageFile extends AvroExtensions {
val simpleSchema = SchemaBuilder.record("single_string").fields().requiredString("val1").endRecord()
val simpleSchema2 = SchemaBuilder.record("single_string2").fields().requiredString("val2").endRecord()
Expand All @@ -13,6 +15,8 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
val tuplesStorage = AvroBtreeStorageFileFactory(simpleSchema, tupleSchema)

val filename = "TestData/TestAvroBtreeStorageFile"
val printDebug = false
val strList = new ArrayBuffer[String]

@Test
def testOneLevel(): Unit = {
Expand Down Expand Up @@ -44,6 +48,8 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
kvStorageFileWriter.write(entries, filename)

val kvStorageFileReader = simpleStorage.reader(filename)
printBtreeAvroFile(kvStorageFileReader)

Assertions.assertEquals("002", kvStorageFileReader.get(simpleSchema.createRecord("002")).get.get("val2").toString)
Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100")))
Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("000")))
Expand Down Expand Up @@ -122,6 +128,38 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
val kvStorageFileReader = simpleStorage.reader(filename)
Assertions.assertEquals(entries.toList.mkString(","),
kvStorageFileReader.getIterator().map(_._2.get("val2")).toList.mkString(","))

printBtreeAvroFile(kvStorageFileReader)
}

/**
 * Walks every record of the underlying Avro file reader and logs it through
 * `printDebug(String)`, emitting a "block ^^ at:" line each time the reader's
 * `previousSync()` value changes.
 *
 * The two header lines ("datasize", "header end at") are logged before `strList` is
 * cleared, so after this method returns `strList` holds only the record and block lines.
 *
 * @param kvStorageFileReader reader whose underlying Avro file is dumped
 */
def printBtreeAvroFile(kvStorageFileReader: AvroBtreeStorageFileReader): Unit = {
  import scala.collection.JavaConverters._
  val avroReader = kvStorageFileReader.fileReader.getmFileReader
  // reposition the reader via sync(0) before iterating
  avroReader.sync(0)
  val records = avroReader.iterator().asScala
  var currentBlock = 0L
  val headerEnd = kvStorageFileReader.fileReader.getFileHeaderEnd
  printDebug("datasize: " + kvStorageFileReader.fileReader.getmFileReader.getMetaLong("data_bytes"))
  printDebug("header end at: " + headerEnd)

  strList.clear()
  for (record <- records) {
    val syncPos = kvStorageFileReader.fileReader.getmFileReader.previousSync()
    // the record is always logged first; the block marker (if any) follows it
    printDebug(record.toString)
    if (syncPos != currentBlock) {
      // no marker while the tracked position is still the initial 0
      if (currentBlock > 0) {
        printDebug("block ^^ at:" + (currentBlock - headerEnd))
      }
      currentBlock = syncPos
    }
  }
}

/**
 * Appends `log` to `strList` and, when the `printDebug` flag is set, also prints it
 * to standard output.
 *
 * @param log the line to record
 */
def printDebug(log: String): Unit = {
  if (printDebug) {
    println(log)
  }
  strList += log
}

@Test
Expand All @@ -136,6 +174,43 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
@Test
def testIterator10_3(): Unit = btreeProps(100,10, 3)

// Pins the exact dump collected in `strList` after running btreeProps(20, 2, 3): the
// lines are joined with newlines, double quotes are stripped, and the result is compared
// to the expected literal below.
// NOTE(review): the method name says 30_3_3 but the call passes (20, 2, 3) -- confirm
// which is intended.
// NOTE(review): btreeProps is defined outside this view; presumably it writes the file
// and calls printBtreeAvroFile, which is what fills strList -- confirm.
@Test
def testIterator30_3_3(): Unit = {
btreeProps(20,2, 3)
// The `metadata` values and "block ^^ at:" offsets are literal numbers taken from the
// dump, so this assertion is sensitive to anything that changes those positions.
Assertions.assertEquals(
"""{val1: 001, val2: 001, metadata: 295}
|{val1: 008, val2: 008, metadata: 195}
|{val1: 015, val2: 015, metadata: 93}
|block ^^ at:0
|{val1: 002, val2: 002, metadata: 259}
|{val1: 005, val2: 005, metadata: 227}
|block ^^ at:45
|{val1: 003, val2: 003, metadata: null}
|{val1: 004, val2: 004, metadata: null}
|block ^^ at:81
|{val1: 006, val2: 006, metadata: null}
|{val1: 007, val2: 007, metadata: null}
|block ^^ at:113
|{val1: 009, val2: 009, metadata: 157}
|{val1: 012, val2: 012, metadata: 125}
|block ^^ at:145
|{val1: 010, val2: 010, metadata: null}
|{val1: 011, val2: 011, metadata: null}
|block ^^ at:183
|{val1: 013, val2: 013, metadata: null}
|{val1: 014, val2: 014, metadata: null}
|block ^^ at:215
|{val1: 016, val2: 016, metadata: 59}
|{val1: 019, val2: 019, metadata: 27}
|block ^^ at:247
|{val1: 017, val2: 017, metadata: null}
|{val1: 018, val2: 018, metadata: null}
|block ^^ at:281
|{val1: 020, val2: 020, metadata: null}
|block ^^ at:313""".stripMargin,
strList.mkString("\n").replaceAll("\"", ""))
}

@Test
def testIteratorWithKey(): Unit = {
val kvStorageFileWriter = simpleStorage.writer(2, 3)
Expand Down