Google Cloud Transfer Service Operators

Prerequisite Tasks

To use these operators, you must do a few things:

CloudDataTransferServiceCreateJobOperator

Create a transfer job.

The operator accepts dates in two formats:

  • as a dictionary consistent with the Google API

    { "year": 2019, "month": 2, "day": 11 }

  • as a datetime object

The operator accepts times in two formats:

  • as a dictionary consistent with the Google API

    { "hours": 12, "minutes": 30, "seconds": 0 }

  • as a time object
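To make the equivalence between the two formats concrete, the following minimal sketch converts Python date and time objects into the dictionary shape shown above. The helper names `date_to_api_dict` and `time_to_api_dict` are hypothetical and exist only for illustration; they are not part of the operator's API, and the operator's own conversion may differ in detail.

```python
from datetime import date, datetime, time


def date_to_api_dict(value: date) -> dict:
    """Hypothetical helper: map a date (or datetime) to the Google API date shape."""
    return {"year": value.year, "month": value.month, "day": value.day}


def time_to_api_dict(value: time) -> dict:
    """Hypothetical helper: map a time to the Google API time-of-day shape."""
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


# Both spellings describe the same schedule values.
start_date = date_to_api_dict(date(2019, 2, 11))
start_date_from_datetime = date_to_api_dict(datetime(2019, 2, 11))
start_time = time_to_api_dict(time(12, 30, 0))

print(start_date)  # {'year': 2019, 'month': 2, 'day': 11}
print(start_time)  # {'hours': 12, 'minutes': 30, 'seconds': 0}
```

Because `datetime` is a subclass of `date`, the same helper handles both object types.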

To create a transfer job that copies data from AWS S3, you must have an AWS connection configured. For configuration details, see Amazon Web Services Connection.

For parameter definition, take a look at CloudDataTransferServiceCreateJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py

gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

Note

For AWS S3 sources, pass aws_conn_id to the operator.

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)

Templating

template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferJobs.create.

CloudDataTransferServiceDeleteJobOperator

Deletes a transfer job.

For parameter definition, take a look at CloudDataTransferServiceDeleteJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - REST Resource: transferJobs - Status

CloudDataTransferServiceRunJobOperator

Runs a transfer job.

For parameter definition, take a look at CloudDataTransferServiceRunJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py

run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - REST Resource: transferJobs - Run

CloudDataTransferServiceUpdateJobOperator

Updates a transfer job.

For AWS S3 sources, you must have an AWS connection configured. For configuration details, see Amazon Web Services Connection.

For parameter definition, take a look at CloudDataTransferServiceUpdateJobOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py

update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py

update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)

Note

For AWS S3 updates, pass aws_conn_id and include transferSpec in the update payload. If your spec uses an IAM role (for example roleArn), static credentials are not injected.
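The rule in this note can be illustrated with a small sketch. It assumes the transfer spec uses the camelCase field names of the Storage Transfer REST API (`awsS3DataSource`, `roleArn`); the helper `uses_iam_role`, the bucket names, and the role ARN are hypothetical and only illustrate the decision, not the operator's actual implementation.

```python
def uses_iam_role(transfer_spec: dict) -> bool:
    """Hypothetical check: True when the AWS S3 source authenticates with an IAM role."""
    source = transfer_spec.get("awsS3DataSource", {})
    return bool(source.get("roleArn"))


# Illustrative spec that names an IAM role: static credentials would not be injected.
spec_with_role = {
    "awsS3DataSource": {
        "bucketName": "example-source-bucket",
        "roleArn": "arn:aws:iam::123456789012:role/example-transfer-role",
    },
    "gcsDataSink": {"bucketName": "example-target-bucket"},
}

# Illustrative spec without a role: credentials would come from the AWS connection.
spec_without_role = {
    "awsS3DataSource": {"bucketName": "example-source-bucket"},
    "gcsDataSink": {"bucketName": "example-target-bucket"},
}

print(uses_iam_role(spec_with_role))     # True
print(uses_iam_role(spec_without_role))  # False
```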

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

update_body = {
    PROJECT_ID: GCP_PROJECT_ID,
    TRANSFER_JOB: {
        DESCRIPTION: "description_updated",
        TRANSFER_SPEC: {
            AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
            GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
            TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
        },
    },
    TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

update_transfer_job_s3_to_gcs = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    body=update_body,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferJobs.patch

CloudDataTransferServiceCancelOperationOperator

Cancels a transfer operation.

For parameter definition, take a look at CloudDataTransferServiceCancelOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferOperations.cancel

CloudDataTransferServiceGetOperationOperator

Gets a transfer operation. The result is returned to XCom.

For parameter definition, take a look at CloudDataTransferServiceGetOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferOperations.get

CloudDataTransferServiceListOperationsOperator

Lists transfer operations. The result is returned to XCom.

For parameter definition, take a look at CloudDataTransferServiceListOperationsOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)

Templating

template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferOperations.list

CloudDataTransferServicePauseOperationOperator

Pauses a transfer operation.

For parameter definition, take a look at CloudDataTransferServicePauseOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferOperations.pause

CloudDataTransferServiceResumeOperationOperator

Resumes a transfer operation.

For parameter definition, take a look at CloudDataTransferServiceResumeOperationOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)

Templating

template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)

More information

See Google Cloud Transfer Service - Method: transferOperations.resume

CloudDataTransferServiceJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

For parameter definition, take a look at CloudDataTransferServiceJobStatusSensor.
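The "at least one operation" condition can be sketched in plain Python. This is an illustration only: it assumes each operation is a dictionary whose status sits under `metadata.status`, and the helper `any_operation_has_status` and the sample operation names are hypothetical rather than taken from the sensor's code.

```python
def any_operation_has_status(operations: list, expected_statuses: set) -> bool:
    """Hypothetical check: True if at least one operation has an expected status."""
    return any(
        operation.get("metadata", {}).get("status") in expected_statuses
        for operation in operations
    )


# Illustrative operations for one job (assumed shape).
operations = [
    {"name": "transferOperations/example-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/example-2", "metadata": {"status": "SUCCESS"}},
]

print(any_operation_has_status(operations, {"SUCCESS"}))  # True
print(any_operation_has_status(operations, {"FAILED"}))   # False
```

Under this reading, a single matching operation is enough for the sensor to succeed, even if other operations of the same job are still in progress.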

Using the sensor

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py

wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)

Templating

template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)

CloudDataTransferServiceGCSToGCSOperator

Copy data from one GCS bucket to another.

For parameter definition, take a look at CloudDataTransferServiceGCSToGCSOperator.

Using the operator

tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcs_to_gcs.py

transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)

Templating

template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)

Reference

For further information, look at:
