1919
2020if TYPE_CHECKING :
2121 from code_indexer .server .utils .config_manager import ServerResourceConfig
22+ from code_indexer .server .storage .sqlite_backends import BackgroundJobsSqliteBackend
2223
2324
2425class JobStatus (str , Enum ):
@@ -72,12 +73,16 @@ def __init__(
7273 self ,
7374 storage_path : Optional [str ] = None ,
7475 resource_config : Optional ["ServerResourceConfig" ] = None ,
76+ use_sqlite : bool = False ,
77+ db_path : Optional [str ] = None ,
7578 ):
7679 """Initialize enhanced background job manager.
7780
7881 Args:
79- storage_path: Path for persistent job storage (optional)
82+ storage_path: Path for persistent job storage (JSON file, optional)
8083 resource_config: Resource configuration (limits, timeouts)
84+ use_sqlite: Whether to use SQLite backend instead of JSON file
85+ db_path: Path to SQLite database file (required if use_sqlite=True)
8186 """
8287 self .jobs : Dict [str , BackgroundJob ] = {}
8388 self ._lock = threading .Lock ()
@@ -87,6 +92,18 @@ def __init__(
8792
8893 # Persistence settings
8994 self .storage_path = storage_path
95+ self .use_sqlite = use_sqlite
96+ self .db_path = db_path
97+ self ._sqlite_backend : Optional ["BackgroundJobsSqliteBackend" ] = None
98+
99+ # Initialize SQLite backend if enabled
100+ if self .use_sqlite and self .db_path :
101+ from code_indexer .server .storage .sqlite_backends import (
102+ BackgroundJobsSqliteBackend ,
103+ )
104+
105+ self ._sqlite_backend = BackgroundJobsSqliteBackend (self .db_path )
106+ logging .info ("BackgroundJobManager using SQLite backend" )
90107
91108 # Resource configuration (import here to avoid circular dependency)
92109 if resource_config is None :
@@ -95,9 +112,8 @@ def __init__(
95112 resource_config = ServerResourceConfig ()
96113 self .resource_config = resource_config
97114
98- # Load persisted jobs if storage path provided
99- if self .storage_path :
100- self ._load_jobs ()
115+ # Load persisted jobs
116+ self ._load_jobs ()
101117
102118 # Background job manager initialized silently
103119
@@ -642,10 +658,16 @@ def get_jobs_by_operation_and_params(
642658
643659 def _persist_jobs (self ) -> None :
644660 """
645- Persist jobs to storage file.
661+ Persist jobs to storage (SQLite or JSON file) .
646662
647663 Note: This method should be called within a lock.
648664 """
665+ # Use SQLite backend if enabled
666+ if self ._sqlite_backend :
667+ self ._persist_jobs_sqlite ()
668+ return
669+
670+ # Fall back to JSON file storage
649671 if not self .storage_path :
650672 return
651673
@@ -678,10 +700,74 @@ def _persist_jobs(self) -> None:
678700 logging .error (f"Failed to persist jobs: { e } " )
679701 # TODO: Consider implementing retry logic for failed persistence attempts
680702
703+ def _persist_jobs_sqlite (self ) -> None :
704+ """
705+ Persist all in-memory jobs to SQLite.
706+
707+ Note: This method should be called within a lock.
708+ """
709+ if not self ._sqlite_backend :
710+ return
711+
712+ try :
713+ for job_id , job in self .jobs .items ():
714+ # Check if job exists in database
715+ existing = self ._sqlite_backend .get_job (job_id )
716+ if existing :
717+ # Update existing job
718+ self ._sqlite_backend .update_job (
719+ job_id = job_id ,
720+ status = job .status .value ,
721+ started_at = job .started_at .isoformat () if job .started_at else None ,
722+ completed_at = job .completed_at .isoformat () if job .completed_at else None ,
723+ result = job .result ,
724+ error = job .error ,
725+ progress = job .progress ,
726+ cancelled = job .cancelled ,
727+ resolution_attempts = job .resolution_attempts ,
728+ claude_actions = job .claude_actions ,
729+ failure_reason = job .failure_reason ,
730+ extended_error = job .extended_error ,
731+ language_resolution_status = job .language_resolution_status ,
732+ )
733+ else :
734+ # Insert new job
735+ self ._sqlite_backend .save_job (
736+ job_id = job_id ,
737+ operation_type = job .operation_type ,
738+ status = job .status .value ,
739+ created_at = job .created_at .isoformat (),
740+ started_at = job .started_at .isoformat () if job .started_at else None ,
741+ completed_at = job .completed_at .isoformat () if job .completed_at else None ,
742+ result = job .result ,
743+ error = job .error ,
744+ progress = job .progress ,
745+ username = job .username ,
746+ is_admin = job .is_admin ,
747+ cancelled = job .cancelled ,
748+ repo_alias = job .repo_alias ,
749+ resolution_attempts = job .resolution_attempts ,
750+ claude_actions = job .claude_actions ,
751+ failure_reason = job .failure_reason ,
752+ extended_error = job .extended_error ,
753+ language_resolution_status = job .language_resolution_status ,
754+ )
755+ except Exception as e :
756+ logging .error (f"Failed to persist jobs to SQLite: { e } " )
757+
758+ # Maximum number of jobs to load from SQLite into memory at startup
759+ MAX_JOBS_TO_LOAD = 10000
760+
681761 def _load_jobs (self ) -> None :
682762 """
683- Load jobs from storage file.
763+ Load jobs from storage (SQLite or JSON file) .
684764 """
765+ # Use SQLite backend if enabled
766+ if self ._sqlite_backend :
767+ self ._load_jobs_sqlite ()
768+ return
769+
770+ # Fall back to JSON file storage
685771 if not self .storage_path :
686772 return
687773
@@ -711,6 +797,34 @@ def _load_jobs(self) -> None:
711797 except Exception as e :
712798 logging .error (f"Failed to load jobs from storage: { e } " )
713799
800+ def _load_jobs_sqlite (self ) -> None :
801+ """
802+ Load jobs from SQLite database into memory.
803+ """
804+ if not self ._sqlite_backend :
805+ return
806+
807+ try :
808+ stored_jobs = self ._sqlite_backend .list_jobs (limit = self .MAX_JOBS_TO_LOAD )
809+
810+ for job_dict in stored_jobs :
811+ # Convert ISO strings back to datetime objects
812+ for field in ["created_at" , "started_at" , "completed_at" ]:
813+ if job_dict .get (field ) is not None :
814+ job_dict [field ] = datetime .fromisoformat (job_dict [field ])
815+
816+ # Convert string status back to enum
817+ job_dict ["status" ] = JobStatus (job_dict ["status" ])
818+
819+ # Create job object
820+ job = BackgroundJob (** job_dict )
821+ self .jobs [job_dict ["job_id" ]] = job
822+
823+ logging .info (f"Loaded { len (stored_jobs )} jobs from SQLite" )
824+
825+ except Exception as e :
826+ logging .error (f"Failed to load jobs from SQLite: { e } " )
827+
714828 def _calculate_cutoff (self , time_filter : str ) -> datetime :
715829 """
716830 Calculate cutoff datetime based on time filter.
0 commit comments