Skip to content

Streaming Events

Strands Agents SDK provides real-time streaming capabilities that allow you to monitor and process events as they occur during agent execution. This enables responsive user interfaces, real-time monitoring, and custom output formatting.

Strands has multiple approaches for handling streaming events:

Both methods receive the same event types but differ in their execution model and use cases.

All streaming methods yield the same set of events:

  • init_event_loop: True at the start of agent invocation initializing
  • start_event_loop: True when the event loop is starting
  • message: Present when a new message is created
  • event: Raw event from the model stream
  • force_stop: True if the event loop was forced to stop
    • force_stop_reason: Reason for forced stop
  • result: The final AgentResult
  • data: Text chunk from the model’s output
  • delta: Raw delta content from the model
  • reasoning: True for reasoning events
    • reasoningText: Text from reasoning process
    • reasoning_signature: Signature from reasoning process
    • redactedContent: Reasoning content redacted by the model
  • current_tool_use: Information about the current tool being used, including:
    • toolUseId: Unique ID for this tool use
    • name: Name of the tool
    • input: Tool input parameters (accumulated as streaming occurs)
  • tool_stream_event: Information about an event streamed from a tool, including:
    • tool_use: The ToolUse for the tool that streamed the event
    • data: The data streamed from the tool

Multi-agent systems (Graph and Swarm) emit additional coordination events:

  • multiagent_node_start: When a node begins execution
    • type: "multiagent_node_start"
    • node_id: Unique identifier for the node
    • node_type: Type of node ("agent", "swarm", "graph")
  • multiagent_node_stream: Forwarded events from agents/multi-agents with node context
    • type: "multiagent_node_stream"
    • node_id: Identifier of the node generating the event
    • event: The original agent event (nested)
  • multiagent_node_stop: When a node completes execution
    • type: "multiagent_node_stop"
    • node_id: Unique identifier for the node
    • node_result: Complete NodeResult with execution details, metrics, and status
  • multiagent_handoff: When control is handed off between agents (Swarm) or batch transitions (Graph)
    • type: "multiagent_handoff"
    • from_node_ids: List of node IDs completing execution
    • to_node_ids: List of node IDs beginning execution
    • message: Optional handoff message (typically used in Swarm)
  • multiagent_result: Final multi-agent result
    • type: "multiagent_result"
    • result: The final GraphResult or SwarmResult

See Graph streaming and Swarm streaming for usage examples.

When streaming events over the wire (SSE, WebSockets, HTTP responses), TypeScript events are automatically serialized to compact JSON. Fields like agent, tool, cancel, and retry are excluded from serialization since they contain large internal references or mutable control flags — these fields are still available for direct programmatic access in-process.

for await (const event of agent.stream('Hello')) {
// Automatically serialized to compact, wire-safe JSON
res.write(`data: ${JSON.stringify(event)}\n\n`)
}

Each serialized event includes a type discriminator and its relevant data fields:

EventSerialized fields
ModelStreamUpdateEvent{ type, event }
ContentBlockEvent{ type, contentBlock }
ModelMessageEvent{ type, message, stopReason }
ToolResultEvent{ type, result }
ToolStreamUpdateEvent{ type, event }
AgentResultEvent{ type, result }
MessageAddedEvent{ type, message }
BeforeToolCallEvent{ type, toolUse }
AfterToolCallEvent{ type, toolUse, result, error? }
AfterModelCallEvent{ type, stopData?, error? }
BeforeToolsEvent / AfterToolsEvent{ type, message }
Lifecycle events (InitializedEvent, BeforeInvocationEvent, AfterInvocationEvent, BeforeModelCallEvent){ type }

Async Iterator Pattern

async for event in agent.stream_async("Calculate 2+2"):
if "data" in event:
print(event["data"], end="")

Callback Handler Pattern

def handle_events(**kwargs):
if "data" in kwargs:
print(kwargs["data"], end="")
agent = Agent(callback_handler=handle_events)
agent("Calculate 2+2")

This example demonstrates how to identify event emitted from an agent:

from strands import Agent
from strands_tools import calculator
def process_event(event):
"""Shared event processor for both async iterators and callback handlers"""
# Track event loop lifecycle
if event.get("init_event_loop", False):
print("🔄 Event loop initialized")
elif event.get("start_event_loop", False):
print("▶️ Event loop cycle starting")
elif "message" in event:
print(f"📬 New message created: {event['message']['role']}")
elif "result" in event:
print("✅ Agent completed with result")
elif event.get("force_stop", False):
print(f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}")
# Track tool usage
if "current_tool_use" in event and event["current_tool_use"].get("name"):
tool_name = event["current_tool_use"]["name"]
print(f"🔧 Using tool: {tool_name}")
# Show text snippets
if "data" in event:
data_snippet = event["data"][:20] + ("..." if len(event["data"]) > 20 else "")
print(f"📟 Text: {data_snippet}")
agent = Agent(tools=[calculator], callback_handler=None)
async for event in agent.stream_async("What is the capital of France and what is 42+7?"):
process_event(event)

Utilizing both agents as a tool and tool streaming, this example shows how to stream events from sub-agents:

from typing import AsyncIterator
from dataclasses import dataclass
from strands import Agent, tool
from strands_tools import calculator
@dataclass
class SubAgentResult:
agent: Agent
event: dict
@tool
async def math_agent(query: str) -> AsyncIterator:
"""Solve math problems using the calculator tool."""
agent = Agent(
name="Math Expert",
system_prompt="You are a math expert. Use the calculator tool for calculations.",
callback_handler=None,
tools=[calculator]
)
result = None
async for event in agent.stream_async(query):
yield SubAgentResult(agent=agent, event=event)
if "result" in event:
result = event["result"]
yield str(result)
def process_sub_agent_events(event):
"""Shared processor for sub-agent streaming events"""
tool_stream = event.get("tool_stream_event", {}).get("data")
if isinstance(tool_stream, SubAgentResult):
current_tool = tool_stream.event.get("current_tool_use", {})
tool_name = current_tool.get("name")
if tool_name:
print(f"Agent '{tool_stream.agent.name}' using tool '{tool_name}'")
# Also show regular text output
if "data" in event:
print(event["data"], end="")
# Using with async iterators
orchestrator_async_iterator = Agent(
system_prompt="Route math questions to the math_agent tool.",
callback_handler=None,
tools=[math_agent]
)
# With async-iterator
async for event in orchestrator_async_iterator.stream_async("What is 3+3?"):
process_sub_agent_events(event)
# With callback handler
def handle_events(**kwargs):
process_sub_agent_events(kwargs)
orchestrator_callback = Agent(
system_prompt="Route math questions to the math_agent tool.",
callback_handler=handle_events,
tools=[math_agent]
)
orchestrator_callback("What is 3+3?")