diff --git a/openhands-sdk/openhands/sdk/conversation/event_store.py b/openhands-sdk/openhands/sdk/conversation/event_store.py index 677f468059..cec748b65d 100644 --- a/openhands-sdk/openhands/sdk/conversation/event_store.py +++ b/openhands-sdk/openhands/sdk/conversation/event_store.py @@ -16,17 +16,34 @@ logger = get_logger(__name__) +LOCK_FILE_NAME = ".eventlog.lock" +LOCK_TIMEOUT_SECONDS = 30 + class EventLog(EventsListBase): + """Persistent event log with locking for concurrent writes. + + This class provides thread-safe and process-safe event storage using + the FileStore's locking mechanism. Events are persisted to disk and + can be accessed by index or event ID. + + Note: + For LocalFileStore, file locking via flock() does NOT work reliably + on NFS mounts or network filesystems. Users deploying with shared + storage should use alternative coordination mechanisms. + """ + _fs: FileStore _dir: str _length: int + _lock_path: str def __init__(self, fs: FileStore, dir_path: str = EVENTS_DIR) -> None: self._fs = fs self._dir = dir_path self._id_to_idx: dict[EventID, int] = {} self._idx_to_id: dict[int, EventID] = {} + self._lock_path = f"{dir_path}/{LOCK_FILE_NAME}" self._length = self._scan_and_build_index() def get_index(self, event_id: EventID) -> int: @@ -54,7 +71,6 @@ def __getitem__(self, idx: SupportsIndex | slice) -> Event | list[Event]: if isinstance(idx, slice): start, stop, step = idx.indices(self._length) return [self._get_single_item(i) for i in range(start, stop, step)] - # idx is int-like (SupportsIndex) return self._get_single_item(idx) def _get_single_item(self, idx: SupportsIndex) -> Event: @@ -75,26 +91,82 @@ def __iter__(self) -> Iterator[Event]: continue evt = Event.model_validate_json(txt) evt_id = evt.id - # only backfill mapping if missing if i not in self._idx_to_id: self._idx_to_id[i] = evt_id self._id_to_idx.setdefault(evt_id, i) yield evt def append(self, event: Event) -> None: + """Append an event with locking for thread/process safety. + + Raises: + TimeoutError: If the lock cannot be acquired within LOCK_TIMEOUT_SECONDS. + ValueError: If an event with the same ID already exists. + """ evt_id = event.id - # Check for duplicate ID - if evt_id in self._id_to_idx: - existing_idx = self._id_to_idx[evt_id] - raise ValueError( - f"Event with ID '{evt_id}' already exists at index {existing_idx}" + + try: + with self._fs.lock(self._lock_path, timeout=LOCK_TIMEOUT_SECONDS): + # Sync with disk in case another process wrote while we waited + disk_length = self._count_events_on_disk() + if disk_length > self._length: + self._sync_from_disk(disk_length) + + if evt_id in self._id_to_idx: + existing_idx = self._id_to_idx[evt_id] + raise ValueError( + f"Event with ID '{evt_id}' already exists at index " + f"{existing_idx}" + ) + + target_path = self._path(self._length, event_id=evt_id) + self._fs.write(target_path, event.model_dump_json(exclude_none=True)) + self._idx_to_id[self._length] = evt_id + self._id_to_idx[evt_id] = self._length + self._length += 1 + except TimeoutError: + logger.error( + f"Failed to acquire EventLog lock within {LOCK_TIMEOUT_SECONDS}s " + f"for event {evt_id}" ) + raise - path = self._path(self._length, event_id=evt_id) - self._fs.write(path, event.model_dump_json(exclude_none=True)) - self._idx_to_id[self._length] = evt_id - self._id_to_idx[evt_id] = self._length - self._length += 1 + def _count_events_on_disk(self) -> int: + """Count event files on disk.""" + try: + paths = self._fs.list(self._dir) + except FileNotFoundError: + # Directory doesn't exist yet - expected for new event logs + return 0 + except Exception as e: + logger.warning("Error listing event directory %s: %s", self._dir, e) + return 0 + return sum( + 1 + for p in paths + if p.rsplit("/", 1)[-1].startswith("event-") and p.endswith(".json") + ) + + def _sync_from_disk(self, disk_length: int) -> None: + """Sync state for events written by other processes. + + Preserves existing index mappings and only scans new events. + """ + # Preserve existing mappings + existing_idx_to_id = dict(self._idx_to_id) + + # Re-scan to pick up new events + scanned_length = self._scan_and_build_index() + + # Restore any mappings that were lost (e.g., for non-UUID event IDs) + for idx, evt_id in existing_idx_to_id.items(): + if idx not in self._idx_to_id: + self._idx_to_id[idx] = evt_id + if evt_id not in self._id_to_idx: + self._id_to_idx[evt_id] = idx + + # Use the higher of scanned length or disk_length + self._length = max(scanned_length, disk_length) def __len__(self) -> int: return self._length diff --git a/openhands-sdk/openhands/sdk/io/base.py b/openhands-sdk/openhands/sdk/io/base.py index 2f93b40b4f..ce0d50afee 100644 --- a/openhands-sdk/openhands/sdk/io/base.py +++ b/openhands-sdk/openhands/sdk/io/base.py @@ -1,4 +1,6 @@ from abc import ABC, abstractmethod +from collections.abc import Iterator +from contextlib import contextmanager class FileStore(ABC): @@ -6,6 +8,9 @@ class FileStore(ABC): This class defines the interface for file storage backends that can handle basic file operations like reading, writing, listing, and deleting files. + + Implementations should provide a locking mechanism via the `lock()` context + manager for thread/process-safe operations. """ @abstractmethod @@ -46,3 +51,50 @@ def delete(self, path: str) -> None: Args: path: The file or directory path to delete. """ + + @abstractmethod + def exists(self, path: str) -> bool: + """Check if a file or directory exists at the specified path. + + Args: + path: The file or directory path to check. + + Returns: + True if the path exists, False otherwise. + """ + + @abstractmethod + def get_absolute_path(self, path: str) -> str: + """Get the absolute filesystem path for a given relative path. + + Args: + path: The relative path within the file store. + + Returns: + The absolute path on the filesystem. + """ + + @abstractmethod + @contextmanager + def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]: + """Acquire an exclusive lock for the given path. + + This context manager provides thread and process-safe locking. + Implementations may use file-based locking, threading locks, or + other mechanisms as appropriate. + + Args: + path: The path to lock (used to identify the lock). + timeout: Maximum seconds to wait for lock acquisition. + + Yields: + None when lock is acquired. + + Raises: + TimeoutError: If lock cannot be acquired within timeout. + + Note: + File-based locking (flock) does NOT work reliably on NFS mounts + or network filesystems. + """ + yield # pragma: no cover diff --git a/openhands-sdk/openhands/sdk/io/local.py b/openhands-sdk/openhands/sdk/io/local.py index 86dfd05ff6..3a656d43ec 100644 --- a/openhands-sdk/openhands/sdk/io/local.py +++ b/openhands-sdk/openhands/sdk/io/local.py @@ -1,5 +1,9 @@ import os import shutil +from collections.abc import Iterator +from contextlib import contextmanager + +from filelock import FileLock, Timeout from openhands.sdk.io.cache import MemoryLRUCache from openhands.sdk.logger import get_logger @@ -117,3 +121,24 @@ def delete(self, path: str) -> None: except Exception as e: logger.error(f"Error clearing local file store: {str(e)}") + + def exists(self, path: str) -> bool: + """Check if a file or directory exists.""" + return os.path.exists(self.get_full_path(path)) + + def get_absolute_path(self, path: str) -> str: + """Get absolute filesystem path.""" + return self.get_full_path(path) + + @contextmanager + def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]: + """Acquire file-based lock using flock.""" + lock_path = self.get_full_path(path) + os.makedirs(os.path.dirname(lock_path), exist_ok=True) + file_lock = FileLock(lock_path) + try: + with file_lock.acquire(timeout=timeout): + yield + except Timeout: + logger.error(f"Failed to acquire lock within {timeout}s: {lock_path}") + raise TimeoutError(f"Lock acquisition timed out: {path}") diff --git a/openhands-sdk/openhands/sdk/io/memory.py b/openhands-sdk/openhands/sdk/io/memory.py index a03d0f1015..ed7c71cc0c 100644 --- a/openhands-sdk/openhands/sdk/io/memory.py +++ b/openhands-sdk/openhands/sdk/io/memory.py @@ -1,4 +1,8 @@ import os +import threading +import uuid +from collections.abc import Iterator +from contextlib import contextmanager from openhands.sdk.io.base import FileStore from openhands.sdk.logger import get_logger @@ -9,9 +13,13 @@ class InMemoryFileStore(FileStore): files: dict[str, str] + _instance_id: str + _lock: threading.Lock def __init__(self, files: dict[str, str] | None = None) -> None: self.files = {} + self._instance_id = uuid.uuid4().hex + self._lock = threading.Lock() if files is not None: self.files = files @@ -51,4 +59,29 @@ def delete(self, path: str) -> None: del self.files[key] logger.debug(f"Cleared in-memory file store: {path}") except Exception as e: - logger.error(f"Error clearing in-memory file store: {str(e)}") + logger.error(f"Error clearing in-memory file store: {e}") + + def exists(self, path: str) -> bool: + """Check if a file exists.""" + if path in self.files: + return True + return any(f.startswith(path + "/") for f in self.files) + + def get_absolute_path(self, path: str) -> str: + """Get absolute path (uses temp dir with unique instance ID).""" + import tempfile + + return os.path.join( + tempfile.gettempdir(), f"openhands_inmemory_{self._instance_id}", path + ) + + @contextmanager + def lock(self, path: str, timeout: float = 30.0) -> Iterator[None]: + """Acquire thread lock for in-memory store.""" + acquired = self._lock.acquire(timeout=timeout) + if not acquired: + raise TimeoutError(f"Lock acquisition timed out: {path}") + try: + yield + finally: + self._lock.release() diff --git a/openhands-sdk/pyproject.toml b/openhands-sdk/pyproject.toml index aefabb02cd..2bffd402d1 100644 --- a/openhands-sdk/pyproject.toml +++ b/openhands-sdk/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.12" dependencies = [ "deprecation>=2.1.0", "fastmcp>=2.11.3", + "filelock>=3.20.1", "httpx>=0.27.0", "litellm>=1.80.10", "pydantic>=2.12.5", @@ -14,7 +15,7 @@ dependencies = [ "python-json-logger>=3.3.0", "tenacity>=9.1.2", "websockets>=12", - "lmnr>=0.7.24" + "lmnr>=0.7.24", ] [project.optional-dependencies] diff --git a/tests/sdk/conversation/test_event_store.py b/tests/sdk/conversation/test_event_store.py index 2f37b6d994..f0d988262b 100644 --- a/tests/sdk/conversation/test_event_store.py +++ b/tests/sdk/conversation/test_event_store.py @@ -243,13 +243,14 @@ def test_event_log_index_gaps_detection(): def test_event_log_file_store_exceptions(): """Test handling of file store exceptions.""" - # Mock file store that raises exceptions + import tempfile + mock_fs = Mock() mock_fs.list.side_effect = Exception("File system error") - - # Should handle gracefully - log = EventLog(mock_fs) - assert len(log) == 0 + with tempfile.TemporaryDirectory() as temp_dir: + mock_fs.get_absolute_path.return_value = f"{temp_dir}/.eventlog.lock" + log = EventLog(mock_fs) + assert len(log) == 0 def test_event_log_iteration_with_missing_files(): @@ -343,3 +344,61 @@ def test_event_log_large_index_formatting(): assert log.get_index("large-index-event") == 99999 assert log.get_id(99999) == "large-index-event" + + +def test_event_log_concurrent_append_thread_safety(): + """Test concurrent appends from multiple threads.""" + import tempfile + import threading + + from openhands.sdk.io.local import LocalFileStore + + with tempfile.TemporaryDirectory() as temp_dir: + fs = LocalFileStore(temp_dir) + log = EventLog(fs) + errors: list[Exception] = [] + lock = threading.Lock() + + def append_events(thread_id: int, num_events: int): + for i in range(num_events): + try: + event = create_test_event( + f"t{thread_id}-e{i}", f"Thread {thread_id}" + ) + log.append(event) + except Exception as e: + with lock: + errors.append(e) + + threads = [] + for t_id in range(5): + t = threading.Thread(target=append_events, args=(t_id, 10)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + assert len(errors) == 0, f"Errors: {errors}" + assert len(log) == 50 + + +def test_event_log_concurrent_writes_serialized(): + """Test two EventLog instances serialize writes correctly.""" + import tempfile + + from openhands.sdk.io.local import LocalFileStore + + with tempfile.TemporaryDirectory() as temp_dir: + fs = LocalFileStore(temp_dir) + log1 = EventLog(fs) + log2 = EventLog(fs) + + log1.append(create_test_event("event-1", "First")) + log2.append(create_test_event("event-2", "Second")) + + assert log1._length == 1 + assert log2._length == 2 + + files = [f for f in fs.list("events") if not f.endswith(".lock")] + assert len(files) == 2 diff --git a/uv.lock b/uv.lock index 2e156324d8..7feb970218 100644 --- a/uv.lock +++ b/uv.lock @@ -2149,6 +2149,7 @@ source = { editable = "openhands-sdk" } dependencies = [ { name = "deprecation" }, { name = "fastmcp" }, + { name = "filelock" }, { name = "httpx" }, { name = "litellm" }, { name = "lmnr" }, @@ -2169,6 +2170,7 @@ requires-dist = [ { name = "boto3", marker = "extra == 'boto3'", specifier = ">=1.35.0" }, { name = "deprecation", specifier = ">=2.1.0" }, { name = "fastmcp", specifier = ">=2.11.3" }, + { name = "filelock", specifier = ">=3.15.0" }, { name = "httpx", specifier = ">=0.27.0" }, { name = "litellm", specifier = ">=1.80.10" }, { name = "lmnr", specifier = ">=0.7.24" },