Google Cloud Composer Operators¶
Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.
Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.
By using Cloud Composer instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Cloud Composer helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.
For more information about the service, visit the Cloud Composer product documentation.
Create an environment¶
Before you create a Cloud Composer environment, you need to define it. For more information about the available fields to pass when creating an environment, visit the Cloud Composer create environment API.
A simple environment configuration can look as follows:
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT = {
    "config": {
        "software_config": {"image_version": "composer-2-airflow-2"},
    }
}
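The `.replace("_", "-")` in the `ENVIRONMENT_ID` definitions above matters because Composer environment names may only contain lowercase letters, digits, and hyphens (a Cloud Composer naming restriction). A quick sketch with hypothetical `DAG_ID` and `ENV_ID` values (assumptions, not from this example):

```python
import re

# Hypothetical values for illustration; in the real example these come
# from the surrounding system-test harness.
DAG_ID = "example_composer"
ENV_ID = "test_env"

# Underscores are replaced with hyphens so the resulting name satisfies
# Composer's naming rules (lowercase letters, digits, hyphens).
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
assert re.fullmatch(r"[a-z][a-z0-9-]*[a-z0-9]", ENVIRONMENT_ID)
```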
With this configuration we can create the environment:
CloudComposerCreateEnvironmentOperator
create_env = CloudComposerCreateEnvironmentOperator(
    task_id="create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    environment=ENVIRONMENT,
)
or you can define the same operator in the deferrable mode:
CloudComposerCreateEnvironmentOperator
defer_create_env = CloudComposerCreateEnvironmentOperator(
    task_id="defer_create_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    environment=ENVIRONMENT,
    deferrable=True,
)
Get an environment¶
To get an environment you can use:
CloudComposerGetEnvironmentOperator
get_env = CloudComposerGetEnvironmentOperator(
    task_id="get_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
List environments¶
To list environments you can use:
CloudComposerListEnvironmentsOperator
list_envs = CloudComposerListEnvironmentsOperator(
    task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
Update an environment¶
You can update an environment by providing an environment config and an updateMask. In the updateMask argument you specify the path, relative to Environment, of the field to update. For more information on updateMask and other parameters, take a look at the Cloud Composer update environment API.
An example of a new environment config and the updateMask:
UPDATED_ENVIRONMENT = {
    "labels": {
        "label": "testing",
    }
}
UPDATE_MASK = {"paths": ["labels.label"]}
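Each entry in `UPDATE_MASK["paths"]` names a field, relative to Environment, whose value is taken from the new config. A small sketch (plain Python, no API call; the helper `field_for_path` is illustrative, not part of the operator) of how a dotted path selects a field:

```python
UPDATED_ENVIRONMENT = {"labels": {"label": "testing"}}
UPDATE_MASK = {"paths": ["labels.label"]}

def field_for_path(body: dict, path: str):
    """Walk a dotted update-mask path into a nested dict."""
    node = body
    for key in path.split("."):
        node = node[key]
    return node

# The masked path "labels.label" resolves to the new label value.
assert field_for_path(UPDATED_ENVIRONMENT, UPDATE_MASK["paths"][0]) == "testing"
```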
To update an environment you can use:
CloudComposerUpdateEnvironmentOperator
update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
)
or you can define the same operator in the deferrable mode:
CloudComposerUpdateEnvironmentOperator
defer_update_env = CloudComposerUpdateEnvironmentOperator(
    task_id="defer_update_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    update_mask=UPDATE_MASK,
    environment=UPDATED_ENVIRONMENT,
    deferrable=True,
)
Delete an environment¶
To delete an environment you can use:
CloudComposerDeleteEnvironmentOperator
delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
)
or you can define the same operator in the deferrable mode:
CloudComposerDeleteEnvironmentOperator
defer_delete_env = CloudComposerDeleteEnvironmentOperator(
    task_id="defer_delete_env",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    deferrable=True,
)
List of Composer Images¶
You can also list all supported Cloud Composer image versions:
CloudComposerListImageVersionsOperator
image_versions = CloudComposerListImageVersionsOperator(
    task_id="image_versions",
    project_id=PROJECT_ID,
    region=REGION,
)
Run Airflow CLI commands¶
To run Airflow CLI commands in your environments, you can use:
CloudComposerRunAirflowCLICommandOperator
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    command=COMMAND,
)
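`COMMAND` is not defined in the snippet above; it is an Airflow CLI command passed as a single string. A plausible value (an assumption for illustration, not taken from this example):

```python
# Hypothetical command string: list the DAGs in the target environment
# in JSON output format ("-o" is the Airflow CLI's short output flag).
COMMAND = "dags list -o json"
```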
or you can define the same operator in the deferrable mode:
defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
    task_id="defer_run_airflow_cli_cmd",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    command=COMMAND,
    deferrable=True,
)
Check if a DAG run has completed¶
To check whether a DAG run has completed in your environment, you can use a sensor:
CloudComposerDAGRunSensor
dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
)
or you can define the same sensor in the deferrable mode:
defer_dag_run_sensor = CloudComposerDAGRunSensor(
    task_id="defer_dag_run_sensor",
    project_id=PROJECT_ID,
    region=REGION,
    environment_id=ENVIRONMENT_ID_ASYNC,
    composer_dag_id="airflow_monitoring",
    allowed_states=["success"],
    deferrable=True,
)