
Create cookbook for LlamaIndex Workflow abstraction #138

Open · wants to merge 8 commits into main

31 changes: 31 additions & 0 deletions llamaindex-workflow/README.MD
# LlamaIndex Workflow meets Chainlit
This cookbook shows how to use LlamaIndex's latest Workflow abstraction with Chainlit!

<img src="app.png">

## What is a Workflow?
A workflow is an event-driven abstraction. Instead of defining an agentic system as a directed graph, LlamaIndex's Workflow abstraction lets you define the events in your system and what happens when each event is triggered. The result: performant multi-agent / multi-LLM applications in far fewer lines of code. It is also type-checked, so say goodbye to type-related bugs!
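
For a feel of the API, here is a minimal, self-contained sketch (a hypothetical `EchoFlow`, not part of this cookbook): a workflow is a class whose steps declare, through type hints, which event they consume and which they emit.

```
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step

class EchoFlow(Workflow):
    @step
    async def echo(self, ev: StartEvent) -> StopEvent:
        # The type hints wire the workflow: this step fires on the StartEvent
        # produced by `run()` and ends the run by returning a StopEvent.
        return StopEvent(result=f"You said: {ev.query}")

# result = await EchoFlow().run(query="hello")
```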

## Getting started
You will need a Tavily API key and a Gemini API key to run this demo (the app uses Gemini as its LLM; see `app.py`). Add them to your `.env` file.
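
For example, your `.env` might look like the following (`TAVILY` is the variable name `app.py` reads; `GOOGLE_API_KEY` is an assumption for the Gemini integration, so substitute whatever your LLM setup expects):

```
TAVILY=your-tavily-api-key
GOOGLE_API_KEY=your-google-api-key
```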

To install requirements:
```
pip install -r requirements.txt
```

To run the app:
```
chainlit run app.py --watch
```

## The app
Our workflow application includes a search agent (a `ReActAgent` equipped with Tavily search tools) and a simple chat engine. Whenever a question arrives as a `StartEvent`, the workflow fans out to two events: a "search online and answer" event and a "simply answer" event that skips the search.
- The first event uses Tavily to search online, and the agent answers from the search results.
- The second event simply uses the LLM to reply without any searching.

Since the second path may cause the LLM to hallucinate, a third `compile` step collects both responses and selects (or merges) the best answer.
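
The fan-out/fan-in pattern behind this is small. Here is a self-contained toy version, with hypothetical event names, using the same `self.send_event` / `ctx.collect_events` calls as `app.py`:

```
from llama_index.core.workflow import (
    Workflow, Context, Event, StartEvent, StopEvent, step
)

class BranchAEvent(Event):
    query: str

class BranchBEvent(Event):
    query: str

class BranchResult(Event):
    answer: str

class FanOutFanIn(Workflow):
    @step()
    async def fan_out(self, ev: StartEvent) -> BranchAEvent | BranchBEvent:
        # Emit both branch events for every incoming question.
        self.send_event(BranchAEvent(query=ev.query))
        self.send_event(BranchBEvent(query=ev.query))

    @step()
    async def branch_a(self, ev: BranchAEvent) -> BranchResult:
        return BranchResult(answer=f"A says: {ev.query}")

    @step()
    async def branch_b(self, ev: BranchBEvent) -> BranchResult:
        return BranchResult(answer=f"B says: {ev.query}")

    @step()
    async def fan_in(self, ctx: Context, ev: BranchResult) -> StopEvent:
        # Wait until both branches have produced a BranchResult.
        ready = ctx.collect_events(ev, [BranchResult] * 2)
        if ready is None:
            return None
        return StopEvent(result=" | ".join(r.answer for r in ready))
```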

Visualizing the "chain of thought" of the workflow is straightforward: add a `@cl.step` decorator on top of LlamaIndex's `@step` decorator for each workflow step.
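
Concretely, the decorator stacking looks like this (a minimal sketch of the pattern used in `app.py`, with a trivial step body):

```
import chainlit as cl
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step

class TracedFlow(Workflow):
    @cl.step(type="llm")  # Chainlit renders this step in the chain of thought
    @step()               # LlamaIndex registers it as a workflow step
    async def answer(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result=f"You asked: {ev.query}")
```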

<img src="workflow.png">
Binary file added llamaindex-workflow/app.png
224 changes: 224 additions & 0 deletions llamaindex-workflow/app.py
import chainlit as cl

import os
from dotenv import load_dotenv, find_dotenv

from llama_index.core.agent import ReActAgent
from llama_index.core.base.llms.types import ChatMessage, MessageRole
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.llms import LLM
from llama_index.core.workflow import (
    Workflow,
    Context,
    Event,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.gemini import Gemini
## Utility function to draw out the workflow
# from llama_index.utils.workflow import (
# draw_all_possible_flows
# )
from llama_index.tools.tavily_research import TavilyToolSpec

from typing import Optional, Annotated, List, Any

_ = load_dotenv(find_dotenv())

llm = Gemini(model="models/gemini-1.5-flash-latest")

### Define tools
search_tool_spec = TavilyToolSpec(api_key=os.getenv("TAVILY"))
search_tools = search_tool_spec.to_tool_list()

### Define events
class SearchEvent(Event):
    """Requires the LLM to do an online search to answer the question"""
    query: Annotated[str, "The user's query"]

class AnswerEvent(Event):
    """Allows the LLM to answer the question without searching"""
    query: Annotated[str, "The user's query"]

class ResponseEvent(Event):
    """Carries an LLM response"""
    query: Annotated[str, "The user's query"]
    answer: Annotated[str, "The LLM's response"]

### Define workflow
class HashTagWorkflow(Workflow):
    """Mini workflow that generates hashtags for a given response."""

    def __init__(self, *args: Any, **kwargs: Any):
        self.llm = llm
        super().__init__(*args, **kwargs)

    @cl.step(type="llm")
    @step
    async def generate_hashtag(self, ev: StartEvent) -> StopEvent:
        response = await self.llm.acomplete(
            f"Generate 1-3 hashtags related to {ev.response}"
        )
        return StopEvent(result=str(response))

class MixtureOfAnswers(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: Optional[LLM] = llm,
        **kwargs: Any,
    ):
        """Class constructor. Takes in an LLM instance and constructs
        1. A ReAct agent with search tools
        2. A simple chat engine instance
        3. A common memory (chat history) shared across the workflow

        Args:
            llm (Optional[LLM], optional): LLM instance. Defaults to the
                module-level Gemini instance.
        """
        super().__init__(*args, **kwargs)
        self.llm = llm
        self.search_agent = ReActAgent.from_tools(
            tools=search_tools,
            llm=self.llm,
        )
        self.answer_without_search_engine = SimpleChatEngine.from_defaults(
            llm=self.llm,
        )
        self.history: List[ChatMessage] = []

@cl.step(type="llm")
@step()
async def route_to_llm(
self,
ev: StartEvent
) -> SearchEvent | AnswerEvent:
"""Generates a search event and an answer event once given a start event"""

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.USER,
content = ev.query
)
)

## Routes to both events. But you can also write a router component to decide
## which event to route to.
self.send_event(SearchEvent(query = ev.query))
self.send_event(AnswerEvent(query = ev.query))

@cl.step(type="tool")
@step()
async def search_and_answer(
self,
ev: SearchEvent
) -> ResponseEvent:
"""Uses the tavily search tool to answer the question"""

## Synthesize response
response = await self.search_agent.achat(
ev.query,
chat_history = self.history
)

## [OPTIONAL] Show intermediate response in the frontend
# await cl.Message(content="ANSWER WITH SEARCH: " + str(response)).send()

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "ANSWER WITH SEARCH: " + str(response)
)
)

return ResponseEvent(query = ev.query, answer = str(response))

@cl.step(type="llm")
@step()
async def simply_answer(
self,
ev: AnswerEvent
) -> ResponseEvent:
"""Uses the LLM to simple answer the question"""

## Synthesize response
response = await self.answer_without_search_engine.achat(
ev.query,
chat_history = self.history
)

## [OPTIONAL] Show intermediate response in the frontend
# await cl.Message(content="ANSWER WITHOUT SEARCH: " + str(response)).send()

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "ANSWER WITHOUT SEARCH: " + str(response)
)
)

return ResponseEvent(query = ev.query, answer = str(response))

@cl.step(type="llm")
@step()
async def compile(
self,
ctx: Context,
ev: ResponseEvent,
hashtag_workflow: Workflow = HashTagWorkflow()
) -> StopEvent:
"""Compiles and summarizes answers from all response events"""

## There are 2 response events from routing to 2 different agents. This can
## also be a dynamic number of events.
ready = ctx.collect_events(ev, [ResponseEvent] * 2)

if ready is None:
return None

response = await self.llm.acomplete(
f"""
A user has asked us a question and we have responded accordingly using a
search tool and without using a search tool. Your job is to decide which
response best answered the question and summarize the response into a crisp
reply. If both responses answered the question, summarize both responses
into a single answer.

The user's query was: {ev.query}

The responses are:
{ready[0].answer} &
{ready[1].answer}
"""
)

## Add hashtag
hashtag = await hashtag_workflow.run(response=str(response))

## Update memory
self.history.append(
ChatMessage(
role = MessageRole.ASSISTANT,
content = "FINAL ANSWER: " + str(response) + str(hashtag)
)
)

return StopEvent(result = str(response) + str(hashtag))

### Define the app - with just a few lines of code
@cl.on_chat_start
async def on_chat_start():
    app = MixtureOfAnswers(
        verbose=True,
        timeout=6000,
    )  # The app times out if it runs for 6000s without any result
    app.add_workflows(hashtag_workflow=HashTagWorkflow(timeout=6000))
    cl.user_session.set("app", app)
    await cl.Message("Hello! Ask me anything!").send()

@cl.on_message
async def on_message(message: cl.Message):
    app = cl.user_session.get("app")
    result = await app.run(query=message.content)
    await cl.Message(content=result).send()
14 changes: 14 additions & 0 deletions llamaindex-workflow/chainlit.md
# Welcome to Chainlit! 🚀🤖

Hi there, Developer! 👋 We're excited to have you on board. Chainlit is a powerful tool designed to help you prototype, debug and share applications built on top of LLMs.

## Useful Links 🔗

- **Documentation:** Get started with our comprehensive [Chainlit Documentation](https://docs.chainlit.io) 📚
- **Discord Community:** Join our friendly [Chainlit Discord](https://discord.gg/k73SQ3FyUh) to ask questions, share your projects, and connect with other developers! 💬

We can't wait to see what you create with Chainlit! Happy coding! 💻😊

## Welcome screen

To modify the welcome screen, edit the `chainlit.md` file at the root of your project. If you do not want a welcome screen, just leave this file empty.