44import os .path
55import sys
66import logging
7+ import threading
8+ import queue
79# from profilehooks import profile
810from cog .cache import Cache
911import xxhash
1315
1416
class TableMeta:
    """Lightweight identity/metadata for one table in a Cog database instance."""

    # Slotted: many TableMeta instances may exist; skip the per-instance __dict__.
    __slots__ = ('name', 'namespace', 'db_instance_id', 'column_mode')

    def __init__(self, name, namespace, db_instance_id, column_mode):
        # NOTE(review): trailing assignments were elided in the visible diff;
        # reconstructed from __slots__ and the constructor signature.
        self.name = name
        self.namespace = namespace
        self.db_instance_id = db_instance_id
        self.column_mode = column_mode
@@ -23,10 +26,12 @@ def __init__(self, name, namespace, db_instance_id, column_mode):
2326
2427class Table :
2528
def __init__(self, name, namespace, db_instance_id, config, column_mode=False,
             shared_cache=None, flush_interval=1):
    """
    Create a table: builds its metadata, its indexer and its backing store.

    Args:
        flush_interval: forwarded to the Store's flush policy (1 = flush
            on every write; see Store for the full semantics).
    """
    self.logger = logging.getLogger('table')
    self.config = config
    self.shared_cache = shared_cache
    self.flush_interval = flush_interval
    self.table_meta = TableMeta(name, namespace, db_instance_id, column_mode)
    # Order matters: the store consumes self.flush_interval set above.
    self.indexer = self.__create_indexer()
    self.store = self.__create_store(shared_cache)
@@ -35,7 +40,12 @@ def __create_indexer(self):
3540 return Indexer (self .table_meta , self .config , self .logger )
3641
def __create_store(self, shared_cache):
    """Build the backing Store, forwarding the shared cache and flush policy."""
    return Store(
        self.table_meta,
        self.config,
        self.logger,
        shared_cache=shared_cache,
        flush_interval=self.flush_interval,
    )
45+
def sync(self):
    """Block until every pending write in the store reaches disk."""
    self.store.sync()
3949
4050 def close (self ):
4151 self .indexer .close ()
@@ -48,6 +58,8 @@ class Record:
4858 Record is the basic unit of storage in cog.
4959 value_type: s - string, l - list, u - set
5060 '''
61+ __slots__ = ('key' , 'value' , 'tombstone' , 'store_position' , 'key_link' , 'value_link' , 'value_type' )
62+
5163 RECORD_LINK_LEN = 16
5264 RECORD_LINK_NULL = - 1
5365 VALUE_LINK_NULL = - 1
@@ -229,9 +241,9 @@ def put(self, key, store_position, store):
229241 self .db_mem [orig_position : orig_position + self .config .INDEX_BLOCK_LEN ] = self .get_index_key (store_position )
230242 else :
231243 # there are records in the bucket
232- # read existing record from the store
233- existing_record = Record .load_from_store ( int (index_value ), store )
234- existing_record .set_store_position (int (index_value )) # this is a bit confusing, should clean up.
244+ # read existing record from the store - use unmarshal, not load_from_store (O(1) vs O(n))
245+ existing_record = Record .unmarshal ( store . read ( int (index_value )) )
246+ existing_record .set_store_position (int (index_value ))
235247
236248 if existing_record .key == key :
237249 # the record at the top of the bucket has the same key, update the record in place.
@@ -248,7 +260,8 @@ def put(self, key, store_position, store):
248260 # check if this record exists in the bucket, if yes remove pointer.
249261 prev_record = None
250262 while existing_record .key_link != Record .RECORD_LINK_NULL :
251- existing_record = Record .load_from_store (existing_record .key_link , store )
263+ # Use unmarshal (O(1)) instead of load_from_store (O(n))
264+ existing_record = Record .unmarshal (store .read (existing_record .key_link ))
252265 existing_record .set_store_position (existing_record .key_link )
253266 if existing_record .key == key and prev_record is not None :
254267 """
@@ -299,6 +312,34 @@ def get(self, key, store):
299312 return record
300313 return None
301314
def get_head_only(self, key, store):
    """
    Fetch only the head record for ``key`` without walking the value chain.

    O(1) relative to ``get()``, which traverses the full value chain for
    multi-value keys.

    Returns:
        ``(record, store_position)`` when found, else ``(None, None)``.
    """
    index_position, _ = self.get_index(key)
    block = self.db_mem[index_position:index_position + self.config.INDEX_BLOCK_LEN]
    if block == self.empty_block:
        # Empty bucket: no record hashed here yet.
        return None, None

    pos = int(block)
    # Unmarshal only -- deliberately do not load the value chain.
    record = Record.unmarshal(store.read(pos))
    record.set_store_position(pos)
    if record.key == key:
        return record, pos

    # Hash collision: follow the key-link chain until the key matches.
    while record.key_link != Record.RECORD_LINK_NULL:
        pos = record.key_link
        record = Record.unmarshal(store.read(pos))
        record.set_store_position(pos)
        if record.key == key:
            return record, pos
    return None, None
302343 '''
303344 Iterates through record in itr_store.
304345 '''
@@ -364,26 +405,125 @@ def flush(self):
364405
365406
366407class Store :
367-
"""
Store manages persistence of records to disk with configurable flush behavior.

Args:
    flush_interval: Number of writes before auto-flush.
        1 = flush every write (safest, default)
        0 = manual flush only (fastest, use sync())
        N>1 = flush every N writes with async background thread
"""

def __init__(self, tablemeta, config, logger, caching_enabled=True, shared_cache=None,
             flush_interval=1):
    """
    Open (creating if needed) the on-disk store file for a table.

    Args:
        tablemeta: table identity used to derive the store path.
        config: provides cog_store() path builder and INDEX_BLOCK_LEN.
        logger: caller's logger (only used for the init message; the
            instance logs through its own 'store' logger).
        caching_enabled: when True, written records are mirrored into
            the store cache.
        shared_cache: optional cache shared across stores.
        flush_interval: see class docstring.
    """
    self.caching_enabled = caching_enabled
    self.batch_mode = False  # when True, flushing is deferred until end_batch()
    self.logger = logging.getLogger('store')
    self.tablemeta = tablemeta
    self.config = config
    self.flush_interval = flush_interval
    self.write_count = 0
    self._closed = False
    self.empty_block = '-1'.zfill(self.config.INDEX_BLOCK_LEN).encode()
    self.store = self.config.cog_store(
        tablemeta.namespace, tablemeta.name, tablemeta.db_instance_id)
    self.store_cache = Cache(self.store, shared_cache)
    # Touch the file so it exists before opening it read+write.
    with open(self.store, 'a'):
        pass
    self.store_file = open(self.store, 'rb+')

    # Serializes all seek/write/flush access to store_file across threads.
    self._lock = threading.Lock()

    # Async flushing is only used for intervals > 1.
    self._use_async = flush_interval > 1
    # BUGFIX: set _shutdown BEFORE starting the worker thread. The original
    # assigned it after start(), relying on getattr() in the worker to paper
    # over the window where the attribute did not exist yet.
    self._shutdown = False
    if self._use_async:
        self._flush_queue = queue.Queue()
        self._flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
        self._flush_thread.start()

    logger.info(f"Store init: {self.store} (flush_interval={flush_interval})")
448+
def _flush_worker(self):
    """
    Background thread that services flush requests from the queue.

    Queue items are either "FLUSH" (flush the store file) or "SHUTDOWN"
    (drain remaining items and exit). The 1-second get() timeout lets the
    thread notice a _shutdown flag set without a queued sentinel.

    BUGFIX vs. original: task_done() is now guaranteed (try/finally) for
    every item taken off the queue, even if store_file.flush() raises --
    otherwise sync()'s _flush_queue.join() could block forever.
    """
    while True:
        try:
            item = self._flush_queue.get(timeout=1.0)
        except queue.Empty:
            # Periodic wake-up: exit if close() flagged shutdown.
            if getattr(self, '_shutdown', False):
                break
            continue
        try:
            if item == "SHUTDOWN":
                # Drain anything queued behind the shutdown marker so
                # join() callers are released, then exit the thread.
                while True:
                    try:
                        self._flush_queue.get_nowait()
                    except queue.Empty:
                        break
                    else:
                        self._flush_queue.task_done()
                return
            # Double-checked _closed: cheap check outside the lock,
            # authoritative re-check while holding it.
            if not self._closed:
                with self._lock:
                    if not self._closed:
                        self.store_file.flush()
        finally:
            # Account for the item taken above, no matter what happened.
            self._flush_queue.task_done()
476+
def _request_flush(self):
    """Trigger a flush: queued to the worker when async, inline otherwise."""
    if self._closed:
        # Closed stores ignore flush requests entirely.
        return
    if not self._use_async:
        self.store_file.flush()
    else:
        self._flush_queue.put("FLUSH")
485+
def _handle_write_flush(self):
    """
    Account for one completed write and flush once the interval is reached.

    No-op in batch mode; a flush_interval of 0 means manual-only flushing.
    """
    if self.batch_mode:
        return
    self.write_count += 1
    if 0 < self.flush_interval <= self.write_count:
        self._request_flush()
        self.write_count = 0
493+
def sync(self):
    """
    Force-flush all pending writes to disk; blocks until complete.

    The file flush is done under the lock, but the queue join is done
    OUTSIDE it: the background worker needs the same lock to service
    queued "FLUSH" items, so joining while holding the lock could
    deadlock (worker blocked on the lock, sync blocked on join()).
    """
    if self._closed:
        return
    with self._lock:
        if not self._closed:
            self.store_file.flush()
    if self._use_async:
        # Wait for the worker to drain any outstanding flush requests.
        self._flush_queue.join()
382507
def close(self):
    """Idempotently close the store, flushing any buffered data first."""
    if self._closed:
        return
    # Flag closed up front so in-flight flush requests bail out early.
    self._closed = True

    if self._use_async:
        self._shutdown = True
        self._flush_queue.put("SHUTDOWN")
        # Bounded wait; the worker is a daemon thread, so a stuck join
        # does not prevent interpreter exit.
        self._flush_thread.join(timeout=5.0)

    with self._lock:
        try:
            self.store_file.flush()
            self.store_file.close()
        except ValueError:
            # The file object was already closed elsewhere.
            pass
387527
388528 def begin_batch (self ):
389529 """
@@ -396,22 +536,27 @@ def end_batch(self):
396536 """
397537 End batch mode and flush all pending writes to disk.
398538 """
399- self .store_file .flush ()
539+ with self ._lock :
540+ self .store_file .flush ()
400541 self .batch_mode = False
401542
def save(self, record):
    """
    Append a marshalled record at end-of-file and return its byte offset.

    The whole append (seek, position assignment, write, cache mirror,
    flush accounting) happens under the store lock so concurrent writers
    cannot interleave.
    """
    with self._lock:
        self.store_file.seek(0, 2)  # jump to EOF for append
        offset = self.store_file.tell()
        record.set_store_position(offset)
        payload = record.marshal()
        self.store_file.write(payload)

        if self.caching_enabled:
            self.store_cache.put(offset, payload)

        # Flush according to the configured interval / batch mode.
        self._handle_write_flush()

    return offset
416561
def update_record_link_inplace(self, start_pos, int_value):
    """
    Overwrite a record-link field in place at byte offset ``start_pos``.

    The integer link is right-justified into RECORD_LINK_LEN bytes so the
    on-disk field width never changes.
    """
    byte_value = str(int_value).encode().rjust(Record.RECORD_LINK_LEN)
    self.logger.debug('update_record_link_inplace: ' + str(byte_value))

    with self._lock:
        self.store_file.seek(start_pos)
        self.store_file.write(byte_value)

        if self.caching_enabled:
            # Keep the cached copy consistent with the on-disk bytes.
            self.store_cache.partial_update_from_zero_index(start_pos, byte_value)

        # Flush according to the configured interval / batch mode.
        self._handle_write_flush()
431578
432579 # @profile
433580 def read (self , position ):
@@ -518,6 +665,11 @@ def get(self, key, store):
518665 idx = self .index_list [0 ] # only one index file.
519666 return idx .get (key , store )
520667
def get_head_only(self, key, store):
    """Delegate to the sole index file: head record only, no value-chain walk."""
    return self.index_list[0].get_head_only(key, store)
672+
521673 def scanner (self , store ):
522674 for idx in self .index_list :
523675 self .logger .debug ("SCAN: index: " + idx .name )
@@ -531,6 +683,5 @@ def delete(self, key, store):
531683 else :
532684 return False
533685
534-
535686def cog_hash (string , index_capacity ):
536687 return xxhash .xxh32 (string , seed = 2 ).intdigest () % index_capacity
0 commit comments