Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions dojo/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils.functional import SimpleLazyObject
from watson.middleware import SearchContextMiddleware
from watson.search import search_context_manager

from dojo.models import Dojo_User
from dojo.product_announcements import LongRunningRequestProductAnnouncement
Expand Down Expand Up @@ -210,3 +212,76 @@ def __call__(self, request):
LongRunningRequestProductAnnouncement(request=request, duration=duration)

return response


class AsyncSearchContextMiddleware(SearchContextMiddleware):

    """
    Ensures Watson index updates are triggered asynchronously.
    Inherits from watson's SearchContextMiddleware to minimize the amount of code we need to maintain.
    """

    def _close_search_context(self, request):
        """
        Override watson's close behavior to trigger async updates when above threshold.

        When the number of touched instances exceeds
        ``WATSON_ASYNC_INDEX_UPDATE_THRESHOLD``, the captured tasks are handed off
        to celery and the context is invalidated so watson does not also update
        the index synchronously. A negative threshold disables async updates
        entirely; at or below the threshold, watson's normal synchronous update runs.
        """
        if search_context_manager.is_active():
            from django.conf import settings  # noqa: PLC0415 circular import

            threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100)

            # If threshold is below 0, async updating is disabled; skip the
            # (potentially large) task extraction entirely in that case.
            if threshold < 0:
                logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update")
            else:
                # Extract tasks and check if we should trigger async update
                captured_tasks = self._extract_tasks_for_async()

                # Get total number of instances across all model types
                total_instances = sum(len(pk_list) for pk_list in captured_tasks.values())

                if total_instances > threshold:
                    logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update")
                    self._trigger_async_index_update(captured_tasks)
                    # Invalidate to prevent synchronous index update by super()._close_search_context()
                    search_context_manager.invalidate()
                else:
                    logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update")
                    # Let watson handle synchronous update for small numbers

        super()._close_search_context(request)

    def _extract_tasks_for_async(self):
        """
        Extract tasks from the search context and group by model type for async processing.

        Returns a dict mapping ``"app_label.model_name"`` to the list of primary
        keys of the touched instances.
        """
        # NOTE(review): relies on watson's private ``_stack``; the top entry holds
        # the (tasks, is_invalid) tuple for the current request context.
        current_tasks, _is_invalid = search_context_manager._stack[-1]

        # Group by model type for efficient batch processing
        model_groups = {}
        for _engine, obj in current_tasks:
            model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
            model_groups.setdefault(model_key, []).append(obj.pk)

        # Log what we extracted per model type
        for model_key, pk_list in model_groups.items():
            logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing")

        return model_groups

    def _trigger_async_index_update(self, model_groups):
        """Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE."""
        if not model_groups:
            return

        # Import here to avoid circular import
        from django.conf import settings  # noqa: PLC0415 circular import

        from dojo.tasks import update_watson_search_index_for_model  # noqa: PLC0415 circular import

        # Batch size is loop-invariant; read it once (compatible with Python 3.11).
        batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)

        # Create tasks per model type, chunking large lists into configurable batches
        for model_name, pk_list in model_groups.items():
            batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]

            # Create tasks for each batch and log each one
            for i, batch in enumerate(batches, 1):
                logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
                update_watson_search_index_for_model(model_name, batch)
11 changes: 9 additions & 2 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@
DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
DD_CELERY_PASS_MODEL_BY_ID=(str, True),
DD_CELERY_LOG_LEVEL=(str, "INFO"),
# Minimum number of updated model instances before search index updates are performed asynchronously. Set to -1 to disable async updates.
DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 100),
DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000),
DD_FOOTER_VERSION=(str, ""),
# models should be passed to celery by ID, default is False (for now)
DD_FORCE_LOWERCASE_TAGS=(bool, True),
Expand Down Expand Up @@ -913,9 +916,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
"dojo.middleware.LoginRequiredMiddleware",
"dojo.middleware.AdditionalHeaderMiddleware",
"social_django.middleware.SocialAuthExceptionMiddleware",
"watson.middleware.SearchContextMiddleware",
"dojo.middleware.AuditlogMiddleware",
"crum.CurrentRequestUserMiddleware",
"dojo.middleware.AuditlogMiddleware",
"dojo.middleware.AsyncSearchContextMiddleware",
"dojo.request_cache.middleware.RequestCacheMiddleware",
"dojo.middleware.LongRunningRequestAlertMiddleware",
]
Expand Down Expand Up @@ -1156,6 +1159,10 @@ def saml2_attrib_map_format(din):

CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", )

# Watson async index update settings
WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD")
WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE")

# Celery beat scheduled tasks
CELERY_BEAT_SCHEDULE = {
"add-alerts": {
Expand Down
52 changes: 52 additions & 0 deletions dojo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from auditlog.models import LogEntry
from celery.utils.log import get_task_logger
from dateutil.relativedelta import relativedelta
from django.apps import apps
from django.conf import settings
from django.core.management import call_command
from django.db.models import Count, Prefetch
from django.urls import reverse
from django.utils import timezone

from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.finding.helper import fix_loop_duplicates
from dojo.management.commands.jira_status_reconciliation import jira_status_reconciliation
from dojo.models import Alerts, Announcement, Endpoint, Engagement, Finding, Product, System_Settings, User
Expand Down Expand Up @@ -222,3 +224,53 @@ def evaluate_pro_proposition(*args, **kwargs):
@app.task
def clear_sessions(*args, **kwargs):
call_command("clearsessions")


@dojo_async_task
@app.task
def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
    """
    Async task to update watson search indexes for a specific model type.

    Args:
        model_name: Model identifier like 'dojo.finding' (``app_label.model_name``)
        pk_list: List of primary keys for instances of this model type. It's advised to chunk the list into batches of 1000 or less.

    """
    from watson.search import SearchContextManager, default_search_engine  # noqa: PLC0415 circular import

    logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances")

    try:
        # Create new SearchContextManager and start it
        context_manager = SearchContextManager()
        context_manager.start()

        # Get the default engine and model class. Use a separate local for the
        # bare model label so the original "app.model" identifier stays intact
        # for the log messages below.
        engine = default_search_engine
        app_label, model_label = model_name.split(".")
        model_class = apps.get_model(app_label, model_label)

        # Bulk load instances and add them to the context
        instances = model_class.objects.filter(pk__in=pk_list)
        instances_added = 0
        instances_skipped = 0

        for instance in instances:
            try:
                # Add to watson context (this will trigger indexing on end())
                context_manager.add_to_context(engine, instance)
                instances_added += 1
            except Exception as e:
                logger.warning(f"Skipping {model_name}:{instance.pk} - {e}")
                instances_skipped += 1
                continue

        # Let watson handle the bulk indexing
        context_manager.end()

        logger.info(f"Completed async watson index update: {instances_added} updated, {instances_skipped} skipped")

    except Exception:
        # logger.exception records the full traceback; plain error() with only
        # the exception message would lose it.
        logger.exception(f"Watson async index update failed for {model_name}")
        raise
44 changes: 44 additions & 0 deletions unittests/scans/acunetix/watson_test_unique.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?xml version="1.0"?>
<ScanGroup>
<Scan>
<Name>WatsonUniqueTest</Name>
<ShortName>Watson Unique Short Name</ShortName>
<StartURL>https://watson-unique-test.com</StartURL>
<StartTime>10/09/2025, 18:09:55</StartTime>
<FinishTime>10/09/2025, 21:42:41</FinishTime>
<ScanTime>212 minutes, 4 seconds</ScanTime>
<Aborted>False</Aborted>
<Responsive>True</Responsive>
<Banner></Banner>
<Os></Os>
<WebServer>Apache-Coyote/1.1</WebServer>
<ReportItems>
<ReportItem>
<Name>WatsonUniqueReportItem2025</Name>
<ModuleName>WatsonUniqueTestModule</ModuleName>
<Details>Watson Unique Test Details</Details>
<Affects><![CDATA[/watson/unique/path]]></Affects>
<Parameter></Parameter>
<AOP_SourceFile></AOP_SourceFile>
<AOP_SourceLine></AOP_SourceLine>
<AOP_Additional></AOP_Additional>
<IsFalsePositive></IsFalsePositive>
<Severity>high</Severity>
<Type>xss</Type>
<Impact>Watson Unique Test Impact XYZ123</Impact>
<DetailedInformation>Watson Unique Test Detailed Information ABC456</DetailedInformation>
<Recommendation>Watson Unique Test Recommendation DEF789</Recommendation>
<Description>Watson Unique Test Description GHI000</Description>
<CWEList>
<CWE id="79"><![CDATA[CWE-79]]></CWE>
</CWEList>
<References>
<Reference>
<URL>https://watson-unique-ref.com</URL>
</Reference>
</References>
</ReportItem>
</ReportItems>
</Scan>
</ScanGroup>

114 changes: 114 additions & 0 deletions unittests/test_watson_async_search_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from django.contrib.auth.models import User
from django.test import override_settings
from django.utils import timezone
from rest_framework.authtoken.models import Token
from rest_framework.test import APIClient
from watson import search as watson

from dojo.models import Development_Environment, Engagement, Finding, Product, Product_Type, UserContactInfo

from .dojo_test_case import DojoAPITestCase


class TestWatsonAsyncSearchIndex(DojoAPITestCase):

    """Test Watson search indexing works correctly for both sync and async updates."""

    def setUp(self):
        """Set up test data and API client."""
        super().setUp()

        self.testuser = User.objects.create(username="admin", is_staff=True, is_superuser=True)
        UserContactInfo.objects.create(user=self.testuser, block_execution=True)

        self.system_settings(enable_webhooks_notifications=False)
        self.system_settings(enable_product_grade=False)
        self.system_settings(enable_github=False)

        # Create API client with authentication
        self.token = Token.objects.create(user=self.testuser)
        self.client = APIClient()
        self.client.force_login(self.testuser)

        # Create test product type and product
        self.product_type = Product_Type.objects.create(name="Test Product Type")
        self.product = Product.objects.create(
            name="Test Product",
            description="Test product for Watson indexing",
            prod_type=self.product_type,
        )
        self.engagement = Engagement.objects.create(
            name="Test Engagement",
            product=self.product,
            target_start=timezone.now(),
            target_end=timezone.now(),
        )

        # Create Development Environment
        Development_Environment.objects.get_or_create(name="Development")

    def _import_acunetix_scan(self):
        """Import an Acunetix scan and return the response."""
        return self.import_scan_with_params(
            filename="scans/acunetix/watson_test_unique.xml",
            scan_type="Acunetix Scan",
            engagement=self.engagement.id,
        )

    def _search_watson_for_finding(self, search_term):
        """Search Watson index for findings containing the search term."""
        # Search the Watson index
        return watson.search(search_term, models=(Finding,))

    def _import_and_check_watson_index(self, expected_message):
        """Common test logic for importing scan and verifying Watson indexing works; returns the created finding."""
        # Verify no findings exist before import. Use result.object.pk (the
        # indexed Finding's pk), consistent with the post-import check below —
        # result.pk would be the SearchEntry's own pk, not the Finding's.
        search_results = self._search_watson_for_finding("WatsonUniqueReportItem2025")
        found_finding_ids_before = [result.object.pk for result in search_results]
        self.assertEqual(len(found_finding_ids_before), 0, "Should have no findings before import")

        # Import the scan
        response_data = self._import_acunetix_scan()

        # Get test ID from response
        test_id = response_data["test_id"]

        # Verify finding was created
        findings = Finding.objects.filter(test_id=test_id)
        self.assertEqual(findings.count(), 1, "Should have created exactly one finding")
        finding = findings.first()

        self.assertIn("WatsonUniqueReportItem2025", finding.title, "Finding should contain 'WatsonUniqueReportItem2025' in title")

        # Search Watson index for the finding
        search_results = self._search_watson_for_finding("WatsonUniqueReportItem2025")

        # Verify finding is in search index
        found_finding_ids = [result.object.pk for result in search_results]

        self.assertIn(finding.pk, found_finding_ids, expected_message.format(finding_id=finding.pk))

        return finding

    def test_sync_watson_indexing_single_finding(self):
        """Test that single finding import uses sync indexing and finding is searchable."""
        # Default threshold is 100, so single finding should use sync indexing
        self._import_and_check_watson_index(
            "Finding {finding_id} should be found in Watson search index",
        )

    @override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=0)
    def test_async_watson_indexing_single_finding(self):
        """Test that with threshold=0, single finding uses async indexing and is searchable."""
        # With threshold=0, even single finding should trigger async indexing
        self._import_and_check_watson_index(
            "Finding {finding_id} should be found in Watson search index after async update",
        )

    @override_settings(WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=-1)
    def test_disabled_async_watson_indexing(self):
        """Test that with threshold=-1, async is disabled and sync indexing works."""
        # With threshold=-1, async should be completely disabled
        self._import_and_check_watson_index(
            "Finding {finding_id} should be found in Watson search index with sync update",
        )