airflow.models.dagrun
Module Contents¶
Classes¶
TISchedulingDecision | Type of return for DagRun.task_instance_scheduling_decisions.
DagRun | Invocation instance of a DAG.
DagRunNote | For storage of arbitrary notes concerning the dagrun instance.
Attributes¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]¶
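The pattern above describes run IDs of the form `<run type>__<UTC timestamp>`. As a minimal sketch using only the standard `re` module, the snippet below checks a few strings against it. The helper name `is_default_run_id` is hypothetical and is not part of Airflow; the pattern is copied from the attribute value, written as a raw string so the backslashes are not doubled.

```python
import re

# Raw-string form of the pattern listed above; the doubled backslashes in the
# listing are only the repr of this same string.
RUN_ID_REGEX = (
    r"^(?:manual|scheduled|asset_triggered)__"
    r"(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00)$"
)


def is_default_run_id(run_id: str) -> bool:
    """Return True if run_id matches the pattern (hypothetical helper)."""
    return re.match(RUN_ID_REGEX, run_id) is not None


print(is_default_run_id("manual__2024-01-01T00:00:00+00:00"))     # True
print(is_default_run_id("scheduled__2024-01-01T00:00:00+02:00"))  # False: offset is not +00:00
print(is_default_run_id("my_custom_run"))                         # False: no recognised prefix
```

Note that the pattern accepts only whole seconds and a literal `+00:00` offset, so timestamps with microseconds or another time zone do not match.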
- class airflow.models.dagrun.TISchedulingDecision[source]¶
Bases: NamedTuple
Type of return for DagRun.task_instance_scheduling_decisions.
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, logical_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, dag_version=None)[source]¶
Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin
Invocation instance of a DAG.
A DAG run can be created by the scheduler (i.e. scheduled runs), or by an external trigger (i.e. manual runs).
- backfill_id[source]¶
The backfill this DagRun is currently associated with.
It’s possible this could change if e.g. the dag run is cleared to be rerun, or perhaps re-backfilled.
- set_state(state)[source]¶
Change the state of the DagRun.
Changes to attributes are implemented in accordance with the following table (rows represent old states, columns represent new states):
| Old state | QUEUED | RUNNING | SUCCESS | FAILED |
| --- | --- | --- | --- | --- |
| None | queued_at = timezone.utcnow() | if empty: start_date = timezone.utcnow(); end_date = None | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
| QUEUED | queued_at = timezone.utcnow() | if empty: start_date = timezone.utcnow(); end_date = None | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
| RUNNING | queued_at = timezone.utcnow(); start_date = None; end_date = None | | end_date = timezone.utcnow() | end_date = timezone.utcnow() |
| SUCCESS | queued_at = timezone.utcnow(); start_date = None; end_date = None | start_date = timezone.utcnow(); end_date = None | | |
| FAILED | queued_at = timezone.utcnow(); start_date = None; end_date = None | start_date = timezone.utcnow(); end_date = None | | |
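The transition table can be read as a small state machine over three timestamps. The following is a rough, standalone sketch of that reading and is not the Airflow implementation: `RunDates` and `apply_transition` are hypothetical names, states are plain strings, and "if empty" is interpreted as "only set start_date when it is not already set", with end_date cleared in either case.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

FINISHED = {"success", "failed"}


@dataclass
class RunDates:
    """Hypothetical stand-in for the date attributes of a dag run."""
    state: Optional[str] = None
    queued_at: Optional[datetime] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None


def apply_transition(run: RunDates, new_state: str,
                     now: Optional[datetime] = None) -> RunDates:
    """Apply one cell of the table: row = run.state, column = new_state."""
    now = now or datetime.now(timezone.utc)
    old = run.state
    if new_state == "queued":
        run.queued_at = now
        if old == "running" or old in FINISHED:
            # Re-queueing a started run resets its start and end dates.
            run.start_date = None
            run.end_date = None
    elif new_state == "running":
        if old in FINISHED:
            run.start_date = now
            run.end_date = None
        elif old in (None, "queued"):
            run.start_date = run.start_date or now  # "if empty"
            run.end_date = None
        # running -> running is an empty cell: nothing changes.
    elif new_state in FINISHED:
        if old not in FINISHED:
            run.end_date = now
        # finished -> finished is an empty cell: nothing changes.
    else:
        raise ValueError(f"unknown state: {new_state!r}")
    run.state = new_state
    return run
```

For example, a run moved queued, then running, then success, and later set back to running would get a fresh start_date and a cleared end_date on that last step, matching the SUCCESS row of the table.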
- refresh_from_db(session=NEW_SESSION)[source]¶
Reload the current dagrun from the database.
- Parameters
session (sqlalchemy.orm.Session) – database session
- classmethod find(dag_id=None, run_id=None, logical_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]¶
Return a set of dag runs for the given search criteria.
- Parameters
dag_id (str | list[str] | None) – the dag_id or list of dag_id to find dag runs for
run_id (collections.abc.Iterable[str] | None) – defines the run id for this dag run
run_type (airflow.utils.types.DagRunType | None) – type of DagRun
logical_date (datetime.datetime | collections.abc.Iterable[datetime.datetime] | None) – the logical date
state (airflow.utils.state.DagRunState | None) – the state of the dag run
external_trigger (bool | None) – whether this dag run is externally triggered
no_backfills (bool) – return no backfills (True), return all (False). Defaults to False
session (sqlalchemy.orm.Session) – database session
logical_start_date (datetime.datetime | None) – only return dag runs whose logical date is on or after this date
logical_end_date (datetime.datetime | None) – only return dag runs whose logical date is on or before this date
- classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]¶
Return an existing run for the DAG with a specific run_id.
None is returned if no such DAG run is found.
- Parameters
dag_id (str) – the dag_id to find duplicates for
run_id (str) – defines the run id for this dag run
session (sqlalchemy.orm.Session) – database session
- static generate_run_id(run_type, logical_date)[source]¶
Generate a run ID based on the run type and logical date.
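As an illustration only, the sketch below builds an ID in the `<run type>__<ISO 8601 logical date>` shape that RUN_ID_REGEX describes. It assumes that shape is what this method produces; `make_run_id` is a hypothetical helper, not the Airflow function, and the run type is passed as a plain string rather than a DagRunType.

```python
from datetime import datetime, timezone


def make_run_id(run_type: str, logical_date: datetime) -> str:
    """Join run type and logical date with a double underscore (assumed format)."""
    return f"{run_type}__{logical_date.isoformat()}"


print(make_run_id("scheduled", datetime(2024, 1, 1, tzinfo=timezone.utc)))
# prints: scheduled__2024-01-01T00:00:00+00:00
```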
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
Return the task instances for this dag run.
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
Return the task instances for this dag run.
Redirect to DagRun.fetch_task_instances method. Keep this method because it is widely used across the code.
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
Return the task instance specified by task_id for this dag run.
- Parameters
task_id (str) – the task id
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
Return the task instance specified by task_id for this dag run.
- Parameters
dag_id (str) – the DAG id
dag_run_id (str) – the DAG run id
task_id (str) – the task id
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
Return the previous DagRun, if there is one.
- Parameters
dag_run (DagRun) – the dag run
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – the dag run state
- static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]¶
Return the previous SCHEDULED DagRun, if there is one.
- Parameters
dag_run_id (int) – the DAG run ID
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- update_state(session=NEW_SESSION, execute_callbacks=True)[source]¶
Determine the overall state of the DagRun based on the state of its TaskInstances.
- Parameters
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
execute_callbacks (bool) – Should dag callbacks (success/failure, SLA etc.) be invoked directly (default: true) or recorded as a pending request in the returned_callback property
- Returns
Tuple containing tis that can be scheduled in the current loop & returned_callback that needs to be executed
- Return type
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION)[source]¶
Verify the DagRun by checking for removed tasks or tasks that are not in the database yet.
It will set state to removed or add the task if required.
- Missing_indexes
A dictionary of task vs indexes that are missing.
- Parameters
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]¶
Set the given task instances into the scheduled state.
Each element of schedulable_tis should have its task attribute already set.
Any EmptyOperator without callbacks or outlets is instead set straight to the success state.
All the TIs should belong to this DagRun, but because this code is in the hot path, this is not checked; it is the caller’s responsibility to call this function only with TIs from a single dag run.
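The EmptyOperator shortcut described above can be pictured with a small in-memory model. This is a sketch under stated assumptions rather than Airflow code: `FakeTask`, `FakeTI`, and `schedule` are hypothetical stand-ins, states are plain strings, and no database session or per-query batching is involved.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for the operator attached to a task instance."""
    is_empty_operator: bool = False
    has_callbacks: bool = False
    outlets: List[str] = field(default_factory=list)


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance with its task already set."""
    task_id: str
    task: FakeTask
    state: Optional[str] = None


def schedule(tis: List[FakeTI]) -> None:
    """Mark each TI scheduled, or success for a bare empty operator."""
    for ti in tis:
        task = ti.task
        if task.is_empty_operator and not task.has_callbacks and not task.outlets:
            ti.state = "success"    # nothing to run, so skip the executor
        else:
            ti.state = "scheduled"


tis = [
    FakeTI("extract", FakeTask()),
    FakeTI("join", FakeTask(is_empty_operator=True)),
    FakeTI("marker", FakeTask(is_empty_operator=True, outlets=["some_asset"])),
]
schedule(tis)
print([(ti.task_id, ti.state) for ti in tis])
# [('extract', 'scheduled'), ('join', 'success'), ('marker', 'scheduled')]
```

Like the method it models, the sketch does not verify that all task instances belong to the same dag run.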