airflow.models.dag

Module Contents

Classes

DAG

A dag (directed acyclic graph) is a collection of tasks with directional dependencies.

DagTag

A tag name per dag, to allow quick filtering in the DAG view.

DagOwnerAttributes

Table defining different owner attributes.

DagModel

Table containing DAG properties.

Functions

get_last_dagrun(dag_id, session[, ...])

Return the last dag run for a dag, None if there was none.

get_asset_triggered_next_run_info(dag_ids, *, session)

Get next run info for a list of dag_ids.

Attributes

log

DEFAULT_VIEW_PRESETS

ORIENTATION_PRESETS

AssetT

TAG_MAX_LEN

DagStateChangeCallback

ScheduleInterval

ScheduleArg

dag

airflow.models.dag.log[source]
airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][source]
airflow.models.dag.ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT'][source]
airflow.models.dag.AssetT[source]
airflow.models.dag.TAG_MAX_LEN = 100[source]
airflow.models.dag.DagStateChangeCallback[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.ScheduleArg[source]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]

Bases: airflow.exceptions.AirflowException

Exception raised when a model populates data interval fields incorrectly.

The data interval fields should either both be None (for runs scheduled prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is implemented). This is raised if exactly one of the fields is None.

__str__()[source]

Return str(self).
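
The both-or-neither rule for the data interval fields can be illustrated with a small standalone check. This is a minimal sketch that does not use Airflow: the `Run` dataclass and the `check_data_interval` helper are hypothetical names, and a plain `ValueError` stands in for `InconsistentDataInterval`.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Run:
    """Hypothetical stand-in for a model with two data interval fields."""

    data_interval_start: datetime | None
    data_interval_end: datetime | None


def check_data_interval(run: Run) -> tuple[datetime, datetime] | None:
    """Return the interval, None if both fields are unset, or raise if only one is set."""
    start, end = run.data_interval_start, run.data_interval_end
    if (start is None) != (end is None):
        # Exactly one field is None: the inconsistent case described above.
        raise ValueError(
            "data_interval_start and data_interval_end must both be set or both be None"
        )
    if start is None or end is None:
        return None
    return start, end
```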

airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]

Return the last dag run for a dag, None if there was none.

Last dag run can be any type of run e.g. scheduled or backfilled. Overridden DagRuns are ignored.

airflow.models.dag.get_asset_triggered_next_run_info(dag_ids, *, session)[source]

Get next run info for a list of dag_ids.

Given a list of dag_ids, get a string representing how close any asset-triggered dags are to their next run, e.g. “1 of 2 assets updated”.
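
To make the shape of that summary concrete, the sketch below builds strings in the same style from plain dictionaries. It is not Airflow's implementation or return format: `format_asset_progress`, `asset_status`, and the example URIs are all hypothetical.

```python
def format_asset_progress(ready: int, total: int) -> str:
    """Hypothetical helper: format progress in the style of the example above."""
    if not 0 <= ready <= total:
        raise ValueError("ready must be between 0 and total")
    return f"{ready} of {total} assets updated"


# Hypothetical input: for each dag_id, which of its scheduling assets have been updated.
asset_status = {
    "dag_a": {"s3://bucket/a": True, "s3://bucket/b": False},
    "dag_b": {"s3://bucket/c": True},
}

# Count the updated assets per dag and format one summary string each.
progress = {
    dag_id: format_asset_progress(sum(updated.values()), len(updated))
    for dag_id, updated in asset_status.items()
}
```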

airflow.models.dag.dag[source]
class airflow.models.dag.DAG(context=None)[source]

Bases: airflow.sdk.definitions.dag.DAG, airflow.utils.log.logging_mixin.LoggingMixin

A dag (directed acyclic graph) is a collection of tasks with directional dependencies.

A dag also has a schedule, a start date and an end date (optional). For each schedule (say daily or hourly), the DAG needs to run each individual task as its dependencies are met. Certain tasks have the property of depending on their own past, meaning that they can’t run until their previous schedule (and upstream tasks) are completed.

DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.

Note that if you plan to use time zones, all the dates provided should be pendulum dates. See Time zone aware DAGs.

New in version 2.4: The schedule argument to specify either time-based scheduling logic (timetable), or asset-driven triggers.

Changed in version 3.0: The default value of schedule has been changed to None (no schedule). The previous default was timedelta(days=1).

Parameters
  • dag_id – The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)

  • description – The description for the DAG, e.g. to be shown on the webserver

  • schedule – If provided, this defines the rules according to which DAG runs are scheduled. Possible values include a cron expression string, timedelta object, Timetable, or list of Asset objects. See also Customizing DAG Scheduling with Timetables.

  • start_date – The timestamp from which the scheduler will attempt to backfill. If this is not provided, backfilling must be done manually with an explicit time range.

  • end_date – A date beyond which your DAG won’t run; leave as None for open-ended scheduling.

  • template_searchpath – This list of folders (non-relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default

  • template_undefined – Template undefined type.

  • user_defined_macros – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.

  • user_defined_filters – a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG.

  • default_args – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.

  • params – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.

  • max_active_tasks – the number of task instances allowed to run concurrently

  • max_active_runs – maximum number of active DAG runs; once this many DAG runs are in a running state, the scheduler won’t create new active DAG runs

  • max_consecutive_failed_dag_runs – (experimental) maximum number of consecutive failed DAG runs; beyond this, the scheduler will disable the DAG

  • dagrun_timeout – Specify the duration a DagRun should be allowed to run before it times out or fails. Task instances that are running when a DagRun is timed out will be marked as skipped.

  • sla_miss_callback – DEPRECATED - The SLA feature is removed in Airflow 3.0, to be replaced with a new implementation in 3.1

  • default_view – Specify DAG default view (grid, graph, duration, gantt, landing_times), default grid

  • orientation – Specify DAG orientation in graph view (LR, TB, RL, BT), default LR

  • catchup – Perform scheduler catchup (or only run latest)? Defaults to True

  • on_failure_callback – A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.

  • on_success_callback – Much like the on_failure_callback except that it is executed when the dag succeeds.

  • access_control – Specify optional DAG-level actions, e.g., “{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”, or specify the resource name if there is a DAG Runs resource, e.g., “{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}}”

  • is_paused_upon_creation – Specifies if the dag is paused when created for the first time. If the dag exists already, this flag will be ignored. If this optional parameter is not specified, the global config setting will be used.

  • jinja_environment_kwargs

    additional configuration options to be passed to Jinja Environment for template rendering

    Example: to prevent Jinja from removing a trailing newline from template strings

    DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
    

    See: Jinja Environment documentation

  • render_template_as_native_obj – If True, uses a Jinja NativeEnvironment to render templates as native Python types. If False, a Jinja Environment is used to render templates as string values.

  • tags – List of tags to help filtering DAGs in the UI.

  • owner_links – Dict of owners and their links, that will be clickable on the DAGs view UI. Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link. e.g: {“dag_owner”: “https://airflow.apache.org/”}

  • auto_register – Automatically register this DAG when it is used in a with block

  • fail_stop – Fails currently running tasks when a task in the DAG fails. Warning: A fail stop dag can only have tasks with the default trigger rule (“all_success”). An exception will be thrown if any task in a fail stop dag has a non-default trigger rule.

  • dag_display_name – The display name of the DAG which appears on the UI.
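
Two of the rules in the parameter list above, the allowed character set for dag_id and the precedence of operator-level default_args over DAG-level ones, can be sketched in plain Python. This is an illustration only and does not call Airflow: `is_valid_dag_id` and `resolve_default_args` are hypothetical helpers, and Airflow's own validation may apply additional checks.

```python
import re

# ASCII letters, digits, dashes, dots and underscores only, per the dag_id rule above.
_DAG_ID_PATTERN = re.compile(r"[A-Za-z0-9_.\-]+")


def is_valid_dag_id(dag_id: str) -> bool:
    """Hypothetical helper: check a candidate dag_id against the documented character set."""
    return _DAG_ID_PATTERN.fullmatch(dag_id) is not None


def resolve_default_args(dag_default_args: dict, operator_default_args: dict) -> dict:
    """Hypothetical helper: operator-level values take precedence over DAG-level ones."""
    return {**dag_default_args, **operator_default_args}


# The DAG sets depends_on_past=True, the operator sets it to False; the operator wins.
resolved = resolve_default_args(
    {"depends_on_past": True, "retries": 2},
    {"depends_on_past": False},
)
```

Keys that only the DAG-level dictionary defines (here `retries`) are carried through unchanged.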

property safe_dag_id[source]
property dag_id: str[source]
property timetable_summary: str[source]
property relative_fileloc: pathlib.Path[source]

File location of the importable dag ‘file’ relative to the configured DAGs folder.

partial: bool = False[source]
last_loaded: datetime.datetime | None[source]
default_view: str[source]
orientation: str[source]
max_consecutive_failed_dag_runs: int[source]
validate()[source]

Validate the DAG has a coherent setup.

This is called by the DAG bag before bagging the DAG.

validate_executor_field()[source]
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]

Get information about the next DagRun of this dag after last_automated_dagrun.

This calculates what time interval the next DagRun should operate on (its logical date) and when it can be scheduled, according to the dag’s timetable, start_date, end_date, etc. This doesn’t check max active run or any other “max_active_tasks” type limits, but only performs calculations based on the various date and interval fields of this dag and its tasks.

Parameters
  • last_automated_dagrun (None | airflow.timetables.base.DataInterval) – The max(logical_date) of existing “automated” DagRuns for this dag (scheduled or backfill, but not manual).

  • restricted (bool) – If set to False (default is True), ignore start_date, end_date, and catchup specified on the DAG or tasks.

Returns

DagRunInfo of the next dagrun, or None if a dagrun is not going to be scheduled.

Return type

airflow.timetables.base.DagRunInfo | None

iter_dagrun_infos_between(earliest, latest, *, align=True)[source]

Yield DagRunInfo using this DAG’s timetable between given interval.

DagRunInfo instances are yielded if their logical_date is not earlier than earliest, nor later than latest. The instances are ordered by their logical_date from earliest to latest.

If align is False, the first run will happen immediately on earliest, even if it does not fall on the logical timetable schedule. The default is True.

Example: A DAG is scheduled to run every midnight (0 0 * * *). If earliest is 2021-06-03 23:00:00, the first DagRunInfo would be 2021-06-03 23:00:00 if align=False, and 2021-06-04 00:00:00 if align=True.
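
The effect of align in that example can be reproduced with a small model of a midnight schedule. This sketch uses only the standard library and is not Airflow's timetable code; `first_logical_date` is a hypothetical helper that hard-codes the `0 0 * * *` schedule.

```python
from datetime import datetime, timedelta


def first_logical_date(earliest: datetime, *, align: bool = True) -> datetime:
    """Hypothetical model of the first run on a midnight (0 0 * * *) schedule."""
    if not align:
        # Start immediately at `earliest`, even if it is off-schedule.
        return earliest
    midnight = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
    # Already on schedule: keep it; otherwise move to the next midnight.
    return midnight if midnight == earliest else midnight + timedelta(days=1)


earliest = datetime(2021, 6, 3, 23, 0, 0)
unaligned = first_logical_date(earliest, align=False)  # 2021-06-03 23:00:00
aligned = first_logical_date(earliest, align=True)  # 2021-06-04 00:00:00
```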

get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_externally_triggered=True)[source]
get_concurrency_reached(session=NEW_SESSION)[source]

Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached.

get_is_active(session=NEW_SESSION)[source]

Return a boolean indicating whether this DAG is active.

get_is_paused(session=NEW_SESSION)[source]

Return a boolean indicating whether this DAG is paused.

classmethod get_serialized_fields()[source]

Stringified DAGs and operators contain exactly these fields.

static fetch_callback(dag, run_id, success=True, reason=None, *, session=NEW_SESSION)[source]

Fetch the appropriate callbacks depending on the value of success.

This method gets the context of a single TaskInstance that is part of this DagRun and returns it along with the list of callbacks.

Parameters
  • dag (DAG) – DAG object

  • run_id (str) – The DAG run ID

  • success (bool) – Flag to specify if failure or success callback should be called

  • reason (str | None) – Completion reason

  • session (sqlalchemy.orm.session.Session) – Database session

handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]

Triggers on_failure_callback or on_success_callback as appropriate.

This method gets the context of a single TaskInstance that is part of this DagRun and passes that to the callable along with a ‘reason’, primarily to differentiate DagRun failures.

Parameters
  • dagrun (airflow.models.dagrun.DagRun) – DagRun object

  • success – Flag to specify if failure or success callback should be called

  • reason – Completion reason

  • session – Database session

classmethod execute_callback(callbacks, context, dag_id)[source]

Triggers the callbacks with the given context.

Parameters
  • callbacks (list[Callable] | None) – List of callbacks to call

  • context (airflow.models.taskinstance.Context | None) – Context to pass to all callbacks

  • dag_id (str) – The dag_id of the DAG to find.

get_active_runs()[source]

Return a list of dag run logical dates currently running.

Returns

List of logical dates

static fetch_dagrun(dag_id, run_id, session=NEW_SESSION)[source]

Return the dag run for a given run_id if it exists, otherwise None.

Returns

The DagRun if found, otherwise None.

Return type

airflow.models.dagrun.DagRun

get_dagrun(run_id, session=NEW_SESSION)[source]
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]

Return the list of dag runs between start_date (inclusive) and end_date (inclusive).

Parameters
  • start_date – The starting logical date of the DagRun to find.

  • end_date – The ending logical date of the DagRun to find.

  • session

Returns

The list of DagRuns found.

get_latest_logical_date(session=NEW_SESSION)[source]

Return the latest date for which at least one dag run exists.

get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]

Get num task instances before (including) base_date.

The returned list may contain exactly num task instances corresponding to any DagRunType. It can have fewer if there are fewer than num scheduled DAG runs before base_date.

get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

Set the state of a TaskInstance and clear downstream tasks in failed or upstream_failed state.

Parameters
  • task_id (str) – Task ID of the TaskInstance

  • map_indexes (collections.abc.Collection[int] | None) – Only set TaskInstance if its map_index matches. If None (default), all mapped TaskInstances of the task are set.

  • run_id (str | None) – The run_id of the TaskInstance

  • state (airflow.utils.state.TaskInstanceState) – State to set the TaskInstance to

  • upstream (bool) – Include all upstream tasks of the given task_id

  • downstream (bool) – Include all downstream tasks of the given task_id

  • future (bool) – Include all future TaskInstances of the given task_id

  • commit (bool) – Commit changes

  • past (bool) – Include all past TaskInstances of the given task_id

set_task_group_state(*, group_id, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

Set TaskGroup to the given state and clear downstream tasks in failed or upstream_failed state.

Parameters
  • group_id (str) – The group_id of the TaskGroup

  • run_id (str | None) – The run_id of the TaskInstance

  • state (airflow.utils.state.TaskInstanceState) – State to set the TaskInstance to

  • upstream (bool) – Include all upstream tasks of the given task_id

  • downstream (bool) – Include all downstream tasks of the given task_id

  • future (bool) – Include all future TaskInstances of the given task_id

  • commit (bool) – Commit changes

  • past (bool) – Include all past TaskInstances of the given task_id

  • session (sqlalchemy.orm.session.Session) – new session

clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) → list[airflow.models.taskinstance.TaskInstance][source]
clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) → int

Clear a set of task instances associated with the current dag for a specified date range.

Parameters
  • task_ids – List of task ids or (task_id, map_index) tuples to clear

  • start_date – The minimum logical_date to clear

  • end_date – The maximum logical_date to clear

  • only_failed – Only clear failed tasks

  • only_running – Only clear running tasks.

  • confirm_prompt – Ask for confirmation

  • dag_run_state – state to set DagRun to. If set to False, dagrun state will not be changed.

  • dry_run – Find the tasks to clear but don’t clear them.

  • session – The sqlalchemy session to use

  • dag_bag – The DagBag used to find the dags (Optional)

  • exclude_task_ids – A set of task_id or (task_id, map_index) tuples that should not be cleared

  • exclude_run_ids – A set of run_ids that should not be cleared
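
The two clear signatures differ only in the type of dry_run and in the return type: a list of task instances when dry_run is True, an int otherwise. The sketch below shows that overload pattern in isolation. It is a hypothetical model rather than Airflow code: it takes a plain list of task IDs, modifies nothing, and assumes the int is a count of the matched items.

```python
from __future__ import annotations

from typing import Literal, overload


@overload
def clear(task_ids: list[str], *, dry_run: Literal[True]) -> list[str]: ...
@overload
def clear(task_ids: list[str], *, dry_run: Literal[False] = False) -> int: ...


def clear(task_ids: list[str], *, dry_run: bool = False) -> list[str] | int:
    """Hypothetical model: report matches on a dry run, otherwise return a count."""
    matched = sorted(task_ids)
    if dry_run:
        # Nothing is modified; the caller just sees what would be cleared.
        return matched
    # A real implementation would reset state here; this sketch only counts.
    return len(matched)
```

With the overloads in place, a type checker can infer `list[str]` for `clear(ids, dry_run=True)` and `int` for `clear(ids)`.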

classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
cli()[source]

Exposes a CLI specific to this DAG.

test(logical_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]

Execute one single DagRun for a given DAG and logical date.

Parameters
  • logical_date (datetime.datetime | None) – logical date for the DAG run

  • run_conf (dict[str, Any] | None) – configuration to pass to newly created dagrun

  • conn_file_path (str | None) – file path to a connection file in either yaml or json

  • variable_file_path (str | None) – file path to a variable file in either yaml or json

  • use_executor (bool) – if set, uses an executor to test the DAG

  • mark_success_pattern (re.Pattern | str | None) – regex of task_ids to mark as success instead of running

  • session (sqlalchemy.orm.session.Session) – database connection (optional)
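
As an illustration of how a mark_success_pattern value could select tasks, the sketch below partitions a list of task IDs with the standard `re` module. `split_by_pattern` is a hypothetical helper, and the use of a full match is an assumption; the matching mode Airflow applies may differ.

```python
from __future__ import annotations

import re


def split_by_pattern(
    task_ids: list[str], mark_success_pattern: re.Pattern | str | None
) -> tuple[list[str], list[str]]:
    """Hypothetical helper: partition task_ids into (marked_success, to_run)."""
    if mark_success_pattern is None:
        return [], list(task_ids)
    # re.compile accepts either a string or an already compiled pattern.
    pattern = re.compile(mark_success_pattern)
    # Assumption: the whole task_id must match the pattern.
    marked = [t for t in task_ids if pattern.fullmatch(t)]
    to_run = [t for t in task_ids if not pattern.fullmatch(t)]
    return marked, to_run


marked, to_run = split_by_pattern(
    ["wait_for_file", "load", "wait_for_db"], r"wait_for_.*"
)
```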

create_dagrun(state, *, triggered_by, logical_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_version=None, creating_job_id=None, data_interval=None, backfill_id=None)[source]

Create a dag run from this dag including the tasks associated with this dag.

Returns the dag run.

Parameters
  • state (airflow.utils.state.DagRunState) – the state of the dag run

  • triggered_by (airflow.utils.types.DagRunTriggeredByType | None) – The entity which triggers the DagRun

  • run_id (str | None) – defines the run id for this dag run

  • run_type (airflow.utils.types.DagRunType | None) – type of DagRun

  • logical_date (datetime.datetime | None) – the logical date of this dag run

  • start_date (datetime.datetime | None) – the date this dag run should be evaluated

  • external_trigger (bool | None) – whether this dag run is externally triggered

  • conf (dict | None) – Dict containing configuration/parameters to pass to the DAG

  • creating_job_id (int | None) – id of the job creating this DagRun

  • session (sqlalchemy.orm.session.Session) – database session

  • dag_version (airflow.models.dag_version.DagVersion | None) – The DagVersion object for this run

  • data_interval (tuple[datetime.datetime, datetime.datetime] | None) – Data interval of the DagRun

  • backfill_id (int | None) – id of the backfill run if one exists

classmethod bulk_write_to_db(dags, session=NEW_SESSION)[source]

Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB.

Parameters

dags (collections.abc.Collection[airflow.serialization.serialized_objects.MaybeSerializedDAG]) – the DAG objects to save to the DB

Returns

None

sync_to_db(session=NEW_SESSION)[source]

Save attributes about this DAG to the DB.

Returns

None

get_default_view()[source]

Allow backward compatible jinja2 templates.

static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]

Given a list of known DAGs, deactivate any other DAGs that are marked as active in the ORM.

Parameters

active_dag_ids – list of DAG IDs that are active

Returns

None
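
The selection that deactivate_unknown_dags describes amounts to a set difference. A minimal sketch with plain sets, where `unknown_dag_ids` and the example DAG IDs are hypothetical and no database is involved:

```python
from __future__ import annotations


def unknown_dag_ids(active_in_db: set[str], active_dag_ids: list[str]) -> set[str]:
    """Hypothetical helper: DAG IDs marked active in the DB but absent from the known list."""
    return active_in_db - set(active_dag_ids)


to_deactivate = unknown_dag_ids({"etl", "report", "old_dag"}, ["etl", "report"])
```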

static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]

Deactivate any DAGs that were last touched by the scheduler before the expiration date.

These DAGs were likely deleted.

Parameters

expiration_date – set inactive DAGs that were touched before this time

Returns

None
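
The staleness test described for deactivate_stale_dags can be sketched as a timestamp comparison. `stale_dag_ids` and the `last_touched` mapping are hypothetical; the real method works on rows in the dag table rather than on a dictionary.

```python
from __future__ import annotations

from datetime import datetime


def stale_dag_ids(last_touched: dict[str, datetime], expiration_date: datetime) -> list[str]:
    """Hypothetical helper: DAG IDs whose last scheduler touch is before expiration_date."""
    return sorted(dag_id for dag_id, ts in last_touched.items() if ts < expiration_date)


stale = stale_dag_ids(
    {"etl": datetime(2024, 1, 10), "old_dag": datetime(2023, 12, 1)},
    expiration_date=datetime(2024, 1, 1),
)
```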

static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]

Return the number of task instances in the given DAG.

Parameters
  • session – ORM session

  • dag_id – ID of the DAG to get the task concurrency of

  • run_id – ID of the DAG run to get the task concurrency of

  • task_ids – A list of valid task IDs for the given DAG

  • states – A list of states to filter by if supplied

Returns

The number of running tasks

Return type

int

get_task_assets(inlets=True, outlets=True, of_type=Asset)[source]
class airflow.models.dag.DagTag[source]

Bases: airflow.models.base.Base

A tag name per dag, to allow quick filtering in the DAG view.

__tablename__ = 'dag_tag'[source]
name[source]
dag_id[source]
__table_args__ = ()[source]
__repr__()[source]

Return repr(self).

class airflow.models.dag.DagOwnerAttributes[source]

Bases: airflow.models.base.Base

Table defining different owner attributes.

For example, a link for an owner that will be passed as a hyperlink to the “DAGs” view.

__tablename__ = 'dag_owner_attributes'[source]
dag_id[source]
owner[source]
__repr__()[source]

Return repr(self).

classmethod get_all(session)[source]
class airflow.models.dag.DagModel(**kwargs)[source]

Bases: airflow.models.base.Base

Table containing DAG properties.

property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]
property timezone[source]
property safe_dag_id[source]
property relative_fileloc: pathlib.Path | None[source]

File location of the importable dag ‘file’ relative to the configured DAGs folder.

__tablename__ = 'dag'[source]

These items are stored in the database for state related information

dag_id[source]
is_paused_at_creation[source]
is_paused[source]
is_active[source]
last_parsed_time[source]
last_expired[source]
fileloc[source]
bundle_name[source]
latest_bundle_version[source]
owners[source]
description[source]
default_view[source]
timetable_summary[source]
timetable_description[source]
asset_expression[source]
tags[source]
max_active_tasks[source]
max_active_runs[source]
max_consecutive_failed_dag_runs[source]
has_task_concurrency_limits[source]
has_import_errors[source]
next_dagrun[source]
next_dagrun_data_interval_start[source]
next_dagrun_data_interval_end[source]
next_dagrun_create_after[source]
__table_args__ = ()[source]
schedule_asset_references[source]
schedule_asset_alias_references[source]
schedule_assets[source]
task_outlet_asset_references[source]
NUM_DAGS_PER_DAGRUN_QUERY[source]
dag_versions[source]
__repr__()[source]

Return repr(self).

static get_dagmodel(dag_id, session=NEW_SESSION)[source]
classmethod get_current(dag_id, session=NEW_SESSION)[source]
get_last_dagrun(session=NEW_SESSION, include_externally_triggered=False)[source]
get_is_paused(*, session=None)[source]

Provide interface compatibility to ‘DAG’.

get_is_active(*, session=None)[source]

Provide interface compatibility to ‘DAG’.

static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]

Given a list of dag_ids, get the set of paused dag_ids.

Returns

Paused Dag_ids

Return type

set[str]

get_default_view()[source]

Get the default DAG view; returns the default config value if the DagModel does not have a value.

set_is_paused(is_paused, session=NEW_SESSION)[source]

Pause/Un-pause a DAG.

Parameters
  • is_paused (bool) – Is the DAG paused

  • session – session

dag_display_name()[source]
classmethod deactivate_deleted_dags(alive_dag_filelocs, session=NEW_SESSION)[source]

Set is_active=False on the DAGs for which the DAG files have been removed.

classmethod dags_needing_dagruns(session)[source]

Return (and lock) a list of Dag objects that are due to create a new DagRun.

This will return a resultset of rows that is row-level-locked with a “SELECT … FOR UPDATE” query. You should ensure that any scheduling decisions are made in a single transaction, because the rows are unlocked as soon as the transaction is committed.

calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]

Calculate next_dagrun and next_dagrun_create_after.

get_asset_triggered_next_run_info(*, session=NEW_SESSION)[source]
