@@ -2,6 +2,8 @@ package meda
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "io"
5
7
6
8
"github.com/jmoiron/sqlx"
7
9
"github.com/pkg/errors"
@@ -419,3 +421,159 @@ func (d *DB) FilesToBeReadFetcher(config *FilesToBeReadFetcherConfig) FilesToBeR
419
421
config : config ,
420
422
}
421
423
}
424
+
425
// filesIteratorFetchQuery fetches a batch of file rows whose IDs lie in
// the half-open interval (?, ?] — lower bound exclusive, upper bound
// inclusive — in ascending ID order, capped by the trailing LIMIT
// placeholder. {FILES} is substituted with the files table name.
const filesIteratorFetchQuery = GenericQuery(`
	SELECT
			id, rand, path, file_size
		FROM {FILES}
		WHERE
			id > ?
		AND
			id <= ?
		ORDER BY id ASC
		LIMIT ?
	;
`)
437
+
438
// filesIteratorNextChunkQuery yields the single highest ID among the next
// (at most ?) rows with IDs greater than ?. It is used by the
// ChunkIterator to determine the upper ID bound of the next chunk.
// {FILES} is substituted with the files table name.
const filesIteratorNextChunkQuery = GenericQuery(`
	(
		SELECT
				id
			FROM {FILES}
			WHERE id > ?
			ORDER BY id ASC
			LIMIT ?
	)
	ORDER BY id DESC LIMIT 1;
`)
449
+
450
// FilesIteratorConfig configures the pagination behaviour of a
// FilesIterator.
type FilesIteratorConfig struct {
	// ChunkSize is passed to the underlying ChunkIterator and bounds how
	// many rows each chunk spans.
	// NOTE(review): exact semantics depend on ChunkIterator — confirm.
	ChunkSize uint64
	// BatchSize is the maximum number of rows fetched per database query
	// (the LIMIT of each fetch).
	BatchSize uint64
}
454
+
455
// FilesIterator iterates over all rows of the files table in ascending ID
// order, fetching them in batches. Use via the Next / Element / Error
// iterator pattern. It contains unsynchronised state and should not be
// shared between goroutines.
type FilesIterator struct {
	ctx    context.Context
	db     *DB
	config FilesIteratorConfig
	// chunkIt is the ChunkIterator used to limit all queries to only a
	// range of rows.
	chunkIt ChunkIterator
	// chunkContainsRows indicates whether the current chunk may still
	// contain rows that have not been fetched yet.
	chunkContainsRows bool
	// lastID is the ID of the last file fetched; subsequent queries
	// resume strictly after it.
	lastID uint64

	// completed is set once the table has been fully consumed (io.EOF
	// from fetching).
	completed bool

	// err holds the first error encountered; iteration halts once set.
	err error
	// batch holds the most recently fetched rows; its backing array is
	// reused across fetches.
	batch []File
	// i indexes the current element within batch.
	i int
}
472
+
473
+ func (f * FilesIterator ) Next () bool {
474
+ if f .err != nil {
475
+ return false
476
+ }
477
+
478
+ if f .i + 1 < len (f .batch ) {
479
+ f .i ++
480
+ } else if f .completed {
481
+ return false
482
+ } else {
483
+ err := f .fetchNextBatch ()
484
+ if err == io .EOF {
485
+ f .completed = true
486
+ return false
487
+ } else if err != nil {
488
+ f .err = err
489
+ return false
490
+ }
491
+ f .i = 0
492
+ }
493
+
494
+ return true
495
+ }
496
+
497
+ func (f * FilesIterator ) Element () * File {
498
+ return & f .batch [f .i ]
499
+ }
500
+
501
+ func (f * FilesIterator ) Error () error {
502
+ return f .err
503
+ }
504
+
505
+ func (f * FilesIterator ) initialise () {
506
+ f .chunkIt = ChunkIterator {
507
+ ChunkSize : f .config .ChunkSize ,
508
+ NextChunkQuery : filesIteratorNextChunkQuery .SubstituteAll (f .db ),
509
+ }
510
+ }
511
+
512
+ func (f * FilesIterator ) fetchNextBatch () error {
513
+ if f .chunkIt .NextChunkQuery == "" {
514
+ f .initialise ()
515
+ }
516
+
517
+ queryLimit := f .config .BatchSize
518
+
519
+ for {
520
+ if ! f .chunkContainsRows {
521
+ ok , err := f .advanceToNextChunk ()
522
+ if err != nil {
523
+ return fmt .Errorf ("(*FilesIterator).fetchNextBatch: %w" , err )
524
+ } else if ! ok {
525
+ return io .EOF
526
+ }
527
+ }
528
+
529
+ rows , err := f .db .QueryxContext (
530
+ f .ctx ,
531
+ filesIteratorFetchQuery .SubstituteAll (f .db ),
532
+ f .lastID ,
533
+ f .chunkIt .LastID (),
534
+ queryLimit ,
535
+ )
536
+ if err != nil {
537
+ return fmt .Errorf ("(*FilesIterator).fetchNextBatch: querying db: %w" , err )
538
+ }
539
+
540
+ f .batch , err = filesAppendFromRowsAndClose (f .batch [:0 ], rows )
541
+ if err != nil {
542
+ return fmt .Errorf ("(*FilesIterator).fetchNextBatch: %w" , err )
543
+ }
544
+
545
+ if uint64 (len (f .batch )) < queryLimit {
546
+ // less rows returned than requested as chunk is exhausted
547
+ f .chunkContainsRows = false
548
+ }
549
+
550
+ if len (f .batch ) > 0 {
551
+ // at least one file was fetched
552
+
553
+ f .lastID = f .batch [len (f .batch )- 1 ].ID
554
+
555
+ return nil
556
+ }
557
+ }
558
+ }
559
+
560
+ func (f * FilesIterator ) advanceToNextChunk () (bool , error ) {
561
+ if ! f .chunkIt .Next (f .ctx , f .db ) {
562
+ if f .chunkIt .Err () != nil {
563
+ return false , fmt .Errorf ("(*FilesIterator).advanceToNextChunk: %w" , f .chunkIt .Err ())
564
+ }
565
+ // no more chunks, chunkIterator exhausted
566
+ return false , nil
567
+ }
568
+
569
+ f .chunkContainsRows = true
570
+ return true , nil
571
+ }
572
+
573
+ func (d * DB ) FilesIterator (ctx context.Context , config FilesIteratorConfig ) FilesIterator {
574
+ return FilesIterator {
575
+ ctx : ctx ,
576
+ db : d ,
577
+ config : config ,
578
+ }
579
+ }
0 commit comments