airflow.providers.common.ai.durable.storage

ObjectStorage-backed durable storage for pydantic-ai agent step caching.

Attributes

log

SECTION

Classes

DurableStorage

Stores step-level caches in a single JSON file on ObjectStorage.

Module Contents

airflow.providers.common.ai.durable.storage.log[source]
airflow.providers.common.ai.durable.storage.SECTION = 'common.ai'[source]
class airflow.providers.common.ai.durable.storage.DurableStorage(*, dag_id, task_id, run_id, map_index=-1)[source]

Stores step-level caches in a single JSON file on ObjectStorage.

All step caches (model responses and tool results) are stored as entries in a single JSON blob, written to a file named after the task execution: {base_path}/{dag_id}_{task_id}_{run_id}[_{map_index}].json.

The file survives Airflow task retries since it lives outside the XCom system. It is deleted on successful task completion.

Parameters:
  • dag_id (str) – DAG ID of the running task.

  • task_id (str) – Task ID of the running task.

  • run_id (str) – DAG run ID.

  • map_index (int) – Map index for mapped tasks (-1 for non-mapped).
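The naming scheme above can be illustrated with a small sketch. This is not the provider's actual implementation, just a plain function showing how the documented file name `{dag_id}_{task_id}_{run_id}[_{map_index}].json` is derived from the task identity (the `cache_file_name` helper is hypothetical):

```python
def cache_file_name(dag_id: str, task_id: str, run_id: str, map_index: int = -1) -> str:
    """Build '{dag_id}_{task_id}_{run_id}[_{map_index}].json' as described above."""
    name = f"{dag_id}_{task_id}_{run_id}"
    if map_index >= 0:  # only mapped tasks (map_index != -1) get a suffix
        name += f"_{map_index}"
    return name + ".json"
```

The `map_index` suffix is omitted for non-mapped tasks, so a task's cache file name is stable across retries of the same DAG run.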

save_model_response(key, response)[source]

Serialize and store a ModelResponse in the cache.

load_model_response(key)[source]

Load a cached ModelResponse, or return None if not cached.

save_tool_result(key, result)[source]

Store a tool call result in the cache.

Non-serializable results (e.g. BinaryContent from MCP tools) are skipped with a warning – the tool call still succeeds, but won’t be replayed on retry.

load_tool_result(key)[source]

Load a cached tool result.

Returns a (found, value) tuple, since the cached value itself may be None.
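The (found, value) convention can be sketched with a sentinel, since a bare `cache.get(key)` cannot distinguish a cached None from a missing key. Again an illustrative analogue (a dict stands in for the JSON blob; `load_from` is a hypothetical helper):

```python
_MISSING = object()  # sentinel distinct from every cached value, including None

def load_from(cache: dict, key: str):
    """Return (found, value); found=False means the key was never cached."""
    value = cache.get(key, _MISSING)
    if value is _MISSING:
        return False, None
    return True, value
```

Callers check `found` rather than the value, so a tool that legitimately returned None is still replayed from the cache on retry.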

cleanup()[source]

Delete the cache file after successful execution.
