ResumableJobMixin

Added in version 3.3.0.

ResumableJobMixin is a mixin for operators that submit long-running jobs to an external system and poll for their completion. It makes the operator crash-safe by persisting the external job identifier to the task state store before polling begins. If the worker is restarted or the host is preempted, the next retry reconnects to the job that is already running instead of submitting a duplicate.

This mixin is not a replacement for deferrable operators. Deferrable operators free the worker slot during polling and are the recommended approach when a Triggerer is available. Use this mixin when you want crash safety on an existing synchronous operator without migrating to the deferrable pattern, or when your deployment does not include a Triggerer.

For guidance on choosing between deferrable, resumable, and async approaches, see Resumable Tasks.

Interface

Subclasses must implement these six methods:

def submit_job(self, context: Context) -> JsonValue: ...
def get_job_status(self, external_id: JsonValue, context: Context) -> str: ...
def is_job_active(self, status: str) -> bool: ...
def is_job_succeeded(self, status: str) -> bool: ...
def poll_until_complete(self, external_id: JsonValue, context: Context) -> None: ...
def get_job_result(self, external_id: JsonValue, context: Context) -> Any: ...
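
The six signatures can be captured as a typing.Protocol, which lets a type checker confirm that a class provides all of them. This is a minimal sketch, not part of Airflow: ResumableJobMethods and NoOpJob are hypothetical names, and Context and JsonValue are replaced by local aliases so the snippet runs without Airflow or pydantic installed.

```python
from typing import Any, Protocol, runtime_checkable

# Local stand-ins so the sketch runs on its own (assumption: the real
# signatures use Airflow's Context and pydantic's JsonValue).
Context = dict[str, Any]
JsonValue = Any


@runtime_checkable
class ResumableJobMethods(Protocol):
    """Hypothetical Protocol mirroring the six methods a subclass must provide."""

    def submit_job(self, context: Context) -> JsonValue: ...
    def get_job_status(self, external_id: JsonValue, context: Context) -> str: ...
    def is_job_active(self, status: str) -> bool: ...
    def is_job_succeeded(self, status: str) -> bool: ...
    def poll_until_complete(self, external_id: JsonValue, context: Context) -> None: ...
    def get_job_result(self, external_id: JsonValue, context: Context) -> Any: ...


class NoOpJob:
    """Hypothetical minimal implementation: every job succeeds immediately."""

    def submit_job(self, context: Context) -> JsonValue:
        return "job-1"

    def get_job_status(self, external_id: JsonValue, context: Context) -> str:
        return "SUCCEEDED"

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id: JsonValue, context: Context) -> None:
        return None

    def get_job_result(self, external_id: JsonValue, context: Context) -> Any:
        return None


# runtime_checkable only checks that the methods exist, not their signatures.
assert isinstance(NoOpJob(), ResumableJobMethods)
```

A static checker such as mypy compares full signatures; the runtime isinstance check above is only a presence check.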

Implementing the mixin

Make your operator class inherit from ResumableJobMixin, then call execute_resumable(context) from its execute method and return the result. The six methods describe how the mixin interacts with your external system:

submit_job(context)

Submit the job and return its external identifier. The returned value is stored in task_store and passed back to the other methods on retry. Return None only if the external system does not provide a trackable identifier; in that case the mixin cannot provide crash safety and will resubmit on every retry.
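
Because the identifier is typed as JsonValue, it can be more than a plain string. The sketch below assumes the stored value must survive a JSON round trip (an inference from the JsonValue type). It builds a composite identifier from a hypothetical submit response and checks that round trip. submit_job_sketch and survives_json_round_trip are illustrative helpers, not Airflow APIs.

```python
import json
from typing import Any


def submit_job_sketch(api_response: dict[str, Any]) -> dict[str, str]:
    """Hypothetical submit_job body: keep only what is needed to find the job again.

    Some backends need more than one field (for example a region plus an ID),
    so a small dict is a reasonable identifier as long as it is JSON-compatible.
    """
    return {"region": api_response["region"], "job_id": api_response["id"]}


def survives_json_round_trip(value: Any) -> bool:
    """Return True if value comes back unchanged after JSON encoding and decoding."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Sets, datetimes, and other non-JSON types fail to encode.
        return False


external_id = submit_job_sketch({"id": "abc-123", "region": "eu-west-1", "state": "PENDING"})
print(external_id)  # {'region': 'eu-west-1', 'job_id': 'abc-123'}
print(survives_json_round_trip(external_id))  # True
```

Note that a tuple does not survive the round trip (it comes back as a list), so a dict or list is a safer choice for composite identifiers.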

get_job_status(external_id, context)

Query the external system for the current job status and return the raw status string it reports. The mixin calls this method on retry to determine whether the job is still running, has succeeded, or has failed.

is_job_active(status)

Return True if the job is still running and can be reconnected to. status is the raw string returned by get_job_status, a backend-specific value from the external system, not an Airflow state:

def is_job_active(self, status: str) -> bool:
    return status in ("RUNNING", "PENDING", "ACCEPTED")

is_job_succeeded(status)

Return True if the job completed successfully. status is the same raw string from the external system:

def is_job_succeeded(self, status: str) -> bool:
    return status == "SUCCEEDED"
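
Together, these two predicates sort every raw status into one of the three retry outcomes described under How it works. The sketch below is an illustration, not the mixin's code: retry_outcome is a hypothetical helper, and the status values are the example values used in this section. It shows a consequence worth checking in your own implementation: a status that is neither active nor succeeded, such as a non-terminal "QUEUED" that was left out of is_job_active, ends up on the failure path and triggers a resubmission.

```python
# Example raw statuses from this section (backend-specific, assumed values).
def is_job_active(status: str) -> bool:
    return status in ("RUNNING", "PENDING", "ACCEPTED")


def is_job_succeeded(status: str) -> bool:
    return status == "SUCCEEDED"


def retry_outcome(status: str) -> str:
    """Hypothetical helper: what a retry does for a given stored-job status."""
    if is_job_succeeded(status):
        return "return result without resubmitting"
    if is_job_active(status):
        return "reconnect and keep polling"
    # Anything else is treated as a terminal failure.
    return "treat as failed and submit a fresh job"


for s in ("RUNNING", "SUCCEEDED", "FAILED", "QUEUED"):
    print(f"{s:<10} -> {retry_outcome(s)}")
```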

poll_until_complete(external_id, context)

Block until the job reaches a terminal state, and raise an exception if the job fails.
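
If your hook has no built-in wait method, a polling loop is one way to meet this contract. The sketch below is a standalone illustration under stated assumptions: get_status stands in for a call to get_job_status, JobFailedError is a hypothetical exception type, and the interval and timeout defaults are arbitrary. sleep and clock are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable


class JobFailedError(RuntimeError):
    """Hypothetical exception raised when the external job ends unsuccessfully."""


def poll_until_complete_sketch(
    get_status: Callable[[], str],
    active: frozenset[str] = frozenset({"RUNNING", "PENDING", "ACCEPTED"}),
    succeeded: str = "SUCCEEDED",
    interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Block until the job succeeds; raise if it fails or the timeout passes."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == succeeded:
            return
        if status not in active:
            # Neither running nor succeeded: treat as a terminal failure.
            raise JobFailedError(f"job ended in state {status!r}")
        if clock() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        sleep(interval)


# Usage with a scripted status sequence and no real sleeping:
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
poll_until_complete_sketch(lambda: next(statuses), sleep=lambda _: None)
```

Because the job keeps running externally when the worker dies, a timeout here only stops this attempt from waiting. Whether the job should also be cancelled on timeout depends on your backend.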

get_job_result(external_id, context)

Return the job result after successful completion. Return None if not applicable.

How it works

On the first run, after submit_job returns the external identifier, the mixin persists that identifier to task_store before calling poll_until_complete. If the worker crashes during polling, the next retry reads the stored identifier and calls get_job_status to check the current state of the job:

  • If the job is still running, the mixin calls poll_until_complete to reconnect and continue waiting.

  • If the job already completed successfully, the mixin calls get_job_result and returns immediately without resubmitting.

  • If the job is in a terminal failure state, the mixin falls through and submits a fresh job.
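
The retry logic above can be simulated without Airflow. This is a minimal sketch, not the mixin's actual implementation: a plain dict stands in for task_store, FakeBackend is a hypothetical external system, and WorkerCrash simulates the worker dying mid-poll. Comments mark which mixin method each step corresponds to.

```python
from typing import Any

ACTIVE = ("RUNNING", "PENDING")


class WorkerCrash(Exception):
    """Simulates the worker dying while it waits on the job."""


class FakeBackend:
    """Hypothetical external system that records every submission."""

    def __init__(self) -> None:
        self.submissions: list[str] = []
        self.status: dict[str, str] = {}

    def submit(self) -> str:
        job_id = f"job-{len(self.submissions) + 1}"
        self.submissions.append(job_id)
        self.status[job_id] = "RUNNING"
        return job_id

    def finish(self, job_id: str) -> None:
        self.status[job_id] = "SUCCEEDED"


def execute_resumable_sketch(
    backend: FakeBackend,
    task_store: dict[str, Any],
    crash_during_poll: bool = False,
    key: str = "remote_job_id",
) -> str:
    external_id = task_store.get(key)
    if external_id is not None:
        status = backend.status[external_id]  # get_job_status
        if status == "SUCCEEDED":  # is_job_succeeded
            return f"result of {external_id}"  # get_job_result, no resubmission
        if status in ACTIVE:  # is_job_active
            backend.finish(external_id)  # poll_until_complete: reconnect and wait
            return f"result of {external_id}"
        # Any other status is a terminal failure: fall through and resubmit.
    external_id = backend.submit()  # submit_job
    task_store[key] = external_id  # persisted before polling begins
    if crash_during_poll:
        raise WorkerCrash(external_id)
    backend.finish(external_id)  # poll_until_complete
    return f"result of {external_id}"


backend = FakeBackend()
store: dict[str, Any] = {}  # stands in for task_store, which survives retries
try:
    execute_resumable_sketch(backend, store, crash_during_poll=True)
except WorkerCrash:
    pass  # the first attempt dies mid-poll; assume a retry runs next
print(execute_resumable_sketch(backend, store))  # result of job-1
print(backend.submissions)  # ['job-1']
```

The retry reconnects to job-1 instead of creating job-2, which is the crash-safety guarantee in miniature.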

Note

There is a small window between submit_job returning and task_store.set completing. If the worker crashes in that gap, the next retry does not have the identifier and will submit a fresh job. For most workloads this window is negligible.
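
The gap can be reproduced with a toy model. In the following sketch, submit_job and attempt are hypothetical functions and a dict stands in for task_store. It shows how a crash after submission but before the store write leaves an orphaned job and causes a second submission.

```python
from typing import Any

submissions: list[str] = []
task_store: dict[str, Any] = {}  # stands in for the real task_store


def submit_job() -> str:
    """Hypothetical submit: every call creates a new external job."""
    submissions.append(f"job-{len(submissions) + 1}")
    return submissions[-1]


def attempt(crash_before_store: bool = False) -> str:
    external_id = task_store.get("remote_job_id")
    if external_id is not None:
        return external_id  # reconnect (status checks omitted for brevity)
    external_id = submit_job()
    if crash_before_store:
        # The job now exists externally, but its ID was never persisted.
        raise RuntimeError("worker died before task_store.set completed")
    task_store["remote_job_id"] = external_id
    return external_id


try:
    attempt(crash_before_store=True)
except RuntimeError:
    pass
print(attempt())  # job-2
print(submissions)  # ['job-1', 'job-2']: job-1 is orphaned
```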

Example

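from airflow.sdk import BaseOperator, ResumableJobMixin
from pydantic import JsonValue


class MyBatchOperator(ResumableJobMixin, BaseOperator):
    # Key under which the job identifier is stored in task_store.
    external_id_key = "batch_job_id"

    def execute(self, context):
        # Reconnects to a previously submitted job on retry, or submits a new one.
        return self.execute_resumable(context)

    def submit_job(self, context) -> JsonValue:
        # self.hook is a placeholder for your own hook that talks to the batch system.
        return self.hook.submit_batch(...)

    def get_job_status(self, external_id: JsonValue, context) -> str:
        return self.hook.get_status(external_id)

    def is_job_active(self, status: str) -> bool:
        # List every non-terminal status; anything else is treated as a failure on retry.
        return status in ("RUNNING", "PENDING", "QUEUED")

    def is_job_succeeded(self, status: str) -> bool:
        return status == "SUCCEEDED"

    def poll_until_complete(self, external_id: JsonValue, context) -> None:
        # Must block until the job finishes and raise if it fails.
        self.hook.wait(external_id)

    def get_job_result(self, external_id: JsonValue, context):
        # This operator has no result to return.
        return None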

External ID key

The external_id_key class attribute controls which key is used to store the job identifier in task_store. The default value is "remote_job_id". You can override it on your subclass to use a more descriptive name:

class MyBatchOperator(ResumableJobMixin, BaseOperator):
    external_id_key = "batch_job_id"

Warning

Do not rename external_id_key on an operator that is already deployed and has in-flight task instances. Their job identifiers are stored in the task state store under the old key, so after a rename the next retry finds nothing under the new key and submits a duplicate job instead of reconnecting, defeating the crash-safety guarantee.
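
The effect of a rename can be illustrated with a key lookup. This sketch assumes only what this section states: the identifier is read from task_store under the key named by external_id_key. KeyLookup, OperatorV1, and OperatorV2 are hypothetical stand-ins, not Airflow classes, and a dict plays the role of task_store.

```python
from typing import Any


class KeyLookup:
    """Hypothetical stand-in for how the mixin reads its stored identifier."""

    external_id_key = "remote_job_id"  # the documented default

    def stored_external_id(self, task_store: dict[str, Any]) -> Any:
        return task_store.get(self.external_id_key)


class OperatorV1(KeyLookup):
    pass  # deployed with the default key


class OperatorV2(KeyLookup):
    external_id_key = "batch_job_id"  # renamed while a task was in flight


task_store: dict[str, Any] = {}
task_store[OperatorV1.external_id_key] = "job-42"  # written before the rename

print(OperatorV1().stored_external_id(task_store))  # job-42
print(OperatorV2().stored_external_id(task_store))  # None, so a retry resubmits
```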
