airflow.providers.common.ai.operators.llamaindex_embedding

Operator for document chunking and embedding via LlamaIndex.

Classes

LlamaIndexEmbeddingOperator

Chunk documents and produce embedding vectors using LlamaIndex.

Module Contents

class airflow.providers.common.ai.operators.llamaindex_embedding.LlamaIndexEmbeddingOperator(*, documents, embed_model=None, llm_conn_id=None, embed_conn_id=None, chunk_size=512, chunk_overlap=50, persist_dir=None, persist_conn_id=None, **kwargs)[source]

Bases: airflow.providers.common.compat.sdk.BaseOperator

Chunk documents and produce embedding vectors using LlamaIndex.

Bridges document loading (e.g. DocumentLoaderOperator output) and vector storage (pgvector, Pinecone, Weaviate, …). Input is a list[dict] whose items have text and metadata keys; the output includes the embedding vectors, ready for ingestion into downstream storage.
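A minimal sketch of the input shape described above. The validate_documents helper and the metadata fields are hypothetical and only illustrate the expected structure; this is not the operator's own validation logic.

```python
from typing import Any


def validate_documents(documents: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Check each item for a string 'text' and a dict 'metadata' (hypothetical helper)."""
    for i, doc in enumerate(documents):
        if not isinstance(doc.get("text"), str):
            raise ValueError(f"documents[{i}] is missing a string 'text' key")
        if not isinstance(doc.get("metadata"), dict):
            raise ValueError(f"documents[{i}] is missing a dict 'metadata' key")
    return documents


# Example input; the metadata fields here are made up for illustration.
documents = [
    {"text": "Airflow schedules workflows as DAGs.", "metadata": {"source": "intro.md"}},
    {"text": "Operators define a single unit of work.", "metadata": {"source": "concepts.md"}},
]
validate_documents(documents)
```

Running a check like this in an upstream @task can surface malformed items before any embedding call is made.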

The operator passes the embedding model directly to VectorStoreIndex(..., embed_model=...) – it does not mutate LlamaIndex’s global Settings singleton, so concurrent tasks in the same worker don’t race on shared state.
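The design choice above can be illustrated without LlamaIndex at all. The sketch below uses a stand-in FakeEmbedder class (hypothetical, not a LlamaIndex type) to show why passing the model as an argument keeps concurrent callers independent: each call only ever sees the model it was given.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeEmbedder:
    """Stand-in for an embedding model; not a LlamaIndex class."""

    name: str

    def embed(self, text: str) -> tuple[str, int]:
        # Tag the fake "vector" with the model name so the producer is visible.
        return (self.name, len(text))


def embed_all(texts: list[str], embed_model: FakeEmbedder) -> list[tuple[str, int]]:
    # The model arrives as an argument, so no other thread can swap it mid-call.
    return [embed_model.embed(t) for t in texts]


jobs = [
    (["alpha", "beta"], FakeEmbedder("model-a")),
    (["gamma"], FakeEmbedder("model-b")),
]
with ThreadPoolExecutor(max_workers=2) as pool:
    # pool.map preserves the order of jobs in its results.
    results = list(pool.map(lambda job: embed_all(*job), jobs))
```

Had both jobs instead written their model to one shared module-level setting before embedding, the second write could land between the first job's write and its read, which is the race the operator avoids.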

Parameters:
  • documents (list[dict[str, Any]]) – List of dicts with text and metadata keys, typically from DocumentLoaderOperator or a @task. Templated, so passing my_loader.output (a direct XCom reference) resolves to the native list[dict] before execute runs.

  • embed_model (str | llama_index.core.base.embeddings.base.BaseEmbedding | None) –

    Either:

    • a string model name (e.g. "text-embedding-3-small") – the operator constructs a LlamaIndexHook-backed OpenAIEmbedding from llm_conn_id / embed_conn_id, or

    • a pre-built BaseEmbedding instance – bypass the hook entirely for non-OpenAI vendors (e.g. CohereEmbedding(...), BedrockEmbedding(...)).

    Templated, so it works with both literal strings and @task output that builds a custom embedder.

  • llm_conn_id (str | None) – Airflow connection ID for the embedding API. Falls back to LlamaIndexHook.default_conn_name when None.

  • embed_conn_id (str | None) – Optional separate Airflow connection ID for the embedding provider. Falls back to llm_conn_id when None.

  • chunk_size (int) – Chunk size for the sentence splitter.

  • chunk_overlap (int) – Overlap between chunks.

  • persist_dir (str | None) – Optional path to persist the index. Accepts local paths and storage URIs (s3://, gs://, …) resolved via ObjectStoragePath.

  • persist_conn_id (str | None) – Airflow connection ID for cloud-storage credentials when persist_dir is a URI.
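The connection-ID fallbacks described for llm_conn_id and embed_conn_id can be sketched as a small resolution function. This is an illustration under assumptions, not the operator's code: resolve_conn_ids is a hypothetical helper, DEFAULT_CONN_NAME is a placeholder for LlamaIndexHook.default_conn_name (whose actual value is not given here), and it assumes embed_conn_id falls back to the already-resolved llm_conn_id.

```python
from typing import Optional

# Placeholder only: stands in for LlamaIndexHook.default_conn_name.
DEFAULT_CONN_NAME = "<LlamaIndexHook.default_conn_name>"


def resolve_conn_ids(
    llm_conn_id: Optional[str], embed_conn_id: Optional[str]
) -> tuple[str, str]:
    """Apply the documented fallbacks: embed_conn_id -> llm_conn_id -> hook default."""
    # llm_conn_id falls back to the hook's default connection name when None.
    llm = llm_conn_id if llm_conn_id is not None else DEFAULT_CONN_NAME
    # embed_conn_id falls back to llm_conn_id when None (assumed: the resolved value).
    embed = embed_conn_id if embed_conn_id is not None else llm
    return llm, embed


# Only llm_conn_id set: both roles use the same connection ("my_llm_conn" is made up).
shared = resolve_conn_ids("my_llm_conn", None)
# Separate embedding provider: embed_conn_id takes precedence for embeddings.
split = resolve_conn_ids("my_llm_conn", "my_embed_conn")
```

In practice this means a single connection is enough for the common case, and embed_conn_id is only needed when the embedding provider uses different credentials.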

template_fields: collections.abc.Sequence[str] = ('documents', 'embed_model', 'llm_conn_id', 'embed_conn_id', 'persist_dir', 'persist_conn_id')[source]
documents[source]
embed_model = None[source]
llm_conn_id = None[source]
embed_conn_id = None[source]
chunk_size = 512[source]
chunk_overlap = 50[source]
persist_dir = None[source]
persist_conn_id = None[source]
execute(context)[source]

Override this method when deriving an operator.

This is the main method that executes the task. context is the same dictionary that is used when rendering Jinja templates.

Refer to get_template_context for more details.
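Putting the constructor parameters together, a task declaration might look roughly like the sketch below. The keyword names come from the signature documented on this page; the task ID, connection IDs, bucket URI, and document contents are hypothetical. The import is guarded so the snippet still runs where the provider is not installed.

```python
# Keyword arguments mirror the documented signature; all values are illustrative.
operator_kwargs = {
    "task_id": "embed_docs",  # standard BaseOperator argument
    "documents": [
        {"text": "Airflow schedules workflows as DAGs.", "metadata": {"source": "intro.md"}},
    ],
    "embed_model": "text-embedding-3-small",  # string form: built through the hook
    "llm_conn_id": "my_llm_conn",  # hypothetical connection ID
    "chunk_size": 512,
    "chunk_overlap": 50,
    "persist_dir": "s3://my-bucket/indexes/docs",  # hypothetical storage URI
    "persist_conn_id": "my_aws_conn",  # hypothetical connection ID
}

try:
    from airflow.providers.common.ai.operators.llamaindex_embedding import (
        LlamaIndexEmbeddingOperator,
    )
except ImportError:
    # Provider not installed here; the kwargs above still show the call shape.
    embed_task = None
else:
    embed_task = LlamaIndexEmbeddingOperator(**operator_kwargs)
```

In a real DAG the documents value would more typically be an upstream task's output rather than a literal list, and the operator would be created inside a DAG context.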
