airflow.providers.amazon.aws.sensors.emr

Module Contents

Classes

EmrBaseSensor

Contains general sensor behavior for EMR.

EmrServerlessJobSensor

Poll the state of the job run until it reaches a terminal state; fails if the job run fails.

EmrServerlessApplicationSensor

Poll the state of the application until it reaches a terminal state; fails if the application fails.

EmrContainerSensor

Poll the state of the job run until it reaches a terminal state; fail if the job run fails.

EmrNotebookExecutionSensor

Poll the EMR notebook until it reaches any of the target states; raise AirflowException on failure.

EmrJobFlowSensor

Poll the EMR JobFlow Cluster until it reaches any of the target states; raise AirflowException on failure.

EmrStepSensor

Poll the state of the step until it reaches any of the target states; raise AirflowException on failure.

class airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor(*, aws_conn_id='aws_default', **kwargs)[source]

Bases: airflow.sensors.base.BaseSensorOperator

Contains general sensor behavior for EMR.

Subclasses should implement the following methods:
  • get_emr_response()

  • state_from_response()

  • failure_message_from_response()

Subclasses should set target_states and failed_states fields.
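The contract above can be pictured with a small stand-in written in plain Python. This is a simplified sketch, not the provider's source: SketchEmrSensor, CannedSensor, the flat response shape, and the use of RuntimeError in place of an Airflow exception are all assumptions made for illustration.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class SketchEmrSensor(ABC):
    """Hypothetical stand-in that mirrors the shape of the contract above."""

    # Subclasses set these two fields.
    target_states: frozenset[str] = frozenset()
    failed_states: frozenset[str] = frozenset()

    @abstractmethod
    def get_emr_response(self, context: dict[str, Any]) -> dict[str, Any]:
        """Fetch the latest status (the real sensors call boto3 here)."""

    @staticmethod
    @abstractmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """Pull the state string out of the response."""

    @staticmethod
    @abstractmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """Pull a human-readable failure reason out of the response."""

    def poke(self, context: dict[str, Any]) -> bool:
        response = self.get_emr_response(context)
        state = self.state_from_response(response)
        if state in self.target_states:
            return True  # target reached, stop waiting
        if state in self.failed_states:
            # Stand-in for the exception the real sensors raise on failure.
            message = self.failure_message_from_response(response)
            raise RuntimeError(f"EMR job failed: {message}")
        return False  # neither target nor failure: poke again later


class CannedSensor(SketchEmrSensor):
    """Replays canned responses instead of calling AWS (illustration only)."""

    target_states = frozenset({"DONE"})
    failed_states = frozenset({"BROKEN"})

    def __init__(self, responses: list[dict[str, Any]]) -> None:
        self._responses = iter(responses)

    def get_emr_response(self, context: dict[str, Any]) -> dict[str, Any]:
        return next(self._responses)

    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        return response["State"]

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        return response.get("Message")


sensor = CannedSensor([{"State": "WORKING"}, {"State": "DONE"}])
print(sensor.poke({}))  # first poke sees an in-between state
print(sensor.poke({}))  # second poke sees the target state
```

Keeping the three hooks separate lets each concrete sensor change only how the response is fetched and parsed, while the decision logic stays in one place.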

Parameters

aws_conn_id (str | None) – The Airflow connection used for AWS credentials. If this is None or empty then the default boto3 behaviour is used. If running Airflow in a distributed manner and aws_conn_id is None or empty, then default boto3 configuration would be used (and must be maintained on each worker node).

ui_color = '#66c3ff'[source]
hook()[source]
poke(context)[source]

Override when deriving this class.

abstract get_emr_response(context)[source]

Make an API call with boto3 and get response.

Returns

response

Return type

dict[str, Any]

abstract static state_from_response(response)[source]

Get state from boto3 response.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

state

Return type

str

abstract static failure_message_from_response(response)[source]

Get failure message from boto3 response.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor(*, application_id, job_run_id, target_states=frozenset(EmrServerlessHook.JOB_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

Bases: airflow.sensors.base.BaseSensorOperator

Poll the state of the job run until it reaches a terminal state; fails if the job run fails.

See also

For more information on how to use this sensor, take a look at the guide: Wait on an EMR Serverless Job state

Parameters
  • application_id (str) – application_id to check the state of

  • job_run_id (str) – job_run_id to check the state of

  • target_states (set | frozenset) – a set of states to wait for, defaults to ‘SUCCESS’

  • aws_conn_id (str | None) – aws connection to use, defaults to ‘aws_default’. If this is None or empty then the default boto3 behaviour is used. If running Airflow in a distributed manner and aws_conn_id is None or empty, then default boto3 configuration would be used (and must be maintained on each worker node).

template_fields: collections.abc.Sequence[str] = ('application_id', 'job_run_id')[source]
poke(context)[source]

Override when deriving this class.

hook()[source]

Create and return an EmrServerlessHook.

static failure_message_from_response(response)[source]

Get failure message from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None
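As a rough picture of what a single poke decides for a job run, the sketch below works on a dictionary shaped like boto3's get_job_run response (state under jobRun.state, details under jobRun.stateDetails). The function name job_run_poke, the failure-state set, and the RuntimeError are assumptions for illustration; the provider keeps its own state sets on EmrServerlessHook.

```python
from __future__ import annotations

from typing import Any

# Assumed state sets for illustration only.
SUCCESS_STATES = frozenset({"SUCCESS"})
FAILURE_STATES = frozenset({"FAILED", "CANCELLING", "CANCELLED"})


def job_run_poke(
    response: dict[str, Any],
    target_states: frozenset[str] = SUCCESS_STATES,
) -> bool:
    """Return True once the job run is in a target state; raise on failure."""
    state = response["jobRun"]["state"]
    if state in FAILURE_STATES:
        # Mirrors the role of failure_message_from_response().
        details = response["jobRun"].get("stateDetails")
        raise RuntimeError(f"EMR Serverless job failed: {details}")
    return state in target_states


print(job_run_poke({"jobRun": {"state": "RUNNING"}}))
print(job_run_poke({"jobRun": {"state": "SUCCESS"}}))
```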

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor(*, application_id, target_states=frozenset(EmrServerlessHook.APPLICATION_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

Bases: airflow.sensors.base.BaseSensorOperator

Poll the state of the application until it reaches a terminal state; fails if the application fails.

See also

For more information on how to use this sensor, take a look at the guide: Wait on an EMR Serverless Application state

Parameters
  • application_id (str) – application_id to check the state of

  • target_states (set | frozenset) – a set of states to wait for, defaults to {‘CREATED’, ‘STARTED’}

  • aws_conn_id (str | None) – aws connection to use, defaults to ‘aws_default’. If this is None or empty then the default boto3 behaviour is used. If running Airflow in a distributed manner and aws_conn_id is None or empty, then default boto3 configuration would be used (and must be maintained on each worker node).

template_fields: collections.abc.Sequence[str] = ('application_id',)[source]
poke(context)[source]

Override when deriving this class.

hook()[source]

Create and return an EmrServerlessHook.

static failure_message_from_response(response)[source]

Get failure message from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor(*, virtual_cluster_id, job_id, max_retries=None, aws_conn_id='aws_default', poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

Bases: airflow.sensors.base.BaseSensorOperator

Poll the state of the job run until it reaches a terminal state; fail if the job run fails.

See also

For more information on how to use this sensor, take a look at the guide: Wait on an Amazon EMR virtual cluster job

Parameters
  • job_id (str) – job_id to check the state of

  • max_retries (int | None) – Number of times to poll for the job state before returning the current state, defaults to None

  • aws_conn_id (str | None) – aws connection to use, defaults to ‘aws_default’. If this is None or empty then the default boto3 behaviour is used. If running Airflow in a distributed manner and aws_conn_id is None or empty, then default boto3 configuration would be used (and must be maintained on each worker node).

  • poll_interval (int) – Time in seconds to wait between two consecutive calls to check the job status on EMR, defaults to 10

  • deferrable (bool) – Run sensor in the deferrable mode.

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
template_fields: collections.abc.Sequence[str] = ('virtual_cluster_id', 'job_id')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#66c3ff'[source]
hook()[source]
poke(context)[source]

Override when deriving this class.
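The three state tuples listed for this class suggest a simple classification on each poke. The sketch below copies the tuples from this page; the helper classify_job_state and its "unknown" bucket are hypothetical, since this page does not say how the sensor treats a state outside the three tuples.

```python
# Tuples copied from the class attributes documented above.
INTERMEDIATE_STATES = ("PENDING", "SUBMITTED", "RUNNING")
FAILURE_STATES = ("FAILED", "CANCELLED", "CANCEL_PENDING")
SUCCESS_STATES = ("COMPLETED",)


def classify_job_state(state: str) -> str:
    """Map a job-run state to what a polling loop would do with it."""
    if state in FAILURE_STATES:
        return "failed"  # the sensor would fail the task here
    if state in INTERMEDIATE_STATES:
        return "waiting"  # poke again after poll_interval
    if state in SUCCESS_STATES:
        return "succeeded"
    return "unknown"  # hypothetical bucket, not documented on this page


for state in ("SUBMITTED", "COMPLETED", "CANCEL_PENDING"):
    print(state, "->", classify_job_state(state))
```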

execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]

class airflow.providers.amazon.aws.sensors.emr.EmrNotebookExecutionSensor(notebook_execution_id, target_states=None, failed_states=None, **kwargs)[source]

Bases: EmrBaseSensor

Poll the EMR notebook until it reaches any of the target states; raise AirflowException on failure.

See also

For more information on how to use this sensor, take a look at the guide: Wait on an EMR notebook execution state

Parameters
  • notebook_execution_id (str) – Unique id of the notebook execution to be poked.

  • target_states – the states the sensor will wait for the execution to reach. Default target_states is FINISHED.

  • failed_states – if the execution reaches any of the failed_states, the sensor will fail. Default failed_states is FAILED.

template_fields: collections.abc.Sequence[str] = ('notebook_execution_id',)[source]
FAILURE_STATES[source]
COMPLETED_STATES[source]
get_emr_response(context)[source]

Make an API call with boto3 and get response.

Returns

response

Return type

dict[str, Any]

static state_from_response(response)[source]

Get state from response dictionary.

Returns

current state of the notebook execution

Return type

str

static failure_message_from_response(response)[source]

Get failure message from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None
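To make the notebook case concrete, the sketch below runs one poke decision over a dictionary shaped like boto3's describe_notebook_execution response, where the state is the plain string under NotebookExecution.Status. The defaults FINISHED and FAILED come from the parameter descriptions above; the function name notebook_poke, the use of LastStateChangeReason as the failure message, and the RuntimeError are assumptions for illustration.

```python
from __future__ import annotations

from collections.abc import Iterable
from typing import Any


def notebook_poke(
    response: dict[str, Any],
    target_states: Iterable[str] = ("FINISHED",),
    failed_states: Iterable[str] = ("FAILED",),
) -> bool:
    """Return True on a target state, raise on a failed state, else False."""
    execution = response["NotebookExecution"]
    state = execution["Status"]
    if state in set(target_states):
        return True
    if state in set(failed_states):
        # Assumed source of the failure message for this sketch.
        reason = execution.get("LastStateChangeReason")
        raise RuntimeError(f"EMR notebook execution failed: {reason}")
    return False


print(notebook_poke({"NotebookExecution": {"Status": "RUNNING"}}))
print(notebook_poke({"NotebookExecution": {"Status": "FINISHED"}}))
```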

class airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor(*, job_flow_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

Bases: EmrBaseSensor

Poll the EMR JobFlow Cluster until it reaches any of the target states; raise AirflowException on failure.

With the default target states, the sensor waits for the cluster to be terminated. When target_states is set to [‘RUNNING’, ‘WAITING’], the sensor waits until the job flow is ready (after the ‘STARTING’ and ‘BOOTSTRAPPING’ states).
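The effect of the two target_states choices can be seen by replaying a cluster lifecycle. In this sketch the LIFECYCLE list is an illustrative happy-path ordering of EMR cluster states, and first_target_hit is a hypothetical helper, not part of the sensor.

```python
from __future__ import annotations

from collections.abc import Iterable

# Illustrative happy-path ordering of cluster states.
LIFECYCLE = ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING", "TERMINATING", "TERMINATED"]


def first_target_hit(states: Iterable[str], target_states: Iterable[str]) -> str | None:
    """Return the first observed state that is one of the target states."""
    targets = set(target_states)
    for state in states:
        if state in targets:
            return state
    return None


# Waiting for termination (the default behaviour described above).
print(first_target_hit(LIFECYCLE, ["TERMINATED"]))
# Waiting only until the job flow is ready to accept work.
print(first_target_hit(LIFECYCLE, ["RUNNING", "WAITING"]))
```

With ['RUNNING', 'WAITING'] the replay stops as soon as bootstrapping is over, which suits a DAG that adds steps to a long-lived cluster rather than waiting for it to shut down.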

See also

For more information on how to use this sensor, take a look at the guide: Wait on an Amazon EMR job flow state

Parameters
  • job_flow_id (str) – job_flow_id to check the state of

  • target_states (collections.abc.Iterable[str] | None) – the target states; the sensor waits until the job flow reaches any of these states. In deferrable mode, the sensor runs until the job flow reaches a terminal state.

  • failed_states (collections.abc.Iterable[str] | None) – the failure states, sensor fails when job flow reaches any of these states

  • max_attempts (int) – Maximum number of tries before failing

  • deferrable (bool) – Run sensor in the deferrable mode.

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
get_emr_response(context)[source]

Make an API call with boto3 and get cluster-level details.

Returns

response

Return type

dict[str, Any]

static state_from_response(response)[source]

Get state from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

current state of the cluster

Return type

str

static failure_message_from_response(response)[source]

Get failure message from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None
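For orientation, boto3's describe_cluster response carries the cluster state under Cluster.Status.State and a reason under Cluster.Status.StateChangeReason. The sketch below shows two extractors over such a dictionary; the function names and the wording of the returned message are illustrative and may differ from what the sensor actually produces.

```python
from __future__ import annotations

from typing import Any


def cluster_state(response: dict[str, Any]) -> str:
    """Read the current cluster state from a describe_cluster-style dict."""
    return response["Cluster"]["Status"]["State"]


def cluster_failure_message(response: dict[str, Any]) -> str | None:
    """Build a readable failure reason, or None when no reason is present."""
    reason = response["Cluster"]["Status"].get("StateChangeReason")
    if not reason:
        return None
    code = reason.get("Code", "No code")
    message = reason.get("Message", "Unknown")
    return f"code {code}: {message}"


sample = {
    "Cluster": {
        "Status": {
            "State": "TERMINATED_WITH_ERRORS",
            "StateChangeReason": {
                "Code": "BOOTSTRAP_FAILURE",
                "Message": "Bootstrap action failed",
            },
        }
    }
}
print(cluster_state(sample))
print(cluster_failure_message(sample))
```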

execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]

class airflow.providers.amazon.aws.sensors.emr.EmrStepSensor(*, job_flow_id, step_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

Bases: EmrBaseSensor

Poll the state of the step until it reaches any of the target states; raise AirflowException on failure.

With the default target states, the sensor waits for the step to complete.

See also

For more information on how to use this sensor, take a look at the guide: Wait on an Amazon EMR step state

Parameters
  • job_flow_id (str) – ID of the job flow that contains the step to check the state of

  • step_id (str) – step to check the state of

  • target_states (collections.abc.Iterable[str] | None) – the target states; the sensor waits until the step reaches any of these states. In deferrable mode, the sensor waits until the step reaches a terminal state.

  • failed_states (collections.abc.Iterable[str] | None) – the failure states, sensor fails when step reaches any of these states

  • max_attempts (int) – Maximum number of tries before failing

  • deferrable (bool) – Run sensor in the deferrable mode.

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'step_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
get_emr_response(context)[source]

Make an API call with boto3 and get details about the cluster step.

Returns

response

Return type

dict[str, Any]

static state_from_response(response)[source]

Get state from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

execution state of the cluster step

Return type

str

static failure_message_from_response(response)[source]

Get failure message from response dictionary.

Parameters

response (dict[str, Any]) – response from AWS API

Returns

failure message

Return type

str | None
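A step response differs from a cluster response: boto3's describe_step places the state under Step.Status.State and, for failed steps, extra detail under Step.Status.FailureDetails (Reason, Message, LogFile). The sketch below parses that shape; the function names, the sample values, and the message wording are illustrative assumptions rather than the sensor's exact output.

```python
from __future__ import annotations

from typing import Any


def step_state(response: dict[str, Any]) -> str:
    """Read the execution state from a describe_step-style dict."""
    return response["Step"]["Status"]["State"]


def step_failure_message(response: dict[str, Any]) -> str | None:
    """Summarise FailureDetails, or return None when they are absent."""
    details = response["Step"]["Status"].get("FailureDetails")
    if not details:
        return None
    return (
        f"reason {details.get('Reason')}, "
        f"message {details.get('Message')}, "
        f"log file {details.get('LogFile')}"
    )


# Hypothetical sample values, shaped like a failed step.
sample = {
    "Step": {
        "Status": {
            "State": "FAILED",
            "FailureDetails": {
                "Reason": "Unknown Error.",
                "Message": "Step exited with code 1",
                "LogFile": "s3://example-bucket/logs/stderr.gz",
            },
        }
    }
}
print(step_state(sample))
print(step_failure_message(sample))
```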

execute(context)[source]

Derive when creating an operator.

Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

execute_complete(context, event=None)[source]
