Using LlamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm

Example: a customer service chatbot project

Using LlamaIndex Workflow to Implement an Agent Handoff Feature Like OpenAI Swarm. Image by DALL-E-3

Happy Lunar New Year, my friends!

In the last article, I introduced the Workflow framework of LlamaIndex.

Deep Dive into LlamaIndex Workflow: Event-driven LLM architecture
What I think about the progress and shortcomings after practice

Today, I will show you how to use LlamaIndex Workflow to implement a multi-agent orchestration feature similar to OpenAI Swarm, using a customer service chatbot project as an example.

Introduction

Remember the Swarm framework OpenAI released not long ago? Its biggest features are agents and handoffs.

Agents are straightforward: each uses a specific set of instructions and tools to get tasks done. It's like wrapping an LLM function call in a neat package.

And handoffs are different. They allow an agent to pass the work to another agent seamlessly based on the context of the current conversation, making agents work together without any hiccups.

Why this is important

Let's look at a diagram explaining the whole process of a ReactAgent.

The ReactAgent needs at least three LLM calls to complete a task. Image by Author

A single agent call, simple as it looks, needs at least three round trips to the LLM to complete.

Traditional agent applications work like this: they keep the conversation context and user state, and the agent call chain is usually fixed. For each user request, the agents have to call the LLM multiple times to check the state, and honestly, some of those calls are unnecessary.

Here's an example: imagine we have an e-commerce website, and we need a customer service team to answer users' questions.

In an agent chain, agents are invoked sequentially. Image by Author

In a chained agent application, every question from a user goes to the front desk first. The front desk then asks the pre-sales service; if pre-sales can't answer, it asks the after-sales service. Finally, the front desk reorganizes the answers from the back office and replies to the customer.

Isn't that silly? Look at all the unnecessary delays and call costs it causes!

How Swarm does it

Swarm uses a handoff approach that fits the real world better. Let me use that customer service example again:

Agent handoff allows you to interact directly with the corresponding customer service. Image by Author

Imagine a store called Swarm. When a customer asks the front desk a question, the front desk figures out what kind of question it is (pre-sale or after-sale) and passes the customer to the corresponding service. Then, the customer talks to that service directly.
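For reference, this is roughly what such a handoff looks like in Swarm itself (adapted from the Swarm README; the agent names and instructions here are illustrative):

from swarm import Swarm, Agent

after_sales = Agent(
    name="After-sales",
    instructions="Answer after-sales questions about returns and repairs."
)


def transfer_to_after_sales():
    # Returning another Agent from a tool call is how Swarm hands the conversation off
    return after_sales


front_desk = Agent(
    name="Front desk",
    instructions="Figure out what the customer needs and route them to the right service.",
    functions=[transfer_to_after_sales]
)

client = Swarm()
response = client.run(
    agent=front_desk,
    messages=[{"role": "user", "content": "My drone won't power on after a crash."}]
)
print(response.messages[-1]["content"])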

Sounds reasonable, right? So why don't we just use Swarm?

Why not just use Swarm

Because Swarm is still just an experimental framework. According to the official statement:

Swarm is currently an experimental sample framework intended to explore ergonomic interfaces for multi-agent systems. It is not intended to be used in production and therefore has no official support. (This also means we will not be reviewing PRs or issues!)

So, we can't use Swarm directly in production systems.

But what we need is the agent handoff capability, right? Since that's the case, why not build a similar framework yourself?

Today's article is written for this purpose. We will develop a project using a customer service system as an example, which will use Workflow to implement agent orchestration and handoff capabilities. Let's get started.


Project in Practice: A Customer Service Chatbot with Agent Handoff Capability

This project is quite complex. To help you understand my implementation, I have put the entire project code at the end of the article. You are free to read and modify it without asking my permission.

Step one, set up an interactive interface

Whether you use an agent or not, you always need to adjust your prompts and code logic. At this point, a what-you-see-is-what-you-get chat UI becomes very important.

In this section, I'll use chainlit to quickly implement a super cool web-based chat window.

Chainlit is a Python library that, much like Streamlit, lets you build a chatbot prototype quickly without writing any frontend code. (Hooray)
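If you have never used it, a minimal Chainlit app is only a few lines. Here is a tiny sketch (launch it with chainlit run src/app.py -w):

import chainlit as cl


@cl.on_message
async def main(message: cl.Message):
    # Echo the user's message back, just to verify the chat UI works
    await cl.Message(content=f"You said: {message.content}").send()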

Let's get moving.

The scaffold of our project. Image by Author

First, we create a .env file in the project's root directory, which stores important environment variables like OPENAI_API_KEY and OPENAI_BASE_URL. Later, I will use dotenv to read it.

This is important: by keeping the API key in the .env file, you strip it out of your code, so you can publish the code freely.
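A minimal sketch of what this looks like (dotenv here refers to the python-dotenv package, and the values are placeholders):

# .env (kept out of version control)
#   OPENAI_API_KEY=<your key>
#   OPENAI_BASE_URL=<your endpoint>

from dotenv import load_dotenv

load_dotenv()  # loads the variables from .env into the process environment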

Next, we need to set up a simple project scaffold. Our project will contain two folders: src and data. Our Python source code files will be placed in the src folder, while text source files for RAG use will be placed in the data folder.

In the src directory, first create an app.py file, which will act as the view to launch the chainlit interface. This file consists of three parts:

  1. Code to prepare the Workflow program.
  2. Code to respond to the user lifecycle, outputting intermediate processes.
  3. Actual code to call the Workflow agent and conduct the conversation.

The code flowchart is shown below:

Flowchart of the project UI interface. Image by Author

In a production-ready system, we often need to connect to an enterprise's privately deployed LLM endpoints. How to connect LlamaIndex to a private model is covered in this article:

How to Connect LlamaIndex with Private LLM API Deployments
When your enterprise doesn’t use public models like OpenAI

To make our customer service less rigid, we can set the temperature a bit higher. Here is the code for initializing the system environment; I will cover the implementation of CustomerService later:

from llama_index.core import Settings
from llama_index.llms.openai_like import OpenAILike

llm = OpenAILike(
    model="qwen-max-latest",
    is_chat_model=True,
    is_function_calling_model=True,
    temperature=0.35
)
Settings.llm = llm

Imagine a new customer service agent taking over your question: what does she do first? Right, she checks the conversation history.

So, for each user session, we need to create a dedicated workflow that preserves the conversation context and user state:

import chainlit as cl
from llama_index.core.memory import ChatMemoryBuffer

GREETINGS = "Hello, what can I do for you?"


def ready_my_workflow() -> CustomerService:
    memory = ChatMemoryBuffer(
        llm=llm,
        token_limit=5000
    )

    agent = CustomerService(
        memory=memory,
        timeout=None,
        user_state=initialize_user_state()
    )
    return agent


def initialize_user_state() -> dict[str, str | None]:
    return {
        "name": None
    }


@cl.on_chat_start
async def start():
    workflow = ready_my_workflow()
    cl.user_session.set("workflow", workflow)

    await cl.Message(
        author="assistant", content=GREETINGS
    ).send()

At the same time, I will use chainlit's cl.step decorator to implement a simple logging method that outputs progress messages on the page, letting users know where we are:

@cl.step(type="run", show_input=False)
async def on_progress(message: str):
    return message

Then there is the main method, which is called on every round of the conversation.

@cl.on_message
async def main(message: cl.Message):
    workflow: CustomerService = cl.user_session.get("workflow")
    context = cl.user_session.get("context")
    msg = cl.Message(content="", author="assistant")
    user_msg = message.content
    handler = workflow.run(
        msg=user_msg,
        ctx=context
    )
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            await on_progress(event.msg)

    await msg.send()
    result = await handler
    msg.content = result
    await msg.update()
    cl.user_session.set("context", handler.ctx)

In this method, we first get the user's input, then call the workflow's run method to start the agent routing, iterating through the events streamed from the workflow pipeline and calling on_progress to print them on the page. Finally, we render the result of the dialogue on the page and save the updated Context.

To match the construction of the chainlit interface, we can first write a simple workflow:

from llama_index.core.workflow import (
    Context, Event, StartEvent, StopEvent, Workflow, step
)


class ProgressEvent(Event):
    # Custom event streamed to the UI so users can see intermediate progress
    msg: str


class CustomerService(Workflow):
    def __init__(
            self,
            llm: OpenAILike | None = None,
            memory: ChatMemoryBuffer = None,
            user_state: dict[str, str | None] = None,
            *args,
            **kwargs
    ):
        self.llm = llm or Settings.llm
        self.memory = memory or ChatMemoryBuffer()
        self.user_state = user_state
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="We're making some progress."))
        return StopEvent(result="Hello World")

Tada, our interactive interface is out:

Our UI interface for this project. Image by Author

Next, we can start preparing today's ingredients: the text source files for RAG.

Step two, generate text files

Since this project simulates the customer support team of an e-commerce website, I plan to set the background to an online drone store.

I need two files: one introducing the drones sold in the store and their details, and another containing common FAQs about drone use and after-sales terms.

To avoid business and data licensing issues, I plan to use an LLM to generate the text I want. I specifically instructed it not to include any brands or real product information.

Here is a screenshot of my file generation:

Screenshot of data file generated using LLM. Image by Author

You can take my prompt as a reference:

SKUS_TEMPLATE_EN = """
    You are the owner of an online drone store, please generate a description in English of all the drones for sale.
    Include the drone model number, selling price, detailed specifications, and a detailed description in more than 400 words.
    Do not include brand names.
    No less than 20 types of drones, ranging from consumer to industrial use.
"""
TERMS_TEMPLATE_EN = """
    You are the head of a brand's back office department, and you are asked to generate a standardized response to after-sales FAQs in English that is greater than 25,000 words.
    The text should include common usage questions, as well as questions related to returns and repairs after the sale.
    This text will be used as a reference for the customer service team when answering customer questions about after-sales issues.
    Only the body text is generated, no preamble or explanation is added.
"""

Step three, handle indexing and retrieve privatized data

A foundation LLM does not contain a company's internal data. For enterprise applications, using RAG to give the LLM access to private corporate data is all but inevitable.

Our drone store is no exception. Before letting the agent staff start work, we need to provide them with some tools to access the product catalog and after-sales policy.

LlamaIndex provides many index types suited to different situations. In a real system, I would prefer a KnowledgeGraphIndex for the product information text.
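For reference, that would look roughly like this (a sketch only; it is not used in this project and assumes Settings.llm is already configured):

from llama_index.core import KnowledgeGraphIndex, SimpleDirectoryReader

# Build a knowledge graph index over the product catalog
docs = SimpleDirectoryReader(input_files=["data/skus_en.txt"]).load_data()
kg_index = KnowledgeGraphIndex.from_documents(docs, max_triplets_per_chunk=3)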

However, to make the sample project easy to understand, I still choose to use chromadb and VectorStoreIndex:

import chromadb
from llama_index.core import (
    Settings, SimpleDirectoryReader, StorageContext, VectorStoreIndex
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.chroma import ChromaVectorStore

# The embedding model is assumed to be configured elsewhere;
# here we simply reuse the global default.
embed_model = Settings.embed_model


def get_index(collection_name: str,
              files: list[str]) -> VectorStoreIndex:
    chroma_client = chromadb.PersistentClient(path="temp/.chroma")

    collection = chroma_client.get_or_create_collection(collection_name)
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    ready = collection.count()
    if ready > 0:
        print("File already loaded")
        index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    else:
        print("File not loaded.")
        docs = SimpleDirectoryReader(input_files=files).load_data()
        index = VectorStoreIndex.from_documents(
            docs, storage_context=storage_context, embed_model=embed_model,
            transformations=[SentenceSplitter(chunk_size=512, chunk_overlap=20)]
        )

    return index


INDEXES = {
    "SKUS": get_index("skus_docs", ["data/skus_en.txt"]),
    "TERMS": get_index("terms_docs", ["data/terms_en.txt"])
}

The running flowchart of this code is as follows:

The running flowchart of the code. Image by Author

If vector data already exists, return the index directly. If the data has not been loaded yet, first load the data into the vector store, then return the index.

Then we add a tool method to help the agent get the corresponding retriever:

async def query_docs(
        index: VectorStoreIndex, query: str,
        similarity_top_k: int = 1
) -> str:
    retriever = index.as_retriever(similarity_top_k=similarity_top_k)
    nodes = await retriever.aretrieve(query)
    result = ""
    for node in nodes:
        result += node.get_content() + "\n\n"
    return result
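For example, inside an async step, an agent looking up product details could call it like this (the query text is illustrative):

# Retrieve the most relevant chunk from the SKU index for a sample question
answer = await query_docs(INDEXES["SKUS"], "Which drones are suitable for aerial photography?")
print(answer)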

Step four, hire a few agents

Since we are building a smart customer service project, it is necessary to hire a few customer service agents.
