LLMOperator

Use LLMOperator for general-purpose LLM calls — summarization, extraction, classification, structured output, or any prompt-based task.

The operator sends a prompt to an LLM via PydanticAIHook and pushes the response to XCom.

Basic Usage

Provide a prompt and the operator returns the LLM’s response as a string:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_operator():
    LLMOperator(
        task_id="summarize",
        prompt="Summarize the key findings from the Q4 earnings report.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise.",
    )
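
The response lands in XCom as the task's return value, so downstream tasks can consume it in the usual Airflow way, for example via the operator's `.output` reference. A configuration sketch (the `publish` task is hypothetical, added here only to show the wiring):

```python
@dag
def example_llm_consume():
    summarize = LLMOperator(
        task_id="summarize",
        prompt="Summarize the key findings from the Q4 earnings report.",
        llm_conn_id="pydanticai_default",
    )

    @task
    def publish(summary: str):
        # Hypothetical downstream step: forward the summary elsewhere
        print(summary)

    # .output is the XComArg wrapping the operator's return_value XCom
    publish(summarize.output)
```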


Structured Output

Set output_type to a Pydantic BaseModel subclass. The LLM is instructed to return structured data, and the result is serialized via model_dump() for XCom:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_operator_structured():
    class Entities(BaseModel):
        names: list[str]
        locations: list[str]

    LLMOperator(
        task_id="extract_entities",
        prompt="Extract all named entities from the article.",
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
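
Because the result passes through model_dump(), downstream tasks receive a plain dict from XCom, not a model instance. A standalone sketch of that serialization step (pure pydantic, independent of the operator):

```python
from pydantic import BaseModel


class Entities(BaseModel):
    names: list[str]
    locations: list[str]


# What the operator would push to XCom after model_dump():
result = Entities(names=["Alice", "Bob"], locations=["Paris", "London"])
print(result.model_dump())
# {'names': ['Alice', 'Bob'], 'locations': ['Paris', 'London']}
```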


Agent Parameters

Pass additional keyword arguments to the pydantic-ai Agent constructor via agent_params — for example, retries, model_settings, or tools. See the pydantic-ai Agent docs for the full list of supported parameters.

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_operator_agent_params():
    LLMOperator(
        task_id="creative_writing",
        prompt="Write a haiku about data pipelines.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a creative writer.",
        agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
    )
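
agent_params accepts any keyword the Agent constructor does, including tools; per the pydantic-ai docs, plain Python functions can be registered as tools. A configuration sketch (the current_date tool is hypothetical):

```python
from datetime import date


def current_date() -> str:
    """Hypothetical tool the model may call to fetch today's date."""
    return date.today().isoformat()


LLMOperator(
    task_id="with_tool",
    prompt="What is today's date? Answer in ISO format.",
    llm_conn_id="pydanticai_default",
    agent_params={"tools": [current_date], "retries": 2},
)
```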


TaskFlow Decorator

The @task.llm decorator wraps LLMOperator. The function returns the prompt string; all other parameters are passed to the operator:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_decorator():
    @task.llm(llm_conn_id="pydanticai_default", system_prompt="Summarize concisely.")
    def summarize(text: str):
        return f"Summarize this article: {text}"

    summarize("Apache Airflow is a platform for programmatically authoring...")


With structured output:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_decorator_structured():
    class Entities(BaseModel):
        names: list[str]
        locations: list[str]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
    def extract(text: str):
        return f"Extract entities from: {text}"

    extract("Alice visited Paris and met Bob in London.")


Classification with Literal

Set output_type to a Literal to constrain the LLM to a fixed set of labels — useful for classification tasks:

airflow/providers/common/ai/example_dags/example_llm_classification.py[source]

@dag
def example_llm_classification():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Classify the severity of the given pipeline incident. "
            "Use 'critical' for data loss or complete pipeline failure, "
            "'high' for significant delays or partial failures, "
            "'medium' for degraded performance, "
            "'low' for cosmetic issues or minor warnings."
        ),
        output_type=Literal["critical", "high", "medium", "low"],
    )
    def classify_incident(description: str):
        # Pre-process the description before sending to the LLM
        return f"Classify this incident:\n{description.strip()}"

    classify_incident(
        "Scheduler heartbeat lost for 15 minutes. "
        "Multiple DAG runs stuck in queued state. "
        "No new tasks being scheduled across all DAGs."
    )
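
With a Literal output type, the model's answer is validated against the allowed labels, so out-of-set responses are rejected rather than passed downstream. A standalone sketch of equivalent validation using pydantic's TypeAdapter (independent of the operator, shown only to illustrate the constraint):

```python
from typing import Literal

from pydantic import TypeAdapter, ValidationError

Severity = Literal["critical", "high", "medium", "low"]
adapter = TypeAdapter(Severity)

# A label from the allowed set passes through unchanged
print(adapter.validate_python("critical"))  # critical

# Anything else raises a ValidationError
try:
    adapter.validate_python("urgent")
except ValidationError:
    print("rejected")
```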


Multi-Task Pipeline with Dynamic Mapping

Combine @task.llm with upstream and downstream tasks. Use .expand() to process a list of items in parallel:

airflow/providers/common/ai/example_dags/example_llm_analysis_pipeline.py[source]

@dag
def example_llm_analysis_pipeline():
    class TicketAnalysis(BaseModel):
        priority: str
        category: str
        summary: str
        suggested_action: str

    @task
    def get_support_tickets():
        """Fetch unprocessed support tickets."""
        return [
            (
                "Our nightly ETL pipeline has been failing for the past 3 days. "
                "The error shows a connection timeout to the Postgres source database. "
                "This is blocking our daily financial reports."
            ),
            (
                "We'd like to add a new connection type for our internal ML model registry. "
                "Is there documentation on creating custom hooks?"
            ),
            (
                "After upgrading to the latest version, the Grid view takes over "
                "30 seconds to load for DAGs with more than 500 tasks. "
                "Previously it loaded in under 5 seconds."
            ),
        ]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Analyze the support ticket and extract: "
            "priority (critical/high/medium/low), "
            "category (bug/feature_request/question/performance), "
            "a one-sentence summary, and a suggested next action."
        ),
        output_type=TicketAnalysis,
    )
    def analyze_ticket(ticket: str):
        return f"Analyze this support ticket:\n\n{ticket}"

    @task
    def store_results(analyses: list[dict]):
        """Store ticket analyses. In production, this would write to a database or ticketing system."""
        for analysis in analyses:
            print(f"[{analysis['priority'].upper()}] {analysis['category']}: {analysis['summary']}")

    tickets = get_support_tickets()
    analyses = analyze_ticket.expand(ticket=tickets)
    store_results(analyses)


Human-in-the-Loop Approval

Set require_approval=True to pause the task after the LLM generates its output and wait for a human reviewer to approve or reject it via the Airflow HITL interface. To let the reviewer edit the output before approving, set allow_modifications=True; to put a deadline on the review, set approval_timeout:

airflow/providers/common/ai/example_dags/example_llm.py[source]

@dag
def example_llm_operator_approval():
    LLMOperator(
        task_id="summarize_with_approval",
        prompt="Summarize the quarterly financial report for stakeholders.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise and accurate.",
        require_approval=True,
        approval_timeout=timedelta(hours=24),
        allow_modifications=True,
    )


Parameters

  • prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.

  • system_prompt: System-level instructions for the agent. Supports Jinja templating.

  • output_type: Expected output type (default: str). Set to a Pydantic BaseModel subclass for structured output, or a Literal for classification.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings, tools). Supports Jinja templating.

  • require_approval: If True, the task defers after generating output and waits for human review. Default False.

  • approval_timeout: Maximum time to wait for a review (timedelta). None means wait indefinitely. Default None.

  • allow_modifications: If True, the reviewer can edit the output before approving. Default False.
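
Since system_prompt and agent_params support Jinja templating, they can reference runtime context when the task is rendered. A configuration sketch (hedged; assumes standard Airflow template variables such as {{ ds }} are available at render time):

```python
LLMOperator(
    task_id="templated_summary",
    prompt="Summarize yesterday's pipeline run.",
    llm_conn_id="pydanticai_default",
    # {{ ds }} is the run's logical date, rendered by Airflow before execution
    system_prompt="You are reporting on the run for {{ ds }}. Be concise.",
)
```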

Logging

After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the LLM output is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.
