diff --git a/docs/guides/code_examples/storage_clients/sql_storage_client_basic_example.py b/docs/guides/code_examples/storage_clients/sql_storage_client_basic_example.py
new file mode 100644
index 0000000000..90c27f4039
--- /dev/null
+++ b/docs/guides/code_examples/storage_clients/sql_storage_client_basic_example.py
@@ -0,0 +1,12 @@
+from crawlee.crawlers import ParselCrawler
+from crawlee.storage_clients import SqlStorageClient
+
+
+async def main() -> None:
+    # Create a new instance of the storage client.
+    # This will create an SQLite database file `crawlee.db`, or create the tables in your
+    # database if you pass a `connection_string` or an `engine`.
+    # Use the context manager to ensure that connections are properly cleaned up.
+    async with SqlStorageClient() as storage_client:
+        # And pass it to the crawler.
+        crawler = ParselCrawler(storage_client=storage_client)
diff --git a/docs/guides/code_examples/storage_clients/sql_storage_client_configuration_example.py b/docs/guides/code_examples/storage_clients/sql_storage_client_configuration_example.py
new file mode 100644
index 0000000000..257f392683
--- /dev/null
+++ b/docs/guides/code_examples/storage_clients/sql_storage_client_configuration_example.py
@@ -0,0 +1,33 @@
+from sqlalchemy.ext.asyncio import create_async_engine
+
+from crawlee.configuration import Configuration
+from crawlee.crawlers import ParselCrawler
+from crawlee.storage_clients import SqlStorageClient
+
+
+async def main() -> None:
+    # Create a new instance of the storage client.
+    # On first run, this also creates the tables in your PostgreSQL database.
+    # Use the context manager to ensure that connections are properly cleaned up.
+    async with SqlStorageClient(
+        # Create an `engine` with the desired configuration.
+        engine=create_async_engine(
+            'postgresql+asyncpg://myuser:mypassword@localhost:5432/postgres',
+            future=True,
+            pool_size=5,
+            max_overflow=10,
+            pool_recycle=3600,
+            pool_pre_ping=True,
+            echo=False,
+        )
+    ) as storage_client:
+        # Create a configuration with custom settings.
+        configuration = Configuration(
+            purge_on_start=False,
+        )
+
+        # And pass them to the crawler.
+ crawler = ParselCrawler( + storage_client=storage_client, + configuration=configuration, + ) diff --git a/docs/guides/storage_clients.mdx b/docs/guides/storage_clients.mdx index 0c2a14ffe9..57be530efe 100644 --- a/docs/guides/storage_clients.mdx +++ b/docs/guides/storage_clients.mdx @@ -8,12 +8,15 @@ import ApiLink from '@site/src/components/ApiLink'; import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import RunnableCodeBlock from '@site/src/components/RunnableCodeBlock'; +import CodeBlock from '@theme/CodeBlock'; import MemoryStorageClientBasicExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/memory_storage_client_basic_example.py'; import FileSystemStorageClientBasicExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/file_system_storage_client_basic_example.py'; import FileSystemStorageClientConfigurationExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/file_system_storage_client_configuration_example.py'; import CustomStorageClientExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/custom_storage_client_example.py'; import RegisteringStorageClientsExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/registering_storage_clients_example.py'; +import SQLStorageClientBasicExample from '!!raw-loader!roa-loader!./code_examples/storage_clients/sql_storage_client_basic_example.py'; +import SQLStorageClientConfigurationExample from '!!raw-loader!./code_examples/storage_clients/sql_storage_client_configuration_example.py'; Storage clients provide a unified interface for interacting with `Dataset`, `KeyValueStore`, and `RequestQueue`, regardless of the underlying implementation. They handle operations like creating, reading, updating, and deleting storage instances, as well as managing data persistence and cleanup. This abstraction makes it easy to switch between different environments, such as local development and cloud production setups. @@ -23,6 +26,7 @@ Crawlee provides three main storage client implementations: - `FileSystemStorageClient` - Provides persistent file system storage with in-memory caching. - `MemoryStorageClient` - Stores data in memory with no persistence. +- `SqlStorageClient` – Provides persistent storage using a SQL database ([SQLite](https://sqlite.org/) or [PostgreSQL](https://www.postgresql.org/)). Requires installing the extra dependency: 'crawlee[sql_sqlite]' for SQLite or 'crawlee[sql_postgres]' for PostgreSQL. - [`ApifyStorageClient`](https://docs.apify.com/sdk/python/reference/class/ApifyStorageClient) - Manages storage on the [Apify platform](https://apify.com), implemented in the [Apify SDK](https://github.com/apify/apify-sdk-python). ```mermaid @@ -50,6 +54,8 @@ class FileSystemStorageClient class MemoryStorageClient +class SqlStorageClient + class ApifyStorageClient %% ======================== @@ -58,6 +64,7 @@ class ApifyStorageClient StorageClient --|> FileSystemStorageClient StorageClient --|> MemoryStorageClient +StorageClient --|> SqlStorageClient StorageClient --|> ApifyStorageClient ``` @@ -125,6 +132,184 @@ The `MemoryStorageClient` does not persist data between runs. All data is lost w {MemoryStorageClientBasicExample} +### SQL storage client + +:::warning Experimental feature +The `SqlStorageClient` is experimental. Its API and behavior may change in future releases. +::: + +The `SqlStorageClient` provides persistent storage using a SQL database (SQLite by default, or PostgreSQL). 
It supports all Crawlee storage types and enables concurrent access from multiple independent clients or processes.
+
+:::note Dependencies
+The `SqlStorageClient` is not included in the core Crawlee package.
+To use it, you need to install Crawlee with the appropriate extra dependency:
+
+- For SQLite support, run:
+  `pip install 'crawlee[sql_sqlite]'`
+- For PostgreSQL support, run:
+  `pip install 'crawlee[sql_postgres]'`
+:::
+
+By default, `SqlStorageClient` uses SQLite.
+To use PostgreSQL instead, provide a PostgreSQL connection string via the `connection_string` parameter. No other code changes are needed; the same client works with both databases.
+
+<RunnableCodeBlock className="language-python" language="python">
+    {SQLStorageClientBasicExample}
+</RunnableCodeBlock>
+
+Data is organized in relational tables. Below are the main tables and columns used for each storage type:
+
+```mermaid
+---
+config:
+  class:
+    hideEmptyMembersBox: true
+---
+
+classDiagram
+
+%% ========================
+%% Storage Clients
+%% ========================
+
+class SqlDatasetClient {
+    <<Client>>
+}
+
+class SqlKeyValueStoreClient {
+    <<Client>>
+}
+
+%% ========================
+%% Dataset Tables
+%% ========================
+
+class datasets {
+    <<table>>
+    + id (PK)
+    + name
+    + accessed_at
+    + created_at
+    + modified_at
+    + item_count
+}
+
+class dataset_records {
+    <<table>>
+    + order_id (PK)
+    + metadata_id (FK)
+    + data
+}
+
+%% ========================
+%% Key-Value Store Tables
+%% ========================
+
+class key_value_stores {
+    <<table>>
+    + id (PK)
+    + name
+    + accessed_at
+    + created_at
+    + modified_at
+}
+
+class key_value_store_records {
+    <<table>>
+    + metadata_id (FK, PK)
+    + key (PK)
+    + value
+    + content_type
+    + size
+}
+
+%% ========================
+%% Client to Table arrows
+%% ========================
+
+SqlDatasetClient --> datasets
+SqlDatasetClient --> dataset_records
+
+SqlKeyValueStoreClient --> key_value_stores
+SqlKeyValueStoreClient --> key_value_store_records
+```
+```mermaid
+---
+config:
+  class:
+    hideEmptyMembersBox: true
+---
+
+classDiagram
+
+%% ========================
+%% Storage Clients
+%% ========================
+
+class SqlRequestQueueClient {
+    <<Client>>
+}
+
+%% ========================
+%% Request Queue Tables
+%% ========================
+
+class request_queues {
+    <<table>>
+    + id (PK)
+    + name
+    + accessed_at
+    + created_at
+    + modified_at
+    + had_multiple_clients
+    + handled_request_count
+    + pending_request_count
+    + total_request_count
+}
+
+class request_queue_records {
+    <<table>>
+    + request_id (PK)
+    + metadata_id (FK, PK)
+    + data
+    + sequence_number
+    + is_handled
+    + time_blocked_until
+    + client_key
+}
+
+class request_queue_state {
+    <<table>>
+    + metadata_id (FK, PK)
+    + sequence_counter
+    + forefront_sequence_counter
+}
+
+%% ========================
+%% Client to Table arrows
+%% ========================
+
+SqlRequestQueueClient --> request_queues
+SqlRequestQueueClient --> request_queue_records
+SqlRequestQueueClient --> request_queue_state
+```
+
+Configuration options for the `SqlStorageClient` can be set through environment variables or the `Configuration` class:
+
+- **`storage_dir`** (env: `CRAWLEE_STORAGE_DIR`, default: `'./storage'`) - The root directory where the default SQLite database will be created if no connection string is provided.
+- **`purge_on_start`** (env: `CRAWLEE_PURGE_ON_START`, default: `True`) - Whether to purge default storages on start.
+
+Additional configuration options can be set via `SqlStorageClient` constructor arguments:
+
+- **`connection_string`** (default: an SQLite database in the `Configuration` storage directory) - SQLAlchemy connection string, e.g. `sqlite+aiosqlite:///my.db` or `postgresql+asyncpg://user:pass@host/db`.
+- **`engine`** - Optional pre-configured SQLAlchemy `AsyncEngine`.
+
+For advanced scenarios, you can configure `SqlStorageClient` with a custom SQLAlchemy engine and additional options via the `Configuration` class. This is useful, for example, when connecting to an external PostgreSQL database or customizing connection pooling.
+
+<CodeBlock className="language-python">
+    {SQLStorageClientConfigurationExample}
+</CodeBlock>
+
## Creating a custom storage client

A storage client consists of two parts: the storage client factory and individual storage type clients. The `StorageClient` acts as a factory that creates specific clients (`DatasetClient`, `KeyValueStoreClient`, `RequestQueueClient`) where the actual storage logic is implemented.
diff --git a/pyproject.toml b/pyproject.toml
index 050ac52211..b38cddbb27 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ dependencies = [
]

[project.optional-dependencies]
-all = ["crawlee[adaptive-crawler,beautifulsoup,cli,curl-impersonate,httpx,parsel,playwright,otel]"]
+all = ["crawlee[adaptive-crawler,beautifulsoup,cli,curl-impersonate,httpx,parsel,playwright,otel,sql_sqlite,sql_postgres]"]
adaptive-crawler = [
    "jaro-winkler>=2.0.3",
    "playwright>=1.27.0",
@@ -71,6 +71,14 @@ otel = [
    "opentelemetry-semantic-conventions>=0.54",
    "wrapt>=1.17.0",
]
+sql_postgres = [
+    "sqlalchemy[asyncio]>=2.0.0,<3.0.0",
+    "asyncpg>=0.24.0"
+]
+sql_sqlite = [
+    "sqlalchemy[asyncio]>=2.0.0,<3.0.0",
+    "aiosqlite>=0.21.0",
+]

[project.scripts]
crawlee = "crawlee._cli:cli"
diff --git a/src/crawlee/storage_clients/__init__.py b/src/crawlee/storage_clients/__init__.py
index ce8c713ca9..7b0dfc0a79 100644
--- a/src/crawlee/storage_clients/__init__.py
+++ b/src/crawlee/storage_clients/__init__.py
@@ -1,9 +1,21 @@
+from crawlee._utils.try_import import install_import_hook as _install_import_hook
+from crawlee._utils.try_import import try_import as _try_import
+
+# These imports have only mandatory dependencies, so they are imported directly.
from ._base import StorageClient
from ._file_system import FileSystemStorageClient
from ._memory import MemoryStorageClient

+_install_import_hook(__name__)
+
+# The following imports are wrapped in try_import to handle optional dependencies,
+# ensuring the module can still function even if these dependencies are missing.
+with _try_import(__name__, 'SqlStorageClient'): + from ._sql import SqlStorageClient + __all__ = [ 'FileSystemStorageClient', 'MemoryStorageClient', + 'SqlStorageClient', 'StorageClient', ] diff --git a/src/crawlee/storage_clients/_sql/__init__.py b/src/crawlee/storage_clients/_sql/__init__.py new file mode 100644 index 0000000000..56a6a2c717 --- /dev/null +++ b/src/crawlee/storage_clients/_sql/__init__.py @@ -0,0 +1,6 @@ +from ._dataset_client import SqlDatasetClient +from ._key_value_store_client import SqlKeyValueStoreClient +from ._request_queue_client import SqlRequestQueueClient +from ._storage_client import SqlStorageClient + +__all__ = ['SqlDatasetClient', 'SqlKeyValueStoreClient', 'SqlRequestQueueClient', 'SqlStorageClient'] diff --git a/src/crawlee/storage_clients/_sql/_client_mixin.py b/src/crawlee/storage_clients/_sql/_client_mixin.py new file mode 100644 index 0000000000..592ef6a03e --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_client_mixin.py @@ -0,0 +1,372 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from logging import getLogger +from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, overload + +from sqlalchemy import delete, select, text, update +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.dialects.sqlite import insert as lite_insert +from sqlalchemy.exc import SQLAlchemyError + +from crawlee._utils.crypto import crypto_random_object_id + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from sqlalchemy import Insert + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.orm import DeclarativeBase + from typing_extensions import NotRequired, Self + + from crawlee.storage_clients.models import DatasetMetadata, KeyValueStoreMetadata, RequestQueueMetadata + + from ._db_models import ( + DatasetItemDb, + DatasetMetadataDb, + KeyValueStoreMetadataDb, + KeyValueStoreRecordDb, + RequestDb, + RequestQueueMetadataDb, + ) + from ._storage_client import SqlStorageClient + + +logger = getLogger(__name__) + + +class MetadataUpdateParams(TypedDict, total=False): + """Parameters for updating metadata.""" + + update_accessed_at: NotRequired[bool] + update_modified_at: NotRequired[bool] + force: NotRequired[bool] + + +class SqlClientMixin(ABC): + """Mixin class for SQL clients. + + This mixin provides common SQL operations and basic methods for SQL storage clients. 
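+
+    Concrete clients combine this mixin with their storage-type base class and configure it through the
+    `_DEFAULT_NAME`, `_METADATA_TABLE`, `_ITEM_TABLE`, and `_CLIENT_TYPE` class variables defined below.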
+ """ + + _DEFAULT_NAME: ClassVar[str] + """Default name when none provided.""" + + _METADATA_TABLE: ClassVar[type[DatasetMetadataDb | KeyValueStoreMetadataDb | RequestQueueMetadataDb]] + """SQLAlchemy model for metadata.""" + + _ITEM_TABLE: ClassVar[type[DatasetItemDb | KeyValueStoreRecordDb | RequestDb]] + """SQLAlchemy model for items.""" + + _CLIENT_TYPE: ClassVar[str] + """Human-readable client type for error messages.""" + + def __init__(self, *, id: str, storage_client: SqlStorageClient) -> None: + self._id = id + self._storage_client = storage_client + + # Time tracking to reduce database writes during frequent operation + self._accessed_at_allow_update_after: datetime | None = None + self._modified_at_allow_update_after: datetime | None = None + self._accessed_modified_update_interval = storage_client.get_accessed_modified_update_interval() + + @classmethod + async def _open( + cls, + *, + id: str | None, + name: str | None, + storage_client: SqlStorageClient, + metadata_model: type[DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata], + session: AsyncSession, + extra_metadata_fields: dict[str, Any], + ) -> Self: + """Open existing storage or create new one. + + Internal method used by _safely_open. + + Args: + id: Storage ID to open (takes precedence over name). + name: Storage name to open. + storage_client: SQL storage client instance. + metadata_model: Pydantic model for metadata validation. + session: Active database session. + extra_metadata_fields: Storage-specific metadata fields. + """ + orm_metadata: DatasetMetadataDb | KeyValueStoreMetadataDb | RequestQueueMetadataDb | None = None + if id: + orm_metadata = await session.get(cls._METADATA_TABLE, id) + if not orm_metadata: + raise ValueError(f'{cls._CLIENT_TYPE} with ID "{id}" not found.') + else: + search_name = name or cls._DEFAULT_NAME + stmt = select(cls._METADATA_TABLE).where(cls._METADATA_TABLE.name == search_name) + result = await session.execute(stmt) + orm_metadata = result.scalar_one_or_none() # type: ignore[assignment] + + if orm_metadata: + client = cls(id=orm_metadata.id, storage_client=storage_client) + await client._update_metadata(session, update_accessed_at=True) + else: + now = datetime.now(timezone.utc) + metadata = metadata_model( + id=crypto_random_object_id(), + name=name, + created_at=now, + accessed_at=now, + modified_at=now, + **extra_metadata_fields, + ) + client = cls(id=metadata.id, storage_client=storage_client) + client._accessed_at_allow_update_after = now + client._accessed_modified_update_interval + client._modified_at_allow_update_after = now + client._accessed_modified_update_interval + session.add(cls._METADATA_TABLE(**metadata.model_dump())) + + return client + + @classmethod + async def _safely_open( + cls, + *, + id: str | None, + name: str | None, + storage_client: SqlStorageClient, + metadata_model: type[DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata], + extra_metadata_fields: dict[str, Any], + ) -> Self: + """Safely open storage with transaction handling. + + Args: + id: Storage ID to open (takes precedence over name). + name: Storage name to open. + storage_client: SQL storage client instance. + client_class: Concrete client class to instantiate. + metadata_model: Pydantic model for metadata validation. + extra_metadata_fields: Storage-specific metadata fields. 
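+
+        If the initial commit fails (for example, because a concurrent client created the same named storage
+        first), the transaction is rolled back and the storage is reopened by name.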
+ """ + async with storage_client.create_session() as session: + try: + client = await cls._open( + id=id, + name=name, + storage_client=storage_client, + metadata_model=metadata_model, + session=session, + extra_metadata_fields=extra_metadata_fields, + ) + await session.commit() + except SQLAlchemyError: + await session.rollback() + + search_name = name or cls._DEFAULT_NAME + stmt = select(cls._METADATA_TABLE).where(cls._METADATA_TABLE.name == search_name) + result = await session.execute(stmt) + orm_metadata: DatasetMetadataDb | KeyValueStoreMetadataDb | RequestQueueMetadataDb | None + orm_metadata = result.scalar_one_or_none() # type: ignore[assignment] + + if not orm_metadata: + raise ValueError(f'{cls._CLIENT_TYPE} with Name "{search_name}" not found.') from None + + client = cls(id=orm_metadata.id, storage_client=storage_client) + + return client + + @asynccontextmanager + async def get_session(self, *, with_simple_commit: bool = False) -> AsyncIterator[AsyncSession]: + """Create a new SQLAlchemy session for this storage.""" + async with self._storage_client.create_session() as session: + # For operations where a final commit is mandatory and does not require specific processing conditions + if with_simple_commit: + try: + yield session + await session.commit() + except SQLAlchemyError as e: + logger.warning(f'Error occurred during session transaction: {e}') + await session.rollback() + else: + yield session + + def build_insert_stmt_with_ignore( + self, table_model: type[DeclarativeBase], insert_values: dict[str, Any] | list[dict[str, Any]] + ) -> Insert: + """Build an insert statement with ignore for the SQL dialect. + + Args: + table_model: SQLAlchemy table model. + insert_values: Single dict or list of dicts to insert. + """ + if isinstance(insert_values, dict): + insert_values = [insert_values] + + dialect = self._storage_client.get_dialect_name() + + if dialect == 'postgresql': + return pg_insert(table_model).values(insert_values).on_conflict_do_nothing() + + if dialect == 'sqlite': + return lite_insert(table_model).values(insert_values).on_conflict_do_nothing() + + raise NotImplementedError(f'Insert with ignore not supported for dialect: {dialect}') + + def build_upsert_stmt( + self, + table_model: type[DeclarativeBase], + insert_values: dict[str, Any] | list[dict[str, Any]], + update_columns: list[str], + conflict_cols: list[str] | None = None, + ) -> Insert: + """Build an upsert statement for the SQL dialect. + + Args: + table_model: SQLAlchemy table model. + insert_values: Single dict or list of dicts to upsert. + update_columns: Column names to update on conflict. + conflict_cols: Column names that define uniqueness (for PostgreSQL/SQLite). + + """ + if isinstance(insert_values, dict): + insert_values = [insert_values] + + dialect = self._storage_client.get_dialect_name() + + if dialect == 'postgresql': + pg_stmt = pg_insert(table_model).values(insert_values) + set_ = {col: getattr(pg_stmt.excluded, col) for col in update_columns} + return pg_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_) + + if dialect == 'sqlite': + lite_stmt = lite_insert(table_model).values(insert_values) + set_ = {col: getattr(lite_stmt.excluded, col) for col in update_columns} + return lite_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_) + + raise NotImplementedError(f'Upsert not supported for dialect: {dialect}') + + async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None: + """Drop all items in storage and update metadata. 
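+
+        The storage itself (and its metadata row) is kept; only the stored items are deleted.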
+ + Args: + metadata_kwargs: Arguments to pass to _update_metadata. + """ + stmt = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.metadata_id == self._id) + async with self.get_session(with_simple_commit=True) as session: + await session.execute(stmt) + await self._update_metadata(session, **metadata_kwargs) + + async def _drop(self) -> None: + """Delete this storage and all its data. + + This operation is irreversible. Uses CASCADE deletion to remove all related items. + """ + stmt = delete(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) + async with self.get_session(with_simple_commit=True) as session: + if self._storage_client.get_dialect_name() == 'sqlite': + # foreign_keys=ON is set at the connection level. Required for cascade deletion. + await session.execute(text('PRAGMA foreign_keys=ON')) + await session.execute(stmt) + + @overload + async def _get_metadata(self, metadata_model: type[DatasetMetadata]) -> DatasetMetadata: ... + @overload + async def _get_metadata(self, metadata_model: type[KeyValueStoreMetadata]) -> KeyValueStoreMetadata: ... + @overload + async def _get_metadata(self, metadata_model: type[RequestQueueMetadata]) -> RequestQueueMetadata: ... + + async def _get_metadata( + self, metadata_model: type[DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata] + ) -> DatasetMetadata | KeyValueStoreMetadata | RequestQueueMetadata: + """Retrieve client metadata.""" + async with self.get_session() as session: + orm_metadata = await session.get(self._METADATA_TABLE, self._id) + if not orm_metadata: + raise ValueError(f'Dataset with ID "{self._id}" not found.') + + return metadata_model.model_validate(orm_metadata) + + def _default_update_metadata( + self, *, update_accessed_at: bool = False, update_modified_at: bool = False, force: bool = False + ) -> dict[str, Any]: + """Prepare common metadata updates with rate limiting. + + Args: + update_accessed_at: Whether to update accessed_at timestamp. + update_modified_at: Whether to update modified_at timestamp. + force: Whether to force the update regardless of rate limiting. + """ + values_to_set: dict[str, Any] = {} + now = datetime.now(timezone.utc) + + # If the record must be updated (for example, when updating counters), we update timestamps and shift the time. + if force: + if update_modified_at: + values_to_set['modified_at'] = now + self._modified_at_allow_update_after = now + self._accessed_modified_update_interval + if update_accessed_at: + values_to_set['accessed_at'] = now + self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval + + elif update_modified_at and ( + self._modified_at_allow_update_after is None or now >= self._modified_at_allow_update_after + ): + values_to_set['modified_at'] = now + self._modified_at_allow_update_after = now + self._accessed_modified_update_interval + # The record will be updated, we can update `accessed_at` and shift the time. + if update_accessed_at: + values_to_set['accessed_at'] = now + self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval + + elif update_accessed_at and ( + self._accessed_at_allow_update_after is None or now >= self._accessed_at_allow_update_after + ): + values_to_set['accessed_at'] = now + self._accessed_at_allow_update_after = now + self._accessed_modified_update_interval + + return values_to_set + + @abstractmethod + def _specific_update_metadata(self, **kwargs: Any) -> dict[str, Any]: + """Prepare storage-specific metadata updates. + + Must be implemented by concrete classes. 
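+        For example, the dataset client uses this hook to set or atomically increment `item_count`.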
+ + Args: + **kwargs: Storage-specific update parameters. + """ + + async def _update_metadata( + self, + session: AsyncSession, + *, + update_accessed_at: bool = False, + update_modified_at: bool = False, + force: bool = False, + **kwargs: Any, + ) -> bool: + """Update storage metadata combining common and specific fields. + + Args: + session: Active database session. + update_accessed_at: Whether to update accessed_at timestamp. + update_modified_at: Whether to update modified_at timestamp. + force: Whether to force the update timestamps regardless of rate limiting. + **kwargs: Additional arguments for _specific_update_metadata. + + Returns: + True if any updates were made, False otherwise + """ + values_to_set = self._default_update_metadata( + update_accessed_at=update_accessed_at, update_modified_at=update_modified_at, force=force + ) + + values_to_set.update(self._specific_update_metadata(**kwargs)) + + if values_to_set: + if (stmt := values_to_set.pop('custom_stmt', None)) is None: + stmt = update(self._METADATA_TABLE).where(self._METADATA_TABLE.id == self._id) + + stmt = stmt.values(**values_to_set) + await session.execute(stmt) + return True + + return False diff --git a/src/crawlee/storage_clients/_sql/_dataset_client.py b/src/crawlee/storage_clients/_sql/_dataset_client.py new file mode 100644 index 0000000000..fcf12ffa0e --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_dataset_client.py @@ -0,0 +1,309 @@ +from __future__ import annotations + +from logging import getLogger +from typing import TYPE_CHECKING, Any + +from sqlalchemy import Select, insert, select +from typing_extensions import override + +from crawlee.storage_clients._base import DatasetClient +from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata + +from ._client_mixin import MetadataUpdateParams, SqlClientMixin +from ._db_models import DatasetItemDb, DatasetMetadataDb + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from sqlalchemy import Select + from typing_extensions import NotRequired + + from ._storage_client import SqlStorageClient + + +logger = getLogger(__name__) + + +class _DatasetMetadataUpdateParams(MetadataUpdateParams): + """Parameters for updating dataset metadata.""" + + new_item_count: NotRequired[int] + delta_item_count: NotRequired[int] + + +class SqlDatasetClient(DatasetClient, SqlClientMixin): + """SQL implementation of the dataset client. + + This client persists dataset items to a SQL database using two tables for storage + and retrieval. Items are stored as JSON with automatic ordering preservation. + + The dataset data is stored in SQL database tables following the pattern: + - `datasets` table: Contains dataset metadata (id, name, timestamps, item_count) + - `dataset_records` table: Contains individual items with JSON data and auto-increment ordering + + Items are stored as a JSON object in SQLite and as JSONB in PostgreSQL. These objects must be JSON-serializable. + The `order_id` auto-increment primary key ensures insertion order is preserved. + All operations are wrapped in database transactions with CASCADE deletion support. 
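+
+    A minimal, illustrative usage sketch (error handling omitted):
+
+        async with SqlStorageClient() as storage_client:
+            client = await SqlDatasetClient.open(id=None, name=None, storage_client=storage_client)
+            await client.push_data({'title': 'Example'})
+            page = await client.get_data()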
+ """ + + _DEFAULT_NAME = 'default' + """Default dataset name used when no name is provided.""" + + _METADATA_TABLE = DatasetMetadataDb + """SQLAlchemy model for dataset metadata.""" + + _ITEM_TABLE = DatasetItemDb + """SQLAlchemy model for dataset items.""" + + _CLIENT_TYPE = 'Dataset' + """Human-readable client type for error messages.""" + + def __init__( + self, + *, + id: str, + storage_client: SqlStorageClient, + ) -> None: + """Initialize a new instance. + + Preferably use the `SqlDatasetClient.open` class method to create a new instance. + """ + super().__init__(id=id, storage_client=storage_client) + + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + storage_client: SqlStorageClient, + ) -> SqlDatasetClient: + """Open an existing dataset or create a new one. + + Args: + id: The ID of the dataset to open. If provided, searches for existing dataset by ID. + name: The name of the dataset to open. If not provided, uses the default dataset. + storage_client: The SQL storage client instance. + + Returns: + An instance for the opened or created storage client. + + Raises: + ValueError: If a dataset with the specified ID is not found. + """ + return await cls._safely_open( + id=id, + name=name, + storage_client=storage_client, + metadata_model=DatasetMetadata, + extra_metadata_fields={'item_count': 0}, + ) + + @override + async def get_metadata(self) -> DatasetMetadata: + # The database is a single place of truth + return await self._get_metadata(DatasetMetadata) + + @override + async def drop(self) -> None: + """Delete this dataset and all its items from the database. + + This operation is irreversible. Uses CASCADE deletion to remove all related items. + """ + await self._drop() + + @override + async def purge(self) -> None: + """Remove all items from this dataset while keeping the dataset structure. + + Resets item_count to 0 and deletes all records from dataset_records table. 
+ """ + await self._purge( + metadata_kwargs=_DatasetMetadataUpdateParams( + new_item_count=0, + update_accessed_at=True, + update_modified_at=True, + force=True, + ) + ) + + @override + async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: + if not isinstance(data, list): + data = [data] + + db_items: list[dict[str, Any]] = [] + db_items = [{'metadata_id': self._id, 'data': item} for item in data] + stmt = insert(self._ITEM_TABLE).values(db_items) + + async with self.get_session(with_simple_commit=True) as session: + await session.execute(stmt) + + await self._update_metadata( + session, + **_DatasetMetadataUpdateParams( + update_accessed_at=True, + update_modified_at=True, + delta_item_count=len(data), + new_item_count=len(data), + force=True, + ), + ) + + @override + async def get_data( + self, + *, + offset: int = 0, + limit: int | None = 999_999_999_999, + clean: bool = False, + desc: bool = False, + fields: list[str] | None = None, + omit: list[str] | None = None, + unwind: list[str] | None = None, + skip_empty: bool = False, + skip_hidden: bool = False, + flatten: list[str] | None = None, + view: str | None = None, + ) -> DatasetItemsListPage: + stmt = self._prepare_get_stmt( + offset=offset, + limit=limit, + clean=clean, + desc=desc, + fields=fields, + omit=omit, + unwind=unwind, + skip_empty=skip_empty, + skip_hidden=skip_hidden, + flatten=flatten, + view=view, + ) + + async with self.get_session() as session: + result = await session.execute(stmt) + db_items = result.scalars().all() + + updated = await self._update_metadata(session, **_DatasetMetadataUpdateParams(update_accessed_at=True)) + + # Commit updates to the metadata + if updated: + await session.commit() + + items = [db_item.data for db_item in db_items] + metadata = await self.get_metadata() + return DatasetItemsListPage( + items=items, + count=len(items), + desc=desc, + limit=limit or 0, + offset=offset or 0, + total=metadata.item_count, + ) + + @override + async def iterate_items( + self, + *, + offset: int = 0, + limit: int | None = None, + clean: bool = False, + desc: bool = False, + fields: list[str] | None = None, + omit: list[str] | None = None, + unwind: list[str] | None = None, + skip_empty: bool = False, + skip_hidden: bool = False, + ) -> AsyncIterator[dict[str, Any]]: + stmt = self._prepare_get_stmt( + offset=offset, + limit=limit, + clean=clean, + desc=desc, + fields=fields, + omit=omit, + unwind=unwind, + skip_empty=skip_empty, + skip_hidden=skip_hidden, + ) + + async with self.get_session() as session: + db_items = await session.stream_scalars(stmt) + + async for db_item in db_items: + yield db_item.data + + updated = await self._update_metadata(session, **_DatasetMetadataUpdateParams(update_accessed_at=True)) + + # Commit updates to the metadata + if updated: + await session.commit() + + def _prepare_get_stmt( + self, + *, + offset: int = 0, + limit: int | None = 999_999_999_999, + clean: bool = False, + desc: bool = False, + fields: list[str] | None = None, + omit: list[str] | None = None, + unwind: list[str] | None = None, + skip_empty: bool = False, + skip_hidden: bool = False, + flatten: list[str] | None = None, + view: str | None = None, + ) -> Select: + # Check for unsupported arguments and log a warning if found. 
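+        # These options are part of the shared dataset API but are not implemented by the SQL client;
+        # they are accepted and ignored.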
+ unsupported_args: dict[str, Any] = { + 'clean': clean, + 'fields': fields, + 'omit': omit, + 'unwind': unwind, + 'skip_hidden': skip_hidden, + 'flatten': flatten, + 'view': view, + } + unsupported = {k: v for k, v in unsupported_args.items() if v not in (False, None)} + + if unsupported: + logger.warning( + f'The arguments {list(unsupported.keys())} of get_data are not supported by the ' + f'{self.__class__.__name__} client.' + ) + + stmt = select(self._ITEM_TABLE).where(self._ITEM_TABLE.metadata_id == self._id) + + if skip_empty: + # Skip items that are empty JSON objects + stmt = stmt.where(self._ITEM_TABLE.data != {}) + + # Apply ordering by insertion order (order_id) + stmt = ( + stmt.order_by(self._ITEM_TABLE.order_id.desc()) if desc else stmt.order_by(self._ITEM_TABLE.order_id.asc()) + ) + + return stmt.offset(offset).limit(limit) + + def _specific_update_metadata( + self, + new_item_count: int | None = None, + delta_item_count: int | None = None, + **_kwargs: dict[str, Any], + ) -> dict[str, Any]: + """Update the dataset metadata in the database. + + Args: + session: The SQLAlchemy AsyncSession to use for the update. + new_item_count: If provided, set item count to this value. + delta_item_count: If provided, add this value to the current item count. + """ + values_to_set: dict[str, Any] = {} + + if new_item_count is not None: + values_to_set['item_count'] = new_item_count + elif delta_item_count: + # Use database-level for atomic updates + values_to_set['item_count'] = self._METADATA_TABLE.item_count + delta_item_count + + return values_to_set diff --git a/src/crawlee/storage_clients/_sql/_db_models.py b/src/crawlee/storage_clients/_sql/_db_models.py new file mode 100644 index 0000000000..a82146b6b1 --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_db_models.py @@ -0,0 +1,252 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from sqlalchemy import JSON, BigInteger, Boolean, ForeignKey, Index, Integer, LargeBinary, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship +from sqlalchemy.types import DateTime, TypeDecorator +from typing_extensions import override + +if TYPE_CHECKING: + from sqlalchemy.engine import Dialect + from sqlalchemy.types import TypeEngine + + +# This is necessary because unique constraints don't apply to NULL values in SQL. +class NameDefaultNone(TypeDecorator): + """Custom SQLAlchemy type for handling default name values. + + Converts None values to 'default' on storage and back to None on retrieval. + """ + + impl = String(100) + cache_ok = True + + @override + def process_bind_param(self, value: str | None, dialect: Dialect) -> str: + """Convert Python value to database value.""" + return 'default' if value is None else value + + @override + def process_result_value(self, value: str | None, dialect: Dialect) -> str | None: + """Convert database value to Python value.""" + return None if value == 'default' else value + + +class AwareDateTime(TypeDecorator): + """Custom SQLAlchemy type for timezone-aware datetime handling. + + Ensures all datetime values are timezone-aware by adding UTC timezone to + naive datetime values from databases that don't store timezone information. 
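+    SQLite, for example, does not store timezone information, so values read from it are made UTC-aware here.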
+ """ + + impl = DateTime(timezone=True) + cache_ok = True + + @override + def process_result_value(self, value: datetime | None, dialect: Dialect) -> datetime | None: + """Add UTC timezone to naive datetime values.""" + if value is not None and value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value + + +class JsonField(TypeDecorator): + """Uses JSONB for PostgreSQL and JSON for other databases.""" + + impl = JSON + cache_ok = True + + def load_dialect_impl(self, dialect: Dialect) -> TypeEngine[JSON | JSONB]: + """Load the appropriate dialect implementation for the JSON type.""" + if dialect.name == 'postgresql': + return dialect.type_descriptor(JSONB()) + return dialect.type_descriptor(JSON()) + + +class Base(DeclarativeBase): + """Base class for all database models for correct type annotations.""" + + +class StorageMetadataDb: + """Base database model for storage metadata.""" + + id: Mapped[str] = mapped_column(String(20), nullable=False, primary_key=True) + """Unique identifier.""" + + name: Mapped[str | None] = mapped_column(NameDefaultNone, nullable=False, index=True, unique=True) + """Human-readable name. None becomes 'default' in database to enforce uniqueness.""" + + accessed_at: Mapped[datetime] = mapped_column(AwareDateTime, nullable=False) + """Last access datetime for usage tracking.""" + + created_at: Mapped[datetime] = mapped_column(AwareDateTime, nullable=False) + """Creation datetime.""" + + modified_at: Mapped[datetime] = mapped_column(AwareDateTime, nullable=False) + """Last modification datetime.""" + + +class DatasetMetadataDb(StorageMetadataDb, Base): + """Metadata table for datasets.""" + + __tablename__ = 'datasets' + + item_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + """Number of items in the dataset.""" + + # Relationship to dataset items with cascade deletion + items: Mapped[list[DatasetItemDb]] = relationship( + back_populates='dataset', cascade='all, delete-orphan', lazy='noload' + ) + + +class RequestQueueMetadataDb(StorageMetadataDb, Base): + """Metadata table for request queues.""" + + __tablename__ = 'request_queues' + + had_multiple_clients: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + """Flag indicating if multiple clients have accessed this queue.""" + + handled_request_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + """Number of requests processed.""" + + pending_request_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + """Number of requests waiting to be processed.""" + + total_request_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + """Total number of requests ever added to this queue.""" + + # Relationship to queue requests with cascade deletion + requests: Mapped[list[RequestDb]] = relationship( + back_populates='queue', cascade='all, delete-orphan', lazy='noload' + ) + # Relationship to queue state + state: Mapped[RequestQueueStateDb] = relationship( + back_populates='queue', cascade='all, delete-orphan', lazy='noload' + ) + + +class KeyValueStoreMetadataDb(StorageMetadataDb, Base): + """Metadata table for key-value stores.""" + + __tablename__ = 'key_value_stores' + + # Relationship to store records with cascade deletion + records: Mapped[list[KeyValueStoreRecordDb]] = relationship( + back_populates='kvs', cascade='all, delete-orphan', lazy='noload' + ) + + +class KeyValueStoreRecordDb(Base): + """Records table for key-value stores.""" + + __tablename__ = 'key_value_store_records' + + metadata_id: 
Mapped[str] = mapped_column( + String(20), ForeignKey('key_value_stores.id', ondelete='CASCADE'), primary_key=True, index=True + ) + """Foreign key to metadata key-value store record.""" + + key: Mapped[str] = mapped_column(String(255), primary_key=True) + """The key part of the key-value pair.""" + + value: Mapped[bytes] = mapped_column(LargeBinary, nullable=False) + """Value stored as binary data to support any content type.""" + + content_type: Mapped[str] = mapped_column(String(50), nullable=False) + """MIME type for proper value deserialization.""" + + size: Mapped[int | None] = mapped_column(Integer, nullable=False, default=0) + """Size of stored value in bytes.""" + + # Relationship back to parent store + kvs: Mapped[KeyValueStoreMetadataDb] = relationship(back_populates='records') + + +class DatasetItemDb(Base): + """Items table for datasets.""" + + __tablename__ = 'dataset_records' + + order_id: Mapped[int] = mapped_column(Integer, primary_key=True) + """Auto-increment primary key preserving insertion order.""" + + metadata_id: Mapped[str] = mapped_column( + String(20), + ForeignKey('datasets.id', ondelete='CASCADE'), + index=True, + ) + """Foreign key to metadata dataset record.""" + + data: Mapped[list[dict[str, Any]] | dict[str, Any]] = mapped_column(JsonField, nullable=False) + """JSON serializable item data.""" + + # Relationship back to parent dataset + dataset: Mapped[DatasetMetadataDb] = relationship(back_populates='items') + + +class RequestDb(Base): + """Requests table for request queues.""" + + __tablename__ = 'request_queue_records' + __table_args__ = ( + Index('idx_fetch_available', 'metadata_id', 'is_handled', 'time_blocked_until', 'sequence_number'), + ) + + request_id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + """Unique identifier for the request representing the unique_key.""" + + metadata_id: Mapped[str] = mapped_column( + String(20), ForeignKey('request_queues.id', ondelete='CASCADE'), primary_key=True + ) + """Foreign key to metadata request queue record.""" + + data: Mapped[str] = mapped_column(String, nullable=False) + """JSON-serialized Request object.""" + + sequence_number: Mapped[int] = mapped_column(Integer, nullable=False) + """Ordering sequence: negative for forefront, positive for regular.""" + + is_handled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + """Processing status flag.""" + + time_blocked_until: Mapped[datetime | None] = mapped_column(AwareDateTime, nullable=True) + """Timestamp until which this request is considered blocked for processing by other clients.""" + + client_key: Mapped[str | None] = mapped_column(String(32), nullable=True) + """Identifier of the client that has currently locked this request for processing.""" + + # Relationship back to metadata table + queue: Mapped[RequestQueueMetadataDb] = relationship(back_populates='requests') + + +class RequestQueueStateDb(Base): + """State table for request queues.""" + + __tablename__ = 'request_queue_state' + + metadata_id: Mapped[str] = mapped_column( + String(20), ForeignKey('request_queues.id', ondelete='CASCADE'), primary_key=True + ) + """Foreign key to metadata request queue record.""" + + sequence_counter: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + """Counter for regular request ordering (positive).""" + + forefront_sequence_counter: Mapped[int] = mapped_column(Integer, nullable=False, default=-1) + """Counter for forefront request ordering (negative).""" + + # Relationship back to metadata table + queue: 
Mapped[RequestQueueMetadataDb] = relationship(back_populates='state') + + +class VersionDb(Base): + """Table for storing the database schema version.""" + + __tablename__ = 'version' + + version: Mapped[str] = mapped_column(String(10), nullable=False, primary_key=True) diff --git a/src/crawlee/storage_clients/_sql/_key_value_store_client.py b/src/crawlee/storage_clients/_sql/_key_value_store_client.py new file mode 100644 index 0000000000..798922b1c1 --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_key_value_store_client.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +import json +from logging import getLogger +from typing import TYPE_CHECKING, Any + +from sqlalchemy import delete, select +from typing_extensions import override + +from crawlee._utils.file import infer_mime_type +from crawlee.storage_clients._base import KeyValueStoreClient +from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata + +from ._client_mixin import MetadataUpdateParams, SqlClientMixin +from ._db_models import KeyValueStoreMetadataDb, KeyValueStoreRecordDb + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from ._storage_client import SqlStorageClient + + +logger = getLogger(__name__) + + +class SqlKeyValueStoreClient(KeyValueStoreClient, SqlClientMixin): + """SQL implementation of the key-value store client. + + This client persists key-value data to a SQL database with transaction support and + concurrent access safety. Keys are mapped to rows in database tables with proper indexing + for efficient retrieval. + + The key-value store data is stored in SQL database tables following the pattern: + - `key_value_stores` table: Contains store metadata (id, name, timestamps) + - `key_value_store_records` table: Contains individual key-value pairs with binary value storage, content type, + and size information + + Values are serialized based on their type: JSON objects are stored as formatted JSON, + text values as UTF-8 encoded strings, and binary data as-is in the `LargeBinary` column. + The implementation automatically handles content type detection and maintains metadata + about each record including size and MIME type information. + + All database operations are wrapped in transactions with proper error handling and rollback + mechanisms. The client supports atomic upsert operations and handles race conditions when + multiple clients access the same store using composite primary keys (metadata_id, key). + """ + + _DEFAULT_NAME = 'default' + """Default dataset name used when no name is provided.""" + + _METADATA_TABLE = KeyValueStoreMetadataDb + """SQLAlchemy model for key-value store metadata.""" + + _ITEM_TABLE = KeyValueStoreRecordDb + """SQLAlchemy model for key-value store items.""" + + _CLIENT_TYPE = 'Key-value store' + """Human-readable client type for error messages.""" + + def __init__( + self, + *, + storage_client: SqlStorageClient, + id: str, + ) -> None: + """Initialize a new instance. + + Preferably use the `SqlKeyValueStoreClient.open` class method to create a new instance. + """ + super().__init__(id=id, storage_client=storage_client) + + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + storage_client: SqlStorageClient, + ) -> SqlKeyValueStoreClient: + """Open or create a SQL key-value store client. + + This method attempts to open an existing key-value store from the SQL database. If a KVS with the specified + ID or name exists, it loads the metadata from the database. 
If no existing store is found, a new one + is created. + + Args: + id: The ID of the key-value store to open. If provided, searches for existing store by ID. + name: The name of the key-value store to open. If not provided, uses the default store. + storage_client: The SQL storage client used to access the database. + + Returns: + An instance for the opened or created storage client. + + Raises: + ValueError: If a store with the specified ID is not found, or if metadata is invalid. + """ + return await cls._safely_open( + id=id, + name=name, + storage_client=storage_client, + metadata_model=KeyValueStoreMetadata, + extra_metadata_fields={}, + ) + + @override + async def get_metadata(self) -> KeyValueStoreMetadata: + # The database is a single place of truth + return await self._get_metadata(KeyValueStoreMetadata) + + @override + async def drop(self) -> None: + """Delete this key-value store and all its records from the database. + + This operation is irreversible. Uses CASCADE deletion to remove all related records. + """ + await self._drop() + + @override + async def purge(self) -> None: + """Remove all items from this key-value store while keeping the key-value store structure. + + Remove all records from key_value_store_records table. + """ + await self._purge(metadata_kwargs=MetadataUpdateParams(update_accessed_at=True, update_modified_at=True)) + + @override + async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: + # Special handling for None values + if value is None: + content_type = 'application/x-none' # Special content type to identify None values + value_bytes = b'' + else: + content_type = content_type or infer_mime_type(value) + + # Serialize the value to bytes. + if 'application/json' in content_type: + value_bytes = json.dumps(value, default=str, ensure_ascii=False).encode('utf-8') + elif isinstance(value, str): + value_bytes = value.encode('utf-8') + elif isinstance(value, (bytes, bytearray)): + value_bytes = value + else: + # Fallback: attempt to convert to string and encode. 
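+                # Values of other types are stored as UTF-8 text and lose their original Python type on read.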
+ value_bytes = str(value).encode('utf-8') + + size = len(value_bytes) + insert_values = { + 'metadata_id': self._id, + 'key': key, + 'value': value_bytes, + 'content_type': content_type, + 'size': size, + } + + upsert_stmt = self.build_upsert_stmt( + self._ITEM_TABLE, + insert_values=insert_values, + update_columns=['value', 'content_type', 'size'], + conflict_cols=['metadata_id', 'key'], + ) + + async with self.get_session(with_simple_commit=True) as session: + await session.execute(upsert_stmt) + + await self._update_metadata( + session, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True) + ) + + @override + async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: + # Query the record by key + stmt = select(self._ITEM_TABLE).where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.key == key) + async with self.get_session() as session: + result = await session.execute(stmt) + record_db = result.scalar_one_or_none() + + updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) + + # Commit updates to the metadata + if updated: + await session.commit() + + if not record_db: + return None + + # Deserialize the value based on content type + value_bytes = record_db.value + + # Handle None values + if record_db.content_type == 'application/x-none': + value = None + # Handle JSON values + elif 'application/json' in record_db.content_type: + try: + value = json.loads(value_bytes.decode('utf-8')) + except (json.JSONDecodeError, UnicodeDecodeError): + logger.warning(f'Failed to decode JSON value for key "{key}"') + return None + # Handle text values + elif record_db.content_type.startswith('text/'): + try: + value = value_bytes.decode('utf-8') + except UnicodeDecodeError: + logger.warning(f'Failed to decode text value for key "{key}"') + return None + # Handle binary values + else: + value = value_bytes + + return KeyValueStoreRecord( + key=record_db.key, + value=value, + content_type=record_db.content_type, + size=record_db.size, + ) + + @override + async def delete_value(self, *, key: str) -> None: + stmt = delete(self._ITEM_TABLE).where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.key == key) + async with self.get_session(with_simple_commit=True) as session: + # Delete the record if it exists + result = await session.execute(stmt) + + # Update metadata if we actually deleted something + if result.rowcount > 0: + await self._update_metadata( + session, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True) + ) + + await session.commit() + + @override + async def iterate_keys( + self, + *, + exclusive_start_key: str | None = None, + limit: int | None = None, + ) -> AsyncIterator[KeyValueStoreRecordMetadata]: + # Build query for record metadata + stmt = ( + select(self._ITEM_TABLE.key, self._ITEM_TABLE.content_type, self._ITEM_TABLE.size) + .where(self._ITEM_TABLE.metadata_id == self._id) + .order_by(self._ITEM_TABLE.key) + ) + + # Apply exclusive_start_key filter + if exclusive_start_key is not None: + stmt = stmt.where(self._ITEM_TABLE.key > exclusive_start_key) + + # Apply limit + if limit is not None: + stmt = stmt.limit(limit) + + async with self.get_session() as session: + result = await session.stream(stmt.execution_options(stream_results=True)) + + async for row in result: + yield KeyValueStoreRecordMetadata( + key=row.key, + content_type=row.content_type, + size=row.size, + ) + + updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) + + 
# Commit updates to the metadata + if updated: + await session.commit() + + @override + async def record_exists(self, *, key: str) -> bool: + stmt = select(self._ITEM_TABLE.key).where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.key == key) + async with self.get_session() as session: + # Check if record exists + result = await session.execute(stmt) + + updated = await self._update_metadata(session, **MetadataUpdateParams(update_accessed_at=True)) + + # Commit updates to the metadata + if updated: + await session.commit() + + return result.scalar_one_or_none() is not None + + @override + async def get_public_url(self, *, key: str) -> str: + raise NotImplementedError('Public URLs are not supported for SQL key-value stores.') + + def _specific_update_metadata(self, **_kwargs: dict[str, Any]) -> dict[str, Any]: + return {} diff --git a/src/crawlee/storage_clients/_sql/_request_queue_client.py b/src/crawlee/storage_clients/_sql/_request_queue_client.py new file mode 100644 index 0000000000..b7914f7d48 --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_request_queue_client.py @@ -0,0 +1,710 @@ +from __future__ import annotations + +from collections import deque +from datetime import datetime, timedelta, timezone +from hashlib import sha256 +from logging import getLogger +from typing import TYPE_CHECKING, Any + +from cachetools import LRUCache +from sqlalchemy import func, or_, select, update +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import load_only +from typing_extensions import NotRequired, override + +from crawlee import Request +from crawlee._utils.crypto import crypto_random_object_id +from crawlee.storage_clients._base import RequestQueueClient +from crawlee.storage_clients.models import ( + AddRequestsResponse, + ProcessedRequest, + RequestQueueMetadata, + UnprocessedRequest, +) + +from ._client_mixin import MetadataUpdateParams, SqlClientMixin +from ._db_models import RequestDb, RequestQueueMetadataDb, RequestQueueStateDb + +if TYPE_CHECKING: + from collections.abc import Sequence + + from sqlalchemy.ext.asyncio import AsyncSession + + from ._storage_client import SqlStorageClient + + +logger = getLogger(__name__) + + +class _QueueMetadataUpdateParams(MetadataUpdateParams): + """Parameters for updating queue metadata.""" + + new_handled_request_count: NotRequired[int] + new_pending_request_count: NotRequired[int] + new_total_request_count: NotRequired[int] + delta_handled_request_count: NotRequired[int] + delta_pending_request_count: NotRequired[int] + recalculate: NotRequired[bool] + update_had_multiple_clients: NotRequired[bool] + + +class SqlRequestQueueClient(RequestQueueClient, SqlClientMixin): + """SQL implementation of the request queue client. + + This client persists requests to a SQL database with transaction handling and + concurrent access safety. Requests are stored with sequence-based ordering and + efficient querying capabilities. + + The implementation uses negative sequence numbers for forefront (high-priority) requests + and positive sequence numbers for regular requests, allowing for efficient single-query + ordering. A cache mechanism reduces database queries. 
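+
+    For example, regular requests receive sequence numbers 1, 2, 3, ... while forefront requests receive
+    -1, -2, -3, ..., so ordering by `sequence_number` in ascending order returns forefront requests first.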
+ + The request queue data is stored in SQL database tables following the pattern: + - `request_queues` table: Contains queue metadata (id, name, timestamps, request counts, multi-client flag) + - `request_queue_records` table: Contains individual requests with JSON data, unique keys for deduplication, + sequence numbers for ordering, and processing status flags + - `request_queue_state` table: Maintains counters for sequence numbers to ensure proper ordering of requests. + + Requests are serialized to JSON for storage and maintain proper ordering through sequence + numbers. The implementation provides concurrent access safety through transaction + handling, locking mechanisms, and optimized database indexes for efficient querying. + """ + + _DEFAULT_NAME = 'default' + """Default dataset name used when no name is provided.""" + + _MAX_BATCH_FETCH_SIZE = 10 + """Maximum number of requests to fetch from the database in a single batch operation. + + Used to limit the number of requests loaded and locked for processing at once (improves efficiency and reduces + database load). + """ + + _METADATA_TABLE = RequestQueueMetadataDb + """SQLAlchemy model for request queue metadata.""" + + _ITEM_TABLE = RequestDb + """SQLAlchemy model for request items.""" + + _CLIENT_TYPE = 'Request queue' + """Human-readable client type for error messages.""" + + _REQUEST_ID_BY_KEY: LRUCache[str, int] = LRUCache(maxsize=10000) + """Cache mapping unique keys to integer IDs.""" + + _BLOCK_REQUEST_TIME = 300 + """Number of seconds for which a request is considered blocked in the database after being fetched for processing. + """ + + def __init__( + self, + *, + id: str, + storage_client: SqlStorageClient, + ) -> None: + """Initialize a new instance. + + Preferably use the `SqlRequestQueueClient.open` class method to create a new instance. + """ + super().__init__(id=id, storage_client=storage_client) + + self._pending_fetch_cache: deque[Request] = deque() + """Cache for requests: ordered by sequence number.""" + + self.client_key = crypto_random_object_id(length=32)[:32] + """Unique identifier for this client instance.""" + + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + storage_client: SqlStorageClient, + ) -> SqlRequestQueueClient: + """Open an existing request queue or create a new one. + + This method first tries to find an existing queue by ID or name. + If found, it returns a client for that queue. If not found, it creates + a new queue with the specified parameters. + + Args: + id: The ID of the request queue to open. Takes precedence over name. + name: The name of the request queue to open. Uses 'default' if None. + storage_client: The SQL storage client used to access the database. + + Returns: + An instance for the opened or created request queue. + + Raises: + ValueError: If a queue with the specified ID is not found. + """ + return await cls._safely_open( + id=id, + name=name, + storage_client=storage_client, + metadata_model=RequestQueueMetadata, + extra_metadata_fields={ + 'had_multiple_clients': False, + 'handled_request_count': 0, + 'pending_request_count': 0, + 'total_request_count': 0, + }, + ) + + @override + async def get_metadata(self) -> RequestQueueMetadata: + # The database is a single place of truth + return await self._get_metadata(RequestQueueMetadata) + + @override + async def drop(self) -> None: + """Delete this request queue and all its records from the database. + + This operation is irreversible. Uses CASCADE deletion to remove all related records. 
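+        The in-memory cache of pending requests for this client is cleared as well.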
+ """ + await self._drop() + + self._pending_fetch_cache.clear() + + @override + async def purge(self) -> None: + """Remove all items from this dataset while keeping the dataset structure. + + Resets pending_request_count and handled_request_count to 0 and deletes all records from request_queue_records + table. + """ + await self._purge( + metadata_kwargs=_QueueMetadataUpdateParams( + update_accessed_at=True, + update_modified_at=True, + new_pending_request_count=0, + new_handled_request_count=0, + force=True, + ) + ) + + # Clear recoverable state + self._pending_fetch_cache.clear() + + @override + async def add_batch_of_requests( + self, + requests: Sequence[Request], + *, + forefront: bool = False, + ) -> AddRequestsResponse: + if not requests: + return AddRequestsResponse(processed_requests=[], unprocessed_requests=[]) + + # Clear empty cache since we're adding requests + processed_requests = [] + unprocessed_requests = [] + transaction_processed_requests = [] + transaction_processed_requests_unique_keys = set() + + metadata_recalculate = False + + # Deduplicate requests by unique_key upfront + unique_requests = {} + unique_key_by_request_id = {} + for req in requests: + if req.unique_key not in unique_requests: + request_id = self._get_int_id_from_unique_key(req.unique_key) + unique_requests[request_id] = req + unique_key_by_request_id[request_id] = req.unique_key + self._REQUEST_ID_BY_KEY[req.unique_key] = request_id + + # Get existing requests by unique keys + stmt = ( + select(self._ITEM_TABLE) + .where( + self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.request_id.in_(set(unique_requests.keys())) + ) + .options( + load_only( + self._ITEM_TABLE.request_id, + self._ITEM_TABLE.is_handled, + self._ITEM_TABLE.time_blocked_until, + ) + ) + ) + + async with self.get_session() as session: + result = await session.execute(stmt) + existing_requests = {req.request_id: req for req in result.scalars()} + state = await self._get_state(session) + insert_values: list[dict] = [] + + for request_id, request in sorted(unique_requests.items()): + existing_req_db = existing_requests.get(request_id) + # New Request, add it + if existing_req_db is None: + value = { + 'request_id': request_id, + 'metadata_id': self._id, + 'data': request.model_dump_json(), + 'is_handled': False, + } + if forefront: + value['sequence_number'] = state.forefront_sequence_counter + state.forefront_sequence_counter -= 1 + else: + value['sequence_number'] = state.sequence_counter + state.sequence_counter += 1 + + insert_values.append(value) + metadata_recalculate = True + transaction_processed_requests.append( + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=False, + was_already_handled=False, + ) + ) + transaction_processed_requests_unique_keys.add(request.unique_key) + # Already handled request, skip adding + elif existing_req_db and existing_req_db.is_handled: + processed_requests.append( + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=True, + ) + ) + # Already in progress in one of the clients + elif existing_req_db and existing_req_db.time_blocked_until: + processed_requests.append( + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=False, + ) + ) + # Request in database but not yet handled and not in progress + elif existing_req_db and not existing_req_db.is_handled and not existing_req_db.time_blocked_until: + # Forefront request, update its sequence number + if forefront: + 
insert_values.append( + { + 'metadata_id': self._id, + 'request_id': request_id, + 'sequence_number': state.forefront_sequence_counter, + 'data': request.model_dump_json(), + 'is_handled': False, + } + ) + state.forefront_sequence_counter -= 1 + transaction_processed_requests.append( + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=False, + ) + ) + transaction_processed_requests_unique_keys.add(request.unique_key) + # Regular request, keep its position + else: + processed_requests.append( + ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=False, + ) + ) + # Unexpected condition + else: + unprocessed_requests.append( + UnprocessedRequest( + unique_key=request.unique_key, + url=request.url, + method=request.method, + ) + ) + + if insert_values: + if forefront: + # If the request already exists in the database, we update the sequence_number by shifting request + # to the left. + upsert_stmt = self.build_upsert_stmt( + self._ITEM_TABLE, + insert_values, + update_columns=['sequence_number'], + conflict_cols=['request_id', 'metadata_id'], + ) + await session.execute(upsert_stmt) + else: + # If the request already exists in the database, we ignore this request when inserting. + insert_stmt_with_ignore = self.build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values) + await session.execute(insert_stmt_with_ignore) + + await self._update_metadata( + session, + **_QueueMetadataUpdateParams( + recalculate=metadata_recalculate, + update_modified_at=True, + update_accessed_at=True, + force=metadata_recalculate, + ), + ) + + try: + await session.commit() + processed_requests.extend(transaction_processed_requests) + except SQLAlchemyError as e: + await session.rollback() + logger.warning(f'Failed to commit session: {e}') + await self._update_metadata( + session, recalculate=True, update_modified_at=True, update_accessed_at=True, force=True + ) + await session.commit() + transaction_processed_requests.clear() + unprocessed_requests.extend( + [ + UnprocessedRequest( + unique_key=request.unique_key, + url=request.url, + method=request.method, + ) + for request in requests + if request.unique_key in transaction_processed_requests_unique_keys + ] + ) + + return AddRequestsResponse( + processed_requests=processed_requests, + unprocessed_requests=unprocessed_requests, + ) + + @override + async def get_request(self, unique_key: str) -> Request | None: + if not (request_id := self._REQUEST_ID_BY_KEY.get(unique_key)): + request_id = self._get_int_id_from_unique_key(unique_key) + self._REQUEST_ID_BY_KEY[unique_key] = request_id + + stmt = select(self._ITEM_TABLE).where( + self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.request_id == request_id + ) + async with self.get_session() as session: + result = await session.execute(stmt) + request_db = result.scalar_one_or_none() + + if request_db is None: + logger.warning(f'Request with ID "{unique_key}" not found in the queue.') + return None + + updated = await self._update_metadata(session, update_accessed_at=True) + + # Commit updates to the metadata + if updated: + await session.commit() + + return Request.model_validate_json(request_db.data) + + @override + async def fetch_next_request(self) -> Request | None: + if self._pending_fetch_cache: + return self._pending_fetch_cache.popleft() + + now = datetime.now(timezone.utc) + block_until = now + timedelta(seconds=self._BLOCK_REQUEST_TIME) + dialect = self._storage_client.get_dialect_name() + + # Get 
available requests not blocked by another client + stmt = ( + select(self._ITEM_TABLE) + .where( + self._ITEM_TABLE.metadata_id == self._id, + self._ITEM_TABLE.is_handled.is_(False), + or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), + ) + .order_by(self._ITEM_TABLE.sequence_number.asc()) + .limit(self._MAX_BATCH_FETCH_SIZE) + ) + + async with self.get_session() as session: + # We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client + if dialect == 'postgresql': + stmt = stmt.with_for_update(skip_locked=True) + result = await session.execute(stmt) + requests_db = result.scalars().all() + + if not requests_db: + return None + + # All requests received have already been reserved for update with the help of `skip_locked`. + request_ids = {r.request_id for r in requests_db} + + update_stmt = ( + update(self._ITEM_TABLE) + .where(self._ITEM_TABLE.request_id.in_(request_ids)) + .values(time_blocked_until=block_until, client_key=self.client_key) + ) + await session.execute(update_stmt) + + blocked_ids = request_ids + else: + # For other databases, we first select the requests, then try to update them to be blocked. + result = await session.execute(stmt) + requests_db = result.scalars().all() + + if not requests_db: + return None + + request_ids = {r.request_id for r in requests_db} + + update_stmt = ( + update(self._ITEM_TABLE) + .where( + self._ITEM_TABLE.metadata_id == self._id, + self._ITEM_TABLE.request_id.in_(request_ids), + self._ITEM_TABLE.is_handled.is_(False), + or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), + ) + .values(time_blocked_until=block_until, client_key=self.client_key) + .returning(self._ITEM_TABLE.request_id) + ) + + update_result = await session.execute(update_stmt) + blocked_ids = {row[0] for row in update_result.fetchall()} + + if not blocked_ids: + await session.rollback() + return None + + await self._update_metadata(session, **_QueueMetadataUpdateParams(update_accessed_at=True)) + + await session.commit() + + requests = [Request.model_validate_json(r.data) for r in requests_db if r.request_id in blocked_ids] + + if not requests: + return None + + self._pending_fetch_cache.extend(requests[1:]) + + return requests[0] + + @override + async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: + if not (request_id := self._REQUEST_ID_BY_KEY.get(request.unique_key)): + request_id = self._get_int_id_from_unique_key(request.unique_key) + + # Update the request's handled_at timestamp. 
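As an aside on the fetching strategy implemented above (an illustrative sketch, not part of this method): on PostgreSQL the batch of pending requests is reserved with `SELECT ... FOR UPDATE SKIP LOCKED`, so concurrent clients never pick up the same rows, while SQLite has no row-level locking and therefore uses the separate UPDATE ... RETURNING path. Building such a statement with SQLAlchemy against a hypothetical table looks roughly like this:

from sqlalchemy import Boolean, Column, Integer, MetaData, Table, select

metadata = MetaData()
records = Table(
    'request_queue_records',
    metadata,
    Column('request_id', Integer, primary_key=True),
    Column('sequence_number', Integer),
    Column('is_handled', Boolean),
)

stmt = (
    select(records)
    .where(records.c.is_handled.is_(False))
    .order_by(records.c.sequence_number.asc())
    .limit(10)
    # The client above adds this only for PostgreSQL, where it renders
    # FOR UPDATE SKIP LOCKED.
    .with_for_update(skip_locked=True)
)
print(stmt)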
+ if request.handled_at is None: + request.handled_at = datetime.now(timezone.utc) + + # Update request in Db + stmt = ( + update(self._ITEM_TABLE) + .where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.request_id == request_id) + .values(is_handled=True, time_blocked_until=None, client_key=None, data=request.model_dump_json()) + ) + async with self.get_session() as session: + result = await session.execute(stmt) + if result.rowcount == 0: + logger.warning(f'Request {request.unique_key} not found in database.') + return None + await self._update_metadata( + session, + **_QueueMetadataUpdateParams( + delta_handled_request_count=1, + delta_pending_request_count=-1, + update_modified_at=True, + update_accessed_at=True, + force=True, + ), + ) + await session.commit() + return ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=True, + ) + + @override + async def reclaim_request( + self, + request: Request, + *, + forefront: bool = False, + ) -> ProcessedRequest | None: + if not (request_id := self._REQUEST_ID_BY_KEY.get(request.unique_key)): + request_id = self._get_int_id_from_unique_key(request.unique_key) + + stmt = update(self._ITEM_TABLE).where( + self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.request_id == request_id + ) + + async with self.get_session(with_simple_commit=True) as session: + state = await self._get_state(session) + + # Update sequence number if changing priority + if forefront: + new_sequence = state.forefront_sequence_counter + state.forefront_sequence_counter -= 1 + now = datetime.now(timezone.utc) + block_until = now + timedelta(seconds=self._BLOCK_REQUEST_TIME) + # Extend blocking for forefront request, it is considered blocked by the current client. + stmt = stmt.values( + sequence_number=new_sequence, time_blocked_until=block_until, client_key=self.client_key + ) + else: + new_sequence = state.sequence_counter + state.sequence_counter += 1 + stmt = stmt.values(sequence_number=new_sequence, time_blocked_until=None, client_key=None) + + result = await session.execute(stmt) + if result.rowcount == 0: + logger.warning(f'Request {request.unique_key} not found in database.') + return None + await self._update_metadata( + session, **_QueueMetadataUpdateParams(update_modified_at=True, update_accessed_at=True) + ) + + # put the forefront request at the beginning of the cache + if forefront: + self._pending_fetch_cache.appendleft(request) + + return ProcessedRequest( + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=False, + ) + + @override + async def is_empty(self) -> bool: + # Check in-memory cache for requests + if self._pending_fetch_cache: + return False + + # Check database for unhandled requests + async with self.get_session() as session: + metadata_orm = await session.get(self._METADATA_TABLE, self._id) + if not metadata_orm: + raise ValueError(f'Request queue with ID "{self._id}" not found.') + + empty = metadata_orm.pending_request_count == 0 + + updated = await self._update_metadata( + session, + **_QueueMetadataUpdateParams( + update_accessed_at=True, + # With multi-client access, counters may become out of sync. + # If the queue is not empty, we perform a recalculation to synchronize the counters in the metadata. 
+ recalculate=not empty, + update_modified_at=not empty, + ), + ) + + # Commit updates to the metadata + if updated: + await session.commit() + + return empty + + async def _get_state(self, session: AsyncSession) -> RequestQueueStateDb: + """Get the current state of the request queue.""" + orm_state: RequestQueueStateDb | None = await session.get(RequestQueueStateDb, self._id) + if not orm_state: + insert_values = {'metadata_id': self._id} + # Create a new state if it doesn't exist + # This is a safeguard against race conditions where multiple clients might try to create the state + # simultaneously. + insert_stmt = self.build_insert_stmt_with_ignore(RequestQueueStateDb, insert_values) + await session.execute(insert_stmt) + await session.flush() + orm_state = await session.get(RequestQueueStateDb, self._id) + if not orm_state: + raise RuntimeError(f'Failed to create or retrieve state for queue {self._id}') + return orm_state + + def _specific_update_metadata( + self, + new_handled_request_count: int | None = None, + new_pending_request_count: int | None = None, + new_total_request_count: int | None = None, + delta_handled_request_count: int | None = None, + delta_pending_request_count: int | None = None, + *, + recalculate: bool = False, + update_had_multiple_clients: bool = False, + **_kwargs: dict[str, Any], + ) -> dict[str, Any]: + """Update the request queue metadata in the database. + + Args: + session: The SQLAlchemy session to use for database operations. + new_handled_request_count: If provided, update the handled_request_count to this value. + new_pending_request_count: If provided, update the pending_request_count to this value. + new_total_request_count: If provided, update the total_request_count to this value. + delta_handled_request_count: If provided, add this value to the handled_request_count. + delta_pending_request_count: If provided, add this value to the pending_request_count. + recalculate: If True, recalculate the pending_request_count, and total_request_count on request table. + update_had_multiple_clients: If True, set had_multiple_clients to True. 
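A standalone sketch of the delta-update idea described above (the table and values are hypothetical): counters are adjusted with SQL expressions rather than read-modify-write from Python, so concurrent clients do not overwrite each other's increments.

from sqlalchemy import Column, Integer, MetaData, String, Table, update

metadata = MetaData()
queues = Table(
    'request_queues',
    metadata,
    Column('id', String, primary_key=True),
    Column('handled_request_count', Integer),
    Column('pending_request_count', Integer),
)

stmt = (
    update(queues)
    .where(queues.c.id == 'some-queue-id')
    .values(
        # The increments are evaluated inside the database, atomically.
        handled_request_count=queues.c.handled_request_count + 1,
        pending_request_count=queues.c.pending_request_count - 1,
    )
)
print(stmt)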
+ """ + values_to_set: dict[str, Any] = {} + + if update_had_multiple_clients: + values_to_set['had_multiple_clients'] = True + + if new_handled_request_count is not None: + values_to_set['handled_request_count'] = new_handled_request_count + elif delta_handled_request_count is not None: + values_to_set['handled_request_count'] = ( + self._METADATA_TABLE.handled_request_count + delta_handled_request_count + ) + + if new_pending_request_count is not None: + values_to_set['pending_request_count'] = new_pending_request_count + elif delta_pending_request_count is not None: + values_to_set['pending_request_count'] = ( + self._METADATA_TABLE.pending_request_count + delta_pending_request_count + ) + + if new_total_request_count is not None: + values_to_set['total_request_count'] = new_total_request_count + + if recalculate: + stmt = ( + update(self._METADATA_TABLE) + .where(self._METADATA_TABLE.id == self._id) + .values( + pending_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.is_handled.is_(False)) + .scalar_subquery() + ), + total_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.metadata_id == self._id) + .scalar_subquery() + ), + handled_request_count=( + select(func.count()) + .select_from(self._ITEM_TABLE) + .where(self._ITEM_TABLE.metadata_id == self._id, self._ITEM_TABLE.is_handled.is_(True)) + .scalar_subquery() + ), + ) + ) + + values_to_set['custom_stmt'] = stmt + + return values_to_set + + @staticmethod + def _get_int_id_from_unique_key(unique_key: str) -> int: + """Generate a deterministic integer ID for a unique_key. + + Args: + unique_key: Unique key to be used to generate ID. + + Returns: + An integer ID based on the unique_key. + """ + hashed_key = sha256(unique_key.encode('utf-8')).hexdigest() + name_length = 15 + return int(hashed_key[:name_length], 16) diff --git a/src/crawlee/storage_clients/_sql/_storage_client.py b/src/crawlee/storage_clients/_sql/_storage_client.py new file mode 100644 index 0000000000..308c8e5c57 --- /dev/null +++ b/src/crawlee/storage_clients/_sql/_storage_client.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +import warnings +from datetime import timedelta +from pathlib import Path +from typing import TYPE_CHECKING + +from sqlalchemy.exc import IntegrityError, OperationalError +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine +from sqlalchemy.sql import insert, select, text +from typing_extensions import override + +from crawlee._utils.docs import docs_group +from crawlee.configuration import Configuration +from crawlee.storage_clients._base import StorageClient + +from ._dataset_client import SqlDatasetClient +from ._db_models import Base, VersionDb +from ._key_value_store_client import SqlKeyValueStoreClient +from ._request_queue_client import SqlRequestQueueClient + +if TYPE_CHECKING: + from types import TracebackType + + from sqlalchemy.ext.asyncio import AsyncSession + + +@docs_group('Storage clients') +class SqlStorageClient(StorageClient): + """SQL implementation of the storage client. + + This storage client provides access to datasets, key-value stores, and request queues that persist data + to a SQL database using SQLAlchemy 2+. Each storage type uses two tables: one for metadata and one for + records. + + The client accepts either a database connection string or a pre-configured AsyncEngine. 
If neither is + provided, it creates a default SQLite database 'crawlee.db' in the storage directory. + + Database schema is automatically created during initialization. SQLite databases receive performance + optimizations including WAL mode and increased cache size. + + Warning: + This is an experimental feature. The behavior and interface may change in future versions. + """ + + _DEFAULT_DB_NAME = 'crawlee.db' + """Default database name if not specified in connection string.""" + + def __init__( + self, + *, + connection_string: str | None = None, + engine: AsyncEngine | None = None, + ) -> None: + """Initialize the SQL storage client. + + Args: + connection_string: Database connection string (e.g., "sqlite+aiosqlite:///crawlee.db"). + If not provided, defaults to SQLite database in the storage directory. + engine: Pre-configured AsyncEngine instance. If provided, connection_string is ignored. + """ + if engine is not None and connection_string is not None: + raise ValueError('Either connection_string or engine must be provided, not both.') + + self._connection_string = connection_string + self._engine = engine + self._initialized = False + self.session_maker: None | async_sessionmaker[AsyncSession] = None + + # Minimum interval to reduce database load from frequent concurrent metadata updates + self._accessed_modified_update_interval = timedelta(seconds=1) + + # Flag needed to apply optimizations only for default database + self._default_flag = self._engine is None and self._connection_string is None + self._dialect_name: str | None = None + + # Call the notification only once + warnings.warn( + 'The SqlStorageClient is experimental and may change or be removed in future releases.', + category=UserWarning, + stacklevel=2, + ) + + async def __aenter__(self) -> SqlStorageClient: + """Async context manager entry.""" + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + exc_traceback: TracebackType | None, + ) -> None: + """Async context manager exit.""" + await self.close() + + @property + def engine(self) -> AsyncEngine: + """Get the SQLAlchemy AsyncEngine instance.""" + if self._engine is None: + raise ValueError('Engine is not initialized. Call initialize() before accessing the engine.') + return self._engine + + def get_dialect_name(self) -> str | None: + """Get the database dialect name.""" + return self._dialect_name + + def get_accessed_modified_update_interval(self) -> timedelta: + """Get the interval for accessed and modified updates.""" + return self._accessed_modified_update_interval + + async def initialize(self, configuration: Configuration) -> None: + """Initialize the database schema. + + This method creates all necessary tables if they don't exist. + Should be called before using the storage client. + """ + if not self._initialized: + engine = self._get_or_create_engine(configuration) + async with engine.begin() as conn: + self._dialect_name = engine.dialect.name + + if self._dialect_name not in ('sqlite', 'postgresql'): + raise ValueError( + f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql. ' + 'Consider using a different database.', + ) + + # Create tables if they don't exist. + # Rollback the transaction when an exception occurs. + # This is likely an attempt to create a database from several parallel processes. 
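For reference, a hedged usage sketch of the constructor documented above, passing a connection string instead of a pre-built engine (the two options are mutually exclusive, as enforced by the constructor); the SQLite file name and dataset name here are arbitrary:

import asyncio

from crawlee.storage_clients import SqlStorageClient


async def main() -> None:
    # Creates my_crawl.db and the schema on first use.
    async with SqlStorageClient(connection_string='sqlite+aiosqlite:///my_crawl.db') as storage_client:
        dataset_client = await storage_client.create_dataset_client(name='example')
        await dataset_client.push_data({'url': 'https://example.com'})


asyncio.run(main())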
+ try: + # Set SQLite pragmas for performance and consistency + if self._default_flag: + await conn.execute(text('PRAGMA journal_mode=WAL')) # Better concurrency + await conn.execute(text('PRAGMA synchronous=NORMAL')) # Balanced safety/speed + await conn.execute(text('PRAGMA cache_size=100000')) # 100MB cache + await conn.execute(text('PRAGMA temp_store=MEMORY')) # Memory temp storage + await conn.execute(text('PRAGMA mmap_size=268435456')) # 256MB memory mapping + await conn.execute(text('PRAGMA foreign_keys=ON')) # Enforce constraints + await conn.execute(text('PRAGMA busy_timeout=30000')) # 30s busy timeout + + await conn.run_sync(Base.metadata.create_all, checkfirst=True) + + from crawlee import __version__ # Noqa: PLC0415 + + db_version = (await conn.execute(select(VersionDb))).scalar_one_or_none() + + # Raise an error if the new version creates breaking changes in the database schema. + if db_version and db_version != __version__: + warnings.warn( + f'Database version {db_version.version} does not match library version {__version__}. ' + 'This may lead to unexpected behavior.', + category=UserWarning, + stacklevel=2, + ) + elif not db_version: + await conn.execute(insert(VersionDb).values(version=__version__)) + + except (IntegrityError, OperationalError): + await conn.rollback() + + self._initialized = True + + async def close(self) -> None: + """Close the database connection pool.""" + if self._engine is not None: + await self._engine.dispose() + self._engine = None + + def create_session(self) -> AsyncSession: + """Create a new database session. + + Returns: + A new AsyncSession instance. + """ + if self.session_maker is None: + self.session_maker = async_sessionmaker(self._engine, expire_on_commit=False, autoflush=False) + return self.session_maker() + + @override + async def create_dataset_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> SqlDatasetClient: + """Create or open a SQL dataset client. + + Args: + id: Specific dataset ID to open. If provided, name is ignored. + name: Dataset name to open or create. Uses 'default' if not specified. + configuration: Configuration object. Uses global config if not provided. + + Returns: + Configured dataset client ready for use. + """ + configuration = configuration or Configuration.get_global_configuration() + await self.initialize(configuration) + + client = await SqlDatasetClient.open( + id=id, + name=name, + storage_client=self, + ) + + await self._purge_if_needed(client, configuration) + return client + + @override + async def create_kvs_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> SqlKeyValueStoreClient: + """Create or open a SQL key-value store client. + + Args: + id: Specific store ID to open. If provided, name is ignored. + name: Store name to open or create. Uses 'default' if not specified. + configuration: Configuration object. Uses global config if not provided. + + Returns: + Configured key-value store client ready for use. 
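A brief usage sketch of this method (names and values are illustrative); the database schema is initialized on the first client call, and the store is then used through the returned client:

import asyncio

from crawlee.storage_clients import SqlStorageClient


async def main() -> None:
    async with SqlStorageClient() as storage_client:
        kvs_client = await storage_client.create_kvs_client(name='example')
        await kvs_client.set_value(key='greeting', value='Hello, world!')
        record = await kvs_client.get_value(key='greeting')
        print(record.value if record else None)


asyncio.run(main())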
+ """ + configuration = configuration or Configuration.get_global_configuration() + await self.initialize(configuration) + + client = await SqlKeyValueStoreClient.open( + id=id, + name=name, + storage_client=self, + ) + + await self._purge_if_needed(client, configuration) + return client + + @override + async def create_rq_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> SqlRequestQueueClient: + """Create or open a SQL request queue client. + + Args: + id: Specific queue ID to open. If provided, name is ignored. + name: Queue name to open or create. Uses 'default' if not specified. + configuration: Configuration object. Uses global config if not provided. + + Returns: + Configured request queue client ready for use. + """ + configuration = configuration or Configuration.get_global_configuration() + await self.initialize(configuration) + + client = await SqlRequestQueueClient.open( + id=id, + name=name, + storage_client=self, + ) + + await self._purge_if_needed(client, configuration) + return client + + def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine: + """Get or create the database engine based on configuration.""" + if self._engine is not None: + return self._engine + + if self._connection_string is not None: + connection_string = self._connection_string + else: + # Create SQLite database in the storage directory + storage_dir = Path(configuration.storage_dir) + if not storage_dir.exists(): + storage_dir.mkdir(parents=True, exist_ok=True) + + db_path = storage_dir / self._DEFAULT_DB_NAME + + # Create connection string with path to default database + connection_string = f'sqlite+aiosqlite:///{db_path}' + + if 'sqlite' not in connection_string and 'postgresql' not in connection_string: + raise ValueError( + 'Unsupported database. Supported: sqlite, postgresql. Consider using a different database.' + ) + + self._engine = create_async_engine( + connection_string, + future=True, + pool_size=5, + max_overflow=10, + pool_timeout=30, + pool_recycle=600, + pool_pre_ping=True, + echo=False, + connect_args={'timeout': 30}, + ) + return self._engine diff --git a/src/crawlee/storage_clients/_sql/py.typed b/src/crawlee/storage_clients/_sql/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/crawlee/storage_clients/models.py b/src/crawlee/storage_clients/models.py index bac8a4baed..6934408d15 100644 --- a/src/crawlee/storage_clients/models.py +++ b/src/crawlee/storage_clients/models.py @@ -20,7 +20,7 @@ class StorageMetadata(BaseModel): It contains common fields shared across all specific storage types. 
""" - model_config = ConfigDict(populate_by_name=True, extra='allow') + model_config = ConfigDict(populate_by_name=True, extra='allow', from_attributes=True) id: Annotated[str, Field(alias='id')] """The unique identifier of the storage.""" @@ -42,7 +42,7 @@ class StorageMetadata(BaseModel): class DatasetMetadata(StorageMetadata): """Model for a dataset metadata.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) item_count: Annotated[int, Field(alias='itemCount')] """The number of items in the dataset.""" @@ -52,14 +52,14 @@ class DatasetMetadata(StorageMetadata): class KeyValueStoreMetadata(StorageMetadata): """Model for a key-value store metadata.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) @docs_group('Storage data') class RequestQueueMetadata(StorageMetadata): """Model for a request queue metadata.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) had_multiple_clients: Annotated[bool, Field(alias='hadMultipleClients')] """Indicates whether the queue has been accessed by multiple clients (consumers).""" @@ -78,7 +78,7 @@ class RequestQueueMetadata(StorageMetadata): class KeyValueStoreRecordMetadata(BaseModel): """Model for a key-value store record metadata.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) key: Annotated[str, Field(alias='key')] """The key of the record. @@ -100,7 +100,7 @@ class KeyValueStoreRecordMetadata(BaseModel): class KeyValueStoreRecord(KeyValueStoreRecordMetadata, Generic[KvsValueType]): """Model for a key-value store record.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) value: Annotated[KvsValueType, Field(alias='value')] """The value of the record.""" @@ -110,7 +110,7 @@ class KeyValueStoreRecord(KeyValueStoreRecordMetadata, Generic[KvsValueType]): class DatasetItemsListPage(BaseModel): """Model for a single page of dataset items returned from a collection list method.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) count: Annotated[int, Field(default=0)] """The number of objects returned on this page.""" @@ -135,7 +135,7 @@ class DatasetItemsListPage(BaseModel): class ProcessedRequest(BaseModel): """Represents a processed request.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) id: Annotated[str | None, Field(alias='requestId', default=None)] = None """Internal representation of the request by the storage client. Only some clients use id.""" @@ -149,7 +149,7 @@ class ProcessedRequest(BaseModel): class UnprocessedRequest(BaseModel): """Represents an unprocessed request.""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) unique_key: Annotated[str, Field(alias='uniqueKey')] url: Annotated[str, BeforeValidator(validate_http_url), Field()] @@ -165,7 +165,7 @@ class AddRequestsResponse(BaseModel): encountered issues during processing. 
""" - model_config = ConfigDict(populate_by_name=True) + model_config = ConfigDict(populate_by_name=True, from_attributes=True) processed_requests: Annotated[list[ProcessedRequest], Field(alias='processedRequests')] """Successfully processed requests, including information about whether they were diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index e57f190bc3..42cdd636d4 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -5,6 +5,7 @@ import logging import os +import warnings from typing import TYPE_CHECKING, cast import pytest @@ -28,6 +29,14 @@ from crawlee.http_clients._base import HttpClient +@pytest.fixture +async def suppose_user_warning() -> AsyncGenerator[None, None]: + """Suppress user warnings during tests.""" + with warnings.catch_warnings(): + warnings.simplefilter('ignore', UserWarning) + yield + + @pytest.fixture def prepare_test_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Callable[[], None]: """Prepare the testing environment by resetting the global state before each test. diff --git a/tests/unit/storage_clients/_sql/test_sql_dataset_client.py b/tests/unit/storage_clients/_sql/test_sql_dataset_client.py new file mode 100644 index 0000000000..13cd71eb16 --- /dev/null +++ b/tests/unit/storage_clients/_sql/test_sql_dataset_client.py @@ -0,0 +1,241 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import TYPE_CHECKING + +import pytest +from sqlalchemy import inspect, select +from sqlalchemy.ext.asyncio import create_async_engine + +from crawlee.configuration import Configuration +from crawlee.storage_clients import SqlStorageClient +from crawlee.storage_clients._sql._db_models import DatasetItemDb, DatasetMetadataDb + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from pathlib import Path + + from sqlalchemy import Connection + + from crawlee.storage_clients._sql import SqlDatasetClient + + +@pytest.fixture +def configuration(tmp_path: Path) -> Configuration: + """Temporary configuration for tests.""" + return Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + ) + + +# Helper function that allows you to use inspect with an asynchronous engine +def get_tables(sync_conn: Connection) -> list[str]: + inspector = inspect(sync_conn) + return inspector.get_table_names() + + +@pytest.fixture +async def dataset_client( + configuration: Configuration, + monkeypatch: pytest.MonkeyPatch, + suppose_user_warning: None, # noqa: ARG001 +) -> AsyncGenerator[SqlDatasetClient, None]: + """A fixture for a SQL dataset client.""" + async with SqlStorageClient() as storage_client: + monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) + client = await storage_client.create_dataset_client( + name='test_dataset', + configuration=configuration, + ) + yield client + await client.drop() + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_connection_string(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL dataset client creates tables with a connection string.""" + storage_dir = tmp_path / 'test_table.db' + + async with SqlStorageClient(connection_string=f'sqlite+aiosqlite:///{storage_dir}') as storage_client: + await storage_client.create_dataset_client( + name='new_dataset', + configuration=configuration, + ) + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'dataset_records' in tables + assert 'datasets' in 
tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_engine(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL dataset client creates tables with a pre-configured engine.""" + storage_dir = tmp_path / 'test_table.db' + + engine = create_async_engine(f'sqlite+aiosqlite:///{storage_dir}', future=True, echo=False) + + async with SqlStorageClient(engine=engine) as storage_client: + await storage_client.create_dataset_client( + name='new_dataset', + configuration=configuration, + ) + + async with engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'dataset_records' in tables + assert 'datasets' in tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_tables_and_metadata_record(configuration: Configuration) -> None: + """Test that SQL dataset creates proper tables and metadata records.""" + async with SqlStorageClient() as storage_client: + client = await storage_client.create_dataset_client( + name='new_dataset', + configuration=configuration, + ) + + client_metadata = await client.get_metadata() + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'dataset_records' in tables + assert 'datasets' in tables + + async with client.get_session() as session: + stmt = select(DatasetMetadataDb).where(DatasetMetadataDb.name == 'new_dataset') + result = await session.execute(stmt) + orm_metadata = result.scalar_one_or_none() + assert orm_metadata is not None + assert orm_metadata.id == client_metadata.id + assert orm_metadata.name == 'new_dataset' + assert orm_metadata.item_count == 0 + + await client.drop() + + +async def test_record_and_content_verification(dataset_client: SqlDatasetClient) -> None: + """Test that dataset client can push data and verify its content.""" + item = {'key': 'value', 'number': 42} + await dataset_client.push_data(item) + + # Verify metadata record + metadata = await dataset_client.get_metadata() + assert metadata.item_count == 1 + assert metadata.created_at is not None + assert metadata.modified_at is not None + assert metadata.accessed_at is not None + + async with dataset_client.get_session() as session: + stmt = select(DatasetItemDb).where(DatasetItemDb.metadata_id == metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 1 + saved_item = records[0].data + assert saved_item == item + + # Test pushing multiple items and verify total count + items = [{'id': 1, 'name': 'Item 1'}, {'id': 2, 'name': 'Item 2'}, {'id': 3, 'name': 'Item 3'}] + await dataset_client.push_data(items) + + async with dataset_client.get_session() as session: + stmt = select(DatasetItemDb).where(DatasetItemDb.metadata_id == metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 4 + + +async def test_drop_removes_records(dataset_client: SqlDatasetClient) -> None: + """Test that dropping a dataset removes all records from the database.""" + await dataset_client.push_data({'test': 'data'}) + + client_metadata = await dataset_client.get_metadata() + + async with dataset_client.get_session() as session: + stmt = select(DatasetItemDb).where(DatasetItemDb.metadata_id == client_metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 1 + + # Drop the dataset + await dataset_client.drop() + + async with dataset_client.get_session() as session: + stmt = 
select(DatasetItemDb).where(DatasetItemDb.metadata_id == client_metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 0 + metadata = await session.get(DatasetMetadataDb, client_metadata.id) + assert metadata is None + + +async def test_metadata_record_updates(dataset_client: SqlDatasetClient) -> None: + """Test that metadata record is updated correctly after operations.""" + # Record initial timestamps + metadata = await dataset_client.get_metadata() + initial_created = metadata.created_at + initial_accessed = metadata.accessed_at + initial_modified = metadata.modified_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform an operation that updates accessed_at + await dataset_client.get_data() + + # Verify timestamps + metadata = await dataset_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.accessed_at > initial_accessed + assert metadata.modified_at == initial_modified + + accessed_after_get = metadata.accessed_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform an operation that updates modified_at + await dataset_client.push_data({'new': 'item'}) + + # Verify timestamps again + metadata = await dataset_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.modified_at > initial_modified + assert metadata.accessed_at > accessed_after_get + + # Verify metadata record is updated in db + async with dataset_client.get_session() as session: + orm_metadata = await session.get(DatasetMetadataDb, metadata.id) + assert orm_metadata is not None + orm_metadata.item_count = 1 + assert orm_metadata.created_at == initial_created + assert orm_metadata.accessed_at == metadata.accessed_at + assert orm_metadata.modified_at == metadata.modified_at + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_data_persistence_across_reopens(configuration: Configuration) -> None: + """Test that data persists correctly when reopening the same dataset.""" + async with SqlStorageClient() as storage_client: + original_client = await storage_client.create_dataset_client( + name='persistence-test', + configuration=configuration, + ) + + test_data = {'test_item': 'test_value', 'id': 123} + await original_client.push_data(test_data) + + dataset_id = (await original_client.get_metadata()).id + + reopened_client = await storage_client.create_dataset_client( + id=dataset_id, + configuration=configuration, + ) + + data = await reopened_client.get_data() + assert len(data.items) == 1 + assert data.items[0] == test_data + + await reopened_client.drop() diff --git a/tests/unit/storage_clients/_sql/test_sql_kvs_client.py b/tests/unit/storage_clients/_sql/test_sql_kvs_client.py new file mode 100644 index 0000000000..0287cd07bf --- /dev/null +++ b/tests/unit/storage_clients/_sql/test_sql_kvs_client.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import asyncio +import json +from datetime import timedelta +from typing import TYPE_CHECKING + +import pytest +from sqlalchemy import inspect, select +from sqlalchemy.ext.asyncio import create_async_engine + +from crawlee.configuration import Configuration +from crawlee.storage_clients import SqlStorageClient +from crawlee.storage_clients._sql._db_models import KeyValueStoreMetadataDb, KeyValueStoreRecordDb +from crawlee.storage_clients.models import KeyValueStoreMetadata + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from 
pathlib import Path + + from sqlalchemy import Connection + + from crawlee.storage_clients._sql import SqlKeyValueStoreClient + + +@pytest.fixture +def configuration(tmp_path: Path) -> Configuration: + """Temporary configuration for tests.""" + return Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + ) + + +@pytest.fixture +async def kvs_client( + configuration: Configuration, + monkeypatch: pytest.MonkeyPatch, + suppose_user_warning: None, # noqa: ARG001 +) -> AsyncGenerator[SqlKeyValueStoreClient, None]: + """A fixture for a SQL key-value store client.""" + async with SqlStorageClient() as storage_client: + monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) + client = await storage_client.create_kvs_client( + name='test_kvs', + configuration=configuration, + ) + monkeypatch.setattr(client, '_accessed_modified_update_interval', timedelta(seconds=0)) + yield client + await client.drop() + + +# Helper function that allows you to use inspect with an asynchronous engine +def get_tables(sync_conn: Connection) -> list[str]: + inspector = inspect(sync_conn) + return inspector.get_table_names() + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_connection_string(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL key-value store client creates tables with a connection string.""" + storage_dir = tmp_path / 'test_table.db' + + async with SqlStorageClient(connection_string=f'sqlite+aiosqlite:///{storage_dir}') as storage_client: + await storage_client.create_kvs_client( + name='new_kvs', + configuration=configuration, + ) + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'key_value_stores' in tables + assert 'key_value_store_records' in tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_engine(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL key-value store client creates tables with a pre-configured engine.""" + storage_dir = tmp_path / 'test_table.db' + + engine = create_async_engine(f'sqlite+aiosqlite:///{storage_dir}', future=True, echo=False) + + async with SqlStorageClient(engine=engine) as storage_client: + await storage_client.create_kvs_client( + name='new_kvs', + configuration=configuration, + ) + + async with engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'key_value_stores' in tables + assert 'key_value_store_records' in tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_tables_and_metadata_record(configuration: Configuration) -> None: + """Test that SQL key-value store creates proper tables and metadata records.""" + async with SqlStorageClient() as storage_client: + client = await storage_client.create_kvs_client( + name='new_kvs', + configuration=configuration, + ) + + client_metadata = await client.get_metadata() + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'key_value_stores' in tables + assert 'key_value_store_records' in tables + + async with client.get_session() as session: + stmt = select(KeyValueStoreMetadataDb).where(KeyValueStoreMetadataDb.name == 'new_kvs') + result = await session.execute(stmt) + orm_metadata = result.scalar_one_or_none() + metadata = KeyValueStoreMetadata.model_validate(orm_metadata) + assert metadata.id == client_metadata.id + assert metadata.name == 'new_kvs' + + await client.drop() + + 
+async def test_value_record_creation(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that SQL key-value store client can create a record.""" + test_key = 'test-key' + test_value = 'Hello, world!' + await kvs_client.set_value(key=test_key, value=test_value) + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == test_key) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is not None + assert record.key == test_key + assert record.content_type == 'text/plain; charset=utf-8' + assert record.size == len(test_value.encode('utf-8')) + assert record.value == test_value.encode('utf-8') + + +async def test_binary_data_persistence(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that binary data is stored correctly without corruption.""" + test_key = 'test-binary' + test_value = b'\x00\x01\x02\x03\x04' + await kvs_client.set_value(key=test_key, value=test_value) + + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == test_key) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is not None + assert record.key == test_key + assert record.content_type == 'application/octet-stream' + assert record.size == len(test_value) + assert record.value == test_value + + verify_record = await kvs_client.get_value(key=test_key) + assert verify_record is not None + assert verify_record.value == test_value + assert verify_record.content_type == 'application/octet-stream' + + +async def test_json_serialization_to_record(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that JSON objects are properly serialized to records.""" + test_key = 'test-json' + test_value = {'name': 'John', 'age': 30, 'items': [1, 2, 3]} + await kvs_client.set_value(key=test_key, value=test_value) + + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == test_key) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is not None + assert record.key == test_key + assert json.loads(record.value.decode('utf-8')) == test_value + + +async def test_record_deletion_on_value_delete(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that deleting a value removes its record from the database.""" + test_key = 'test-delete' + test_value = 'Delete me' + + # Set a value + await kvs_client.set_value(key=test_key, value=test_value) + + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == test_key) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is not None + assert record.key == test_key + assert record.value == test_value.encode('utf-8') + + # Delete the value + await kvs_client.delete_value(key=test_key) + + # Verify record was deleted + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == test_key) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is None + + +async def test_drop_removes_records(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that drop removes all records from the database.""" + await kvs_client.set_value(key='test', value='test-value') + + client_metadata = await kvs_client.get_metadata() + + async with kvs_client.get_session() as session: + stmt = 
select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == 'test') + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is not None + + # Drop the store + await kvs_client.drop() + + async with kvs_client.get_session() as session: + stmt = select(KeyValueStoreRecordDb).where(KeyValueStoreRecordDb.key == 'test') + result = await session.execute(stmt) + record = result.scalar_one_or_none() + assert record is None + metadata = await session.get(KeyValueStoreMetadataDb, client_metadata.id) + assert metadata is None + + +async def test_metadata_record_updates(kvs_client: SqlKeyValueStoreClient) -> None: + """Test that read/write operations properly update metadata record timestamps.""" + # Record initial timestamps + metadata = await kvs_client.get_metadata() + initial_created = metadata.created_at + initial_accessed = metadata.accessed_at + initial_modified = metadata.modified_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform a read operation + await kvs_client.get_value(key='nonexistent') + + # Verify accessed timestamp was updated + metadata = await kvs_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.accessed_at > initial_accessed + assert metadata.modified_at == initial_modified + + accessed_after_read = metadata.accessed_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform a write operation + await kvs_client.set_value(key='test', value='test-value') + + # Verify modified timestamp was updated + metadata = await kvs_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.modified_at > initial_modified + assert metadata.accessed_at > accessed_after_read + + async with kvs_client.get_session() as session: + orm_metadata = await session.get(KeyValueStoreMetadataDb, metadata.id) + assert orm_metadata is not None + assert orm_metadata.created_at == metadata.created_at + assert orm_metadata.accessed_at == metadata.accessed_at + assert orm_metadata.modified_at == metadata.modified_at + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_data_persistence_across_reopens(configuration: Configuration) -> None: + """Test that data persists correctly when reopening the same key-value store.""" + async with SqlStorageClient() as storage_client: + original_client = await storage_client.create_kvs_client( + name='persistence-test', + configuration=configuration, + ) + + test_key = 'persistent-key' + test_value = 'persistent-value' + await original_client.set_value(key=test_key, value=test_value) + + kvs_id = (await original_client.get_metadata()).id + + # Reopen by ID and verify data persists + reopened_client = await storage_client.create_kvs_client( + id=kvs_id, + configuration=configuration, + ) + + record = await reopened_client.get_value(key=test_key) + assert record is not None + assert record.value == test_value + + await reopened_client.drop() diff --git a/tests/unit/storage_clients/_sql/test_sql_rq_client.py b/tests/unit/storage_clients/_sql/test_sql_rq_client.py new file mode 100644 index 0000000000..d052f31c83 --- /dev/null +++ b/tests/unit/storage_clients/_sql/test_sql_rq_client.py @@ -0,0 +1,244 @@ +from __future__ import annotations + +import asyncio +import json +from datetime import timedelta +from typing import TYPE_CHECKING + +import pytest +from sqlalchemy import inspect, select +from sqlalchemy.ext.asyncio import create_async_engine + +from crawlee import Request 
+from crawlee.configuration import Configuration +from crawlee.storage_clients import SqlStorageClient +from crawlee.storage_clients._sql._db_models import RequestDb, RequestQueueMetadataDb +from crawlee.storage_clients.models import RequestQueueMetadata + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + from pathlib import Path + + from sqlalchemy import Connection + + from crawlee.storage_clients._sql import SqlRequestQueueClient + + +@pytest.fixture +def configuration(tmp_path: Path) -> Configuration: + """Temporary configuration for tests.""" + return Configuration( + crawlee_storage_dir=str(tmp_path), # type: ignore[call-arg] + ) + + +@pytest.fixture +async def rq_client( + configuration: Configuration, + monkeypatch: pytest.MonkeyPatch, + suppose_user_warning: None, # noqa: ARG001 +) -> AsyncGenerator[SqlRequestQueueClient, None]: + """A fixture for a SQL request queue client.""" + async with SqlStorageClient() as storage_client: + monkeypatch.setattr(storage_client, '_accessed_modified_update_interval', timedelta(seconds=0)) + client = await storage_client.create_rq_client( + name='test_request_queue', + configuration=configuration, + ) + monkeypatch.setattr(client, '_accessed_modified_update_interval', timedelta(seconds=0)) + yield client + await client.drop() + + +# Helper function that allows you to use inspect with an asynchronous engine +def get_tables(sync_conn: Connection) -> list[str]: + inspector = inspect(sync_conn) + return inspector.get_table_names() + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_connection_string(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL request queue client creates tables with a connection string.""" + storage_dir = tmp_path / 'test_table.db' + + async with SqlStorageClient(connection_string=f'sqlite+aiosqlite:///{storage_dir}') as storage_client: + await storage_client.create_rq_client( + name='test_request_queue', + configuration=configuration, + ) + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'request_queues' in tables + assert 'request_queue_records' in tables + assert 'request_queue_state' in tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_create_tables_with_engine(configuration: Configuration, tmp_path: Path) -> None: + """Test that SQL request queue client creates tables with a pre-configured engine.""" + storage_dir = tmp_path / 'test_table.db' + + engine = create_async_engine(f'sqlite+aiosqlite:///{storage_dir}', future=True, echo=False) + + async with SqlStorageClient(engine=engine) as storage_client: + await storage_client.create_rq_client( + name='test_request_queue', + configuration=configuration, + ) + + async with engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'request_queues' in tables + assert 'request_queue_records' in tables + assert 'request_queue_state' in tables + + +@pytest.mark.usefixtures('suppose_user_warning') +async def test_tables_and_metadata_record(configuration: Configuration) -> None: + """Test that SQL request queue creates proper tables and metadata records.""" + async with SqlStorageClient() as storage_client: + client = await storage_client.create_rq_client( + name='test_request_queue', + configuration=configuration, + ) + + client_metadata = await client.get_metadata() + + async with storage_client.engine.begin() as conn: + tables = await conn.run_sync(get_tables) + assert 'request_queues' in tables + assert 
'request_queue_records' in tables + assert 'request_queue_state' in tables + + async with client.get_session() as session: + stmt = select(RequestQueueMetadataDb).where(RequestQueueMetadataDb.name == 'test_request_queue') + result = await session.execute(stmt) + orm_metadata = result.scalar_one_or_none() + metadata = RequestQueueMetadata.model_validate(orm_metadata) + assert metadata.id == client_metadata.id + assert metadata.name == 'test_request_queue' + + await client.drop() + + +async def test_request_records_persistence(rq_client: SqlRequestQueueClient) -> None: + """Test that all added requests are persisted and can be retrieved from the database.""" + requests = [ + Request.from_url('https://example.com/1'), + Request.from_url('https://example.com/2'), + Request.from_url('https://example.com/3'), + ] + + await rq_client.add_batch_of_requests(requests) + + metadata_client = await rq_client.get_metadata() + + async with rq_client.get_session() as session: + stmt = select(RequestDb).where(RequestDb.metadata_id == metadata_client.id) + result = await session.execute(stmt) + db_requests = result.scalars().all() + assert len(db_requests) == 3 + for db_request in db_requests: + request = json.loads(db_request.data) + assert request['url'] in ['https://example.com/1', 'https://example.com/2', 'https://example.com/3'] + + +async def test_drop_removes_records(rq_client: SqlRequestQueueClient) -> None: + """Test that drop removes all records from the database.""" + await rq_client.add_batch_of_requests([Request.from_url('https://example.com')]) + metadata = await rq_client.get_metadata() + async with rq_client.get_session() as session: + stmt = select(RequestDb).where(RequestDb.metadata_id == metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 1 + + await rq_client.drop() + + async with rq_client.get_session() as session: + stmt = select(RequestDb).where(RequestDb.metadata_id == metadata.id) + result = await session.execute(stmt) + records = result.scalars().all() + assert len(records) == 0 + db_metadata = await session.get(RequestQueueMetadataDb, metadata.id) + assert db_metadata is None + + +async def test_metadata_record_updates(rq_client: SqlRequestQueueClient) -> None: + """Test that metadata record updates correctly after operations.""" + # Record initial timestamps + metadata = await rq_client.get_metadata() + initial_created = metadata.created_at + initial_accessed = metadata.accessed_at + initial_modified = metadata.modified_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform a read operation + await rq_client.is_empty() + + # Verify accessed timestamp was updated + metadata = await rq_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.accessed_at > initial_accessed + assert metadata.modified_at == initial_modified + + accessed_after_read = metadata.accessed_at + + # Wait a moment to ensure timestamps can change + await asyncio.sleep(0.01) + + # Perform a write operation + await rq_client.add_batch_of_requests([Request.from_url('https://example.com')]) + + # Verify modified timestamp was updated + metadata = await rq_client.get_metadata() + assert metadata.created_at == initial_created + assert metadata.modified_at > initial_modified + assert metadata.accessed_at > accessed_after_read + + async with rq_client.get_session() as session: + orm_metadata = await session.get(RequestQueueMetadataDb, metadata.id) + assert orm_metadata is not None + 
+        assert orm_metadata.created_at == metadata.created_at
+        assert orm_metadata.accessed_at == metadata.accessed_at
+        assert orm_metadata.modified_at == metadata.modified_at
+
+
+@pytest.mark.usefixtures('suppose_user_warning')
+async def test_data_persistence_across_reopens(configuration: Configuration) -> None:
+    """Test that data persists correctly when reopening the same request queue."""
+    async with SqlStorageClient() as storage_client:
+        original_client = await storage_client.create_rq_client(
+            name='persistence-test',
+            configuration=configuration,
+        )
+
+        test_requests = [
+            Request.from_url('https://example.com/1'),
+            Request.from_url('https://example.com/2'),
+        ]
+        await original_client.add_batch_of_requests(test_requests)
+
+        rq_id = (await original_client.get_metadata()).id
+
+        # Reopen by ID and verify data persists
+        reopened_client = await storage_client.create_rq_client(
+            id=rq_id,
+            configuration=configuration,
+        )
+
+        metadata = await reopened_client.get_metadata()
+        assert metadata.total_request_count == 2
+
+        # Fetch requests to verify they're still there
+        request1 = await reopened_client.fetch_next_request()
+        request2 = await reopened_client.fetch_next_request()
+
+        assert request1 is not None
+        assert request2 is not None
+        assert {request1.url, request2.url} == {'https://example.com/1', 'https://example.com/2'}
+
+        await reopened_client.drop()
diff --git a/tests/unit/storages/test_dataset.py b/tests/unit/storages/test_dataset.py
index b4f75bc6b4..7a2f5476d0 100644
--- a/tests/unit/storages/test_dataset.py
+++ b/tests/unit/storages/test_dataset.py
@@ -3,12 +3,13 @@
 
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING
 
 import pytest
 
 from crawlee.configuration import Configuration
-from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
+from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, SqlStorageClient
 from crawlee.storages import Dataset, KeyValueStore
 
 if TYPE_CHECKING:
@@ -19,11 +20,15 @@
     from crawlee.storage_clients import StorageClient
 
 
-@pytest.fixture(params=['memory', 'file_system'])
+@pytest.fixture(params=['memory', 'file_system', 'sql'])
 def storage_client(request: pytest.FixtureRequest) -> StorageClient:
     """Parameterized fixture to test with different storage clients."""
     if request.param == 'memory':
         return MemoryStorageClient()
+    if request.param == 'sql':
+        with warnings.catch_warnings():
+            warnings.simplefilter('ignore', UserWarning)
+            return SqlStorageClient()
     return FileSystemStorageClient()
 
 
diff --git a/tests/unit/storages/test_key_value_store.py b/tests/unit/storages/test_key_value_store.py
index 25bbcb4fc0..3795424915 100644
--- a/tests/unit/storages/test_key_value_store.py
+++ b/tests/unit/storages/test_key_value_store.py
@@ -4,12 +4,13 @@
 from __future__ import annotations
 
 import json
+import warnings
 from typing import TYPE_CHECKING
 
 import pytest
 
 from crawlee.configuration import Configuration
-from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
+from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, SqlStorageClient
 from crawlee.storages import KeyValueStore
 
 if TYPE_CHECKING:
@@ -19,11 +20,15 @@
     from crawlee.storage_clients import StorageClient
 
 
-@pytest.fixture(params=['memory', 'file_system'])
+@pytest.fixture(params=['memory', 'file_system', 'sql'])
 def storage_client(request: pytest.FixtureRequest) -> StorageClient:
     """Parameterized fixture to test with different storage clients."""
     if request.param ==
'memory': return MemoryStorageClient() + if request.param == 'sql': + with warnings.catch_warnings(): + warnings.simplefilter('ignore', UserWarning) + return SqlStorageClient() return FileSystemStorageClient() diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 7227504a95..5e49b4da9d 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -4,13 +4,14 @@ from __future__ import annotations import asyncio +import warnings from typing import TYPE_CHECKING import pytest from crawlee import Request, service_locator from crawlee.configuration import Configuration -from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, StorageClient +from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, SqlStorageClient, StorageClient from crawlee.storages import RequestQueue if TYPE_CHECKING: @@ -20,11 +21,15 @@ from crawlee.storage_clients import StorageClient -@pytest.fixture(params=['memory', 'file_system']) +@pytest.fixture(params=['memory', 'file_system', 'sql']) def storage_client(request: pytest.FixtureRequest) -> StorageClient: """Parameterized fixture to test with different storage clients.""" if request.param == 'memory': return MemoryStorageClient() + if request.param == 'sql': + with warnings.catch_warnings(): + warnings.simplefilter('ignore', UserWarning) + return SqlStorageClient() return FileSystemStorageClient() diff --git a/uv.lock b/uv.lock index a58e64f995..f64e44eebd 100644 --- a/uv.lock +++ b/uv.lock @@ -7,6 +7,18 @@ resolution-markers = [ "python_full_version < '3.11'", ] +[[package]] +name = "aiosqlite" +version = "0.21.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/13/7d/8bca2bf9a247c2c5dfeec1d7a5f40db6518f88d314b8bca9da29670d2671/aiosqlite-0.21.0.tar.gz", hash = "sha256:131bb8056daa3bc875608c631c678cda73922a2d4ba8aec373b19f18c17e7aa3", size = 13454, upload-time = "2025-02-03T07:30:16.235Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f5/10/6c25ed6de94c49f88a91fa5018cb4c0f3625f31d5be9f771ebe5cc7cd506/aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0", size = 15792, upload-time = "2025-02-03T07:30:13.6Z" }, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -86,6 +98,58 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/ed/e97229a566617f2ae958a6b13e7cc0f585470eac730a73e9e82c32a3cdd2/arrow-1.3.0-py3-none-any.whl", hash = "sha256:c728b120ebc00eb84e01882a6f5e7927a53960aa990ce7dd2b10f39005a67f80", size = 66419, upload-time = "2023-09-30T22:11:16.072Z" }, ] +[[package]] +name = "async-timeout" +version = "5.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a5/ae/136395dfbfe00dfc94da3f3e136d0b13f394cba8f4841120e34226265780/async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3", size = 9274, upload-time = "2024-11-06T16:41:39.6Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fe/ba/e2081de779ca30d473f21f5b30e0e737c438205440784c7dfc81efc2b029/async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c", size = 6233, upload-time = "2024-11-06T16:41:37.9Z" }, +] + +[[package]] +name = "asyncpg" +version = "0.30.0" +source = 
{ registry = "https://pypi.org/simple" } +dependencies = [ + { name = "async-timeout", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2f/4c/7c991e080e106d854809030d8584e15b2e996e26f16aee6d757e387bc17d/asyncpg-0.30.0.tar.gz", hash = "sha256:c551e9928ab6707602f44811817f82ba3c446e018bfe1d3abecc8ba5f3eac851", size = 957746, upload-time = "2024-10-20T00:30:41.127Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bb/07/1650a8c30e3a5c625478fa8aafd89a8dd7d85999bf7169b16f54973ebf2c/asyncpg-0.30.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:bfb4dd5ae0699bad2b233672c8fc5ccbd9ad24b89afded02341786887e37927e", size = 673143, upload-time = "2024-10-20T00:29:08.846Z" }, + { url = "https://files.pythonhosted.org/packages/a0/9a/568ff9b590d0954553c56806766914c149609b828c426c5118d4869111d3/asyncpg-0.30.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:dc1f62c792752a49f88b7e6f774c26077091b44caceb1983509edc18a2222ec0", size = 645035, upload-time = "2024-10-20T00:29:12.02Z" }, + { url = "https://files.pythonhosted.org/packages/de/11/6f2fa6c902f341ca10403743701ea952bca896fc5b07cc1f4705d2bb0593/asyncpg-0.30.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3152fef2e265c9c24eec4ee3d22b4f4d2703d30614b0b6753e9ed4115c8a146f", size = 2912384, upload-time = "2024-10-20T00:29:13.644Z" }, + { url = "https://files.pythonhosted.org/packages/83/83/44bd393919c504ffe4a82d0aed8ea0e55eb1571a1dea6a4922b723f0a03b/asyncpg-0.30.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c7255812ac85099a0e1ffb81b10dc477b9973345793776b128a23e60148dd1af", size = 2947526, upload-time = "2024-10-20T00:29:15.871Z" }, + { url = "https://files.pythonhosted.org/packages/08/85/e23dd3a2b55536eb0ded80c457b0693352262dc70426ef4d4a6fc994fa51/asyncpg-0.30.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:578445f09f45d1ad7abddbff2a3c7f7c291738fdae0abffbeb737d3fc3ab8b75", size = 2895390, upload-time = "2024-10-20T00:29:19.346Z" }, + { url = "https://files.pythonhosted.org/packages/9b/26/fa96c8f4877d47dc6c1864fef5500b446522365da3d3d0ee89a5cce71a3f/asyncpg-0.30.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:c42f6bb65a277ce4d93f3fba46b91a265631c8df7250592dd4f11f8b0152150f", size = 3015630, upload-time = "2024-10-20T00:29:21.186Z" }, + { url = "https://files.pythonhosted.org/packages/34/00/814514eb9287614188a5179a8b6e588a3611ca47d41937af0f3a844b1b4b/asyncpg-0.30.0-cp310-cp310-win32.whl", hash = "sha256:aa403147d3e07a267ada2ae34dfc9324e67ccc4cdca35261c8c22792ba2b10cf", size = 568760, upload-time = "2024-10-20T00:29:22.769Z" }, + { url = "https://files.pythonhosted.org/packages/f0/28/869a7a279400f8b06dd237266fdd7220bc5f7c975348fea5d1e6909588e9/asyncpg-0.30.0-cp310-cp310-win_amd64.whl", hash = "sha256:fb622c94db4e13137c4c7f98834185049cc50ee01d8f657ef898b6407c7b9c50", size = 625764, upload-time = "2024-10-20T00:29:25.882Z" }, + { url = "https://files.pythonhosted.org/packages/4c/0e/f5d708add0d0b97446c402db7e8dd4c4183c13edaabe8a8500b411e7b495/asyncpg-0.30.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:5e0511ad3dec5f6b4f7a9e063591d407eee66b88c14e2ea636f187da1dcfff6a", size = 674506, upload-time = "2024-10-20T00:29:27.988Z" }, + { url = "https://files.pythonhosted.org/packages/6a/a0/67ec9a75cb24a1d99f97b8437c8d56da40e6f6bd23b04e2f4ea5d5ad82ac/asyncpg-0.30.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:915aeb9f79316b43c3207363af12d0e6fd10776641a7de8a01212afd95bdf0ed", size = 645922, upload-time = 
"2024-10-20T00:29:29.391Z" }, + { url = "https://files.pythonhosted.org/packages/5c/d9/a7584f24174bd86ff1053b14bb841f9e714380c672f61c906eb01d8ec433/asyncpg-0.30.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1c198a00cce9506fcd0bf219a799f38ac7a237745e1d27f0e1f66d3707c84a5a", size = 3079565, upload-time = "2024-10-20T00:29:30.832Z" }, + { url = "https://files.pythonhosted.org/packages/a0/d7/a4c0f9660e333114bdb04d1a9ac70db690dd4ae003f34f691139a5cbdae3/asyncpg-0.30.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3326e6d7381799e9735ca2ec9fd7be4d5fef5dcbc3cb555d8a463d8460607956", size = 3109962, upload-time = "2024-10-20T00:29:33.114Z" }, + { url = "https://files.pythonhosted.org/packages/3c/21/199fd16b5a981b1575923cbb5d9cf916fdc936b377e0423099f209e7e73d/asyncpg-0.30.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:51da377487e249e35bd0859661f6ee2b81db11ad1f4fc036194bc9cb2ead5056", size = 3064791, upload-time = "2024-10-20T00:29:34.677Z" }, + { url = "https://files.pythonhosted.org/packages/77/52/0004809b3427534a0c9139c08c87b515f1c77a8376a50ae29f001e53962f/asyncpg-0.30.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:bc6d84136f9c4d24d358f3b02be4b6ba358abd09f80737d1ac7c444f36108454", size = 3188696, upload-time = "2024-10-20T00:29:36.389Z" }, + { url = "https://files.pythonhosted.org/packages/52/cb/fbad941cd466117be58b774a3f1cc9ecc659af625f028b163b1e646a55fe/asyncpg-0.30.0-cp311-cp311-win32.whl", hash = "sha256:574156480df14f64c2d76450a3f3aaaf26105869cad3865041156b38459e935d", size = 567358, upload-time = "2024-10-20T00:29:37.915Z" }, + { url = "https://files.pythonhosted.org/packages/3c/0a/0a32307cf166d50e1ad120d9b81a33a948a1a5463ebfa5a96cc5606c0863/asyncpg-0.30.0-cp311-cp311-win_amd64.whl", hash = "sha256:3356637f0bd830407b5597317b3cb3571387ae52ddc3bca6233682be88bbbc1f", size = 629375, upload-time = "2024-10-20T00:29:39.987Z" }, + { url = "https://files.pythonhosted.org/packages/4b/64/9d3e887bb7b01535fdbc45fbd5f0a8447539833b97ee69ecdbb7a79d0cb4/asyncpg-0.30.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:c902a60b52e506d38d7e80e0dd5399f657220f24635fee368117b8b5fce1142e", size = 673162, upload-time = "2024-10-20T00:29:41.88Z" }, + { url = "https://files.pythonhosted.org/packages/6e/eb/8b236663f06984f212a087b3e849731f917ab80f84450e943900e8ca4052/asyncpg-0.30.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:aca1548e43bbb9f0f627a04666fedaca23db0a31a84136ad1f868cb15deb6e3a", size = 637025, upload-time = "2024-10-20T00:29:43.352Z" }, + { url = "https://files.pythonhosted.org/packages/cc/57/2dc240bb263d58786cfaa60920779af6e8d32da63ab9ffc09f8312bd7a14/asyncpg-0.30.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6c2a2ef565400234a633da0eafdce27e843836256d40705d83ab7ec42074efb3", size = 3496243, upload-time = "2024-10-20T00:29:44.922Z" }, + { url = "https://files.pythonhosted.org/packages/f4/40/0ae9d061d278b10713ea9021ef6b703ec44698fe32178715a501ac696c6b/asyncpg-0.30.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1292b84ee06ac8a2ad8e51c7475aa309245874b61333d97411aab835c4a2f737", size = 3575059, upload-time = "2024-10-20T00:29:46.891Z" }, + { url = "https://files.pythonhosted.org/packages/c3/75/d6b895a35a2c6506952247640178e5f768eeb28b2e20299b6a6f1d743ba0/asyncpg-0.30.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0f5712350388d0cd0615caec629ad53c81e506b1abaaf8d14c93f54b35e3595a", size = 3473596, upload-time = "2024-10-20T00:29:49.201Z" }, + { url = 
"https://files.pythonhosted.org/packages/c8/e7/3693392d3e168ab0aebb2d361431375bd22ffc7b4a586a0fc060d519fae7/asyncpg-0.30.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:db9891e2d76e6f425746c5d2da01921e9a16b5a71a1c905b13f30e12a257c4af", size = 3641632, upload-time = "2024-10-20T00:29:50.768Z" }, + { url = "https://files.pythonhosted.org/packages/32/ea/15670cea95745bba3f0352341db55f506a820b21c619ee66b7d12ea7867d/asyncpg-0.30.0-cp312-cp312-win32.whl", hash = "sha256:68d71a1be3d83d0570049cd1654a9bdfe506e794ecc98ad0873304a9f35e411e", size = 560186, upload-time = "2024-10-20T00:29:52.394Z" }, + { url = "https://files.pythonhosted.org/packages/7e/6b/fe1fad5cee79ca5f5c27aed7bd95baee529c1bf8a387435c8ba4fe53d5c1/asyncpg-0.30.0-cp312-cp312-win_amd64.whl", hash = "sha256:9a0292c6af5c500523949155ec17b7fe01a00ace33b68a476d6b5059f9630305", size = 621064, upload-time = "2024-10-20T00:29:53.757Z" }, + { url = "https://files.pythonhosted.org/packages/3a/22/e20602e1218dc07692acf70d5b902be820168d6282e69ef0d3cb920dc36f/asyncpg-0.30.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:05b185ebb8083c8568ea8a40e896d5f7af4b8554b64d7719c0eaa1eb5a5c3a70", size = 670373, upload-time = "2024-10-20T00:29:55.165Z" }, + { url = "https://files.pythonhosted.org/packages/3d/b3/0cf269a9d647852a95c06eb00b815d0b95a4eb4b55aa2d6ba680971733b9/asyncpg-0.30.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:c47806b1a8cbb0a0db896f4cd34d89942effe353a5035c62734ab13b9f938da3", size = 634745, upload-time = "2024-10-20T00:29:57.14Z" }, + { url = "https://files.pythonhosted.org/packages/8e/6d/a4f31bf358ce8491d2a31bfe0d7bcf25269e80481e49de4d8616c4295a34/asyncpg-0.30.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b6fde867a74e8c76c71e2f64f80c64c0f3163e687f1763cfaf21633ec24ec33", size = 3512103, upload-time = "2024-10-20T00:29:58.499Z" }, + { url = "https://files.pythonhosted.org/packages/96/19/139227a6e67f407b9c386cb594d9628c6c78c9024f26df87c912fabd4368/asyncpg-0.30.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:46973045b567972128a27d40001124fbc821c87a6cade040cfcd4fa8a30bcdc4", size = 3592471, upload-time = "2024-10-20T00:30:00.354Z" }, + { url = "https://files.pythonhosted.org/packages/67/e4/ab3ca38f628f53f0fd28d3ff20edff1c975dd1cb22482e0061916b4b9a74/asyncpg-0.30.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:9110df111cabc2ed81aad2f35394a00cadf4f2e0635603db6ebbd0fc896f46a4", size = 3496253, upload-time = "2024-10-20T00:30:02.794Z" }, + { url = "https://files.pythonhosted.org/packages/ef/5f/0bf65511d4eeac3a1f41c54034a492515a707c6edbc642174ae79034d3ba/asyncpg-0.30.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:04ff0785ae7eed6cc138e73fc67b8e51d54ee7a3ce9b63666ce55a0bf095f7ba", size = 3662720, upload-time = "2024-10-20T00:30:04.501Z" }, + { url = "https://files.pythonhosted.org/packages/e7/31/1513d5a6412b98052c3ed9158d783b1e09d0910f51fbe0e05f56cc370bc4/asyncpg-0.30.0-cp313-cp313-win32.whl", hash = "sha256:ae374585f51c2b444510cdf3595b97ece4f233fde739aa14b50e0d64e8a7a590", size = 560404, upload-time = "2024-10-20T00:30:06.537Z" }, + { url = "https://files.pythonhosted.org/packages/c8/a4/cec76b3389c4c5ff66301cd100fe88c318563ec8a520e0b2e792b5b84972/asyncpg-0.30.0-cp313-cp313-win_amd64.whl", hash = "sha256:f59b430b8e27557c3fb9869222559f7417ced18688375825f8f12302c34e915e", size = 621623, upload-time = "2024-10-20T00:30:09.024Z" }, +] + [[package]] name = "backports-asyncio-runner" version = "1.2.0" @@ -603,7 +667,9 @@ adaptive-crawler = [ { name = 
"scikit-learn" }, ] all = [ + { name = "aiosqlite" }, { name = "apify-fingerprint-datapoints" }, + { name = "asyncpg" }, { name = "beautifulsoup4", extra = ["lxml"] }, { name = "browserforge" }, { name = "cookiecutter" }, @@ -622,6 +688,7 @@ all = [ { name = "playwright" }, { name = "rich" }, { name = "scikit-learn" }, + { name = "sqlalchemy", extra = ["asyncio"] }, { name = "typer" }, { name = "wrapt" }, ] @@ -660,6 +727,14 @@ playwright = [ { name = "browserforge" }, { name = "playwright" }, ] +sql-postgres = [ + { name = "asyncpg" }, + { name = "sqlalchemy", extra = ["asyncio"] }, +] +sql-sqlite = [ + { name = "aiosqlite" }, + { name = "sqlalchemy", extra = ["asyncio"] }, +] [package.dev-dependencies] dev = [ @@ -687,9 +762,11 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiosqlite", marker = "extra == 'sql-sqlite'", specifier = ">=0.21.0" }, { name = "apify-fingerprint-datapoints", marker = "extra == 'adaptive-crawler'", specifier = ">=0.0.2" }, { name = "apify-fingerprint-datapoints", marker = "extra == 'httpx'", specifier = ">=0.0.2" }, { name = "apify-fingerprint-datapoints", marker = "extra == 'playwright'", specifier = ">=0.0.2" }, + { name = "asyncpg", marker = "extra == 'sql-postgres'", specifier = ">=0.24.0" }, { name = "beautifulsoup4", extras = ["lxml"], marker = "extra == 'beautifulsoup'", specifier = ">=4.12.0" }, { name = "browserforge", marker = "extra == 'adaptive-crawler'", specifier = ">=1.2.3" }, { name = "browserforge", marker = "extra == 'httpx'", specifier = ">=1.2.3" }, @@ -697,7 +774,7 @@ requires-dist = [ { name = "cachetools", specifier = ">=5.5.0" }, { name = "colorama", specifier = ">=0.4.0" }, { name = "cookiecutter", marker = "extra == 'cli'", specifier = ">=2.6.0" }, - { name = "crawlee", extras = ["adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel"], marker = "extra == 'all'" }, + { name = "crawlee", extras = ["adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel", "sql-sqlite", "sql-postgres"], marker = "extra == 'all'" }, { name = "curl-cffi", marker = "extra == 'curl-impersonate'", specifier = ">=0.9.0" }, { name = "html5lib", marker = "extra == 'beautifulsoup'", specifier = ">=1.0" }, { name = "httpx", extras = ["brotli", "http2", "zstd"], marker = "extra == 'httpx'", specifier = ">=0.27.0" }, @@ -721,13 +798,15 @@ requires-dist = [ { name = "pyee", specifier = ">=9.0.0" }, { name = "rich", marker = "extra == 'cli'", specifier = ">=13.9.0" }, { name = "scikit-learn", marker = "extra == 'adaptive-crawler'", specifier = ">=1.6.0" }, + { name = "sqlalchemy", extras = ["asyncio"], marker = "extra == 'sql-postgres'", specifier = ">=2.0.0,<3.0.0" }, + { name = "sqlalchemy", extras = ["asyncio"], marker = "extra == 'sql-sqlite'", specifier = ">=2.0.0,<3.0.0" }, { name = "tldextract", specifier = ">=5.1.0" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.12.0" }, { name = "typing-extensions", specifier = ">=4.1.0" }, { name = "wrapt", marker = "extra == 'otel'", specifier = ">=1.17.0" }, { name = "yarl", specifier = ">=1.18.0" }, ] -provides-extras = ["all", "adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel"] +provides-extras = ["all", "adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel", "sql-postgres", "sql-sqlite"] [package.metadata.requires-dev] dev = [ @@ -2848,6 +2927,56 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/e7/9c/0e6afc12c269578be5c0c1c9f4b49a8d32770a080260c333ac04cc1c832d/soupsieve-2.7-py3-none-any.whl", hash = "sha256:6e60cc5c1ffaf1cebcc12e8188320b72071e922c2e897f737cadce79ad5d30c4", size = 36677, upload-time = "2025-04-20T18:50:07.196Z" }, ] +[[package]] +name = "sqlalchemy" +version = "2.0.42" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "greenlet", marker = "(python_full_version < '3.14' and platform_machine == 'AMD64') or (python_full_version < '3.14' and platform_machine == 'WIN32') or (python_full_version < '3.14' and platform_machine == 'aarch64') or (python_full_version < '3.14' and platform_machine == 'amd64') or (python_full_version < '3.14' and platform_machine == 'ppc64le') or (python_full_version < '3.14' and platform_machine == 'win32') or (python_full_version < '3.14' and platform_machine == 'x86_64')" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5a/03/a0af991e3a43174d6b83fca4fb399745abceddd1171bdabae48ce877ff47/sqlalchemy-2.0.42.tar.gz", hash = "sha256:160bedd8a5c28765bd5be4dec2d881e109e33b34922e50a3b881a7681773ac5f", size = 9749972, upload-time = "2025-07-29T12:48:09.323Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3a/12/33ff43214c2c6cc87499b402fe419869d2980a08101c991daae31345e901/sqlalchemy-2.0.42-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:172b244753e034d91a826f80a9a70f4cbac690641207f2217f8404c261473efe", size = 2130469, upload-time = "2025-07-29T13:25:15.215Z" }, + { url = "https://files.pythonhosted.org/packages/63/c4/4d2f2c21ddde9a2c7f7b258b202d6af0bac9fc5abfca5de367461c86d766/sqlalchemy-2.0.42-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:be28f88abd74af8519a4542185ee80ca914933ca65cdfa99504d82af0e4210df", size = 2120393, upload-time = "2025-07-29T13:25:16.367Z" }, + { url = "https://files.pythonhosted.org/packages/a8/0d/5ff2f2dfbac10e4a9ade1942f8985ffc4bd8f157926b1f8aed553dfe3b88/sqlalchemy-2.0.42-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:98b344859d282fde388047f1710860bb23f4098f705491e06b8ab52a48aafea9", size = 3206173, upload-time = "2025-07-29T13:29:00.623Z" }, + { url = "https://files.pythonhosted.org/packages/1f/59/71493fe74bd76a773ae8fa0c50bfc2ccac1cbf7cfa4f9843ad92897e6dcf/sqlalchemy-2.0.42-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:97978d223b11f1d161390a96f28c49a13ce48fdd2fed7683167c39bdb1b8aa09", size = 3206910, upload-time = "2025-07-29T13:24:50.58Z" }, + { url = "https://files.pythonhosted.org/packages/a9/51/01b1d85bbb492a36b25df54a070a0f887052e9b190dff71263a09f48576b/sqlalchemy-2.0.42-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e35b9b000c59fcac2867ab3a79fc368a6caca8706741beab3b799d47005b3407", size = 3145479, upload-time = "2025-07-29T13:29:02.3Z" }, + { url = "https://files.pythonhosted.org/packages/fa/78/10834f010e2a3df689f6d1888ea6ea0074ff10184e6a550b8ed7f9189a89/sqlalchemy-2.0.42-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:bc7347ad7a7b1c78b94177f2d57263113bb950e62c59b96ed839b131ea4234e1", size = 3169605, upload-time = "2025-07-29T13:24:52.135Z" }, + { url = "https://files.pythonhosted.org/packages/0c/75/e6fdd66d237582c8488dd1dfa90899f6502822fbd866363ab70e8ac4a2ce/sqlalchemy-2.0.42-cp310-cp310-win32.whl", hash = "sha256:739e58879b20a179156b63aa21f05ccacfd3e28e08e9c2b630ff55cd7177c4f1", size = 2098759, upload-time = "2025-07-29T13:23:55.809Z" }, + { url = 
"https://files.pythonhosted.org/packages/a5/a8/366db192641c2c2d1ea8977e7c77b65a0d16a7858907bb76ea68b9dd37af/sqlalchemy-2.0.42-cp310-cp310-win_amd64.whl", hash = "sha256:1aef304ada61b81f1955196f584b9e72b798ed525a7c0b46e09e98397393297b", size = 2122423, upload-time = "2025-07-29T13:23:56.968Z" }, + { url = "https://files.pythonhosted.org/packages/ea/3c/7bfd65f3c2046e2fb4475b21fa0b9d7995f8c08bfa0948df7a4d2d0de869/sqlalchemy-2.0.42-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c34100c0b7ea31fbc113c124bcf93a53094f8951c7bf39c45f39d327bad6d1e7", size = 2133779, upload-time = "2025-07-29T13:25:18.446Z" }, + { url = "https://files.pythonhosted.org/packages/66/17/19be542fe9dd64a766090e90e789e86bdaa608affda6b3c1e118a25a2509/sqlalchemy-2.0.42-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ad59dbe4d1252448c19d171dfba14c74e7950b46dc49d015722a4a06bfdab2b0", size = 2123843, upload-time = "2025-07-29T13:25:19.749Z" }, + { url = "https://files.pythonhosted.org/packages/14/fc/83e45fc25f0acf1c26962ebff45b4c77e5570abb7c1a425a54b00bcfa9c7/sqlalchemy-2.0.42-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f9187498c2149919753a7fd51766ea9c8eecdec7da47c1b955fa8090bc642eaa", size = 3294824, upload-time = "2025-07-29T13:29:03.879Z" }, + { url = "https://files.pythonhosted.org/packages/b9/81/421efc09837104cd1a267d68b470e5b7b6792c2963b8096ca1e060ba0975/sqlalchemy-2.0.42-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f092cf83ebcafba23a247f5e03f99f5436e3ef026d01c8213b5eca48ad6efa9", size = 3294662, upload-time = "2025-07-29T13:24:53.715Z" }, + { url = "https://files.pythonhosted.org/packages/2f/ba/55406e09d32ed5e5f9e8aaec5ef70c4f20b4ae25b9fa9784f4afaa28e7c3/sqlalchemy-2.0.42-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:fc6afee7e66fdba4f5a68610b487c1f754fccdc53894a9567785932dbb6a265e", size = 3229413, upload-time = "2025-07-29T13:29:05.638Z" }, + { url = "https://files.pythonhosted.org/packages/d4/c4/df596777fce27bde2d1a4a2f5a7ddea997c0c6d4b5246aafba966b421cc0/sqlalchemy-2.0.42-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:260ca1d2e5910f1f1ad3fe0113f8fab28657cee2542cb48c2f342ed90046e8ec", size = 3255563, upload-time = "2025-07-29T13:24:55.17Z" }, + { url = "https://files.pythonhosted.org/packages/16/ed/b9c4a939b314400f43f972c9eb0091da59d8466ef9c51d0fd5b449edc495/sqlalchemy-2.0.42-cp311-cp311-win32.whl", hash = "sha256:2eb539fd83185a85e5fcd6b19214e1c734ab0351d81505b0f987705ba0a1e231", size = 2098513, upload-time = "2025-07-29T13:23:58.946Z" }, + { url = "https://files.pythonhosted.org/packages/91/72/55b0c34e39feb81991aa3c974d85074c356239ac1170dfb81a474b4c23b3/sqlalchemy-2.0.42-cp311-cp311-win_amd64.whl", hash = "sha256:9193fa484bf00dcc1804aecbb4f528f1123c04bad6a08d7710c909750fa76aeb", size = 2123380, upload-time = "2025-07-29T13:24:00.155Z" }, + { url = "https://files.pythonhosted.org/packages/61/66/ac31a9821fc70a7376321fb2c70fdd7eadbc06dadf66ee216a22a41d6058/sqlalchemy-2.0.42-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:09637a0872689d3eb71c41e249c6f422e3e18bbd05b4cd258193cfc7a9a50da2", size = 2132203, upload-time = "2025-07-29T13:29:19.291Z" }, + { url = "https://files.pythonhosted.org/packages/fc/ba/fd943172e017f955d7a8b3a94695265b7114efe4854feaa01f057e8f5293/sqlalchemy-2.0.42-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a3cb3ec67cc08bea54e06b569398ae21623534a7b1b23c258883a7c696ae10df", size = 2120373, upload-time = "2025-07-29T13:29:21.049Z" }, + { url = 
"https://files.pythonhosted.org/packages/ea/a2/b5f7d233d063ffadf7e9fff3898b42657ba154a5bec95a96f44cba7f818b/sqlalchemy-2.0.42-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e87e6a5ef6f9d8daeb2ce5918bf5fddecc11cae6a7d7a671fcc4616c47635e01", size = 3317685, upload-time = "2025-07-29T13:26:40.837Z" }, + { url = "https://files.pythonhosted.org/packages/86/00/fcd8daab13a9119d41f3e485a101c29f5d2085bda459154ba354c616bf4e/sqlalchemy-2.0.42-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b718011a9d66c0d2f78e1997755cd965f3414563b31867475e9bc6efdc2281d", size = 3326967, upload-time = "2025-07-29T13:22:31.009Z" }, + { url = "https://files.pythonhosted.org/packages/a3/85/e622a273d648d39d6771157961956991a6d760e323e273d15e9704c30ccc/sqlalchemy-2.0.42-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:16d9b544873fe6486dddbb859501a07d89f77c61d29060bb87d0faf7519b6a4d", size = 3255331, upload-time = "2025-07-29T13:26:42.579Z" }, + { url = "https://files.pythonhosted.org/packages/3a/a0/2c2338b592c7b0a61feffd005378c084b4c01fabaf1ed5f655ab7bd446f0/sqlalchemy-2.0.42-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:21bfdf57abf72fa89b97dd74d3187caa3172a78c125f2144764a73970810c4ee", size = 3291791, upload-time = "2025-07-29T13:22:32.454Z" }, + { url = "https://files.pythonhosted.org/packages/41/19/b8a2907972a78285fdce4c880ecaab3c5067eb726882ca6347f7a4bf64f6/sqlalchemy-2.0.42-cp312-cp312-win32.whl", hash = "sha256:78b46555b730a24901ceb4cb901c6b45c9407f8875209ed3c5d6bcd0390a6ed1", size = 2096180, upload-time = "2025-07-29T13:16:08.952Z" }, + { url = "https://files.pythonhosted.org/packages/48/1f/67a78f3dfd08a2ed1c7be820fe7775944f5126080b5027cc859084f8e223/sqlalchemy-2.0.42-cp312-cp312-win_amd64.whl", hash = "sha256:4c94447a016f36c4da80072e6c6964713b0af3c8019e9c4daadf21f61b81ab53", size = 2123533, upload-time = "2025-07-29T13:16:11.705Z" }, + { url = "https://files.pythonhosted.org/packages/e9/7e/25d8c28b86730c9fb0e09156f601d7a96d1c634043bf8ba36513eb78887b/sqlalchemy-2.0.42-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:941804f55c7d507334da38133268e3f6e5b0340d584ba0f277dd884197f4ae8c", size = 2127905, upload-time = "2025-07-29T13:29:22.249Z" }, + { url = "https://files.pythonhosted.org/packages/e5/a1/9d8c93434d1d983880d976400fcb7895a79576bd94dca61c3b7b90b1ed0d/sqlalchemy-2.0.42-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:95d3d06a968a760ce2aa6a5889fefcbdd53ca935735e0768e1db046ec08cbf01", size = 2115726, upload-time = "2025-07-29T13:29:23.496Z" }, + { url = "https://files.pythonhosted.org/packages/a2/cc/d33646fcc24c87cc4e30a03556b611a4e7bcfa69a4c935bffb923e3c89f4/sqlalchemy-2.0.42-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4cf10396a8a700a0f38ccd220d940be529c8f64435c5d5b29375acab9267a6c9", size = 3246007, upload-time = "2025-07-29T13:26:44.166Z" }, + { url = "https://files.pythonhosted.org/packages/67/08/4e6c533d4c7f5e7c4cbb6fe8a2c4e813202a40f05700d4009a44ec6e236d/sqlalchemy-2.0.42-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9cae6c2b05326d7c2c7c0519f323f90e0fb9e8afa783c6a05bb9ee92a90d0f04", size = 3250919, upload-time = "2025-07-29T13:22:33.74Z" }, + { url = "https://files.pythonhosted.org/packages/5c/82/f680e9a636d217aece1b9a8030d18ad2b59b5e216e0c94e03ad86b344af3/sqlalchemy-2.0.42-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f50f7b20677b23cfb35b6afcd8372b2feb348a38e3033f6447ee0704540be894", size = 3180546, upload-time = "2025-07-29T13:26:45.648Z" }, + { url = 
"https://files.pythonhosted.org/packages/7d/a2/8c8f6325f153894afa3775584c429cc936353fb1db26eddb60a549d0ff4b/sqlalchemy-2.0.42-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:9d88a1c0d66d24e229e3938e1ef16ebdbd2bf4ced93af6eff55225f7465cf350", size = 3216683, upload-time = "2025-07-29T13:22:34.977Z" }, + { url = "https://files.pythonhosted.org/packages/39/44/3a451d7fa4482a8ffdf364e803ddc2cfcafc1c4635fb366f169ecc2c3b11/sqlalchemy-2.0.42-cp313-cp313-win32.whl", hash = "sha256:45c842c94c9ad546c72225a0c0d1ae8ef3f7c212484be3d429715a062970e87f", size = 2093990, upload-time = "2025-07-29T13:16:13.036Z" }, + { url = "https://files.pythonhosted.org/packages/4b/9e/9bce34f67aea0251c8ac104f7bdb2229d58fb2e86a4ad8807999c4bee34b/sqlalchemy-2.0.42-cp313-cp313-win_amd64.whl", hash = "sha256:eb9905f7f1e49fd57a7ed6269bc567fcbbdac9feadff20ad6bd7707266a91577", size = 2120473, upload-time = "2025-07-29T13:16:14.502Z" }, + { url = "https://files.pythonhosted.org/packages/ee/55/ba2546ab09a6adebc521bf3974440dc1d8c06ed342cceb30ed62a8858835/sqlalchemy-2.0.42-py3-none-any.whl", hash = "sha256:defcdff7e661f0043daa381832af65d616e060ddb54d3fe4476f51df7eaa1835", size = 1922072, upload-time = "2025-07-29T13:09:17.061Z" }, +] + +[package.optional-dependencies] +asyncio = [ + { name = "greenlet" }, +] + [[package]] name = "text-unidecode" version = "1.3"