Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async def start(cls, config: dict[str, Any]) -> dict[str, Any]:
data_path = home / ".syft-space" / "chromadb"
await data_path.mkdir(parents=True, exist_ok=True)

# Recover from corrupted (0-byte) database before anything else
await cls._verify_database_integrity(data_path)

# PID file for process tracking
pid_file = data_path / "chromadb.pid"

Expand Down Expand Up @@ -139,8 +142,21 @@ async def _log_stream(stream: asyncio.StreamReader, level: str) -> None:
if proc.stderr:
asyncio.create_task(_log_stream(proc.stderr, "err"))

# Wait for health
await cls._wait_for_healthy(http_port)
# Wait for health — clean up on any startup failure
try:
await cls._wait_for_healthy(http_port, proc=proc)
except (TimeoutError, RuntimeError):
logger.error(f"ChromaDB process {proc.pid} failed to start, cleaning up")
await cls.stop({"pid": proc.pid, "pid_file": str(pid_file)})
raise

# Final liveness check: ensure our child is still the one serving
if proc.returncode is not None:
await cls.stop({"pid": proc.pid, "pid_file": str(pid_file)})
raise RuntimeError(
f"ChromaDB process {proc.pid} exited (code {proc.returncode}) "
"after passing health check — another process may hold the port"
)

return {
"pid": proc.pid,
Expand All @@ -167,15 +183,19 @@ async def stop(cls, state: dict[str, Any]) -> None:
else:
os.kill(pid, signal.SIGTERM)

# Wait for graceful shutdown
await asyncio.sleep(2.0)

# Check if still running, force kill if needed
if await cls._is_process_running(pid):
os.kill(pid, signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM)
logger.info(f"Force killed ChromaDB process {pid}")
# Poll for graceful exit (up to 10s) to avoid SIGKILL corruption
for _ in range(20): # 20 * 0.5s = 10s
await asyncio.sleep(0.5)
if not await cls._is_process_running(pid):
logger.info(f"ChromaDB process {pid} stopped gracefully")
break
else:
logger.info(f"ChromaDB process {pid} stopped gracefully")
# Still running after 10s — force kill as last resort
os.kill(
pid,
signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM,
)
logger.warning(f"Force killed ChromaDB process {pid} after 10s")

except (OSError, ProcessLookupError):
logger.info(f"ChromaDB process {pid} already stopped")
Expand Down Expand Up @@ -243,6 +263,24 @@ async def wait_until_ready(cls, state: dict[str, Any]) -> None:
raise TimeoutError("No HTTP port in provisioner state")
await cls._wait_for_healthy(int(http_port))

@classmethod
async def _verify_database_integrity(cls, data_path: AsyncPath) -> None:
    """Delete ChromaDB's internal database if it is 0 bytes (corrupted).

    A 0-byte ``chroma.sqlite3`` is produced when the process is killed
    mid-write (e.g. by process-wick SIGKILL). ChromaDB cannot recover
    from this on its own and will fail with "no such table: tenants".
    Removing the file lets ChromaDB recreate it on the next start.

    A missing database file is not an error: there is nothing to repair,
    so the method returns without doing anything.

    Args:
        data_path: Directory in which ``chroma.sqlite3`` is looked up.
    """
    db_file = data_path / "chroma.sqlite3"

    # EAFP: stat the file directly instead of exists() followed by stat().
    # The file can vanish between the two calls, and "not there" is a
    # perfectly acceptable outcome for an integrity check.
    try:
        stat = await db_file.stat()
    except FileNotFoundError:
        return

    # Any non-empty file is left untouched; only the 0-byte case is handled.
    if stat.st_size != 0:
        return

    logger.warning(
        "Detected 0-byte chroma.sqlite3 — deleting corrupted database"
    )
    # NOTE(review): SQLite sidecar files (-wal / -shm / -journal), if any are
    # left behind, are not removed here — confirm whether they need cleanup.
    try:
        await db_file.unlink()
    except FileNotFoundError:
        # Removed concurrently between stat() and unlink(); the goal (no
        # corrupted file on disk) is already met, so this is not a failure.
        pass

@classmethod
async def _is_process_running(cls, pid: int) -> bool:
"""Check if a ChromaDB process is running.
Expand Down Expand Up @@ -275,19 +313,32 @@ async def _is_process_running(cls, pid: int) -> bool:
return False

@classmethod
async def _wait_for_healthy(cls, http_port: int, timeout: float = 60.0) -> None:
async def _wait_for_healthy(
cls,
http_port: int,
timeout: float = 60.0,
proc: asyncio.subprocess.Process | None = None,
) -> None:
"""Wait for ChromaDB to be healthy.

Args:
http_port: HTTP port to check
timeout: Timeout in seconds
proc: Optional subprocess to monitor for early exit

Raises:
TimeoutError: If not healthy within timeout
RuntimeError: If the subprocess exits before becoming healthy
"""
start_time = time.time()

while time.time() - start_time < timeout:
# Fail fast if the child process already exited
if proc is not None and proc.returncode is not None:
raise RuntimeError(
f"ChromaDB process exited with code {proc.returncode} "
"before becoming healthy"
)
if await cls._check_health(http_port):
logger.info("ChromaDB is healthy")
return
Expand Down
11 changes: 7 additions & 4 deletions backend/syft_space/components/ingestion/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,7 @@ async def _process_single_job(self, job: IngestionJob) -> None:
job.dataset_id, job.tenant_id
)
if not dataset:
logger.info(
f"Dataset deleted during ingestion prep for job {job.id}"
)
logger.info(f"Dataset deleted during ingestion prep for job {job.id}")
await self._ingestion_repository.update_status(
job.id,
IngestionJobStatus.CANCELLED,
Expand All @@ -529,7 +527,12 @@ async def _process_single_job(self, job: IngestionJob) -> None:
return

# Call ingest (native async)
await dataset_type.ingest(ctx, ingest_request)
try:
await dataset_type.ingest(ctx, ingest_request)
except Exception as ingest_err:
raise RuntimeError(
f"[{dataset.dtype}] Ingestion error: {ingest_err}"
) from ingest_err

# Success
await self._ingestion_repository.update_status(
Expand Down
Loading