The generative AI landscape is changing rapidly, demanding more flexible and powerful tools to handle complex workflows and data processing pipelines. Traditional approaches based on Directed Acyclic Graphs (DAGs) fall short here, as they cannot accommodate the feedback mechanisms and loops that are essential in agentic AI/LLM systems. This article explores the recently released LlamaIndex Workflows and demonstrates their utility in building LLM applications around Retrieval-Augmented Generation (RAG).
Table of Contents
- Workflows in Data Pipelines
- Overview of LlamaIndex Workflows
- Building a RAG Workflow using LlamaIndex Workflows
Workflows in Data Pipelines
Workflows are a core approach to designing, building, and implementing applications that require an extensive data processing pipeline. Conceptually, they use Directed Acyclic Graphs (DAGs) to model a linear flow of tasks or events. A DAG is a directed graph with no cycles: following the edges from vertex to vertex can never bring you back to where you started. In data processing and engineering environments, a data pipeline, i.e., a series of computations and processing stages, is often represented as a DAG, which makes data lineage and usage easy to understand at a glance.
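To make this concrete, here is a minimal sketch using Python's standard-library graphlib module (the task names are purely illustrative): a pipeline is declared as a mapping from each task to the tasks it depends on, and a topological sort yields a valid execution order.

from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on
pipeline = {
    "clean": {"extract"},
    "embed": {"clean"},
    "index": {"embed"},
}

# static_order() yields the tasks in an order that respects every edge
print(list(TopologicalSorter(pipeline).static_order()))
# ['extract', 'clean', 'embed', 'index']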
One of the primary benefits of DAG-based workflows is their flexibility and scalability. They allow independent tasks to execute in parallel, significantly improving processing speed and efficiency. They also ease maintenance and updates, since new tasks can be added or existing ones modified without disrupting the entire workflow. This modular approach supports iterative development and lets teams enhance their data pipelines gradually over time. DAGs also enable better resource management, as the system can allocate computational resources more effectively based on the structure and requirements of the workflow.
However, implementing DAG-based workflows in data pipelines also comes with challenges, especially in non-linear AI agent development. One significant issue is the complexity of designing and managing large DAGs: as the number of nodes and edges grows, the workflow becomes harder to visualize and debug. More fundamentally, DAGs are acyclic by definition, so they cannot express loops, which rules out the feedback and self-correction cycles that agentic approaches depend on.
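The acyclicity constraint is easy to demonstrate with the same standard-library tooling: adding a feedback edge, such as an agent re-evaluating its own output before generating again, immediately invalidates the graph (task names again illustrative).

from graphlib import TopologicalSorter, CycleError

# "generate" depends on "evaluate" and vice versa -- a self-correction loop
agentic = {
    "generate": {"retrieve", "evaluate"},
    "evaluate": {"generate"},
}

try:
    list(TopologicalSorter(agentic).static_order())
except CycleError as err:
    print(f"DAG scheduler rejects the loop: {err}")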
Overview of LlamaIndex Workflows
LlamaIndex announced the beta of Workflows on Aug 1, 2024: a new way to create complex AI applications through workflow orchestration. The approach uses a dispatch mechanism to pass events back and forth through a collection of Python functions called steps. Each step corresponds to a component of the system, such as a web page reader, vector data storage, or LLM response generation.
Each step processes events and can relay new events to other components as and when required. Workflows in LlamaIndex are thus simple event-driven abstractions that chain multiple steps together. They are also automatically instrumented, providing observability and explainability out of the box.
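As a minimal sketch of the abstraction (using the beta API shown throughout this article; the event and step names are illustrative), a workflow chains steps purely through the event types they accept and return:

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class GreetEvent(Event):
    name: str

class HelloWorkflow(Workflow):
    @step()
    async def prepare(self, ev: StartEvent) -> GreetEvent:
        # StartEvent carries the keyword arguments passed to run()
        return GreetEvent(name=ev.get("name", "world"))

    @step()
    async def greet(self, ev: GreetEvent) -> StopEvent:
        # Returning a StopEvent ends the workflow and sets its result
        return StopEvent(result=f"Hello, {ev.name}!")

# Inside an async context (e.g., a notebook):
# result = await HelloWorkflow().run(name="LlamaIndex")

The return type annotations are what wire the steps together: the dispatcher routes each emitted event to whichever step declares that event type as its input.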
Building a RAG Workflow using LlamaIndex Workflows
Let us implement a simple RAG workflow using LlamaIndex Workflows.
Step 1: Install the required libraries –
- llama-index – the base library for the LlamaIndex framework
- llama-index-llms-openai – for working with OpenAI models through LlamaIndex
- llama-index-readers-web – for reading and scraping data off a web page
- pyvis – for creating network visualizations of the workflow
!pip install -U --quiet llama-index llama-index-llms-openai llama-index-readers-web pyvis
Step 2: Set up the OpenAI API key –
# Load the OpenAI API key from Colab secrets and expose it as an env variable
from google.colab import userdata
import os

os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
Step 3: Define a custom event – LlamaIndex workflows use user-defined Pydantic objects as events.
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class RetrieverEvent(Event):
    """Event carrying the nodes returned by the retriever."""
    nodes: list[NodeWithScore]
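Because events are Pydantic objects, their fields are validated on construction and read as plain attributes; a quick illustration:

# Fields are type-checked by Pydantic and accessed as attributes
ev = RetrieverEvent(nodes=[])
print(ev.nodes)  # []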
Step 4: Let us now construct the workflow and steps –
from llama_index.core import VectorStoreIndex
from llama_index.readers.web import SimpleWebPageReader
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding


class RAGWorkflow(Workflow):
    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None
        # Note: `dirname` only triggers ingestion; the documents themselves
        # are scraped from the fixed URL below
        documents = SimpleWebPageReader(html_to_text=True).load_data(
            ["https://jhmhp.amegroups.org/article/view/8842/html"]
        )
        # Build the vector index and stash it in the shared workflow context
        ctx.data["index"] = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=OpenAIEmbedding(model_name="text-embedding-3-small"),
        )
        return StopEvent(result=f"Indexed {len(documents)} documents.")

    @step(pass_context=True)
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrieverEvent | None:
        """RAG entry point, triggered by a StartEvent with `query`."""
        query = ev.get("query")
        if not query:
            return None
        print(f"Query the database with: {query}")
        # Store the query in the global context for the synthesis step
        ctx.data["query"] = query
        # Fetch the index built during ingestion from the global context
        index = ctx.data.get("index")
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None
        retriever = index.as_retriever(similarity_top_k=2)
        nodes = retriever.retrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step(pass_context=True)
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
        """Generate a streaming response from the retrieved nodes."""
        llm = OpenAI(model="gpt-4o-mini")
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = ctx.data.get("query")
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)
The code above implements three primary steps: ingestion of data from the specified web URL, retrieval, and synthesis of an appropriate response. Note that the `dirname` field merely triggers ingestion here; the documents are always loaded from the fixed URL. The workflow context stores the user query and the index, while the event types define the sequence in which nodes are passed along and the final response is streamed.
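Although this example is linear, nothing in the workflow model forbids cycles. As a hypothetical sketch (not part of the RAG code above), a step can re-emit its own input event type to form a retry loop, which is exactly the pattern a DAG cannot express:

import random
from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step

class RetryEvent(Event):
    attempt: int

class LoopingWorkflow(Workflow):
    @step()
    async def begin(self, ev: StartEvent) -> RetryEvent:
        return RetryEvent(attempt=0)

    @step()
    async def try_once(self, ev: RetryEvent) -> RetryEvent | StopEvent:
        # A placeholder check stands in for real validation (e.g., an LLM judge);
        # re-emitting RetryEvent routes the flow back to this same step
        if ev.attempt < 3 and random.random() < 0.5:
            return RetryEvent(attempt=ev.attempt + 1)
        return StopEvent(result=f"done after {ev.attempt} retries")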
Step 5: Executing and visualizing the workflow –
# Note: in more recent LlamaIndex releases these drawing helpers live in
# llama_index.utils.workflow (package llama-index-utils-workflow); adjust
# the import if llama_index.core.workflow no longer exposes them
from llama_index.core.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Render every possible path through the workflow as an interactive HTML graph
draw_all_possible_flows(RAGWorkflow, filename="rag_flow.html")

w = RAGWorkflow()
# First run: ingest and index the documents
await w.run(dirname="data")
# Second run: query the index; the result is a streaming response
result = await w.run(query="What is the data about?")
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)

# Render the path taken by the most recent execution
draw_most_recent_execution(w, filename="rag_flow_recent.html")
Output:
rag_flow.html
Query the database with: What is the data about?
Retrieved 2 nodes.
The data pertains to a narrative review discussing the advancement of generative artificial intelligence in healthcare. It includes references to various articles, their citations, and options for accessing the content, such as PDF and full text. Additionally, it outlines the journal’s aims and scope, as well as information for authors, reviewers, and ethical policies related to publication.
rag_flow_recent.html
Open the generated HTML files to inspect the overall workflow (rag_flow.html) and the most recent execution flow (rag_flow_recent.html).
Final Words
Although still in beta, the recently released LlamaIndex Workflows already pack a powerful punch, bringing event-driven abstractions and modular design to complex AI applications. With built-in visualization and observability, they offer a promising foundation for building scalable, maintainable AI-driven data processing systems and pipelines.