Tasks

A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and then have upstream and downstream dependencies set between them in order to express the order they should run in.

There are three basic kinds of Task:

  • Operators, predefined task templates that you can string together quickly to build most parts of your DAGs.

  • Sensors, a special subclass of Operators which are entirely about waiting for an external event to happen.

  • A TaskFlow-decorated @task, which is a custom Python function packaged up as a Task.

Internally, these are all actually subclasses of Airflow’s BaseOperator, and the concepts of Task and Operator are somewhat interchangeable, but it’s useful to think of them as separate concepts - essentially, Operators and Sensors are templates, and when you call one in a DAG file, you’re making a Task.
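
For illustration, here is a minimal sketch of a DAG that mixes all three kinds (assuming the standard and SFTP providers are installed; the file path is a placeholder):

# Illustrative sketch only: one DAG combining an Operator, a Sensor, and a
# TaskFlow-decorated task. Import paths assume the standard and SFTP providers.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.providers.standard.operators.bash import BashOperator


@dag(start_date=datetime(2021, 1, 1), schedule="@daily", catchup=False)
def example_task_kinds():
    wait_for_file = SFTPSensor(  # a Sensor: waits for an external event
        task_id="wait_for_file",
        path="/root/test",  # placeholder path
        mode="reschedule",
    )

    extract = BashOperator(  # an Operator: a predefined task template
        task_id="extract",
        bash_command="echo extracting",
    )

    @task  # a TaskFlow task: a plain Python function packaged up as a Task
    def transform():
        print("transforming")

    wait_for_file >> extract >> transform()


example_task_kinds()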

Relationships

The key part of using Tasks is defining how they relate to each other - their dependencies, or as we say in Airflow, their upstream and downstream tasks. You declare your Tasks first, and then declare the dependencies between them.

Note

An upstream task is the one that directly precedes the other task (we used to call it a parent task). Be aware that this term does not cover tasks that are merely higher in the task hierarchy but are not direct parents. The same definition applies to a downstream task, which must be a direct child of the other task.

There are two ways of declaring dependencies - using the >> and << (bitshift) operators:

first_task >> second_task >> [third_task, fourth_task]

Or the more explicit set_upstream and set_downstream methods:

first_task.set_downstream(second_task)
third_task.set_upstream(second_task)

These both do exactly the same thing, but in general we recommend you use the bitshift operators, as they are easier to read in most cases.
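
As a self-contained sketch (using BashOperator purely as a stand-in), the bitshift example above could be written inside a DAG like this:

# Minimal sketch of the dependency declarations above; BashOperator is a stand-in.
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.standard.operators.bash import BashOperator


@dag(start_date=datetime(2021, 1, 1), schedule=None, catchup=False)
def dependency_example():
    first_task = BashOperator(task_id="first_task", bash_command="echo first")
    second_task = BashOperator(task_id="second_task", bash_command="echo second")
    third_task = BashOperator(task_id="third_task", bash_command="echo third")
    fourth_task = BashOperator(task_id="fourth_task", bash_command="echo fourth")

    # Bitshift style:
    first_task >> second_task >> [third_task, fourth_task]

    # Equivalent explicit style:
    # second_task.set_upstream(first_task)
    # second_task.set_downstream([third_task, fourth_task])


dependency_example()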

By default, a Task will run when all of its upstream (parent) tasks have succeeded, but there are many ways of modifying this behaviour to add branching, to only wait for some upstream tasks, or to change behaviour based on where the current run is in history. For more, see Control Flow.
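
For example, one common adjustment is overriding a task's trigger_rule so it runs once all upstream tasks have finished, regardless of outcome (a sketch; the command is arbitrary):

# Sketch: a cleanup task that runs once all upstream tasks are done,
# whether they succeeded or failed, by overriding the default trigger rule.
from airflow.providers.standard.operators.bash import BashOperator

cleanup = BashOperator(
    task_id="cleanup",
    bash_command="echo cleaning up",
    trigger_rule="all_done",  # default is "all_success"
)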

Tasks don’t pass information to each other by default, and run entirely independently. If you want to pass information from one Task to another, you should use XComs.
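
For example, with TaskFlow a value returned from one task is stored as an XCom and passed to a downstream task automatically (a minimal sketch):

# Sketch: TaskFlow tasks exchange data via XComs; a returned value is pushed
# as an XCom and pulled automatically when passed to a downstream task.
from airflow.decorators import task


@task
def extract():
    return {"rows": 42}


@task
def load(payload: dict):
    print(f"received {payload['rows']} rows")


# Inside a DAG definition, wiring them also declares the dependency:
# load(extract())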

Task Instances

Much in the same way that a DAG is instantiated into a DAG Run each time it runs, the tasks under a DAG are instantiated into Task Instances.

An instance of a Task is a specific run of that task for a given DAG run (and thus for a given data interval). A Task Instance is also the stateful representation of a Task, recording which stage of its lifecycle it is in.

The possible states for a Task Instance are:

  • none: The Task has not yet been queued for execution (its dependencies are not yet met)

  • scheduled: The scheduler has determined the Task’s dependencies are met and it should run

  • queued: The task has been assigned to an Executor and is awaiting a worker

  • running: The task is running on a worker (or on a local/synchronous executor)

  • success: The task finished running without errors

  • restarting: The task was externally requested to restart when it was running

  • failed: The task had an error during execution and failed to run

  • skipped: The task was skipped due to branching, LatestOnly, or similar.

  • upstream_failed: An upstream task failed and the Trigger Rule says we needed it

  • up_for_retry: The task failed, but has retry attempts left and will be rescheduled.

  • up_for_reschedule: The task is a Sensor that is in reschedule mode

  • deferred: The task has been deferred to a trigger

  • removed: The task has vanished from the DAG since the run started

[Image: task lifecycle diagram illustrating the state transitions listed above]

Ideally, a task should flow from none, to scheduled, to queued, to running, and finally to success.

When any custom Task (Operator) is running, it will get a copy of the task instance passed to it; as well as being able to inspect task metadata, it also contains methods for things like XComs.
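
For example, a custom operator's execute method receives the context, which includes the running task instance; a minimal sketch (the import path may differ between Airflow versions):

# Sketch: a custom operator inspecting its task instance and using its XCom helpers.
from airflow.models.baseoperator import BaseOperator


class MyCustomOperator(BaseOperator):
    def execute(self, context):
        ti = context["ti"]  # the running TaskInstance
        self.log.info("Running %s, try number %s", ti.task_id, ti.try_number)
        ti.xcom_push(key="note", value="hello from MyCustomOperator")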

Relationship Terminology

For any given Task Instance, there are two types of relationships it has with other instances.

Firstly, it can have upstream and downstream tasks:

task1 >> task2 >> task3

When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same data interval.

There may also be instances of the same task, but for different data intervals - from other runs of the same DAG. We call these previous and next - it is a different relationship to upstream and downstream!

Note

Some older Airflow documentation may still use “previous” to mean “upstream”. If you find an occurrence of this, please help us fix it!
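
For example, the current and previous data intervals are exposed to templates; the sketch below assumes a recent Airflow version where these template variables are available:

# Sketch: comparing this run's data interval with that of the previous successful run.
from airflow.providers.standard.operators.bash import BashOperator

report = BashOperator(
    task_id="report",
    bash_command=(
        "echo 'this run:     {{ data_interval_start }}' && "
        "echo 'previous run: {{ prev_data_interval_start_success }}'"
    ),
)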

Timeouts

If you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime.timedelta value that is the maximum permissible runtime. This applies to all Airflow tasks, including sensors. execution_timeout controls the maximum time allowed for every execution. If execution_timeout is breached, the task times out and AirflowTaskTimeout is raised.
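
For example (a sketch; the command and timeout are arbitrary):

# Sketch: the task fails with AirflowTaskTimeout if a single run exceeds 5 minutes.
from datetime import timedelta

from airflow.providers.standard.operators.bash import BashOperator

slow_task = BashOperator(
    task_id="slow_task",
    bash_command="sleep 600",  # will exceed the timeout below
    execution_timeout=timedelta(minutes=5),
)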

In addition, sensors have a timeout parameter. This only matters for sensors in reschedule mode. timeout controls the maximum time allowed for the sensor to succeed. If timeout is breached, AirflowSensorTimeout will be raised and the sensor fails immediately without retrying.

The following SFTPSensor example illustrates this. The sensor is in reschedule mode, meaning it is periodically executed and rescheduled until it succeeds.

  • Each time the sensor pokes the SFTP server, it is allowed to take a maximum of 60 seconds, as defined by execution_timeout.

  • If it takes the sensor more than 60 seconds to poke the SFTP server, AirflowTaskTimeout will be raised. The sensor is allowed to retry when this happens, up to 2 times as defined by retries.

  • From the start of the first execution until it eventually succeeds (i.e. after the file '/root/test' appears), the sensor is allowed a maximum of 3600 seconds, as defined by timeout. In other words, if the file does not appear on the SFTP server within 3600 seconds, the sensor will raise AirflowSensorTimeout. It will not retry when this error is raised.

  • If the sensor fails for other reasons, such as network outages, during the 3600-second interval, it can retry up to 2 times as defined by retries. Retrying does not reset the timeout: the sensor still has up to 3600 seconds in total to succeed.

from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

SLAs

The SLA feature from Airflow 2 has been removed in Airflow 3.0 and will be replaced with a new implementation in Airflow 3.1.

Special Exceptions

If you want to control your task’s state from within custom Task/Operator code, Airflow provides two special exceptions you can raise:

  • AirflowSkipException will mark the current task as skipped

  • AirflowFailException will mark the current task as failed, ignoring any remaining retry attempts

These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster - e.g., skipping when it knows there’s no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).
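
A minimal sketch using TaskFlow (the api_key_valid and data_available helpers are hypothetical placeholders):

# Sketch: controlling task state from inside a task. api_key_valid() and
# data_available() are hypothetical placeholders for your own checks.
from airflow.decorators import task
from airflow.exceptions import AirflowFailException, AirflowSkipException


@task
def process():
    if not api_key_valid():  # hypothetical check
        raise AirflowFailException("Invalid API key; retrying will not help")
    if not data_available():  # hypothetical check
        raise AirflowSkipException("No data for this interval; skipping")
    ...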

Zombie/Undead Tasks

No system runs perfectly, and task instances are expected to die once in a while. Airflow detects two kinds of task/process mismatch:

  • Zombie tasks are TaskInstances stuck in a running state despite their associated jobs being inactive (e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for many reasons, including:

    • The Airflow worker ran out of memory and was OOMKilled.

    • The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker.

    • The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another.

  • Undead tasks are tasks that are not supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.

Below is the code snippet from the Airflow scheduler that runs periodically to detect and purge zombie tasks.

airflow/jobs/scheduler_job_runner.py

    def _find_and_purge_zombies(self) -> None:
        """
        Find and purge zombie task instances.

        Zombie instances are tasks that failed to heartbeat for too long, or
        have a no-longer-running LocalTaskJob.

        A TaskCallbackRequest is also created for the killed zombie to be
        handled by the DAG processor, and the executor is informed to no longer
        count the zombie as running when it calculates parallelism.
        """
        with create_session() as session:
            if zombies := self._find_zombies(session=session):
                self._purge_zombies(zombies, session=session)

    def _find_zombies(self, *, session: Session) -> list[tuple[TI, str]]:
        self.log.debug("Finding 'running' jobs without a recent heartbeat")
        limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
        zombies = session.execute(
            select(TI, DM.fileloc)
            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
            .join(DM, TI.dag_id == DM.dag_id)
            .where(
                TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
                TI.last_heartbeat_at < limit_dttm,
            )
            .where(TI.queued_by_job_id == self.job.id)
        ).all()
        if zombies:
            self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
        return zombies

    def _purge_zombies(self, zombies: list[tuple[TI, str]], *, session: Session) -> None:
        for ti, file_loc in zombies:
            zombie_message_details = self._generate_zombie_message_details(ti)
            request = TaskCallbackRequest(
                full_filepath=file_loc,
                ti=ti,
                msg=str(zombie_message_details),
            )
            session.add(
                Log(
                    event="heartbeat timeout",
                    task_instance=ti.key,
                    extra=(
                        f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
                        "seconds) and will be terminated. "
                        "See https://airflow.apache.org/docs/apache-airflow/"
                        "stable/core-concepts/tasks.html#zombie-undead-tasks"
                    ),
                )
            )
            self.log.error(
                "Detected zombie job: %s "
                "(See https://airflow.apache.org/docs/apache-airflow/"
                "stable/core-concepts/tasks.html#zombie-undead-tasks)",
                request,
            )
            self.job.executor.send_callback(request)
            if (executor := self._try_to_load_executor(ti.executor)) is None:
                self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
                continue
            executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
            Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})

The criteria the above snippet uses to detect zombie tasks are explained below:

  1. Task Instance State

    Only task instances in the RUNNING or RESTARTING state are considered potential zombies.

  2. Heartbeat Check

    A task instance is flagged as a zombie if its latest heartbeat is earlier than the calculated time threshold (limit_dttm), i.e. the current time minus the configured zombie threshold. The heartbeat is a mechanism to indicate that a task is still alive and running.

  3. Queued by Job ID

    Only task instances queued by the scheduler job that is currently running the check are considered.

These conditions collectively identify running tasks that may be zombies based on their state, heartbeat status, and the job that queued them. If a task instance meets all of these criteria, it is considered a zombie, and further actions, such as logging and sending a callback request, are taken.

Reproducing zombie tasks locally

If you’d like to reproduce zombie tasks for development/testing processes, follow the steps below:

  1. Set the below environment variables for your local Airflow setup (alternatively you could tweak the corresponding config values in airflow.cfg)

export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
  2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could use the below DAG:

from airflow.decorators import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime


@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
    t1 = BashOperator(
        task_id="sleep_10_minutes",
        bash_command="sleep 600",
    )


sleep_dag()

Run the above DAG and wait for a while. You should see the task instance becoming a zombie task and then being killed by the scheduler.

Executor Configuration

Some Executors allow optional per-task configuration - such as the KubernetesExecutor, which lets you set an image to run the task on.

This is achieved via the executor_config argument to a Task or Operator. Here’s an example of setting the Docker image for a task that will run on the KubernetesExecutor:

MyOperator(
    ...,
    executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}},
)

The settings you can pass into executor_config vary by executor, so read the individual executor documentation in order to see what you can set.
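
With the KubernetesExecutor, recent Airflow versions also accept a pod_override key containing a full Kubernetes pod spec; a hedged sketch (requires the kubernetes Python client, and the image name is a placeholder):

# Sketch: overriding the pod used by the KubernetesExecutor for a single task.
from kubernetes.client import models as k8s

MyOperator(
    ...,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[k8s.V1Container(name="base", image="myCustomDockerImage")]
            )
        )
    },
)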
