Source code for airflow.providers.common.ai.example_dags.example_llm_classification

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG: classify pipeline incidents by severity using @task.llm with Literal output."""

from __future__ import annotations

from typing import Literal

from airflow.providers.common.compat.sdk import dag, task


# [START howto_decorator_llm_classification]
@dag
[docs] def example_llm_classification(): @task.llm( llm_conn_id="pydanticai_default", system_prompt=( "Classify the severity of the given pipeline incident. " "Use 'critical' for data loss or complete pipeline failure, " "'high' for significant delays or partial failures, " "'medium' for degraded performance, " "'low' for cosmetic issues or minor warnings." ), output_type=Literal["critical", "high", "medium", "low"], ) def classify_incident(description: str): # Pre-process the description before sending to the LLM return f"Classify this incident:\n{description.strip()}" classify_incident( "Scheduler heartbeat lost for 15 minutes. " "Multiple DAG runs stuck in queued state. " "No new tasks being scheduled across all DAGs." )
# [END howto_decorator_llm_classification] example_llm_classification()

Was this entry helpful?