HITLOperator (Human-in-the-loop)
Added in version 3.1.
Human-in-the-Loop (HITL) functionality allows you to incorporate human decision-making directly into your workflows. It lets a workflow pause and wait for human input, which makes it well suited to approval processes, manual quality checks, and other scenarios where human judgment is essential.
In this tutorial, we will explore how to use the HITL operators in workflows.
An HITL Example Dag
Here is a complete Dag that uses each of the HITL operators. We'll break it down section by section.
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
import datetime

import pendulum

from airflow.providers.standard.operators.hitl import (
    ApprovalOperator,
    HITLBranchOperator,
    HITLEntryOperator,
    HITLOperator,
)
from airflow.sdk import DAG, Param, task

with DAG(
    dag_id="example_hitl_operator",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "HITL"],
):
    # Free-form input, collected through the form defined by params.
    wait_for_input = HITLEntryOperator(
        task_id="wait_for_input",
        subject="Please provide required information: ",
        params={"information": Param("", type="string")},
    )

    # A choice among predefined options.
    wait_for_option = HITLOperator(
        task_id="wait_for_option",
        subject="Please choose one option to proceed: ",
        options=["option 1", "option 2", "option 3"],
    )

    # Approval step whose body shows the upstream responses pulled from XCom.
    valid_input_and_options = ApprovalOperator(
        task_id="valid_input_and_options",
        subject="Are the following input and options valid?",
        body="""
Input: {{ task_instance.xcom_pull(task_ids='wait_for_input', key='return_value')["params_input"]["information"] }}
Option: {{ task_instance.xcom_pull(task_ids='wait_for_option', key='return_value')["chosen_options"] }}
""",
        defaults="Reject",
        execution_timeout=datetime.timedelta(minutes=1),
    )

    # The chosen option decides which downstream task runs.
    choose_a_branch_to_run = HITLBranchOperator(
        task_id="choose_a_branch_to_run",
        subject="You're now allowed to proceed. Please choose one task to run: ",
        options=["task_1", "task_2", "task_3"],
    )

    # Placeholder tasks; their task_ids match the branch options above.
    @task
    def task_1(): ...

    @task
    def task_2(): ...

    @task
    def task_3(): ...

    (
        [wait_for_input, wait_for_option]
        >> valid_input_and_options
        >> choose_a_branch_to_run
        >> [task_1(), task_2(), task_3()]
    )
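The >> chain at the end of the Dag determines when each task becomes eligible to run. As an Airflow-independent illustration, the sketch below transcribes those dependencies by hand into a dictionary and uses Python's standard-library graphlib module (Python 3.9+) to group the tasks into stages. It models dependency order only: branching is ignored, so in a real run only the task picked at choose_a_branch_to_run would run in the last stage. The helper name execution_stages is hypothetical.

```python
from graphlib import TopologicalSorter

# Upstream dependencies of each task in the example Dag, transcribed by hand
# from its >> chain: each key maps to the set of tasks it waits for.
dependencies = {
    "wait_for_input": set(),
    "wait_for_option": set(),
    "valid_input_and_options": {"wait_for_input", "wait_for_option"},
    "choose_a_branch_to_run": {"valid_input_and_options"},
    "task_1": {"choose_a_branch_to_run"},
    "task_2": {"choose_a_branch_to_run"},
    "task_3": {"choose_a_branch_to_run"},
}


def execution_stages(graph: dict[str, set[str]]) -> list[list[str]]:
    """Hypothetical helper: group tasks into stages that can start together."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        stages.append(ready)
        sorter.done(*ready)  # mark the whole stage finished
    return stages


for number, stage in enumerate(execution_stages(dependencies), start=1):
    print(number, stage)
```

Both HITL tasks in the first stage have no upstream dependencies, so they can wait for human input at the same time.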
Input Provision
Users can provide input through params, and subsequent tasks can then use that input. This is useful, for example, when a large language model (LLM) workflow needs human guidance.
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
wait_for_input = HITLEntryOperator(
    task_id="wait_for_input",
    subject="Please provide required information: ",
    params={"information": Param("", type="string")},
)
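A downstream task typically reads the entered value from the operator's return_value XCom; the approval example later in this tutorial does so with the template expression ["params_input"]["information"]. The plain-Python sketch below mimics that lookup so it runs without Airflow. The helper build_prompt and the sample XCom dictionary are hypothetical, and the dictionary shape is assumed from the keys used in that template.

```python
from typing import Any


def build_prompt(xcom_value: dict[str, Any], question: str) -> str:
    """Hypothetical helper: combine human-entered guidance with a question for an LLM."""
    # Same lookup as ["params_input"]["information"] in the example's Jinja template.
    information = xcom_value["params_input"]["information"]
    if not information.strip():
        # Param("", type="string") defaults to an empty string, so guard against no input.
        raise ValueError("No information was provided by the reviewer.")
    return f"Context from a human reviewer: {information}\n\nQuestion: {question}"


# Hypothetical XCom value, shaped after the keys used in the example's template.
xcom_value = {"params_input": {"information": "Use the Q3 figures."}}
print(build_prompt(xcom_value, "Summarize revenue trends."))
```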
Option Selection
Input can also be provided as a choice among predefined options. Users select one of the available options, and that choice can be used to direct the workflow.
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
wait_for_option = HITLOperator(
    task_id="wait_for_option",
    subject="Please choose one option to proceed: ",
    options=["option 1", "option 2", "option 3"],
)
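The selection can then drive what a downstream task does. The sketch below assumes, as in the approval example later in this tutorial, that the operator's return_value XCom has a chosen_options key, and further assumes its value is a list of the selected option labels. The HANDLERS table and the handle_choice helper are hypothetical, plain-Python stand-ins for real task logic.

```python
from typing import Callable

# Hypothetical actions keyed by the option labels used in the example.
HANDLERS: dict[str, Callable[[], str]] = {
    "option 1": lambda: "fast path",
    "option 2": lambda: "full path",
    "option 3": lambda: "dry run",
}


def handle_choice(xcom_value: dict) -> str:
    """Hypothetical helper: dispatch on the single option a human picked."""
    chosen = xcom_value["chosen_options"]  # assumed to be a list of labels
    if len(chosen) != 1:
        raise ValueError(f"Expected exactly one chosen option, got {chosen!r}")
    option = chosen[0]
    if option not in HANDLERS:
        raise ValueError(f"Unknown option {option!r}")
    return HANDLERS[option]()


print(handle_choice({"chosen_options": ["option 2"]}))  # full path
```

If the goal is to pick which task runs next, rather than to branch inside a single task, the HITLBranchOperator covered below handles that routing within Airflow itself.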
Approval or Rejection
ApprovalOperator is a specialized form of option selection that has only two options: 'Approve' and 'Reject'.
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
valid_input_and_options = ApprovalOperator(
    task_id="valid_input_and_options",
    subject="Are the following input and options valid?",
    body="""
Input: {{ task_instance.xcom_pull(task_ids='wait_for_input', key='return_value')["params_input"]["information"] }}
Option: {{ task_instance.xcom_pull(task_ids='wait_for_option', key='return_value')["chosen_options"] }}
""",
    defaults="Reject",
    execution_timeout=datetime.timedelta(minutes=1),
)
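As the body of this snippet shows, you can use XComs in a templated body to retrieve information that users provided in earlier HITL tasks: entered params are available under the "params_input" key and selected options under the "chosen_options" key of each task's return_value XCom.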
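The snippet above also pairs defaults="Reject" with execution_timeout. Our reading is that defaults names the option(s) taken when nobody responds before the timeout, so an unattended approval here would resolve to "Reject" after one minute instead of waiting indefinitely. The sketch below models that rule in plain Python; it is not Airflow's implementation. resolve_response is a hypothetical name, options are modelled as lists of labels, "Approve" is assumed to be the label of the other approval option, and treating a timeout without defaults as an error is also an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_response(
    response: Optional[list[str]],
    defaults: Optional[list[str]],
    started_at: datetime,
    timeout: timedelta,
    now: datetime,
) -> Optional[list[str]]:
    """Hypothetical model of how a waiting HITL task settles its chosen options."""
    if response is not None:
        return response  # a human answered before the deadline
    if now - started_at < timeout:
        return None  # still waiting for a human
    if defaults is None:
        # Assumption: without defaults, an unanswered timeout is an error.
        raise TimeoutError("No response before the timeout and no defaults configured")
    return defaults  # deadline passed: fall back to the configured defaults


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
one_minute = timedelta(minutes=1)  # mirrors execution_timeout in the example
print(resolve_response(None, ["Reject"], start, one_minute, start + timedelta(seconds=30)))  # None
print(resolve_response(None, ["Reject"], start, one_minute, start + timedelta(minutes=2)))  # ['Reject']
# "Approve" is assumed to be the label of the other ApprovalOperator option.
print(resolve_response(["Approve"], ["Reject"], start, one_minute, start + timedelta(seconds=10)))  # ['Approve']
```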
Branch Selection
Users can choose which branches to follow within the Dag. This is commonly applied in scenarios such as content moderation, where human judgment is sometimes required.
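It works like option selection, except that each option must be the task_id of a task downstream of the HITLBranchOperator, so remember to set those dependencies in the workflow. In this example, the options task_1, task_2, and task_3 match the @task-decorated functions of the same names, because a decorated task's task_id defaults to its function name.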
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
choose_a_branch_to_run = HITLBranchOperator(
    task_id="choose_a_branch_to_run",
    subject="You're now allowed to proceed. Please choose one task to run: ",
    options=["task_1", "task_2", "task_3"],
)
/opt/airflow/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@task
def task_1(): ...

@task
def task_2(): ...

@task
def task_3(): ...

(
    [wait_for_input, wait_for_option]
    >> valid_input_and_options
    >> choose_a_branch_to_run
    >> [task_1(), task_2(), task_3()]
)
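Conceptually, the branch operator turns the human's choice into the set of downstream tasks that run, and its other downstream tasks are skipped. The plain-Python sketch below models that rule, including a check that each option names a task wired downstream with >>. It is an illustration under those assumptions rather than Airflow's implementation, and split_branches is a hypothetical name.

```python
def split_branches(chosen: list[str], downstream_task_ids: set[str]) -> tuple[set[str], set[str]]:
    """Hypothetical helper: return (tasks_to_run, tasks_to_skip) for a branch choice."""
    unknown = set(chosen) - downstream_task_ids
    if unknown:
        # An option that is not a downstream task cannot be followed.
        raise ValueError(f"Not downstream of the branch task: {sorted(unknown)}")
    to_run = set(chosen)
    return to_run, downstream_task_ids - to_run


downstream = {"task_1", "task_2", "task_3"}
run, skip = split_branches(["task_2"], downstream)
print(sorted(run), sorted(skip))  # ['task_2'] ['task_1', 'task_3']
```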
Benefits and Common Use Cases
HITL functionality is valuable in large language model (LLM) workflows, where human-provided guidance can be essential for achieving better results. It is also highly beneficial in enterprise data pipelines, where human validation can complement and enhance automated processes.