Task and Asset Store Configuration

Added in version 3.3.

The state store is the persistence layer behind both the task store and the asset store. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. The configuration section ([state_store]), the CLI (airflow state-store), and the backend base class (BaseStoreBackend) all use the state_store name for this feature.

Configuration reference

All options live under the [state_store] section of airflow.cfg.

Note

The config section is [state_store], not [task_store].

backend

Full dotted path to a class that implements BaseStoreBackend. Defaults to the built-in metastore backend.

[state_store]
backend = mypackage.state.CustomStateBackend

default_retention_days

Number of days after which task store rows expire. When a key is written with no explicit retention, expires_at is computed on the worker as now + default_retention_days. Changing this setting does not affect already-written rows.

  • Set to 0 to disable time-based cleanup entirely.

  • Default: 30.

  • This setting does not apply to asset store rows.

[state_store]
default_retention_days = 30
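The write-time computation described above can be sketched as follows. This is a minimal illustration rather than Airflow's implementation: compute_expires_at and the row dictionaries are hypothetical names, and the special case of 0 (cleanup disabled) is deliberately left out.

```python
from datetime import datetime, timedelta, timezone


def compute_expires_at(written_at: datetime, default_retention_days: int) -> datetime:
    """Hypothetical helper: expiry is fixed at write time as now + retention."""
    return written_at + timedelta(days=default_retention_days)


written_at = datetime(2025, 1, 1, tzinfo=timezone.utc)

# Row written while default_retention_days = 30.
old_row = {"key": "job_id", "expires_at": compute_expires_at(written_at, 30)}

# The setting is later changed to 7. Only rows written afterwards use the new
# value; old_row keeps the expires_at that was stored when it was written.
new_row = {"key": "row_count", "expires_at": compute_expires_at(written_at, 7)}

print(old_row["expires_at"].date(), new_row["expires_at"].date())
```

Because the expiry timestamp is stored with the row, lowering the setting does not shorten the lifetime of existing rows.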

clear_on_success

When True, all task store keys for a task instance are automatically deleted when that task instance moves to the success state. Defaults to False, which preserves task store entries after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).

Important

clear_on_success clears task store only. It has no effect on asset store. Asset store is scoped to the asset rather than the task instance and must be cleared explicitly.

[state_store]
clear_on_success = False

state_cleanup_batch_size

Number of rows deleted per batch during garbage collection. Set to 0 (the default) to delete all matching rows in a single statement. Set a positive value on deployments with large task_store tables to reduce lock contention.

[state_store]
state_cleanup_batch_size = 10000
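To illustrate why batching reduces lock contention, here is a sketch of batched deletion against an in-memory SQLite table. It is not Airflow's cleanup code: delete_expired is a hypothetical function, and the id and key columns are assumptions made for the example. Only the task_store table name and the expires_at column come from this page.

```python
import sqlite3


def delete_expired(conn: sqlite3.Connection, now: str, batch_size: int = 0) -> int:
    """Delete rows with expires_at < now and return how many were removed."""
    if batch_size <= 0:
        # batch_size = 0: one statement removes every matching row.
        cur = conn.execute("DELETE FROM task_store WHERE expires_at < ?", (now,))
        conn.commit()
        return cur.rowcount
    total = 0
    while True:
        # Each iteration is a short transaction touching at most batch_size rows.
        cur = conn.execute(
            "DELETE FROM task_store WHERE id IN ("
            "SELECT id FROM task_store WHERE expires_at < ? LIMIT ?)",
            (now, batch_size),
        )
        conn.commit()
        if cur.rowcount == 0:
            break
        total += cur.rowcount
    return total


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_store (id INTEGER PRIMARY KEY, key TEXT, expires_at TEXT)"
)
rows = [(f"old-{i}", "2025-01-01T00:00:00") for i in range(25)]
rows += [(f"new-{i}", "2025-12-31T00:00:00") for i in range(5)]
conn.executemany("INSERT INTO task_store (key, expires_at) VALUES (?, ?)", rows)
conn.commit()

deleted = delete_expired(conn, now="2025-06-01T00:00:00", batch_size=10)
remaining = conn.execute("SELECT COUNT(*) FROM task_store").fetchone()[0]
print(deleted, remaining)
```

Committing after each batch keeps every transaction short, at the cost of more round trips to the database.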

Worker-side backend ([workers] state_backend)

A separate, optional config key under [workers] lets you route task store and asset store values through a worker-side backend before they reach the API server.

[workers]
state_backend = mypackage.state.S3StateBackend

When this is set, TaskStoreAccessor.set() calls serialize_task_store_to_ref() on the worker-side backend before sending the returned value (a reference to the actual storage) to the Execution API, and get() calls deserialize_task_store_from_ref() after receiving the stored reference from the Execution API. See Custom worker-side backends below.

Garbage collection semantics

The cleanup task, also known as “garbage collection”, is triggered from the Airflow CLI with the command airflow state-store cleanup-task-store. It removes store rows according to the following rules:

Time-based expiry (task store only)

Rows whose expires_at < now() are deleted. expires_at is computed on the worker at write time, not by the server.

default_retention_days fallback (task store only)

Keys written with no explicit retention get an expires_at of now + default_retention_days computed at write time. Garbage collection deletes rows where expires_at < now().

NEVER_EXPIRE keys

Keys set with retention=NEVER_EXPIRE are stored with expires_at = NULL and a flag that tells garbage collection to skip them unconditionally. They are never deleted by time-based cleanup, regardless of default_retention_days.
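The time-based rules above reduce to a small predicate. The sketch below is a hypothetical model, not Airflow's schema: the Row class, the never_expire flag name, and should_delete are assumptions introduced for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Row:
    key: str
    expires_at: datetime | None
    never_expire: bool = False  # hypothetical name for the skip flag


def should_delete(row: Row, now: datetime) -> bool:
    """Return True if time-based cleanup would remove this row."""
    if row.never_expire or row.expires_at is None:
        # NEVER_EXPIRE rows are skipped unconditionally.
        return False
    return row.expires_at < now


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
expired = Row("job_id", datetime(2025, 5, 1, tzinfo=timezone.utc))
fresh = Row("row_count", datetime(2025, 7, 1, tzinfo=timezone.utc))
pinned = Row("checkpoint", None, never_expire=True)

print([should_delete(r, now) for r in (expired, fresh, pinned)])
```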

on_delete=CASCADE (asset store)

When an asset is deleted, all corresponding asset store rows for that asset are deleted.

Important

Garbage collection only works for the MetastoreStateBackend. Custom backends are explicitly skipped.

Custom backends

A custom backend must subclass BaseStoreBackend and implement its abstract methods: get, set, delete, and clear for synchronous callers and the aget, aset, adelete, and aclear async equivalents. Refer to BaseStoreBackend for the full API.

Each method receives a scope argument that is either a TaskScope or an AssetScope. Use isinstance to dispatch:

from airflow.sdk.state import BaseStoreBackend, TaskScope, AssetScope


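class MyBackend(BaseStoreBackend):
    def get(self, scope, key, *, session=None):
        # self._task_store and self._asset_store are placeholders for your own storage clients.
        if isinstance(scope, TaskScope):
            return self._task_store.get(scope, key)
        if isinstance(scope, AssetScope):
            return self._asset_store.get(scope, key)
        raise TypeError(f"Unsupported scope type: {type(scope).__name__}")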

AssetScope has three optional fields: asset_id (integer, server-side only), name, and uri. At least one must be set. Server-side operations (REST API calls) provide asset_id. Worker-side operations provide name or uri (workers do not have access to the integer asset_id).
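The "at least one must be set" rule, and the fact that a backend may see different identifying fields depending on the caller, can be sketched with a stand-in class. AssetScopeSketch and storage_identity are hypothetical and only mirror the fields described above. They are not the real AssetScope API.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class AssetScopeSketch:
    """Hypothetical stand-in with the three optional fields described above."""

    asset_id: int | None = None
    name: str | None = None
    uri: str | None = None

    def __post_init__(self) -> None:
        if self.asset_id is None and self.name is None and self.uri is None:
            raise ValueError("at least one of asset_id, name, or uri must be set")


def storage_identity(scope: AssetScopeSketch) -> str:
    """Pick whichever identifying field the caller supplied."""
    if scope.asset_id is not None:
        return f"id:{scope.asset_id}"  # server-side callers
    if scope.name is not None:
        return f"name:{scope.name}"  # worker-side callers
    return f"uri:{scope.uri}"


server_scope = AssetScopeSketch(asset_id=42)
worker_scope = AssetScopeSketch(uri="s3://bucket/orders")
print(storage_identity(server_scope), storage_identity(worker_scope))
```

A custom backend that serves both server-side and worker-side callers needs to handle all three fields, since it cannot assume asset_id is always present.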

Configure the class via [state_store] backend:

[state_store]
backend = mypackage.state.MyBackend

Custom worker-side backends

Worker-side backends extend BaseStoreBackend with two pairs of serialization hooks. They are configured separately via [workers] state_backend and run on the worker process, not on the API server. This lets you store large payloads or credentialed data directly using worker infrastructure while only a compact reference string is kept in the database.

Override four serialization hooks from BaseStoreBackend:

  • serialize_task_store_to_ref: called by TaskStoreAccessor.set() before the value is sent to the Execution API; return a compact reference string (e.g. an S3 key) to be stored in the database instead of the raw value.

  • deserialize_task_store_from_ref: called by TaskStoreAccessor.get() after receiving the stored reference from the Execution API; return the actual value.

  • serialize_asset_store_to_ref: same as the task variant but for asset store; receives the asset name or URI as asset_ref.

  • deserialize_asset_store_from_ref: called by AssetStoreAccessor.get() to resolve the stored reference back to the actual value.

Important

References must be deterministic. Given the same inputs (ti_id + key for task store; asset_ref + key for asset store), the serialization method must always return the same reference string. Do not embed timestamps, random UUIDs, or any other non-deterministic component in the reference path.

When a key is deleted or cleared, Airflow clears the database reference first, then calls the backend’s delete() or clear() method. If backend cleanup fails after the DB row is gone, the external object is orphaned. Because the reference is deterministic, a subsequent set() for the same key will overwrite the orphaned object, making the failure recoverable. A non-deterministic reference would leave the external object permanently orphaned with no way to locate it.
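The recovery argument can be demonstrated with two in-memory dictionaries standing in for the database and the external object store. Everything in this sketch is hypothetical (the function names, the simulated failure, the dictionaries). It only models the ordering described above: the database reference is removed first, then backend cleanup fails.

```python
from __future__ import annotations

import json
import uuid

external: dict[str, bytes] = {}  # stands in for the external object store
database: dict[tuple[str, str], str] = {}  # (ti_id, key) -> reference


def deterministic_ref(ti_id: str, key: str) -> str:
    return f"airflow/task-store/{ti_id}/{key}"


def random_ref(ti_id: str, key: str) -> str:
    # Anti-pattern: a random component makes the path unpredictable.
    return f"airflow/task-store/{ti_id}/{key}-{uuid.uuid4()}"


def set_value(make_ref, ti_id: str, key: str, value) -> None:
    ref = make_ref(ti_id, key)
    external[ref] = json.dumps(value).encode()
    database[(ti_id, key)] = ref


def delete_with_failed_cleanup(ti_id: str, key: str) -> None:
    # The database reference is removed, but the external object is left behind.
    database.pop((ti_id, key))


def orphans_after_rewrite(make_ref) -> int:
    external.clear()
    database.clear()
    set_value(make_ref, "ti-1", "job_id", "abc")
    delete_with_failed_cleanup("ti-1", "job_id")
    set_value(make_ref, "ti-1", "job_id", "def")
    # Orphans are external objects that no database row points to.
    return len(set(external) - set(database.values()))


deterministic_orphans = orphans_after_rewrite(deterministic_ref)
random_orphans = orphans_after_rewrite(random_ref)
print(deterministic_orphans, random_orphans)
```

With the deterministic path the second set() overwrites the leftover object, so nothing stays orphaned. With the random path the leftover object can no longer be located.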

Example skeleton:

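from __future__ import annotations

import json
from typing import TYPE_CHECKING

import boto3

from airflow.sdk.state import BaseStoreBackend, TaskScope, AssetScope

if TYPE_CHECKING:
    from pydantic import JsonValue

# Placeholders for illustration: substitute your own S3 client and bucket name.
s3_client = boto3.client("s3")
BUCKET = "my-airflow-state"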

class S3StateBackend(BaseStoreBackend):

    def _task_ref(self, ti_id: str, key: str) -> str:
        return f"airflow/task-store/{ti_id}/{key}"

    def _asset_ref(self, asset_ref: str, key: str) -> str:
        import hashlib

        safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16]
        return f"airflow/asset-store/{safe}/{key}"

    def serialize_task_store_to_ref(self, *, value: JsonValue, key: str, ti_id: str) -> str:
        s3_key = self._task_ref(ti_id, key)
        s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=json.dumps(value).encode())
        return s3_key

    def deserialize_task_store_from_ref(self, stored: str) -> JsonValue:
        s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored)
        return json.loads(s3_object["Body"].read().decode())

    def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str, asset_ref: str) -> str:
        s3_key = self._asset_ref(asset_ref, key)
        s3_client.put_object(Bucket=BUCKET, Key=s3_key, Body=json.dumps(value).encode())
        return s3_key

    def deserialize_asset_store_from_ref(self, stored: str) -> JsonValue:
        s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored)
        return json.loads(s3_object["Body"].read().decode())

    # Implement the remaining abstract methods as pass-throughs or delegating to the
    # default MetastoreStateBackend for the DB side
    ...
