diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml index 507e06d2..31903790 100644 --- a/infrastructure/rag/values.yaml +++ b/infrastructure/rag/values.yaml @@ -192,6 +192,14 @@ backend: stackitEmbedder: STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct" STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1 + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. + STACKIT_EMBEDDER_MAX_RETRIES: "5" + STACKIT_EMBEDDER_RETRY_BASE_DELAY: "0.5" + STACKIT_EMBEDDER_RETRY_MAX_DELAY: "600" + STACKIT_EMBEDDER_BACKOFF_FACTOR: "2" + STACKIT_EMBEDDER_ATTEMPT_CAP: "6" + STACKIT_EMBEDDER_JITTER_MIN: "0.05" + STACKIT_EMBEDDER_JITTER_MAX: "0.25" ollama: OLLAMA_MODEL: "llama3.2:3b-instruct-fp16" OLLAMA_BASE_URL: "http://rag-ollama:11434" @@ -314,6 +322,7 @@ adminBackend: summarizer: SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000" SUMMARIZER_MAXIMUM_CONCURRENCY: "10" + # Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values. SUMMARIZER_MAX_RETRIES: "5" SUMMARIZER_RETRY_BASE_DELAY: "0.5" SUMMARIZER_RETRY_MAX_DELAY: "600" diff --git a/libs/README.md b/libs/README.md index f2b82e13..960e7cfb 100644 --- a/libs/README.md +++ b/libs/README.md @@ -8,6 +8,7 @@ It consists of the following python packages: - [1.1 Requirements](#11-requirements) - [1.2 Endpoints](#12-endpoints) - [1.3 Replaceable parts](#13-replaceable-parts) + - [1.4 Embedder retry behavior](#14-embedder-retry-behavior) - [`2. Admin API lib`](#2-admin-api-lib) - [2.1 Requirements](#21-requirements) - [2.2 Endpoints](#22-endpoints) @@ -99,6 +100,32 @@ Uploaded documents are required to contain the following metadata: | chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. 
Default implementation just calls the *traced_chat_graph* | | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. | +### 1.4 Embedder retry behavior + +The default STACKIT embedder implementation (`StackitEmbedder`) uses the shared retry decorator with exponential backoff from the `rag-core-lib`. + +- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff` +- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py) +- Per-embedder overrides: [`StackitEmbedderSettings`](./rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py) + +How it resolves settings + +- Each retry-related field in `StackitEmbedderSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`. +- When a field is not provided (None), the embedder falls back to the value from `RetryDecoratorSettings`. + +Configuring via environment variables + +- Embedder-specific (prefix `STACKIT_EMBEDDER_`): + - `STACKIT_EMBEDDER_MAX_RETRIES` + - `STACKIT_EMBEDDER_RETRY_BASE_DELAY` + - `STACKIT_EMBEDDER_RETRY_MAX_DELAY` + - `STACKIT_EMBEDDER_BACKOFF_FACTOR` + - `STACKIT_EMBEDDER_ATTEMPT_CAP` + - `STACKIT_EMBEDDER_JITTER_MIN` + - `STACKIT_EMBEDDER_JITTER_MAX` +- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults. +- Helm chart: set the same keys under `backend.envs.stackitEmbedder` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml). + ## 2. Admin API Lib The Admin API Library contains all required components for file management capabilities for RAG systems, handling all document lifecycle operations. It also includes a default `dependency_container`, that is pre-configured and should fit most use-cases. 
diff --git a/libs/rag-core-api/src/rag_core_api/dependency_container.py b/libs/rag-core-api/src/rag_core_api/dependency_container.py
index 57cff6e3..05d9d4da 100644
--- a/libs/rag-core-api/src/rag_core_api/dependency_container.py
+++ b/libs/rag-core-api/src/rag_core_api/dependency_container.py
@@ -63,6 +63,7 @@
 from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
 from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
 from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
 from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
 from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
 from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -89,6 +90,7 @@ class DependencyContainer(DeclarativeContainer):
     stackit_embedder_settings = StackitEmbedderSettings()
     chat_history_settings = ChatHistorySettings()
     sparse_embedder_settings = SparseEmbedderSettings()
+    retry_decorator_settings = RetryDecoratorSettings()
 
     chat_history_config.from_dict(chat_history_settings.model_dump())
     class_selector_config.from_dict(rag_class_type_settings.model_dump() | embedder_class_type_settings.model_dump())
@@ -98,7 +100,7 @@ class DependencyContainer(DeclarativeContainer):
         ollama=Singleton(
             LangchainCommunityEmbedder, embedder=Singleton(OllamaEmbeddings, **ollama_embedder_settings.model_dump())
         ),
-        stackit=Singleton(StackitEmbedder, stackit_embedder_settings),
+        stackit=Singleton(StackitEmbedder, stackit_embedder_settings, retry_decorator_settings),
     )
 
     sparse_embedder = Singleton(FastEmbedSparse, **sparse_embedder_settings.model_dump())
diff --git a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py
index 65d67a1f..63fcad32 100644
--- a/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py
+++ b/libs/rag-core-api/src/rag_core_api/impl/embeddings/stackit_embedder.py
@@ -1,16 +1,24 @@
 """Module that contains the StackitEmbedder class."""
 
+import logging
+
 from langchain_core.embeddings import Embeddings
-from openai import OpenAI
+from openai import OpenAI, APIConnectionError, APIError, APITimeoutError, RateLimitError
 
 from rag_core_api.embeddings.embedder import Embedder
 from rag_core_api.impl.settings.stackit_embedder_settings import StackitEmbedderSettings
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
+from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff
+
+logger = logging.getLogger(__name__)
 
 
 class StackitEmbedder(Embedder, Embeddings):
     """A class that represents any Langchain provided Embedder."""
 
-    def __init__(self, stackit_embedder_settings: StackitEmbedderSettings):
+    def __init__(
+        self, stackit_embedder_settings: StackitEmbedderSettings, retry_decorator_settings: RetryDecoratorSettings
+    ):
         """
         Initialize the StackitEmbedder with the given settings.
 
@@ -18,12 +26,17 @@ def __init__(self, stackit_embedder_settings: StackitEmbedderSettings):
         ----------
         stackit_embedder_settings : StackitEmbedderSettings
             The settings for configuring the StackitEmbedder, including the API key and base URL.
+        retry_decorator_settings : RetryDecoratorSettings
+            Default retry settings used as fallback when StackitEmbedderSettings leaves fields unset.
         """
         self._client = OpenAI(
             api_key=stackit_embedder_settings.api_key,
             base_url=stackit_embedder_settings.base_url,
         )
         self._settings = stackit_embedder_settings
+        self._retry_decorator_settings = self._create_retry_decorator_settings(
+            stackit_embedder_settings, retry_decorator_settings
+        )
 
     def get_embedder(self) -> "StackitEmbedder":
         """Return the embedder instance.
@@ -48,12 +61,16 @@ def embed_documents(self, texts: list[str]) -> list[list[float]]:
         list[list[float]]
             A list where each element is a list of floats representing the embedded vector of a document.
         """
-        responses = self._client.embeddings.create(
-            input=texts,
-            model=self._settings.model,
-        )
 
-        return [data.embedding for data in responses.data]
+        @self._retry_with_backoff_wrapper()
+        def _call(texts: list[str]) -> list[list[float]]:
+            responses = self._client.embeddings.create(
+                input=texts,
+                model=self._settings.model,
+            )
+            return [data.embedding for data in responses.data]
+
+        return _call(texts)
 
     def embed_query(self, text: str) -> list[float]:
         """
@@ -69,4 +86,73 @@ def embed_query(self, text: str) -> list[float]:
         list[float]
             The embedded representation of the query text.
         """
-        return self.embed_documents([text])[0]
+        embeddings_list = self.embed_documents([text])
+        if embeddings_list:
+            embeddings = embeddings_list[0]
+            return embeddings if embeddings else []
+        # Log only the query length: the raw query is user input and should not be written to logs.
+        logger.warning("No embeddings found for query (length=%d)", len(text))
+        return []
+
+    def _create_retry_decorator_settings(
+        self,
+        stackit_settings: StackitEmbedderSettings,
+        retry_defaults: RetryDecoratorSettings,
+    ) -> RetryDecoratorSettings:
+        """
+        Merge the embedder-specific retry overrides with the shared retry defaults.
+
+        Parameters
+        ----------
+        stackit_settings : StackitEmbedderSettings
+            Embedder settings; each retry field may be None, meaning "not configured".
+        retry_defaults : RetryDecoratorSettings
+            Shared retry settings supplying the value for every field left unset.
+
+        Returns
+        -------
+        RetryDecoratorSettings
+            The effective retry settings used by this embedder.
+        """
+        # Prefer values from StackitEmbedderSettings when provided;
+        # otherwise fall back to RetryDecoratorSettings defaults
+        return RetryDecoratorSettings(
+            max_retries=(
+                stackit_settings.max_retries if stackit_settings.max_retries is not None else retry_defaults.max_retries
+            ),
+            retry_base_delay=(
+                stackit_settings.retry_base_delay
+                if stackit_settings.retry_base_delay is not None
+                else retry_defaults.retry_base_delay
+            ),
+            retry_max_delay=(
+                stackit_settings.retry_max_delay
+                if stackit_settings.retry_max_delay is not None
+                else retry_defaults.retry_max_delay
+            ),
+            backoff_factor=(
+                stackit_settings.backoff_factor
+                if stackit_settings.backoff_factor is not None
+                else retry_defaults.backoff_factor
+            ),
+            attempt_cap=(
+                stackit_settings.attempt_cap if stackit_settings.attempt_cap is not None else retry_defaults.attempt_cap
+            ),
+            jitter_min=(
+                stackit_settings.jitter_min if stackit_settings.jitter_min is not None else retry_defaults.jitter_min
+            ),
+            jitter_max=(
+                stackit_settings.jitter_max if stackit_settings.jitter_max is not None else retry_defaults.jitter_max
+            ),
+        )
+
+    def _retry_with_backoff_wrapper(self):
+        """Return the retry decorator configured with this embedder's effective retry settings."""
+        return retry_with_backoff(
+            settings=self._retry_decorator_settings,
+            # NOTE(review): APIError is the base class of the three exceptions listed after it, so this tuple matches
+            # every openai API error, including non-transient 4xx responses; confirm retrying those is intended.
+            exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError),
+            rate_limit_exceptions=(RateLimitError,),
+            logger=logger,
+        )
diff --git a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py
index e451f06c..ddf09b70 100644
--- a/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py
+++ b/libs/rag-core-api/src/rag_core_api/impl/settings/stackit_embedder_settings.py
@@ -1,6 +1,8 @@
 """Module contains settings regarding the stackit embedder."""
 
-from pydantic import Field
+from typing import Optional
+
+from pydantic import Field, PositiveInt
 from pydantic_settings import BaseSettings
 
 
@@ -17,6 +19,20 @@ class StackitEmbedderSettings(BaseSettings):
         (default "https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1").
     api_key : str
         The API key for authentication.
+    max_retries : Optional[PositiveInt]
+        Total retries, not counting the initial attempt.
+    retry_base_delay : Optional[float]
+        Base delay in seconds for the first retry.
+    retry_max_delay : Optional[float]
+        Maximum delay cap in seconds for any single wait.
+    backoff_factor : Optional[float]
+        Exponential backoff factor (>= 1).
+    attempt_cap : Optional[int]
+        Cap for exponent growth (backoff_factor ** attempt_cap).
+    jitter_min : Optional[float]
+        Minimum jitter in seconds.
+    jitter_max : Optional[float]
+        Maximum jitter in seconds.
     """
 
     class Config:
@@ -28,3 +44,44 @@ class Config:
     model: str = Field(default="intfloat/e5-mistral-7b-instruct")
     base_url: str = Field(default="https://e629124b-accc-4e25-a1cc-dc57ac741e1d.model-serving.eu01.onstackit.cloud/v1")
     api_key: str = Field(default="")
+    max_retries: Optional[PositiveInt] = Field(
+        default=None,
+        title="Max Retries",
+        description="Total retries, not counting the initial attempt.",
+    )
+    retry_base_delay: Optional[float] = Field(
+        default=None,
+        ge=0,
+        title="Retry Base Delay",
+        description="Base delay in seconds for the first retry.",
+    )
+    retry_max_delay: Optional[float] = Field(
+        default=None,
+        gt=0,
+        title="Retry Max Delay",
+        description="Maximum delay cap in seconds for any single wait.",
+    )
+    backoff_factor: Optional[float] = Field(
+        default=None,
+        ge=1.0,
+        title="Backoff Factor",
+        description="Exponential backoff factor (>= 1).",
+    )
+    attempt_cap: Optional[int] = Field(
+        default=None,
+        ge=0,
+        title="Attempt Cap",
+        description="Cap for exponent growth (backoff_factor ** attempt_cap).",
+    )
+    jitter_min: Optional[float] = Field(
+        default=None,
+        ge=0.0,
+        title="Jitter Min (s)",
+        description="Minimum jitter in seconds.",
+    )
+    jitter_max: Optional[float] = Field(
+        default=None,
+        ge=0.0,
+        title="Jitter Max (s)",
+        description="Maximum jitter in seconds.",
+    )