Ray Job Operators

The Ray Job operators provide a high-level interface for interacting with remote Ray clusters through the Ray Jobs API. They can be used with clusters running on Google Cloud's Ray on Vertex AI, self-managed Ray clusters on GKE, or any other Ray cluster reachable through its dashboard address or Ray Client address.

The operators allow you to submit jobs, monitor their progress, retrieve logs, and manage the job lifecycle from Airflow.

Submitting Ray Jobs

The RaySubmitJobOperator submits a job to a Ray cluster and optionally waits for completion.

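Set wait_for_job_done=True to have the task wait until the job finishes, and get_job_logs=True to retrieve the job's logs once it completes. The example below sets both to False, so the task neither waits for the job nor fetches its logs.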

tests/system/google/cloud/ray/example_ray_job.py

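submit_ray_job = RaySubmitJobOperator(
    task_id="submit_ray_job",
    # Dashboard address pushed to XCom by the upstream "get_ray_cluster" task.
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    # Command run on the cluster; heavy.py is shipped via runtime_env["working_dir"].
    entrypoint="python3 heavy.py",
    runtime_env={
        # Local directory uploaded to the cluster as the job's working directory.
        "working_dir": "./providers/google/tests/system/google/cloud/ray/resources",
        # Packages installed into the job's environment.
        "pip": [
            "ray==2.33.0",
        ],
    },
    # Do not fetch logs or wait for the job to finish.
    get_job_logs=False,
    wait_for_job_done=False,
    # Fixed ID so that downstream tasks can refer to the same job.
    submission_id=JOB_ID,
)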
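When wait_for_job_done is enabled, the task has to keep checking the job until it finishes. The sketch below shows that polling pattern in plain Python; it is an illustration, not the operator's implementation. The get_status callable is a hypothetical stand-in for a status lookup such as Ray's JobSubmissionClient.get_job_status, and the terminal states are assumed to be Ray's SUCCEEDED, FAILED, and STOPPED.

```python
import time
from typing import Callable

# Assumed terminal job states, matching the names of Ray's JobStatus values.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(
    get_status: Callable[[str], str],
    job_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Poll until the job reaches a terminal state and return that state.

    ``get_status`` is a hypothetical callable that maps a job ID to its
    current status name (for example "PENDING" or "RUNNING").
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(job_id)
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id!r} still {status} after {timeout}s")
        time.sleep(poll_interval)


# Usage with a fake status sequence instead of a real cluster.
fake_statuses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda _job_id: next(fake_statuses), "my-job", poll_interval=0))
# prints SUCCEEDED
```

The timeout keeps a task from blocking forever on a job that never reaches a terminal state.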

Stopping Ray Jobs

Use RayStopJobOperator to stop a running Ray job identified by its job ID.

tests/system/google/cloud/ray/example_ray_job.py

stop_ray_job = RayStopJobOperator(
    task_id="stop_ray_job",
    job_id=JOB_ID,
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
)
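Stopping only makes sense for a job that has not finished yet. The following sketch models that rule with a hypothetical in-memory job table (FakeJobStore is illustrative and not part of Airflow or Ray): stopping a pending or running job moves it to STOPPED, while stopping a job that already reached a terminal state leaves its recorded status untouched and returns False. Treat these return-value semantics as an assumption of the sketch.

```python
from dataclasses import dataclass, field

# Assumed terminal job states, matching the names of Ray's JobStatus values.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


@dataclass
class FakeJobStore:
    """Hypothetical in-memory stand-in for a Ray cluster's job table."""

    statuses: dict = field(default_factory=dict)

    def stop_job(self, job_id: str) -> bool:
        """Stop a job; return True only if it was still pending or running."""
        current = self.statuses[job_id]  # raises KeyError for unknown job IDs
        if current in TERMINAL_STATES:
            # Already finished: keep the final status instead of overwriting it.
            return False
        self.statuses[job_id] = "STOPPED"
        return True


store = FakeJobStore({"running-job": "RUNNING", "done-job": "SUCCEEDED"})
print(store.stop_job("running-job"), store.statuses["running-job"])  # True STOPPED
print(store.stop_job("done-job"), store.statuses["done-job"])  # False SUCCEEDED
```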

Deleting Ray Jobs

Use RayDeleteJobOperator to delete a job and its metadata after it reaches a terminal state.

tests/system/google/cloud/ray/example_ray_job.py

delete_ray_job = RayDeleteJobOperator(
    task_id="delete_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    job_id=JOB_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
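Deletion targets jobs that have reached a terminal state, and the example gives the task trigger_rule=TriggerRule.ALL_DONE so that this cleanup step runs once every upstream task has finished, whether those tasks succeeded or failed. The guard below is a minimal sketch of the terminal-state check over a hypothetical dictionary of job records; JobNotFinishedError and delete_job are illustrative names, not part of Airflow or Ray.

```python
# Assumed terminal job states, matching the names of Ray's JobStatus values.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


class JobNotFinishedError(RuntimeError):
    """Raised when deletion is attempted on a job that is still active."""


def delete_job(jobs: dict, job_id: str) -> None:
    """Remove a job record, refusing to touch jobs that have not finished."""
    status = jobs[job_id]["status"]
    if status not in TERMINAL_STATES:
        raise JobNotFinishedError(f"job {job_id!r} is {status}; stop it first")
    del jobs[job_id]


jobs = {
    "old-job": {"status": "SUCCEEDED", "entrypoint": "python3 heavy.py"},
    "live-job": {"status": "RUNNING", "entrypoint": "python3 heavy.py"},
}
delete_job(jobs, "old-job")
print(sorted(jobs))  # ['live-job']
```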

Retrieving Job Information

The RayGetJobInfoOperator retrieves detailed information about a Ray job, including status, timestamps, entrypoint, metadata, and runtime environment.

tests/system/google/cloud/ray/example_ray_job.py

info_ray_job = RayGetJobInfoOperator(
    task_id="info_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
    job_id=JOB_ID,
)
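If the retrieved job information is handled as a dictionary with keys such as status, entrypoint, start_time, end_time, and runtime_env (an assumption here; the key names follow Ray's job details, whose timestamps are assumed to be Unix epoch milliseconds), a downstream step can reduce it to a compact summary. summarize_job_info is a hypothetical helper, not part of the operator.

```python
from typing import Any, Optional


def summarize_job_info(info: dict) -> dict:
    """Extract a few fields from a job-info dict (key names are assumptions)."""
    start_ms: Optional[int] = info.get("start_time")
    end_ms: Optional[int] = info.get("end_time")
    # Timestamps are assumed to be Unix epoch milliseconds; a job that is
    # still running has no end_time, so its duration is left as None.
    duration_s: Optional[float] = None
    if start_ms is not None and end_ms is not None:
        duration_s = (end_ms - start_ms) / 1000
    runtime_env: dict[str, Any] = info.get("runtime_env") or {}
    return {
        "status": info.get("status"),
        "entrypoint": info.get("entrypoint"),
        "duration_s": duration_s,
        "pip": runtime_env.get("pip", []),
    }


info = {
    "status": "SUCCEEDED",
    "entrypoint": "python3 heavy.py",
    "start_time": 1_700_000_000_000,
    "end_time": 1_700_000_042_500,
    "runtime_env": {"pip": ["ray==2.33.0"]},
    "metadata": {},
}
print(summarize_job_info(info)["duration_s"])  # 42.5
```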

Listing Jobs

Use RayListJobsOperator to list all jobs that have run on the cluster.

tests/system/google/cloud/ray/example_ray_job.py

list_ray_job = RayListJobsOperator(
    task_id="list_ray_job",
    cluster_address="{{ task_instance.xcom_pull(task_ids='get_ray_cluster')['dashboard_address'] }}",
)
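A common follow-up to listing jobs is to summarize the result, for example by counting jobs per status or picking out the ones that failed. The sketch assumes each listed job is available as a dictionary with submission_id and status keys; count_by_status and failed_submission_ids are hypothetical helpers.

```python
from collections import Counter


def count_by_status(jobs: list) -> Counter:
    """Count jobs per status (each job is assumed to be a dict with "status")."""
    return Counter(job["status"] for job in jobs)


def failed_submission_ids(jobs: list) -> list:
    """Return the submission IDs of jobs whose status is FAILED, in list order."""
    return [job["submission_id"] for job in jobs if job["status"] == "FAILED"]


jobs = [
    {"submission_id": "job-a", "status": "SUCCEEDED"},
    {"submission_id": "job-b", "status": "FAILED"},
    {"submission_id": "job-c", "status": "SUCCEEDED"},
]
counts = count_by_status(jobs)
print(counts["SUCCEEDED"], counts["FAILED"])  # 2 1
print(failed_submission_ids(jobs))  # ['job-b']
```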
