Amazon SageMaker

Amazon SageMaker is a fully managed machine learning service. With Amazon SageMaker, data scientists and developers can quickly build and train machine learning models, and then deploy them into a production-ready hosted environment.

Airflow provides operators to create and interact with SageMaker Jobs and Pipelines.

Prerequisite Tasks

To use these operators, you must do a few things:

Operators

Create an Amazon SageMaker processing job

To create an Amazon Sagemaker processing job to sanitize your dataset you can use SageMakerProcessingOperator.

tests/system/amazon/aws/example_sagemaker.py

preprocess_raw_data = SageMakerProcessingOperator(
    task_id="preprocess_raw_data",
    config=test_setup["processing_config"],
)

# SageMakerProcessingOperator waits by default, setting as False to test the Sensor below.
preprocess_raw_data.wait_for_completion = False

Create an Amazon SageMaker training job

To create an Amazon Sagemaker training job you can use SageMakerTrainingOperator.

tests/system/amazon/aws/example_sagemaker.py

train_model = SageMakerTrainingOperator(
    task_id="train_model",
    config=test_setup["training_config"],
)

Create an Amazon SageMaker model

To create an Amazon Sagemaker model you can use SageMakerModelOperator.

tests/system/amazon/aws/example_sagemaker.py

create_model = SageMakerModelOperator(
    task_id="create_model",
    config=test_setup["model_config"],
)

Start a hyperparameter tuning job

To start a hyperparameter tuning job for an Amazon Sagemaker model you can use SageMakerTuningOperator.

tests/system/amazon/aws/example_sagemaker.py

tune_model = SageMakerTuningOperator(
    task_id="tune_model",
    config=test_setup["tuning_config"],
)

Delete an Amazon SageMaker model

To delete an Amazon Sagemaker model you can use SageMakerDeleteModelOperator.

tests/system/amazon/aws/example_sagemaker.py

delete_model = SageMakerDeleteModelOperator(
    task_id="delete_model",
    config={"ModelName": test_setup["model_name"]},
)

Create an Amazon SageMaker transform job

To create an Amazon Sagemaker transform job you can use SageMakerTransformOperator.

tests/system/amazon/aws/example_sagemaker.py

test_model = SageMakerTransformOperator(
    task_id="test_model",
    config=test_setup["transform_config"],
)

Create an Amazon SageMaker endpoint config job

To create an Amazon Sagemaker endpoint config job you can use SageMakerEndpointConfigOperator.

tests/system/amazon/aws/example_sagemaker_endpoint.py

configure_endpoint = SageMakerEndpointConfigOperator(
    task_id="configure_endpoint",
    config=test_setup["endpoint_config_config"],
)

Create an Amazon SageMaker endpoint job

To create an Amazon Sagemaker endpoint you can use SageMakerEndpointOperator.

tests/system/amazon/aws/example_sagemaker_endpoint.py

deploy_endpoint = SageMakerEndpointOperator(
    task_id="deploy_endpoint",
    config=test_setup["deploy_endpoint_config"],
)

Start an Amazon SageMaker pipeline execution

To trigger an execution run for an already-defined Amazon Sagemaker pipeline, you can use SageMakerStartPipelineOperator.

tests/system/amazon/aws/example_sagemaker_pipeline.py

start_pipeline1 = SageMakerStartPipelineOperator(
    task_id="start_pipeline1",
    pipeline_name=pipeline_name,
)

Stop an Amazon SageMaker pipeline execution

To stop an Amazon Sagemaker pipeline execution that is currently running, you can use SageMakerStopPipelineOperator.

tests/system/amazon/aws/example_sagemaker_pipeline.py

stop_pipeline1 = SageMakerStopPipelineOperator(
    task_id="stop_pipeline1",
    pipeline_exec_arn=start_pipeline1.output,
)

Register a Sagemaker Model Version

To register a model version, you can use SageMakerRegisterModelVersionOperator. The result of executing this operator is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. It consists of an inference specification that defines the inference image to use along with a model weights location. A model package group is a collection of model packages. You can use this operator to add a new version and model package to the group for every DAG run.

tests/system/amazon/aws/example_sagemaker.py

register_model = SageMakerRegisterModelVersionOperator(
    task_id="register_model",
    image_uri=test_setup["inference_code_image"],
    model_url=test_setup["model_trained_weights"],
    package_group_name=test_setup["model_package_group_name"],
)

Launch an AutoML experiment

To launch an AutoML experiment, a.k.a. SageMaker Autopilot, you can use SageMakerAutoMLOperator. An AutoML experiment will take some input data in CSV and the column it should learn to predict, and train models on it without needing human supervision. The output is placed in an S3 bucket, and automatically deployed if configured for it.

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

Create an Experiment for later use

To create a SageMaker experiment, you can use SageMakerCreateExperimentOperator. This creates an experiment so that it’s ready to be associated with processing, training and transform jobs.

tests/system/amazon/aws/example_sagemaker.py

create_experiment = SageMakerCreateExperimentOperator(
    task_id="create_experiment", name=test_setup["experiment_name"]
)

Create a SageMaker Notebook Instance

To create a SageMaker Notebook Instance , you can use SageMakerCreateNotebookOperator. This creates a SageMaker Notebook Instance ready to run Jupyter notebooks.

tests/system/amazon/aws/example_sagemaker_notebook.py

instance = SageMakerCreateNotebookOperator(
    task_id="create_instance",
    instance_name=instance_name,
    instance_type="ml.t3.medium",
    role_arn=role_arn,
    wait_for_completion=True,
)

Stop a SageMaker Notebook Instance

To terminate SageMaker Notebook Instance , you can use SageMakerStopNotebookOperator. This terminates the ML compute instance and disconnects the ML storage volume.

tests/system/amazon/aws/example_sagemaker_notebook.py

stop_instance = SageMakerStopNotebookOperator(
    task_id="stop_instance",
    instance_name=instance_name,
)

Start a SageMaker Notebook Instance

To launch a SageMaker Notebook Instance and re-attach an ML storage volume, you can use SageMakerStartNotebookOperator. This launches a new ML compute instance with the latest version of the libraries and attached your ML storage volume.

tests/system/amazon/aws/example_sagemaker_notebook.py

start_instance = SageMakerStartNoteBookOperator(
    task_id="start_instance",
    instance_name=instance_name,
)

Delete a SageMaker Notebook Instance

To delete a SageMaker Notebook Instance, you can use SageMakerDeleteNotebookOperator. This terminates the instance and deletes the ML storage volume and network interface associated with the instance. The instance must be stopped before it can be deleted.

tests/system/amazon/aws/example_sagemaker_notebook.py

delete_instance = SageMakerDeleteNotebookOperator(task_id="delete_instance", instance_name=instance_name)

Sensors

Wait on an Amazon SageMaker training job state

To check the state of an Amazon Sagemaker training job until it reaches a terminal state you can use SageMakerTrainingSensor.

tests/system/amazon/aws/example_sagemaker.py

await_training = SageMakerTrainingSensor(
    task_id="await_training",
    job_name=test_setup["training_job_name"],
)

Wait on an Amazon SageMaker transform job state

To check the state of an Amazon Sagemaker transform job until it reaches a terminal state you can use SageMakerTransformOperator.

tests/system/amazon/aws/example_sagemaker.py

await_transform = SageMakerTransformSensor(
    task_id="await_transform",
    job_name=test_setup["transform_job_name"],
)

Wait on an Amazon SageMaker tuning job state

To check the state of an Amazon Sagemaker hyperparameter tuning job until it reaches a terminal state you can use SageMakerTuningSensor.

tests/system/amazon/aws/example_sagemaker.py

await_tuning = SageMakerTuningSensor(
    task_id="await_tuning",
    job_name=test_setup["tuning_job_name"],
)

Wait on an Amazon SageMaker endpoint state

To check the state of an Amazon Sagemaker endpoint until it reaches a terminal state you can use SageMakerEndpointSensor.

tests/system/amazon/aws/example_sagemaker_endpoint.py

await_endpoint = SageMakerEndpointSensor(
    task_id="await_endpoint",
    endpoint_name=test_setup["endpoint_name"],
)

Wait on an Amazon SageMaker pipeline execution state

To check the state of an Amazon Sagemaker pipeline execution until it reaches a terminal state you can use SageMakerPipelineSensor.

tests/system/amazon/aws/example_sagemaker_pipeline.py

await_pipeline2 = SageMakerPipelineSensor(
    task_id="await_pipeline2",
    pipeline_exec_arn=start_pipeline2.output,
)

Wait on an Amazon SageMaker AutoML experiment state

To check the state of an Amazon Sagemaker AutoML job until it reaches a terminal state you can use SageMakerAutoMLSensor.

tests/system/amazon/aws/example_sagemaker.py

automl = SageMakerAutoMLOperator(
    task_id="auto_ML",
    job_name=test_setup["auto_ml_job_name"],
    s3_input=test_setup["input_data_uri"],
    target_attribute="class",
    s3_output=test_setup["output_data_uri"],
    role_arn=test_context[ROLE_ARN_KEY],
    time_limit=30,  # will stop the job before it can do anything, but it's not the point here
)

Wait on an Amazon SageMaker processing job state

To check the state of an Amazon Sagemaker processing job until it reaches a terminal state you can use SageMakerProcessingSensor.

tests/system/amazon/aws/example_sagemaker.py

await_preprocess = SageMakerProcessingSensor(
    task_id="await_preprocess", job_name=test_setup["processing_job_name"]
)

Was this entry helpful?