Ray Job Operators
The Ray Job operators provide a high-level interface for interacting with remote Ray clusters through the Ray Jobs API. They can be used with clusters running on Ray on Vertex AI, on GKE (self-managed Ray clusters), or on any other Ray cluster reachable through a dashboard address or a Ray Client address.
With these operators you can submit jobs, monitor their progress, retrieve logs, and manage the job lifecycle from Airflow.
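A Ray cluster is usually addressed in one of two forms. The first is the HTTP address of the dashboard on the head node, which uses port 8265 by default. The second is a Ray Client address with the ray:// scheme, which uses port 10001 by default. The helper below is a hypothetical sketch, not part of the Google provider. It tells the two forms apart by URL scheme, which may be handy when validating a cluster_address value before it reaches one of these operators.

```python
from urllib.parse import urlparse

# Hypothetical helper for illustration; not part of the Airflow Google provider.
def classify_ray_address(address: str) -> str:
    """Return "dashboard" for an HTTP(S) dashboard address or "client" for a ray:// address."""
    scheme = urlparse(address).scheme
    if scheme in ("http", "https"):
        return "dashboard"  # e.g. http://<head-node>:8265 (Ray's default dashboard port)
    if scheme == "ray":
        return "client"  # e.g. ray://<head-node>:10001 (Ray's default Ray Client port)
    raise ValueError(f"unrecognized Ray address: {address!r}")

print(classify_ray_address("http://10.0.0.5:8265"))  # dashboard
print(classify_ray_address("ray://10.0.0.5:10001"))  # client
```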
Submitting Ray Jobs
The RaySubmitJobOperator submits a job to a Ray cluster. Set wait_for_job_done to make the task wait until the job completes, and set get_job_logs to retrieve the job logs after it completes. The example below sets both to False, so the task returns as soon as the job has been submitted.
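submit_ray_job = RaySubmitJobOperator(
    task_id="submit_ray_job",
    # Dashboard address pushed to XCom by the upstream "get_ray_cluster" task
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    entrypoint="python3 heavy.py",
    runtime_env={
        # Local directory uploaded to the cluster as the job's working directory
        "working_dir": "./providers/google/tests/system/google/cloud/ray/resources",
        "pip": [
            "ray==2.33.0",
        ],
    },
    get_job_logs=False,
    # Return right after submission instead of blocking until the job finishes
    wait_for_job_done=False,
    # Fixed ID so that later tasks can stop, inspect, and delete this job
    submission_id=JOB_ID,
)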
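When wait_for_job_done is enabled, the task blocks until the job finishes. Conceptually, this is a loop that polls the job status until it reaches one of Ray's terminal statuses: STOPPED, SUCCEEDED, or FAILED. The sketch below shows such a loop with a hypothetical wait_for_terminal_status function. It is not the operator's actual implementation, and get_status is an assumed stand-in for whatever call fetches the current status.

```python
import time
from typing import Callable

# Terminal values of Ray's job status (PENDING and RUNNING are the non-terminal ones).
TERMINAL_STATUSES = {"STOPPED", "SUCCEEDED", "FAILED"}

def wait_for_terminal_status(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it reports a terminal status; raise if the timeout elapses first."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout} seconds")
        sleep(poll_interval)

# Simulated status sequence; the no-op sleep keeps the example instant.
statuses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_terminal_status(lambda: next(statuses), sleep=lambda _: None))  # SUCCEEDED
```

Injecting sleep keeps the loop testable without real delays. A production poller would also need to handle transient errors from the status call.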
Stopping Ray Jobs
Use RayStopJobOperator to stop a running Ray job identified by its job ID.
stop_ray_job = RayStopJobOperator(
    task_id="stop_ray_job",
    job_id=JOB_ID,
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
)
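Stopping only affects a job that is still active. The following is a simplified in-memory model in which a plain dict stands in for the cluster's job table and stop_job is a hypothetical function. It shows a running job moving to STOPPED, while a job already in a terminal state is left unchanged.

```python
from typing import Dict

# Simplified model: job ID -> Ray job status. Not the Airflow or Ray API.
TERMINAL_STATUSES = {"STOPPED", "SUCCEEDED", "FAILED"}

def stop_job(jobs: Dict[str, str], job_id: str) -> bool:
    """Mark an active job as STOPPED; return False if it had already finished."""
    status = jobs[job_id]  # raises KeyError for an unknown job ID
    if status in TERMINAL_STATUSES:
        return False
    jobs[job_id] = "STOPPED"
    return True

jobs = {"job-1": "RUNNING", "job-2": "SUCCEEDED"}
print(stop_job(jobs, "job-1"), jobs["job-1"])  # True STOPPED
print(stop_job(jobs, "job-2"), jobs["job-2"])  # False SUCCEEDED
```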
Deleting Ray Jobs
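Use RayDeleteJobOperator to delete a job and its metadata once the job has reached a terminal state (stopped, succeeded, or failed). The example sets trigger_rule=TriggerRule.ALL_DONE so that the cleanup task runs even when upstream tasks fail.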
delete_ray_job = RayDeleteJobOperator(
    task_id="delete_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    job_id=JOB_ID,
    # Run once all upstream tasks are done, whether they succeeded or failed
    trigger_rule=TriggerRule.ALL_DONE,
)
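Deletion is limited to jobs in a terminal state, which is why the delete task usually sits at the end of the DAG. The hypothetical delete_job function below models that guard on an in-memory dict of job statuses. It removes a finished job and refuses to remove one that is still active. This is a sketch, not the provider's or Ray's code.

```python
from typing import Dict

# Simplified model: job ID -> Ray job status. Not the Airflow or Ray API.
TERMINAL_STATUSES = {"STOPPED", "SUCCEEDED", "FAILED"}

def delete_job(jobs: Dict[str, str], job_id: str) -> bool:
    """Delete a finished job's record; refuse to delete a job that is still active."""
    status = jobs.get(job_id)
    if status is None:
        return False  # nothing to delete
    if status not in TERMINAL_STATUSES:
        raise RuntimeError(f"{job_id} is still {status}; stop it before deleting")
    del jobs[job_id]
    return True

jobs = {"job-1": "FAILED", "job-2": "RUNNING"}
print(delete_job(jobs, "job-1"))  # True
try:
    delete_job(jobs, "job-2")
except RuntimeError as err:
    print(err)  # job-2 is still RUNNING; stop it before deleting
```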
Retrieving Job Information
The RayGetJobInfoOperator retrieves detailed information about a Ray job, including its status, timestamps, entrypoint, metadata, and runtime environment.
info_ray_job = RayGetJobInfoOperator(
    task_id="info_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    job_id=JOB_ID,
)
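A downstream task can derive simple metrics from the retrieved job information. The sketch below rests on an assumption this page does not confirm: that the information arrives as a dict using Ray's JobDetails field names. In that structure, start_time and end_time are Unix timestamps in milliseconds, as Ray documents them. The helpers job_duration_seconds and started_at are hypothetical, and the sample payload is invented for illustration.

```python
from datetime import datetime, timezone
from typing import Optional

# Assumption: `info` uses Ray JobDetails-style keys; timestamps are in milliseconds.
def job_duration_seconds(info: dict) -> Optional[float]:
    """Return the job's run time in seconds, or None if it has not both started and ended."""
    start_ms = info.get("start_time")
    end_ms = info.get("end_time")
    if start_ms is None or end_ms is None:
        return None
    return (end_ms - start_ms) / 1000

def started_at(info: dict) -> Optional[datetime]:
    """Convert the millisecond start timestamp to a timezone-aware UTC datetime."""
    start_ms = info.get("start_time")
    if start_ms is None:
        return None
    return datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)

# Hypothetical example payload, not real output from the operator.
info = {
    "status": "SUCCEEDED",
    "entrypoint": "python3 heavy.py",
    "start_time": 1_700_000_000_000,
    "end_time": 1_700_000_042_500,
}
print(info["status"], job_duration_seconds(info))  # SUCCEEDED 42.5
print(started_at(info).isoformat())
```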
Listing Jobs
Use RayListJobsOperator to list all jobs submitted to the cluster.
list_ray_job = RayListJobsOperator(
    task_id="list_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
)
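A downstream task can post-process the listing, for example to count jobs per status or to find jobs that are still active. The helpers below assume, hypothetically, a list of dicts with at least submission_id and status keys. The exact structure returned by RayListJobsOperator is not shown on this page, so adjust the keys to match what it actually pushes to XCom.

```python
from collections import Counter
from typing import Dict, List

# Assumed input shape: one dict per job with "submission_id" and "status" keys.
ACTIVE_STATUSES = {"PENDING", "RUNNING"}

def count_by_status(jobs: List[Dict[str, str]]) -> Counter:
    """Count how many jobs are in each status."""
    return Counter(job["status"] for job in jobs)

def active_submission_ids(jobs: List[Dict[str, str]]) -> List[str]:
    """Return the submission IDs of jobs that have not reached a terminal state."""
    return [job["submission_id"] for job in jobs if job["status"] in ACTIVE_STATUSES]

# Hypothetical sample data for illustration.
jobs = [
    {"submission_id": "job-a", "status": "SUCCEEDED"},
    {"submission_id": "job-b", "status": "RUNNING"},
    {"submission_id": "job-c", "status": "FAILED"},
    {"submission_id": "job-d", "status": "SUCCEEDED"},
]
print(count_by_status(jobs))
print(active_submission_ids(jobs))  # ['job-b']
```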