airflow.providers.common.io.state_store.backend
Classes
StateStoreObjectStorageBackend: Object-storage backend for the task and asset state stores.
Module Contents
- class airflow.providers.common.io.state_store.backend.StateStoreObjectStorageBackend
Bases: airflow.sdk._shared.state.BaseStoreBackend
Object-storage backend for the task and asset state stores.
Config keys (all under [common.io]):
  - state_store_objectstorage_path: base path, e.g. s3://conn_id@bucket/task-state/
  - state_store_objectstorage_compression: optional compression, e.g. gzip
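As a rough illustration of the two config keys, the sketch below splits a base path of the documented form into its parts and derives the matching environment-variable names. It assumes Airflow's usual AIRFLOW__{SECTION}__{KEY} convention, with the dot in common.io turned into an underscore. The helpers env_var_for and describe_path are hypothetical, and reading the connection ID from the user part of the URL mirrors the example path above rather than a verified parsing rule of this backend.

```python
from urllib.parse import urlsplit


def env_var_for(section: str, key: str) -> str:
    """Hypothetical helper: map an Airflow config option to its env-var name."""
    # Assumed convention: AIRFLOW__{SECTION}__{KEY}, with "." in the section
    # name replaced by "_" (so [common.io] becomes COMMON_IO).
    return f"AIRFLOW__{section.replace('.', '_').upper()}__{key.upper()}"


def describe_path(path: str) -> dict:
    """Hypothetical helper: split a base path like s3://conn_id@bucket/prefix/."""
    parts = urlsplit(path)
    return {
        "scheme": parts.scheme,     # storage protocol, e.g. "s3"
        "conn_id": parts.username,  # text before "@" in the netloc
        "bucket": parts.hostname,   # text after "@" in the netloc
        "prefix": parts.path,       # key prefix inside the bucket
    }


print(env_var_for("common.io", "state_store_objectstorage_path"))
# AIRFLOW__COMMON_IO__STATE_STORE_OBJECTSTORAGE_PATH
print(describe_path("s3://conn_id@bucket/task-state/"))
# {'scheme': 's3', 'conn_id': 'conn_id', 'bucket': 'bucket', 'prefix': '/task-state/'}
```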
- get(scope, key, *, session=None)
Return the stored JSON-encoded value string, or None if the key does not exist.
Must handle both TaskScope and AssetScope. The execution API calls json.loads on the string returned here, so it must be a valid JSON document.
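The get contract can be pictured with a toy dict-backed store: the backend hands back the stored JSON text untouched (or None), and the caller is the one that decodes it. TaskScope here is a hypothetical stand-in dataclass, not the real Airflow class, and InMemoryGetBackend with its _rows dict is purely illustrative.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskScope:
    """Hypothetical stand-in for the real scope type."""
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1


class InMemoryGetBackend:
    """Toy backend: rows map (scope, key) to JSON-encoded text."""

    def __init__(self) -> None:
        self._rows: dict = {}

    def get(self, scope, key, *, session=None) -> Optional[str]:
        # Return the JSON text verbatim, or None when the key is absent.
        return self._rows.get((scope, key))


backend = InMemoryGetBackend()
scope = TaskScope("etl", "run_1", "extract")
backend._rows[(scope, "cursor")] = json.dumps({"offset": 42})

raw = backend.get(scope, "cursor")
# Caller side: decoding only works because the stored text is valid JSON.
value = json.loads(raw) if raw is not None else None
print(value)  # {'offset': 42}
print(backend.get(scope, "missing"))  # None
```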
- set(scope, key, value, *, expires_at=None, session=None)
Write or overwrite value for the given key. Must handle both TaskScope and AssetScope.
value is always a JSON-encoded string (the execution API calls json.dumps before passing it here); store it verbatim so get can return it unchanged.
expires_at is an absolute UTC datetime after which the row may be deleted. Pass None (the default) for a key that should never expire; it is stored as NULL and skipped by garbage collection.
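To make the value and expires_at rules concrete, here is a minimal sketch of a toy store plus a garbage-collection sweep. ExpiringStore and purge_expired are hypothetical names; the sweep only illustrates the documented behaviour (expired rows may go, rows with expires_at=None are skipped) and is not the backend's actual cleanup code.

```python
import json
from datetime import datetime, timedelta, timezone


class ExpiringStore:
    """Hypothetical toy store illustrating the value/expires_at contract."""

    def __init__(self) -> None:
        self._rows: dict = {}  # (scope, key) -> (json_text, expires_at or None)

    def set(self, scope, key, value, *, expires_at=None, session=None) -> None:
        if not isinstance(value, str):
            raise TypeError("value must already be a JSON-encoded string")
        # Store verbatim; None plays the role of a NULL expiry column.
        self._rows[(scope, key)] = (value, expires_at)

    def get(self, scope, key, *, session=None):
        row = self._rows.get((scope, key))
        return row[0] if row is not None else None

    def purge_expired(self, now: datetime) -> int:
        # Drop rows whose expiry has passed; rows with expires_at=None never expire.
        expired = [k for k, (_, exp) in self._rows.items() if exp is not None and exp < now]
        for k in expired:
            del self._rows[k]
        return len(expired)


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
store = ExpiringStore()
store.set("scope", "tmp", json.dumps([1, 2]), expires_at=now - timedelta(minutes=1))
store.set("scope", "keep", json.dumps("forever"))
print(store.purge_expired(now))  # 1
print(store.get("scope", "tmp"))  # None
print(store.get("scope", "keep"))  # "forever"
```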
- delete(scope, key, *, session=None)
Delete a single key. No-op if the key does not exist.
Must handle both TaskScope and AssetScope.
- clear(scope, *, all_map_indices=False, session=None)
Delete all keys under the given scope. Must handle both TaskScope and AssetScope.
For TaskScope: by default, only keys for the exact map_index on the scope are cleared. When all_map_indices=True, the map_index filter is dropped and state is wiped across every mapped instance; this is meant for external callers (UI, CLI) only, not for use from within a running task. For AssetScope the flag has no effect.
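The map_index filtering rule for TaskScope can be sketched over a plain dict. The row-key layout (dag_id, run_id, task_id, map_index, key) and the TaskScope fields are assumptions for illustration only; the real backend's storage layout is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskScope:
    """Hypothetical stand-in for the real scope type."""
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1


def clear(rows: dict, scope: TaskScope, *, all_map_indices: bool = False) -> int:
    """Delete rows keyed by (dag_id, run_id, task_id, map_index, key)."""

    def matches(row_key) -> bool:
        dag_id, run_id, task_id, map_index, _key = row_key
        if (dag_id, run_id, task_id) != (scope.dag_id, scope.run_id, scope.task_id):
            return False
        # The map_index filter applies unless all_map_indices=True.
        return all_map_indices or map_index == scope.map_index

    doomed = [k for k in rows if matches(k)]
    for k in doomed:
        del rows[k]
    return len(doomed)


rows = {("etl", "r1", "load", i, "cursor"): str(i) for i in range(3)}
rows[("etl", "r1", "other_task", 0, "cursor")] = "x"

# From inside mapped instance 1: only its own keys go.
print(clear(rows, TaskScope("etl", "r1", "load", map_index=1)))  # 1
# From the UI/CLI: wipe every mapped instance of the task.
print(clear(rows, TaskScope("etl", "r1", "load"), all_map_indices=True))  # 2
print(rows)  # {('etl', 'r1', 'other_task', 0, 'cursor'): 'x'}
```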
- abstract async aget(scope, key, *, session=None)
Async variant of get, which returns a JSON-encoded value string or None. Must handle both TaskScope and AssetScope.
session is used directly when provided; otherwise implementations manage their own session internally.
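The "use the caller's session, else manage your own" rule shared by the async methods can be sketched with an async context manager. FakeSession, _own_session and _use_or_create are hypothetical placeholders (no real database is involved); the point is that a caller-supplied session is used and left open, while an internally created one is closed afterwards.

```python
import asyncio
import json
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical placeholder for an async DB session."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False


@asynccontextmanager
async def _own_session():
    # Hypothetical factory: the backend opens and closes its own session.
    session = FakeSession("internal")
    try:
        yield session
    finally:
        session.closed = True


@asynccontextmanager
async def _use_or_create(session):
    if session is not None:
        yield session  # caller owns it, so it is not closed here
    else:
        async with _own_session() as own:
            yield own


async def aget(scope, key, *, session=None):
    async with _use_or_create(session) as s:
        # Placeholder lookup: report which session served the call.
        return json.dumps({"session": s.name, "key": key})


async def main() -> None:
    caller = FakeSession("caller")
    print(await aget("scope", "k", session=caller))  # {"session": "caller", "key": "k"}
    print(await aget("scope", "k"))                  # {"session": "internal", "key": "k"}
    print(caller.closed)                             # False


asyncio.run(main())
```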
- abstract async aset(scope, key, value, *, expires_at=None, session=None)
Async variant of set. value is always a JSON-encoded string. Must handle both TaskScope and AssetScope.
session is used directly when provided; otherwise implementations manage their own session internally.
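One possible way to provide aset for a backend whose writes are blocking object-storage calls is to offload the synchronous set to a worker thread with asyncio.to_thread (Python 3.9+). This is an assumed implementation strategy, not a description of how StateStoreObjectStorageBackend actually works; ThreadOffloadBackend is hypothetical, and because the toy has no database it accepts session without using it, which a DB-backed implementation would not be free to do.

```python
import asyncio
import json
import time


class ThreadOffloadBackend:
    """Hypothetical backend whose synchronous set() does blocking I/O."""

    def __init__(self) -> None:
        self.rows: dict = {}

    def set(self, scope, key, value, *, expires_at=None, session=None) -> None:
        time.sleep(0.01)  # stand-in for a blocking object-storage PUT
        self.rows[(scope, key)] = (value, expires_at)

    async def aset(self, scope, key, value, *, expires_at=None, session=None) -> None:
        # Run the blocking write in a worker thread so the event loop stays free.
        # This toy has no DB session, so `session` is accepted but not forwarded.
        await asyncio.to_thread(self.set, scope, key, value, expires_at=expires_at)


async def main() -> dict:
    backend = ThreadOffloadBackend()
    # Three writes run concurrently instead of back to back.
    await asyncio.gather(*(backend.aset("scope", f"k{i}", json.dumps(i)) for i in range(3)))
    return backend.rows


rows = asyncio.run(main())
print(sorted(rows))  # [('scope', 'k0'), ('scope', 'k1'), ('scope', 'k2')]
```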
- abstract async adelete(scope, key, *, session=None)
Async variant of delete. Must handle both TaskScope and AssetScope.
session is optional. If provided, implementations should use it directly. If None, implementations manage their own async session internally.
- abstract async aclear(scope, *, all_map_indices=False, session=None)
Async variant of clear. Must handle both TaskScope and AssetScope.
For TaskScope: by default, only keys for the exact map_index on the scope are cleared. When all_map_indices=True, the map_index filter is dropped and state is wiped across every mapped instance; this is meant for external callers (UI, CLI) only, not for use from within a running task. For AssetScope the flag has no effect.
session is optional. If provided, implementations should use it directly. If None, implementations manage their own async session internally.
- serialize_task_state_store_to_ref(*, value, key, scope)
Serialize a task state store value before it is sent to the execution API for DB persistence.
Called by TaskStateStoreAccessor.set() on the worker. The return value is what gets stored in the DB: typically a reference path (e.g. an S3 key) rather than the actual value. Default: return value unchanged.
Important: return only the raw reference string. The worker framework automatically wraps it in {"__airflow_state_ref__": "<ref>"} before writing to the DB, and strips that wrapper before passing stored to deserialize_task_state_store_from_ref(). Do not wrap the reference yourself.
The returned reference must be deterministic: given the same scope and key, it must always return the same string. Do not use timestamps or random UUIDs as part of the reference; otherwise delete()/clear() cannot reconstruct it and the external object will be orphaned. By default, it JSON-dumps the value and returns a JSON string.
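A custom override that keeps the payload in object storage might look like the following sketch. The path layout (BASE/dag_id/run_id/task_id/map_index/key.json), the TaskScope fields, and the OBJECTS dict standing in for a bucket are all hypothetical. What it does show is the documented contract: the reference is a pure function of scope and key, and the function returns the bare reference without the __airflow_state_ref__ wrapper. Percent-encoding each part keeps a "/" inside a key or run ID from creating extra path levels.

```python
import json
from dataclasses import dataclass
from urllib.parse import quote

BASE = "s3://conn_id@bucket/task-state"  # assumed state_store_objectstorage_path
OBJECTS: dict = {}  # hypothetical in-memory stand-in for object storage


@dataclass(frozen=True)
class TaskScope:
    """Hypothetical stand-in for the real scope type."""
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1


def task_ref(scope: TaskScope, key: str) -> str:
    # Built only from scope fields and the key, so delete()/clear() can
    # rebuild the same path later. No timestamps, no uuid4().
    parts = [scope.dag_id, scope.run_id, scope.task_id, str(scope.map_index), key]
    return "/".join([BASE, *(quote(p, safe="") for p in parts)]) + ".json"


def serialize_task_state_store_to_ref(*, value, key, scope):
    ref = task_ref(scope, key)
    OBJECTS[ref] = json.dumps(value).encode("utf-8")  # upload the payload
    return ref  # raw reference only; the framework adds the wrapper


scope = TaskScope("etl", "manual__2025-01-01T00:00:00+00:00", "load", 3)
ref = serialize_task_state_store_to_ref(value={"offset": 42}, key="cursor", scope=scope)
print(ref)
# s3://conn_id@bucket/task-state/etl/manual__2025-01-01T00%3A00%3A00%2B00%3A00/load/3/cursor.json
```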
- deserialize_task_state_store_from_ref(stored)
Resolve a stored task state store reference back to the actual value.
Called by TaskStateStoreAccessor.get() after the stored string is retrieved from the execution API. By default, it JSON-decodes stored to reverse the default serialize_task_state_store_to_ref encoding.
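The default pair and the framework's wrapping step can be simulated end to end. serialize_default and deserialize_default mirror the documented defaults (JSON dump, then JSON decode); the wrap/strip lines are a hypothetical simulation of what the worker framework does, and the real DB encoding of the wrapper may differ.

```python
import json

REF_MARKER = "__airflow_state_ref__"


def serialize_default(*, value, key, scope) -> str:
    # Documented default: JSON-encode the value.
    return json.dumps(value)


def deserialize_default(stored: str):
    # Documented default: JSON-decode to reverse serialize_default.
    return json.loads(stored)


# Simulated framework round trip (hypothetical; the real DB encoding may differ).
ref = serialize_default(value={"rows": 10}, key="stats", scope=None)
db_value = {REF_MARKER: ref}        # wrapper added by the framework, not by you
stored = db_value[REF_MARKER]       # wrapper stripped before deserializing
print(deserialize_default(stored))  # {'rows': 10}
```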
- serialize_asset_state_store_to_ref(*, value, key, scope)
Serialize an asset state store value before it is sent to the Execution API for DB persistence.
Called by AssetStateStoreAccessor.set() on the worker. The return value is what gets stored in the DB: typically a reference path rather than the actual value. Default: return value unchanged.
Important: return only the raw reference string. The worker framework automatically wraps it in {"__airflow_state_ref__": "<ref>"} before writing to the DB, and strips that wrapper before passing stored to deserialize_asset_state_store_from_ref(). Do not wrap the reference yourself.
The returned reference must be deterministic: given the same scope and key, it must always return the same string. Do not use timestamps or random UUIDs as part of the reference; otherwise delete()/clear() cannot reconstruct it and the external object will be orphaned. By default, it JSON-dumps the value and returns a JSON string.
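For assets, one way to get a deterministic and always path-safe reference is to hash the identifying inputs, as in this sketch. The AssetScope field asset_name, the BASE path, and both helper functions are hypothetical; bad_asset_ref shows the UUID anti-pattern the rule above warns against.

```python
import hashlib
import uuid
from dataclasses import dataclass

BASE = "s3://conn_id@bucket/asset-state"  # assumed base path


@dataclass(frozen=True)
class AssetScope:
    """Hypothetical stand-in; the real AssetScope fields may differ."""
    asset_name: str


def asset_ref(scope: AssetScope, key: str) -> str:
    # Hash the identifying inputs: the same scope and key always give the same path.
    # "\x00" separates the fields so ("ab", "c") and ("a", "bc") differ.
    digest = hashlib.sha256(f"{scope.asset_name}\x00{key}".encode("utf-8")).hexdigest()
    return f"{BASE}/{digest}.json"


def bad_asset_ref(scope: AssetScope, key: str) -> str:
    # Anti-pattern: a fresh UUID per call cannot be rebuilt by delete()/clear().
    return f"{BASE}/{uuid.uuid4().hex}.json"


scope = AssetScope("s3://warehouse/orders")
print(asset_ref(scope, "watermark") == asset_ref(scope, "watermark"))          # True
print(bad_asset_ref(scope, "watermark") == bad_asset_ref(scope, "watermark"))  # False
```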
- deserialize_asset_state_store_from_ref(stored)
Resolve a stored asset state store reference back to the actual value.
Called by AssetStateStoreAccessor.get() after the stored string is retrieved from the Execution API. By default, it JSON-decodes stored to reverse the default serialize_asset_state_store_to_ref encoding.
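When the reference points at an external object, resolving it means fetching and decoding that object, possibly undoing the optional compression. In the sketch below, OBJECTS, write_object and load_asset_state are hypothetical, and passing compression as an argument is a simplification: the real hook takes only stored, so an actual backend would read the setting from its own configuration.

```python
import gzip
import json

OBJECTS: dict = {}  # hypothetical in-memory stand-in for object storage


def write_object(ref: str, value, compression=None) -> None:
    data = json.dumps(value).encode("utf-8")
    # Mirrors the optional state_store_objectstorage_compression setting.
    OBJECTS[ref] = gzip.compress(data) if compression == "gzip" else data


def load_asset_state(stored: str, compression=None):
    """Hypothetical resolver: fetch the object at `stored` and decode it."""
    data = OBJECTS[stored]
    if compression == "gzip":
        data = gzip.decompress(data)
    return json.loads(data.decode("utf-8"))


ref = "s3://conn_id@bucket/asset-state/orders/watermark.json.gz"
write_object(ref, {"last_id": 1017}, compression="gzip")
print(load_asset_state(ref, compression="gzip"))  # {'last_id': 1017}
```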