63 changes: 63 additions & 0 deletions docs/concepts/pipeline-wrapper.md
@@ -176,6 +176,69 @@ async def run_chat_completion_async(self, model: str, messages: List[dict], body
)
```

## Streaming from Multiple Components

!!! info "Automatic Multi-Component Streaming"
Hayhooks automatically enables streaming for **all** streaming-capable components in your pipeline - no special configuration needed!

When your pipeline contains multiple components that support streaming (e.g., multiple LLMs), all of them stream their outputs automatically as the pipeline executes.

### Example: Sequential LLMs with Streaming

```python
from collections.abc import Generator
from typing import List

from haystack import Pipeline

from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator


class MultiLLMWrapper(BasePipelineWrapper):
def setup(self) -> None:
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

self.pipeline = Pipeline()

# First LLM - initial answer
self.pipeline.add_component(
"prompt_1",
ChatPromptBuilder(
template=[
ChatMessage.from_system("You are a helpful assistant."),
ChatMessage.from_user("{{query}}")
]
)
)
self.pipeline.add_component("llm_1", OpenAIChatGenerator(model="gpt-4o-mini"))

# Second LLM - refines the answer using Jinja2 to access ChatMessage attributes
self.pipeline.add_component(
"prompt_2",
ChatPromptBuilder(
template=[
ChatMessage.from_system("You are a helpful assistant that refines responses."),
ChatMessage.from_user(
"Previous response: {{previous_response[0].text}}\n\nRefine this."
)
]
)
)
self.pipeline.add_component("llm_2", OpenAIChatGenerator(model="gpt-4o-mini"))

# Connect components - LLM 1's replies go directly to prompt_2
self.pipeline.connect("prompt_1.prompt", "llm_1.messages")
self.pipeline.connect("llm_1.replies", "prompt_2.previous_response")
self.pipeline.connect("prompt_2.prompt", "llm_2.messages")

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Generator:
question = get_last_user_message(messages)

# Both LLMs will stream automatically!
return streaming_generator(
pipeline=self.pipeline,
pipeline_run_args={"prompt_1": {"template_variables": {"query": question}}}
)
```

**What happens:** Both LLMs **automatically stream** their responses token by token as the pipeline executes. The second prompt builder uses Jinja2 syntax (`{{previous_response[0].text}}`) to access the text content from the first LLM's `ChatMessage` response. **No custom extraction components needed** - streaming works for any number of components.
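
To consume the combined stream from the client side, a minimal sketch using the official `openai` Python client might look like the following. The deployment name `multi_llm_wrapper`, the base URL, and the placeholder API key are assumptions; adjust them to your deployment:

```python
# Client-side sketch (not part of the wrapper). Assumes the wrapper above is
# deployed under the name "multi_llm_wrapper" and Hayhooks listens on the
# default http://localhost:1416. The API key is a placeholder required by the client.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:1416/v1", api_key="not-used")

stream = client.chat.completions.create(
    model="multi_llm_wrapper",  # assumed deployment name
    messages=[{"role": "user", "content": "What is machine learning?"}],
    stream=True,
)

for event in stream:
    delta = event.choices[0].delta.content
    if delta:
        # Tokens from llm_1 arrive first, followed by tokens from llm_2
        print(delta, end="", flush=True)
```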

See the [Multi-LLM Streaming Example](https://github.com/deepset-ai/hayhooks/tree/main/examples/pipeline_wrappers/multi_llm_streaming) for a complete working implementation.

## File Upload Support

Hayhooks can handle file uploads by adding a `files` parameter:
1 change: 1 addition & 0 deletions examples/README.md
@@ -6,6 +6,7 @@ This directory contains various examples demonstrating different use cases and f

| Example | Description | Key Features | Use Case |
|---------|-------------|--------------|----------|
| [multi_llm_streaming](./pipeline_wrappers/multi_llm_streaming/) | Multiple LLM components with automatic streaming | • Two sequential LLMs<br/>• Automatic multi-component streaming<br/>• No special configuration needed<br/>• Shows default streaming behavior | Demonstrating how hayhooks automatically streams from all components in a pipeline |
| [async_question_answer](./pipeline_wrappers/async_question_answer/) | Async question-answering pipeline with streaming support | • Async pipeline execution<br/>• Streaming responses<br/>• OpenAI Chat Generator<br/>• Both API and chat completion interfaces | Building conversational AI systems that need async processing and real-time streaming responses |
| [chat_with_website](./pipeline_wrappers/chat_with_website/) | Answer questions about website content | • Web content fetching<br/>• HTML to document conversion<br/>• Content-based Q&A<br/>• Configurable URLs | Creating AI assistants that can answer questions about specific websites or web-based documentation |
| [chat_with_website_mcp](./pipeline_wrappers/chat_with_website_mcp/) | MCP-compatible website chat pipeline | • MCP (Model Context Protocol) support<br/>• Website content analysis<br/>• API-only interface<br/>• Simplified deployment | Integrating website analysis capabilities into MCP-compatible AI systems and tools |
98 changes: 98 additions & 0 deletions examples/pipeline_wrappers/multi_llm_streaming/README.md
@@ -0,0 +1,98 @@
# Multi-LLM Streaming Example

This example demonstrates Hayhooks' automatic multi-component streaming support.

## Overview

The pipeline contains **two LLM components in sequence**:

1. **LLM 1** (`gpt-5-nano` with `reasoning_effort: low`): Provides a short, concise initial answer to the user's question
2. **LLM 2** (`gpt-5-nano` with `reasoning_effort: medium`): Refines and expands the answer into a detailed, professional response

Both LLMs automatically stream their responses; no special configuration is needed.

![Multi-LLM Streaming Example](./multi_stream.gif)

## How It Works

Hayhooks automatically enables streaming for **all** streaming-capable components. Both LLMs stream their responses serially (one after another) without any special configuration.

The pipeline connects LLM 1's replies directly to the second prompt builder. Using Jinja2 template syntax, the second prompt builder can access the `ChatMessage` attributes directly: `{{previous_response[0].text}}`. This approach is simple and doesn't require any custom extraction components.
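
To see the template access in isolation, here is a small sketch that runs `ChatPromptBuilder` on its own. The assistant message below is made up for illustration; it stands in for what LLM 1 would produce:

```python
# Standalone sketch of the Jinja2 access pattern used in this pipeline.
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

builder = ChatPromptBuilder(
    template=[ChatMessage.from_user("Previous response: {{previous_response[0].text}}\n\nRefine this.")]
)

# previous_response mirrors llm_1.replies: a list of ChatMessage objects
result = builder.run(previous_response=[ChatMessage.from_assistant("ML lets computers learn from data.")])
print(result["prompt"][0].text)
# Previous response: ML lets computers learn from data.
#
# Refine this.
```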

This example also demonstrates injecting a visual separator (`**[LLM 2 - Refining the response]**`) between the two LLM outputs using `StreamingChunk.component_info` to detect component transitions.

## Usage

### Deploy with Hayhooks

```bash
# Set your OpenAI API key
export OPENAI_API_KEY=your_api_key_here

# Deploy the pipeline
hayhooks deploy examples/pipeline_wrappers/multi_llm_streaming

# Test it via OpenAI-compatible API
curl -X POST http://localhost:1416/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "multi_llm_streaming",
"messages": [{"role": "user", "content": "What is machine learning?"}],
"stream": true
}'
```
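
The wrapper also implements `run_api`, so a plain REST call is available as well. Assuming the default Hayhooks run route for deployed pipelines (`POST /{pipeline_name}/run`) and the deployment name `multi_llm_streaming`, a request would look roughly like this:

```bash
# Assumes the default Hayhooks run route; the body matches run_api's "query" parameter
curl -X POST http://localhost:1416/multi_llm_streaming/run \
  -H "Content-Type: application/json" \
  -d '{"query": "What is machine learning?"}'
```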

### Use Directly in Code

```python
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from hayhooks import streaming_generator

# Create your pipeline with multiple streaming components
pipeline = Pipeline()
# ... add LLM 1 and prompt_builder_1 ...

# Add second prompt builder that accesses ChatMessage attributes via Jinja2
pipeline.add_component(
"prompt_builder_2",
ChatPromptBuilder(
template=[
ChatMessage.from_system("You are a helpful assistant."),
ChatMessage.from_user("Previous: {{previous_response[0].text}}\n\nRefine this.")
]
)
)
# ... add LLM 2 ...

# Connect: LLM 1 replies directly to prompt_builder_2
pipeline.connect("llm_1.replies", "prompt_builder_2.previous_response")

# streaming_generator automatically streams from ALL components
for chunk in streaming_generator(
pipeline=pipeline,
pipeline_run_args={"prompt_builder_1": {"template_variables": {"query": "Your question"}}}
):
print(chunk.content, end="", flush=True)
```

## Integration with OpenWebUI

This pipeline works seamlessly with OpenWebUI:

1. Configure OpenWebUI to connect to hayhooks (see [OpenWebUI Integration docs](../../../docs/features/openwebui-integration.md))
2. Deploy this pipeline
3. Select it as a model in OpenWebUI
4. Watch both LLMs stream their responses in real-time

## Technical Details

- **Pipeline Flow**: `LLM 1 → Prompt Builder 2 → LLM 2`
- **Jinja2 Templates**: `ChatPromptBuilder` uses Jinja2, allowing direct access to `ChatMessage` attributes in templates
- **Template Variables**: LLM 1's `List[ChatMessage]` replies are passed directly as `previous_response` to the second prompt builder
- **Accessing ChatMessage Content**: Use `{{previous_response[0].text}}` in templates to access the text content
- **Streaming**: Serial execution with automatic callback management for all components
- **Transition Detection**: Uses `StreamingChunk.component_info.name` to detect when LLM 2 starts
- **Visual Separator**: Injects a `StreamingChunk` between LLM outputs
- **Error Handling**: Stream terminates gracefully if any component fails
129 changes: 129 additions & 0 deletions examples/pipeline_wrappers/multi_llm_streaming/pipeline_wrapper.py
@@ -0,0 +1,129 @@
from collections.abc import Generator
from typing import Any, List, Union # noqa: UP035

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk
from haystack.utils import Secret

from hayhooks import BasePipelineWrapper, get_last_user_message, streaming_generator


class PipelineWrapper(BasePipelineWrapper):
"""
A pipeline with two sequential LLM components that both stream.

The first LLM (low reasoning) provides a concise answer, and the second LLM
(medium reasoning) refines and expands it with more detail.
Both automatically stream their responses - this is the default behavior in hayhooks.
"""

def setup(self) -> None:
"""Initialize the pipeline with two streaming LLM components."""
self.pipeline = Pipeline()

# First stage: Initial answer
self.pipeline.add_component(
"prompt_builder_1",
ChatPromptBuilder(
template=[
ChatMessage.from_system(
"You are a helpful assistant. \nAnswer the user's question in a short and concise manner."
),
ChatMessage.from_user("{{query}}"),
]
),
)
self.pipeline.add_component(
"llm_1",
OpenAIChatGenerator(
api_key=Secret.from_env_var("OPENAI_API_KEY"),
model="gpt-5-nano",
generation_kwargs={
"reasoning_effort": "low",
},
),
)

# Second stage: Refinement
# The prompt builder can directly access ChatMessage attributes via Jinja2
self.pipeline.add_component(
"prompt_builder_2",
ChatPromptBuilder(
template=[
ChatMessage.from_system("You are a helpful assistant that refines and improves responses."),
ChatMessage.from_user(
"Here is the previous response:\n\n{{previous_response[0].text}}\n\n"
"Please refine and improve this response. "
"Make it a bit more detailed, clear, and professional. "
"Please state that you're refining the response in the beginning of your answer."
),
]
),
)
self.pipeline.add_component(
"llm_2",
OpenAIChatGenerator(
api_key=Secret.from_env_var("OPENAI_API_KEY"),
model="gpt-5-nano",
generation_kwargs={
"reasoning_effort": "medium",
},
),
)

# Connect the components
self.pipeline.connect("prompt_builder_1.prompt", "llm_1.messages")
self.pipeline.connect("llm_1.replies", "prompt_builder_2.previous_response")
self.pipeline.connect("prompt_builder_2.prompt", "llm_2.messages")

def run_api(self, query: str) -> dict[str, Any]:
"""Run the pipeline in non-streaming mode."""
result = self.pipeline.run(
{
"prompt_builder_1": {"template_variables": {"query": query}},
}
)
return {"reply": result["llm_2"]["replies"][0].text if result["llm_2"]["replies"] else ""}

def run_chat_completion(self, model: str, messages: List[dict], body: dict) -> Union[str, Generator]: # noqa: ARG002, UP006
"""
Run the pipeline in streaming mode.

Both LLMs will automatically stream their responses thanks to
hayhooks' built-in multi-component streaming support.

We inject a visual separator between LLM 1 and LLM 2 outputs.
"""
question = get_last_user_message(messages)

def custom_streaming():
"""
Enhanced streaming that injects a visual separator between LLM outputs.

Uses StreamingChunk.component_info.name to reliably detect which component
is streaming, avoiding fragile chunk counting or heuristics.

NOTE: This is simply a workaround to inject a visual separator between LLM outputs.
"""
llm2_started = False

for chunk in streaming_generator(
pipeline=self.pipeline,
pipeline_run_args={
"prompt_builder_1": {"template_variables": {"query": question}},
},
):
# Use component_info to detect which LLM is streaming
if hasattr(chunk, "component_info") and chunk.component_info:
component_name = chunk.component_info.name

# When we see llm_2 for the first time, inject a visual separator
if component_name == "llm_2" and not llm2_started:
llm2_started = True
yield StreamingChunk(content="\n\n**[LLM 2 - Refining the response]**\n\n")

yield chunk

return custom_streaming()