airflow.providers.common.ai.durable.caching_toolset

Caching toolset wrapper for durable execution.

Attributes

log

Classes

CachingToolset

Wraps a toolset to cache tool call results in ObjectStorage for durable execution.

Module Contents

airflow.providers.common.ai.durable.caching_toolset.log

class airflow.providers.common.ai.durable.caching_toolset.CachingToolset

Bases: pydantic_ai.toolsets.wrapper.WrapperToolset[Any]

Wraps a toolset to cache tool call results in ObjectStorage for durable execution.

On each call_tool() invocation, the toolset checks whether a cached result exists for the current step index. If one exists, it is returned without executing the tool. Otherwise, the tool is executed and its result is cached.
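The check-then-execute flow described above can be sketched roughly as follows. This is a minimal illustration, not the provider's implementation: InMemoryStore, StepCounter, and cached_call are hypothetical stand-ins for DurableStorage, DurableStepCounter, and call_tool(), and a plain dict takes the place of ObjectStorage.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InMemoryStore:
    """Hypothetical stand-in for the durable storage backend (a plain dict)."""

    def __init__(self) -> None:
        self._data: dict[int, Any] = {}

    def load(self, step: int) -> tuple[bool, Any]:
        # Return (found, value) so that a cached None is still a cache hit.
        if step in self._data:
            return True, self._data[step]
        return False, None

    def save(self, step: int, value: Any) -> None:
        self._data[step] = value


class StepCounter:
    """Hypothetical stand-in for the step counter: hands out 0, 1, 2, ..."""

    def __init__(self) -> None:
        self._next = 0

    def next_step(self) -> int:
        step = self._next
        self._next += 1
        return step


async def cached_call(
    store: InMemoryStore,
    counter: StepCounter,
    tool: Callable[..., Awaitable[Any]],
    **tool_args: Any,
) -> Any:
    step = counter.next_step()
    found, cached = store.load(step)
    if found:
        return cached  # replay: the tool is not executed again
    result = await tool(**tool_args)
    store.save(step, result)
    return result


calls: list[tuple[int, int]] = []  # records every real tool execution


async def add(a: int, b: int) -> int:
    calls.append((a, b))
    return a + b


def run_once(store: InMemoryStore) -> int:
    # Assumption: each run (or retry) starts counting steps from zero,
    # while the store survives between runs.
    counter = StepCounter()
    return asyncio.run(cached_call(store, counter, add, a=2, b=3))


store = InMemoryStore()
first = run_once(store)   # executes the tool and caches the result
second = run_once(store)  # served from the cache
```

In this sketch the second run returns the same value as the first while the tool body runs only once, which is the property that makes a retried task resume instead of repeating side effects.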

The step index is claimed synchronously, before the first await, so parallel tool calls issued via asyncio.gather receive deterministic indices: each task runs its synchronous preamble in creation order before any of them yields to the event loop.
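The ordering guarantee can be illustrated with a small asyncio sketch. The StepCounter class and the call_tool coroutine here are hypothetical simplifications written for this example, not the provider's code. asyncio.sleep stands in for the tool execution.

```python
import asyncio


class StepCounter:
    """Hypothetical counter that hands out 0, 1, 2, ... in call order."""

    def __init__(self) -> None:
        self._next = 0

    def next_step(self) -> int:
        step = self._next
        self._next += 1
        return step


finished: list[str] = []  # completion order, recorded for comparison


async def call_tool(counter: StepCounter, name: str, delay: float) -> tuple[str, int]:
    # Claimed synchronously, before the first await: no other task can
    # run between the start of this coroutine and this line.
    step = counter.next_step()
    await asyncio.sleep(delay)  # simulated tool work; completion order may differ
    finished.append(name)
    return name, step


async def main() -> list[tuple[str, int]]:
    counter = StepCounter()
    # gather() wraps the coroutines in tasks in argument order, and the
    # event loop starts them in that same order.
    return await asyncio.gather(
        call_tool(counter, "slow", 0.03),
        call_tool(counter, "medium", 0.02),
        call_tool(counter, "fast", 0.01),
    )


assignments = asyncio.run(main())
```

Here the indices follow the order in which the calls were passed to gather, regardless of which call finishes first. If the counter were read after the await instead, the assignment would depend on completion timing, and a replayed run could look up the wrong cache entry.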

storage: airflow.providers.common.ai.durable.storage.DurableStorage

counter: airflow.providers.common.ai.durable.step_counter.DurableStepCounter

async call_tool(name, tool_args, ctx, tool)

Call a tool with the given arguments.

Args:

name: The name of the tool to call.
tool_args: The arguments to pass to the tool.
ctx: The run context.
tool: The tool definition returned by get_tools() (pydantic_ai.toolsets.AbstractToolset.get_tools) that was called.
