Amazon EMR

Amazon EMR (previously called Amazon Elastic MapReduce) is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. Using these frameworks and related open-source projects, you can process data for analytics purposes and business intelligence workloads. Amazon EMR also lets you transform and move large amounts of data into and out of other AWS data stores and databases, such as Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB.

Prerequisite Tasks

To use these operators, you must do a few things:

  • Create necessary resources using AWS Console or AWS CLI.

  • Install API libraries via pip: pip install 'apache-airflow[amazon]'.

  • Setup Connection.

Operators

Note

To run the examples successfully, you need to create the IAM service roles (EMR_EC2_DefaultRole and EMR_DefaultRole) for Amazon EMR. You can create these roles using the AWS CLI: aws emr create-default-roles.

Create an EMR job flow

You can use EmrCreateJobFlowOperator to create a new EMR job flow. The cluster will be terminated automatically once its steps finish.

The default behaviour is to mark the DAG Task node as success as soon as the cluster is launched (wait_policy=None). It is possible to modify this behaviour by using a different wait_policy. Available options are:

  • WaitPolicy.WAIT_FOR_COMPLETION - DAG Task node waits for the cluster to be running

  • WaitPolicy.WAIT_FOR_STEPS_COMPLETION - DAG Task node waits for the cluster to terminate

This operator can be run in deferrable mode by passing deferrable=True as a parameter. Using deferrable mode releases the worker slot while waiting, leading to more efficient utilization of resources within the Airflow cluster. However, this mode requires the Airflow triggerer to be available in your deployment.
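A minimal sketch (not part of the system test) combining both options: the task only succeeds once the cluster terminates after its steps finish, and the wait is handed off to the triggerer. The import path for WaitPolicy is assumed here; check it against your provider version.

from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator, WaitPolicy

create_and_wait = EmrCreateJobFlowOperator(
    task_id="create_and_wait",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    # Do not mark the task as success until the cluster terminates after its steps finish.
    wait_policy=WaitPolicy.WAIT_FOR_STEPS_COMPLETION,
    # Defer the wait to the triggerer instead of occupying a worker slot.
    deferrable=True,
)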

JobFlow configuration

To create a job flow on EMR, you need to specify the configuration for the EMR cluster:

tests/system/amazon/aws/example_emr.py

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-7.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # If the EMR steps complete too quickly the cluster will be torn down before the other system test
        # tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
        # Set KeepJobFlowAliveWhenNoSteps to True to keep the cluster from being torn down prematurely.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

Here we create an EMR single-node cluster named PiCalc. It has only a single step, calculate_pi, which calculates the value of Pi using Spark. Setting 'KeepJobFlowAliveWhenNoSteps': False would tell the cluster to shut down as soon as its steps finish; in this example it is set to True so that further steps can be added after launch. Alternatively, a config without a Steps value can be used, and steps can be added later using EmrAddStepsOperator. See details below.

Note

EMR clusters launched via the EMR API, like this one, are not visible to all users by default, so you may not see the cluster in the EMR Management Console. You can change this by adding 'VisibleToAllUsers': True to the JOB_FLOW_OVERRIDES dict.
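For example, a one-line sketch (VisibleToAllUsers is a standard RunJobFlow parameter):

JOB_FLOW_OVERRIDES["VisibleToAllUsers"] = True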

For more configuration options, refer to the Boto3 EMR client documentation.

Create the Job Flow

In the following code we create a new job flow using the configuration explained above.

tests/system/amazon/aws/example_emr.py

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

Add Steps to an EMR job flow

To add steps to an existing EMR job flow you can use EmrAddStepsOperator. This operator can be run in deferrable mode by passing deferrable=True as a parameter. Using deferrable mode releases the worker slot while waiting, leading to more efficient utilization of resources within the Airflow cluster. However, this mode requires the Airflow triggerer to be available in your deployment.

tests/system/amazon/aws/example_emr.py

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)

Terminate an EMR job flow

To terminate an EMR job flow you can use EmrTerminateJobFlowOperator. This operator can be run in deferrable mode by passing deferrable=True as a parameter. Using deferrable mode releases the worker slot while waiting, leading to more efficient utilization of resources within the Airflow cluster. However, this mode requires the Airflow triggerer to be available in your deployment.

tests/system/amazon/aws/example_emr.py

remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)

Modify an Amazon EMR cluster

To modify an existing EMR cluster, such as changing its step concurrency level, you can use EmrModifyClusterOperator.

tests/system/amazon/aws/example_emr.py

modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)

Start an EMR notebook execution

You can use EmrStartNotebookExecutionOperator to start a notebook execution on an existing notebook attached to a running cluster.

tests/system/amazon/aws/example_emr_notebook_execution.py

start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)

Stop an EMR notebook execution

You can use EmrStopNotebookExecutionOperator to stop a running notebook execution.

tests/system/amazon/aws/example_emr_notebook_execution.py

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)

Sensors

Wait on an EMR notebook execution state

To monitor the state of an EMR notebook execution you can use EmrNotebookExecutionSensor.

tests/system/amazon/aws/example_emr_notebook_execution.py

wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)

Wait on an Amazon EMR job flow state

To monitor the state of an EMR job flow you can use EmrJobFlowSensor.

tests/system/amazon/aws/example_emr.py

check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)

Wait on an Amazon EMR step state

To monitor the state of an EMR job step you can use EmrStepSensor.

tests/system/amazon/aws/example_emr.py

wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)
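The get_step_id helper used above is defined in the system test; a minimal sketch of an equivalent helper, assuming EmrAddStepsOperator pushes the list of created step IDs to XCom:

from airflow.decorators import task


@task
def get_step_id(step_ids: list):
    # EmrAddStepsOperator returns a list of step IDs; the sensor monitors a single one.
    return step_ids[0]

With the tasks defined in the sections above, one possible wiring chains cluster creation, step submission, the step sensor, and teardown:

create_job_flow >> add_steps >> wait_for_step >> remove_cluster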

Throttling

Amazon EMR has relatively low service quotas, which are detailed in the Amazon EMR endpoints and quotas documentation. As a consequence, you might experience throttling when using any of the operators and sensors listed on this page. To work around this limitation, consider customizing the AWS connection configuration to modify the default Boto3 retry strategy. See the AWS connection configuration documentation.
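One way to do this (a sketch, not the only option) is to supply the connection via an environment variable in Airflow's JSON connection format; the config_kwargs extra is forwarded to botocore.config.Config, and the retry values below are illustrative:

import json
import os

# Hypothetical default AWS connection with a more forgiving Boto3 retry strategy.
os.environ["AIRFLOW_CONN_AWS_DEFAULT"] = json.dumps(
    {
        "conn_type": "aws",
        "extra": {
            "config_kwargs": {
                "retries": {"mode": "standard", "max_attempts": 10},
            },
        },
    }
)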
