@@ -423,6 +423,95 @@ def delete_all_documents(self, recreate_index: bool = False) -> None: # noqa: F
msg = f"Failed to delete all documents from Azure AI Search: {e!s}"
raise HttpResponseError(msg) from e

def delete_by_filter(self, filters: dict[str, Any]) -> int:
"""
Deletes all documents that match the provided filters.

Azure AI Search does not support server-side delete by query, so this method
first searches for matching documents, then deletes them in a batch operation.

:param filters: The filters to apply to select documents for deletion.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:returns: The number of documents deleted.
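
        Usage sketch (illustrative; assumes ``document_store`` is an existing store
        instance whose documents carry a filterable ``category`` metadata field):

        ```python
        n_deleted = document_store.delete_by_filter(
            filters={"field": "meta.category", "operator": "==", "value": "A"}
        )
        ```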
"""
try:
normalized_filters = _normalize_filters(filters)

# Search for all documents matching the filter (only need the id field)
results = list(self.client.search(search_text="*", filter=normalized_filters, select=["id"], top=100000))

if not results:
return 0

# Prepare documents for deletion
documents_to_delete = [{"id": doc["id"]} for doc in results]

# Delete the documents
self.client.delete_documents(documents=documents_to_delete)

logger.info(
"Deleted {n_docs} documents from index '{idx_name}' using filters.",
n_docs=len(documents_to_delete),
idx_name=self._index_name,
)
return len(documents_to_delete)

except Exception as e:
msg = f"Failed to delete documents by filter from Azure AI Search: {e!s}"
raise HttpResponseError(msg) from e

def update_by_filter(self, filters: dict[str, Any], fields: dict[str, Any]) -> int:
"""
Updates the fields of all documents that match the provided filters.

Azure AI Search does not support server-side update by query, so this method
first searches for matching documents, then updates them using merge operations.

:param filters: The filters to apply to select documents for updating.
For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering)
:param fields: The fields to update. These fields must exist in the index schema.
:returns: The number of documents updated.
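
        Usage sketch (illustrative; assumes ``status`` is declared in the index
        schema and documents carry a filterable ``category`` metadata field; the
        compound filter follows Haystack's standard filter syntax):

        ```python
        n_updated = document_store.update_by_filter(
            filters={
                "operator": "AND",
                "conditions": [
                    {"field": "meta.category", "operator": "==", "value": "A"},
                    {"field": "meta.status", "operator": "==", "value": "draft"},
                ],
            },
            fields={"status": "published"},
        )
        ```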
"""
try:
# Validate that fields to update exist in the index schema
invalid_fields = [key for key in fields.keys() if key not in self._index_fields]
if invalid_fields:
msg = f"Fields {invalid_fields} are not defined in index schema. Available fields: {self._index_fields}"
raise ValueError(msg)

normalized_filters = _normalize_filters(filters)

# Search for all documents matching the filter (only need the id field)
results = list(self.client.search(search_text="*", filter=normalized_filters, select=["id"], top=100000))

if not results:
return 0

# Prepare documents for merge (partial update)
# Each document needs its id plus the fields to update
documents_to_update = []
for doc in results:
update_doc = {"id": doc["id"]}
update_doc.update(fields)
documents_to_update.append(update_doc)

# Use merge_documents for partial updates
self.client.merge_documents(documents=documents_to_update)

logger.info(
"Updated {n_docs} documents in index '{idx_name}' using filters.",
n_docs=len(documents_to_update),
idx_name=self._index_name,
)
return len(documents_to_update)

except ValueError:
# Re-raise ValueError for invalid fields without wrapping
raise
except Exception as e:
msg = f"Failed to update documents by filter in Azure AI Search: {e!s}"
raise HttpResponseError(msg) from e

def get_documents_by_id(self, document_ids: list[str]) -> list[Document]:
return self._convert_search_result_to_documents(self._get_raw_documents_by_id(document_ids))

14 changes: 14 additions & 0 deletions integrations/azure_ai_search/tests/conftest.py
@@ -49,6 +49,8 @@ def document_store(request):
original_write_documents = store.write_documents
original_delete_documents = store.delete_documents
original_delete_all_documents = store.delete_all_documents
original_delete_by_filter = store.delete_by_filter
original_update_by_filter = store.update_by_filter

def write_documents_and_wait(documents, policy=DuplicatePolicy.OVERWRITE):
written_docs = original_write_documents(documents, policy)
@@ -63,6 +65,16 @@ def delete_all_documents_and_wait():
original_delete_all_documents()
time.sleep(SLEEP_TIME_IN_SECONDS)

def delete_by_filter_and_wait(filters):
deleted_count = original_delete_by_filter(filters)
time.sleep(SLEEP_TIME_IN_SECONDS)
return deleted_count

def update_by_filter_and_wait(filters, fields):
updated_count = original_update_by_filter(filters, fields)
time.sleep(SLEEP_TIME_IN_SECONDS)
return updated_count
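
    # The fixed sleep above covers Azure AI Search's indexing latency. A
    # polling-based wait is a possible alternative (sketch only; assumes
    # count_documents() reflects writes once indexing catches up):
    def wait_for_expected_count(expected, timeout=30.0, interval=0.5):
        deadline = time.time() + timeout
        while time.time() < deadline:
            if store.count_documents() == expected:
                return True
            time.sleep(interval)
        return False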

# Helper function to wait for the index to be deleted, needed to cover latency
def wait_for_index_deletion(client, index_name):
start_time = time.time()
@@ -75,6 +87,8 @@ def wait_for_index_deletion(client, index_name):
store.write_documents = write_documents_and_wait
store.delete_documents = delete_documents_and_wait
store.delete_all_documents = delete_all_documents_and_wait
store.delete_by_filter = delete_by_filter_and_wait
store.update_by_filter = update_by_filter_and_wait

yield store
try:
99 changes: 99 additions & 0 deletions integrations/azure_ai_search/tests/test_document_store.py
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

import os
import random
from datetime import datetime, timezone
@@ -302,6 +303,104 @@ def test_delete_all_documents_empty_index(self, document_store: AzureAISearchDoc
document_store.delete_all_documents()
assert document_store.count_documents() == 0

def test_delete_by_filter(self, document_store: AzureAISearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
Document(content="Doc 3", meta={"category": "A"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Delete documents with category="A"
deleted_count = document_store.delete_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"}
)
assert deleted_count == 2
assert document_store.count_documents() == 1

# Verify only category B remains
remaining_docs = document_store.filter_documents()
assert len(remaining_docs) == 1
assert remaining_docs[0].meta["category"] == "B"

def test_delete_by_filter_no_matches(self, document_store: AzureAISearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A"}),
Document(content="Doc 2", meta={"category": "B"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 2

# Try to delete documents with category="C" (no matches)
deleted_count = document_store.delete_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "C"}
)
assert deleted_count == 0
assert document_store.count_documents() == 2

def test_update_by_filter(self, document_store: AzureAISearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
Document(content="Doc 3", meta={"category": "A", "status": "draft"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 3

# Update status for category="A" documents
updated_count = document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"},
fields={"status": "published"},
)
assert updated_count == 2

# Verify the updates
published_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "published"}
)
assert len(published_docs) == 2
for doc in published_docs:
assert doc.meta["category"] == "A"
assert doc.meta["status"] == "published"

# Verify category B still has draft status
draft_docs = document_store.filter_documents(
filters={"field": "meta.status", "operator": "==", "value": "draft"}
)
assert len(draft_docs) == 1
assert draft_docs[0].meta["category"] == "B"

def test_update_by_filter_no_matches(self, document_store: AzureAISearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
Document(content="Doc 2", meta={"category": "B", "status": "draft"}),
]
document_store.write_documents(docs)
assert document_store.count_documents() == 2

# Try to update documents with category="C" (no matches)
updated_count = document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "C"},
fields={"status": "published"},
)
assert updated_count == 0

def test_update_by_filter_invalid_field(self, document_store: AzureAISearchDocumentStore):
docs = [
Document(content="Doc 1", meta={"category": "A", "status": "draft"}),
]
document_store.write_documents(docs)

# Try to update a field that doesn't exist in the schema
with pytest.raises(ValueError) as exc_info:
document_store.update_by_filter(
filters={"field": "meta.category", "operator": "==", "value": "A"},
fields={"nonexistent_field": "value"},
)
assert "nonexistent_field" in str(exc_info.value)
assert "not defined in the index schema" in str(exc_info.value)


def _random_embeddings(n):
return [round(random.random(), 7) for _ in range(n)] # nosec: S311