Skip to content

Hooks cookbook

Practical hook patterns you can copy and adapt. Each recipe includes a complete implementation and a usage example. See hooks concepts for background and using hooks for registration basics.

Force specific tools to always use predetermined parameter values. Useful for enforcing security policies or maintaining consistency across invocations.

from typing import Any
from strands.hooks import HookProvider, HookRegistry, BeforeToolCallEvent
class ConstantToolArguments(HookProvider):
"""Use constant argument values for specific parameters of a tool."""
def __init__(self, fixed_tool_arguments: dict[str, dict[str, Any]]):
"""Initialize fixed parameter values for tools.
Args:
fixed_tool_arguments: Maps tool names to dictionaries of parameter
names and their fixed values. These values override any values
the agent provides when the tool is invoked.
"""
self._tools_to_fix = fixed_tool_arguments
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
registry.add_callback(BeforeToolCallEvent, self._fix_tool_arguments)
def _fix_tool_arguments(self, event: BeforeToolCallEvent) -> None:
if parameters_to_fix := self._tools_to_fix.get(event.tool_use["name"]):
tool_input: dict[str, Any] = event.tool_use["input"]
tool_input.update(parameters_to_fix)

Force the calculator tool to always use a precision of 1:

from strands import Agent
fix_parameters = ConstantToolArguments({
"calculator": {
"precision": 1,
}
})
agent = Agent(tools=[calculator], hooks=[fix_parameters])
result = agent("What is 2 / 3?")

Prevent runaway tool usage by capping calls per invocation. Useful for rate limiting, usage quotas, and preventing infinite loops.

from typing import Any
from threading import Lock
from strands.hooks import HookProvider, HookRegistry, BeforeToolCallEvent, BeforeInvocationEvent
class LimitToolCounts(HookProvider):
"""Limit the number of times each tool can be called per agent invocation."""
def __init__(self, max_tool_counts: dict[str, int]):
"""Initialize with per-tool call limits.
Args:
max_tool_counts: Maps tool names to maximum allowed call counts.
Tools not listed have no limit.
"""
self.max_tool_counts = max_tool_counts
self.tool_counts: dict[str, int] = {}
self._lock = Lock()
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
registry.add_callback(BeforeInvocationEvent, self.reset_counts)
registry.add_callback(BeforeToolCallEvent, self.intercept_tool)
def reset_counts(self, event: BeforeInvocationEvent) -> None:
with self._lock:
self.tool_counts = {}
def intercept_tool(self, event: BeforeToolCallEvent) -> None:
tool_name = event.tool_use["name"]
with self._lock:
max_tool_count = self.max_tool_counts.get(tool_name)
tool_count = self.tool_counts.get(tool_name, 0) + 1
self.tool_counts[tool_name] = tool_count
if max_tool_count and tool_count > max_tool_count:
event.cancel_tool = (
f"Tool '{tool_name}' has been invoked too many times and is now being throttled. "
f"DO NOT CALL THIS TOOL ANYMORE"
)

Limit the sleep tool to 3 calls per invocation:

from strands import Agent
limit_hook = LimitToolCounts(max_tool_counts={"sleep": 3})
agent = Agent(tools=[sleep], hooks=[limit_hook])
# Only 3 successful sleeps occur
agent("Sleep 5 times for 10ms each or until you can't anymore")
# Count resets on the next invocation
agent("Sleep once")

Retry model calls on transient errors with exponential backoff.

import asyncio
import logging
from typing import Any
from strands import Agent
from strands.hooks import HookProvider, HookRegistry, BeforeInvocationEvent, AfterModelCallEvent
logger = logging.getLogger(__name__)
class RetryOnServiceUnavailable(HookProvider):
"""Retry model calls when ServiceUnavailable errors occur."""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.retry_count = 0
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
registry.add_callback(BeforeInvocationEvent, self.reset_counts)
registry.add_callback(AfterModelCallEvent, self.handle_retry)
def reset_counts(self, event: BeforeInvocationEvent) -> None:
self.retry_count = 0
async def handle_retry(self, event: AfterModelCallEvent) -> None:
# Hook callbacks can be async when they need to await
if event.exception:
if "ServiceUnavailable" in str(event.exception):
logger.info("ServiceUnavailable encountered")
if self.retry_count < self.max_retries:
logger.info("Retrying model call")
self.retry_count += 1
event.retry = True
await asyncio.sleep(2 ** self.retry_count)
else:
self.retry_count = 0

Use it:

from strands import Agent
retry_hook = RetryOnServiceUnavailable(max_retries=3)
agent = Agent(hooks=[retry_hook])
result = agent("What is the capital of France?")

Re-execute failed tool calls automatically.

import logging
from typing import Any
from strands.hooks import HookProvider, HookRegistry, AfterToolCallEvent
logger = logging.getLogger(__name__)
class RetryOnToolError(HookProvider):
"""Retry tool calls that fail with errors."""
def __init__(self, max_retries: int = 1):
self.max_retries = max_retries
self._attempt_counts: dict[str, int] = {}
def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
registry.add_callback(AfterToolCallEvent, self.handle_retry)
def handle_retry(self, event: AfterToolCallEvent) -> None:
tool_use_id = str(event.tool_use.get("toolUseId", ""))
tool_name = event.tool_use.get("name", "unknown")
attempt = self._attempt_counts.get(tool_use_id, 0) + 1
self._attempt_counts[tool_use_id] = attempt
if event.result.get("status") == "error" and attempt <= self.max_retries:
logger.info(f"Retrying tool '{tool_name}' (attempt {attempt}/{self.max_retries})")
event.retry = True
elif event.result.get("status") != "error":
self._attempt_counts.pop(tool_use_id, None)

Retry a flaky API call once before giving up:

from strands import Agent, tool
@tool
def flaky_api_call(query: str) -> str:
"""Call an external API that sometimes fails.
Args:
query: The query to send.
"""
import random
if random.random() < 0.5:
raise RuntimeError("Service temporarily unavailable")
return f"Result for: {query}"
retry_hook = RetryOnToolError(max_retries=1)
agent = Agent(tools=[flaky_api_call], hooks=[retry_hook])
result = agent("Look up the weather")

Trigger follow-up agent invocations automatically after the current one completes. Useful for autonomous looping and iterative refinement.

These examples use bare functions with agent.add_hook() instead of HookProvider classes for brevity. For production use, wrap the state in a class to avoid global variables.

Summarize after tools (single resume):

from strands import Agent
from strands.hooks import AfterInvocationEvent
resume_count = 0
async def summarize_after_tools(event: AfterInvocationEvent):
"""Resume once to ask the model to summarize its work."""
global resume_count
if resume_count == 0 and event.result and event.result.stop_reason == "end_turn":
resume_count += 1
event.resume = "Now summarize what you just did in one sentence."
agent = Agent()
agent.add_hook(summarize_after_tools)
# The agent processes the request, then automatically
# performs a second invocation to generate the summary
result = agent("Look up the weather in Seattle")

Refine iteratively up to N times with a termination condition:

from strands import Agent
from strands.hooks import AfterInvocationEvent
MAX_ITERATIONS = 3
iteration = 0
async def iterative_refinement(event: AfterInvocationEvent):
"""Re-invoke the agent up to MAX_ITERATIONS times for iterative refinement."""
global iteration
if iteration < MAX_ITERATIONS and event.result:
iteration += 1
event.resume = f"Review your previous response and improve it. Iteration {iteration} of {MAX_ITERATIONS}."
agent = Agent()
agent.add_hook(iterative_refinement)
result = agent("Draft a haiku about programming")

Automatically approve or handle interrupted tool calls without returning to the caller. When an agent invocation ends because of an interrupt, a hook can resume with interrupt responses to continue execution.

When the agent is in an interrupt state, resume with a list of interruptResponse objects. Passing a plain string raises a TypeError.

from strands import Agent, tool
from strands.hooks import AfterInvocationEvent, BeforeToolCallEvent
@tool
def send_email(to: str, body: str) -> str:
"""Send an email.
Args:
to: Recipient address.
body: Email body.
"""
return f"Email sent to {to}"
def require_approval(event: BeforeToolCallEvent):
"""Interrupt before sending emails to require approval."""
if event.tool_use["name"] == "send_email":
event.interrupt("email_approval", reason="Approve this email?")
async def auto_approve(event: AfterInvocationEvent):
"""Automatically approve all interrupted tool calls."""
if event.result and event.result.stop_reason == "interrupt":
responses = [
{"interruptResponse": {"interruptId": intr.id, "response": "approved"}}
for intr in event.result.interrupts
]
event.resume = responses
agent = Agent(tools=[send_email])
agent.add_hook(require_approval)
agent.add_hook(auto_approve)
# The interrupt is handled automatically by the hook,
# the caller receives the final result directly
result = agent("Send an email to alice@example.com saying hello")