LlamaIndexEmbeddingOperator

Chunks a list[dict] of documents and produces embedding vectors using LlamaIndex. It is designed to feed the output of DocumentLoaderOperator into vector storage (pgvector, Pinecone, Weaviate, …).

The operator passes the embedding model directly to VectorStoreIndex(..., embed_model=...) – it does not mutate LlamaIndex’s global Settings singleton, so concurrent tasks in the same worker process don’t race on shared model state.
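To make the hazard concrete, here is a minimal sketch using only the standard library. FakeEmbedder, GlobalSettings, embed_via_global and embed_via_argument are hypothetical stand-ins invented for this illustration; they are not LlamaIndex or operator APIs. The interleaving is written out sequentially so the outcome is deterministic.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeEmbedder:
    """Hypothetical stand-in for an embedding model (not a LlamaIndex class)."""

    name: str

    def embed(self, text: str) -> str:
        # Return a label instead of a vector so the model that ran is visible.
        return f"{self.name}:{text}"


class GlobalSettings:
    """Hypothetical process-wide singleton holding one shared model."""

    embed_model: Optional[FakeEmbedder] = None


def embed_via_global(text: str) -> str:
    # Uses whichever model was assigned last by any task in the process.
    if GlobalSettings.embed_model is None:
        raise RuntimeError("no global embedding model configured")
    return GlobalSettings.embed_model.embed(text)


def embed_via_argument(text: str, embed_model: FakeEmbedder) -> str:
    # The model travels with the call, so no shared state is read.
    return embed_model.embed(text)


# Two tasks sharing one worker process configure the singleton back to back.
GlobalSettings.embed_model = FakeEmbedder("model-a")  # task A configures
GlobalSettings.embed_model = FakeEmbedder("model-b")  # task B configures
task_a_global = embed_via_global("doc")  # task A ends up using task B's model

# Passing the model explicitly keeps task A on its own model.
task_a_explicit = embed_via_argument("doc", FakeEmbedder("model-a"))
```

In this sketch the global path silently picks up the other task's model, while the explicit path cannot, which is the property the operator relies on.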

Basic usage

airflow/providers/common/ai/example_dags/example_llamaindex_hook.py

@dag(schedule=None, tags=["example"])
def example_llamaindex_embed():
    """Chunk + embed a directory of documents and persist the index locally."""

    load = DocumentLoaderOperator(
        task_id="load",
        source_path="/opt/airflow/data/library/**/*",
        file_extensions=[".pdf", ".md", ".txt"],
    )

    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=load.output,  # XCom direct -- never via Jinja (list[dict])
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        chunk_size=512,
        chunk_overlap=50,
        persist_dir="/opt/airflow/data/library_index",
    )

    load >> embed


documents is templated, so load.output (XCom direct) is resolved to a native list[dict] before execute runs.

Bring-your-own embedding model

LlamaIndex doesn’t ship a universal embedding-model initializer, so the operator’s embed_model parameter accepts either:

  • a string model name (e.g. "text-embedding-3-small") – the operator constructs an OpenAIEmbedding via LlamaIndexHook using llm_conn_id / embed_conn_id, or

  • a pre-built BaseEmbedding instance – bypass the hook entirely. Use this for Cohere, Bedrock, Vertex, HuggingFace, etc.:

airflow/providers/common/ai/example_dags/example_llamaindex_hook.py

@dag(schedule=None, tags=["example"])
def example_llamaindex_byo_embed_model():
    """Use a non-OpenAI embedding by instantiating the LlamaIndex class directly.

    LlamaIndex doesn't ship a universal init helper, so the operator accepts
    a pre-built ``BaseEmbedding`` instance and bypasses the hook entirely.
    Install the matching integration package:
    ``pip install llama-index-embeddings-cohere``.
    """

    @task
    def build_cohere_embedder():
        from llama_index.embeddings.cohere import CohereEmbedding

        from airflow.providers.common.compat.sdk import BaseHook

        conn = BaseHook.get_connection("cohere_default")
        return CohereEmbedding(model_name="embed-english-v3.0", cohere_api_key=conn.password)

    @task
    def demo_doc_list() -> list[dict]:
        # A single placeholder document; real DAGs would bind a loader's output.
        return [{"text": "Cohere demo content", "metadata": {}}]

    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=demo_doc_list(),
        embed_model=build_cohere_embedder(),
        persist_dir="/opt/airflow/data/cohere_index",
    )

    embed
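The two accepted forms of embed_model, together with the connection fallbacks listed in the Parameters table, can be pictured as a small type dispatch. The sketch below is an illustration under assumptions, not the operator's actual code: BaseEmbeddingStub, HookBuiltEmbedding, resolve_conn_id and resolve_embed_model are hypothetical names, and the real hook builds an OpenAIEmbedding rather than this placeholder.

```python
from dataclasses import dataclass
from typing import Optional, Union

DEFAULT_CONN_ID = "llamaindex_default"  # default named in the Parameters table


class BaseEmbeddingStub:
    """Hypothetical stand-in for LlamaIndex's BaseEmbedding."""


@dataclass
class HookBuiltEmbedding(BaseEmbeddingStub):
    """Hypothetical placeholder for an embedding built through the hook."""

    model_name: str
    conn_id: str


def resolve_conn_id(llm_conn_id: Optional[str], embed_conn_id: Optional[str]) -> str:
    # embed_conn_id wins, then llm_conn_id, then the hook's default name.
    for candidate in (embed_conn_id, llm_conn_id):
        if candidate is not None:
            return candidate
    return DEFAULT_CONN_ID


def resolve_embed_model(
    embed_model: Union[str, BaseEmbeddingStub],
    llm_conn_id: Optional[str] = None,
    embed_conn_id: Optional[str] = None,
) -> BaseEmbeddingStub:
    if isinstance(embed_model, BaseEmbeddingStub):
        # Pre-built instance: used as-is, connection IDs are not consulted.
        return embed_model
    if isinstance(embed_model, str):
        # Model name: built via a connection chosen by the fallback chain.
        return HookBuiltEmbedding(embed_model, resolve_conn_id(llm_conn_id, embed_conn_id))
    raise TypeError(f"unsupported embed_model type: {type(embed_model).__name__}")


by_name = resolve_embed_model("text-embedding-3-small")
custom = BaseEmbeddingStub()
passthrough = resolve_embed_model(custom, llm_conn_id="ignored_for_instances")
```

Checking the instance case first means a pre-built model never triggers a connection lookup, which matches the "bypass the hook entirely" behaviour described above.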


Persisting to cloud storage

persist_dir accepts local paths and storage URIs (s3://, gs://, azure://, file://) resolved via ObjectStoragePath. Pass persist_conn_id to point at the Airflow connection that holds the cloud credentials:

airflow/providers/common/ai/example_dags/example_llamaindex_hook.py

@dag(schedule=None, tags=["example"])
def example_llamaindex_cloud_persist():
    """Persist the index directly to S3 -- no separate upload step."""

    load = DocumentLoaderOperator(
        task_id="load",
        source_path="s3://my-bucket/library/",
        source_conn_id="aws_default",
        file_extensions=[".pdf"],
    )

    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=load.output,
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        persist_dir="s3://my-bucket/indexes/library/",
        persist_conn_id="aws_default",
    )

    load >> embed
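As a rough illustration of how a persist_dir value can be told apart by its scheme, the following sketch uses only urllib.parse from the standard library. It is not how the operator or ObjectStoragePath resolves paths; classify_persist_dir and CLOUD_SCHEMES are hypothetical names, and treating file:// as local is an assumption made for this example.

```python
from urllib.parse import urlsplit

# Cloud schemes named in the text above; anything else is not recognised here.
CLOUD_SCHEMES = {"s3", "gs", "azure"}


def classify_persist_dir(persist_dir: str) -> str:
    """Return "cloud", "local", or "unknown" based on the URI scheme."""
    scheme = urlsplit(persist_dir).scheme.lower()
    if scheme in CLOUD_SCHEMES:
        # A cloud URI is where persist_conn_id becomes relevant.
        return "cloud"
    if scheme in ("", "file"):
        # A bare path has no scheme; file:// points at the local filesystem.
        return "local"
    return "unknown"


kinds = {
    path: classify_persist_dir(path)
    for path in (
        "s3://my-bucket/indexes/library/",
        "/opt/airflow/data/library_index",
        "file:///tmp/index",
    )
}
```

A check like this could be used in a DAG-level validation step to warn when a cloud URI is supplied without persist_conn_id.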


Parameters

| Parameter | Description |
|---|---|
| documents | list[dict] with text / metadata keys. Templated, so binding load.output resolves to the native list before execute. |
| embed_model | String model name or pre-built BaseEmbedding instance. |
| llm_conn_id | Airflow connection ID used when embed_model is a string. Falls back to LlamaIndexHook.default_conn_name (llamaindex_default) when None. |
| embed_conn_id | Optional separate connection ID for the embedding provider. Falls back to llm_conn_id when None. |
| chunk_size | Sentence-splitter chunk size (default 512). |
| chunk_overlap | Overlap between chunks (default 50). |
| persist_dir | Local path or storage URI to persist the LlamaIndex index. |
| persist_conn_id | Cloud credentials connection ID for persist_dir URIs. |
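To show how chunk_size and chunk_overlap interact, here is a deliberately simplified sliding-window chunker. It counts whitespace-separated words, whereas LlamaIndex's sentence splitter works on tokens and tries to respect sentence boundaries, so treat this as a model of the overlap arithmetic only. chunk_words is a hypothetical helper, not part of the operator.

```python
def chunk_words(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> list[str]:
    """Split text into windows of chunk_size words sharing chunk_overlap words."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    words = text.split()
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            # This window already reached the end of the text.
            break
    return chunks


sample = " ".join(f"w{i}" for i in range(10))
windows = chunk_words(sample, chunk_size=4, chunk_overlap=1)
```

With ten words, a window of four and an overlap of one, each window starts three words after the previous one, so the last word of one chunk is repeated as the first word of the next.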

Output

Returns a dict with:

{
    "document_count": int,
    "chunk_count": int,
    "persist_dir": str | None,
    "chunks": [
        {"text": str, "metadata": dict, "vector": list[float]},
        ...
    ],
}
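A downstream task would typically reshape this dict into rows for a vector store. The sketch below assumes only the keys shown above; chunks_to_rows is a hypothetical helper, the sample values are made up for illustration, and the dimension check is a suggested sanity test rather than something the operator is documented to guarantee.

```python
import json


def chunks_to_rows(result: dict) -> list[tuple[str, str, list[float]]]:
    """Flatten the operator's result into (text, metadata_json, vector) rows."""
    rows = []
    expected_dim = None
    for chunk in result["chunks"]:
        vector = chunk["vector"]
        if expected_dim is None:
            expected_dim = len(vector)
        elif len(vector) != expected_dim:
            # Most vector stores require a fixed dimension per collection.
            raise ValueError("inconsistent embedding dimensions")
        # Serialise metadata deterministically so rows are easy to compare.
        rows.append((chunk["text"], json.dumps(chunk["metadata"], sort_keys=True), vector))
    return rows


# Made-up sample in the documented shape (tiny vectors for readability).
sample_result = {
    "document_count": 1,
    "chunk_count": 2,
    "persist_dir": None,
    "chunks": [
        {"text": "alpha", "metadata": {"source": "a.md"}, "vector": [0.1, 0.2, 0.3]},
        {"text": "beta", "metadata": {"source": "a.md"}, "vector": [0.4, 0.5, 0.6]},
    ],
}
rows = chunks_to_rows(sample_result)
```

The resulting tuples map naturally onto a bulk insert into a table with text, metadata and vector columns, or onto the upsert payload of a hosted vector store.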
