Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 84 additions & 12 deletions openhands-sdk/openhands/sdk/conversation/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,34 @@

logger = get_logger(__name__)

LOCK_FILE_NAME = ".eventlog.lock"
LOCK_TIMEOUT_SECONDS = 30


class EventLog(EventsListBase):
"""Persistent event log with locking for concurrent writes.

This class provides thread-safe and process-safe event storage using
the FileStore's locking mechanism. Events are persisted to disk and
can be accessed by index or event ID.

Note:
For LocalFileStore, file locking via flock() does NOT work reliably
on NFS mounts or network filesystems. Users deploying with shared
storage should use alternative coordination mechanisms.
"""

_fs: FileStore
_dir: str
_length: int
_lock_path: str

def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None:
self._fs = fs
self._dir = dir_path
self._id_to_idx: dict[EventID, int] = {}
self._idx_to_id: dict[int, EventID] = {}
self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}"
self._length = self._scan_and_build_index()

def get_index(self, event_id: EventID) -> int:
Expand Down Expand Up @@ -54,7 +71,6 @@ def __getitem__(self, idx: SupportsIndex | slice) -> Event | list[Event]:
if isinstance(idx, slice):
start, stop, step = idx.indices(self._length)
return [self._get_single_item(i) for i in range(start, stop, step)]
# idx is int-like (SupportsIndex)
return self._get_single_item(idx)

def _get_single_item(self, idx: SupportsIndex) -> Event:
Expand All @@ -75,26 +91,82 @@ def __iter__(self) -> Iterator[Event]:
continue
evt = Event.model_validate_json(txt)
evt_id = evt.id
# only backfill mapping if missing
if i not in self._idx_to_id:
self._idx_to_id[i] = evt_id
self._id_to_idx.setdefault(evt_id, i)
yield evt

def append(self, event: Event) -> None:
"""Append an event with locking for thread/process safety.

Raises:
TimeoutError: If the lock cannot be acquired within LOCK_TIMEOUT_SECONDS.
ValueError: If an event with the same ID already exists.
"""
evt_id = event.id
# Check for duplicate ID
if evt_id in self._id_to_idx:
existing_idx = self._id_to_idx[evt_id]
raise ValueError(
f"Event with ID '{evt_id}' already exists at index {existing_idx}"

try:
with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS):
# Sync with disk in case another process wrote while we waited
disk_length = self._count_events_on_disk()
if disk_length > self._length:
self._sync_from_disk(disk_length)

if evt_id in self._id_to_idx:
existing_idx = self._id_to_idx[evt_id]
raise ValueError(
f"Event with ID '{evt_id}' already exists at index "
f"{existing_idx}"
)

target_path = self._path(self._length, event_id=evt_id)
self._fs.write(target_path, event.model_dump_json(exclude_none=True))
self._idx_to_id[self._length] = evt_id
self._id_to_idx[evt_id] = self._length
self._length += 1
except TimeoutError:
logger.error(
f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s "
f"for event {evt_id}"
)
raise

path = self._path(self._length, event_id=evt_id)
self._fs.write(path, event.model_dump_json(exclude_none=True))
self._idx_to_id[self._length] = evt_id
self._id_to_idx[evt_id] = self._length
self._length += 1
def _count_events_on_disk(self) -> int:
"""Count event files on disk."""
try:
paths = self._fs.list(self._dir)
except FileNotFoundError:
# Directory doesn't exist yet - expected for new event logs
return 0
except Exception as e:
logger.warning("Error listing event directory %s: %s", self._dir, e)
return 0
return sum(
1
for p in paths
if p.rsplit("/", 1)[-1].startswith("event-") and p.endswith(".json")
)

def _sync_from_disk(self, disk_length: int) -> None:
"""Sync state for events written by other processes.

Preserves existing index mappings and only scans new events.
"""
# Preserve existing mappings
existing_idx_to_id = dict(self._idx_to_id)

# Re-scan to pick up new events
scanned_length = self._scan_and_build_index()

# Restore any mappings that were lost (e.g., for non-UUID event IDs)
for idx, evt_id in existing_idx_to_id.items():
if idx not in self._idx_to_id:
self._idx_to_id[idx] = evt_id
if evt_id not in self._id_to_idx:
self._id_to_idx[evt_id] = idx

# Use the higher of scanned length or disk_length
self._length = max(scanned_length, disk_length)

    def __len__(self) -> int:
        # _length is maintained incrementally by append() and refreshed by
        # _sync_from_disk(); no disk access happens here.
        return self._length
Expand Down
52 changes: 52 additions & 0 deletions openhands-sdk/openhands/sdk/io/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager


class FileStore(ABC):
"""Abstract base class for file storage operations.

This class defines the interface for file storage backends that can
handle basic file operations like reading, writing, listing, and deleting files.

Implementations should provide a locking mechanism via the `lock()` context
manager for thread/process-safe operations.
"""

@abstractmethod
Expand Down Expand Up @@ -46,3 +51,50 @@ def delete(self, path: str) -> None:
Args:
path: The file or directory path to delete.
"""

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Check if a file or directory exists at the specified path.

        Args:
            path: The file or directory path to check.

        Returns:
            True if the path exists, False otherwise.

        Note:
            What counts as a "directory" is backend-specific; e.g. an
            in-memory store may treat a stored-key prefix as a directory.
        """

    @abstractmethod
    def get_absolute_path(self, path: str) -> str:
        """Get the absolute filesystem path for a given relative path.

        Args:
            path: The relative path within the file store.

        Returns:
            The absolute path on the filesystem. Backends without real
            on-disk storage may return a synthetic path (e.g. under the
            system temp directory).
        """

    @abstractmethod
    @contextmanager
    def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]:
        """Acquire an exclusive lock identified by the given path.

        This context manager provides thread and process-safe locking.
        Implementations may use file-based locking, threading locks, or
        other mechanisms as appropriate.

        Args:
            path: The path to lock (used only to identify the lock; the
                scope of exclusivity is implementation-defined).
            timeout: Maximum seconds to wait for lock acquisition.

        Yields:
            None when lock is acquired.

        Raises:
            TimeoutError: If lock cannot be acquired within timeout.

        Note:
            File-based locking (flock) does NOT work reliably on NFS mounts
            or network filesystems.
        """
        yield  # pragma: no cover
25 changes: 25 additions & 0 deletions openhands-sdk/openhands/sdk/io/local.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os
import shutil
from collections.abc import Iterator
from contextlib import contextmanager

from filelock import FileLock, Timeout

from openhands.sdk.io.cache import MemoryLRUCache
from openhands.sdk.logger import get_logger
Expand Down Expand Up @@ -117,3 +121,24 @@ def delete(self, path: str) -> None:

except Exception as e:
logger.error(f"Error clearing local file store: {str(e)}")

def exists(self, path: str) -> bool:
"""Check if a file or directory exists."""
return os.path.exists(self.get_full_path(path))

def get_absolute_path(self, path: str) -> str:
"""Get absolute filesystem path."""
return self.get_full_path(path)

@contextmanager
def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]:
"""Acquire file-based lock using flock."""
lock_path = self.get_full_path(path)
os.makedirs(os.path.dirname(lock_path), exist_ok=True)
file_lock = FileLock(lock_path)
try:
with file_lock.acquire(timeout=timeout):
yield
except Timeout:
logger.error(f"Failed to acquire lock within {timeout}s: {lock_path}")
raise TimeoutError(f"Lock acquisition timed out: {path}")
35 changes: 34 additions & 1 deletion openhands-sdk/openhands/sdk/io/memory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os
import threading
import uuid
from collections.abc import Iterator
from contextlib import contextmanager

from openhands.sdk.io.base import FileStore
from openhands.sdk.logger import get_logger
Expand All @@ -9,9 +13,13 @@

class InMemoryFileStore(FileStore):
files: dict[str, str]
_instance_id: str
_lock: threading.Lock

def __init__(self, files: dict[str, str] | None = None) -> None:
self.files = {}
self._instance_id = uuid.uuid4().hex
self._lock = threading.Lock()
if files is not None:
self.files = files

Expand Down Expand Up @@ -51,4 +59,29 @@ def delete(self, path: str) -> None:
del self.files[key]
logger.debug(f"Cleared in-memory file store: {path}")
except Exception as e:
logger.error(f"Error clearing in-memory file store: {str(e)}")
logger.error(f"Error clearing in-memory file store: {e}")

def exists(self, path: str) -> bool:
"""Check if a file exists."""
if path in self.files:
return True
return any(f.startswith(path + "/") for f in self.files)

def get_absolute_path(self, path: str) -> str:
"""Get absolute path (uses temp dir with unique instance ID)."""
import tempfile

return os.path.join(
tempfile.gettempdir(), f"openhands_inmemory_{self._instance_id}", path
)

@contextmanager
def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]:
"""Acquire thread lock for in-memory store."""
acquired = self._lock.acquire(timeout=timeout)
if not acquired:
raise TimeoutError(f"Lock acquisition timed out: {path}")
try:
yield
finally:
self._lock.release()
3 changes: 2 additions & 1 deletion openhands-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ requires-python = ">=3.12"
dependencies = [
"deprecation>=2.1.0",
"fastmcp>=2.11.3",
"filelock>=3.20.1",
"httpx>=0.27.0",
"litellm>=1.80.10",
"pydantic>=2.12.5",
"python-frontmatter>=1.1.0",
"python-json-logger>=3.3.0",
"tenacity>=9.1.2",
"websockets>=12",
"lmnr>=0.7.24"
"lmnr>=0.7.24",
]

[project.optional-dependencies]
Expand Down
69 changes: 64 additions & 5 deletions tests/sdk/conversation/test_event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,14 @@ def test_event_log_index_gaps_detection():

def test_event_log_file_store_exceptions():
"""Test handling of file store exceptions."""
# Mock file store that raises exceptions
import tempfile

mock_fs = Mock()
mock_fs.list.side_effect = Exception("File system error")

# Should handle gracefully
log = EventLog(mock_fs)
assert len(log) == 0
with tempfile.TemporaryDirectory() as temp_dir:
mock_fs.get_absolute_path.return_value = f"{temp_dir}/.eventlog.lock"
log = EventLog(mock_fs)
assert len(log) == 0


def test_event_log_iteration_with_missing_files():
Expand Down Expand Up @@ -343,3 +344,61 @@ def test_event_log_large_index_formatting():

assert log.get_index("large-index-event") == 99999
assert log.get_id(99999) == "large-index-event"


def test_event_log_concurrent_append_thread_safety():
    """Appends from several threads must neither error nor lose events."""
    import tempfile
    import threading

    from openhands.sdk.io.local import LocalFileStore

    with tempfile.TemporaryDirectory() as temp_dir:
        store = LocalFileStore(temp_dir)
        log = EventLog(store)
        errors: list[Exception] = []
        errors_guard = threading.Lock()

        def worker(thread_id: int, num_events: int) -> None:
            for seq in range(num_events):
                try:
                    log.append(
                        create_test_event(
                            f"t{thread_id}-e{seq}", f"Thread {thread_id}"
                        )
                    )
                except Exception as exc:
                    with errors_guard:
                        errors.append(exc)

        workers = [
            threading.Thread(target=worker, args=(tid, 10)) for tid in range(5)
        ]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(log) == 50


def test_event_log_concurrent_writes_serialized():
    """Two EventLog instances over one store must serialize their writes."""
    import tempfile

    from openhands.sdk.io.local import LocalFileStore

    with tempfile.TemporaryDirectory() as temp_dir:
        store = LocalFileStore(temp_dir)
        first = EventLog(store)
        second = EventLog(store)

        first.append(create_test_event("event-1", "First"))
        second.append(create_test_event("event-2", "Second"))

        # The second instance must have synced with disk under the lock,
        # so its length reflects both events.
        assert first._length == 1
        assert second._length == 2

        on_disk = [p for p in store.list("events") if not p.endswith(".lock")]
        assert len(on_disk) == 2
Loading
Loading