1 change: 0 additions & 1 deletion .gitignore
@@ -1,5 +1,4 @@
*.swp
*.ipynb
*.jpg
*.jpeg
*.png
12 changes: 8 additions & 4 deletions akd/agents/search/components/__init__.py
@@ -5,14 +5,18 @@
 providing deep integration of research workflow capabilities.
 """
 
-from .triage import TriageComponent
-from .clarification import ClarificationComponent
+from .clarification import ClarificationComponent
+from .content_condensation import ContentCondensationComponent
+from .content_condenser import ContentCondenser
 from .instruction_builder import InstructionBuilderComponent
 from .research_synthesis import ResearchSynthesisComponent
+from .triage import TriageComponent
 
 __all__ = [
     "TriageComponent",
     "ClarificationComponent",
-    "InstructionBuilderComponent",
+    "InstructionBuilderComponent",
     "ResearchSynthesisComponent",
-]
+    "ContentCondensationComponent",
+    "ContentCondenser",
+]
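As a quick check on the reordered exports, the two new symbols can be imported straight from the package. A minimal sketch, assuming the repository is installed as `akd`:

```python
# Smoke test for the exports added in this diff.
from akd.agents.search.components import (
    ContentCondensationComponent,
    ContentCondenser,
)

print(ContentCondensationComponent.__name__, ContentCondenser.__name__)
```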
208 changes: 208 additions & 0 deletions akd/agents/search/components/content_condensation.py
@@ -0,0 +1,208 @@
"""
LLM-based content condensation for research synthesis.
"""

from typing import List

import tiktoken
from loguru import logger
from pydantic import Field

from akd._base import IOSchema
from akd.agents._base import BaseAgentConfig, LangBaseAgent
from akd.configs.prompts import CONTENT_CONDENSATION_PROMPT
from akd.structures import SearchResultItem


class ContentCondensationInputSchema(IOSchema):
"""Input schema for content condensation."""

research_question: str = Field(
description="The research question to extract relevant content for"
)
search_results: List[SearchResultItem] = Field(
description="Search results with full text content to condense"
)
max_tokens: int = Field(
default=4000, description="Maximum total tokens for condensed output"
)


class ContentCondensationOutputSchema(IOSchema):
"""Output schema for content condensation."""

condensed_results: List[SearchResultItem] = Field(
description="Search results with condensed content"
)
total_tokens_reduced: int = Field(
description="Total tokens reduced through condensation"
)
compression_ratio: float = Field(
description="Ratio of final to original token count"
)


class ContentCondensationConfig(BaseAgentConfig):
"""Configuration for content condensation."""

model_name: str = Field(
default="gpt-4o-mini", description="Model to use for content condensation"
)
temperature: float = Field(
default=0.1, description="Temperature for content condensation"
)


class ContentCondensationComponent(LangBaseAgent):
"""
Simple LLM-based component that condenses SearchResultItem content to extract
only information relevant to a research question, honoring token limits.

Uses dependency injection for better testability and flexibility.
"""

input_schema = ContentCondensationInputSchema
output_schema = ContentCondensationOutputSchema
config_schema = ContentCondensationConfig

def __init__(
self,
config: ContentCondensationConfig | None = None,
debug: bool = False,
):
config = config or ContentCondensationConfig()
super().__init__(config=config, debug=debug)

# Initialize tokenizer for token counting
try:
self.tokenizer = tiktoken.encoding_for_model(self.config.model_name)
except KeyError:
self.tokenizer = tiktoken.get_encoding("cl100k_base")

def _count_tokens(self, text: str) -> int:
"""Count tokens in text."""
return len(self.tokenizer.encode(text))

async def _condense_single_result(
self, result: SearchResultItem, research_question: str, target_tokens: int
) -> SearchResultItem:
"""Condense content in a single search result."""
if not result.content or len(result.content.strip()) < 100:
return result

original_tokens = self._count_tokens(result.content)
if original_tokens <= target_tokens:
return result

prompt = CONTENT_CONDENSATION_PROMPT.format(
research_question=research_question,
source_title=result.title or "Unknown",
source_url=result.url,
content=result.content,
target_tokens=target_tokens,
)

try:
response = await self.client.ainvoke([{"role": "user", "content": prompt}])

condensed_content = response.content.strip()

# Check if content was deemed irrelevant
if (
condensed_content == "[NO RELEVANT CONTENT]"
or len(condensed_content) < 10
):
condensed_content = ""

# Create new result with condensed content
condensed_result = result.model_copy()
condensed_result.content = condensed_content

if self.debug:
new_tokens = self._count_tokens(condensed_content)
logger.debug(
f"Condensed {result.url}: {original_tokens} -> {new_tokens} tokens"
)

return condensed_result

except Exception as e:
if self.debug:
logger.warning(f"Error condensing {result.url}: {e}")
return result

async def _arun(
self, params: ContentCondensationInputSchema, **kwargs
) -> ContentCondensationOutputSchema:
"""
Condense content in search results to extract only information relevant
to the research question.
"""

# Filter to only results with substantial content
results_with_content = [
r
for r in params.search_results
if r.content and len(r.content.strip()) >= 100
]

if not results_with_content:
return ContentCondensationOutputSchema(
condensed_results=params.search_results,
total_tokens_reduced=0,
compression_ratio=1.0,
)

# Calculate original token count
original_tokens = sum(
self._count_tokens(r.content) for r in results_with_content
)

if self.debug:
logger.debug(
f"Condensing {len(results_with_content)} results with {original_tokens} total tokens"
)

# If already under limit, return as-is
if original_tokens <= params.max_tokens:
return ContentCondensationOutputSchema(
condensed_results=params.search_results,
total_tokens_reduced=0,
compression_ratio=1.0,
)

# Allocate tokens per result
tokens_per_result = params.max_tokens // len(results_with_content)

# Condense each result
condensed_results = []
for result in params.search_results:
if result.content and len(result.content.strip()) >= 100:
condensed = await self._condense_single_result(
result, params.research_question, tokens_per_result
)
condensed_results.append(condensed)
else:
condensed_results.append(result)

# Calculate final metrics
final_tokens = sum(
self._count_tokens(r.content) for r in condensed_results if r.content
)

tokens_reduced = original_tokens - final_tokens
compression_ratio = (
final_tokens / original_tokens if original_tokens > 0 else 1.0
)

if self.debug:
logger.debug(
f"Condensation complete: {original_tokens} -> {final_tokens} tokens "
f"(reduced {tokens_reduced}, ratio: {compression_ratio:.3f})"
)

return ContentCondensationOutputSchema(
condensed_results=condensed_results,
total_tokens_reduced=tokens_reduced,
compression_ratio=compression_ratio,
)
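A usage sketch for the new component. The `title`, `url`, and `content` fields match the attributes read in `_condense_single_result`, but any additional required fields of `SearchResultItem` are not visible in this diff; the URL and research question below are illustrative:

```python
import asyncio

from akd.agents.search.components.content_condensation import (
    ContentCondensationComponent,
    ContentCondensationConfig,
    ContentCondensationInputSchema,
)
from akd.structures import SearchResultItem


async def main() -> None:
    component = ContentCondensationComponent(
        config=ContentCondensationConfig(model_name="gpt-4o-mini", temperature=0.1),
        debug=True,
    )
    # Items under 100 characters are passed through unchanged,
    # so pad the sample content past that threshold.
    item = SearchResultItem(
        title="Illustrative source",
        url="https://example.org/paper",
        content="Full text of a retrieved document... " * 50,
    )
    output = await component.arun(
        ContentCondensationInputSchema(
            research_question="What methods reduce LLM hallucination?",
            search_results=[item],
            max_tokens=2000,
        )
    )
    print(output.total_tokens_reduced, f"{output.compression_ratio:.3f}")


asyncio.run(main())
```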
119 changes: 108 additions & 11 deletions akd/agents/search/components/research_synthesis.py
@@ -12,6 +12,12 @@
from akd.configs.prompts import DEEP_RESEARCH_AGENT_PROMPT
from akd.structures import SearchResultItem

from .content_condensation import (
ContentCondensationComponent,
ContentCondensationConfig,
ContentCondensationInputSchema,
)


class ResearchSynthesisInputSchema(InputSchema):
"""Input schema for the ResearchSynthesisAgent."""
@@ -98,11 +104,59 @@ def __init__(
self,
config: Optional[ResearchSynthesisAgentConfig] = None,
debug: bool = False,
condensation_component: Optional[ContentCondensationComponent] = None,
):
self.debug = debug
# Create the internal agent
self._agent = ResearchSynthesisAgent(config=config, debug=debug)

if condensation_component is not None:
self._condensation_component = condensation_component
else:
# Create default condensation component
condensation_config = ContentCondensationConfig(
model_name="gpt-4o-mini", temperature=0.1, debug=debug
)
self._condensation_component = ContentCondensationComponent(
config=condensation_config, debug=debug
)

async def _condense_content_for_synthesis(
self,
results: List[SearchResultItem],
research_question: str,
        max_tokens: Optional[int] = None,
) -> List[SearchResultItem]:
"""
Condense content in search results to focus only on information
relevant to the research question.
"""
if max_tokens is None:
max_tokens = self._agent.config.max_tokens

try:
condensation_input = ContentCondensationInputSchema(
research_question=research_question,
search_results=results,
max_tokens=max_tokens,
)

condensation_output = await self._condensation_component.arun(
condensation_input
)

if self.debug:
logger.debug(
f"Content condensation: {condensation_output.total_tokens_reduced} tokens reduced, "
f"compression ratio: {condensation_output.compression_ratio:.3f}"
)

return condensation_output.condensed_results

except Exception as e:
if self.debug:
logger.warning(f"Error in content condensation: {e}")
return results

async def synthesize(
self,
results: List[SearchResultItem],
@@ -142,7 +196,27 @@ async def synthesize(
             + "\n".join(f"- {trace}" for trace in research_trace[-5:])
         )
 
-        # Create input for the agent
+        # Use simple content condensation
+        if self.debug:
+            logger.debug("Using content condensation for synthesis")
+
+        try:
+            managed_results = await self._condense_content_for_synthesis(
+                results=results,
+                research_question=original_query,
+                max_tokens=self._agent.config.max_tokens,
+            )
+
+            if self.debug:
+                logger.debug(
+                    f"Content condensation: {len(results)} -> {len(managed_results)} results"
+                )
+        except Exception as e:
+            if self.debug:
+                logger.warning(f"Content condensation failed, using fallback: {e}")
+            managed_results = results[:10]  # Simple fallback: take first 10 results
+
+        # Single synthesis attempt with properly sized content
         agent_input = ResearchSynthesisInputSchema(
             query=original_query,
-            search_results=results,
+            search_results=managed_results,
@@ -162,15 +236,38 @@
             return agent_output
 
         except Exception as e:
-            logger.error(f"Error in agent synthesis: {e}")
-            # Create a simple fallback if agent fails
-            return self._create_fallback_output(
-                results,
-                original_query,
-                quality_scores,
-                research_trace,
-                iterations_performed,
-            )
+            if self._is_context_length_error(e):
+                if self.debug:
+                    logger.error("Context length still exceeded after condensation.")
+
+                # Emergency fallback: retry with even fewer results
+                emergency_results = managed_results[:5]
+                emergency_input = ResearchSynthesisInputSchema(
+                    query=original_query,
+                    search_results=emergency_results,
+                    context=context,
+                )
+
+                try:
+                    return await self._agent.arun(emergency_input)
+                except Exception as emergency_e:
+                    if self.debug:
+                        logger.error(f"Emergency fallback also failed: {emergency_e}")
+                    raise emergency_e
+            else:
+                # Non-context-length errors: log and fall back
+                if self.debug:
+                    logger.error(f"Error in agent synthesis: {e}")
+                # Fall through to the fallback below
+
+        # Fallback if synthesis failed
+        return self._create_fallback_output(
+            results,
+            original_query,
+            quality_scores,
+            research_trace,
+            iterations_performed,
+        )
 
     def _create_fallback_output(
         self,
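The new `condensation_component` parameter gives the synthesizer a dependency-injection seam, so tests can supply a double that skips the LLM call entirely. A minimal sketch of such a double, assuming subclassing the component and overriding `_arun` is acceptable in this codebase:

```python
from akd.agents.search.components.content_condensation import (
    ContentCondensationComponent,
    ContentCondensationInputSchema,
    ContentCondensationOutputSchema,
)


class PassThroughCondenser(ContentCondensationComponent):
    """Test double: returns the search results unchanged, no LLM call."""

    async def _arun(
        self, params: ContentCondensationInputSchema, **kwargs
    ) -> ContentCondensationOutputSchema:
        return ContentCondensationOutputSchema(
            condensed_results=params.search_results,
            total_tokens_reduced=0,
            compression_ratio=1.0,
        )
```

An instance of this double can then be passed as `condensation_component=` to the synthesizer constructor shown above.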