Source code for tests.system.google.cloud.gcs.example_gcs_to_gcs

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage GCSSynchronizeBucketsOperator and
GCSToGCSOperator operators.
"""

from __future__ import annotations

import os
import shutil
from datetime import datetime

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
    GCSListObjectsOperator,
    GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

[docs]ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
[docs]PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
[docs]DAG_ID = "gcs_to_gcs"
[docs]BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}"
[docs]BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}"
RANDOM_FILE_NAME = OBJECT_1 = OBJECT_2 = "random.bin"
[docs]HOME = "/home/airflow/gcs"
[docs]PREFIX = f"{HOME}/data/{DAG_ID}_{ENV_ID}/"
def _assert_copied_files_exist(ti): objects = ti.xcom_pull(task_ids=["list_objects"], key="return_value")[0] assert PREFIX + OBJECT_1 in objects assert f"{PREFIX}subdir/{OBJECT_1}" in objects assert f"backup/{OBJECT_1}" in objects assert f"backup_{OBJECT_1}" in objects with DAG( DAG_ID, schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, tags=["gcs", "example"], ) as dag: @task
[docs] def create_workdir() -> str: """ Task creates working directory. The logic behind this task is a workaround that provides sustainable execution in Composer environment: local files can be safely shared among tasks if they are located within '/home/airflow/gcs/data/' folder which is mounted to GCS bucket under the hood (https://cloud.google.com/composer/docs/composer-2/cloud-storage). Otherwise, worker nodes don't share local path and thus files created by one task aren't guaranteed to be accessible be others. """ workdir = PREFIX if os.path.exists(HOME) else HOME os.makedirs(PREFIX) return workdir
create_workdir_task = create_workdir() generate_random_file = BashOperator( task_id="generate_random_file", bash_command=f"cat /dev/urandom | head -c $((1 * 1024 * 1024)) > {PREFIX + RANDOM_FILE_NAME}", ) create_bucket_src = GCSCreateBucketOperator( task_id="create_bucket_src", bucket_name=BUCKET_NAME_SRC, project_id=PROJECT_ID, ) create_bucket_dst = GCSCreateBucketOperator( task_id="create_bucket_dst", bucket_name=BUCKET_NAME_DST, project_id=PROJECT_ID, ) upload_file_src = LocalFilesystemToGCSOperator( task_id="upload_file_src", src=PREFIX + RANDOM_FILE_NAME, dst=PREFIX + RANDOM_FILE_NAME, bucket=BUCKET_NAME_SRC, ) upload_file_src_sub = LocalFilesystemToGCSOperator( task_id="upload_file_src_sub", src=PREFIX + RANDOM_FILE_NAME, dst=f"{PREFIX}subdir/{RANDOM_FILE_NAME}", bucket=BUCKET_NAME_SRC, ) upload_file_dst = LocalFilesystemToGCSOperator( task_id="upload_file_dst", src=PREFIX + RANDOM_FILE_NAME, dst=PREFIX + RANDOM_FILE_NAME, bucket=BUCKET_NAME_DST, ) upload_file_dst_sub = LocalFilesystemToGCSOperator( task_id="upload_file_dst_sub", src=PREFIX + RANDOM_FILE_NAME, dst=f"{PREFIX}subdir/{RANDOM_FILE_NAME}", bucket=BUCKET_NAME_DST, ) # [START howto_synch_bucket] sync_bucket = GCSSynchronizeBucketsOperator( task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST ) # [END howto_synch_bucket] # [START howto_synch_full_bucket] sync_full_bucket = GCSSynchronizeBucketsOperator( task_id="sync_full_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST, delete_extra_files=True, allow_overwrite=True, ) # [END howto_synch_full_bucket] # [START howto_synch_to_subdir] sync_to_subdirectory = GCSSynchronizeBucketsOperator( task_id="sync_to_subdirectory", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST, destination_object="subdir/", ) # [END howto_synch_to_subdir] # [START howto_sync_from_subdir] sync_from_subdirectory = GCSSynchronizeBucketsOperator( task_id="sync_from_subdirectory", source_bucket=BUCKET_NAME_SRC, source_object="subdir/", destination_bucket=BUCKET_NAME_DST, ) # [END howto_sync_from_subdir] # [START howto_operator_gcs_to_gcs_single_file] copy_single_file = GCSToGCSOperator( task_id="copy_single_gcs_file", source_bucket=BUCKET_NAME_SRC, source_object=PREFIX + OBJECT_1, destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used exact_match=True, ) # [END howto_operator_gcs_to_gcs_single_file] # [START howto_operator_gcs_to_gcs_without_wildcard] copy_files = GCSToGCSOperator( task_id="copy_files", source_bucket=BUCKET_NAME_SRC, source_object=PREFIX + "subdir/", destination_bucket=BUCKET_NAME_DST, destination_object="backup/", ) # [END howto_operator_gcs_to_gcs_without_wildcard] # [START howto_operator_gcs_to_gcs_match_glob] copy_files_with_match_glob = GCSToGCSOperator( task_id="copy_files_with_match_glob", source_bucket=BUCKET_NAME_SRC, source_object="data/", destination_bucket=BUCKET_NAME_DST, destination_object="backup/", match_glob="**/*.txt", ) # [END howto_operator_gcs_to_gcs_match_glob] # [START howto_operator_gcs_to_gcs_list] copy_files_with_list = GCSToGCSOperator( task_id="copy_files_with_list", source_bucket=BUCKET_NAME_SRC, source_objects=[ PREFIX + OBJECT_1, PREFIX + OBJECT_2, ], # Instead of files each element could be a wildcard expression destination_bucket=BUCKET_NAME_DST, destination_object="backup/", ) # [END howto_operator_gcs_to_gcs_list] # [START howto_operator_gcs_to_gcs_single_file_move] move_single_file = GCSToGCSOperator( task_id="move_single_file", source_bucket=BUCKET_NAME_SRC, source_object=PREFIX + OBJECT_1, destination_bucket=BUCKET_NAME_DST, destination_object="backup_" + OBJECT_1, exact_match=True, move_object=True, ) # [END howto_operator_gcs_to_gcs_single_file_move] # [START howto_operator_gcs_to_gcs_list_move] move_files_with_list = GCSToGCSOperator( task_id="move_files_with_list", source_bucket=BUCKET_NAME_SRC, source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2], destination_bucket=BUCKET_NAME_DST, destination_object="backup/", ) # [END howto_operator_gcs_to_gcs_list_move] list_objects = GCSListObjectsOperator(task_id="list_objects", bucket=BUCKET_NAME_DST) assert_copied_files_exist = PythonOperator( task_id="assert_copied_files_exist", python_callable=_assert_copied_files_exist, ) delete_bucket_src = GCSDeleteBucketOperator( task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE ) delete_bucket_dst = GCSDeleteBucketOperator( task_id="delete_bucket_dst", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE ) @task(trigger_rule=TriggerRule.ALL_DONE) def delete_work_dir(create_workdir_result: str) -> None: shutil.rmtree(create_workdir_result) chain( # TEST SETUP create_workdir_task, generate_random_file, [create_bucket_src, create_bucket_dst], [upload_file_src, upload_file_src_sub], [upload_file_dst, upload_file_dst_sub], # TEST BODY sync_bucket, sync_full_bucket, sync_to_subdirectory, sync_from_subdirectory, copy_single_file, copy_files, copy_files_with_match_glob, copy_files_with_list, move_single_file, move_files_with_list, list_objects, assert_copied_files_exist, # TEST TEARDOWN [delete_bucket_src, delete_bucket_dst, delete_work_dir(create_workdir_task)], ) from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() from tests_common.test_utils.system_tests import get_test_run # noqa: E402 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
[docs]test_run = get_test_run(dag)

Was this entry helpful?