Managed Service for Apache Airflow (formerly Cloud Composer) Operators¶
Managed Service for Apache Airflow (formerly Cloud Composer) is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.
Managed Service for Apache Airflow (formerly Cloud Composer) is built on the popular Apache Airflow open source project and operates using the Python programming language.
By using Managed Service for Apache Airflow (formerly Cloud Composer) instead of a local instance of Apache Airflow, you can benefit from the best of Airflow with no installation or management overhead. Managed Service for Apache Airflow (formerly Cloud Composer) helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command-line tools, so you can focus on your workflows and not your infrastructure.
For more information about the service visit the product documentation.
Create an environment¶
Before you create a Managed Service for Apache Airflow environment, you need to define it. For more information about the available fields to pass when creating an environment, visit the Managed Service for Apache Airflow create environment API.
A simple environment configuration can look as follows:
ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT_ID_ASYNC = f"test-deferrable-{DAG_ID}-{ENV_ID}".replace("_", "-")
ENVIRONMENT = {
"config": {
"software_config": {"image_version": "composer-3-airflow-2"},
"node_config": {
"service_account": f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com",
},
},
}
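A note on the `.replace("_", "-")` calls above: environment IDs must be valid resource names, which do not allow underscores. The helper below is a hypothetical, client-side sketch of that constraint (the regex is an approximation, not the service's exact rule; the API performs the authoritative validation):

```python
import re

# Rough approximation (hypothetical helper): environment IDs must start with a
# lowercase letter and may contain only lowercase letters, digits, and
# hyphens -- hence the .replace("_", "-") in the snippet above.
ENV_ID_PATTERN = re.compile(r"^[a-z][a-z0-9-]*[a-z0-9]$")


def is_valid_environment_id(environment_id: str) -> bool:
    """Approximate check only; the API performs the authoritative validation."""
    return bool(ENV_ID_PATTERN.match(environment_id))


print(is_valid_environment_id("test-example-dag-abc123"))  # True
print(is_valid_environment_id("test_example_dag"))  # False (underscores)
```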
With this configuration we can create the environment:
ManagedAirflowCreateEnvironmentOperator
The executable example below still imports the compatibility name
CloudComposerCreateEnvironmentOperator. The preferred alias for new code is
ManagedAirflowCreateEnvironmentOperator.
The create operator only succeeds after the Composer environment reaches the RUNNING state.
If the long-running create operation finishes but the environment remains in another state such as
ERROR or CREATING, the task fails, so downstream tasks do not run against an unusable environment.
create_env = CloudComposerCreateEnvironmentOperator(
task_id="create_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
environment=ENVIRONMENT,
retries=1,
retry_delay=timedelta(minutes=1),
on_retry_callback=cleanup_failed_environment_before_retry,
)
or you can define the same operator in the deferrable mode:
ManagedAirflowCreateEnvironmentOperator
defer_create_env = CloudComposerCreateEnvironmentOperator(
task_id="defer_create_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
environment=ENVIRONMENT,
deferrable=True,
retries=1,
retry_delay=timedelta(minutes=1),
on_retry_callback=cleanup_failed_environment_before_retry,
)
For retry-heavy system tests, you can clean up a failed environment before retrying the create task.
The example below only deletes environments that are already in the ERROR state and leaves other
states untouched.
import logging
from typing import Any

from google.api_core.exceptions import NotFound
from google.cloud.orchestration.airflow.service_v1 import Environment

from airflow.providers.google.cloud.hooks.cloud_composer import CloudComposerHook

log = logging.getLogger(__name__)


def cleanup_failed_environment_before_retry(context: dict[str, Any]) -> None:
task = context["task"]
hook = CloudComposerHook(
gcp_conn_id=task.gcp_conn_id,
impersonation_chain=task.impersonation_chain,
)
environment_id = task.environment_id
project_id = task.project_id
region = task.region
log.info(
"Retry cleanup started for Composer env. project_id=%s region=%s environment_id=%s",
project_id,
region,
environment_id,
)
try:
environment = hook.get_environment(
project_id=project_id,
region=region,
environment_id=environment_id,
)
except NotFound:
log.info("Environment does not exist. Nothing to clean up.")
return
env_state = Environment.State(environment.state)
log.info(
"Environment exists before retry. state=%s value=%s",
env_state.name,
env_state.value,
)
if env_state != Environment.State.ERROR:
log.info(
"Skipping cleanup before retry because environment is not in ERROR state. current_state=%s",
env_state.name,
)
return
log.info("Deleting Composer environment %s in ERROR state before retry.", environment_id)
delete_operation = hook.delete_environment(
project_id=project_id,
region=region,
environment_id=environment_id,
)
hook.wait_for_operation(operation=delete_operation)
log.info("Environment %s deleted before retry.", environment_id)
Get an environment¶
To get an environment you can use:
ManagedAirflowGetEnvironmentOperator
The executable example below still imports the compatibility name
CloudComposerGetEnvironmentOperator. The preferred alias for new code is
ManagedAirflowGetEnvironmentOperator.
get_env = CloudComposerGetEnvironmentOperator(
task_id="get_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
)
List environments¶
To list environments you can use:
ManagedAirflowListEnvironmentsOperator
The executable example below still imports the compatibility name
CloudComposerListEnvironmentsOperator. The preferred alias for new code is
ManagedAirflowListEnvironmentsOperator.
list_envs = CloudComposerListEnvironmentsOperator(
task_id="list_envs", project_id=PROJECT_ID, region=REGION
)
Update environments¶
You can update the environments by providing an environment config and an updateMask. In the updateMask argument you specify the path, relative to the environment, of the field to update. For more information on updateMask and other parameters take a look at the Managed Service for Apache Airflow update environment API.
An example of a new environment config and the updateMask:
UPDATED_ENVIRONMENT = {
"labels": {
"label": "testing",
}
}
UPDATE_MASK = {"paths": ["labels.label"]}
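To illustrate what an updateMask path selects, here is a small sketch. The helper is hypothetical and not part of the provider (the real masking is applied server-side by the API); it just walks a dotted path such as `labels.label` through a nested config:

```python
from typing import Any


def resolve_mask_path(environment: dict[str, Any], path: str) -> Any:
    """Walk a dotted updateMask path (e.g. "labels.label") through a nested dict."""
    node: Any = environment
    for key in path.split("."):
        node = node[key]
    return node


UPDATED_ENVIRONMENT = {"labels": {"label": "testing"}}
UPDATE_MASK = {"paths": ["labels.label"]}
for p in UPDATE_MASK["paths"]:
    print(p, "->", resolve_mask_path(UPDATED_ENVIRONMENT, p))  # labels.label -> testing
```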
To update an environment you can use:
ManagedAirflowUpdateEnvironmentOperator
The executable example below still imports the compatibility name
CloudComposerUpdateEnvironmentOperator. The preferred alias for new code is
ManagedAirflowUpdateEnvironmentOperator.
update_env = CloudComposerUpdateEnvironmentOperator(
task_id="update_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
update_mask=UPDATE_MASK,
environment=UPDATED_ENVIRONMENT,
)
or you can define the same operator in the deferrable mode:
ManagedAirflowUpdateEnvironmentOperator
defer_update_env = CloudComposerUpdateEnvironmentOperator(
task_id="defer_update_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
update_mask=UPDATE_MASK,
environment=UPDATED_ENVIRONMENT,
deferrable=True,
)
Delete an environment¶
To delete an environment you can use:
ManagedAirflowDeleteEnvironmentOperator
The executable example below still imports the compatibility name
CloudComposerDeleteEnvironmentOperator. The preferred alias for new code is
ManagedAirflowDeleteEnvironmentOperator.
delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="delete_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
)
or you can define the same operator in the deferrable mode:
ManagedAirflowDeleteEnvironmentOperator
defer_delete_env = CloudComposerDeleteEnvironmentOperator(
task_id="defer_delete_env",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
deferrable=True,
)
List of Managed Airflow Images¶
You can also list all supported Managed Service for Apache Airflow images:
ManagedAirflowListImageVersionsOperator
The executable example below still imports the compatibility name
CloudComposerListImageVersionsOperator. The preferred alias for new code is
ManagedAirflowListImageVersionsOperator.
image_versions = CloudComposerListImageVersionsOperator(
task_id="image_versions",
project_id=PROJECT_ID,
region=REGION,
)
Run Airflow CLI commands¶
To run Airflow CLI commands in your environments, use:
ManagedAirflowRunAirflowCLICommandOperator
The executable example below still imports the compatibility name
CloudComposerRunAirflowCLICommandOperator. The preferred alias for new code is
ManagedAirflowRunAirflowCLICommandOperator.
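The examples below pass a `COMMAND` variable that is defined elsewhere in the test file. A hypothetical value (any valid Airflow CLI invocation can be supplied) could be:

```python
# Hypothetical value for the COMMAND variable used below; any valid
# Airflow CLI invocation works here.
COMMAND = "dags list -o json --verbose"
```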
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
command=COMMAND,
)
or you can define the same operator in the deferrable mode:
defer_run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="defer_run_airflow_cli_cmd",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
command=COMMAND,
deferrable=True,
)
Check if a DAG run has completed¶
To check if a DAG run has completed in your environment, you can use the sensor:
CloudComposerDAGRunSensor
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
composer_dag_id="airflow_monitoring",
allowed_states=["success"],
)
or you can define the same sensor in the deferrable mode:
defer_dag_run_sensor = CloudComposerDAGRunSensor(
task_id="defer_dag_run_sensor",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
composer_dag_id="airflow_monitoring",
allowed_states=["success"],
deferrable=True,
)
Trigger a DAG run¶
To trigger a DAG in another Managed Service for Apache Airflow environment, use:
ManagedAirflowTriggerDAGRunOperator
The executable example below still imports the compatibility name
CloudComposerTriggerDAGRunOperator. The preferred alias for new code is
ManagedAirflowTriggerDAGRunOperator.
trigger_dag_run = CloudComposerTriggerDAGRunOperator(
task_id="trigger_dag_run",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
composer_dag_id="airflow_monitoring",
)
Wait for a different DAG, task group, or task to complete¶
To wait for a different DAG, task group, or task to complete in a specific
Managed Service for Apache Airflow environment, you can use the sensor:
CloudComposerExternalTaskSensor
external_task_sensor = CloudComposerExternalTaskSensor(
task_id="external_task_sensor",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID,
composer_external_dag_id="airflow_monitoring",
composer_external_task_id="echo",
allowed_states=["success"],
execution_range=[datetime.now() - timedelta(1), datetime.now()],
)
or you can define the same sensor in the deferrable mode:
defer_external_task_sensor = CloudComposerExternalTaskSensor(
task_id="defer_external_task_sensor",
project_id=PROJECT_ID,
region=REGION,
environment_id=ENVIRONMENT_ID_ASYNC,
composer_external_dag_id="airflow_monitoring",
composer_external_task_id="echo",
allowed_states=["success"],
execution_range=[datetime.now() - timedelta(1), datetime.now()],
deferrable=True,
)
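The `execution_range` argument in the sensors above is simply a two-element time window. A quick sketch of what `[datetime.now() - timedelta(1), datetime.now()]` evaluates to:

```python
from datetime import datetime, timedelta

# execution_range bounds the time window of runs the sensor inspects.
# timedelta(1) fills its first positional argument, days -- i.e. one day.
now = datetime.now()
execution_range = [now - timedelta(1), now]
print(execution_range[1] - execution_range[0] == timedelta(days=1))  # True
```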