airflow.models.dagrun

Module Contents

Classes

TISchedulingDecision

Type of return for DagRun.task_instance_scheduling_decisions.

DagRun

Invocation instance of a DAG.

DagRunNote

For storage of arbitrary notes concerning the dagrun instance.

Attributes

CreatedTasks

RUN_ID_REGEX

airflow.models.dagrun.CreatedTasks[source]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]
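
As a hedged illustration of what RUN_ID_REGEX accepts, the sketch below copies the pattern shown above (with the repr's doubled backslashes undone) and checks a few candidate run IDs using only the standard `re` module. The helper name `matches_default_run_id` is hypothetical and not part of Airflow.

```python
import re

# Pattern copied from the RUN_ID_REGEX value documented above.
RUN_ID_REGEX = r"^(?:manual|scheduled|asset_triggered)__(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00)$"


def matches_default_run_id(run_id: str) -> bool:
    """Hypothetical helper: True if run_id looks like '<run type>__<UTC timestamp>'."""
    return re.match(RUN_ID_REGEX, run_id) is not None


print(matches_default_run_id("manual__2024-01-01T00:00:00+00:00"))  # True
print(matches_default_run_id("nightly_reload"))  # False
```

Only the three listed run-type prefixes and a second-precision timestamp with a `+00:00` offset match; any other string is simply outside this pattern.
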
class airflow.models.dagrun.TISchedulingDecision[source]

Bases: NamedTuple

Type of return for DagRun.task_instance_scheduling_decisions.

tis: list[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]
changed_tis: bool[source]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]
finished_tis: list[airflow.models.taskinstance.TaskInstance][source]
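
Because TISchedulingDecision is a NamedTuple, its fields can be read by name or unpacked by position. The following is a minimal stand-in with the same field names and order, using plain strings in place of TaskInstance objects; `SchedulingDecisionSketch` and the sample values are illustrative assumptions, not Airflow code.

```python
from typing import Any, NamedTuple


class SchedulingDecisionSketch(NamedTuple):
    """Stand-in mirroring the documented fields of TISchedulingDecision."""

    tis: list[Any]
    schedulable_tis: list[Any]
    changed_tis: bool
    unfinished_tis: list[Any]
    finished_tis: list[Any]


decision = SchedulingDecisionSketch(
    tis=["extract", "transform", "load"],
    schedulable_tis=["transform"],
    changed_tis=False,
    unfinished_tis=["transform", "load"],
    finished_tis=["extract"],
)

# Fields can be read by name...
print(decision.schedulable_tis)

# ...or unpacked positionally, in declaration order.
tis, schedulable, changed, unfinished, finished = decision
```
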
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, logical_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, dag_version=None)[source]

Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

Invocation instance of a DAG.

A DAG run can be created by the scheduler (i.e. scheduled runs), or by an external trigger (i.e. manual runs).

property stats_tags: dict[str, str][source]
property state[source]
__tablename__ = 'dag_run'[source]
id[source]
dag_id[source]
queued_at[source]
logical_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
external_trigger[source]
run_type[source]
triggered_by[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
last_scheduling_decision[source]
log_template_id[source]
updated_at[source]
clear_number[source]
backfill_id[source]

The backfill this DagRun is currently associated with.

This can change if, for example, the dag run is cleared to be rerun or is re-backfilled.

dag_version_id[source]
dag_version[source]
__table_args__ = ()[source]
task_instances[source]
dag_model[source]
dag_run_note[source]
backfill[source]
backfill_max_active_runs[source]
max_active_runs[source]
note[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__()[source]

Return repr(self).

validate_run_id(key, run_id)[source]
get_state()[source]
set_state(state)[source]

Change the state of the DagRun.

Changes to attributes are implemented in accordance with the following state transition matrix (grouped by old state, with one entry per new state):

State transition matrix

Old state None:
  • to QUEUED: queued_at = timezone.utcnow()
  • to RUNNING: if empty: start_date = timezone.utcnow(); end_date = None
  • to SUCCESS: end_date = timezone.utcnow()
  • to FAILED: end_date = timezone.utcnow()

Old state QUEUED:
  • to QUEUED: queued_at = timezone.utcnow()
  • to RUNNING: if empty: start_date = timezone.utcnow(); end_date = None
  • to SUCCESS: end_date = timezone.utcnow()
  • to FAILED: end_date = timezone.utcnow()

Old state RUNNING:
  • to QUEUED: queued_at = timezone.utcnow(); start_date = None; end_date = None
  • to RUNNING: (no changes)
  • to SUCCESS: end_date = timezone.utcnow()
  • to FAILED: end_date = timezone.utcnow()

Old state SUCCESS:
  • to QUEUED: queued_at = timezone.utcnow(); start_date = None; end_date = None
  • to RUNNING: start_date = timezone.utcnow(); end_date = None
  • to SUCCESS: (no changes)
  • to FAILED: (no changes)

Old state FAILED:
  • to QUEUED: queued_at = timezone.utcnow(); start_date = None; end_date = None
  • to RUNNING: start_date = timezone.utcnow(); end_date = None
  • to SUCCESS: (no changes)
  • to FAILED: (no changes)
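
The transition matrix above can be modeled as a small standalone function. The sketch below follows the matrix literally and is not the Airflow implementation: `RunSketch`, `apply_transition`, the lowercase state strings, and the explicit `now` argument (standing in for timezone.utcnow()) are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

FINISHED = {"success", "failed"}


@dataclass
class RunSketch:
    """Hypothetical stand-in holding only the attributes named in the matrix."""

    state: Optional[str] = None
    queued_at: Optional[datetime] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None


def apply_transition(run: RunSketch, new_state: str, now: datetime) -> None:
    """Apply the attribute changes listed in the matrix for old state -> new_state."""
    old = run.state
    if new_state == "queued":
        run.queued_at = now
        # Re-queueing a running or finished run also clears its dates.
        if old == "running" or old in FINISHED:
            run.start_date = None
            run.end_date = None
    elif new_state == "running":
        if old in FINISHED:
            run.start_date = now
            run.end_date = None
        elif old != "running":
            # From None or queued: keep an existing start_date, fill it if empty.
            if run.start_date is None:
                run.start_date = now
            run.end_date = None
    elif new_state in FINISHED:
        # Finished -> finished has no entry in the matrix.
        if old not in FINISHED:
            run.end_date = now
    else:
        raise ValueError(f"unknown state: {new_state!r}")
    run.state = new_state


run = RunSketch()
apply_transition(run, "queued", datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc))
apply_transition(run, "running", datetime(2024, 1, 1, 0, 5, tzinfo=timezone.utc))
apply_transition(run, "success", datetime(2024, 1, 1, 0, 9, tzinfo=timezone.utc))
print(run.state, run.start_date, run.end_date)
```

Passing `now` explicitly keeps the sketch deterministic and easy to check against each entry of the matrix.
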

refresh_from_db(session=NEW_SESSION)[source]

Reload the current dagrun from the database.

Parameters

session (sqlalchemy.orm.Session) – database session

classmethod find(dag_id=None, run_id=None, logical_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]

Return a set of dag runs for the given search criteria.

Parameters
classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]

Return an existing run for the DAG with a specific run_id.

None is returned if no such DAG run is found.

Parameters
  • dag_id (str) – the dag_id to find duplicates for

  • run_id (str) – defines the run id for this dag run

  • session (sqlalchemy.orm.Session) – database session

static generate_run_id(run_type, logical_date)[source]

Generate a run ID based on the run type and logical date.
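
The docstring does not spell out the resulting format. A plausible sketch, assuming the ID is the run type and the ISO 8601 logical date joined by a double underscore (the shape that RUN_ID_REGEX describes), might look like this; `sketch_run_id` is a hypothetical stand-in, not the Airflow implementation.

```python
from datetime import datetime, timezone


def sketch_run_id(run_type: str, logical_date: datetime) -> str:
    # Assumption: "<run_type>__<ISO 8601 timestamp>", mirroring RUN_ID_REGEX.
    return f"{run_type}__{logical_date.isoformat()}"


run_id = sketch_run_id("scheduled", datetime(2024, 1, 1, tzinfo=timezone.utc))
print(run_id)  # scheduled__2024-01-01T00:00:00+00:00
```
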

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

Return the task instances for this dag run.

get_task_instances(state=None, session=NEW_SESSION)[source]

Return the task instances for this dag run.

Redirects to the DagRun.fetch_task_instances method. This method is kept because it is widely used across the code.

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

Return the task instance specified by task_id for this dag run.

Parameters
  • task_id (str) – the task id

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

Return the task instance specified by task_id for this dag run.

Parameters
  • dag_id (str) – the DAG id

  • dag_run_id (str) – the DAG run id

  • task_id (str) – the task id

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

get_dag()[source]

Return the Dag associated with this DagRun.

Returns

DAG

Return type

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

Return the previous DagRun, if there is one.

Parameters
static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]

Return the previous SCHEDULED DagRun, if there is one.

Parameters
update_state(session=NEW_SESSION, execute_callbacks=True)[source]

Determine the overall state of the DagRun based on the state of its TaskInstances.

Parameters
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

  • execute_callbacks (bool) – Whether dag callbacks (success/failure, SLA, etc.) are invoked directly (default: True) or recorded as a pending request in the returned_callback property

Returns

Tuple containing the tis that can be scheduled in the current loop and the returned_callback that needs to be executed

Return type

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[source]
notify_dagrun_state_changed(msg='')[source]
verify_integrity(*, session=NEW_SESSION)[source]

Verify the DagRun by checking for removed tasks or tasks that are not in the database yet.

It will set state to removed or add the task if required.

Missing_indexes

A dictionary of task vs indexes that are missing.

Parameters

session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
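
The two checks described for verify_integrity amount to comparing the tasks defined in the DAG with the task instances already stored for the run. The sketch below reduces that idea to a set difference over task ids; `diff_task_ids` and the sample ids are hypothetical, and the real method works on TaskInstance rows and also deals with mapped task indexes.

```python
def diff_task_ids(dag_task_ids: set[str], db_task_ids: set[str]) -> tuple[set[str], set[str]]:
    """Return (removed, missing) task ids.

    removed: stored for the run but no longer defined in the DAG.
    missing: defined in the DAG but not yet stored for the run.
    """
    removed = db_task_ids - dag_task_ids
    missing = dag_task_ids - db_task_ids
    return removed, missing


removed, missing = diff_task_ids(
    dag_task_ids={"extract", "load"},
    db_task_ids={"extract", "legacy_cleanup"},
)
print(sorted(removed))  # ['legacy_cleanup']
print(sorted(missing))  # ['load']
```

In these terms, the "removed" ids are the ones whose state would be set to removed, and the "missing" ids are the ones that would be added.
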

classmethod get_latest_runs(session=NEW_SESSION)[source]

Return the latest DagRun for each DAG.

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]

Set the given task instances into the scheduled state.

Each element of schedulable_tis should have its task attribute already set.

Any EmptyOperator without callbacks or outlets is instead set straight to the success state.

All the TIs should belong to this DagRun, but because this code is in the hot path, this is not checked; it is the caller’s responsibility to call this function only with TIs from a single dag run.
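
The rule for EmptyOperator tasks described above can be summarized as a small decision function. This is a sketch under stated assumptions: `TaskSketch`, its boolean fields, and `target_state` are hypothetical names chosen for illustration, and the real method updates TaskInstance rows in the database rather than returning a string.

```python
from dataclasses import dataclass


@dataclass
class TaskSketch:
    """Hypothetical stand-in for the task attributes the rule depends on."""

    task_id: str
    is_empty_operator: bool = False
    has_callbacks: bool = False
    has_outlets: bool = False


def target_state(task: TaskSketch) -> str:
    """Pick the state a task instance would be moved to under the documented rule."""
    if task.is_empty_operator and not task.has_callbacks and not task.has_outlets:
        # Nothing to execute and nothing to notify: skip straight to success.
        return "success"
    return "scheduled"


print(target_state(TaskSketch("start", is_empty_operator=True)))  # success
print(target_state(TaskSketch("start", is_empty_operator=True, has_outlets=True)))  # scheduled
print(target_state(TaskSketch("transform")))  # scheduled
```
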

get_log_template(*, session=NEW_SESSION)[source]
class airflow.models.dagrun.DagRunNote(content, user_id=None)[source]

Bases: airflow.models.base.Base

For storage of arbitrary notes concerning the dagrun instance.

__tablename__ = 'dag_run_note'[source]
user_id[source]
dag_run_id[source]
content[source]
created_at[source]
updated_at[source]
dag_run[source]
__table_args__ = ()[source]
__repr__()[source]

Return repr(self).
