Google Cloud Composer Operators

Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.

Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.

By using Cloud Composer instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.

For more information about the service visit Cloud Composer production documentation <Product documentation

Create an environment

Before you create a cloud composer environment you need to define it. For more information about the available fields to pass when creating an environment, visit Cloud Composer create environment API.

A simple environment configuration can look as followed:

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]


ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")

ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}

With this configuration we can create the environment: CloudComposerCreateEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)

or you can define the same operator in the deferrable mode: CloudComposerCreateEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)

Get an environment

To get an environment you can use:

CloudComposerGetEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

List environments

To get an environment you can use:

CloudComposerListEnvironmentsOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)

Update environments

You can update the environments by providing an environment config and an updateMask. In the updateMask argument you specify the path, relative to the environment, of the field to update. For more information on updateMask and other parameters take a look at Cloud Composer update environment API.

An example of a new service config and the updateMask:

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}

To update a service you can use: CloudComposerUpdateEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)

or you can define the same operator in the deferrable mode: CloudComposerCreateEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)

Delete a service

To delete a service you can use:

CloudComposerDeleteEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)

or you can define the same operator in the deferrable mode: CloudComposerDeleteEnvironmentOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)

List of Composer Images

You can also list all supported Cloud Composer images:

CloudComposerListImageVersionsOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)

Run Airflow CLI commands

You can run Airflow CLI commands in your environments, use: CloudComposerRunAirflowCLICommandOperator

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)

or you can define the same operator in the deferrable mode:

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)

Check if a DAG run has completed

You can use sensor that checks if a DAG run has completed in your environments, use: CloudComposerDAGRunSensor

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)

or you can define the same sensor in the deferrable mode:

google/tests/system/google/cloud/composer/example_cloud_composer.py[source]

defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)

Was this entry helpful?