@@ -275,6 +275,90 @@ public void close() {
275
275
}
276
276
}
277
277
278
+ /**
279
+ * A range-based iterator for RocksDB that merges results from two column families.
280
+ *
281
+ * <p>This iterator supports traversal over two RocksDB column families: one containing timestamped values and
282
+ * another containing non-timestamped values. It ensures that the keys from both column families are merged and
283
+ * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
284
+ * boundaries.</p>
285
+ *
286
+ * <h2>Key Features</h2>
287
+ *
288
+ * <ul>
289
+ * <li>Merges results from the "with-timestamp" and "no-timestamp" column families.</li>
290
+ * <li>Supports range-based queries with open or closed boundaries.</li>
291
+ * <li>Handles both forward and reverse iteration seamlessly.</li>
292
+ * <li>Ensures correct handling of inclusive and exclusive upper boundaries.</li>
293
+ * <li>Integrates efficiently with Kafka Streams state store mechanisms.</li>
294
+ * </ul>
295
+ *
296
+ * <h2>Usage</h2>
297
+ *
298
+ * <p>The iterator can be used for different types of range-based operations, such as:
299
+ * <ul>
300
+ * <li>Iterating over all keys within a range.</li>
301
+ * <li>Prefix-based scans (when combined with dynamically calculated range endpoints).</li>
302
+ * <li>Open-ended range queries (e.g., from a given key to the end of the dataset).</li>
303
+ * </ul>
304
+ * </p>
305
+ *
306
+ * <h2>Implementation Details</h2>
307
+ *
308
+ * <p>The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
309
+ * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
310
+ * are merged during iteration, ensuring proper order and de-duplication where applicable.</p>
311
+ *
312
+ * <h3>Key Methods:</h3>
313
+ *
314
+ * <ul>
315
+ * <li><b>{@code makeNext()}:</b> Retrieves the next key-value pair in the merged range, ensuring
316
+ * the result is within the specified range and boundary conditions.</li>
317
+ * <li><b>{@code initializeIterators()}:</b> Initializes the RocksDB iterators based on the specified range and direction.</li>
318
+ * <li><b>{@code isInRange()}:</b> Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.</li>
319
+ * <li><b>{@code fetchNextKeyValue()}:</b> Determines the next key-value pair to return based on the state of both iterators.</li>
320
+ * </ul>
321
+ *
322
+ * <h3>Thread Safety:</h3>
323
+ *
324
+ * <p>The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
325
+ * threads without external synchronization.</p>
326
+ *
327
+ * <h2>Examples</h2>
328
+ *
329
+ * <h3>Iterate over a range:</h3>
330
+ *
331
+ * <pre>{@code
332
+ * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
333
+ * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
334
+ *
335
+ * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
336
+ * new Bytes("keyStart".getBytes()),
337
+ * new Bytes("keyEnd".getBytes()),
338
+ * noTimestampIterator,
339
+ * withTimestampIterator,
340
+ * "storeName",
341
+ * true, // Forward iteration
342
+ * true // Inclusive upper boundary
343
+ * )) {
344
+ * while (iterator.hasNext()) {
345
+ * KeyValue<Bytes, byte[]> entry = iterator.next();
346
+ * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
347
+ * }
348
+ * }
349
+ * }</pre>
350
+ *
351
+ * <h2>Exceptions</h2>
352
+ *
353
+ * <ul>
354
+ * <li><b>{@link InvalidStateStoreException}:</b> Thrown if the iterator is accessed after being closed.</li>
355
+ * <li><b>{@link IllegalStateException}:</b> Thrown if the close callback is not properly set before usage.</li>
356
+ * </ul>
357
+ *
358
+ * @see AbstractIterator
359
+ * @see ManagedKeyValueIterator
360
+ * @see RocksDBStore
361
+ */
278
362
private static class RocksDBDualCFRangeIterator extends AbstractIterator <KeyValue <Bytes , byte []>> implements ManagedKeyValueIterator <Bytes , byte []> {
279
363
private Runnable closeCallback ;
280
364
private byte [] noTimestampNext ;
@@ -288,6 +372,20 @@ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValu
288
372
private final byte [] rawLastKey ;
289
373
private volatile boolean open = true ;
290
374
375
+ /**
376
+ * Constructs a new {@code RocksDBDualCFRangeIterator}.
377
+ *
378
+ * <p>Initializes the RocksDB iterators for two column families (timestamped and non-timestamped) and sets up
379
+ * the range and direction for iteration.</p>
380
+ *
381
+ * @param from The starting key of the range. Can be {@code null} for an open range.
382
+ * @param to The ending key of the range. Can be {@code null} for an open range.
383
+ * @param noTimestampIterator The iterator for the non-timestamped column family.
384
+ * @param withTimestampIterator The iterator for the timestamped column family.
385
+ * @param storeName The name of the store associated with this iterator.
386
+ * @param forward {@code true} for forward iteration; {@code false} for reverse iteration.
387
+ * @param toInclusive Whether the upper boundary of the range is inclusive.
388
+ */
291
389
RocksDBDualCFRangeIterator (final Bytes from ,
292
390
final Bytes to ,
293
391
final RocksIterator noTimestampIterator ,
@@ -304,6 +402,15 @@ private static class RocksDBDualCFRangeIterator extends AbstractIterator<KeyValu
304
402
this .rawLastKey = initializeIterators (from , to );
305
403
}
306
404
405
+ /**
406
+ * Retrieves the next key-value pair in the range.
407
+ *
408
+ * <p>This method determines the next key-value pair to return by merging the results from the two column
409
+ * families. If both column families have keys, it selects the one that matches the iteration order and range
410
+ * conditions. Keys outside the specified range are skipped.</p>
411
+ *
412
+ * @return The next {@link KeyValue} pair in the range, or {@code null} if no more elements are available.
413
+ */
307
414
@ Override
308
415
protected KeyValue <Bytes , byte []> makeNext () {
309
416
loadNextKeys ();
@@ -312,24 +419,58 @@ protected KeyValue<Bytes, byte[]> makeNext() {
312
419
return isInRange (next ) ? next : allDone ();
313
420
}
314
421
422
+ /**
423
+ * Returns the next key in the range without advancing the iterator.
424
+ *
425
+ * <p>This method retrieves the key of the next {@link KeyValue} pair that would be returned by {@link #next()},
426
+ * without moving the iterator forward. This is useful for inspecting the next key without affecting the
427
+ * iterator's state.</p>
428
+ *
429
+ * @return The next key as a {@link Bytes} object.
430
+ *
431
+ * @throws NoSuchElementException If there are no more elements in the iterator.
432
+ */
315
433
@ Override
316
434
public Bytes peekNextKey () {
317
435
if (!hasNext ()) throw new NoSuchElementException ();
318
436
return super .peek ().key ;
319
437
}
320
438
439
+ /**
440
+ * Advances the iterator and returns the next key-value pair.
441
+ *
442
+ * @return The next {@link KeyValue} pair in the range.
443
+ *
444
+ * @throws InvalidStateStoreException If the iterator has been closed.
445
+ */
321
446
@ Override
322
447
public synchronized KeyValue <Bytes , byte []> next () {
323
448
ensureOpen ();
324
449
return super .next ();
325
450
}
326
451
452
+ /**
453
+ * Checks if there are more elements available in the range.
454
+ *
455
+ * @return {@code true} if the iterator has more elements; {@code false} otherwise.
456
+ *
457
+ * @throws InvalidStateStoreException If the iterator has been closed.
458
+ */
327
459
@ Override
328
460
public synchronized boolean hasNext () {
329
461
ensureOpen ();
330
462
return super .hasNext ();
331
463
}
332
464
465
+ /**
466
+ * Closes the iterator and releases associated resources.
467
+ *
468
+ * <p>This method ensures that the RocksDB iterators for both column families are properly closed. After this
469
+ * method is called, any subsequent calls to {@link #hasNext()}, {@link #next()}, or {@link #peekNextKey()} will
470
+ * result in an {@link InvalidStateStoreException}.</p>
471
+ *
472
+ * @throws IllegalStateException If the close callback has not been set before calling this method.
473
+ */
333
474
@ Override
334
475
public synchronized void close () {
335
476
if (closeCallback == null ) {
@@ -343,6 +484,11 @@ public synchronized void close() {
343
484
open = false ;
344
485
}
345
486
487
+ /**
488
+ * Registers a callback to be executed when the iterator is closed.
489
+ *
490
+ * @param closeCallback A {@link Runnable} to execute during the {@link #close()} operation.
491
+ */
346
492
@ Override
347
493
public void onClose (final Runnable closeCallback ) {
348
494
this .closeCallback = closeCallback ;
@@ -357,6 +503,15 @@ private KeyValue<Bytes, byte[]> compareAndHandleKeys() {
357
503
}
358
504
}
359
505
506
+ /**
507
+ * Determines the next key-value pair to return.
508
+ *
509
+ * <p>If one of the column family iterators is exhausted, the method returns the result from the other iterator.
510
+ * If both iterators have keys, the method compares the keys and returns the appropriate result based on the
511
+ * iteration direction.</p>
512
+ *
513
+ * @return The next {@link KeyValue} pair to return.
514
+ */
360
515
private KeyValue <Bytes , byte []> fetchNextKeyValue () {
361
516
if (noTimestampNext == null ) {
362
517
return handleWithTimestampOnly ();
@@ -381,6 +536,16 @@ private KeyValue<Bytes, byte[]> handleWithTimestampOnly() {
381
536
return result ;
382
537
}
383
538
539
+ /**
540
+ * Checks if the given key-value pair is within the specified range.
541
+ *
542
+ * <p>The method compares the key against the range's upper boundary ({@code rawLastKey}) and determines if it
543
+ * falls within the range.</p>
544
+ *
545
+ * @param keyValue The key-value pair to check.
546
+ *
547
+ * @return {@code true} if the key is within the range; {@code false} otherwise.
548
+ */
384
549
private boolean isInRange (final KeyValue <Bytes , byte []> keyValue ) {
385
550
if (rawLastKey == null ) return true ; // Open-ended range
386
551
final int comparison = comparator .compare (keyValue .key .get (), rawLastKey );
@@ -389,6 +554,17 @@ private boolean isInRange(final KeyValue<Bytes, byte[]> keyValue) {
389
554
: comparison > 0 || (toInclusive && comparison == 0 );
390
555
}
391
556
557
+ /**
558
+ * Initializes the RocksDB iterators based on the specified range and direction.
559
+ *
560
+ * <p>This method positions the iterators at the starting point of the range and determines the raw byte
561
+ * representation of the upper boundary (if provided).</p>
562
+ *
563
+ * @param from The starting key of the range. Can be {@code null} for an open range.
564
+ * @param to The ending key of the range. Can be {@code null} for an open range.
565
+ *
566
+ * @return The raw byte representation of the upper boundary, or {@code null} if no boundary is specified.
567
+ */
392
568
private byte [] initializeIterators (final Bytes from , final Bytes to ) {
393
569
if (forward ) {
394
570
seekIterator (from , withTimestampIterator , true );
@@ -408,11 +584,22 @@ private void ensureOpen() {
408
584
}
409
585
}
410
586
587
+ /**
588
+ * Loads the next keys from the iterators if they are valid.
589
+ *
590
+ * <p>This method checks whether the next key for each column family is null. If the corresponding iterator is
591
+ * valid, it fetches the next key.</p>
592
+ */
411
593
private void loadNextKeys () {
412
594
if (noTimestampNext == null && noTimestampIterator .isValid ()) noTimestampNext = noTimestampIterator .key ();
413
595
if (withTimestampNext == null && withTimestampIterator .isValid ()) withTimestampNext = withTimestampIterator .key ();
414
596
}
415
597
598
+ /**
599
+ * Advances the given iterator based on the iteration direction.
600
+ *
601
+ * @param iterator The {@link RocksIterator} to advance.
602
+ */
416
603
private void moveIterator (final RocksIterator iterator ) {
417
604
if (forward ) {
418
605
iterator .next ();
@@ -421,6 +608,16 @@ private void moveIterator(final RocksIterator iterator) {
421
608
}
422
609
}
423
610
611
+ /**
612
+ * Seeks the iterator to the specified position.
613
+ *
614
+ * <p>If the target is {@code null}, the iterator is positioned at the start or end of the dataset, depending on
615
+ * the direction.</p>
616
+ *
617
+ * @param iterator The {@link RocksIterator} to seek.
618
+ * @param target The target key to seek to. Can be {@code null}.
619
+ * @param forward {@code true} for forward iteration; {@code false} for reverse.
620
+ */
424
621
private void seekIterator (final Bytes target , final RocksIterator iterator , final boolean forward ) {
425
622
if (target == null ) {
426
623
if (forward ) {
0 commit comments