diff --git a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
index 63938abb..1bb40efd 100644
--- a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
+++ b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
@@ -31,7 +31,7 @@
*
* it is basically a copy from org.apache.avro.hadoop.file.SortedKeyValueFile with changes so every block in the avro
* file is a b-tree node, and every row has one additional "long" field that points to this record's child node if
- * if is not a leaf record
+ * it is not a leaf record
*/
public class AvroBtreeFile {
public static final String DATA_SIZE_KEY = "data_bytes";
@@ -59,6 +59,14 @@ public Schema getValueSchema() {
return mValueSchema;
}
+ public long getFileHeaderEnd() {
+ return fileHeaderEnd;
+ }
+
+ public DataFileReader getmFileReader() {
+ return mFileReader;
+ }
+
/**
* A class to encapsulate the options of a Reader.
*/
@@ -303,22 +311,25 @@ public static class Writer implements Closeable {
private final int mHeight;
private GenericRecord mPreviousKey;
- private Node curNode = new Node();
- private final Node root = curNode;
+ private Node curNode;
+ private final Node root;
- private class Node {
+ private static class Node {
List records;
+ List childs;
Node prev;
int height;
- public Node() {
+ public Node(int mInterval) {
records = new ArrayList<>(mInterval);
+ childs = new ArrayList<>(mInterval);
}
- public Node(Node prevNode) {
- records = new ArrayList<>(mInterval);
+ public Node(Node prevNode, int mInterval) {
+ this(mInterval);
prev = prevNode;
height = prevNode.height + 1;
+ prevNode.childs.add(this);
}
public GenericRecord getCurRecord() {
@@ -483,10 +494,12 @@ public Writer(Options options) throws IOException {
}
logger.debug("Created directory " + options.getPath());
+ curNode = new Node(mInterval);
+ root = curNode;
+
// Open a writer for the data file.
Path dataFilePath = options.getPath();
logger.debug("Creating writer for avro data file: " + dataFilePath);
- List schemaFields = new ArrayList<>();
mRecordSchema = createSchema(mKeySchema, mValueSchema);
String keys = String.join(",", mKeySchema.getFields().stream().map(Schema.Field::name).toArray(String[]::new));
String values = String.join(",", mValueSchema.getFields().stream().map(Schema.Field::name).toArray(String[]::new));
@@ -517,14 +530,14 @@ public void append(GenericRecord key, GenericRecord value) throws IOException {
if (curNode.height == 0 || curNode.records.size() < mInterval) {
curNode.addRecord(dataRecord);
if (curNode.height < mHeight) {
- curNode = new Node(curNode);
+ curNode = new Node(curNode, mInterval);
}
} else {
while (curNode.records.size() == mInterval && curNode.height > 0) {
- flush();
+ curNode = curNode.prev;
}
curNode.addRecord(dataRecord);
- curNode = new Node(curNode);
+ curNode = new Node(curNode, mInterval);
}
}
@@ -534,26 +547,32 @@ public void append(GenericRecord key, GenericRecord value) throws IOException {
*/
@Override
public void close() throws IOException {
- while (curNode != root) {
- flush();
- }
- flush();
+ writeNodesReversePreOrder(root);
bufferedWriter.reverseAndClose(fileSystem.create(filename));
}
- private void flush() throws IOException {
+ private long writeNodesReversePreOrder(Writer.Node node) throws IOException {
+ for (int i=node.childs.size()-1; i>=0; i--) {
+ Writer.Node child = node.childs.get(i);
+ long pos = writeNodesReversePreOrder(child);
+ if (pos>0) // first block is the root block
+ node.records.get(i).put(METADATA_COL_NAME, pos);
+ }
+ return writeNodeRecordsInMemory(node);
+ }
+
+ private long writeNodeRecordsInMemory(Writer.Node curNode) throws IOException {
int numRecordsWriting = curNode.records.size();
logger.debug("writing {} records in height {}, records: {}", numRecordsWriting, curNode.height, curNode.records);
for (GenericRecord record : curNode.records) {
bufferedWriter.append(record);
}
+
// the reader will see the blocks backwards, so need to take the sync marker AFTER the block is written:
long position = bufferedWriter.sync();
- curNode = curNode.prev;
- if (curNode != null) {
- if (numRecordsWriting > 0)
- curNode.getCurRecord().put(METADATA_COL_NAME, position);
- }
+ // remove data from memory as it was already written to buffer
+ curNode.records = null;
+ return position;
}
}
diff --git a/dione-hadoop/src/test/scala/com/paypal/dione/kvstorage/hadoop/avro/TestAvroBtreeStorageFile.scala b/dione-hadoop/src/test/scala/com/paypal/dione/kvstorage/hadoop/avro/TestAvroBtreeStorageFile.scala
index 32c03a26..6545005d 100644
--- a/dione-hadoop/src/test/scala/com/paypal/dione/kvstorage/hadoop/avro/TestAvroBtreeStorageFile.scala
+++ b/dione-hadoop/src/test/scala/com/paypal/dione/kvstorage/hadoop/avro/TestAvroBtreeStorageFile.scala
@@ -4,6 +4,8 @@ import com.paypal.dione.avro.utils.AvroExtensions
import org.apache.avro.SchemaBuilder
import org.junit.jupiter.api.{Assertions, Test}
+import scala.collection.mutable.ArrayBuffer
+
class TestAvroBtreeStorageFile extends AvroExtensions {
val simpleSchema = SchemaBuilder.record("single_string").fields().requiredString("val1").endRecord()
val simpleSchema2 = SchemaBuilder.record("single_string2").fields().requiredString("val2").endRecord()
@@ -13,6 +15,8 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
val tuplesStorage = AvroBtreeStorageFileFactory(simpleSchema, tupleSchema)
val filename = "TestData/TestAvroBtreeStorageFile"
+ val printDebug = false
+ val strList = new ArrayBuffer[String]
@Test
def testOneLevel(): Unit = {
@@ -44,6 +48,8 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
kvStorageFileWriter.write(entries, filename)
val kvStorageFileReader = simpleStorage.reader(filename)
+ printBtreeAvroFile(kvStorageFileReader)
+
Assertions.assertEquals("002", kvStorageFileReader.get(simpleSchema.createRecord("002")).get.get("val2").toString)
Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100")))
Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("000")))
@@ -122,6 +128,38 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
val kvStorageFileReader = simpleStorage.reader(filename)
Assertions.assertEquals(entries.toList.mkString(","),
kvStorageFileReader.getIterator().map(_._2.get("val2")).toList.mkString(","))
+
+ printBtreeAvroFile(kvStorageFileReader)
+ }
+
+ def printBtreeAvroFile(kvStorageFileReader: AvroBtreeStorageFileReader): Unit = {
+ import scala.collection.JavaConverters._
+ val fileS = kvStorageFileReader.fileReader.getmFileReader
+ fileS.sync(0)
+ val it = fileS.iterator().asScala
+ var block = 0L
+ val headerPos = kvStorageFileReader.fileReader.getFileHeaderEnd
+ debugPrint("datasize: " + kvStorageFileReader.fileReader.getmFileReader.getMetaLong("data_bytes"))
+ debugPrint("header end at: " + headerPos)
+
+ strList.clear()
+ it.foreach(r => {
+ val lastSync = kvStorageFileReader.fileReader.getmFileReader.previousSync()
+ if (lastSync!=block) {
+ debugPrint(r.toString)
+ if (block>0)
+ debugPrint("block ^^ at:" + (block-headerPos))
+ block = lastSync
+ } else {
+ debugPrint(r.toString)
+ }
+ })
+ }
+
+ def debugPrint(log: String): Unit = {
+ if (printDebug)
+ println(log)
+ strList.append(log)
}
@Test
@@ -136,6 +174,43 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
@Test
def testIterator10_3(): Unit = btreeProps(100,10, 3)
+ @Test
+ def testIterator20_2_3(): Unit = {
+ btreeProps(20,2, 3)
+ Assertions.assertEquals(
+ """{val1: 001, val2: 001, metadata: 295}
+ |{val1: 008, val2: 008, metadata: 195}
+ |{val1: 015, val2: 015, metadata: 93}
+ |block ^^ at:0
+ |{val1: 002, val2: 002, metadata: 259}
+ |{val1: 005, val2: 005, metadata: 227}
+ |block ^^ at:45
+ |{val1: 003, val2: 003, metadata: null}
+ |{val1: 004, val2: 004, metadata: null}
+ |block ^^ at:81
+ |{val1: 006, val2: 006, metadata: null}
+ |{val1: 007, val2: 007, metadata: null}
+ |block ^^ at:113
+ |{val1: 009, val2: 009, metadata: 157}
+ |{val1: 012, val2: 012, metadata: 125}
+ |block ^^ at:145
+ |{val1: 010, val2: 010, metadata: null}
+ |{val1: 011, val2: 011, metadata: null}
+ |block ^^ at:183
+ |{val1: 013, val2: 013, metadata: null}
+ |{val1: 014, val2: 014, metadata: null}
+ |block ^^ at:215
+ |{val1: 016, val2: 016, metadata: 59}
+ |{val1: 019, val2: 019, metadata: 27}
+ |block ^^ at:247
+ |{val1: 017, val2: 017, metadata: null}
+ |{val1: 018, val2: 018, metadata: null}
+ |block ^^ at:281
+ |{val1: 020, val2: 020, metadata: null}
+ |block ^^ at:313""".stripMargin,
+ strList.mkString("\n").replaceAll("\"", ""))
+ }
+
@Test
def testIteratorWithKey(): Unit = {
val kvStorageFileWriter = simpleStorage.writer(2, 3)