LLMBranchOperator

Use LLMBranchOperator for LLM-driven branching — where the LLM decides which downstream task(s) to execute.

The operator discovers downstream tasks automatically from the DAG topology and presents them to the LLM as a constrained enum via pydantic-ai structured output. No text parsing or manual validation is needed.
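To illustrate the idea, here is a minimal sketch (not the provider's actual code; the task IDs are hypothetical) of how a dynamic Enum can be built from downstream task IDs so that structured output is constrained to valid choices:

```python
from enum import Enum

# Hypothetical downstream task IDs, as discovered from the DAG topology.
downstream_task_ids = ["handle_billing", "handle_auth", "handle_general"]

# Enum functional API: one member per downstream task ID.
BranchChoice = Enum("BranchChoice", {tid: tid for tid in downstream_task_ids})

print([member.value for member in BranchChoice])
# A structured-output layer (such as pydantic-ai's output_type) can then
# require the model to return one of these members, so no free-text
# parsing or manual validation is needed.
```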

Basic Usage

Connect the operator to downstream tasks. The LLM chooses which branch to execute based on the prompt:

airflow/providers/common/ai/example_dags/example_llm_branch.py[source]

@dag
def example_llm_branch_operator():
    route = LLMBranchOperator(
        task_id="route_ticket",
        prompt="User says: 'My password reset email never arrived.'",
        llm_conn_id="pydanticai_default",
        system_prompt="Route support tickets to the right team.",
    )

    @task
    def handle_billing():
        return "Handling billing issue"

    @task
    def handle_auth():
        return "Handling auth issue"

    @task
    def handle_general():
        return "Handling general issue"

    route >> [handle_billing(), handle_auth(), handle_general()]


Multiple Branches

Set allow_multiple_branches=True to let the LLM select more than one downstream task. All selected branches run; unselected branches are skipped:

airflow/providers/common/ai/example_dags/example_llm_branch.py[source]

@dag
def example_llm_branch_multi():
    route = LLMBranchOperator(
        task_id="classify",
        prompt="This product is great but shipping was slow and the box was damaged.",
        llm_conn_id="pydanticai_default",
        system_prompt="Select all applicable categories for this customer review.",
        allow_multiple_branches=True,
    )

    @task
    def handle_positive():
        return "Processing positive feedback"

    @task
    def handle_shipping():
        return "Escalating shipping issue"

    @task
    def handle_packaging():
        return "Escalating packaging issue"

    route >> [handle_positive(), handle_shipping(), handle_packaging()]


TaskFlow Decorator

The @task.llm_branch decorator wraps LLMBranchOperator. The decorated function's return value is used as the prompt; all other parameters are forwarded to the underlying operator:

airflow/providers/common/ai/example_dags/example_llm_branch.py[source]

@dag
def example_llm_branch_decorator():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        system_prompt="Route support tickets to the right team.",
    )
    def route_ticket(message: str):
        return f"Route this support ticket: {message}"

    @task
    def handle_billing():
        return "Handling billing issue"

    @task
    def handle_auth():
        return "Handling auth issue"

    @task
    def handle_general():
        return "Handling general issue"

    route_ticket("I was charged twice for my subscription.") >> [
        handle_billing(),
        handle_auth(),
        handle_general(),
    ]


With multiple branches:

airflow/providers/common/ai/example_dags/example_llm_branch.py[source]

@dag
def example_llm_branch_decorator_multi():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        system_prompt="Select all applicable categories for this customer review.",
        allow_multiple_branches=True,
    )
    def classify_review(review: str):
        return f"Classify this review: {review}"

    @task
    def handle_positive():
        return "Processing positive feedback"

    @task
    def handle_shipping():
        return "Escalating shipping issue"

    @task
    def handle_packaging():
        return "Escalating packaging issue"

    classify_review("Great product but shipping was slow.") >> [
        handle_positive(),
        handle_shipping(),
        handle_packaging(),
    ]


How It Works

At execution time, the operator:

  1. Reads self.downstream_task_ids from the DAG topology.

  2. Creates a dynamic Enum with one member per downstream task ID.

  3. Passes that enum as output_type to pydantic-ai, constraining the LLM to valid task IDs only.

  4. Converts the LLM’s structured output to task ID string(s) and calls do_branch() to skip non-selected downstream tasks.
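The skipping in step 4 can be sketched as follows (hypothetical helper, not the operator's internals; in the real operator the enum constraint already guarantees the selection is valid):

```python
def tasks_to_skip(downstream_task_ids, selected):
    """Return the direct downstream task IDs that should be skipped,
    given the branch(es) the LLM selected."""
    # Normalize a single task ID or a list of task IDs to a set.
    selected_set = {selected} if isinstance(selected, str) else set(selected)
    unknown = selected_set - set(downstream_task_ids)
    if unknown:
        raise ValueError(f"Selected task(s) not downstream: {unknown}")
    return [tid for tid in downstream_task_ids if tid not in selected_set]

downstream = ["handle_positive", "handle_shipping", "handle_packaging"]
print(tasks_to_skip(downstream, "handle_shipping"))
# With allow_multiple_branches=True, a list may be selected:
print(tasks_to_skip(downstream, ["handle_shipping", "handle_packaging"]))
```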

Parameters

  • prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the model specified in the connection’s extra field.

  • system_prompt: System-level instructions for the agent. Supports Jinja templating.

  • allow_multiple_branches: When False (default) the LLM returns a single task ID. When True the LLM may return one or more task IDs.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings). Supports Jinja templating.
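
The allow_multiple_branches flag maps naturally onto the structured output type: a single enum member versus a list of members. A minimal sketch of that distinction (hypothetical names; the provider's internals may differ):

```python
from enum import Enum

BranchChoice = Enum("BranchChoice", {t: t for t in ["handle_billing", "handle_auth"]})

def build_output_type(allow_multiple_branches: bool):
    # Single branch: the model must return exactly one enum member.
    # Multiple branches: the model may return a list of enum members.
    return list[BranchChoice] if allow_multiple_branches else BranchChoice

def to_task_ids(result):
    # Normalize the structured result back to plain task ID strings,
    # as step 4 under "How It Works" describes.
    members = result if isinstance(result, list) else [result]
    return [m.value for m in members]

print(to_task_ids(BranchChoice.handle_auth))
print(to_task_ids([BranchChoice.handle_billing, BranchChoice.handle_auth]))
```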

Logging

After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. See AgentOperator — Logging for details on the log format.
