Merged
19 commits
1db3fe7
feat: implement retry decorator with exponential backoff and rate lim…
a-klos Sep 2, 2025
cbdd287
refactor: improve retry decorator by separating async and sync logic;…
a-klos Sep 2, 2025
815ca87
test: add comprehensive tests for retry decorator with async and sync…
a-klos Sep 2, 2025
cdb5445
feat: add retry decorator configuration and update deployment templates
a-klos Sep 2, 2025
72e2480
docs: add documentation for retry decorator with exponential backoff …
a-klos Sep 2, 2025
bde7a8c
feat: add pytest-asyncio dependency for improved async testing support
a-klos Sep 2, 2025
b65c696
Merge branch 'main' into feat/exponential-retry-decorator
a-klos Sep 3, 2025
657715c
docs: Update libs/rag-core-lib/src/rag_core_lib/impl/settings/retry_d…
a-klos Sep 24, 2025
f94ab4e
chore: Update libs/rag-core-lib/src/rag_core_lib/impl/utils/retry_dec…
a-klos Sep 24, 2025
2dd24a0
chore: merge main
a-klos Sep 24, 2025
ae1f8e1
Merge branch 'feat/exponential-retry-decorator' of github.com:stackit…
a-klos Sep 24, 2025
8acf200
refactor: equip the langchain summarizer with a retry decorator. (#90)
a-klos Oct 9, 2025
2866593
refactor: remove redundant validation checks in RetryDecoratorSettings
a-klos Oct 9, 2025
210c241
refactor: equip the stackit embedder with a retry decorator. (#91)
a-klos Oct 9, 2025
f87d9a0
chore: merge main
a-klos Oct 9, 2025
ba120c1
refactor: clean up import statements and streamline retry decorator s…
a-klos Oct 9, 2025
63b9eae
refactor: improve readability of settings initialization in create_re…
a-klos Oct 9, 2025
c317897
refactor: enhance settings initialization and validation in Summarize…
a-klos Oct 9, 2025
b4f291b
refactor: add validation for jitter_min and jitter_max in Summarizer …
a-klos Oct 9, 2025
4 changes: 4 additions & 0 deletions infrastructure/rag/templates/_helpers.tpl
@@ -10,6 +10,10 @@
{{- printf "%s-usecase-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "configmap.retryDecoratorName" -}}
{{- printf "%s-retry-decorator-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "secret.usecaseName" -}}
{{- printf "%s-usecase-secret" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
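The helper above builds the ConfigMap name with `printf`, then applies `trunc 63` and `trimSuffix "-"` so that the result stays a valid Kubernetes name. A minimal Python sketch of the same string handling follows; the function name `configmap_name` and its parameters are hypothetical and exist only for illustration.

```python
def configmap_name(release_name: str,
                   suffix: str = "retry-decorator-configmap",
                   limit: int = 63) -> str:
    """Mimic `printf "%s-<suffix>" | trunc 63 | trimSuffix "-"`."""
    # printf "%s-retry-decorator-configmap", then keep at most `limit` characters
    name = f"{release_name}-{suffix}"[:limit]
    # trimSuffix "-" removes one trailing hyphen, if truncation left one behind
    return name[:-1] if name.endswith("-") else name


short = configmap_name("rag")
# A 62-character release name is cut right after the joining hyphen,
# so the trailing "-" is trimmed away.
edge = configmap_name("a" * 62)
```

With a very long release name the suffix is cut off entirely, which is worth keeping in mind when several helpers share the same release prefix.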
2 changes: 2 additions & 0 deletions infrastructure/rag/templates/admin-backend/deployment.yaml
@@ -106,6 +106,8 @@ spec:
name: {{ template "configmap.keyValueStoreName" . }}
- configMapRef:
name: {{ template "configmap.sourceUploaderName" . }}
- configMapRef:
name: {{ template "configmap.retryDecoratorName" . }}
- secretRef:
name: {{ template "secret.langfuseName" . }}
- secretRef:
2 changes: 2 additions & 0 deletions infrastructure/rag/templates/backend/deployment.yaml
@@ -131,6 +131,8 @@ spec:
name: {{ template "configmap.fakeEmbedderName" . }}
- configMapRef:
name: {{ template "configmap.chatHistoryName" . }}
- configMapRef:
name: {{ template "configmap.retryDecoratorName" . }}
- secretRef:
name: {{ template "secret.langfuseName" . }}
- secretRef:
9 changes: 9 additions & 0 deletions infrastructure/rag/templates/configmap.yaml
@@ -24,3 +24,12 @@ data:
{{- range $key, $value := .Values.shared.envs.usecase }}
{{ $key }}: {{ $value | quote }}
{{- end }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ template "configmap.retryDecoratorName" . }}
data:
{{- range $key, $value := .Values.shared.envs.retryDecorator }}
{{ $key }}: {{ $value | quote }}
{{- end }}
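Because the template pipes every value through `quote`, each entry reaches the container as a string environment variable and has to be parsed back into a number by the application. The sketch below illustrates that step with the standard library only. It is not the PR's implementation (which uses pydantic settings classes); `read_retry_env` and `PREFIX` are hypothetical names.

```python
from typing import Dict, Mapping

PREFIX = "RETRY_DECORATOR_"  # assumed to match the keys rendered into the ConfigMap


def read_retry_env(env: Mapping[str, str]) -> Dict[str, float]:
    """Collect RETRY_DECORATOR_* entries and parse their string values as numbers."""
    settings: Dict[str, float] = {}
    for key, raw in env.items():
        if key.startswith(PREFIX):
            # Strip the prefix and lower-case the rest, e.g. MAX_RETRIES -> max_retries
            settings[key[len(PREFIX):].lower()] = float(raw)
    return settings


example_env = {
    "RETRY_DECORATOR_MAX_RETRIES": "5",
    "RETRY_DECORATOR_JITTER_MIN": "0.05",
    "S3_BUCKET": "documents",  # unrelated keys are ignored
}
parsed = read_retry_env(example_env)
```

In a real deployment `os.environ` would be passed in place of `example_env`.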
24 changes: 24 additions & 0 deletions infrastructure/rag/values.yaml
@@ -197,6 +197,14 @@ backend:
stackitEmbedder:
STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct"
STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1
# Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values.
STACKIT_EMBEDDER_MAX_RETRIES: "5"
STACKIT_EMBEDDER_RETRY_BASE_DELAY: "0.5"
STACKIT_EMBEDDER_RETRY_MAX_DELAY: "600"
STACKIT_EMBEDDER_BACKOFF_FACTOR: "2"
STACKIT_EMBEDDER_ATTEMPT_CAP: "6"
STACKIT_EMBEDDER_JITTER_MIN: "0.05"
STACKIT_EMBEDDER_JITTER_MAX: "0.25"
ollama:
OLLAMA_MODEL: "llama3.2:3b-instruct-fp16"
OLLAMA_BASE_URL: "http://rag-ollama:11434"
@@ -319,6 +327,14 @@ adminBackend:
summarizer:
SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000"
SUMMARIZER_MAXIMUM_CONCURRENCY: "10"
# Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values.
SUMMARIZER_MAX_RETRIES: "5"
SUMMARIZER_RETRY_BASE_DELAY: "0.5"
SUMMARIZER_RETRY_MAX_DELAY: "600"
SUMMARIZER_BACKOFF_FACTOR: "2"
SUMMARIZER_ATTEMPT_CAP: "6"
SUMMARIZER_JITTER_MIN: "0.05"
SUMMARIZER_JITTER_MAX: "0.25"
ragapi:
RAG_API_HOST: "http://backend:8080"
chunker:
@@ -446,6 +462,14 @@ shared:
s3:
S3_ENDPOINT: http://rag-minio:9000
S3_BUCKET: documents
retryDecorator:
RETRY_DECORATOR_MAX_RETRIES: "5"
RETRY_DECORATOR_RETRY_BASE_DELAY: "0.5"
RETRY_DECORATOR_RETRY_MAX_DELAY: "600"
RETRY_DECORATOR_BACKOFF_FACTOR: "2"
RETRY_DECORATOR_ATTEMPT_CAP: "6"
RETRY_DECORATOR_JITTER_MIN: "0.05"
RETRY_DECORATOR_JITTER_MAX: "0.25"
usecase:
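To get a feel for the shared defaults above, the following sketch computes the wait before each retry. The formula is an assumption inferred from the setting descriptions (base delay for the first retry, exponent capped by `attempt_cap`, a maximum delay, additive jitter); the actual `retry_with_backoff` implementation is not shown in this diff and may differ. `backoff_delay` is a hypothetical helper.

```python
import random


def backoff_delay(attempt: int,
                  base_delay: float = 0.5,
                  max_delay: float = 600.0,
                  backoff_factor: float = 2.0,
                  attempt_cap: int = 6,
                  jitter_min: float = 0.05,
                  jitter_max: float = 0.25) -> float:
    """Assumed formula: min(max_delay, base * factor ** min(attempt, cap)) + jitter."""
    exponent = min(attempt, attempt_cap)  # stop exponent growth at attempt_cap
    delay = min(max_delay, base_delay * backoff_factor ** exponent)
    return delay + random.uniform(jitter_min, jitter_max)


# Deterministic view of the first five retries, with jitter switched off
schedule = [backoff_delay(n, jitter_min=0.0, jitter_max=0.0) for n in range(5)]
# Far beyond the cap the exponent stays at attempt_cap
capped = backoff_delay(10, jitter_min=0.0, jitter_max=0.0)
```

Under this assumed formula the un-jittered waits for five retries would be 0.5, 1, 2, 4 and 8 seconds, and the exponent cap of 6 would limit any single wait to 32 seconds, so the 600-second ceiling would only matter with a larger base delay, factor, or cap.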


116 changes: 112 additions & 4 deletions libs/README.md

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion libs/admin-api-lib/src/admin_api_lib/dependency_container.py
@@ -64,6 +64,7 @@
from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -86,6 +87,7 @@ class DependencyContainer(DeclarativeContainer):
key_value_store_settings = KeyValueSettings()
summarizer_settings = SummarizerSettings()
source_uploader_settings = SourceUploaderSettings()
retry_decorator_settings = RetryDecoratorSettings()

key_value_store = Singleton(FileStatusKeyValueStore, key_value_store_settings)
file_service = Singleton(S3Service, s3_settings=s3_settings)
@@ -136,7 +138,9 @@ class DependencyContainer(DeclarativeContainer):
LangchainSummarizer,
langfuse_manager=langfuse_manager,
chunker=summary_text_splitter,
semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrreny),
semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency),
summarizer_settings=summarizer_settings,
retry_decorator_settings=retry_decorator_settings,
)
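The summarizer now receives both its own settings and the shared retry settings, and the values.yaml comments describe the intent: component-specific values win, anything left unset falls back to the shared `RETRY_DECORATOR_*` defaults. The body of `create_retry_decorator_settings` is not part of this diff, so the sketch below only illustrates that fallback idea with plain dataclasses; `SharedRetry`, `ComponentOverrides`, and `merge_retry_settings` are hypothetical names.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class SharedRetry:
    """Stand-in for the shared defaults (values taken from values.yaml)."""
    max_retries: int = 5
    retry_base_delay: float = 0.5


@dataclass(frozen=True)
class ComponentOverrides:
    """Stand-in for per-component settings; None means 'not configured'."""
    max_retries: Optional[int] = None
    retry_base_delay: Optional[float] = None


def merge_retry_settings(overrides: ComponentOverrides, shared: SharedRetry) -> SharedRetry:
    """Return the shared settings with every explicitly set override applied."""
    updates = {
        f.name: getattr(overrides, f.name)
        for f in fields(shared)
        # `is not None` keeps legitimate zero values such as a 0.0 base delay
        if getattr(overrides, f.name, None) is not None
    }
    return replace(shared, **updates)


effective = merge_retry_settings(ComponentOverrides(max_retries=2), SharedRetry())
```

Here `effective` keeps the shared base delay of 0.5 while taking `max_retries=2` from the component.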

summary_enhancer = List(
@@ -1,7 +1,8 @@
"""Contains settings for summarizer."""

from pydantic import Field
from pydantic_settings import BaseSettings
from typing import Optional
from pydantic import Field, PositiveInt, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class SummarizerSettings(BaseSettings):
@@ -12,15 +13,74 @@ class SummarizerSettings(BaseSettings):
----------
maximum_input_size : int
The maximum size of the input that the summarizer can handle. Default is 8000.
maximum_concurrreny : int
maximum_concurrency : int
The maximum number of concurrent summarization processes. Default is 10.
max_retries: Optional[PositiveInt]
Total retries, not counting the initial attempt.
retry_base_delay: Optional[float]
Base delay in seconds for the first retry.
retry_max_delay: Optional[float]
Maximum delay cap in seconds for any single wait.
backoff_factor: Optional[float]
Exponential backoff factor (>= 1).
attempt_cap: Optional[int]
Cap for exponent growth (backoff_factor ** attempt_cap).
jitter_min: Optional[float]
Minimum jitter in seconds.
jitter_max: Optional[float]
Maximum jitter in seconds.
"""

class Config:
"""Config class for reading Fields from env."""

env_prefix = "SUMMARIZER_"
case_sensitive = False
model_config = SettingsConfigDict(env_prefix="SUMMARIZER_", case_sensitive=False)

maximum_input_size: int = Field(default=8000)
maximum_concurrreny: int = Field(default=10)
maximum_concurrency: int = Field(default=10)
max_retries: Optional[PositiveInt] = Field(
default=None,
title="Max Retries",
description="Total retries, not counting the initial attempt.",
)
retry_base_delay: Optional[float] = Field(
default=None,
ge=0,
title="Retry Base Delay",
description="Base delay in seconds for the first retry.",
)
retry_max_delay: Optional[float] = Field(
default=None,
gt=0,
title="Retry Max Delay",
description="Maximum delay cap in seconds for any single wait.",
)
backoff_factor: Optional[float] = Field(
default=None,
ge=1.0,
title="Backoff Factor",
description="Exponential backoff factor (>= 1).",
)
attempt_cap: Optional[int] = Field(
default=None,
ge=0,
title="Attempt Cap",
description="Cap for exponent growth (backoff_factor ** attempt_cap).",
)
jitter_min: Optional[float] = Field(
default=None,
ge=0.0,
title="Jitter Min (s)",
description="Minimum jitter in seconds.",
)
jitter_max: Optional[float] = Field(
default=None,
ge=0.0,
title="Jitter Max (s)",
description="Maximum jitter in seconds.",
)

@model_validator(mode="after")
def _check_relations(self) -> "SummarizerSettings":
if self.jitter_min is None or self.jitter_max is None:
return self
if self.jitter_max < self.jitter_min:
raise ValueError("jitter_max must be >= jitter_min")
return self
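A relation check between two optional bounds has to distinguish "unset" from "zero". Both jitter fields allow `0.0` (`ge=0.0`), and a truthiness test such as `not jitter_max` would treat `0.0` like `None` and skip the comparison. The standalone sketch below shows the comparison with explicit `None` checks; `check_jitter` is a hypothetical helper, not a function from the PR.

```python
from typing import Optional


def check_jitter(jitter_min: Optional[float], jitter_max: Optional[float]) -> None:
    """Raise ValueError when both bounds are set and jitter_max < jitter_min."""
    # Skip only when a bound is genuinely unset; 0.0 is a valid bound
    if jitter_min is None or jitter_max is None:
        return
    if jitter_max < jitter_min:
        raise ValueError("jitter_max must be >= jitter_min")


check_jitter(None, 0.25)   # nothing to compare
check_jitter(0.0, 0.25)    # valid range starting at zero

try:
    check_jitter(0.5, 0.0)  # max is zero but min is larger: must be rejected
    rejected = False
except ValueError:
    rejected = True
```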
@@ -1,20 +1,24 @@
"""Module for the LangchainSummarizer class."""

import asyncio
import logging
import traceback
from typing import Optional

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError

from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
from admin_api_lib.summarizer.summarizer import (
Summarizer,
SummarizerInput,
SummarizerOutput,
)
from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff

logger = logging.getLogger(__name__)

@@ -32,10 +36,13 @@ def __init__(
langfuse_manager: LangfuseManager,
chunker: RecursiveCharacterTextSplitter,
semaphore: AsyncThreadsafeSemaphore,
summarizer_settings: SummarizerSettings,
retry_decorator_settings: RetryDecoratorSettings,
):
self._chunker = chunker
self._langfuse_manager = langfuse_manager
self._semaphore = semaphore
self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings)

async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
"""
@@ -65,40 +72,46 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
"""
assert query, "Query is empty: %s" % query # noqa S101
config = ensure_config(config)
tries_remaining = config.get("configurable", {}).get("tries_remaining", 3)
logger.debug("Tries remaining %d" % tries_remaining)

if tries_remaining < 0:
raise Exception("Summary creation failed.")
document = Document(page_content=query)
langchain_documents = self._chunker.split_documents([document])
logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))

outputs = []
for langchain_document in langchain_documents:
async with self._semaphore:
try:
result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
# Extract content from AIMessage if it's not already a string
content = result.content if hasattr(result, "content") else str(result)
outputs.append(content)
except Exception as e:
logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc())
config["tries_remaining"] = tries_remaining - 1
result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
# Extract content from AIMessage if it's not already a string
content = result.content if hasattr(result, "content") else str(result)
outputs.append(content)
# Fan out concurrently; concurrency is bounded by the semaphore inside _summarize_chunk
tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
outputs = await asyncio.gather(*tasks)

if len(outputs) == 1:
return outputs[0]
summary = " ".join(outputs)

merged = " ".join(outputs)

logger.debug(
"Reduced number of chars from %d to %d"
% (len("".join([x.page_content for x in langchain_documents])), len(summary))
"Reduced number of chars from %d to %d",
len("".join([x.page_content for x in langchain_documents])),
len(merged),
)
return await self.ainvoke(summary, config)
return await self._summarize_chunk(merged, config)
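The rewritten `ainvoke` follows a map-reduce shape: summarize all chunks concurrently, return early if there is only one result, otherwise join the partial summaries and summarize the merged text once more. Unlike the removed recursive call to `ainvoke`, the merged text is not chunked again. A minimal sketch of that flow, with a fake summarizer standing in for the LLM chain (`summarize` and `fake_summarize_chunk` are hypothetical names):

```python
import asyncio
from typing import Awaitable, Callable


async def summarize(text: str, chunk_size: int,
                    summarize_chunk: Callable[[str], Awaitable[str]]) -> str:
    """Map: summarize chunks concurrently. Reduce: summarize the joined partials once."""
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    # asyncio.gather returns results in the order the awaitables were passed in
    partials = await asyncio.gather(*(summarize_chunk(chunk) for chunk in chunks))
    if len(partials) == 1:
        return partials[0]
    return await summarize_chunk(" ".join(partials))


async def fake_summarize_chunk(chunk: str) -> str:
    """Stand-in for the LLM call: keep the first three characters."""
    return chunk[:3]


two_chunks = asyncio.run(summarize("abcdefghij", 5, fake_summarize_chunk))
one_chunk = asyncio.run(summarize("abcdefghij", 20, fake_summarize_chunk))
```

Since the reduce step is a single call, a very long merged text would reach the model unsplit; whether that matters depends on the chunk count and the model's input limit.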

def _create_chain(self) -> Runnable:
return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
self.__class__.__name__
)

def _retry_with_backoff_wrapper(self):
return retry_with_backoff(
settings=self._retry_decorator_settings,
exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError),
rate_limit_exceptions=(RateLimitError,),
logger=logger,
)

async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
@self._retry_with_backoff_wrapper()
async def _call(text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
response = await self._create_chain().ainvoke({"text": text}, config)
return response.content if hasattr(response, "content") else str(response)

# Hold the semaphore for the entire retry lifecycle
async with self._semaphore:
return await _call(text, config)
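`_summarize_chunk` wraps the LLM call in the retry decorator and then acquires the semaphore around the decorated call, so a task keeps its concurrency slot while it sleeps between attempts. The sketch below reproduces that structure with a simplified decorator. `retry_async` is a hypothetical stand-in for `retry_with_backoff`, whose real signature and backoff logic are not shown in this diff.

```python
import asyncio
import functools


def retry_async(max_retries: int, base_delay: float, exceptions: tuple):
    """Simplified async retry: exponential waits, re-raise after the last attempt."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):  # initial attempt + retries
                try:
                    return await fn(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator


async def main():
    semaphore = asyncio.Semaphore(2)
    calls = {"n": 0}

    @retry_async(max_retries=3, base_delay=0.001, exceptions=(ConnectionError,))
    async def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:  # fail twice, then succeed
            raise ConnectionError("transient")
        return "ok"

    # The slot is held across all attempts, including the sleeps in between
    async with semaphore:
        result = await flaky()
    return result, calls["n"]


outcome = asyncio.run(main())
```

Holding the slot during backoff caps the total number of in-flight and waiting requests, at the cost of blocking other chunks while one task is backing off. Acquiring the semaphore inside the retried function instead would free the slot between attempts.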
4 changes: 3 additions & 1 deletion libs/rag-core-api/src/rag_core_api/dependency_container.py
@@ -63,6 +63,7 @@
from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -89,6 +90,7 @@ class DependencyContainer(DeclarativeContainer):
stackit_embedder_settings = StackitEmbedderSettings()
chat_history_settings = ChatHistorySettings()
sparse_embedder_settings = SparseEmbedderSettings()
retry_decorator_settings = RetryDecoratorSettings()
chat_history_config.from_dict(chat_history_settings.model_dump())

class_selector_config.from_dict(rag_class_type_settings.model_dump() | embedder_class_type_settings.model_dump())
@@ -98,7 +100,7 @@ class DependencyContainer(DeclarativeContainer):
ollama=Singleton(
LangchainCommunityEmbedder, embedder=Singleton(OllamaEmbeddings, **ollama_embedder_settings.model_dump())
),
stackit=Singleton(StackitEmbedder, stackit_embedder_settings),
stackit=Singleton(StackitEmbedder, stackit_embedder_settings, retry_decorator_settings),
)

sparse_embedder = Singleton(FastEmbedSparse, **sparse_embedder_settings.model_dump())