Source code for airflow.providers.common.ai.example_dags.example_llm

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAGs demonstrating LLMOperator and @task.llm usage."""

from __future__ import annotations

from datetime import timedelta

from pydantic import BaseModel

from airflow.providers.common.ai.operators.llm import LLMOperator
from airflow.providers.common.compat.sdk import dag, task


# [START howto_operator_llm_basic]
@dag
[docs] def example_llm_operator(): LLMOperator( task_id="summarize", prompt="Summarize the key findings from the Q4 earnings report.", llm_conn_id="pydanticai_default", system_prompt="You are a financial analyst. Be concise.", )
# [END howto_operator_llm_basic] example_llm_operator() # [START howto_operator_llm_structured] @dag
[docs] def example_llm_operator_structured(): class Entities(BaseModel): names: list[str] locations: list[str] LLMOperator( task_id="extract_entities", prompt="Extract all named entities from the article.", llm_conn_id="pydanticai_default", system_prompt="Extract named entities.", output_type=Entities, )
# [END howto_operator_llm_structured] example_llm_operator_structured() # [START howto_operator_llm_agent_params] @dag
[docs] def example_llm_operator_agent_params(): LLMOperator( task_id="creative_writing", prompt="Write a haiku about data pipelines.", llm_conn_id="pydanticai_default", system_prompt="You are a creative writer.", agent_params={"model_settings": {"temperature": 0.9}, "retries": 3}, )
# [END howto_operator_llm_agent_params] example_llm_operator_agent_params() # [START howto_decorator_llm] @dag
[docs] def example_llm_decorator(): @task.llm(llm_conn_id="pydanticai_default", system_prompt="Summarize concisely.") def summarize(text: str): return f"Summarize this article: {text}" summarize("Apache Airflow is a platform for programmatically authoring...")
# [END howto_decorator_llm] example_llm_decorator() # [START howto_decorator_llm_structured] @dag
[docs] def example_llm_decorator_structured(): class Entities(BaseModel): names: list[str] locations: list[str] @task.llm( llm_conn_id="pydanticai_default", system_prompt="Extract named entities.", output_type=Entities, ) def extract(text: str): return f"Extract entities from: {text}" extract("Alice visited Paris and met Bob in London.")
# [END howto_decorator_llm_structured] example_llm_decorator_structured() # [START howto_operator_llm_approval] @dag
[docs] def example_llm_operator_approval(): LLMOperator( task_id="summarize_with_approval", prompt="Summarize the quarterly financial report for stakeholders.", llm_conn_id="pydanticai_default", system_prompt="You are a financial analyst. Be concise and accurate.", require_approval=True, approval_timeout=timedelta(hours=24), allow_modifications=True, )
# [END howto_operator_llm_approval] example_llm_operator_approval()

Was this entry helpful?