LLMOperator
Use LLMOperator for
general-purpose LLM calls — summarization, extraction, classification,
structured output, or any prompt-based task.
The operator sends a prompt to an LLM via
PydanticAIHook and
returns the output as XCom.
Basic Usage
Provide a prompt and the operator returns the LLM’s response as a string:
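# Import paths below assume the common.ai provider layout; adjust them to
# match your installed version.
from airflow.providers.common.ai.operators.llm import LLMOperator
from airflow.sdk import dag


@dag(tags=["example"])
def example_llm_operator():
    LLMOperator(
        task_id="summarize",
        prompt="Summarize the key findings from the Q4 earnings report.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise.",
    )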
Structured Output
Set output_type to a Pydantic BaseModel subclass. The LLM is instructed
to return structured data, and the model instance is pushed to XCom unchanged
so downstream tasks can type-hint the class directly
(def downstream(result: MyModel)) and use attribute access (result.field).
The declared output_type (and any BaseModel reachable from
Union/Optional/list shapes) is registered for XCom deserialization by
the worker when it loads the DAG, before any task runs – so no edit to
[core] allowed_deserialization_classes is needed. The Pydantic class must be
defined at module scope and bound to an attribute matching its __name__;
classes nested inside a function or @dag-decorated body, parameterized
generics, and dynamically-built classes whose __name__ does not match the
attribute they are bound to cannot be re-imported, so they are skipped with a
warning at worker startup and the value fails to deserialize at the consumer.
from pydantic import BaseModel


# Pydantic output classes must be defined at module scope so they survive
# XCom serialization (their qualname is used to re-import them downstream).
class Entities(BaseModel):
    """Named entities extracted from a text."""

    names: list[str]
    locations: list[str]


@dag(tags=["example"])
def example_llm_operator_structured():
    LLMOperator(
        task_id="extract_entities",
        prompt="Extract all named entities from the article.",
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
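The module-scope rule above comes down to whether a class can be found again by its own name. The following is a minimal sketch of that idea using only the standard library. The helper `is_reimportable` and the example class names are hypothetical and are not part of the provider's API; the provider's actual check may differ.

```python
def is_reimportable(cls: type, module_namespace: dict) -> bool:
    """Return True if `cls` could be looked up again by its own name.

    Hypothetical helper for illustration; the provider's real check may differ.
    """
    # Classes defined inside a function (or a @dag-decorated body) carry
    # "<locals>" in their qualified name and cannot be imported by path.
    if "<locals>" in cls.__qualname__:
        return False
    # The module attribute named after the class must be the class itself.
    return module_namespace.get(cls.__name__) is cls


class Entities:  # stand-in for a module-scope Pydantic model
    pass


def build_nested():
    class Nested:  # defined inside a function body
        pass

    return Nested


Nested = build_nested()
Renamed = type("Dynamic", (), {})  # __name__ is "Dynamic", bound as "Renamed"

namespace = {"Entities": Entities, "Nested": Nested, "Renamed": Renamed}
for candidate in (Entities, Nested, Renamed):
    print(candidate.__name__, is_reimportable(candidate, namespace))
```

When run as a top-level script, only `Entities` passes: `Nested` fails the qualified-name test, and the dynamically built class fails because no attribute named `Dynamic` points back to it.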
Registration covers downstream tasks in the same DAG: every worker walks the
loaded DAG’s tasks at startup and registers each declared class, so it also works
for mapped producers (.expand(...)) and for workers that load DAGs from a
cache that bypasses operator construction.
The Airflow UI’s XCom viewer renders Pydantic instances via the
stringify path, which produces a representation like
my_module.MyModel@version=1(field=value,...) without consulting the
allow-list. It is not pretty (no field-by-field rendering today), but the value
shows up; no configuration is required.
The remaining gap is cross-DAG xcom_pull: a task in a different DAG that
pulls this XCom parses only its own DAG file, not the producer’s, so the
class is not auto-registered. To make that pattern work, add the class’s
fully qualified name (or a glob that matches it) to
[core] allowed_deserialization_classes.
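To reason about which allow-list entry would cover a class, the sketch below uses Python's `fnmatch` module as a stand-in for Airflow's matcher. The exact matching rules inside Airflow, and the space-separated entry format, are assumptions here, and `my_dags.models.Entities` is a hypothetical class path. The environment variable name follows Airflow's standard `AIRFLOW__{SECTION}__{KEY}` convention.

```python
from fnmatch import fnmatchcase

# Hypothetical fully qualified name of the producer DAG's output class.
QUALIFIED_NAME = "my_dags.models.Entities"

# Airflow reads config options from AIRFLOW__{SECTION}__{KEY} variables.
ENV_VAR = "AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES"


def covered_by(patterns: str, qualified_name: str) -> bool:
    """Check a space-separated pattern list against a class path.

    fnmatch-style globbing is an assumption standing in for Airflow's matcher.
    """
    return any(fnmatchcase(qualified_name, p) for p in patterns.split())


print(f"{ENV_VAR}='my_dags.models.*'")
print(covered_by("my_dags.models.*", QUALIFIED_NAME))
print(covered_by("other_pkg.*", QUALIFIED_NAME))
```

A narrow pattern that names the producing module is usually preferable to a broad glob, since every matching class becomes deserializable.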
If a downstream consumer needs the dict shape (e.g. forwarding to an external
system that expects JSON-style payloads), pass serialize_output=True and the
operator calls model_dump() before pushing to XCom. This makes the earlier
dict-based behavior available on demand, while the typed model instance
remains the default.
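The difference between the two shapes can be seen with Pydantic alone. This is a small sketch, assuming Pydantic v2 (where `model_dump()` exists); it does not call the operator, it only shows what a consumer would receive in each mode.

```python
from pydantic import BaseModel


class Entities(BaseModel):
    names: list[str]
    locations: list[str]


typed = Entities(names=["Alice", "Bob"], locations=["Paris", "London"])

# Default behavior described above: consumers get the model instance
# and use attribute access.
print(typed.names)

# With serialize_output=True the operator is described as calling
# model_dump(), so consumers would see a plain dict instead.
as_dict = typed.model_dump()
print(as_dict["names"])
```

The dict form is convenient for JSON-style hand-offs, while the typed form keeps validation and attribute access for downstream tasks.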
Agent Parameters
Pass additional keyword arguments to the pydantic-ai Agent constructor
via agent_params — for example, retries, model_settings, or tools.
See the pydantic-ai Agent docs for
the full list of supported parameters.
@dag(tags=["example"])
def example_llm_operator_agent_params():
    LLMOperator(
        task_id="creative_writing",
        prompt="Write a haiku about data pipelines.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a creative writer.",
        agent_params={"model_settings": {"temperature": 0.9}, "retries": 3},
    )
Usage Limits
Set usage_limits to a
pydantic-ai UsageLimits
to fail the task when the run exceeds a configured budget — request count,
input/output tokens, or tool calls. The check happens inside pydantic-ai’s
run loop, so the limit applies even when retries triggers multiple model
calls within a single task.
from pydantic_ai.usage import UsageLimits


@dag(tags=["example"])
def example_llm_operator_usage_limits():
    LLMOperator(
        task_id="capped_summary",
        prompt="Summarize the attached design doc in three bullet points.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a concise technical reviewer.",
        # Fail the task if the run exceeds 5 model requests, 4_000 input
        # tokens, or 1_000 output tokens. Useful for guardrails on shared
        # connections or untrusted prompts.
        usage_limits=UsageLimits(
            request_limit=5,
            input_tokens_limit=4_000,
            output_tokens_limit=1_000,
        ),
    )
Common knobs on UsageLimits:
- request_limit: max model requests per run (caps retry/tool-loop blow-ups). pydantic-ai applies a default of 50 when UsageLimits() is constructed without an explicit value, so passing UsageLimits(input_tokens_limit=4_000) silently inherits that 50-request cap. Set request_limit=None to disable it explicitly when you only want a token cap.
- input_tokens_limit / output_tokens_limit: per-run token caps.
- total_tokens_limit: combined input + output cap.
- tool_calls_limit: max tool invocations (AgentOperator only).
When the limit is hit pydantic-ai raises UsageLimitExceeded, which
propagates to Airflow as a task failure — Airflow’s standard retry policy
applies on top.
TaskFlow Decorator
The @task.llm decorator wraps LLMOperator. The function returns the
prompt string; all other parameters are passed to the operator:
@dag(tags=["example"])
def example_llm_decorator():
    @task.llm(llm_conn_id="pydanticai_default", system_prompt="Summarize concisely.")
    def summarize(text: str):
        return f"Summarize this article: {text}"

    summarize("Apache Airflow is a platform for programmatically authoring...")
With structured output:
@dag(tags=["example"])
def example_llm_decorator_structured():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt="Extract named entities.",
        output_type=Entities,
    )
    def extract(text: str):
        return f"Extract entities from: {text}"

    extract("Alice visited Paris and met Bob in London.")
Multimodal prompts
@task.llm accepts the same prompt shape as @task.agent – the callable
may return either a str or a non-empty Sequence[UserContent] (e.g.,
["Describe this:", ImageUrl(url="...")]) for vision, audio, or document
inputs. See @task.agent multimodal prompts for
the full example. require_approval=True is not currently supported with a
Sequence prompt – the approval session model expects a string – and will
raise at the approval boundary; widening that path is tracked as a follow-up.
Classification with Literal
Set output_type to a Literal to constrain the LLM to a fixed set of
labels — useful for classification tasks:
from typing import Literal


@dag(tags=["example"])
def example_llm_classification():
    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Classify the severity of the given pipeline incident. "
            "Use 'critical' for data loss or complete pipeline failure, "
            "'high' for significant delays or partial failures, "
            "'medium' for degraded performance, "
            "'low' for cosmetic issues or minor warnings."
        ),
        output_type=Literal["critical", "high", "medium", "low"],
    )
    def classify_incident(description: str):
        # Pre-process the description before sending to the LLM
        return f"Classify this incident:\n{description.strip()}"

    classify_incident(
        "Scheduler heartbeat lost for 15 minutes. "
        "Multiple DAG runs stuck in queued state. "
        "No new tasks being scheduled across all DAGs."
    )
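A downstream task often needs the same label set that constrains the LLM. The sketch below shows one way to share it using `typing.get_args`; the `Severity` alias and the `needs_page` rule are hypothetical and only illustrate how a consumer might act on the returned label.

```python
from typing import Literal, get_args

Severity = Literal["critical", "high", "medium", "low"]

# get_args recovers the allowed labels from the Literal type.
ALLOWED = get_args(Severity)


def needs_page(label: str) -> bool:
    """Hypothetical downstream rule: page on-call for the top two levels."""
    if label not in ALLOWED:
        raise ValueError(f"unexpected label: {label!r}")
    return label in ("critical", "high")


print(ALLOWED)
print(needs_page("critical"))
```

Defining the alias once and passing it as `output_type=Severity` would keep the prompt constraint and the downstream check in sync.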
Multi-task pipeline with dynamic mapping
Combine @task.llm with upstream and downstream tasks. Use .expand()
to process a list of items in parallel:
@dag(tags=["example"])
def example_llm_analysis_pipeline():
    @task
    def get_support_tickets():
        """Fetch unprocessed support tickets."""
        return [
            (
                "Our nightly ETL pipeline has been failing for the past 3 days. "
                "The error shows a connection timeout to the Postgres source database. "
                "This is blocking our daily financial reports."
            ),
            (
                "We'd like to add a new connection type for our internal ML model registry. "
                "Is there documentation on creating custom hooks?"
            ),
            (
                "After upgrading to the latest version, the Grid view takes over "
                "30 seconds to load for DAGs with more than 500 tasks. "
                "Previously it loaded in under 5 seconds."
            ),
        ]

    @task.llm(
        llm_conn_id="pydanticai_default",
        system_prompt=(
            "Analyze the support ticket and extract: "
            "priority (critical/high/medium/low), "
            "category (bug/feature_request/question/performance), "
            "a one-sentence summary, and a suggested next action."
        ),
        output_type=TicketAnalysis,
    )
    def analyze_ticket(ticket: str):
        return f"Analyze this support ticket:\n\n{ticket}"

    @task
    def store_results(analyses: list[TicketAnalysis]):
        """Store ticket analyses. In production, this would write to a database or ticketing system."""
        for analysis in analyses:
            print(f"[{analysis.priority.upper()}] {analysis.category}: {analysis.summary}")

    tickets = get_support_tickets()
    analyses = analyze_ticket.expand(ticket=tickets)
    store_results(analyses)
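The pipeline above references a TicketAnalysis model whose definition is not shown. The following is one plausible shape, inferred from the system prompt and from the attributes used in store_results. The field `suggested_action` is a hypothetical name, and the Literal value sets are assumptions taken from the prompt text. As with Entities, the class would need to live at module scope.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError


class TicketAnalysis(BaseModel):
    """Assumed shape; only priority, category and summary appear in the DAG."""

    priority: Literal["critical", "high", "medium", "low"]
    category: Literal["bug", "feature_request", "question", "performance"]
    summary: str
    suggested_action: str  # hypothetical field name


analysis = TicketAnalysis(
    priority="high",
    category="bug",
    summary="Nightly ETL fails with a Postgres connection timeout.",
    suggested_action="Check network access to the source database.",
)
print(f"[{analysis.priority.upper()}] {analysis.category}: {analysis.summary}")

# Values outside the Literal sets are rejected at validation time.
try:
    TicketAnalysis(priority="urgent", category="bug", summary="x", suggested_action="y")
except ValidationError as exc:
    print(f"rejected: {exc.error_count()} error(s)")
```

Constraining priority and category with Literal means a malformed LLM response fails validation instead of reaching store_results with an unexpected value.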
Human-in-the-Loop Approval
Set require_approval=True to pause the task after the LLM generates its
output and wait for a human reviewer to approve or reject it via the Airflow
HITL interface. Optionally allow the reviewer to edit the output before
approving with allow_modifications=True, and set a deadline with
approval_timeout:
from datetime import timedelta


@dag(tags=["example"])
def example_llm_operator_approval():
    LLMOperator(
        task_id="summarize_with_approval",
        prompt="Summarize the quarterly financial report for stakeholders.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a financial analyst. Be concise and accurate.",
        require_approval=True,
        approval_timeout=timedelta(hours=24),
        allow_modifications=True,
    )
Parameters
- prompt: The prompt to send to the LLM (operator) or the return value of the decorated function (decorator).
- llm_conn_id: Airflow connection ID for the LLM provider.
- model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.
- system_prompt: System-level instructions for the agent. Supports Jinja templating.
- output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.
- agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings, tools). Supports Jinja templating.
- usage_limits: Optional pydantic-ai UsageLimits enforced on the run. Fails the task when token / request / tool-call budgets are exceeded. Default None.
- require_approval: If True, the task defers after generating output and waits for human review. Default False.
- approval_timeout: Maximum time to wait for a review (timedelta). None means wait indefinitely. Default None.
- allow_modifications: If True, the reviewer can edit the output before approving. Default False.
Logging
After each LLM call, the operator logs a summary with model name, token usage, and request count at INFO level. At DEBUG level, the LLM output is also logged (truncated to 500 characters). See AgentOperator — Logging for details on the log format.