From 28af06eac456e0a8fb8dc09b06f1075e9428f4f4 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 21 Aug 2025 11:10:42 +0200 Subject: [PATCH 1/5] Filesystem based RQ should persist to FileSystemStorage --- src/crawlee/_utils/recoverable_state.py | 7 ++++++- .../_file_system/_request_queue_client.py | 3 +++ src/crawlee/storages/_key_value_store.py | 4 +--- .../storage_clients/_file_system/test_fs_rq_client.py | 11 +++++++++-- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/crawlee/_utils/recoverable_state.py b/src/crawlee/_utils/recoverable_state.py index fc115eb403..1704a60d9d 100644 --- a/src/crawlee/_utils/recoverable_state.py +++ b/src/crawlee/_utils/recoverable_state.py @@ -9,6 +9,7 @@ if TYPE_CHECKING: import logging + from crawlee.storage_clients import StorageClient from crawlee.storages._key_value_store import KeyValueStore TStateModel = TypeVar('TStateModel', bound=BaseModel) @@ -38,6 +39,7 @@ def __init__( persist_state_kvs_name: str | None = None, persist_state_kvs_id: str | None = None, logger: logging.Logger, + storage_client: None | StorageClient = None, ) -> None: """Initialize a new recoverable state object. @@ -52,6 +54,8 @@ def __init__( persist_state_kvs_id: The identifier of the KeyValueStore to use for persistence. If neither a name nor and id are supplied, the default store will be used. logger: A logger instance for logging operations related to state persistence + storage_client: Storage client to use for persistence. If not provided, the service locator is used to + provide suitable storage client. 
""" self._default_state = default_state self._state_type: type[TStateModel] = self._default_state.__class__ @@ -61,6 +65,7 @@ def __init__( self._persist_state_kvs_name = persist_state_kvs_name self._persist_state_kvs_id = persist_state_kvs_id self._key_value_store: 'KeyValueStore | None' = None # noqa: UP037 + self._storage_client = storage_client self._log = logger async def initialize(self) -> TStateModel: @@ -80,7 +85,7 @@ async def initialize(self) -> TStateModel: from crawlee.storages._key_value_store import KeyValueStore # noqa: PLC0415 self._key_value_store = await KeyValueStore.open( - name=self._persist_state_kvs_name, id=self._persist_state_kvs_id + name=self._persist_state_kvs_name, id=self._persist_state_kvs_id, storage_client=self._storage_client ) await self._load_saved_state() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index f5e0165d68..302964ddfd 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -112,12 +112,15 @@ def __init__( self._is_empty_cache: bool | None = None """Cache for is_empty result: None means unknown, True/False is cached state.""" + from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 , avoid circular imports + self._state = RecoverableState[RequestQueueState]( default_state=RequestQueueState(), persist_state_key='request_queue_state', persistence_enabled=True, persist_state_kvs_name=f'__RQ_STATE_{self._metadata.id}', logger=logger, + storage_client=FileSystemStorageClient(), # It makes no sense to persist to different client. 
) """Recoverable state to maintain request ordering, in-progress status, and handled status.""" diff --git a/src/crawlee/storages/_key_value_store.py b/src/crawlee/storages/_key_value_store.py index 5297925a37..03aa515539 100644 --- a/src/crawlee/storages/_key_value_store.py +++ b/src/crawlee/storages/_key_value_store.py @@ -12,7 +12,6 @@ from crawlee._types import JsonSerializable # noqa: TC001 from crawlee._utils.docs import docs_group from crawlee._utils.recoverable_state import RecoverableState -from crawlee.storage_clients.models import KeyValueStoreMetadata from ._base import Storage @@ -23,8 +22,7 @@ from crawlee.storage_clients import StorageClient from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecordMetadata -else: - from crawlee._utils.recoverable_state import RecoverableState + T = TypeVar('T') diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 0be182fcd8..9687f2df5e 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -7,8 +7,9 @@ import pytest from crawlee import Request +from crawlee._service_locator import service_locator from crawlee.configuration import Configuration -from crawlee.storage_clients import FileSystemStorageClient +from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient if TYPE_CHECKING: from collections.abc import AsyncGenerator @@ -55,7 +56,10 @@ async def test_file_and_directory_creation(configuration: Configuration) -> None await client.drop() -async def test_request_file_persistence(rq_client: FileSystemRequestQueueClient) -> None: +@pytest.mark.parametrize('set_different_storage_client_in_service_locator', [True, False]) +async def test_request_file_persistence( + rq_client: FileSystemRequestQueueClient, *, 
set_different_storage_client_in_service_locator: bool +) -> None: """Test that requests are properly persisted to files.""" requests = [ Request.from_url('https://example.com/1'), @@ -63,6 +67,9 @@ async def test_request_file_persistence(rq_client: FileSystemRequestQueueClient) Request.from_url('https://example.com/3'), ] + if set_different_storage_client_in_service_locator: + service_locator.set_storage_client(MemoryStorageClient()) + await rq_client.add_batch_of_requests(requests) # Verify request files are created From d3dcf6e9cc067d62e09980ccdc0d7d1789dcf788 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 21 Aug 2025 15:16:06 +0200 Subject: [PATCH 2/5] WIP --- src/crawlee/_utils/recoverable_state.py | 17 +++-- .../_file_system/_request_queue_client.py | 72 ++++++++++--------- src/crawlee/storages/_key_value_store.py | 2 +- .../storages/_storage_instance_manager.py | 53 ++++++++------ tests/unit/storages/test_key_value_store.py | 27 +++++++ .../storages/test_storage_instance_manager.py | 59 +++++++++++++++ 6 files changed, 166 insertions(+), 64 deletions(-) create mode 100644 tests/unit/storages/test_storage_instance_manager.py diff --git a/src/crawlee/_utils/recoverable_state.py b/src/crawlee/_utils/recoverable_state.py index 1704a60d9d..2090ca6118 100644 --- a/src/crawlee/_utils/recoverable_state.py +++ b/src/crawlee/_utils/recoverable_state.py @@ -9,7 +9,6 @@ if TYPE_CHECKING: import logging - from crawlee.storage_clients import StorageClient from crawlee.storages._key_value_store import KeyValueStore TStateModel = TypeVar('TStateModel', bound=BaseModel) @@ -39,7 +38,7 @@ def __init__( persist_state_kvs_name: str | None = None, persist_state_kvs_id: str | None = None, logger: logging.Logger, - storage_client: None | StorageClient = None, + key_value_store: None | KeyValueStore = None, ) -> None: """Initialize a new recoverable state object. 
@@ -54,8 +53,8 @@ def __init__( persist_state_kvs_id: The identifier of the KeyValueStore to use for persistence. If neither a name nor and id are supplied, the default store will be used. logger: A logger instance for logging operations related to state persistence - storage_client: Storage client to use for persistence. If not provided, the service locator is used to - provide suitable storage client. + key_value_store: KeyValueStore to use for persistence. If not provided, the service locator is used to + provide suitable KeyValueStore. """ self._default_state = default_state self._state_type: type[TStateModel] = self._default_state.__class__ @@ -64,8 +63,7 @@ def __init__( self._persist_state_key = persist_state_key self._persist_state_kvs_name = persist_state_kvs_name self._persist_state_kvs_id = persist_state_kvs_id - self._key_value_store: 'KeyValueStore | None' = None # noqa: UP037 - self._storage_client = storage_client + self._key_value_store = key_value_store self._log = logger async def initialize(self) -> TStateModel: @@ -84,9 +82,10 @@ async def initialize(self) -> TStateModel: # Import here to avoid circular imports. 
from crawlee.storages._key_value_store import KeyValueStore # noqa: PLC0415 - self._key_value_store = await KeyValueStore.open( - name=self._persist_state_kvs_name, id=self._persist_state_kvs_id, storage_client=self._storage_client - ) + if not self._key_value_store: + self._key_value_store = await KeyValueStore.open( + name=self._persist_state_kvs_name, id=self._persist_state_kvs_id + ) await self._load_saved_state() diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 302964ddfd..2bed1188e5 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -90,6 +90,7 @@ def __init__( metadata: RequestQueueMetadata, storage_dir: Path, lock: asyncio.Lock, + recoverable_state: RecoverableState[RequestQueueState], ) -> None: """Initialize a new instance. @@ -112,16 +113,7 @@ def __init__( self._is_empty_cache: bool | None = None """Cache for is_empty result: None means unknown, True/False is cached state.""" - from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 , avoid circular imports - - self._state = RecoverableState[RequestQueueState]( - default_state=RequestQueueState(), - persist_state_key='request_queue_state', - persistence_enabled=True, - persist_state_kvs_name=f'__RQ_STATE_{self._metadata.id}', - logger=logger, - storage_client=FileSystemStorageClient(), # It makes no sense to persist to different client. 
- ) + self._state = recoverable_state """Recoverable state to maintain request ordering, in-progress status, and handled status.""" @override @@ -190,14 +182,9 @@ async def open( metadata = RequestQueueMetadata(**file_content) if metadata.id == id: - client = cls( - metadata=metadata, - storage_dir=storage_dir, - lock=asyncio.Lock(), + client = await cls._create_client( + metadata=metadata, storage_dir=storage_dir, update_accessed_at=True ) - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) found = True break finally: @@ -227,15 +214,7 @@ async def open( metadata.name = name - client = cls( - metadata=metadata, - storage_dir=storage_dir, - lock=asyncio.Lock(), - ) - - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) + client = await cls._create_client(metadata=metadata, storage_dir=storage_dir, update_accessed_at=True) # Otherwise, create a new dataset client. 
else: @@ -251,13 +230,40 @@ async def open( pending_request_count=0, total_request_count=0, ) - client = cls( - metadata=metadata, - storage_dir=storage_dir, - lock=asyncio.Lock(), - ) - await client._state.initialize() - await client._update_metadata() + client = await cls._create_client(metadata=metadata, storage_dir=storage_dir) + + return client + + @classmethod + async def _create_client( + cls, metadata: RequestQueueMetadata, storage_dir: Path, *, update_accessed_at: bool = False + ) -> FileSystemRequestQueueClient: + """Create client from metadata and storage directory.""" + from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 avoid circular imports + from crawlee.storages._key_value_store import KeyValueStore # noqa: PLC0415 avoid circular imports + + # Prepare kvs for recoverable state + kvs_client = await FileSystemStorageClient().create_kvs_client(name=f'__RQ_STATE_{metadata.id}') + kvs_client_metadata = await kvs_client.get_metadata() + kvs = KeyValueStore(client=kvs_client, id=kvs_client_metadata.id, name=kvs_client_metadata.name) + + # Create state + recoverable_state = RecoverableState[RequestQueueState]( + default_state=RequestQueueState(), + persist_state_key='request_queue_state', + persistence_enabled=True, + logger=logger, + key_value_store=kvs, + ) + + # Create client + client = cls( + metadata=metadata, storage_dir=storage_dir, lock=asyncio.Lock(), recoverable_state=recoverable_state + ) + + await client._state.initialize() + await client._discover_existing_requests() + await client._update_metadata(update_accessed_at=update_accessed_at) return client diff --git a/src/crawlee/storages/_key_value_store.py b/src/crawlee/storages/_key_value_store.py index 03aa515539..66e5bc666b 100644 --- a/src/crawlee/storages/_key_value_store.py +++ b/src/crawlee/storages/_key_value_store.py @@ -272,9 +272,9 @@ async def get_auto_saved_value( cache[key] = recoverable_state = RecoverableState( default_state=AutosavedValue(default_value), 
persistence_enabled=True, - persist_state_kvs_id=self.id, persist_state_key=key, logger=logger, + key_value_store=self, # Use self for RecoverableState. ) await recoverable_state.initialize() diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index 130a2eec63..d249b4ebb8 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -1,15 +1,16 @@ from __future__ import annotations +from collections import defaultdict from collections.abc import Awaitable, Callable -from typing import TYPE_CHECKING, TypeVar, cast +from typing import TYPE_CHECKING, TypeVar, cast, Generic from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient -from ._base import Storage - if TYPE_CHECKING: from crawlee.configuration import Configuration + from ._base import Storage + T = TypeVar('T', bound='Storage') StorageClientType = DatasetClient | KeyValueStoreClient | RequestQueueClient @@ -19,6 +20,22 @@ """Type alias for the client opener function.""" +class StorageClientCache(Generic[T]): + """Cache for specific storage client.""" + + by_id: dict[type[Storage], dict[str, T]] + """Cache for storage instances by ID, separated by storage type.""" + by_name: dict[type[Storage], dict[str, T]] + """Cache for storage instances by name, separated by storage type.""" + default_instances: dict[type[Storage], T] + """Cache for default instances of each storage type.""" + + def __init__(self) -> None: + self.by_id = defaultdict(lambda: defaultdict(dict)) + self.by_name = defaultdict(lambda: defaultdict(dict)) + self.default_instances = defaultdict(dict) + + class StorageInstanceManager: """Manager for caching and managing storage instances. 
@@ -27,14 +44,7 @@ class StorageInstanceManager: """ def __init__(self) -> None: - self._cache_by_id = dict[type[Storage], dict[str, Storage]]() - """Cache for storage instances by ID, separated by storage type.""" - - self._cache_by_name = dict[type[Storage], dict[str, Storage]]() - """Cache for storage instances by name, separated by storage type.""" - - self._default_instances = dict[type[Storage], Storage]() - """Cache for default instances of each storage type.""" + self._cache_by_storage_client: dict[str, StorageClientCache] = defaultdict(StorageClientCache) async def open_storage_instance( self, @@ -64,19 +74,23 @@ async def open_storage_instance( raise ValueError('Only one of "id" or "name" can be specified, not both.') # Check for default instance - if id is None and name is None and cls in self._default_instances: - return cast('T', self._default_instances[cls]) + if ( + id is None + and name is None + and cls in self._cache_by_storage_client[client_opener.__qualname__].default_instances + ): + return cast('T', self._cache_by_storage_client[client_opener.__qualname__].default_instances[cls]) # Check cache if id is not None: - type_cache_by_id = self._cache_by_id.get(cls, {}) + type_cache_by_id = self._cache_by_storage_client[client_opener.__qualname__].by_id if id in type_cache_by_id: cached_instance = type_cache_by_id[id] if isinstance(cached_instance, cls): return cached_instance if name is not None: - type_cache_by_name = self._cache_by_name.get(cls, {}) + type_cache_by_name = self._cache_by_storage_client[client_opener.__qualname__].by_name if name in type_cache_by_name: cached_instance = type_cache_by_name[name] if isinstance(cached_instance, cls): @@ -90,16 +104,13 @@ async def open_storage_instance( instance_name = getattr(instance, 'name', None) # Cache the instance - type_cache_by_id = self._cache_by_id.setdefault(cls, {}) - type_cache_by_name = self._cache_by_name.setdefault(cls, {}) - - type_cache_by_id[instance.id] = instance + 
self._cache_by_storage_client[client_opener.__qualname__].by_id[cls][instance.id] = instance if instance_name is not None: - type_cache_by_name[instance_name] = instance + self._cache_by_storage_client[client_opener.__qualname__].by_name[cls][instance_name] = instance # Set as default if no id/name specified if id is None and name is None: - self._default_instances[cls] = instance + self._cache_by_storage_client[client_opener.__qualname__].default_instances[cls] = instance return instance diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py index 25bbcb4fc0..a0d0e9b308 100644 --- a/tests/unit/storages/test_key_value_store.py +++ b/tests/unit/storages/test_key_value_store.py @@ -598,3 +598,30 @@ async def test_record_exists_after_purge(kvs: KeyValueStore) -> None: # Should no longer exist assert await kvs.record_exists('key1') is False assert await kvs.record_exists('key2') is False + + +async def test_get_auto_saved_value_with_multiple_storage_clients(tmp_path: Path) -> None: + """Test that setting storage client through service locator does not break autosaved values in other clients.""" + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(storage_client=MemoryStorageClient(), configuration=config) + + kvs2 = await KeyValueStore.open( + storage_client=FileSystemStorageClient(), + configuration=config, + ) + assert kvs1 is not kvs2 + + expected_values = {'key': 'value'} + test_key = 'test_key' + + autosaved_value = await kvs2.get_auto_saved_value(test_key) + assert autosaved_value == {} + autosaved_value.update(expected_values) + + await kvs2.persist_autosaved_values() + + assert await kvs2.get_value(test_key) == expected_values diff --git a/tests/unit/storages/test_storage_instance_manager.py b/tests/unit/storages/test_storage_instance_manager.py new file mode 100644 index 0000000000..5dc82016da --- /dev/null +++ 
b/tests/unit/storages/test_storage_instance_manager.py @@ -0,0 +1,59 @@ +from pathlib import Path + +from crawlee.configuration import Configuration +from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient +from crawlee.storages import Dataset, KeyValueStore + + +async def test_unique_storage_by_storage_client(tmp_path: Path) -> None: + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(storage_client=MemoryStorageClient(), configuration=config) + + kvs2 = await KeyValueStore.open( + storage_client=FileSystemStorageClient(), + configuration=config, + ) + assert kvs1 is not kvs2 + + +async def test_unique_storage_by_storage_type(tmp_path: Path) -> None: + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs = await KeyValueStore.open(configuration=config) + dataset = await Dataset.open(configuration=config) + assert kvs is not dataset + + +async def test_unique_storage_by_name(tmp_path: Path) -> None: + """Test that StorageInstanceManager supports different storage clients at the same time.""" + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(configuration=config, name='kvs1') + kvs2 = await KeyValueStore.open(storage_client=FileSystemStorageClient(), configuration=config, name='kvs2') + assert kvs1 is not kvs2 + + +async def test_identical_storage(tmp_path: Path) -> None: + """Test that StorageInstanceManager correctly caches storage based on the storage client.""" + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(storage_client=MemoryStorageClient(), configuration=config) + + kvs2 = await KeyValueStore.open( + storage_client=MemoryStorageClient(), + configuration=config, + ) + 
assert kvs1 is kvs2 From cb042b7d51faa189ecd2f6847620412a3e350964 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 21 Aug 2025 15:50:43 +0200 Subject: [PATCH 3/5] Working draft --- .../storages/_storage_instance_manager.py | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index d249b4ebb8..2e9d57429d 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -2,7 +2,7 @@ from collections import defaultdict from collections.abc import Awaitable, Callable -from typing import TYPE_CHECKING, TypeVar, cast, Generic +from typing import TYPE_CHECKING, TypeVar, cast from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient @@ -20,20 +20,16 @@ """Type alias for the client opener function.""" -class StorageClientCache(Generic[T]): +class StorageClientCache: """Cache for specific storage client.""" - by_id: dict[type[Storage], dict[str, T]] - """Cache for storage instances by ID, separated by storage type.""" - by_name: dict[type[Storage], dict[str, T]] - """Cache for storage instances by name, separated by storage type.""" - default_instances: dict[type[Storage], T] - """Cache for default instances of each storage type.""" - def __init__(self) -> None: - self.by_id = defaultdict(lambda: defaultdict(dict)) - self.by_name = defaultdict(lambda: defaultdict(dict)) - self.default_instances = defaultdict(dict) + self.by_id: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) + """Cache for storage instances by ID, separated by storage type.""" + self.by_name: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) + """Cache for storage instances by name, separated by storage type.""" + self.default_instances: defaultdict[type[Storage], Storage] = defaultdict() 
+ """Cache for default instances of each storage type.""" class StorageInstanceManager: @@ -83,14 +79,14 @@ async def open_storage_instance( # Check cache if id is not None: - type_cache_by_id = self._cache_by_storage_client[client_opener.__qualname__].by_id + type_cache_by_id = self._cache_by_storage_client[client_opener.__qualname__].by_id[cls] if id in type_cache_by_id: cached_instance = type_cache_by_id[id] if isinstance(cached_instance, cls): return cached_instance if name is not None: - type_cache_by_name = self._cache_by_storage_client[client_opener.__qualname__].by_name + type_cache_by_name = self._cache_by_storage_client[client_opener.__qualname__].by_name[cls] if name in type_cache_by_name: cached_instance = type_cache_by_name[name] if isinstance(cached_instance, cls): @@ -123,22 +119,25 @@ def remove_from_cache(self, storage_instance: Storage) -> None: storage_type = type(storage_instance) # Remove from ID cache - type_cache_by_id = self._cache_by_id.get(storage_type, {}) - if storage_instance.id in type_cache_by_id: - del type_cache_by_id[storage_instance.id] - - # Remove from name cache - if storage_instance.name is not None: - type_cache_by_name = self._cache_by_name.get(storage_type, {}) - if storage_instance.name in type_cache_by_name: + for client_cache in self._cache_by_storage_client.values(): + type_cache_by_id = client_cache.by_id[storage_type] + if storage_instance.id in type_cache_by_id: + del type_cache_by_id[storage_instance.id] + + # Remove from name cache + type_cache_by_name = client_cache.by_name[storage_type] + if storage_instance.name in type_cache_by_name and storage_instance.name: del type_cache_by_name[storage_instance.name] - # Remove from default instances - if storage_type in self._default_instances and self._default_instances[storage_type] is storage_instance: - del self._default_instances[storage_type] + # Remove from default instances + if ( + storage_type in client_cache.default_instances + and 
client_cache.default_instances[storage_type] is storage_instance + ): + del client_cache.default_instances[storage_type] def clear_cache(self) -> None: """Clear all cached storage instances.""" - self._cache_by_id.clear() - self._cache_by_name.clear() - self._default_instances.clear() + self.by_id: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) + self.by_name: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) + self.default_instances: defaultdict[type[Storage], Storage] = defaultdict() From df06e02373e3728b5ef1ad1b537817a9255e3cdb Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 21 Aug 2025 16:08:15 +0200 Subject: [PATCH 4/5] Add few more tests --- .../storages/_storage_instance_manager.py | 4 +- .../storages/test_storage_instance_manager.py | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index 2e9d57429d..40c5405f7b 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -138,6 +138,4 @@ def remove_from_cache(self, storage_instance: Storage) -> None: def clear_cache(self) -> None: """Clear all cached storage instances.""" - self.by_id: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) - self.by_name: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) - self.default_instances: defaultdict[type[Storage], Storage] = defaultdict() + self._cache_by_storage_client = defaultdict(StorageClientCache) diff --git a/tests/unit/storages/test_storage_instance_manager.py b/tests/unit/storages/test_storage_instance_manager.py index 5dc82016da..eaa168df34 100644 --- a/tests/unit/storages/test_storage_instance_manager.py +++ b/tests/unit/storages/test_storage_instance_manager.py @@ -1,5 +1,6 @@ from 
pathlib import Path +from crawlee import service_locator from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient from crawlee.storages import Dataset, KeyValueStore @@ -57,3 +58,39 @@ async def test_identical_storage(tmp_path: Path) -> None: configuration=config, ) assert kvs1 is kvs2 + + +async def test_identical_storage_clear_cache(tmp_path: Path) -> None: + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(storage_client=MemoryStorageClient(), configuration=config) + + # Clearing cache, so expect different instances + service_locator.storage_instance_manager.clear_cache() + + kvs2 = await KeyValueStore.open( + storage_client=MemoryStorageClient(), + configuration=config, + ) + assert kvs1 is not kvs2 + + +async def test_identical_storage_remove_from_cache(tmp_path: Path) -> None: + config = Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + purge_on_start=True, + ) + + kvs1 = await KeyValueStore.open(storage_client=MemoryStorageClient(), configuration=config) + + # Removing from cache, so expect different instances + service_locator.storage_instance_manager.remove_from_cache(kvs1) + + kvs2 = await KeyValueStore.open( + storage_client=MemoryStorageClient(), + configuration=config, + ) + assert kvs1 is not kvs2 From 061c43741baa101a43b377d3b8aecee49f4f3d51 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Tue, 26 Aug 2025 08:29:10 +0200 Subject: [PATCH 5/5] Easy review comments --- src/crawlee/_utils/recoverable_state.py | 11 +++++--- .../storages/_storage_instance_manager.py | 25 +++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/crawlee/_utils/recoverable_state.py b/src/crawlee/_utils/recoverable_state.py index 2090ca6118..f01b8e3456 100644 --- a/src/crawlee/_utils/recoverable_state.py +++ 
b/src/crawlee/_utils/recoverable_state.py @@ -53,9 +53,14 @@ def __init__( persist_state_kvs_id: The identifier of the KeyValueStore to use for persistence. If neither a name nor and id are supplied, the default store will be used. logger: A logger instance for logging operations related to state persistence - key_value_store: KeyValueStore to use for persistence. If not provided, the service locator is used to - provide suitable KeyValueStore. + key_value_store: KeyValueStore to use for persistence. If not provided, a system-wide KeyValueStore will be + used, based on service locator configuration. """ + if key_value_store and (persist_state_kvs_name or persist_state_kvs_id): + raise ValueError( + 'Cannot provide explicit key_value_store and persist_state_kvs_name or persist_state_kvs_id.' + ) + self._default_state = default_state self._state_type: type[TStateModel] = self._default_state.__class__ self._state: TStateModel | None = None @@ -63,8 +68,8 @@ def __init__( self._persist_state_key = persist_state_key self._persist_state_kvs_name = persist_state_kvs_name self._persist_state_kvs_id = persist_state_kvs_id - self._key_value_store = key_value_store self._log = logger + self._key_value_store = key_value_store async def initialize(self) -> TStateModel: """Initialize the recoverable state. 
diff --git a/src/crawlee/storages/_storage_instance_manager.py b/src/crawlee/storages/_storage_instance_manager.py index 40c5405f7b..a9860135a2 100644 --- a/src/crawlee/storages/_storage_instance_manager.py +++ b/src/crawlee/storages/_storage_instance_manager.py @@ -2,6 +2,7 @@ from collections import defaultdict from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field from typing import TYPE_CHECKING, TypeVar, cast from crawlee.storage_clients._base import DatasetClient, KeyValueStoreClient, RequestQueueClient @@ -20,16 +21,20 @@ """Type alias for the client opener function.""" -class StorageClientCache: +@dataclass +class _StorageClientCache: """Cache for specific storage client.""" - def __init__(self) -> None: - self.by_id: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) - """Cache for storage instances by ID, separated by storage type.""" - self.by_name: defaultdict[type[Storage], defaultdict[str, Storage]] = defaultdict(lambda: defaultdict()) - """Cache for storage instances by name, separated by storage type.""" - self.default_instances: defaultdict[type[Storage], Storage] = defaultdict() - """Cache for default instances of each storage type.""" + by_id: defaultdict[type[Storage], defaultdict[str, Storage]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict()) + ) + """Cache for storage instances by ID, separated by storage type.""" + by_name: defaultdict[type[Storage], defaultdict[str, Storage]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict()) + ) + """Cache for storage instances by name, separated by storage type.""" + default_instances: defaultdict[type[Storage], Storage] = field(default_factory=lambda: defaultdict()) + """Cache for default instances of each storage type.""" class StorageInstanceManager: @@ -40,7 +45,7 @@ class StorageInstanceManager: """ def __init__(self) -> None: - self._cache_by_storage_client: dict[str, StorageClientCache] 
= defaultdict(StorageClientCache) + self._cache_by_storage_client: dict[str, _StorageClientCache] = defaultdict(_StorageClientCache) async def open_storage_instance( self, @@ -138,4 +143,4 @@ def remove_from_cache(self, storage_instance: Storage) -> None: def clear_cache(self) -> None: """Clear all cached storage instances.""" - self._cache_by_storage_client = defaultdict(StorageClientCache) + self._cache_by_storage_client = defaultdict(_StorageClientCache)