Source code for airflow.providers.common.ai.example_dags.example_llamaindex_hook
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAGs demonstrating LlamaIndexHook + LlamaIndex operator usage.
Each DAG covers a single pattern. The docs reference these via
``.. exampleinclude::`` so the runnable snippets stay in sync.
"""
from __future__ import annotations
from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator
from airflow.providers.common.ai.operators.llamaindex_embedding import LlamaIndexEmbeddingOperator
from airflow.providers.common.ai.operators.llamaindex_retrieval import LlamaIndexRetrievalOperator
from airflow.providers.common.compat.sdk import dag, task
# [START howto_hook_llamaindex_embed]
@dag(schedule=None, tags=["example"])
[docs]
def example_llamaindex_embed():
"""Chunk + embed a directory of documents and persist the index locally."""
load = DocumentLoaderOperator(
task_id="load",
source_path="/opt/airflow/data/library/**/*",
file_extensions=[".pdf", ".md", ".txt"],
)
embed = LlamaIndexEmbeddingOperator(
task_id="embed",
documents=load.output, # XCom direct -- never via Jinja (list[dict])
embed_model="text-embedding-3-small",
llm_conn_id="llamaindex_default",
chunk_size=512,
chunk_overlap=50,
persist_dir="/opt/airflow/data/library_index",
)
load >> embed
# [END howto_hook_llamaindex_embed]
example_llamaindex_embed()
# [START howto_hook_llamaindex_retrieve]
@dag(schedule=None, tags=["example"])
[docs]
def example_llamaindex_retrieve():
"""Load a persisted index and run similarity search."""
retrieve = LlamaIndexRetrievalOperator(
task_id="retrieve",
query="{{ params.query }}",
index_persist_dir="/opt/airflow/data/library_index",
embed_model="text-embedding-3-small",
llm_conn_id="llamaindex_default",
top_k=5,
)
retrieve
# [END howto_hook_llamaindex_retrieve]
example_llamaindex_retrieve()
# [START howto_hook_llamaindex_cloud_persist]
@dag(schedule=None, tags=["example"])
[docs]
def example_llamaindex_cloud_persist():
"""Persist the index directly to S3 -- no separate upload step."""
load = DocumentLoaderOperator(
task_id="load",
source_path="s3://my-bucket/library/",
source_conn_id="aws_default",
file_extensions=[".pdf"],
)
embed = LlamaIndexEmbeddingOperator(
task_id="embed",
documents=load.output,
embed_model="text-embedding-3-small",
llm_conn_id="llamaindex_default",
persist_dir="s3://my-bucket/indexes/library/",
persist_conn_id="aws_default",
)
load >> embed
# [END howto_hook_llamaindex_cloud_persist]
example_llamaindex_cloud_persist()
# [START howto_hook_llamaindex_byo_embed_model]
@dag(schedule=None, tags=["example"])
[docs]
def example_llamaindex_byo_embed_model():
"""Use a non-OpenAI embedding by instantiating the LlamaIndex class directly.
LlamaIndex doesn't ship a universal init helper, so the operator accepts
a pre-built ``BaseEmbedding`` instance and bypasses the hook entirely.
Install the matching extra:
``pip install llama-index-embeddings-cohere``.
"""
@task
def build_cohere_embedder():
from llama_index.embeddings.cohere import CohereEmbedding
from airflow.providers.common.compat.sdk import BaseHook
conn = BaseHook.get_connection("cohere_default")
return CohereEmbedding(model_name="embed-english-v3.0", cohere_api_key=conn.password)
@task
def empty_doc_list() -> list[dict]:
return [{"text": "Cohere demo content", "metadata": {}}]
embed = LlamaIndexEmbeddingOperator(
task_id="embed",
documents=empty_doc_list(),
embed_model=build_cohere_embedder(),
persist_dir="/opt/airflow/data/cohere_index",
)
embed
# [END howto_hook_llamaindex_byo_embed_model]
example_llamaindex_byo_embed_model()