LLMOperator¶
Use LLMOperator for general-purpose LLM calls: summarization, extraction,
classification, structured output, or any other prompt-based task.
The operator sends a prompt to an LLM via
PydanticAIHook and
pushes the output to XCom.
Basic Usage¶
Provide a prompt and the operator returns the LLM’s response as a string:
@dag
def example_llm_operator():
    LLMOperator(
        task_id="summarize",
        prompt="Summarize the key findings from the Q4 earnings report.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise.",
    )
Structured Output¶
Set output_type to a Pydantic BaseModel subclass. The LLM is instructed
to return structured data, and the result is serialized via model_dump()
for XCom:
@dag
def example_llm_operator_structured():
    class Entities(BaseModel):
        names: list[str]
        locations: list[str]

    LLMOperator(
        task_id="extract_entities",
        prompt="Extract all named entities from the article.",
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
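Downstream tasks receive the model_dump() dict from XCom rather than a model instance. A minimal sketch of rehydrating it with Pydantic's model_validate (plain functions here, standing in for Airflow tasks):

```python
from pydantic import BaseModel


class Entities(BaseModel):
    names: list[str]
    locations: list[str]


def handle_entities(payload: dict) -> Entities:
    # The operator pushes Entities.model_dump() to XCom, so downstream
    # code receives a plain dict and can rebuild the model from it.
    return Entities.model_validate(payload)


result = handle_entities({"names": ["Alice", "Bob"], "locations": ["Paris"]})
print(result.names)  # ['Alice', 'Bob']
```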
Agent Parameters¶
Pass additional keyword arguments to the pydantic-ai Agent constructor
via agent_params — for example, retries, model_settings, or tools.
See the pydantic-ai Agent docs for
the full list of supported parameters.
@dag
def example_llm_operator_agent_params():
    LLMOperator(
        task_id="creative_writing",
        prompt="Write a haiku about data pipelines.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a creative writer.",
        agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
    )
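agent_params can also register tools, plain Python functions the model may call during the run, assuming the hook forwards them unchanged to the Agent constructor. The tool below (lookup_exchange_rate, a hypothetical helper) is ordinary Python and can be unit-tested on its own:

```python
def lookup_exchange_rate(currency: str) -> float:
    """Toy tool: return a hard-coded USD exchange rate for the demo."""
    rates = {"EUR": 1.08, "GBP": 1.27}
    return rates.get(currency.upper(), 1.0)


# Hypothetical wiring inside a @dag function: the tool list is handed
# to the pydantic-ai Agent via agent_params.
# LLMOperator(
#     task_id="convert_revenue",
#     prompt="Convert 100 EUR of revenue to USD.",
#     llm_conn_id="pydanticai_default",
#     agent_params={"tools": [lookup_exchange_rate]},
# )
print(lookup_exchange_rate("eur"))  # 1.08
```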
TaskFlow Decorator¶
The @task.llm decorator wraps LLMOperator. The function returns the
prompt string; all other parameters are passed to the operator:
@dag
def example_llm_decorator():
    @task.llm(llm_conn_id="pydanticai_default", system_prompt="Summarize concisely.")
    def summarize(text: str):
        return f"Summarize this article: {text}"

    summarize("Apache Airflow is a platform for programmatically authoring...")
With structured output:
@dag
def example_llm_decorator_structured():
    class Entities(BaseModel):
        names: list[str]
        locations: list[str]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
    def extract(text: str):
        return f"Extract entities from: {text}"

    extract("Alice visited Paris and met Bob in London.")
Classification with Literal¶
Set output_type to a Literal to constrain the LLM to a fixed set of
labels — useful for classification tasks:
@dag
def example_llm_classification():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Classify the severity of the given pipeline incident. "
            "Use 'critical' for data loss or complete pipeline failure, "
            "'high' for significant delays or partial failures, "
            "'medium' for degraded performance, "
            "'low' for cosmetic issues or minor warnings."
        ),
        output_type=Literal["critical", "high", "medium", "low"],
    )
    def classify_incident(description: str):
        # Pre-process the description before sending to the LLM
        return f"Classify this incident:\n{description.strip()}"

    classify_incident(
        "Scheduler heartbeat lost for 15 minutes. "
        "Multiple DAG runs stuck in queued state. "
        "No new tasks being scheduled across all DAGs."
    )
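Because the XCom value is the bare label string, downstream logic can branch on it directly. A small sketch (route_incident is illustrative, not part of the operator):

```python
def route_incident(label: str) -> str:
    # Map each severity label from the classifier to a follow-up action.
    actions = {
        "critical": "page_oncall",
        "high": "page_oncall",
        "medium": "create_ticket",
        "low": "log_only",
    }
    return actions[label]


print(route_incident("critical"))  # page_oncall
```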
Multi-Task Pipeline with Dynamic Mapping¶
Combine @task.llm with upstream and downstream tasks. Use .expand()
to process a list of items in parallel:
@dag
def example_llm_analysis_pipeline():
    class TicketAnalysis(BaseModel):
        priority: str
        category: str
        summary: str
        suggested_action: str

    @task
    def get_support_tickets():
        """Fetch unprocessed support tickets."""
        return [
            (
                "Our nightly ETL pipeline has been failing for the past 3 days. "
                "The error shows a connection timeout to the Postgres source database. "
                "This is blocking our daily financial reports."
            ),
            (
                "We'd like to add a new connection type for our internal ML model registry. "
                "Is there documentation on creating custom hooks?"
            ),
            (
                "After upgrading to the latest version, the Grid view takes over "
                "30 seconds to load for DAGs with more than 500 tasks. "
                "Previously it loaded in under 5 seconds."
            ),
        ]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Analyze the support ticket and extract: "
            "priority (critical/high/medium/low), "
            "category (bug/feature_request/question/performance), "
            "a one-sentence summary, and a suggested next action."
        ),
        output_type=TicketAnalysis,
    )
    def analyze_ticket(ticket: str):
        return f"Analyze this support ticket:\n\n{ticket}"

    @task
    def store_results(analyses: list[dict]):
        """Store ticket analyses. In production, this would write to a database or ticketing system."""
        for analysis in analyses:
            print(f"[{analysis['priority'].upper()}] {analysis['category']}: {analysis['summary']}")

    tickets = get_support_tickets()
    analyses = analyze_ticket.expand(ticket=tickets)
    store_results(analyses)
Human-in-the-Loop Approval¶
Set require_approval=True to pause the task after the LLM generates its
output and wait for a human reviewer to approve or reject it via the Airflow
HITL interface. Optionally allow the reviewer to edit the output before
approving with allow_modifications=True, and set a deadline with
approval_timeout:
@dag
def example_llm_operator_approval():
    LLMOperator(
        task_id="summarize_with_approval",
        prompt="Summarize the quarterly financial report for stakeholders.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise and accurate.",
        require_approval=True,
        approval_timeout=timedelta(hours=24),
        allow_modifications=True,
    )
Parameters¶
- prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).
- llm_conn_id: Airflow connection ID for the LLM provider.
- model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection's extra field.
- system_prompt: System-level instructions for the agent. Supports Jinja templating.
- output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.
- agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings, tools). Supports Jinja templating.
- require_approval: If True, the task defers after generating output and waits for human review. Default False.
- approval_timeout: Maximum time to wait for a review (timedelta). None means wait indefinitely. Default None.
- allow_modifications: If True, the reviewer can edit the output before approving. Default False.
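Since system_prompt and agent_params are templated fields, Airflow renders them against the task context before the call. The sketch below reproduces that rendering step directly with jinja2 to show the effect; the {{ ds }} macro (the run's logical date) is standard Airflow context:

```python
from jinja2 import Template

# A templated system_prompt as it might appear in an operator definition.
system_prompt = "You are an SRE assistant. The logical date is {{ ds }}."

# Airflow performs the equivalent of this render before execution.
rendered = Template(system_prompt).render(ds="2025-01-15")
print(rendered)  # You are an SRE assistant. The logical date is 2025-01-15.
```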
Logging¶
After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the LLM output is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.