LLMOperator

Use LLMOperator for general-purpose LLM calls — summarization, extraction, classification, structured output, or any prompt-based task.

The operator sends a prompt to an LLM via PydanticAIHook and returns the output as XCom.

Basic Usage

Provide a prompt and the operator returns the LLM’s response as a string:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_operator():
    LLMOperator(
        task_id="summarize",
        prompt="Summarize the key findings from the Q4 earnings report.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise.",
    )


Structured Output

Set output_type to a Pydantic BaseModel subclass. The LLM is instructed to return structured data, and the model instance is pushed to XCom unchanged so downstream tasks can type-hint the class directly (def downstream(result: MyModel)) and use attribute access (result.field).

The declared output_type (and any BaseModel reachable from Union/Optional/list shapes) is registered for XCom deserialization by the worker when it loads the DAG, before any task runs – so no edit to [core] allowed_deserialization_classes is needed. The Pydantic class must be defined at module scope and bound to an attribute matching its __name__; classes nested inside a function or @dag-decorated body, parameterized generics, and dynamically-built classes whose __name__ does not match the attribute they are bound to cannot be re-imported, so they are skipped with a warning at worker startup and the value fails to deserialize at the consumer.

airflow/providers/common/ai/example_dags/example_llm.py[source]

# Pydantic output classes must be defined at module scope so they survive
# XCom serialization (their qualname is used to re-import them downstream).
class Entities(BaseModel):
    """Named entities extracted from a text."""

    names: list[str]
    locations: list[str]


airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_operator_structured():
    LLMOperator(
        task_id="extract_entities",
        prompt="Extract all named entities from the article.",
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )


Registration covers downstream tasks in the same DAG: every worker walks the loaded DAG’s tasks at startup and registers each declared class, so it also works for mapped producers (.expand(...)) and for workers that load DAGs from a cache that bypasses operator construction.

The Airflow UI’s XCom viewer renders Pydantic instances via the stringify path, which produces a representation like my_module.MyModel@version=1(field=value,...) without consulting the allow-list. It is not pretty (no field-by-field rendering today), but the value shows up; no configuration is required.

The remaining gap is cross-DAG xcom_pull – a task in a different DAG that pulls this XCom only parses its own DAG file, not the producer’s, so the class is not auto-registered. Add the class qualified name to [core] allowed_deserialization_classes (or a glob that matches it) to make that pattern work.

If a downstream consumer needs the dict shape (e.g. forwarding to an external system that expects JSON-style payloads), pass serialize_output=True and the operator calls model_dump() before pushing to XCom. The pre-PR behavior is available on demand without giving up the typed default.

Agent Parameters

Pass additional keyword arguments to the pydantic-ai Agent constructor via agent_params — for example, retries, model_settings, or tools. See the pydantic-ai Agent docs for the full list of supported parameters.

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_operator_agent_params():
    LLMOperator(
        task_id="creative_writing",
        prompt="Write a haiku about data pipelines.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a creative writer.",
        agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
    )


Usage Limits

Set usage_limits to a pydantic-ai UsageLimits to fail the task when the run exceeds a configured budget — request count, input/output tokens, or tool calls. The check happens inside pydantic-ai’s run loop, so the limit applies even when retries triggers multiple model calls within a single task.

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_operator_usage_limits():
    LLMOperator(
        task_id="capped_summary",
        prompt="Summarize the attached design doc in three bullet points.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a concise technical reviewer.",
        # Fail the task if the run exceeds 5 model requests, 4_000 input
        # tokens, or 1_000 output tokens.  Useful for guardrails on shared
        # connections or untrusted prompts.
        usage_limits=UsageLimits(
            request_limit=5,
            input_tokens_limit=4_000,
            output_tokens_limit=1_000,
        ),
    )


Common knobs on UsageLimits:

  • request_limit — max model requests per run (caps retry/tool-loop blow-ups). pydantic-ai applies a default of 50 when UsageLimits() is constructed without an explicit value, so passing UsageLimits(input_tokens_limit=4_000) silently inherits that 50-request cap. Set request_limit=None to disable it explicitly when you only want a token cap.

  • input_tokens_limit / output_tokens_limit — per-run token caps.

  • total_tokens_limit — combined input + output cap.

  • tool_calls_limit — max tool invocations (AgentOperator only).

When the limit is hit pydantic-ai raises UsageLimitExceeded, which propagates to Airflow as a task failure — Airflow’s standard retry policy applies on top.

TaskFlow Decorator

The @task.llm decorator wraps LLMOperator. The function returns the prompt string; all other parameters are passed to the operator:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_decorator():
    @task.llm(llm_conn_id="pydanticai_default", system_prompt="Summarize concisely.")
    def summarize(text: str):
        return f"Summarize this article: {text}"

    summarize("Apache Airflow is a platform for programmatically authoring...")


With structured output:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_decorator_structured():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
    def extract(text: str):
        return f"Extract entities from: {text}"

    extract("Alice visited Paris and met Bob in London.")


Multimodal prompts

@task.llm accepts the same prompt shape as @task.agent – the callable may return either a str or a non-empty Sequence[UserContent] (e.g., ["Describe this:", ImageUrl(url="...")]) for vision, audio, or document inputs. See @task.agent multimodal prompts for the full example. require_approval=True is not currently supported with a Sequence prompt – the approval session model expects a string – and will raise at the approval boundary; widening that path is tracked as a follow-up.

Classification with Literal

Set output_type to a Literal to constrain the LLM to a fixed set of labels — useful for classification tasks:

airflow/providers/common/ai/example_dags/example_llm_classification.py[source]

@dag(tags=["example"])
def example_llm_classification():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Classify the severity of the given pipeline incident. "
            "Use 'critical' for data loss or complete pipeline failure, "
            "'high' for significant delays or partial failures, "
            "'medium' for degraded performance, "
            "'low' for cosmetic issues or minor warnings."
        ),
        output_type=Literal["critical", "high", "medium", "low"],
    )
    def classify_incident(description: str):
        # Pre-process the description before sending to the LLM
        return f"Classify this incident:\n{description.strip()}"

    classify_incident(
        "Scheduler heartbeat lost for 15 minutes. "
        "Multiple DAG runs stuck in queued state. "
        "No new tasks being scheduled across all DAGs."
    )


Multi-task pipeline with dynamic mapping

Combine @task.llm with upstream and downstream tasks. Use .expand() to process a list of items in parallel:

airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py[source]

@dag(tags=["example"])
def example_llm_analysis_pipeline():
    @task
    def get_support_tickets():
        """Fetch unprocessed support tickets."""
        return [
            (
                "Our nightly ETL pipeline has been failing for the past 3 days. "
                "The error shows a connection timeout to the Postgres source database. "
                "This is blocking our daily financial reports."
            ),
            (
                "We'd like to add a new connection type for our internal ML model registry. "
                "Is there documentation on creating custom hooks?"
            ),
            (
                "After upgrading to the latest version, the Grid view takes over "
                "30 seconds to load for DAGs with more than 500 tasks. "
                "Previously it loaded in under 5 seconds."
            ),
        ]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Analyze the support ticket and extract: "
            "priority (critical/high/medium/low), "
            "category (bug/feature_request/question/performance), "
            "a one-sentence summary, and a suggested next action."
        ),
        output_type=TicketAnalysis,
    )
    def analyze_ticket(ticket: str):
        return f"Analyze this support ticket:\n\n{ticket}"

    @task
    def store_results(analyses: list[TicketAnalysis]):
        """Store ticket analyses. In production, this would write to a database or ticketing system."""
        for analysis in analyses:
            print(f"[{analysis.priority.upper()}] {analysis.category}: {analysis.summary}")

    tickets = get_support_tickets()
    analyses = analyze_ticket.expand(ticket=tickets)
    store_results(analyses)


Human-in-the-Loop Approval

Set require_approval=True to pause the task after the LLM generates its output and wait for a human reviewer to approve or reject it via the Airflow HITL interface. Optionally allow the reviewer to edit the output before approving with allow_modifications=True, and set a deadline with approval_timeout:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag(tags=["example"])
def example_llm_operator_approval():

    LLMOperator(
        task_id="summarize_with_approval",
        prompt="Summarize the quarterly financial report for stakeholders.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise and accurate.",
        require_approval=True,
        approval_timeout=timedelta(hours=24),
        allow_modifications=True,
    )


Parameters

  • prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.

  • system_prompt: System-level instructions for the agent. Supports Jinja templating.

  • output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings, tools). Supports Jinja templating.

  • usage_limits: Optional pydantic-ai UsageLimits enforced on the run. Fails the task when token / request / tool-call budgets are exceeded. Default None.

  • require_approval: If True, the task defers after generating output and waits for human review. Default False.

  • approval_timeout: Maximum time to wait for a review (timedelta). None means wait indefinitely. Default None.

  • allow_modifications: If True, the reviewer can edit the output before approving. Default False.

Logging

After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the LLM output is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.

Was this entry helpful?