LLMBranchOperator
Use LLMBranchOperator for LLM-driven branching, where the LLM decides which downstream task(s) to execute.
The operator discovers downstream tasks automatically from the DAG topology and presents them to the LLM as a constrained enum via pydantic-ai structured output. No text parsing or manual validation is needed.
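Basic Usage
Connect the operator to downstream tasks. The LLM chooses which branch to execute based on the prompt:
# Import paths are assumed from the common.ai provider layout; adjust them to your installed version.
from airflow.providers.common.ai.operators.llm_branch import LLMBranchOperator
from airflow.sdk import dag, task


@dag
def example_llm_branch_operator():
    # The LLM picks exactly one of the tasks wired downstream of this operator.
    route = LLMBranchOperator(
        task_id="route_ticket",
        prompt="User says: 'My password reset email never arrived.'",
        llm_conn_id="pydanticai_default",
        system_prompt="Route support tickets to the right team.",
    )

    @task
    def handle_billing():
        return "Handling billing issue"

    @task
    def handle_auth():
        return "Handling auth issue"

    @task
    def handle_general():
        return "Handling general issue"

    route >> [handle_billing(), handle_auth(), handle_general()]


# Call the decorated function so the DAG is registered.
example_llm_branch_operator()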
Multiple Branches
Set allow_multiple_branches=True to let the LLM select more than one
downstream task. All selected branches run; unselected branches are skipped:
@dag
def example_llm_branch_multi():
    # With allow_multiple_branches=True the LLM may select several downstream tasks.
    route = LLMBranchOperator(
        task_id="classify",
        prompt="This product is great but shipping was slow and the box was damaged.",
        llm_conn_id="pydanticai_default",
        system_prompt="Select all applicable categories for this customer review.",
        allow_multiple_branches=True,
    )

    @task
    def handle_positive():
        return "Processing positive feedback"

    @task
    def handle_shipping():
        return "Escalating shipping issue"

    @task
    def handle_packaging():
        return "Escalating packaging issue"

    route >> [handle_positive(), handle_shipping(), handle_packaging()]
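The run-or-skip rule described above can be pictured in plain Python. This is a minimal sketch, not the operator's code: split_branches is a hypothetical helper, and the selected list is an invented stand-in for whatever the LLM returns.

```python
def split_branches(downstream_task_ids, selected_task_ids):
    """Partition downstream tasks into those that run and those that are skipped."""
    chosen = set(selected_task_ids)
    to_run = [t for t in downstream_task_ids if t in chosen]
    to_skip = [t for t in downstream_task_ids if t not in chosen]
    return to_run, to_skip


downstream = ["handle_positive", "handle_shipping", "handle_packaging"]
# Hypothetical selection; a real run depends on the model's answer.
selected = ["handle_shipping", "handle_packaging"]

to_run, to_skip = split_branches(downstream, selected)
print("run:", to_run)
print("skip:", to_skip)
```

With allow_multiple_branches left at its default, the selection would hold a single task ID and every other downstream task would land in the skipped group.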
TaskFlow Decorator
The @task.llm_branch decorator wraps LLMBranchOperator. The function
returns the prompt string; all other parameters are passed to the operator:
@dag
def example_llm_branch_decorator():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        system_prompt="Route support tickets to the right team.",
    )
    def route_ticket(message: str):
        # The returned string is used as the prompt sent to the LLM.
        return f"Route this support ticket: {message}"

    @task
    def handle_billing():
        return "Handling billing issue"

    @task
    def handle_auth():
        return "Handling auth issue"

    @task
    def handle_general():
        return "Handling general issue"

    route_ticket("I was charged twice for my subscription.") >> [
        handle_billing(),
        handle_auth(),
        handle_general(),
    ]
With multiple branches:
@dag
def example_llm_branch_decorator_multi():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        system_prompt="Select all applicable categories for this customer review.",
        allow_multiple_branches=True,
    )
    def classify_review(review: str):
        # The returned string is used as the prompt sent to the LLM.
        return f"Classify this review: {review}"

    @task
    def handle_positive():
        return "Processing positive feedback"

    @task
    def handle_shipping():
        return "Escalating shipping issue"

    @task
    def handle_packaging():
        return "Escalating packaging issue"

    classify_review("Great product but shipping was slow.") >> [
        handle_positive(),
        handle_shipping(),
        handle_packaging(),
    ]
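To make the division of labor in the decorator form concrete, here is a toy stand-in written in plain Python. It is not the provider's implementation: llm_branch_sketch is a hypothetical name, and it only shows the pattern in which the function's return value becomes the prompt while the decorator's keyword arguments travel alongside it as operator settings.

```python
import functools


def llm_branch_sketch(**operator_kwargs):
    """Hypothetical stand-in for @task.llm_branch, for illustration only."""

    def decorator(prompt_fn):
        @functools.wraps(prompt_fn)
        def wrapper(*args, **kwargs):
            # The decorated function supplies the prompt string.
            prompt = prompt_fn(*args, **kwargs)
            if not isinstance(prompt, str):
                raise TypeError("the decorated function must return the prompt string")
            # Everything given to the decorator is forwarded as operator settings.
            return {"prompt": prompt, **operator_kwargs}

        return wrapper

    return decorator


@llm_branch_sketch(llm_conn_id="pydanticai_default", allow_multiple_branches=True)
def classify_review(review: str):
    return f"Classify this review: {review}"


call = classify_review("Great product but shipping was slow.")
print(call)
```

The real decorator builds an Airflow task rather than a dictionary; the dictionary here just makes visible which values come from the function and which from the decorator arguments.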
How It Works
At execution time, the operator:
1. Reads self.downstream_task_ids from the DAG topology.
2. Creates a dynamic Enum with one member per downstream task ID.
3. Passes that enum as output_type to pydantic-ai, constraining the LLM to valid task IDs only.
4. Converts the LLM’s structured output to task ID string(s) and calls do_branch() to skip non-selected downstream tasks.
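The enum-based steps above can be sketched with the standard library alone. This is an approximation under stated assumptions, not the operator's source: BranchChoice and to_task_ids are hypothetical names, the task ID list stands in for self.downstream_task_ids, and the pydantic-ai call in step 3 is only described in a comment rather than executed.

```python
from enum import Enum

# Stand-in for self.downstream_task_ids (step 1).
downstream_task_ids = ["handle_billing", "handle_auth", "handle_general"]

# Step 2: build an enum with one member per downstream task ID.
BranchChoice = Enum(
    "BranchChoice",
    {task_id: task_id for task_id in sorted(downstream_task_ids)},
)

# Step 3 (not run here): the enum would be handed to pydantic-ai as the
# output type, so the model can only answer with one of these members.


def to_task_ids(output):
    """Step 4: turn one enum member, or a list of them, into task ID strings."""
    members = output if isinstance(output, list) else [output]
    return [member.value for member in members]


single = to_task_ids(BranchChoice("handle_auth"))
multiple = to_task_ids([BranchChoice.handle_billing, BranchChoice.handle_general])
print(single, multiple)
```

Because the enum only contains real downstream task IDs, a value outside that set fails at construction time (BranchChoice("unknown_task") raises ValueError), which is the property that makes separate text parsing unnecessary.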
Parameters
- prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).
- llm_conn_id: Airflow connection ID for the LLM provider.
- model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.
- system_prompt: System-level instructions for the agent. Supports Jinja templating.
- allow_multiple_branches: When False (default), the LLM returns a single task ID. When True, the LLM may return one or more task IDs.
- agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings). Supports Jinja templating.
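As a rough illustration of how these parameters fit together, the sketch below collects them in a plain dictionary so that it runs without Airflow installed. The values are illustrative assumptions rather than recommended defaults: the {{ ds }} template, the retry count, and the temperature setting are examples only.

```python
# Keyword arguments one might pass to LLMBranchOperator; values are examples only.
operator_kwargs = {
    "task_id": "route_ticket",
    "prompt": "User says: 'My password reset email never arrived.'",
    "llm_conn_id": "pydanticai_default",
    # Takes precedence over the model configured on the connection.
    "model_id": "openai:gpt-5",
    # Jinja-templated: {{ ds }} is rendered by Airflow before the task runs.
    "system_prompt": "Route support tickets received on {{ ds }} to the right team.",
    # Default behavior: exactly one downstream task is selected.
    "allow_multiple_branches": False,
    # Passed on to the pydantic-ai Agent constructor.
    "agent_params": {"retries": 3, "model_settings": {"temperature": 0.0}},
}

for name, value in operator_kwargs.items():
    print(f"{name} = {value!r}")
```

With the provider installed, the dictionary could be unpacked inside a DAG as LLMBranchOperator(**operator_kwargs).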
Logging
After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. See AgentOperator — Logging for details on the log format.