Conversation

valentijnscholten (Member) commented Sep 10, 2025

While working on some other PRs around performance, I noticed that Watson was using a lot of time to update the search index.

Summary

This PR introduces asynchronous Watson search index updates to significantly improve API response times during large data imports while maintaining search functionality. Instead of blocking API responses while updating search indexes synchronously, updates are now processed in the background via Celery tasks.

🏗️ Implementation

Core Components

  1. AsyncSearchContextMiddleware - Inherits from Watson's SearchContextMiddleware to intercept and defer index updates.
  2. update_watson_search_index_for_model - Celery task that processes batched index updates using Watson's own bulk processing logic.

The coupling with django-watson internals is limited to keep the code maintainable and this PR justifiable. Please note we're considering alternative search engines, as django-watson is pretty basic and doesn't utilize advanced PostgreSQL features.

The upstream watson middleware keeps track of all model instances that are changed during a request: a post_save signal stores each changed instance in a SearchContextManager. At the end of the request, these instances are used to update the search index by calling the end() method on the SearchContextManager.

Our implementation still tracks all the model instances in a SearchContextManager. But at the end of the request we extract each instance's model name and pk and send these to the Celery task. The task retrieves the model instances via the model name and pks, populates a fresh SearchContextManager, and then calls its end() method to update the search index. So we reuse most of the watson logic; we just pass some model names and pks around. It does mean the Celery task performs an extra query to retrieve the model instances from the database, done in one query per batch. The performance impact of this read query is negligible compared to the writes to the search index that happen right afterwards.
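In condensed form, the round trip looks roughly like this (a sketch with simplified helper names, not the exact PR code; the real middleware and task are quoted in the review comment further down this thread):

from django.apps import apps

def extract_model_groups(tracked):
    """Middleware side: group tracked (engine, instance) pairs by 'app_label.model_name' -> [pk, ...]."""
    groups = {}
    for _engine, obj in tracked:
        key = f"{obj._meta.app_label}.{obj._meta.model_name}"  # e.g. "dojo.finding"
        groups.setdefault(key, []).append(obj.pk)
    return groups

def rehydrate(model_key, pk_list):
    """Task side: one query per batch re-fetches the instances to re-populate the search context."""
    app_label, model_name = model_key.split(".")
    return apps.get_model(app_label, model_name).objects.filter(pk__in=pk_list)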

⚙️ Configuration

Environment Variables

Variable                                 Default  Description
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD   100      Minimum number of instances to trigger async updates
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE  1000     Maximum number of instances per Celery task

Configuration Examples

# Development (disable async for immediate indexing)
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=-1

# Conservative (only very large updates async)
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=500
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=2000

# Aggressive (most updates async, small batches)
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=10
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=500
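Note the DD_ prefix exists only at the environment level; the middleware and task read the unprefixed Django settings (e.g. settings.WATSON_ASYNC_INDEX_UPDATE_THRESHOLD). Assuming DefectDojo's usual django-environ pattern, the wiring in settings.dist.py would look roughly like this (a sketch; the exact lines in the merged PR may differ):

import environ

# Hypothetical sketch following the existing DD_ env pattern.
env = environ.Env(
    DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 100),
    DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000),
)

WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD")
WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE")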

🎛️ Behavior

Smart Threshold Logic

  • threshold < 0 → All updates synchronous (async disabled)
  • instances <= threshold → Synchronous updates (fast, immediate)
  • instances > threshold → Asynchronous updates (prevents blocking)
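Distilled to a plain function, the rules above read as follows (a simplification of the middleware code quoted later in this thread):

def should_index_async(total_instances: int, threshold: int) -> bool:
    """Negative threshold disables async entirely; otherwise only counts strictly above the threshold go async."""
    if threshold < 0:
        return False
    return total_instances > threshold

assert should_index_async(100, 100) is False    # instances <= threshold: synchronous
assert should_index_async(101, 100) is True     # instances > threshold: async
assert should_index_async(10_000, -1) is False  # async disabled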

📊 Performance Impact

When using the JFrog Unified Very Many sample file, the importer would spend ~50-60s on updating the search index, which is 10-15% of the total import time. With this PR this all happens in a Celery task, making the import 10-15% faster.

I didn't add a test to test_importers_performance.py, as that suite doesn't use the API and won't go through the middleware. I did add a test case to make sure the async index update doesn't break in the future.

@valentijnscholten valentijnscholten added this to the 2.51.0 milestone Sep 10, 2025
@github-actions github-actions bot added settings_changes Needs changes to settings.py based on changes in settings.dist.py included in this PR unittests labels Sep 10, 2025
@valentijnscholten valentijnscholten marked this pull request as ready for review September 10, 2025 21:37
dryrunsecurity bot commented Sep 10, 2025

DryRun Security

This pull request introduces middleware/tasks that dynamically load models by name (built from obj._meta) with no whitelist or validation, so an attacker who can trigger modifications to sensitive models (e.g., auth.User) could cause those records to be indexed and potentially expose sensitive data. It also accumulates all modified object primary keys in memory before batching, meaning a single request that modifies a very large number of objects could exhaust worker memory and cause a denial-of-service.

Arbitrary Model Loading in dojo/tasks.py
Vulnerability Arbitrary Model Loading
Description The update_watson_search_index_for_model task dynamically loads a Django model using apps.get_model() based on the model_name parameter. This model_name is constructed in AsyncSearchContextMiddleware from obj._meta.app_label and obj._meta.model_name of any modified object. There is no whitelist or validation to restrict which models can be indexed. An attacker who can trigger a modification on a sensitive model (e.g., their own auth.User object) could cause that model's data to be indexed by the search engine. If django-watson indexes sensitive fields by default and these are exposed via search results, this could lead to information disclosure.

@app.task
def clear_sessions(*args, **kwargs):
    call_command("clearsessions")


@dojo_async_task
@app.task
def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
    """
    Async task to update watson search indexes for a specific model type.

    Args:
        model_name: Model identifier like 'dojo.finding'
        pk_list: List of primary keys for instances of this model type. It's advised to chunk the list into batches of 1000 or less.
    """
    from watson.search import SearchContextManager, default_search_engine  # noqa: PLC0415 circular import

    logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances")
    try:
        # Create new SearchContextManager and start it
        context_manager = SearchContextManager()
        context_manager.start()
        # Get the default engine and model class
        engine = default_search_engine
        app_label, model_name = model_name.split(".")
        model_class = apps.get_model(app_label, model_name)
        # Bulk load instances and add them to the context
        instances = model_class.objects.filter(pk__in=pk_list)
        instances_added = 0
        instances_skipped = 0
        for instance in instances:
            try:
                # Add to watson context (this will trigger indexing on end())
                context_manager.add_to_context(engine, instance)
                instances_added += 1
            except Exception as e:
                logger.warning(f"Skipping {model_name}:{instance.pk} - {e}")
                instances_skipped += 1
                continue
        # Let watson handle the bulk indexing
        context_manager.end()
        logger.info(f"Completed async watson index update: {instances_added} updated, {instances_skipped} skipped")
    except Exception as e:
        logger.error(f"Watson async index update failed for {model_name}: {e}")
        raise
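
One way to address this finding would be an explicit allowlist check before apps.get_model(); a hypothetical hardening sketch, not part of this PR:

from django.apps import apps

# Hypothetical: only accept models known to be registered with watson.
# The set below is illustrative, not the project's actual registration list.
ALLOWED_SEARCH_MODELS = {"dojo.finding", "dojo.test", "dojo.engagement", "dojo.product"}

def validated_model_class(model_name):
    if model_name not in ALLOWED_SEARCH_MODELS:
        msg = f"Refusing to index unexpected model: {model_name}"
        raise ValueError(msg)
    app_label, name = model_name.split(".")
    return apps.get_model(app_label, name)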

Uncontrolled Resource Consumption in dojo/middleware.py
Vulnerability Uncontrolled Resource Consumption
Description The _extract_tasks_for_async method in AsyncSearchContextMiddleware aggregates all primary keys (PKs) of modified objects into an in-memory dictionary (model_groups) before any batching or asynchronous processing occurs. If a single web request were to modify a very large number of database objects (e.g., millions), this could lead to excessive memory consumption within the web worker process, potentially causing a Denial of Service (DoS) due to memory exhaustion.

        LongRunningRequestProductAnnouncement(request=request, duration=duration)
        return response


class AsyncSearchContextMiddleware(SearchContextMiddleware):
    """
    Ensures Watson index updates are triggered asynchronously.

    Inherits from watson's SearchContextMiddleware to minimize the amount of code we need to maintain.
    """

    def _close_search_context(self, request):
        """Override watson's close behavior to trigger async updates when above threshold."""
        if search_context_manager.is_active():
            from django.conf import settings  # noqa: PLC0415 circular import

            # Extract tasks and check if we should trigger async update
            captured_tasks = self._extract_tasks_for_async()
            # Get total number of instances across all model types
            total_instances = sum(len(pk_list) for pk_list in captured_tasks.values())
            threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100)
            # If threshold is below 0, async updating is disabled
            if threshold < 0:
                logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update")
            elif total_instances > threshold:
                logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update")
                self._trigger_async_index_update(captured_tasks)
                # Invalidate to prevent synchronous index update by super()._close_search_context()
                search_context_manager.invalidate()
            else:
                logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update")
        # Let watson handle synchronous update for small numbers
        super()._close_search_context(request)

    def _extract_tasks_for_async(self):
        """Extract tasks from the search context and group by model type for async processing."""
        current_tasks, _is_invalid = search_context_manager._stack[-1]
        # Group by model type for efficient batch processing
        model_groups = {}
        for _engine, obj in current_tasks:
            model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
            if model_key not in model_groups:
                model_groups[model_key] = []
            model_groups[model_key].append(obj.pk)
        # Log what we extracted per model type
        for model_key, pk_list in model_groups.items():
            logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing")
        return model_groups

    def _trigger_async_index_update(self, model_groups):
        """Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE."""
        if not model_groups:
            return
        # Import here to avoid circular import
        from django.conf import settings  # noqa: PLC0415 circular import
        from dojo.tasks import update_watson_search_index_for_model  # noqa: PLC0415 circular import

        # Create tasks per model type, chunking large lists into configurable batches
        for model_name, pk_list in model_groups.items():
            # Chunk into batches using configurable batch size (compatible with Python 3.11)
            batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
            batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
            # Create tasks for each batch and log each one
            for i, batch in enumerate(batches, 1):
                logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
                update_watson_search_index_for_model(model_name, batch)
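
As a worked example of the chunking above, using the default batch size:

# 2500 modified pks with the default batch size of 1000 yields 3 Celery tasks.
pk_list = list(range(1, 2501))
batch_size = 1000
batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
assert [len(b) for b in batches] == [1000, 1000, 500]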


All finding details can be found in the DryRun Security Dashboard.

github-actions bot commented
This pull request has conflicts, please resolve those before we can evaluate the pull request.

github-actions bot commented

Conflicts have been resolved. A maintainer will review the pull request shortly.

@mtesauro mtesauro left a comment

Approved

@valentijnscholten valentijnscholten merged commit 3cd24d1 into DefectDojo:dev Sep 16, 2025
84 checks passed