Conversation

@Mantisus (Collaborator) commented Sep 13, 2025

Description

This PR implements RedisStorageClient, a storage client based on Redis v8+. Version 8 is the minimum requirement because all of the data structures used are available natively only starting from Redis Open Source 8, without any additional extensions.

Testing

  • Added new unit tests
  • For testing without an actual Redis instance, fakeredis is used

@Mantisus Mantisus self-assigned this Sep 13, 2025
@Mantisus Mantisus changed the title feat: Add RedisStorageClient feat: Add RedisStorageClient based on Redis v8.0+ Sep 14, 2025

@Mantisus (Collaborator Author)

Performance test.

1 client

Code to run

import asyncio

from crawlee import ConcurrencySettings
from crawlee.crawlers import ParselCrawler, ParselCrawlingContext
from crawlee.http_clients import HttpxHttpClient
from crawlee.storage_clients import RedisStorageClient

CONNECTION = 'redis://localhost:6379'


async def main() -> None:
    storage_client = RedisStorageClient(connection_string=CONNECTION)
    http_client = HttpxHttpClient()

    crawler = ParselCrawler(
        storage_client=storage_client,
        http_client=http_client,
        concurrency_settings=ConcurrencySettings(desired_concurrency=20),
    )

    @crawler.router.default_handler
    async def request_handler(context: ParselCrawlingContext) -> None:
        context.log.info(f'Processing URL: {context.request.url}...')
        data = {
            'url': context.request.url,
            'title': context.selector.css('title::text').get(),
        }
        await context.push_data(data)
        await context.enqueue_links()

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())

Results

[ParselCrawler] INFO  Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished             │ 2363       │
│ requests_failed               │ 0          │
│ retry_histogram               │ [2363]     │
│ request_avg_failed_duration   │ None       │
│ request_avg_finished_duration │ 358.4ms    │
│ requests_finished_per_minute  │ 3545       │
│ requests_failed_per_minute    │ 0          │
│ request_total_duration        │ 14min 6.8s │
│ requests_total                │ 2363       │
│ crawler_runtime               │ 39.99s     │
└───────────────────────────────┴────────────┘

3 clients

Code to run

import asyncio
from concurrent.futures import ProcessPoolExecutor

from crawlee import ConcurrencySettings, service_locator
from crawlee.crawlers import ParselCrawler, ParselCrawlingContext
from crawlee.http_clients import HttpxHttpClient
from crawlee.storage_clients import RedisStorageClient
from crawlee.storages import RequestQueue

CONNECTION = 'redis://localhost:6379'

async def run(queue_name: str) -> None:
    storage_client = RedisStorageClient(connection_string=CONNECTION)

    service_locator.set_storage_client(storage_client)
    queue = await RequestQueue.open(name=queue_name)

    http_client = HttpxHttpClient()

    crawler = ParselCrawler(
        http_client=http_client,
        request_manager=queue,
        concurrency_settings=ConcurrencySettings(desired_concurrency=20),
    )

    @crawler.router.default_handler
    async def request_handler(context: ParselCrawlingContext) -> None:
        context.log.info(f'Processing URL: {context.request.url}...')
        data = {
            'url': context.request.url,
            'title': context.selector.css('title::text').get(),
        }
        await context.push_data(data)
        await context.enqueue_links()

    await crawler.run(['https://crawlee.dev'])

def process_run(queue_name: str) -> None:
    asyncio.run(run(queue_name))

def multi_run(queue_name: str = 'multi') -> None:
    workers = 3
    with ProcessPoolExecutor(max_workers=workers) as executor:
        executor.map(process_run, [queue_name for _ in range(workers)])

if __name__ == '__main__':
    multi_run()

Results

[ParselCrawler] INFO  Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished             │ 779        │
│ requests_failed               │ 0          │
│ retry_histogram               │ [779]      │
│ request_avg_failed_duration   │ None       │
│ request_avg_finished_duration │ 356.9ms    │
│ requests_finished_per_minute  │ 2996       │
│ requests_failed_per_minute    │ 0          │
│ request_total_duration        │ 4min 38.0s │
│ requests_total                │ 779        │
│ crawler_runtime               │ 15.60s     │
└───────────────────────────────┴────────────┘
[ParselCrawler] INFO  Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished             │ 762        │
│ requests_failed               │ 0          │
│ retry_histogram               │ [762]      │
│ request_avg_failed_duration   │ None       │
│ request_avg_finished_duration │ 360.0ms    │
│ requests_finished_per_minute  │ 2931       │
│ requests_failed_per_minute    │ 0          │
│ request_total_duration        │ 4min 34.3s │
│ requests_total                │ 762        │
│ crawler_runtime               │ 15.60s     │
└───────────────────────────────┴────────────┘
[ParselCrawler] INFO  Final request statistics:
┌───────────────────────────────┬────────────┐
│ requests_finished             │ 822        │
│ requests_failed               │ 0          │
│ retry_histogram               │ [822]      │
│ request_avg_failed_duration   │ None       │
│ request_avg_finished_duration │ 342.2ms    │
│ requests_finished_per_minute  │ 3161       │
│ requests_failed_per_minute    │ 0          │
│ request_total_duration        │ 4min 41.3s │
│ requests_total                │ 822        │
│ crawler_runtime               │ 15.60s     │
└───────────────────────────────┴────────────┘

@Mantisus Mantisus marked this pull request as ready for review September 15, 2025 18:21

@Mantisus (Collaborator Author)

In RedisRequestQueueClient, I used a Redis Bloom filter for deduplication and for tracking handled requests. This differs from the approach used in the other clients, which use a set. My main motivation was that Redis is an in-memory database, and memory consumption can be critical in some cases.

Since a Bloom filter is a probabilistic data structure, the final data structure size is affected by the error probability; I used 1e-7. This means that with a probability of 1e-7, we may get a false positive when checking the filter. In our case, this translates to a probability of skipping a request: with probability 1e-7, a request that wasn't added to the queue will be considered as already added. Similarly, with probability 1e-7, a request that hasn't been handled yet will be considered as already processed.

Memory consumption for records in the format 'https://crawlee.dev/{i}' (record size doesn't matter for Bloom filters):

Redis Bloom filter:

  • 100,000 - 427 KB
  • 1,000,000 - 4 MB
  • 10,000,000 - 42 MB

Redis set:

  • 100,000 - 6 MB
  • 1,000,000 - 61 MB
  • 10,000,000 - 662 MB

Discussion about whether it's worth pursuing this approach is welcome!
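
For illustration, a minimal sketch of what a Bloom-filter-based check can look like with redis-py's bf() command group (the key name, capacity, and helper are hypothetical and not this PR's implementation):

import asyncio

from redis.asyncio import Redis
from redis.exceptions import ResponseError

ERROR_RATE = 1e-7
CAPACITY = 1_000_000


async def seen_before(redis: Redis, filter_key: str, unique_key: str) -> bool:
    """Return True if unique_key was (probably) already added to the filter."""
    try:
        # BF.RESERVE creates the filter; it raises an error if it already exists.
        await redis.bf().create(filter_key, ERROR_RATE, CAPACITY)
    except ResponseError:
        pass
    # BF.ADD returns 1 if the item was newly added, 0 if it was already present.
    newly_added = await redis.bf().add(filter_key, unique_key)
    return not newly_added


async def main() -> None:
    redis = Redis.from_url('redis://localhost:6379')
    print(await seen_before(redis, 'request_queues:example:dedup', 'https://crawlee.dev/'))  # False
    print(await seen_before(redis, 'request_queues:example:dedup', 'https://crawlee.dev/'))  # True
    await redis.aclose()


if __name__ == '__main__':
    asyncio.run(main())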

@janbuchar (Collaborator)

I haven't read the PR yet, but I did look into bloom filters for request deduplication in the past and what you wrote piqued my interest 🙂 I am a little worried about the chance of dropping a URL completely, even with a super small probability.

Perhaps we should default to a solution that tolerates some percentage of the "opposite" errors and allows a URL to be processed multiple times in rare cases. A fixed-size hash table is an example of such a data structure. I don't know if anything more sophisticated exists.

But maybe I have an irrational fear of probabilistic stuff 🙂
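
For illustration, a minimal in-memory sketch of that idea (not tied to Redis or to this PR): a fixed-size table storing the full unique key per slot, so a collision can only cause a URL to be re-processed, never silently dropped.

import hashlib


class FixedSizeSeenTable:
    """Fixed-size 'seen' table with the opposite error profile of a Bloom filter."""

    def __init__(self, size: int = 1_000_000) -> None:
        self._size = size
        self._slots: list[str | None] = [None] * size

    def _slot(self, unique_key: str) -> int:
        digest = hashlib.sha256(unique_key.encode()).digest()
        return int.from_bytes(digest[:8], 'big') % self._size

    def check_and_add(self, unique_key: str) -> bool:
        """Return True if unique_key was not seen before; never falsely reports an unseen key as seen."""
        idx = self._slot(unique_key)
        if self._slots[idx] == unique_key:
            return False  # definitely seen before
        self._slots[idx] = unique_key  # may overwrite a colliding key, which would then be re-processed
        return True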

@Mantisus (Collaborator Author)

> I am a little worried about the chance of dropping a URL completely, even with a super small probability.

Yes, I agree that this may be a little concerning. And if we go down this route, it will need to be highlighted separately for the user.

But perhaps I am not sufficiently afraid of probabilistic structures, as I have used them before. 🙂

@Mantisus (Collaborator Author)

Since we have already added the ability to parameterize queue behavior in the SDK (single, shared), I updated RedisStorageClient with a queue_dedup_strategy parameter that defines the deduplication strategy: 'default' uses a Redis set, 'bloom' uses Bloom filters.
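
For example (a usage sketch; the parameter takes the 'default' and 'bloom' values described above):

from crawlee.storage_clients import RedisStorageClient

# Exact, set-based deduplication (higher memory usage) - the default.
storage_client = RedisStorageClient(
    connection_string='redis://localhost:6379',
    queue_dedup_strategy='default',
)

# Bloom-filter deduplication (lower memory usage, ~1e-7 chance of skipping a request).
bloom_storage_client = RedisStorageClient(
    connection_string='redis://localhost:6379',
    queue_dedup_strategy='bloom',
)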

@Pijukatel (Collaborator) commented Oct 20, 2025

It also works with:

@Mantisus (Collaborator Author)

> It also works with:

Thanks for checking that out!

It's a real surprise to me that the Redis client is fully compatible with these.

@Pijukatel (Collaborator) left a comment

Just some small comments and questions. I am really looking forward to using this. Good work.

redis: The Redis client instance.
with_wait: Whether to wait for the storage to be created if it doesn't exist.
"""
if with_wait:

Collaborator

Could you please explain the use cases?
Is there any scenario where someone would call this with with_wait=False for non-existent storage?

Collaborator Author

Essentially, this is a synchronizer at the Redis level for working with multiple clients.
If, when opening the storage, we find that the name_to_id index already contains the name/alias of the storage, we cannot be sure whether it has already been completely created or not. In this case, the with_wait synchronizer is used.

)
client = cls(storage_name=internal_name, storage_id=metadata.id, redis=redis, **instance_kwargs)
created = await client._create_metadata_and_storage(internal_name, metadata.model_dump())
# The client was probably not created due to a race condition. Let's try to open it using the name.

Collaborator

This can happen when two attempts to create the same storage occur at the same time?

Collaborator Author

Yes, that's why the with_wait synchronizer is used: when a client encounters a race condition, it waits until the first client finishes creating the storage and reports it.

data = [data]

async with self._get_pipeline() as pipe:
# Incorrect signature for args type in redis-py

Collaborator

Is there an existing issue about this?

Collaborator Author

Yes, we can expect this to be fixed in the next release.
redis/redis-py#3780

async with self._get_pipeline() as pipe:
# Incorrect signature for args type in redis-py
pipe.json().arrappend(self._items_key, '$', *data) # type: ignore[arg-type]
delta_item_count = len(data)

Collaborator

I would probably just inline this, since the argument name is the same as the variable name:
delta_item_count=len(data)

view: str | None = None,
) -> DatasetItemsListPage:
# Check for unsupported arguments and log a warning if found
# When implementing, explore the capabilities of jsonpath to determine what can be done at the Redis level.

Collaborator

This seems more like a dev note to yourself; is it supposed to stay in the code?

_MAX_BATCH_FETCH_SIZE = 10
"""Maximum number of requests to fetch in a single batch operation."""

_BLOCK_REQUEST_TIME = 300_000 # milliseconds

Collaborator

Could you please explain this a bit?

Based on the description, I get the impression that I can't call reclaim_request for 300 s, but that is probably not the case.

Collaborator Author

This is the time for which the request is blocked in the storage for other clients. After it expires, it will be returned to the queue. This protects against queue freezing if any of the clients crash.

I will improve the documentation.
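
The semantics are roughly those of a lock with a TTL; a hypothetical illustration of the idea (not the actual key layout or commands used in this PR):

from redis.asyncio import Redis

_BLOCK_REQUEST_TIME = 300_000  # milliseconds


async def try_claim(redis: Redis, queue_name: str, request_id: str, client_id: str) -> bool:
    """Claim a request for this client; the claim expires automatically if the client crashes."""
    # SET ... NX PX succeeds only if no other client currently holds the request,
    # and Redis drops the key after _BLOCK_REQUEST_TIME, returning the request to the queue.
    return bool(
        await redis.set(
            f'request_queues:{queue_name}:in_progress:{request_id}',
            client_id,
            nx=True,
            px=_BLOCK_REQUEST_TIME,
        )
    )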

delta_total = len(actually_added)

for unique_key in new_unique_keys:
if unique_key in actually_added:

Collaborator

Maybe simplify to

  processed_requests.append(
      ProcessedRequest(
          unique_key=unique_key,
          was_already_present=unique_key not in actually_added,
          was_already_handled=False,
      )
  )

- 'bloom': Uses Redis Bloom filters for probabilistic deduplication with lower memory usage. When using
this approach, there is a possibility 1e-7 that requests will be skipped in the queue.
"""
if redis is not None and connection_string is not None:

Collaborator

Maybe pattern matching again would be more readable
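
A possible shape of that suggestion (a sketch only, assuming the redis / connection_string constructor arguments; not the PR's actual code):

from redis.asyncio import Redis


def resolve_redis(redis: Redis | None, connection_string: str | None) -> Redis:
    """Resolve the mutually exclusive redis / connection_string arguments."""
    match (redis, connection_string):
        case (None, None):
            raise ValueError('Either "redis" or "connection_string" must be provided.')
        case (Redis() as client, None):
            return client
        case (None, str() as url):
            return Redis.from_url(url)
        case _:
            raise ValueError('Only one of "redis" or "connection_string" can be provided.')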


@pytest.fixture(params=['memory', 'file_system', 'sql'])
def storage_client(request: pytest.FixtureRequest) -> StorageClient:
@pytest.fixture(params=['memory', 'file_system', 'sql', ('redis', 'default'), ('redis', 'bloom')])

Collaborator

Maybe just redis_default and redis_bloom would make it simpler

    elif storage_type == 'redis_default':
        storage_client = RedisStorageClient(redis=redis_client, queue_dedup_strategy='default')
    elif storage_type == 'redis_bloom':
        storage_client = RedisStorageClient(redis=redis_client, queue_dedup_strategy='bloom')

name = await await_redis_response(dataset_client.redis.hget('datasets:id_to_name', metadata.id))

assert name is not None
assert (name.decode() if isinstance(name, bytes) else name) == 'test_dataset'

Collaborator

Why can it be bytes or string?

Collaborator Author

Due to the decode_responses parameter in the Redis constructor.
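
For context, redis-py returns str instead of bytes when the client is created with decode_responses=True:

from redis.asyncio import Redis

# hget returns bytes with the default settings, and str when decode_responses=True.
redis_bytes = Redis.from_url('redis://localhost:6379')
redis_str = Redis.from_url('redis://localhost:6379', decode_responses=True)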

@janbuchar (Collaborator) left a comment

I think we should reach out to somebody who knows Redis better to review this for us. Or do you feel confident @Pijukatel? 😁

Collaborator

I think we should mention that Redis persistence is unlike that of filesystem or SQL storage and link to https://redis.io/docs/latest/operate/oss_and_stack/management/persistence/

return await response if isinstance(response, Awaitable) else response


def read_lua_script(file_path: Path) -> str:

Collaborator

Couldn't this accept just the file name and prepend the path to the lua_scripts directory automatically?
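
A sketch of what that could look like (the lua_scripts directory location is an assumption):

from pathlib import Path

# Assumed location of the bundled Lua scripts, relative to this module.
_LUA_SCRIPTS_DIR = Path(__file__).parent / 'lua_scripts'


def read_lua_script(file_name: str) -> str:
    """Read a bundled Lua script by file name and return its contents."""
    return (_LUA_SCRIPTS_DIR / file_name).read_text(encoding='utf-8')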


# Call the notification only once
warnings.warn(
'The RedisStorageClient is experimental and may change or be removed in future releases.',

Collaborator

I don't think we'll want to remove it 🙂 The storage "schema" could change, though - perhaps we should mention that.

Returns:
An instance for the opened or created storage client.
"""
internal_name = name or alias or cls._DEFAULT_NAME

Collaborator

I don't understand this - does it mean that there is no difference in behavior of named and aliased storages?

@Mantisus (Collaborator Author) commented Oct 22, 2025

For an alias, name=None in the metadata object.

internal_name is used to form the key prefix, similar to how the folder name is formed in FileSystemStorageClient.
