Hooks

Hooks are a composable extensibility mechanism for extending agent functionality by subscribing to events throughout the agent lifecycle. The hook system enables both built-in components and user code to react to or modify agent behavior through strongly-typed event callbacks.

Multiple callbacks can subscribe to the same event type.

A Hook Event is a specific event in the lifecycle that callbacks can be associated with. A Hook Callback is a callback function that is invoked when the hook event is emitted.

Hooks enable use cases such as:

  • Monitoring agent execution and tool usage
  • Modifying tool execution behavior
  • Adding validation and error handling
  • Monitoring multi-agent execution flow and node transitions
  • Debugging complex orchestration patterns
  • Implementing custom logging and metrics collection

Hook callbacks are registered against specific event types and receive strongly-typed event objects when those events occur during agent execution. Each event carries relevant data for that stage of the agent lifecycle - for example, BeforeInvocationEvent includes agent and request details, while BeforeToolCallEvent provides tool information and parameters.

The simplest way to register a hook callback is using the agent.add_hook() method:

from strands import Agent
from strands.hooks import BeforeInvocationEvent, BeforeToolCallEvent

agent = Agent()

# Register individual callbacks
def my_callback(event: BeforeInvocationEvent) -> None:
    print("Custom callback triggered")

agent.add_hook(my_callback, BeforeInvocationEvent)

# Type inference: if your callback has a type hint, the event type is inferred
def typed_callback(event: BeforeToolCallEvent) -> None:
    print(f"Tool called: {event.tool_use['name']}")

agent.add_hook(typed_callback)  # Event type inferred from type hint
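
To make the type-inference idea concrete, here is a minimal standalone sketch of how an event type could be read from a callback's type hint. It is not the Strands implementation: `infer_event_type` is a hypothetical helper, and the `BeforeToolCallEvent` class below is a stand-in so the example runs without the SDK.

```python
from typing import Callable, get_type_hints


class BeforeToolCallEvent:
    """Stand-in event class for this sketch (not the Strands class)."""


def infer_event_type(callback: Callable) -> type:
    """Hypothetical helper: return the annotated type of the first hinted parameter."""
    hints = get_type_hints(callback)
    hints.pop("return", None)  # ignore the return annotation
    if not hints:
        raise TypeError("callback needs a type hint on its event parameter")
    # Dicts preserve insertion order, so the first entry is the first annotated parameter
    return next(iter(hints.values()))


def typed_callback(event: BeforeToolCallEvent) -> None:
    pass


inferred = infer_event_type(typed_callback)
print(inferred.__name__)  # BeforeToolCallEvent
```

A callback without a type hint gives the helper nothing to infer from, which is why the explicit two-argument form of registration is still useful.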

For multi-agent orchestrators, you can register callbacks for orchestration events:

# Create your orchestrator (Graph or Swarm)
orchestrator = Graph(...)

# Register individual callbacks
def my_callback(event: BeforeNodeCallEvent) -> None:
    print("Custom callback triggered")

orchestrator.hooks.add_callback(BeforeNodeCallEvent, my_callback)

For packaging multiple related hooks together, Plugins provide a convenient way to bundle hooks with configuration and tools:

from strands import Agent
from strands.plugins import Plugin, hook
from strands.hooks import BeforeToolCallEvent, AfterToolCallEvent

class LoggingPlugin(Plugin):
    name = "logging-plugin"

    @hook
    def log_before(self, event: BeforeToolCallEvent) -> None:
        print(f"Calling: {event.tool_use['name']}")

    @hook
    def log_after(self, event: AfterToolCallEvent) -> None:
        print(f"Completed: {event.tool_use['name']}")

agent = Agent(plugins=[LoggingPlugin()])

See Plugins for more information on creating and using plugins.

The following diagram shows when hook events are emitted during a typical agent invocation where tools are invoked:

flowchart LR
subgraph Start["Request Start Events"]
direction TB
BeforeInvocationEvent["BeforeInvocationEvent"]
StartMessage["MessageAddedEvent"]
BeforeInvocationEvent --> StartMessage
end
subgraph Model["Model Events"]
direction TB
BeforeModelCallEvent["BeforeModelCallEvent"]
AfterModelCallEvent["AfterModelCallEvent"]
ModelMessage["MessageAddedEvent"]
BeforeModelCallEvent --> AfterModelCallEvent
AfterModelCallEvent --> ModelMessage
end
subgraph Tool["Tool Events"]
direction TB
BeforeToolCallEvent["BeforeToolCallEvent"]
AfterToolCallEvent["AfterToolCallEvent"]
ToolMessage["MessageAddedEvent"]
BeforeToolCallEvent --> AfterToolCallEvent
AfterToolCallEvent --> ToolMessage
end
subgraph End["Request End Events"]
direction TB
AfterInvocationEvent["AfterInvocationEvent"]
end
Start --> Model
Model <--> Tool
Tool --> End

The following diagram shows when multi-agent hook events are emitted during orchestrator execution:

flowchart LR
subgraph Init["Initialization"]
direction TB
MultiAgentInitializedEvent["MultiAgentInitializedEvent"]
end
subgraph Invocation["Invocation Lifecycle"]
direction TB
BeforeMultiAgentInvocationEvent["BeforeMultiAgentInvocationEvent"]
AfterMultiAgentInvocationEvent["AfterMultiAgentInvocationEvent"]
BeforeMultiAgentInvocationEvent --> NodeExecution
NodeExecution --> AfterMultiAgentInvocationEvent
end
subgraph NodeExecution["Node Execution (Repeated)"]
direction TB
BeforeNodeCallEvent["BeforeNodeCallEvent"]
AfterNodeCallEvent["AfterNodeCallEvent"]
BeforeNodeCallEvent --> AfterNodeCallEvent
end
Init --> Invocation

| Event | Description |
|-------|-------------|
| AgentInitializedEvent | Triggered when an agent has been constructed and finished initialization at the end of the agent constructor. |
| BeforeInvocationEvent | Triggered at the beginning of a new agent invocation request. |
| AfterInvocationEvent | Triggered at the end of an agent request, regardless of success or failure. Uses reverse callback ordering. |
| MessageAddedEvent | Triggered when a message is added to the agent’s conversation history. |
| BeforeModelCallEvent | Triggered before the model is invoked for inference. |
| AfterModelCallEvent | Triggered after model invocation completes. Uses reverse callback ordering. |
| BeforeToolCallEvent | Triggered before a tool is invoked. |
| AfterToolCallEvent | Triggered after tool invocation completes. Uses reverse callback ordering. |
| MultiAgentInitializedEvent | Triggered when a multi-agent orchestrator is initialized. |
| BeforeMultiAgentInvocationEvent | Triggered before orchestrator execution starts. |
| AfterMultiAgentInvocationEvent | Triggered after orchestrator execution completes. Uses reverse callback ordering. |
| BeforeNodeCallEvent | Triggered before individual node execution starts. |
| AfterNodeCallEvent | Triggered after individual node execution completes. Uses reverse callback ordering. |

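Most event properties are read-only to prevent unintended modifications. However, certain properties can be modified to influence agent behavior, including the ones used in the examples on this page:

  • BeforeToolCallEvent: selected_tool, tool_use, cancel_tool
  • AfterToolCallEvent: result, retry
  • AfterModelCallEvent: retry
  • AfterInvocationEvent: resume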

Some events come in pairs, such as Before/After events. The After event callbacks are always called in reverse order from the Before event callbacks to ensure proper cleanup semantics.
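
The ordering rule can be illustrated with a small standalone sketch. `MiniRegistry`, `BeforeEvent`, `AfterEvent`, and the `reverse_callbacks` flag are hypothetical names invented for this example; they only model the behavior described above and are not the Strands registry.

```python
from collections import defaultdict
from typing import Callable


class BeforeEvent:
    """Stand-in for a Before* event: callbacks run in registration order."""


class AfterEvent:
    """Stand-in for an After* event: callbacks run in reverse order."""
    reverse_callbacks = True


class MiniRegistry:
    """Toy registry that stores callbacks per event type."""

    def __init__(self) -> None:
        self._callbacks: dict[type, list[Callable]] = defaultdict(list)

    def add_callback(self, event_type: type, callback: Callable) -> None:
        self._callbacks[event_type].append(callback)

    def invoke(self, event: object) -> None:
        callbacks = self._callbacks[type(event)]
        if getattr(event, "reverse_callbacks", False):
            callbacks = list(reversed(callbacks))
        for callback in callbacks:
            callback(event)


calls: list[str] = []
registry = MiniRegistry()
for name in ("outer", "inner"):
    registry.add_callback(BeforeEvent, lambda e, n=name: calls.append(f"before:{n}"))
    registry.add_callback(AfterEvent, lambda e, n=name: calls.append(f"after:{n}"))

registry.invoke(BeforeEvent())
registry.invoke(AfterEvent())
print(calls)  # ['before:outer', 'before:inner', 'after:inner', 'after:outer']
```

The hook registered first wraps the ones registered later, much like nested context managers: whatever "outer" sets up before "inner" is torn down after "inner" has cleaned up.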

Invocation state provides configuration and context data passed through the agent or orchestrator invocation. This is particularly useful for:

  1. Custom Objects: Access database client objects, connection pools, or other Python objects
  2. Request Context: Access session IDs, user information, settings, or request-specific data
  3. Multi-Agent Shared State: In multi-agent patterns, access state shared across all agents - see Shared State Across Multi-Agent Patterns
  4. Custom Parameters: Pass any additional data that hooks might need

import logging
import sqlite3

from strands import Agent
from strands.hooks import BeforeToolCallEvent

def log_with_context(event: BeforeToolCallEvent) -> None:
    """Log tool invocations with context from invocation state."""
    # Access invocation state from the event
    user_id = event.invocation_state.get("user_id", "unknown")
    session_id = event.invocation_state.get("session_id")

    # Access non-JSON serializable objects like database connections
    db_connection = event.invocation_state.get("database_connection")
    logger_instance = event.invocation_state.get("custom_logger")

    # Use custom logger if provided, otherwise use default
    logger = logger_instance if logger_instance else logging.getLogger(__name__)
    logger.info(
        f"User {user_id} in session {session_id} "
        f"invoking tool: {event.tool_use['name']} "
        f"with DB connection: {db_connection is not None}"
    )

# Register the hook (my_tool is a tool defined elsewhere)
agent = Agent(tools=[my_tool])
agent.hooks.add_callback(BeforeToolCallEvent, log_with_context)

# Execute with context including non-serializable objects
custom_logger = logging.getLogger("custom")
db_conn = sqlite3.connect(":memory:")

result = agent(
    "Process the data",
    user_id="user123",
    session_id="sess456",
    database_connection=db_conn,  # Non-JSON serializable object
    custom_logger=custom_logger,  # Non-JSON serializable object
)

Multi-agent hook events provide access to:

  • source: The multi-agent orchestrator instance (for example: Graph/Swarm)
  • node_id: Identifier of the node being executed (for node-level events)
  • invocation_state: Configuration and context data passed through the orchestrator invocation

Multi-agent hooks provide configuration and context data passed through the orchestrator’s lifecycle.

Modify or replace tools before execution:

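from strands.hooks import BeforeToolCallEvent, HookProvider, HookRegistry

class ToolInterceptor(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeToolCallEvent, self.intercept_tool)

    def intercept_tool(self, event: BeforeToolCallEvent) -> None:
        # tool_use is a dict, so read the tool name by key
        if event.tool_use["name"] == "sensitive_tool":
            # Replace with a safer alternative
            # (self.safe_alternative_tool is assumed to be set elsewhere, for example in __init__)
            event.selected_tool = self.safe_alternative_tool
            event.tool_use["name"] = "safe_tool"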

Modify tool results after execution:

from strands.hooks import AfterToolCallEvent, HookProvider, HookRegistry

class ResultProcessor(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolCallEvent, self.process_result)

    def process_result(self, event: AfterToolCallEvent) -> None:
        # tool_use is a dict, so read the tool name by key
        if event.tool_use["name"] == "calculator":
            # Add formatting to calculator results
            original_content = event.result["content"][0]["text"]
            event.result["content"][0]["text"] = f"Result: {original_content}"

Implement custom logic to modify orchestration behavior in multi-agent systems:

from typing import Any, Callable

class ConditionalExecutionHook(HookProvider):
    def __init__(self, skip_conditions: dict[str, Callable[[dict[str, Any]], bool]]):
        # Maps node IDs to predicates that receive the invocation state
        self.skip_conditions = skip_conditions

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeNodeCallEvent, self.check_execution_conditions)

    def check_execution_conditions(self, event: BeforeNodeCallEvent) -> None:
        node_id = event.node_id
        if node_id in self.skip_conditions:
            condition_func = self.skip_conditions[node_id]
            if condition_func(event.invocation_state):
                print(f"Skipping node {node_id} due to condition")
                # Note: Actual node skipping would require orchestrator-specific implementation

Design hooks to be composable and reusable:

class RequestLoggingHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeInvocationEvent, self.log_request)
        registry.add_callback(AfterInvocationEvent, self.log_response)
        registry.add_callback(BeforeToolCallEvent, self.log_tool_use)

    ...

When modifying event properties, log the changes for debugging and audit purposes:

import logging

logger = logging.getLogger(__name__)

class ResultProcessor(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolCallEvent, self.process_result)

    def process_result(self, event: AfterToolCallEvent) -> None:
        # tool_use is a dict, so read the tool name by key
        if event.tool_use["name"] == "calculator":
            original_content = event.result["content"][0]["text"]
            logger.info(f"Modifying calculator result: {original_content}")
            event.result["content"][0]["text"] = f"Result: {original_content}"

Design multi-agent hooks to work with different orchestrator types:

class UniversalMultiAgentHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeNodeCallEvent, self.handle_node_execution)

    def handle_node_execution(self, event: BeforeNodeCallEvent) -> None:
        orchestrator_type = type(event.source).__name__
        print(f"Executing node {event.node_id} in {orchestrator_type} orchestrator")

        # Handle orchestrator-specific logic if needed
        if orchestrator_type == "Graph":
            self.handle_graph_node(event)
        elif orchestrator_type == "Swarm":
            self.handle_swarm_node(event)

    def handle_graph_node(self, event: BeforeNodeCallEvent) -> None:
        # Graph-specific handling
        pass

    def handle_swarm_node(self, event: BeforeNodeCallEvent) -> None:
        # Swarm-specific handling
        pass

Multi-agent hooks complement single-agent hooks. Individual agents within the orchestrator can still have their own hooks, creating a layered monitoring and customization system:

# Single-agent hook for individual agents
class AgentLevelHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeToolCallEvent, self.log_tool_use)

    def log_tool_use(self, event: BeforeToolCallEvent) -> None:
        print(f"Agent tool call: {event.tool_use['name']}")

# Multi-agent hook for orchestrator
class OrchestratorLevelHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeNodeCallEvent, self.log_node_execution)

    def log_node_execution(self, event: BeforeNodeCallEvent) -> None:
        print(f"Orchestrator node execution: {event.node_id}")

# Create agents with individual hooks
agent1 = Agent(tools=[tool1], hooks=[AgentLevelHook()])
agent2 = Agent(tools=[tool2], hooks=[AgentLevelHook()])

# Create orchestrator with multi-agent hooks
orchestrator = Graph(
    agents={"agent1": agent1, "agent2": agent2},
    hooks=[OrchestratorLevelHook()]
)

This layered approach provides comprehensive observability and control across both individual agent execution and orchestrator-level coordination.

This section contains practical hook implementations for common use cases.

Useful for enforcing security policies, maintaining consistency, or overriding agent decisions with system-level requirements. This hook ensures specific tools always use predetermined parameter values regardless of what the agent specifies.

from typing import Any

from strands.hooks import HookProvider, HookRegistry, BeforeToolCallEvent

class ConstantToolArguments(HookProvider):
    """Use constant argument values for specific parameters of a tool."""

    def __init__(self, fixed_tool_arguments: dict[str, dict[str, Any]]):
        """
        Initialize fixed parameter values for tools.

        Args:
            fixed_tool_arguments: A dictionary mapping tool names to dictionaries of
                parameter names and their fixed values. These values will override any
                values provided by the agent when the tool is invoked.
        """
        self._tools_to_fix = fixed_tool_arguments

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
        registry.add_callback(BeforeToolCallEvent, self._fix_tool_arguments)

    def _fix_tool_arguments(self, event: BeforeToolCallEvent) -> None:
        # If fixed parameters are configured for this tool, apply them over the agent's input
        if parameters_to_fix := self._tools_to_fix.get(event.tool_use["name"]):
            tool_input: dict[str, Any] = event.tool_use["input"]
            tool_input.update(parameters_to_fix)

For example, to always force the calculator tool to use precision of 1 digit:

fix_parameters = ConstantToolArguments({
    "calculator": {
        "precision": 1,
    }
})

agent = Agent(tools=[calculator], hooks=[fix_parameters])
result = agent("What is 2 / 3?")
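
The override itself comes down to `dict.update`, which can be checked without an agent. In this sketch the `expression` key and the initial `precision` of 10 are made-up values standing in for whatever the model might send:

```python
from typing import Any

# Hypothetical input the model might supply for a calculator tool call
tool_input: dict[str, Any] = {"expression": "2 / 3", "precision": 10}
fixed_arguments: dict[str, Any] = {"precision": 1}

# Fixed values replace matching keys; keys that are not fixed stay untouched
tool_input.update(fixed_arguments)
print(tool_input)  # {'expression': '2 / 3', 'precision': 1}
```

Because `update` mutates the dictionary in place, the hook changes the same `input` object that the tool later receives, so no reassignment on the event is needed.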

Useful for preventing runaway tool usage, implementing rate limiting, or enforcing usage quotas. This hook tracks tool invocations per request and replaces tools with error messages when limits are exceeded.

from threading import Lock

from strands.hooks import HookRegistry, HookProvider, BeforeToolCallEvent, BeforeInvocationEvent

class LimitToolCounts(HookProvider):
    """Limits the number of times tools can be called per agent invocation."""

    def __init__(self, max_tool_counts: dict[str, int]):
        """
        Initializer.

        Args:
            max_tool_counts: A dictionary mapping tool names to max call counts for
                tools. If a tool is not specified in it, the tool can be called as many
                times as desired.
        """
        self.max_tool_counts = max_tool_counts
        self.tool_counts: dict[str, int] = {}
        self._lock = Lock()

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeInvocationEvent, self.reset_counts)
        registry.add_callback(BeforeToolCallEvent, self.intercept_tool)

    def reset_counts(self, event: BeforeInvocationEvent) -> None:
        # Start every agent invocation with fresh counters
        with self._lock:
            self.tool_counts = {}

    def intercept_tool(self, event: BeforeToolCallEvent) -> None:
        tool_name = event.tool_use["name"]
        # Update the counter under the lock so concurrent tool calls are counted correctly
        with self._lock:
            max_tool_count = self.max_tool_counts.get(tool_name)
            tool_count = self.tool_counts.get(tool_name, 0) + 1
            self.tool_counts[tool_name] = tool_count

        if max_tool_count and tool_count > max_tool_count:
            event.cancel_tool = (
                f"Tool '{tool_name}' has been invoked too many times and is now being throttled. "
                f"DO NOT CALL THIS TOOL ANYMORE "
            )

For example, to limit the sleep tool to 3 calls per agent invocation:

limit_hook = LimitToolCounts(max_tool_counts={"sleep": 3})
agent = Agent(tools=[sleep], hooks=[limit_hook])
# This call will only have 3 successful sleeps
agent("Sleep 5 times for 10ms each or until you can't anymore")
# This will sleep successfully again because the count resets every invocation
agent("Sleep once")

Useful for implementing custom retry logic for model invocations. The AfterModelCallEvent.retry field allows hooks to request retries based on any criteria—exceptions, response validation, content quality checks, or any custom logic. This example demonstrates retrying on exceptions with exponential backoff:

import asyncio
import logging

from strands.hooks import HookProvider, HookRegistry, BeforeInvocationEvent, AfterModelCallEvent

logger = logging.getLogger(__name__)

class RetryOnServiceUnavailable(HookProvider):
    """Retry model calls when ServiceUnavailable errors occur."""

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.retry_count = 0

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeInvocationEvent, self.reset_counts)
        registry.add_callback(AfterModelCallEvent, self.handle_retry)

    def reset_counts(self, event: BeforeInvocationEvent | None = None) -> None:
        self.retry_count = 0

    async def handle_retry(self, event: AfterModelCallEvent) -> None:
        if event.exception:
            if "ServiceUnavailable" in str(event.exception):
                logger.info("ServiceUnavailable encountered")
                if self.retry_count < self.max_retries:
                    logger.info("Retrying model call")
                    self.retry_count += 1
                    event.retry = True
                    await asyncio.sleep(2 ** self.retry_count)  # Exponential backoff
        else:
            # Reset counts on successful call
            self.reset_counts()

For example, to retry up to 3 times on service unavailable errors:

from strands import Agent
retry_hook = RetryOnServiceUnavailable(max_retries=3)
agent = Agent(hooks=[retry_hook])
result = agent("What is the capital of France?")
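
As a quick check of what the backoff in handle_retry costs, the sleep durations can be computed directly. The hook increments retry_count before sleeping, so the first wait is 2 ** 1 seconds. This is plain arithmetic on the example above, not a measurement:

```python
max_retries = 3

# retry_count is incremented before the sleep, so attempts are numbered from 1
delays = [2 ** attempt for attempt in range(1, max_retries + 1)]
total_wait = sum(delays)

print(delays)      # [2, 4, 8]
print(total_wait)  # 14
```

With the default of 3 retries, a request that keeps hitting ServiceUnavailable spends 14 seconds sleeping before the hook stops requesting retries; each additional retry doubles the last wait.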

Useful for implementing custom retry logic for tool invocations. The AfterToolCallEvent.retry field allows hooks to request that a tool be re-executed—for example, to handle transient errors, timeouts, or flaky external services. When retry is set to True, the tool executor discards the current result and invokes the tool again with the same tool_use_id.

import logging

from strands.hooks import HookProvider, HookRegistry, AfterToolCallEvent

logger = logging.getLogger(__name__)

class RetryOnToolError(HookProvider):
    """Retry tool calls that fail with errors."""

    def __init__(self, max_retries: int = 1):
        self.max_retries = max_retries
        self._attempt_counts: dict[str, int] = {}

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolCallEvent, self.handle_retry)

    def handle_retry(self, event: AfterToolCallEvent) -> None:
        tool_use_id = str(event.tool_use.get("toolUseId", ""))
        tool_name = event.tool_use.get("name", "unknown")

        # Track attempts per tool_use_id
        attempt = self._attempt_counts.get(tool_use_id, 0) + 1
        self._attempt_counts[tool_use_id] = attempt

        if event.result.get("status") == "error" and attempt <= self.max_retries:
            logger.info(f"Retrying tool '{tool_name}' (attempt {attempt}/{self.max_retries})")
            event.retry = True
        else:
            # Clean up tracking on success, or once retries are exhausted,
            # so entries for failed calls do not accumulate
            self._attempt_counts.pop(tool_use_id, None)

For example, to retry failed tool calls once:

import random

from strands import Agent, tool

@tool
def flaky_api_call(query: str) -> str:
    """Call an external API that sometimes fails.

    Args:
        query: The query to send.
    """
    if random.random() < 0.5:
        raise RuntimeError("Service temporarily unavailable")
    return f"Result for: {query}"

retry_hook = RetryOnToolError(max_retries=1)
agent = Agent(tools=[flaky_api_call], hooks=[retry_hook])
result = agent("Look up the weather")

The AfterInvocationEvent.resume property enables a hook to trigger a follow-up agent invocation after the current one completes. When you set resume to any valid agent input (a string, content blocks, or messages), the agent automatically re-invokes itself with that input instead of returning to the caller. This starts a full new invocation cycle, including firing BeforeInvocationEvent.

This is useful for building autonomous looping patterns where the agent continues processing based on its previous result—for example, re-evaluating after tool execution, injecting additional context, or implementing multi-step workflows within a single call.

The following example checks the agent result and triggers one follow-up invocation to ask the model to summarize its work:

from strands import Agent
from strands.hooks import AfterInvocationEvent

resume_count = 0

async def summarize_after_tools(event: AfterInvocationEvent):
    """Resume once to ask the model to summarize its work."""
    global resume_count
    if resume_count == 0 and event.result and event.result.stop_reason == "end_turn":
        resume_count += 1
        event.resume = "Now summarize what you just did in one sentence."

agent = Agent()
agent.add_hook(summarize_after_tools)

# The agent processes the initial request, then automatically
# performs a second invocation to generate the summary
result = agent("Look up the weather in Seattle")

You can also use resume to chain multiple re-invocations. Make sure to include a termination condition to avoid infinite loops:

from strands import Agent
from strands.hooks import AfterInvocationEvent

MAX_ITERATIONS = 3
iteration = 0

async def iterative_refinement(event: AfterInvocationEvent):
    """Re-invoke the agent up to MAX_ITERATIONS times for iterative refinement."""
    global iteration
    if iteration < MAX_ITERATIONS and event.result:
        iteration += 1
        event.resume = f"Review your previous response and improve it. Iteration {iteration} of {MAX_ITERATIONS}."

agent = Agent()
agent.add_hook(iterative_refinement)
result = agent("Draft a haiku about programming")
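
To see why the termination condition matters, the resume loop can be simulated without a model. Everything in this sketch is a stand-in: `FakeAfterInvocationEvent`, `run_with_resume`, and `make_refinement_hook` are hypothetical names that only mimic the behavior described above, where a non-empty resume value starts another invocation.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeAfterInvocationEvent:
    """Stand-in for AfterInvocationEvent with only the fields this sketch needs."""
    result: str
    resume: Optional[str] = None


def run_with_resume(prompt: str, hook: Callable[[FakeAfterInvocationEvent], None]) -> list[str]:
    """Simulate an agent that re-invokes itself while the hook sets resume."""
    prompts_seen: list[str] = []
    current: Optional[str] = prompt
    while current is not None:
        prompts_seen.append(current)
        event = FakeAfterInvocationEvent(result=f"response to: {current}")
        hook(event)
        current = event.resume  # None ends the loop
    return prompts_seen


def make_refinement_hook(max_iterations: int) -> Callable[[FakeAfterInvocationEvent], None]:
    """Build a hook that requests at most max_iterations follow-up invocations."""
    iteration = 0

    def hook(event: FakeAfterInvocationEvent) -> None:
        nonlocal iteration
        if iteration < max_iterations and event.result:
            iteration += 1
            event.resume = f"Improve it. Iteration {iteration} of {max_iterations}."

    return hook


prompts = run_with_resume("Draft a haiku about programming", make_refinement_hook(3))
print(len(prompts))  # 4: the original request plus 3 refinements
```

Keeping the counter in a closure rather than a module-level global gives each hook instance its own budget, which helps when several agents share the same hook factory.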

The resume property integrates with the interrupt system. When an agent invocation ends because of an interrupt, a hook can automatically handle the interrupt by resuming with interrupt responses. This avoids returning the interrupt to the caller.

When the agent is in an interrupt state, you must resume with a list of interruptResponse objects. Passing a plain string raises a TypeError.

from strands import Agent, tool
from strands.hooks import AfterInvocationEvent, BeforeToolCallEvent

@tool
def send_email(to: str, body: str) -> str:
    """Send an email.

    Args:
        to: Recipient address.
        body: Email body.
    """
    return f"Email sent to {to}"

def require_approval(event: BeforeToolCallEvent):
    """Interrupt before sending emails to require approval."""
    if event.tool_use["name"] == "send_email":
        event.interrupt("email_approval", reason="Approve this email?")

async def auto_approve(event: AfterInvocationEvent):
    """Automatically approve all interrupted tool calls."""
    if event.result and event.result.stop_reason == "interrupt":
        responses = [
            {"interruptResponse": {"interruptId": intr.id, "response": "approved"}}
            for intr in event.result.interrupts
        ]
        event.resume = responses

agent = Agent(tools=[send_email])
agent.add_hook(require_approval)
agent.add_hook(auto_approve)

# The interrupt is handled automatically by the hook,
# so the caller receives the final result directly
result = agent("Send an email to alice@example.com saying hello")

For advanced use cases, you can implement the HookProvider protocol to create objects that register multiple callbacks at once. This is useful when building reusable hook collections without the full plugin infrastructure:

from strands import Agent
from strands.hooks import HookProvider, HookRegistry, BeforeInvocationEvent, AfterInvocationEvent

class RequestLogger(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeInvocationEvent, self.log_start)
        registry.add_callback(AfterInvocationEvent, self.log_end)

    def log_start(self, event: BeforeInvocationEvent) -> None:
        print(f"Request started for agent: {event.agent.name}")

    def log_end(self, event: AfterInvocationEvent) -> None:
        print(f"Request completed for agent: {event.agent.name}")

# Pass via hooks parameter
agent = Agent(hooks=[RequestLogger()])

# Or add after creation
agent.hooks.add_hook(RequestLogger())

For most use cases, Plugins provide a more convenient way to bundle multiple hooks with additional features like auto-discovery and tool registration.