
Commit 665b6a4

Add support for async pipeline wrapper methods (#122)
* Add async pipeline wrapper methods support; ensure current logic doesn't break
* fixed all mypy warnings
* add missing yield
* remove docstring; reformat
* refactor
* Add test for async pipeline
* remove unneeded fixtures
* fix some lint errors
* Add support for AsyncGenerator as pipeline result in chat_endpoint
* add async_streaming_generator and expose it
* refactor
* refactor
* divide unit / it tests
* cleanup
* do not mark mcp tests as integration
* add async QA pipeline for some IT tests
* add test for streaming utilities
* Fix test
* refactor
* Better testing of streaming generators (sync/async)
* Add it tests for streaming generators; Further improve exceptions handling
* Fix tests for python 3.9
* Update README adding docs about new async features
* Add a simple AsyncPipeline + async_streaming_generator example
* Align example pipeline with test one
* Add comment
* Update README
1 parent 8a220bf commit 665b6a4

File tree: 24 files changed, +1208 / -137 lines changed


README.md

Lines changed: 108 additions & 3 deletions
@@ -1,8 +1,13 @@
 # Hayhooks
 
-**Hayhooks** makes it easy to deploy and serve [Haystack](https://haystack.deepset.ai/) pipelines as REST APIs.
+**Hayhooks** makes it easy to deploy and serve [Haystack](https://haystack.deepset.ai/) pipelines.
 
-It provides a simple way to wrap your Haystack pipelines with custom logic and expose them via HTTP endpoints, including OpenAI-compatible chat completion endpoints. With Hayhooks, you can quickly turn your Haystack pipelines into API services with minimal boilerplate code.
+With Hayhooks, you can:
+
+- **Deploy your Haystack pipelines as REST APIs** with maximum flexibility and minimal boilerplate code.
+- **Expose your Haystack pipelines over the MCP protocol**, making them available as tools in AI dev environments like [Cursor](https://cursor.com) or [Claude Desktop](https://claude.ai/download). Under the hood, Hayhooks runs as an [MCP Server](https://modelcontextprotocol.io/docs/concepts/architecture), exposing each pipeline as an [MCP Tool](https://modelcontextprotocol.io/docs/concepts/tools).
+- **Expose your Haystack pipelines as OpenAI-compatible chat completion backends** with streaming support (to be used with [open-webui](https://openwebui.com) or any other OpenAI compatible client).
+- **Control Hayhooks core APIs through chat** - deploy, undeploy, list, or run Haystack pipelines by chatting with [Claude Desktop](https://claude.ai/download), [Cursor](https://cursor.com), or any other MCP client.
 
 [![PyPI - Version](https://img.shields.io/pypi/v/hayhooks.svg)](https://pypi.org/project/hayhooks)
 [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/hayhooks.svg)](https://pypi.org/project/hayhooks)
@@ -26,6 +31,7 @@ It provides a simple way to wrap your Haystack pipelines with custom logic and e
 - [PipelineWrapper](#why-a-pipeline-wrapper)
 - [Setup Method](#setup)
 - [Run API Method](#run_api)
+- [Async Run API Method](#run_api_async)
 - [PipelineWrapper development with `overwrite` option](#pipelinewrapper-development-with-overwrite-option)
 - [Additional Dependencies](#additional-dependencies)
 - [Support file uploads](#support-file-uploads)
@@ -45,7 +51,9 @@ It provides a simple way to wrap your Haystack pipelines with custom logic and e
 - [OpenAI-compatible endpoints generation](#openai-compatible-endpoints-generation)
 - [Using Hayhooks as `open-webui` backend](#using-hayhooks-as-open-webui-backend)
 - [Run Chat Completion Method](#run_chat_completion)
+- [Async Run Chat Completion Method](#run_chat_completion_async)
 - [Streaming Responses](#streaming-responses-in-openai-compatible-endpoints)
+- [Async Streaming Generator](#async_streaming_generator)
 - [Integration with haystack OpenAIChatGenerator](#integration-with-haystack-openaichatgenerator)
 - [Advanced Usage](#advanced-usage)
 - [Run Hayhooks Programmatically](#run-hayhooks-programmatically)
@@ -232,6 +240,30 @@ The input arguments will be used to generate a Pydantic model that will be used
 
 **NOTE**: Since Hayhooks will _dynamically_ create the Pydantic models, you need to make sure that the input arguments are JSON-serializable.
 
+#### run_api_async(...)
+
+This method is the asynchronous version of `run_api`. It will be used to run the pipeline in API mode when you call the `{pipeline_name}/run` endpoint, but handles requests asynchronously for better performance under high load.
+
+**You can define the input arguments of the method according to your needs**, just like with `run_api`.
+
+```python
+async def run_api_async(self, urls: List[str], question: str, any_other_user_defined_argument: Any) -> str:
+    # Use async/await with AsyncPipeline or async operations
+    result = await self.pipeline.run_async({"fetcher": {"urls": urls}, "prompt": {"query": question}})
+    return result["llm"]["replies"][0]
+```
+
+This is particularly useful when:
+
+- Working with `AsyncPipeline` instances that support async execution
+- Integrating with async-compatible Haystack components (e.g., `OpenAIChatGenerator` with async support)
+- Handling I/O-bound operations more efficiently
+- Deploying pipelines that need to handle many concurrent requests
+
+**NOTE**: You can implement either `run_api`, `run_api_async`, or both. Hayhooks will automatically detect which methods are implemented and route requests accordingly.
+
+You can find complete working examples of async pipeline wrappers in the [test files](tests/test_files/files/async_question_answer) and [async streaming examples](tests/test_files/files/async_chat_with_website_streaming).
+
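The NOTE above says a wrapper may implement `run_api`, `run_api_async`, or both. As a rough illustration, here is a hypothetical sketch (not part of this commit) of a wrapper exposing both entry points, assuming the `chat_with_website.yml` pipeline used in the README's earlier examples:

```python
from pathlib import Path
from typing import List

from haystack import AsyncPipeline

from hayhooks import BasePipelineWrapper


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text()
        self.pipeline = AsyncPipeline.loads(pipeline_yaml)

    def run_api(self, urls: List[str], question: str) -> str:
        # Sync entry point: blocks until the pipeline run completes.
        result = self.pipeline.run({"fetcher": {"urls": urls}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]

    async def run_api_async(self, urls: List[str], question: str) -> str:
        # Async entry point: awaited by Hayhooks, better suited to many concurrent requests.
        result = await self.pipeline.run_async({"fetcher": {"urls": urls}, "prompt": {"query": question}})
        return result["llm"]["replies"][0]
```

Per the NOTE, Hayhooks routes `{pipeline_name}/run` requests to whichever of the two methods a wrapper implements.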
 To deploy the pipeline, run:
 
 ```shell
@@ -588,9 +620,38 @@ Here's a video example:
 
 ![chat-completion-example](./docs/assets/chat-completion.gif)
 
+#### run_chat_completion_async(...)
+
+This method is the asynchronous version of `run_chat_completion`. It handles OpenAI-compatible chat completion requests asynchronously, which is particularly useful for streaming responses and high-concurrency scenarios.
+
+```python
+async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> Union[str, AsyncGenerator]:
+    log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")
+
+    question = get_last_user_message(messages)
+    log.trace(f"Question: {question}")
+
+    # For async streaming responses
+    return async_streaming_generator(
+        pipeline=self.pipeline,
+        pipeline_run_args={"fetcher": {"urls": URLS}, "prompt": {"query": question}},
+    )
+```
+
+Like `run_chat_completion`, this method has a **fixed signature** and will be called with the same arguments. The key differences are:
+
+- It's declared as `async` and can use `await` for asynchronous operations
+- It can return an `AsyncGenerator` for streaming responses using `async_streaming_generator`
+- It provides better performance for concurrent chat requests
+- It's required when using async streaming with components that support async streaming callbacks
+
+**NOTE**: You can implement either `run_chat_completion`, `run_chat_completion_async`, or both. When both are implemented, Hayhooks will prefer the async version for better performance.
+
+You can find complete working examples combining async chat completion with streaming in the [async streaming test examples](tests/test_files/files/async_question_answer).
+
 ### Streaming responses in OpenAI-compatible endpoints
 
-Hayhooks now provides a `streaming_generator` utility function that can be used to stream the pipeline output to the client.
+Hayhooks provides `streaming_generator` and `async_streaming_generator` utility functions that can be used to stream the pipeline output to the client.
 
 Let's update the `run_chat_completion` method of the previous example:
 
@@ -634,10 +695,54 @@ You will see the pipeline output being streamed [in OpenAI-compatible format](ht
 
 Since output will be streamed to `open-webui` there's **no need to change `Stream Chat Response`** chat setting (leave it as `Default` or `On`).
 
+You can find a complete working example of `streaming_generator` usage in the [examples/pipeline_wrappers/chat_with_website_streaming](examples/pipeline_wrappers/chat_with_website_streaming) directory.
+
 Here's a video example:
 
 ![chat-completion-streaming-example](./docs/assets/chat-completion-streaming.gif)
 
+#### async_streaming_generator
+
+For asynchronous pipelines or when you need better concurrency handling, Hayhooks also provides an `async_streaming_generator` utility function:
+
+```python
+from pathlib import Path
+from typing import AsyncGenerator, List, Union
+from haystack import AsyncPipeline
+from hayhooks import get_last_user_message, BasePipelineWrapper, log, async_streaming_generator
+
+
+URLS = ["https://haystack.deepset.ai", "https://www.redis.io", "https://ssi.inc"]
+
+
+class PipelineWrapper(BasePipelineWrapper):
+    def setup(self) -> None:
+        pipeline_yaml = (Path(__file__).parent / "chat_with_website.yml").read_text()
+        self.pipeline = AsyncPipeline.loads(pipeline_yaml)  # Note: AsyncPipeline
+
+    async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
+        log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")
+
+        question = get_last_user_message(messages)
+        log.trace(f"Question: {question}")
+
+        # Async streaming pipeline run, will return an async generator
+        return async_streaming_generator(
+            pipeline=self.pipeline,
+            pipeline_run_args={"fetcher": {"urls": URLS}, "prompt": {"query": question}},
+        )
+```
+
+The `async_streaming_generator` function:
+
+- Works with both `Pipeline` and `AsyncPipeline` instances
+- Requires **components that support async streaming callbacks** (e.g., `OpenAIChatGenerator` instead of `OpenAIGenerator`)
+- Provides better performance for concurrent streaming requests
+- Returns an `AsyncGenerator` that yields chunks asynchronously
+- Automatically handles async pipeline execution and cleanup
+
+**NOTE**: The streaming component in your pipeline must support async streaming callbacks. If you get an error about async streaming support, either use the sync `streaming_generator` or switch to async-compatible components.
+
 ### Integration with haystack OpenAIChatGenerator
 
 Since Hayhooks is OpenAI-compatible, it can be used as a backend for the [haystack OpenAIChatGenerator](https://docs.haystack.deepset.ai/docs/openaichatgenerator).
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+from pathlib import Path
+from typing import AsyncGenerator, List
+from haystack import AsyncPipeline
+from haystack.dataclasses import ChatMessage
+from hayhooks import (
+    get_last_user_message,
+    BasePipelineWrapper,
+    log,
+    async_streaming_generator,
+)
+
+
+SYSTEM_MESSAGE = "You are a helpful assistant that can answer questions about the world."
+
+
+class PipelineWrapper(BasePipelineWrapper):
+    def setup(self) -> None:
+        pipeline_yaml = (Path(__file__).parent / "question_answer.yml").read_text()
+        self.pipeline = AsyncPipeline.loads(pipeline_yaml)
+
+    async def run_api_async(self, question: str) -> str:
+        log.trace(f"Running pipeline with question: {question}")
+
+        result = await self.pipeline.run_async(
+            {
+                "prompt_builder": {
+                    "template": [
+                        ChatMessage.from_system(SYSTEM_MESSAGE),
+                        ChatMessage.from_user(question),
+                    ]
+                }
+            }
+        )
+        return result["llm"]["replies"][0].text
+
+    async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
+        log.trace(f"Running pipeline with model: {model}, messages: {messages}, body: {body}")
+
+        question = get_last_user_message(messages)
+        log.trace(f"Question: {question}")
+
+        return async_streaming_generator(
+            pipeline=self.pipeline,
+            pipeline_run_args={
+                "prompt_builder": {
+                    "template": [
+                        ChatMessage.from_system(SYSTEM_MESSAGE),
+                        ChatMessage.from_user(question),
+                    ]
+                },
+            },
+        )
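For reference, here is a minimal way to exercise the wrapper above outside of Hayhooks. This is a sketch, not part of the commit: it assumes the file is importable as `pipeline_wrapper` (the module name is hypothetical) and that `OPENAI_API_KEY` is set, since the YAML below uses `OpenAIChatGenerator`.

```python
import asyncio

from pipeline_wrapper import PipelineWrapper  # assumed module name for the file above


async def main() -> None:
    wrapper = PipelineWrapper()
    wrapper.setup()  # loads question_answer.yml into an AsyncPipeline
    answer = await wrapper.run_api_async(question="What is Haystack?")
    print(answer)


asyncio.run(main())
```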
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+components:
+  llm:
+    init_parameters:
+      api_base_url: null
+      api_key:
+        env_vars:
+        - OPENAI_API_KEY
+        strict: true
+        type: env_var
+      generation_kwargs: {}
+      http_client_kwargs: null
+      max_retries: null
+      model: gpt-4o-mini
+      organization: null
+      streaming_callback: null
+      timeout: null
+      tools: null
+      tools_strict: false
+    type: haystack.components.generators.chat.openai.OpenAIChatGenerator
+  prompt_builder:
+    init_parameters:
+      required_variables: null
+      template: null
+      variables: null
+    type: haystack.components.builders.chat_prompt_builder.ChatPromptBuilder
+connection_type_validation: true
+connections:
+- receiver: llm.messages
+  sender: prompt_builder.prompt
+max_runs_per_component: 100
+metadata: {}
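The serialized pipeline above is just a `ChatPromptBuilder` feeding an `OpenAIChatGenerator`. A rough programmatic equivalent, built from the `type` and `connections` fields in the YAML (a sketch; the commit itself only ships the YAML):

```python
from haystack import AsyncPipeline
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat.openai import OpenAIChatGenerator

pipeline = AsyncPipeline()
pipeline.add_component("prompt_builder", ChatPromptBuilder())
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))
# Mirrors the connections section: prompt_builder.prompt -> llm.messages
pipeline.connect("prompt_builder.prompt", "llm.messages")
```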

pyproject.toml

Lines changed: 5 additions & 0 deletions
@@ -60,6 +60,8 @@ installer = "uv"
 features = ["mcp"]
 
 [tool.hatch.envs.test.scripts]
+unit = "pytest -vv -m 'not integration' {args:tests}"
+integration = "pytest -vv -m integration {args:tests}"
 all = "pytest -vv {args:tests}"
 all-cov = "coverage run -m pytest {args:tests}"
 cov-report = [
@@ -187,3 +189,6 @@ filterwarnings = [
     "ignore::UserWarning",
 ]
 asyncio_mode = "auto"
+markers = [
+    "integration: marks tests as integration tests"
+]
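The new `markers` entry declares the `integration` marker that the `unit` and `integration` hatch scripts above filter on: `hatch run test:unit` deselects marked tests via `-m 'not integration'`, while `hatch run test:integration` selects only them. A test opts in like this (hypothetical test shown for illustration):

```python
import pytest


@pytest.mark.integration
def test_async_pipeline_deploys_end_to_end():
    # Selected by `pytest -m integration`, skipped by `pytest -m 'not integration'`.
    ...
```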

src/hayhooks/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,7 @@
     is_user_message,
     get_last_user_message,
     streaming_generator,
+    async_streaming_generator
 )
 
 __all__ = [
@@ -13,5 +14,6 @@
     "is_user_message",
     "get_last_user_message",
     "streaming_generator",
+    "async_streaming_generator",
     "create_app",
 ]
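With this re-export, the async helper is importable from the package root alongside the existing utilities, exactly as the README examples in this commit do:

```python
from hayhooks import async_streaming_generator, streaming_generator
```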

src/hayhooks/server/pipelines/registry.py

Lines changed: 5 additions & 5 deletions
@@ -1,10 +1,10 @@
-from typing import Any, Dict, Optional, Tuple, Union
-from haystack import Pipeline
+from typing import Any, Dict, Optional, Union
+from haystack import DeserializationError, Pipeline, AsyncPipeline
 from haystack.core.errors import PipelineError
 from hayhooks.server.utils.base_pipeline_wrapper import BasePipelineWrapper
 from hayhooks.server.exceptions import PipelineNotFoundError
 
-PipelineType = Union[Pipeline, BasePipelineWrapper]
+PipelineType = Union[Pipeline, AsyncPipeline, BasePipelineWrapper]
 
 
 class _PipelineRegistry:
@@ -19,12 +19,12 @@ def add(
             msg = f"A pipeline with name {name} is already in the registry."
             raise ValueError(msg)
 
-        if isinstance(source_or_pipeline, (Pipeline, BasePipelineWrapper)):
+        if isinstance(source_or_pipeline, (Pipeline, AsyncPipeline, BasePipelineWrapper)):
            pipeline = source_or_pipeline
        else:
            try:
                pipeline = Pipeline.loads(source_or_pipeline)
-           except PipelineError as e:
+           except (PipelineError, DeserializationError) as e:
                msg = f"Unable to parse Haystack Pipeline {name}: {e}"
                raise ValueError(msg) from e

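In practice, the widened `PipelineType` union and the extra `except` clause mean the registry now accepts `AsyncPipeline` instances directly and surfaces YAML deserialization failures cleanly. A hypothetical sketch, assuming the `add(name, source_or_pipeline)` call shape suggested by the hunk and a module-level registry instance (neither is shown in full here):

```python
from haystack import AsyncPipeline

from hayhooks.server.pipelines.registry import registry  # assumed module-level _PipelineRegistry instance

# AsyncPipeline instances now satisfy the isinstance check and are stored as-is,
# instead of falling through to Pipeline.loads() as if they were YAML source.
registry.add("async_question_answer", AsyncPipeline())

# For string sources, a DeserializationError raised by Pipeline.loads() is now caught
# alongside PipelineError and re-raised as a ValueError carrying the pipeline name.
```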