Source code for airflow.providers.common.ai.example_dags.example_llamaindex_rag

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAGs demonstrating RAG pipelines with LlamaIndex operators.

Three patterns:

1. Full RAG pipeline -- load -> embed -> retrieve -> answer in one DAG.
2. Separate index/query DAGs -- production-shaped split (scheduled
   indexing job + on-demand query DAG).
3. Multi-source RAG -- combine multiple loaders with source metadata.

The ``LLMOperator`` synthesis step uses a ``pydanticai_default`` connection
because :class:`~airflow.providers.common.ai.operators.llm.LLMOperator` is
pydantic-ai-backed; the LlamaIndex operators use ``llamaindex_default``.
The two connection types are intentional -- they back different frameworks.
"""

from __future__ import annotations

from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator
from airflow.providers.common.ai.operators.llamaindex_embedding import LlamaIndexEmbeddingOperator
from airflow.providers.common.ai.operators.llamaindex_retrieval import LlamaIndexRetrievalOperator
from airflow.providers.common.ai.operators.llm import LLMOperator
from airflow.providers.common.compat.sdk import dag, task

# ---------------------------------------------------------------------------
# 1. Full RAG pipeline: load -> embed -> retrieve -> answer
# ---------------------------------------------------------------------------


# [START howto_llamaindex_rag_pipeline]
@dag(schedule=None, tags=["example"])
def example_llamaindex_rag_pipeline():
    """End-to-end RAG pipeline in a single DAG.

    1. Load ``.txt``, ``.md`` and ``.pdf`` files from the knowledge-base
       directory into document dicts.
    2. Chunk and embed the documents, persisting the index to disk.
    3. Retrieve relevant chunks for a user question.
    4. Synthesize an answer using the retrieved context.

    The question, the index directory and the embedding model are each
    defined once below. The retrieval step must read the directory the
    embedding step wrote and embed the query with the same model. The answer
    prompt must ask the same question that drove retrieval.
    """
    question = "What are the main components of Apache Airflow?"
    index_dir = "/opt/airflow/data/indexes/kb_index"
    embed_model = "text-embedding-3-small"

    load = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/knowledge_base/",
        file_extensions=[".txt", ".md", ".pdf"],
    )

    embed = LlamaIndexEmbeddingOperator(
        task_id="embed_docs",
        documents=load.output,
        embed_model=embed_model,
        llm_conn_id="llamaindex_default",
        chunk_size=512,
        chunk_overlap=50,
        persist_dir=index_dir,
    )

    retrieve = LlamaIndexRetrievalOperator(
        task_id="retrieve",
        query=question,
        index_persist_dir=index_dir,
        embed_model=embed_model,
        llm_conn_id="llamaindex_default",
        top_k=5,
    )

    @task
    def format_context(retrieval_result: dict) -> str:
        """Join the retrieved chunk texts, separated by ``---`` rules."""
        chunks = retrieval_result["chunks"]
        return "\n\n---\n\n".join(chunk["text"] for chunk in chunks)

    context = format_context(retrieve.output)

    answer = LLMOperator(
        task_id="answer",
        # Only the first piece is an f-string. The Jinja ``{{ ... }}`` in the
        # second piece is left intact for templating.
        prompt=(
            f"Using the context below, answer the question: {question}\n\n"
            "Context:\n{{ ti.xcom_pull(task_ids='format_context') }}"
        ),
        llm_conn_id="pydanticai_default",
        system_prompt="Answer based only on the provided context. Cite sources when possible.",
    )

    # ``retrieve`` takes no XCom input from ``embed``; it reads the index via
    # ``index_persist_dir``. That ordering must therefore be declared explicitly.
    embed >> retrieve >> context >> answer
# [END howto_llamaindex_rag_pipeline]

example_llamaindex_rag_pipeline()

# ---------------------------------------------------------------------------
# 2. Production-shaped split: scheduled indexing + on-demand query
# ---------------------------------------------------------------------------


# [START howto_llamaindex_index_dag]
@dag(schedule="@weekly", tags=["example"])
def example_llamaindex_index_pdf():
    """Rebuild the reports vector index on a weekly schedule.

    Every PDF matching the reports glob is loaded, chunked and embedded.
    The resulting index is persisted under
    ``/opt/airflow/data/indexes/reports_index``, which is the directory the
    on-demand query DAG in this module reads from.
    """
    reports_glob = "/opt/airflow/data/reports/*.pdf"
    reports_index_dir = "/opt/airflow/data/indexes/reports_index"

    load_reports = DocumentLoaderOperator(task_id="load_pdfs", source_path=reports_glob)

    index_reports = LlamaIndexEmbeddingOperator(
        task_id="build_index",
        documents=load_reports.output,
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        chunk_size=1024,
        chunk_overlap=100,
        persist_dir=reports_index_dir,
    )

    # Index only after the PDFs have been loaded.
    load_reports >> index_reports
# [END howto_llamaindex_index_dag]

example_llamaindex_index_pdf()


# [START howto_llamaindex_query_dag]
@dag(
    schedule=None,
    params={"question": "Summarize the key findings from the latest quarterly report."},
    tags=["example"],
)
def example_llamaindex_query():
    """Answer a question against the pre-built reports index, on demand.

    The ``question`` param, which can be overridden when triggering manually
    or through the API, drives both the retrieval query and the synthesis
    prompt. The retrieved chunks are numbered so the answer can cite them.
    """
    fetch_chunks = LlamaIndexRetrievalOperator(
        task_id="retrieve",
        query="{{ params.question }}",
        index_persist_dir="/opt/airflow/data/indexes/reports_index",
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        top_k=5,
    )

    @task
    def format_context(retrieval_result: dict) -> str:
        """Prefix each retrieved chunk with ``[1]``, ``[2]``, ... and join them."""
        excerpts = (
            f"[{number}] {chunk['text']}"
            for number, chunk in enumerate(retrieval_result["chunks"], start=1)
        )
        return "\n\n".join(excerpts)

    numbered_context = format_context(fetch_chunks.output)

    answer_with_citations = LLMOperator(
        task_id="synthesize",
        prompt=(
            "Question: {{ params.question }}\n\n"
            "Relevant excerpts:\n{{ ti.xcom_pull(task_ids='format_context') }}\n\n"
            "Provide a detailed answer with references to the excerpt numbers."
        ),
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "You are a research assistant. Answer the question using only the "
            "provided excerpts. Reference excerpt numbers in square brackets."
        ),
    )

    # The prompt pulls the context through a template string, not an XCom
    # argument, so the ordering is declared explicitly.
    numbered_context >> answer_with_citations
# [END howto_llamaindex_query_dag]

example_llamaindex_query()

# ---------------------------------------------------------------------------
# 3. Multi-source RAG: combine CSV product data with text documentation
# ---------------------------------------------------------------------------


# [START howto_llamaindex_multi_source]
@dag(schedule=None, tags=["example"])
def example_llamaindex_multi_source():
    """Embed documents from two differently formatted sources into one index.

    Two ``DocumentLoaderOperator`` tasks load the sources:

    * a CSV product catalog;
    * a directory of Markdown/text documentation.

    Each task passes its own ``metadata_fields``, including a ``source`` tag,
    so that documents can be told apart by origin (e.g. for filtered
    retrieval later). The two document lists are concatenated and embedded
    together.
    """
    product_loader = DocumentLoaderOperator(
        task_id="load_products",
        source_path="/opt/airflow/data/products.csv",
        metadata_fields={"source": "product_catalog", "department": "engineering"},
    )
    docs_loader = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/documentation/",
        file_extensions=[".md", ".txt"],
        metadata_fields={"source": "documentation"},
    )

    @task
    def merge_documents(products: list[dict], docs: list[dict]) -> list[dict]:
        """Concatenate both loaders' outputs, product documents first."""
        combined = products + docs
        return combined

    all_documents = merge_documents(product_loader.output, docs_loader.output)

    embed_all = LlamaIndexEmbeddingOperator(
        task_id="embed_all",
        documents=all_documents,
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        persist_dir="/opt/airflow/data/indexes/multi_source_index",
    )

    # Bare reference to the final task; a no-op at runtime.
    embed_all


# [END howto_llamaindex_multi_source]

example_llamaindex_multi_source()

Was this entry helpful?