AWS DataSync

AWS DataSync is a data-transfer service that simplifies, automates, and accelerates moving and replicating data between on-premises storage systems and AWS storage services over the internet or AWS Direct Connect.

Prerequisite Tasks

To use these operators, you must do a few things:

Generic Parameters

aws_conn_id

Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default

region_name

AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

verify

Whether or not to verify SSL certificates.

  • False - Do not validate SSL certificates.

  • path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

botocore_config

The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to avoid throttling exceptions, configure timeouts, etc.

Example (for more detail about the parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

Note

Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config
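The fallback rule shared by region_name, verify and botocore_config can be sketched in plain Python. This is only an illustration of the behaviour described above, not the provider's implementation; resolve_setting and connection_extra are hypothetical names invented for the example.

```python
from typing import Any, Optional


def resolve_setting(operator_value: Optional[Any], connection_value: Optional[Any]) -> Optional[Any]:
    """Return the operator-level value unless it is None, else the connection value."""
    # Compare against None explicitly: False and {} are valid overrides
    # and must not fall through to the connection value.
    if operator_value is not None:
        return operator_value
    return connection_value


# Hypothetical values stored in the AWS Connection extra parameters.
connection_extra = {
    "region_name": "eu-west-1",
    "verify": True,
    "config_kwargs": {"read_timeout": 60},
}

# region_name omitted on the operator: the connection value is used.
region = resolve_setting(None, connection_extra["region_name"])
# verify=False on the operator: overrides the connection value.
verify = resolve_setting(False, connection_extra["verify"])
# botocore_config={} on the operator: overwrites the connection configuration.
config = resolve_setting({}, connection_extra["config_kwargs"])
```

The explicit comparison with None is the point of the sketch: a truthiness check would silently ignore an empty dictionary or False.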

Operators

Interact with AWS DataSync Tasks

You can use DataSyncOperator to find, create, update, execute and delete AWS DataSync tasks.

Once the DataSyncOperator has identified the TaskArn to run (either because you specified it or because it was found by searching), it executes that task. Each execution of an AWS DataSync Task creates an AWS DataSync TaskExecution, identified by a TaskExecutionArn.

The TaskExecutionArn will be monitored until completion (success / failure), and its status will be periodically written to the Airflow task log.
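The monitoring step can be pictured as a simple polling loop. The sketch below is an assumption about the general shape of such a loop, not the operator's actual code; wait_for_task_execution and its parameters are hypothetical, and the status source is injected so the example runs without AWS access.

```python
import time
from typing import Callable, List

# Statuses after which the sketch stops polling (assumed for illustration).
SUCCESS_STATUS = "SUCCESS"
FAILURE_STATUS = "ERROR"


def wait_for_task_execution(
    get_status: Callable[[], str],
    poll_seconds: float = 30.0,
    max_polls: int = 120,
    log: Callable[[str], None] = print,
) -> bool:
    """Poll get_status() until SUCCESS (returns True) or ERROR (returns False)."""
    for attempt in range(1, max_polls + 1):
        status = get_status()
        log(f"poll {attempt}: TaskExecution status is {status}")
        if status == SUCCESS_STATUS:
            return True
        if status == FAILURE_STATUS:
            return False
        time.sleep(poll_seconds)
    raise TimeoutError(f"no terminal status after {max_polls} polls")


# Demo with canned statuses instead of a real AWS call.
# With boto3, get_status could wrap
# client.describe_task_execution(TaskExecutionArn=arn)["Status"].
canned = iter(["QUEUED", "LAUNCHING", "PREPARING", "TRANSFERRING", "VERIFYING", "SUCCESS"])
messages: List[str] = []
succeeded = wait_for_task_execution(lambda: next(canned), poll_seconds=0, log=messages.append)
```

Passing the log callable in mirrors the way the status ends up in the Airflow task log, while keeping the sketch independent of Airflow.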

The DataSyncOperator can optionally pass additional kwargs to the underlying boto3 DataSync client's start_task_execution() call through the task_execution_kwargs parameter. This is useful, for example, to limit bandwidth or to filter the files that are included. See the boto3 DataSync documentation for more details.
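As a rough illustration of what task_execution_kwargs might contain, the sketch below builds a dictionary with an include filter and a bandwidth limit. build_task_execution_kwargs is a hypothetical helper invented for this example; the Includes and OverrideOptions keys follow the start_task_execution request shape, but check the boto3 documentation before relying on them.

```python
from typing import Any, Dict, List, Optional


def build_task_execution_kwargs(
    include_paths: Optional[List[str]] = None,
    bytes_per_second: Optional[int] = None,
) -> Dict[str, Any]:
    """Build extra kwargs for start_task_execution (hypothetical helper)."""
    kwargs: Dict[str, Any] = {}
    if include_paths:
        # A SIMPLE_PATTERN filter takes one string; several patterns
        # are joined with a pipe character.
        kwargs["Includes"] = [
            {"FilterType": "SIMPLE_PATTERN", "Value": "|".join(include_paths)}
        ]
    if bytes_per_second is not None:
        # Bandwidth limit for this execution only.
        kwargs["OverrideOptions"] = {"BytesPerSecond": bytes_per_second}
    return kwargs


# Transfer only /test/subdir and cap bandwidth at 1 MiB/s.
task_execution_kwargs = build_task_execution_kwargs(
    include_paths=["/test/subdir"],
    bytes_per_second=1024 * 1024,
)
```

The resulting dictionary can be passed as task_execution_kwargs=task_execution_kwargs when constructing the operator.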

Execute a task

To execute a specific task, you can pass the task_arn to the operator.

tests/system/amazon/aws/example_datasync.py

# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)

Search and execute a task

To search for a task, pass source_location_uri and destination_location_uri to the operator. If exactly one task is found, it is executed. If more than one task is found, the operator raises an Exception. To avoid this, you can set allow_random_task_choice to True so that the operator randomly chooses from the candidate tasks.

tests/system/amazon/aws/example_datasync.py

# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)
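The selection rule described in this section can be sketched as follows. This is a simplified illustration under stated assumptions, not the operator's source: choose_task here is a standalone hypothetical function, and it raises ValueError where the operator raises its own exception type.

```python
import random
from typing import List, Optional


def choose_task(
    candidate_task_arns: List[str],
    allow_random_task_choice: bool = False,
) -> Optional[str]:
    """Pick the TaskArn to execute from the tasks matching both location URIs."""
    if not candidate_task_arns:
        # Nothing matched; the caller may decide to create a task instead.
        return None
    if len(candidate_task_arns) == 1:
        return candidate_task_arns[0]
    if allow_random_task_choice:
        # Several matches are acceptable only when explicitly allowed.
        return random.choice(candidate_task_arns)
    raise ValueError(
        f"{len(candidate_task_arns)} tasks match; set allow_random_task_choice=True to pick one"
    )


# Placeholder ARNs used only for the demo.
single = choose_task(["arn:task-a"])
picked = choose_task(["arn:task-a", "arn:task-b"], allow_random_task_choice=True)
```

Raising by default is the safer design: silently running an arbitrary task could transfer data with unintended options.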

Create and execute a task

When searching for a task, if no task is found you have the option to create one before executing it. In order to do that, you need to provide the extra parameters create_task_kwargs, create_source_location_kwargs and create_destination_location_kwargs.

These extra parameters provide a way for the operator to automatically create a Task and/or Locations if no suitable existing Task was found. If they are left at their default value (None), no creation is attempted.

If delete_task_after_execution is set to True, the task is deleted from AWS DataSync after it completes successfully. The example below sets it to False, so the created task is kept.

tests/system/amazon/aws/example_datasync.py

# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)
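The excerpt calls get_s3_bucket_arn without showing its definition. A plausible version, assuming the standard aws partition, is sketched below; the real helper in the example file may differ.

```python
def get_s3_bucket_arn(bucket_name: str) -> str:
    """Return the ARN of an S3 bucket in the standard aws partition (assumed)."""
    # S3 bucket ARNs carry no region or account ID, hence the three colons.
    return f"arn:aws:s3:::{bucket_name}"


# Placeholder bucket name used only for the demo.
example_arn = get_s3_bucket_arn("my-source-bucket")
```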

When creating a Task, the DataSyncOperator tries to find and use existing LocationArns rather than creating new ones. If multiple LocationArns match the specified URIs, one of them must be chosen. In this scenario, the operator behaves as it does when choosing a single Task from many Tasks:

The operator raises an Exception. To avoid this, you can set allow_random_location_choice to True so that the operator randomly chooses from the candidate Locations.
