Google Cloud Transfer Service Operators¶
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
CloudDataTransferServiceCreateJobOperator¶
Create a transfer job.
The operator accepts dates in two formats:
consistent with the Google API
{ "year": 2019, "month": 2, "day": 11 }
as a datetime.date object
The operator accepts times in two formats:
consistent with the Google API
{ "hours": 12, "minutes": 30, "seconds": 0 }
as a datetime.time object
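For illustration, the two equivalent representations side by side (a minimal standalone sketch, not tied to any DAG):

```python
from datetime import date, time

# Dict form, matching the Google API representation:
start_date_as_dict = {"year": 2019, "month": 2, "day": 11}
start_time_as_dict = {"hours": 12, "minutes": 30, "seconds": 0}

# Equivalent native Python objects, which the operator also accepts:
start_date_as_object = date(2019, 2, 11)
start_time_as_object = time(12, 30, 0)
```

Both forms carry the same information; the Python objects are usually more convenient when the schedule is computed at DAG-parse time, as in the snippets below.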
If you want to create a transfer job that copies data from AWS S3, you must have an AWS connection configured. Information about configuration for AWS is available in Amazon Web Services Connection.
For parameter definition, take a look at
CloudDataTransferServiceCreateJobOperator.
Using the operator¶
gcs_to_gcs_transfer_body = {
DESCRIPTION: "description",
STATUS: GcpTransferJobsStatus.ENABLED,
PROJECT_ID: PROJECT_ID_TRANSFER,
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
},
TRANSFER_SPEC: {
GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
},
}
Note
For AWS S3 sources, pass aws_conn_id to the operator.
aws_to_gcs_transfer_body = {
DESCRIPTION: GCP_DESCRIPTION,
STATUS: GcpTransferJobsStatus.ENABLED,
PROJECT_ID: GCP_PROJECT_ID,
JOB_NAME: GCP_TRANSFER_JOB_NAME,
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
},
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
},
}
create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
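The uppercase names in the bodies above are string constants exported by the Google provider; the body itself is a plain Storage Transfer API payload. As a rough sketch with literal field names (assumed to follow the transferJobs REST resource) and placeholder project and bucket names:

```python
from datetime import date, time

# Hypothetical literal equivalent of the constant-keyed body above;
# "my-gcp-project" and the bucket names are placeholders.
aws_to_gcs_transfer_body = {
    "description": "example transfer",
    "status": "ENABLED",
    "projectId": "my-gcp-project",
    "schedule": {
        "scheduleStartDate": date(2015, 1, 1),
        "scheduleEndDate": date(2030, 1, 1),
        "startTimeOfDay": time(12, 0, 0),
    },
    "transferSpec": {
        "awsS3DataSource": {"bucketName": "source-aws-bucket"},
        "gcsDataSink": {"bucketName": "target-gcs-bucket"},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}
```

Using the named constants rather than literal strings keeps the body in sync with the API field names the provider expects.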
Templating¶
template_fields: Sequence[str] = (
"body",
"gcp_conn_id",
"aws_conn_id",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.create.
CloudDataTransferServiceDeleteJobOperator¶
Deletes a transfer job.
For parameter definition, take a look at
CloudDataTransferServiceDeleteJobOperator.
Using the operator¶
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
task_id="delete_transfer_job_s3_to_gcs",
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
Templating¶
template_fields: Sequence[str] = (
"job_name",
"project_id",
"gcp_conn_id",
"api_version",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Status
CloudDataTransferServiceRunJobOperator¶
Runs a transfer job.
For parameter definition, take a look at
CloudDataTransferServiceRunJobOperator.
Using the operator¶
run_transfer = CloudDataTransferServiceRunJobOperator(
task_id="run_transfer",
job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
project_id=PROJECT_ID_TRANSFER,
)
Templating¶
template_fields: Sequence[str] = (
"job_name",
"project_id",
"gcp_conn_id",
"api_version",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Run
CloudDataTransferServiceUpdateJobOperator¶
Updates a transfer job.
For AWS S3 sources you must have an AWS connection configured. Information about configuration for AWS is available in Amazon Web Services Connection.
For parameter definition, take a look at
CloudDataTransferServiceUpdateJobOperator.
Using the operator¶
update_body = {
PROJECT_ID: PROJECT_ID_TRANSFER,
TRANSFER_JOB: {DESCRIPTION: "description_updated"},
TRANSFER_JOB_FIELD_MASK: "description",
}
update_transfer = CloudDataTransferServiceUpdateJobOperator(
task_id="update_transfer",
job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
body=update_body,
)
Note
For AWS S3 updates, pass aws_conn_id and include transferSpec in the update payload.
If your spec uses an IAM role (for example roleArn), static credentials are not injected.
update_body = {
PROJECT_ID: GCP_PROJECT_ID,
TRANSFER_JOB: {
DESCRIPTION: "description_updated",
TRANSFER_SPEC: {
AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
},
},
TRANSFER_JOB_FIELD_MASK: "description,transferSpec",
}
update_transfer_job_s3_to_gcs = CloudDataTransferServiceUpdateJobOperator(
task_id="update_transfer_job_s3_to_gcs",
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
body=update_body,
)
Templating¶
template_fields: Sequence[str] = (
"job_name",
"body",
"gcp_conn_id",
"aws_conn_id",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.patch
CloudDataTransferServiceCancelOperationOperator¶
Cancels a transfer operation.
For parameter definition, take a look at
CloudDataTransferServiceCancelOperationOperator.
Using the operator¶
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
task_id="cancel_operation",
operation_name="{{task_instance.xcom_pull("
"'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
"operation_name",
"gcp_conn_id",
"api_version",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.cancel
CloudDataTransferServiceGetOperationOperator¶
Gets a transfer operation. The result is returned to XCom.
For parameter definition, take a look at
CloudDataTransferServiceGetOperationOperator.
Using the operator¶
get_operation = CloudDataTransferServiceGetOperationOperator(
task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)
Templating¶
template_fields: Sequence[str] = (
"operation_name",
"gcp_conn_id",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.get
CloudDataTransferServiceListOperationsOperator¶
Lists transfer operations. The result is returned to XCom.
For parameter definition, take a look at
CloudDataTransferServiceListOperationsOperator.
Using the operator¶
list_operations = CloudDataTransferServiceListOperationsOperator(
task_id="list_operations",
request_filter={
FILTER_PROJECT_ID: GCP_PROJECT_ID,
FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
},
)
Templating¶
template_fields: Sequence[str] = (
"request_filter",
"gcp_conn_id",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.list
CloudDataTransferServicePauseOperationOperator¶
Pauses a transfer operation.
For parameter definition, take a look at
CloudDataTransferServicePauseOperationOperator.
Using the operator¶
pause_operation = CloudDataTransferServicePauseOperationOperator(
task_id="pause_operation",
operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
"key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
"operation_name",
"gcp_conn_id",
"api_version",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.pause
CloudDataTransferServiceResumeOperationOperator¶
Resumes a transfer operation.
For parameter definition, take a look at
CloudDataTransferServiceResumeOperationOperator.
Using the operator¶
resume_operation = CloudDataTransferServiceResumeOperationOperator(
task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)
Templating¶
template_fields: Sequence[str] = (
"operation_name",
"gcp_conn_id",
"api_version",
"google_impersonation_chain",
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.resume
CloudDataTransferServiceJobStatusSensor¶
Waits for at least one operation belonging to the job to have the expected status.
For parameter definition, take a look at
CloudDataTransferServiceJobStatusSensor.
Using the operator¶
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_end",
job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.SUCCESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
Templating¶
template_fields: Sequence[str] = (
"job_name",
"impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator¶
Copy data from one GCS bucket to another.
For parameter definition, take a look at
CloudDataTransferServiceGCSToGCSOperator.
Using the operator¶
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
task_id="transfer_gcs_to_gcs",
source_bucket=BUCKET_NAME_SRC,
source_path=FILE_URI,
destination_bucket=BUCKET_NAME_DST,
destination_path=FILE_URI,
wait=True,
)
Templating¶
template_fields: Sequence[str] = (
"gcp_conn_id",
"source_bucket",
"destination_bucket",
"source_path",
"destination_path",
"description",
"object_conditions",
"google_impersonation_chain",
)
Reference¶
For further information, look at: