diff --git a/python/beeai_framework/backend/backend.py b/python/beeai_framework/backend/backend.py
index 68399a396..48440f9c5 100644
--- a/python/beeai_framework/backend/backend.py
+++ b/python/beeai_framework/backend/backend.py
@@ -7,17 +7,113 @@
 class Backend:
+    """A unified interface for managing chat and embedding models.
+
+    The Backend class provides a convenient way to bundle together a chat model
+    and an embedding model, which are commonly used together in AI applications.
+    It offers factory methods for easy instantiation from provider names.
+
+    Attributes:
+        chat: The chat model instance for text generation and conversation.
+        embedding: The embedding model instance for text vectorization.
+
+    Example:
+        >>> # Create backend with specific models
+        >>> from beeai_framework.backend import Backend
+        >>> from beeai_framework.adapters.openai import OpenAIChatModel, OpenAIEmbeddingModel
+        >>> backend = Backend(
+        ...     chat=OpenAIChatModel("gpt-4"),
+        ...     embedding=OpenAIEmbeddingModel("text-embedding-3-small")
+        ... )
+
+        >>> # Create backend from provider names
+        >>> backend = Backend.from_name(chat="openai:gpt-4", embedding="openai:text-embedding-3-small")
+
+        >>> # Create backend using same provider for both
+        >>> backend = Backend.from_provider("openai")
+    """
+
     def __init__(self, *, chat: ChatModel, embedding: EmbeddingModel) -> None:
+        """Initialize a Backend with chat and embedding models.
+
+        Args:
+            chat: The chat model instance to use for text generation.
+            embedding: The embedding model instance to use for text vectorization.
+
+        Example:
+            >>> from beeai_framework.adapters.openai import OpenAIChatModel, OpenAIEmbeddingModel
+            >>> backend = Backend(
+            ...     chat=OpenAIChatModel("gpt-4"),
+            ...     embedding=OpenAIEmbeddingModel("text-embedding-3-small")
+            ... )
+        """
         self.chat = chat
         self.embedding = embedding
 
     @staticmethod
     def from_name(*, chat: str | ProviderName, embedding: str | ProviderName) -> "Backend":
+        """Create a Backend instance from provider and model names.
+
+        This factory method allows you to instantiate a Backend by specifying
+        the provider and model names as strings, without needing to import
+        specific model classes.
+
+        Args:
+            chat: The chat model identifier in the format "provider:model" or just "provider".
+                Examples: "openai:gpt-4", "anthropic:claude-3-opus", "ollama".
+            embedding: The embedding model identifier in the format "provider:model" or just "provider".
+                Examples: "openai:text-embedding-3-small", "ollama:nomic-embed-text".
+
+        Returns:
+            A new Backend instance with the specified chat and embedding models.
+
+        Example:
+            >>> backend = Backend.from_name(
+            ...     chat="openai:gpt-4",
+            ...     embedding="openai:text-embedding-3-small"
+            ... )
+            >>> backend = Backend.from_name(
+            ...     chat="anthropic:claude-3-opus",
+            ...     embedding="ollama:nomic-embed-text"
+            ... )
+        """
         return Backend(chat=ChatModel.from_name(chat), embedding=EmbeddingModel.from_name(embedding))
 
     @staticmethod
     def from_provider(name: str | ProviderName) -> "Backend":
+        """Create a Backend instance using the same provider for both models.
+
+        This is a convenience method for when you want to use the same provider
+        for both chat and embedding models. It uses the provider's default models.
+
+        Args:
+            name: The provider name (e.g., "openai", "anthropic", "ollama").
+                The provider's default chat and embedding models will be used.
+
+        Returns:
+            A new Backend instance with both chat and embedding models from the same provider.
+
+        Example:
+            >>> # Uses OpenAI's default chat and embedding models
+            >>> backend = Backend.from_provider("openai")
+
+            >>> # Uses Ollama's default chat and embedding models
+            >>> backend = Backend.from_provider("ollama")
+        """
         return Backend.from_name(chat=name, embedding=name)
 
     async def clone(self) -> "Backend":
+        """Create a deep copy of this Backend instance.
+
+        This method clones both the chat and embedding models, creating
+        independent copies that can be modified without affecting the original.
+
+        Returns:
+            A new Backend instance with cloned chat and embedding models.
+
+        Example:
+            >>> original = Backend.from_provider("openai")
+            >>> cloned = await original.clone()
+            >>> # Modifications to cloned won't affect original
+        """
         return Backend(chat=await self.chat.clone(), embedding=await self.embedding.clone())
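For orientation while reviewing, here is a minimal sketch of how a `Backend` built with the factory methods above is typically used; the provider and model names are illustrative, and the `run`/`create` calls follow the ChatModel and EmbeddingModel docstrings added later in this diff:

```python
import asyncio

from beeai_framework.backend import Backend
from beeai_framework.backend.message import UserMessage


async def main() -> None:
    # Bundle a provider's default chat and embedding models (provider name is illustrative).
    backend = Backend.from_provider("ollama")

    # Text generation through the bundled chat model.
    reply = await backend.chat.run([UserMessage("In one sentence, what is an embedding?")])
    print(reply.last_message.text)

    # Text vectorization through the bundled embedding model.
    vectors = await backend.embedding.create(["An embedding is a vector representation of text."])
    print(len(vectors.embeddings[0]))


asyncio.run(main())
```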
diff --git a/python/beeai_framework/backend/chat.py b/python/beeai_framework/backend/chat.py
index 1a30903f4..3d21646a8 100644
--- a/python/beeai_framework/backend/chat.py
+++ b/python/beeai_framework/backend/chat.py
@@ -61,6 +61,27 @@
 class ChatModelKwargs(TypedDict, total=False):
+    """Configuration options for initializing a ChatModel.
+
+    This TypedDict defines all the optional keyword arguments that can be passed
+    to a ChatModel constructor to customize its behavior.
+
+    Attributes:
+        tool_call_fallback_via_response_format: Enable fallback to response format for tool calls.
+        retry_on_empty_response: Automatically retry when the model returns an empty response.
+        model_supports_tool_calling: Whether the underlying model supports native tool calling.
+        allow_parallel_tool_calls: Allow the model to make multiple tool calls simultaneously.
+        ignore_parallel_tool_calls: Ignore all but the first tool call when multiple are returned.
+        use_strict_tool_schema: Use strict JSON schema validation for tool parameters.
+        use_strict_model_schema: Use strict JSON schema validation for structured outputs.
+        supports_top_level_unions: Whether the model supports union types at the top level.
+        parameters: Default parameters for model generation (temperature, max_tokens, etc.).
+        cache: Cache implementation for storing and retrieving model outputs.
+        settings: Additional provider-specific settings.
+        middlewares: List of middleware to apply during model execution.
+        tool_choice_support: Set of supported tool choice modes (required, none, single, auto).
+        fix_invalid_tool_calls: Automatically attempt to fix malformed tool calls.
+    """
     tool_call_fallback_via_response_format: bool
     retry_on_empty_response: bool
     model_supports_tool_calling: bool
@@ -168,6 +189,64 @@ class ChatModelOptions(RunnableOptions, total=False):
 class ChatModel(Runnable[ChatModelOutput]):
+    """Abstract base class for all chat model implementations.
+
+    ChatModel provides a unified interface for interacting with various LLM providers
+    (OpenAI, Anthropic, Ollama, etc.). It handles tool calling, structured outputs,
+    streaming, caching, retries, and error handling.
+
+    The class is designed to be subclassed by provider-specific implementations that
+    implement the `_create` and `_create_stream` abstract methods.
+
+    Class Attributes:
+        tool_choice_support: Set of supported tool choice modes for this model.
+            Default: {"required", "none", "single", "auto"}
+
+    Instance Attributes:
+        tool_call_fallback_via_response_format: Use response format as fallback for tool calls.
+        model_supports_tool_calling: Whether the model has native tool calling support.
+        use_strict_model_schema: Use strict JSON schema for structured outputs.
+        use_strict_tool_schema: Use strict JSON schema for tool parameters.
+        retry_on_empty_response: Retry automatically when the model returns an empty response.
+        fix_invalid_tool_calls: Attempt to fix malformed tool calls automatically.
+        parameters: Default generation parameters (temperature, max_tokens, etc.).
+        cache: Cache implementation for storing model outputs.
+
+    Example:
+        >>> from beeai_framework.adapters.openai import OpenAIChatModel
+        >>> from beeai_framework.backend.message import UserMessage
+        >>>
+        >>> # Create a chat model
+        >>> model = OpenAIChatModel("gpt-4")
+        >>>
+        >>> # Run the model
+        >>> result = await model.run([UserMessage("Hello, world!")])
+        >>> print(result.last_message.text)
+        >>>
+        >>> # Use with tools
+        >>> from beeai_framework.tools import tool
+        >>> @tool
+        ... def get_weather(location: str) -> str:
+        ...     '''Get the weather for a location.'''
+        ...     return f"Weather in {location}: Sunny"
+        >>>
+        >>> result = await model.run(
+        ...     [UserMessage("What's the weather in Paris?")],
+        ...     tools=[get_weather],
+        ...     tool_choice="required"
+        ... )
+        >>>
+        >>> # Use structured output
+        >>> from pydantic import BaseModel
+        >>> class Person(BaseModel):
+        ...     name: str
+        ...     age: int
+        >>>
+        >>> result = await model.run(
+        ...     [UserMessage("Extract: John is 30 years old")],
+        ...     response_format=Person
+        ... )
+    """
     tool_choice_support: ClassVar[set[ToolChoiceType]] = {"required", "none", "single", "auto"}
     tool_call_fallback_via_response_format: bool
     model_supports_tool_calling: bool
@@ -179,14 +258,31 @@ class ChatModel(Runnable[ChatModelOutput]):
     @property
     @abstractmethod
     def model_id(self) -> str:
+        """The unique identifier for this model (e.g., 'gpt-4', 'claude-3-opus')."""
         pass
 
     @property
     @abstractmethod
     def provider_id(self) -> ProviderName:
+        """The provider name for this model (e.g., 'openai', 'anthropic')."""
         pass
 
     def __init__(self, **kwargs: Unpack[ChatModelKwargs]) -> None:
+        """Initialize a ChatModel with the given configuration.
+
+        Args:
+            **kwargs: Configuration options as defined in ChatModelKwargs.
+                See ChatModelKwargs documentation for all available options.
+
+        Example:
+            >>> from beeai_framework.adapters.openai import OpenAIChatModel
+            >>> model = OpenAIChatModel(
+            ...     "gpt-4",
+            ...     tool_call_fallback_via_response_format=True,
+            ...     retry_on_empty_response=True,
+            ...     parameters=ChatModelParameters(temperature=0.7, max_tokens=1000)
+            ... )
+        """
         super().__init__(middlewares=kwargs.get("middlewares", []))
         self._settings = kwargs.get("settings", {})
         self._settings.update(**exclude_non_annotated(kwargs, ChatModelKwargs))
@@ -220,6 +316,11 @@ def emitter(self) -> Emitter:
         return self._create_emitter()
 
     def _create_emitter(self) -> Emitter:
+        """Create an event emitter for this chat model.
+
+        Returns:
+            An Emitter instance configured for chat model events.
+        """
         return Emitter.root().child(
             namespace=["backend", self.provider_id, "chat"],
             creator=self,
@@ -232,6 +333,21 @@ async def _create(
         input: ChatModelInput,
         run: RunContext,
     ) -> ChatModelOutput:
+        """Generate a single completion from the model (non-streaming).
+
+        This method must be implemented by subclasses to provide the actual
+        model invocation logic for non-streaming requests.
+
+        Args:
+            input: The prepared input containing messages, tools, and parameters.
+            run: The execution context for this run.
+
+        Returns:
+            The model's output containing generated messages and metadata.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError
 
     @abstractmethod
@@ -240,6 +356,21 @@ def _create_stream(
         input: ChatModelInput,
         run: RunContext,
     ) -> AsyncGenerator[ChatModelOutput]:
+        """Generate a streaming completion from the model.
+
+        This method must be implemented by subclasses to provide the actual
+        model invocation logic for streaming requests.
+
+        Args:
+            input: The prepared input containing messages, tools, and parameters.
+            run: The execution context for this run.
+
+        Yields:
+            ChatModelOutput chunks as they are generated by the model.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError
 
     def _prepare_model_input(
@@ -488,6 +619,35 @@ def config(
         parameters: ChatModelParameters | Callable[[ChatModelParameters], ChatModelParameters] | None = None,
         cache: ChatModelCache | Callable[[ChatModelCache], ChatModelCache] | None = None,
     ) -> None:
+        """Configure the chat model's parameters and cache.
+
+        This method allows you to update the model's configuration after initialization.
+        You can pass either new values directly or functions that transform the existing values.
+
+        Args:
+            parameters: New parameters or a function to transform existing parameters.
+                If a function is provided, it receives the current parameters and should
+                return the updated parameters.
+            cache: New cache instance or a function to transform the existing cache.
+                If a function is provided, it receives the current cache and should
+                return the updated cache.
+
+        Example:
+            >>> model = ChatModel.from_name("openai:gpt-4")
+            >>>
+            >>> # Set new parameters directly
+            >>> model.config(parameters=ChatModelParameters(temperature=0.9, max_tokens=2000))
+            >>>
+            >>> # Transform existing parameters
+            >>> model.config(parameters=lambda p: ChatModelParameters(
+            ...     temperature=p.temperature * 1.5,
+            ...     max_tokens=p.max_tokens
+            ... ))
+            >>>
+            >>> # Update cache
+            >>> from beeai_framework.cache import SlidingCache
+            >>> model.config(cache=SlidingCache(max_size=100))
+        """
         if cache is not None:
             self.cache = cache(self.cache) if callable(cache) else cache
 
@@ -501,6 +661,40 @@ def from_name(
         /,
         **kwargs: Any,
     ) -> ChatModel:
+        """Create a ChatModel instance from a provider and model name.
+
+        This factory method allows you to instantiate a chat model by specifying
+        the provider and model name as a string, without needing to import
+        provider-specific classes.
+
+        Args:
+            name: The model identifier in the format "provider:model" or just "provider".
+                Examples: "openai:gpt-4", "anthropic:claude-3-opus", "ollama:llama2"
+            options: Optional parameters for the model. Can be a ChatModelParameters
+                instance or a dictionary of options.
+            **kwargs: Additional keyword arguments passed to the model constructor.
+                See ChatModelKwargs for available options.
+
+        Returns:
+            A ChatModel instance of the appropriate provider-specific subclass.
+
+        Example:
+            >>> # Create with just model name
+            >>> model = ChatModel.from_name("openai:gpt-4")
+            >>>
+            >>> # Create with parameters
+            >>> model = ChatModel.from_name(
+            ...     "anthropic:claude-3-opus",
+            ...     ChatModelParameters(temperature=0.7, max_tokens=1000)
+            ... )
+            >>>
+            >>> # Create with additional options
+            >>> model = ChatModel.from_name(
+            ...     "openai:gpt-4",
+            ...     tool_call_fallback_via_response_format=False,
+            ...     retry_on_empty_response=True
+            ... )
+        """
         parsed_model = parse_model(name)
         TargetChatModel = load_model(parsed_model.provider_id, "chat")  # type: ignore  # noqa: N806
         if options and isinstance(options, ChatModelParameters):
@@ -536,6 +730,26 @@ def _force_tool_call_via_response_format(
         return not self.model_supports_tool_calling or not tool_choice_supported
 
     async def clone(self) -> Self:
+        """Create a deep copy of this ChatModel instance.
+
+        This method creates an independent copy of the model that can be
+        modified without affecting the original. Subclasses should override
+        this method to properly clone their specific state.
+
+        Returns:
+            A new ChatModel instance with the same configuration.
+
+        Note:
+            The default implementation returns self and logs a warning.
+            Provider-specific implementations should override this method
+            to create proper clones.
+
+        Example:
+            >>> original = ChatModel.from_name("openai:gpt-4")
+            >>> cloned = await original.clone()
+            >>> # Modifications to cloned won't affect original
+            >>> cloned.config(parameters=ChatModelParameters(temperature=0.9))
+        """
         if type(self).clone == ChatModel.clone:
             logging.warning(f"ChatModel ({type(self)!s}) does not implement the 'clone' method.")
 
@@ -543,6 +757,18 @@
     @classmethod
     def get_default_parameters(cls) -> ChatModelParameters:
+        """Get the default parameters for this chat model class.
+
+        Returns:
+            ChatModelParameters with default values (temperature=0).
+
+        Note:
+            Subclasses can override this method to provide different defaults.
+
+        Example:
+            >>> params = ChatModel.get_default_parameters()
+            >>> print(params.temperature)  # 0
+        """
         return ChatModelParameters(temperature=0)
 
     def _assert_tool_response(self, *, input: ChatModelInput, output: ChatModelOutput) -> None:
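The ChatModel docstring above notes that providers plug in by implementing `_create` and `_create_stream`. A skeletal adapter sketch makes that contract concrete; the method signatures mirror this diff, while the import locations for `ProviderName` and `RunContext` and the `EchoChatModel` class itself are assumptions for illustration only:

```python
from collections.abc import AsyncGenerator

from beeai_framework.backend.chat import ChatModel, ChatModelInput, ChatModelOutput
from beeai_framework.backend.types import ProviderName  # assumed import path
from beeai_framework.context import RunContext  # assumed import path


class EchoChatModel(ChatModel):
    """Hypothetical provider adapter; a real one would call the provider's SDK."""

    @property
    def model_id(self) -> str:
        return "echo-1"

    @property
    def provider_id(self) -> ProviderName:
        return "ollama"  # placeholder provider name

    async def _create(self, input: ChatModelInput, run: RunContext) -> ChatModelOutput:
        # Translate `input` into a provider request and wrap the reply in ChatModelOutput.
        raise NotImplementedError("provider call goes here")

    async def _create_stream(self, input: ChatModelInput, run: RunContext) -> AsyncGenerator[ChatModelOutput]:
        # Yield ChatModelOutput chunks as the provider streams them back.
        raise NotImplementedError("provider streaming call goes here")
        yield  # keeps this an async generator, matching the abstract signature
```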
diff --git a/python/beeai_framework/backend/document_loader.py b/python/beeai_framework/backend/document_loader.py
index 62d3d0224..63fba72f8 100644
--- a/python/beeai_framework/backend/document_loader.py
+++ b/python/beeai_framework/backend/document_loader.py
@@ -13,9 +13,50 @@
 class DocumentLoader(ABC):
+    """Abstract base class for loading documents from various sources.
+
+    DocumentLoader provides a unified interface for loading documents from different
+    sources such as files, URLs, databases, or APIs. It supports dynamic instantiation
+    of provider-specific loaders through the `from_name` factory method.
+
+    Subclasses must implement the `load` method to define how documents are loaded
+    from their specific source, and the `_class_from_name` method for dynamic loading.
+
+    Example:
+        >>> # Load documents using LangChain's UnstructuredMarkdownLoader
+        >>> loader = DocumentLoader.from_name(
+        ...     "langchain:UnstructuredMarkdownLoader",
+        ...     file_path="README.md"
+        ... )
+        >>> documents = await loader.load()
+        >>> print(len(documents))
+        >>>
+        >>> # Load PDF documents
+        >>> loader = DocumentLoader.from_name(
+        ...     "langchain:PyPDFLoader",
+        ...     file_path="document.pdf"
+        ... )
+        >>> documents = await loader.load()
+    """
+
     @classmethod
     @abstractmethod
     def _class_from_name(cls, class_name: str, **kwargs: Any) -> DocumentLoader:
+        """Create a document loader instance from a class name (internal method).
+
+        This method must be implemented by integration-specific subclasses to
+        handle the dynamic instantiation of document loader classes.
+
+        Args:
+            class_name: The name of the document loader class to instantiate.
+            **kwargs: Arguments to pass to the document loader constructor.
+
+        Returns:
+            An instantiated DocumentLoader of the specified class.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError("Implement me")
 
     @classmethod
diff --git a/python/beeai_framework/backend/embedding.py b/python/beeai_framework/backend/embedding.py
index 16af171ab..bee6546fe 100644
--- a/python/beeai_framework/backend/embedding.py
+++ b/python/beeai_framework/backend/embedding.py
@@ -28,6 +28,15 @@
 class EmbeddingModelKwargs(TypedDict, total=False):
+    """Configuration options for initializing an EmbeddingModel.
+
+    This TypedDict defines all the optional keyword arguments that can be passed
+    to an EmbeddingModel constructor to customize its behavior.
+
+    Attributes:
+        middlewares: List of middleware to apply during model execution.
+        settings: Additional provider-specific settings.
+    """
     middlewares: Sequence[RunMiddlewareType]
     settings: dict[str, Any]
@@ -38,21 +47,66 @@
 class EmbeddingModel(ABC):
+    """Abstract base class for all embedding model implementations.
+
+    EmbeddingModel provides a unified interface for converting text into vector
+    embeddings using various providers (OpenAI, Ollama, Cohere, etc.). It handles
+    batching, retries, error handling, and event emission.
+
+    The class is designed to be subclassed by provider-specific implementations
+    that implement the `_create` abstract method.
+
+    Attributes:
+        middlewares: List of middleware functions to apply during execution.
+
+    Example:
+        >>> from beeai_framework.adapters.openai import OpenAIEmbeddingModel
+        >>>
+        >>> # Create an embedding model
+        >>> model = OpenAIEmbeddingModel("text-embedding-3-small")
+        >>>
+        >>> # Generate embeddings for text
+        >>> result = await model.create(["Hello, world!", "How are you?"])
+        >>> print(len(result.embeddings))  # 2
+        >>> print(len(result.embeddings[0]))  # 1536 (dimension size)
+        >>>
+        >>> # Use with retry
+        >>> result = await model.create(
+        ...     ["Text to embed"],
+        ...     max_retries=3
+        ... )
+        >>>
+        >>> # Create from name
+        >>> model = EmbeddingModel.from_name("openai:text-embedding-3-small")
+    """
+
     @property
     @abstractmethod
     def model_id(self) -> str:
+        """The unique identifier for this model (e.g., 'text-embedding-3-small')."""
         pass
 
     @property
     @abstractmethod
     def provider_id(self) -> ProviderName:
+        """The provider name for this model (e.g., 'openai', 'ollama')."""
         pass
 
     @cached_property
     def emitter(self) -> Emitter:
+        """Get the event emitter for this embedding model.
+
+        Returns:
+            An Emitter instance configured for embedding model events.
+        """
         return self._create_emitter()
 
     def _create_emitter(self) -> Emitter:
+        """Create an event emitter for this embedding model.
+
+        Returns:
+            An Emitter instance configured with the appropriate namespace and events.
+        """
         return Emitter.root().child(
             namespace=["backend", self.provider_id, "embedding"],
             creator=self,
@@ -60,6 +114,19 @@
     def __init__(self, **kwargs: Unpack[EmbeddingModelKwargs]) -> None:
+        """Initialize an EmbeddingModel with the given configuration.
+
+        Args:
+            **kwargs: Configuration options as defined in EmbeddingModelKwargs.
+                See EmbeddingModelKwargs documentation for all available options.
+
+        Example:
+            >>> from beeai_framework.adapters.openai import OpenAIEmbeddingModel
+            >>> model = OpenAIEmbeddingModel(
+            ...     "text-embedding-3-small",
+            ...     settings={"dimensions": 512}
+            ... )
+        """
         self._settings: dict[str, Any] = kwargs.get("settings", {})
         self._settings.update(**exclude_non_annotated(kwargs, EmbeddingModelKwargs))
 
@@ -69,6 +136,48 @@ def __init__(self, **kwargs: Unpack[EmbeddingModelKwargs]) -> None:
     def create(
         self, values: list[str], *, signal: AbortSignal | None = None, max_retries: int | None = None
     ) -> Run[EmbeddingModelOutput]:
+        """Generate embeddings for a list of text strings.
+
+        This method converts text strings into vector embeddings that can be used
+        for semantic search, similarity comparison, clustering, and other ML tasks.
+
+        Args:
+            values: List of text strings to convert into embeddings.
+            signal: Optional abort signal to cancel the operation.
+            max_retries: Maximum number of retry attempts on failure. Defaults to 0.
+
+        Returns:
+            A Run object that yields EmbeddingModelOutput containing the embeddings
+            and usage information.
+
+        Raises:
+            EmbeddingModelError: If the embedding generation fails.
+
+        Example:
+            >>> model = EmbeddingModel.from_name("openai:text-embedding-3-small")
+            >>>
+            >>> # Generate embeddings
+            >>> result = await model.create([
+            ...     "The quick brown fox",
+            ...     "jumps over the lazy dog"
+            ... ])
+            >>> print(len(result.embeddings))  # 2
+            >>> print(len(result.embeddings[0]))  # 1536
+            >>>
+            >>> # With retry
+            >>> result = await model.create(
+            ...     ["Text to embed"],
+            ...     max_retries=3
+            ... )
+            >>>
+            >>> # With abort signal
+            >>> from beeai_framework.utils import AbortController
+            >>> controller = AbortController()
+            >>> result = await model.create(
+            ...     ["Text to embed"],
+            ...     signal=controller.signal
+            ... )
+        """
         model_input = EmbeddingModelInput(values=values, signal=signal, max_retries=max_retries or 0)
 
         async def handler(context: RunContext) -> EmbeddingModelOutput:
@@ -104,6 +213,34 @@ async def handler(context: RunContext) -> EmbeddingModelOutput:
     @staticmethod
     def from_name(name: str | ProviderName, **kwargs: Any) -> "EmbeddingModel":
+        """Create an EmbeddingModel instance from a provider and model name.
+
+        This factory method allows you to instantiate an embedding model by specifying
+        the provider and model name as a string, without needing to import
+        provider-specific classes.
+
+        Args:
+            name: The model identifier in the format "provider:model" or just "provider".
+                Examples: "openai:text-embedding-3-small", "ollama:nomic-embed-text"
+            **kwargs: Additional keyword arguments passed to the model constructor.
+                See EmbeddingModelKwargs for available options.
+
+        Returns:
+            An EmbeddingModel instance of the appropriate provider-specific subclass.
+
+        Example:
+            >>> # Create with just model name
+            >>> model = EmbeddingModel.from_name("openai:text-embedding-3-small")
+            >>>
+            >>> # Create with provider default
+            >>> model = EmbeddingModel.from_name("openai")
+            >>>
+            >>> # Create with additional options
+            >>> model = EmbeddingModel.from_name(
+            ...     "openai:text-embedding-3-small",
+            ...     settings={"dimensions": 512}
+            ... )
+        """
         parsed_model = parse_model(name)
         TargetChatModel: type = load_model(parsed_model.provider_id, "embedding")  # noqa: N806
         return TargetChatModel(parsed_model.model_id, **kwargs)  # type: ignore
@@ -114,13 +251,57 @@ async def _create(
         input: EmbeddingModelInput,
         run: RunContext,
     ) -> EmbeddingModelOutput:
+        """Generate embeddings for the given input (implementation method).
+
+        This method must be implemented by subclasses to provide the actual
+        embedding generation logic for the specific provider.
+
+        Args:
+            input: The prepared input containing text values and parameters.
+            run: The execution context for this run.
+
+        Returns:
+            The embedding output containing vectors and usage information.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError
 
     async def clone(self) -> Self:
+        """Create a deep copy of this EmbeddingModel instance.
+
+        This method creates an independent copy of the model that can be
+        modified without affecting the original. Subclasses should override
+        this method to properly clone their specific state.
+
+        Returns:
+            A new EmbeddingModel instance with the same configuration.
+
+        Note:
+            The default implementation returns self and logs a warning.
+            Provider-specific implementations should override this method
+            to create proper clones.
+
+        Example:
+            >>> original = EmbeddingModel.from_name("openai:text-embedding-3-small")
+            >>> cloned = await original.clone()
+            >>> # Modifications to cloned won't affect original
+        """
         if type(self).clone == EmbeddingModel.clone:
             logging.warning(f"EmbeddingModel ({type(self)!s}) does not implement the 'clone' method.")
 
         return self
 
     def destroy(self) -> None:
+        """Clean up resources used by this embedding model.
+
+        This method destroys the event emitter and releases any associated resources.
+        Should be called when the model is no longer needed.
+
+        Example:
+            >>> model = EmbeddingModel.from_name("openai:text-embedding-3-small")
+            >>> # Use the model...
+            >>> model.destroy()
+        """
         self.emitter.destroy()
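The EmbeddingModel docstrings above mention similarity comparison but stop at producing vectors; here is a short, self-contained sketch of comparing the returned embeddings with cosine similarity (the model name is illustrative):

```python
import asyncio
import math

from beeai_framework.backend import EmbeddingModel


def cosine(a: list[float], b: list[float]) -> float:
    # Cosine similarity: dot product divided by the product of the vector norms.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


async def main() -> None:
    model = EmbeddingModel.from_name("ollama:nomic-embed-text")  # illustrative model choice
    result = await model.create([
        "A cat sat on the mat.",
        "A feline rested on the rug.",
        "Quarterly revenue grew by 4%.",
    ])
    base, paraphrase, unrelated = result.embeddings
    print(f"paraphrase pair: {cosine(base, paraphrase):.3f}")
    print(f"unrelated pair:  {cosine(base, unrelated):.3f}")


asyncio.run(main())
```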
diff --git a/python/beeai_framework/backend/text_splitter.py b/python/beeai_framework/backend/text_splitter.py
index 660c1e5c9..2bd152883 100644
--- a/python/beeai_framework/backend/text_splitter.py
+++ b/python/beeai_framework/backend/text_splitter.py
@@ -13,6 +13,46 @@
 class TextSplitter(ABC):
+    """Abstract base class for splitting text and documents into smaller chunks.
+
+    TextSplitter provides a unified interface for breaking down large texts or documents
+    into smaller, manageable chunks. This is essential for processing long documents that
+    exceed model context limits or for creating more focused embeddings.
+
+    The class supports dynamic instantiation of provider-specific splitters through
+    the `from_name` factory method, allowing integration with various text splitting
+    implementations (e.g., LangChain's RecursiveCharacterTextSplitter).
+
+    Subclasses must implement:
+    - `split_documents`: Split a list of documents into chunks
+    - `split_text`: Split raw text into chunks
+    - `_class_from_name`: Factory method for dynamic instantiation
+
+    Example:
+        >>> # Split documents using LangChain's RecursiveCharacterTextSplitter
+        >>> splitter = TextSplitter.from_name(
+        ...     "langchain:RecursiveCharacterTextSplitter",
+        ...     chunk_size=1000,
+        ...     chunk_overlap=200
+        ... )
+        >>>
+        >>> # Split a list of documents
+        >>> documents = [Document(content="Long text here...")]
+        >>> chunks = await splitter.split_documents(documents)
+        >>> print(len(chunks))
+        >>>
+        >>> # Split raw text
+        >>> text = "Very long text that needs to be split..."
+        >>> text_chunks = await splitter.split_text(text)
+        >>>
+        >>> # Use with different strategies
+        >>> splitter = TextSplitter.from_name(
+        ...     "langchain:CharacterTextSplitter",
+        ...     separator="\\n\\n",
+        ...     chunk_size=500
+        ... )
+    """
+
     @classmethod
     def from_name(cls, name: str, **kwargs: Any) -> TextSplitter:
         """
@@ -47,15 +87,73 @@ def from_name(cls, name: str, **kwargs: Any) -> TextSplitter:
     @abstractmethod
     async def split_documents(self, documents: list[Document]) -> list[Document]:
-        """Split a list of documents into smaller chunks."""
+        """Split a list of documents into smaller chunks.
+
+        This method takes a list of documents and splits each one into smaller
+        chunks based on the splitter's configuration (chunk size, overlap, etc.).
+        Each chunk becomes a new Document with the same metadata as the original.
+
+        Args:
+            documents: List of documents to split into chunks.
+
+        Returns:
+            A list of Document objects, where each represents a chunk of the
+            original documents.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+
+        Example:
+            >>> documents = [
+            ...     Document(content="Long document text...", metadata={"source": "file.txt"})
+            ... ]
+            >>> chunks = await splitter.split_documents(documents)
+            >>> print(len(chunks))  # More chunks than original documents
+        """
         raise NotImplementedError("Implement me")
 
     @abstractmethod
     async def split_text(self, text: str) -> list[str]:
-        """Split text into smaller chunks."""
+        """Split text into smaller chunks.
+
+        This method takes a raw text string and splits it into smaller chunks
+        based on the splitter's configuration. Unlike `split_documents`, this
+        returns plain strings without metadata.
+
+        Args:
+            text: The text string to split into chunks.
+
+        Returns:
+            A list of text strings, where each represents a chunk of the
+            original text.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+
+        Example:
+            >>> text = "This is a very long text that needs to be split..."
+            >>> chunks = await splitter.split_text(text)
+            >>> print(len(chunks))
+            >>> print(chunks[0])  # First chunk
+        """
         raise NotImplementedError("Implement me")
 
     @classmethod
     @abstractmethod
     def _class_from_name(cls, class_name: str, **kwargs: Any) -> TextSplitter:
+        """Create a text splitter instance from a class name (internal method).
+
+        This method must be implemented by integration-specific subclasses to
+        handle the dynamic instantiation of text splitter classes.
+
+        Args:
+            class_name: The name of the text splitter class to instantiate.
+            **kwargs: Arguments to pass to the text splitter constructor.
+
+        Returns:
+            An instantiated TextSplitter of the specified class.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError("Implement me")
diff --git a/python/beeai_framework/backend/vector_store.py b/python/beeai_framework/backend/vector_store.py
index fd7fc69a7..08e39b445 100644
--- a/python/beeai_framework/backend/vector_store.py
+++ b/python/beeai_framework/backend/vector_store.py
@@ -12,6 +12,11 @@
 class QueryLike(Protocol):
+    """Protocol for objects that can be converted to query strings.
+
+    Any object that implements the `__str__` method can be used as a query
+    for vector store searches.
+    """
     def __str__(self) -> str: ...
@@ -19,6 +24,54 @@ def __str__(self) -> str: ...
 class VectorStore(ABC):
+    """Abstract base class for vector database implementations.
+
+    VectorStore provides a unified interface for storing and retrieving document
+    embeddings in vector databases. It supports semantic search by finding documents
+    similar to a query based on vector similarity.
+
+    The class handles:
+    - Adding documents with automatic embedding generation
+    - Semantic search with similarity scoring
+    - Dynamic instantiation of provider-specific stores
+
+    Subclasses must implement:
+    - `add_documents`: Store documents in the vector database
+    - `search`: Perform similarity search
+    - `_class_from_name`: Factory method for dynamic instantiation
+
+    Example:
+        >>> from beeai_framework.backend import EmbeddingModel, VectorStore
+        >>>
+        >>> # Create embedding model
+        >>> embedding_model = EmbeddingModel.from_name("openai:text-embedding-3-small")
+        >>>
+        >>> # Create vector store
+        >>> vector_store = VectorStore.from_name(
+        ...     "langchain:Chroma",
+        ...     embedding_model=embedding_model,
+        ...     collection_name="my_documents"
+        ... )
+        >>>
+        >>> # Add documents
+        >>> documents = [
+        ...     Document(content="Python is a programming language", metadata={"source": "doc1"}),
+        ...     Document(content="JavaScript is used for web development", metadata={"source": "doc2"})
+        ... ]
+        >>> ids = await vector_store.add_documents(documents)
+        >>>
+        >>> # Search for similar documents
+        >>> results = await vector_store.search("programming languages", k=2)
+        >>> for doc_with_score in results:
+        ...     print(f"Score: {doc_with_score.score}, Content: {doc_with_score.document.content}")
+        >>>
+        >>> # Use different vector stores
+        >>> vector_store = VectorStore.from_name(
+        ...     "langchain:FAISS",
+        ...     embedding_model=embedding_model
+        ... )
+    """
+
     @classmethod
     def from_name(cls, name: str, *, embedding_model: EmbeddingModel, **kwargs: Any) -> VectorStore:
         """
@@ -59,13 +112,88 @@ def from_name(cls, name: str, *, embedding_model: EmbeddingModel, **kwargs: Any)
     @abstractmethod
     async def add_documents(self, documents: list[Document]) -> list[str]:
+        """Add documents to the vector store.
+
+        This method takes a list of documents, generates embeddings for their content
+        using the configured embedding model, and stores them in the vector database.
+        Each document is assigned a unique identifier.
+
+        Args:
+            documents: List of Document objects to add to the vector store.
+                Each document should have content and optional metadata.
+
+        Returns:
+            A list of unique identifiers (IDs) for the added documents,
+            in the same order as the input documents.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+
+        Example:
+            >>> documents = [
+            ...     Document(content="First document", metadata={"source": "file1.txt"}),
+            ...     Document(content="Second document", metadata={"source": "file2.txt"})
+            ... ]
+            >>> ids = await vector_store.add_documents(documents)
+            >>> print(ids)  # ['id1', 'id2']
+        """
         raise NotImplementedError("Implement me")
 
     @abstractmethod
     async def search(self, query: QueryLike, k: int = 4, **kwargs: Any) -> list[DocumentWithScore]:
+        """Search for documents similar to the query.
+
+        This method performs semantic search by converting the query to an embedding
+        and finding the k most similar documents in the vector store based on
+        vector similarity (e.g., cosine similarity).
+
+        Args:
+            query: The search query. Can be a string or any object implementing
+                the QueryLike protocol (has __str__ method).
+            k: The number of most similar documents to return. Defaults to 4.
+            **kwargs: Additional provider-specific search parameters
+                (e.g., filter conditions, score threshold).
+
+        Returns:
+            A list of DocumentWithScore objects, sorted by similarity score
+            (highest first). Each contains the document and its similarity score.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+
+        Example:
+            >>> # Basic search
+            >>> results = await vector_store.search("machine learning", k=3)
+            >>> for result in results:
+            ...     print(f"Score: {result.score:.3f}")
+            ...     print(f"Content: {result.document.content}")
+            >>>
+            >>> # Search with filters (provider-specific)
+            >>> results = await vector_store.search(
+            ...     "python programming",
+            ...     k=5,
+            ...     filter={"source": "documentation"}
+            ... )
+        """
         raise NotImplementedError("Implement me")
 
     @classmethod
     @abstractmethod
     def _class_from_name(cls, class_name: str, embedding_model: EmbeddingModel, **kwargs: Any) -> VectorStore:
+        """Create a vector store instance from a class name (internal method).
+
+        This method must be implemented by integration-specific subclasses to
+        handle the dynamic instantiation of vector store classes.
+
+        Args:
+            class_name: The name of the vector store class to instantiate.
+            embedding_model: The embedding model to use for generating vectors.
+            **kwargs: Arguments to pass to the vector store constructor.
+
+        Returns:
+            An instantiated VectorStore of the specified class.
+
+        Raises:
+            NotImplementedError: If the subclass doesn't implement this method.
+        """
         raise NotImplementedError("Implement me")
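Taken together, the loaders, splitters, embedding models, and vector stores documented in this diff compose into a retrieval pipeline. A minimal sketch of that composition, assuming the LangChain integration extras and the named providers are installed (module paths follow the files touched above):

```python
import asyncio

from beeai_framework.backend import EmbeddingModel, VectorStore
from beeai_framework.backend.document_loader import DocumentLoader
from beeai_framework.backend.text_splitter import TextSplitter


async def main() -> None:
    # 1. Load a source document.
    loader = DocumentLoader.from_name("langchain:UnstructuredMarkdownLoader", file_path="README.md")
    documents = await loader.load()

    # 2. Split it into overlapping chunks sized for embedding.
    splitter = TextSplitter.from_name(
        "langchain:RecursiveCharacterTextSplitter", chunk_size=1000, chunk_overlap=200
    )
    chunks = await splitter.split_documents(documents)

    # 3. Embed and index the chunks in a vector store.
    embedding_model = EmbeddingModel.from_name("openai:text-embedding-3-small")
    vector_store = VectorStore.from_name("langchain:FAISS", embedding_model=embedding_model)
    await vector_store.add_documents(chunks)

    # 4. Retrieve the chunks most similar to a query.
    for hit in await vector_store.search("How do I configure a chat model?", k=3):
        print(f"{hit.score:.3f}  {hit.document.content[:80]}")


asyncio.run(main())
```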