Streaming Events

Strands Agents SDK provides real-time streaming capabilities that allow you to monitor and process events as they occur during agent execution. This enables responsive user interfaces, real-time monitoring, and custom output formatting.

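Strands has two approaches for handling streaming events: async iterators, which yield events from agent.stream_async(), and callback handlers, which are passed to the agent as callback_handler and invoked once per event.

Both methods receive the same event types but differ in their execution model and use cases.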

All streaming methods yield the same set of events:

  • init_event_loop: True when the event loop is initializing at the start of an agent invocation
  • start_event_loop: True when the event loop is starting
  • message: Present when a new message is created
  • event: Raw event from the model stream
  • force_stop: True if the event loop was forced to stop
    • force_stop_reason: Reason for forced stop
  • result: The final AgentResult
  • data: Text chunk from the model’s output
  • delta: Raw delta content from the model
  • reasoning: True for reasoning events
    • reasoningText: Text from reasoning process
    • reasoning_signature: Signature from reasoning process
    • redactedContent: Reasoning content redacted by the model
  • current_tool_use: Information about the current tool being used, including:
    • toolUseId: Unique ID for this tool use
    • name: Name of the tool
    • input: Tool input parameters (accumulated as streaming occurs)
  • tool_stream_event: Information about an event streamed from a tool, including:
    • tool_use: The ToolUse for the tool that streamed the event
    • data: The data streamed from the tool

Multi-agent systems (Graph and Swarm) emit additional coordination events:

  • multiagent_node_start: When a node begins execution
    • type: "multiagent_node_start"
    • node_id: Unique identifier for the node
    • node_type: Type of node ("agent", "swarm", "graph")
  • multiagent_node_stream: Forwarded events from agents/multi-agents with node context
    • type: "multiagent_node_stream"
    • node_id: Identifier of the node generating the event
    • event: The original agent event (nested)
  • multiagent_node_stop: When a node completes execution
    • type: "multiagent_node_stop"
    • node_id: Unique identifier for the node
    • node_result: Complete NodeResult with execution details, metrics, and status
  • multiagent_handoff: When control is handed off between agents (Swarm) or batch transitions (Graph)
    • type: "multiagent_handoff"
    • from_node_ids: List of node IDs completing execution
    • to_node_ids: List of node IDs beginning execution
    • message: Optional handoff message (typically used in Swarm)
  • multiagent_result: Final multi-agent result
    • type: "multiagent_result"
    • result: The final GraphResult or SwarmResult

See Graph streaming and Swarm streaming for usage examples.
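The multi-agent events above all carry a type field, so a consumer can branch on it and unwrap the nested agent event when needed. Here is a minimal sketch, assuming the nested event in multiagent_node_stream is a plain dictionary like other agent events. The function name describe_multiagent_event and the output strings are hypothetical, and the sample events are hand-written.

```python
from typing import Optional


def describe_multiagent_event(event: dict) -> Optional[str]:
    """Turn a multi-agent coordination event into a one-line log message.

    Returns None for events this sketch does not report.
    """
    event_type = event.get("type")
    if event_type == "multiagent_node_start":
        return f"[{event['node_id']}] started ({event.get('node_type', 'unknown')})"
    if event_type == "multiagent_node_stream":
        # Unwrap the forwarded agent event and surface only text chunks
        inner = event.get("event")
        if isinstance(inner, dict) and "data" in inner:
            return f"[{event['node_id']}] {inner['data']}"
        return None
    if event_type == "multiagent_node_stop":
        return f"[{event['node_id']}] finished"
    if event_type == "multiagent_handoff":
        src = ", ".join(event.get("from_node_ids", []))
        dst = ", ".join(event.get("to_node_ids", []))
        return f"handoff: {src} -> {dst}"
    if event_type == "multiagent_result":
        return "multi-agent run complete"
    return None


# Hand-written sample events (hypothetical node names)
for sample in [
    {"type": "multiagent_node_start", "node_id": "researcher", "node_type": "agent"},
    {"type": "multiagent_node_stream", "node_id": "researcher", "event": {"data": "Hello"}},
    {"type": "multiagent_handoff", "from_node_ids": ["researcher"], "to_node_ids": ["writer"]},
]:
    line = describe_multiagent_event(sample)
    if line:
        print(line)
```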

Python streaming events are plain dictionaries. The SDK does not include a built-in serialization filter — you have full control over which events and fields to forward from your processes and servers.

When serving streamed responses (for example, over SSE or WebSockets), you can filter the yielded events to keep payloads compact:

import json

def filter_event(event: dict) -> dict | None:
    """Filter streaming events to only forward relevant data over the wire."""
    # Forward text deltas for real-time display
    if "data" in event:
        return {"type": "text", "data": event["data"]}
    # Forward tool usage for progress indicators
    if "current_tool_use" in event and event["current_tool_use"].get("name"):
        return {"type": "tool", "name": event["current_tool_use"]["name"]}
    # Forward the final result
    if "result" in event:
        return {"type": "result", "stop_reason": str(event["result"].stop_reason)}
    # Skip everything else (lifecycle signals, raw deltas, reasoning, etc.)
    return None

# Runs inside an async request handler; agent and response are assumed to exist
async for event in agent.stream_async("Hello"):
    filtered = filter_event(event)
    if filtered:
        await response.write(f"data: {json.dumps(filtered)}\n\n")

This approach lets you tailor the streamed output to your use case — for example, forwarding only text deltas for a chat UI or including tool events for a progress dashboard.
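One way to support both of those use cases from the same code is to build the filter from an allow-list of payload types. The sketch below is only an illustration: make_filter and to_sse are hypothetical helper names, not SDK functions, and the payload shapes mirror the ones used by filter_event above.

```python
import json
from typing import Callable, Optional


def make_filter(allowed: set) -> Callable[[dict], Optional[dict]]:
    """Build an event filter that forwards only the payload types in `allowed`.

    Recognized types in this sketch: "text" and "tool".
    """
    def _filter(event: dict) -> Optional[dict]:
        if "text" in allowed and "data" in event:
            return {"type": "text", "data": event["data"]}
        tool = event.get("current_tool_use") or {}
        if "tool" in allowed and tool.get("name"):
            return {"type": "tool", "name": tool["name"]}
        return None
    return _filter


def to_sse(payload: dict) -> str:
    """Frame a payload as a single Server-Sent Events message."""
    return f"data: {json.dumps(payload)}\n\n"


chat_filter = make_filter({"text"})               # chat UI: text only
dashboard_filter = make_filter({"text", "tool"})  # dashboard: text and tools

# Hand-written sample event (hypothetical)
tool_event = {"current_tool_use": {"toolUseId": "t1", "name": "calculator"}}
print(chat_filter(tool_event))
print(dashboard_filter(tool_event))
```

Keeping the framing in to_sse separate from the filtering makes it easy to reuse the same filters over WebSockets, where the SSE "data:" prefix is not needed.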

Async Iterator Pattern

async for event in agent.stream_async("Calculate 2+2"):
    if "data" in event:
        print(event["data"], end="")

Callback Handler Pattern

def handle_events(**kwargs):
    if "data" in kwargs:
        print(kwargs["data"], end="")

agent = Agent(callback_handler=handle_events)
agent("Calculate 2+2")

This example demonstrates how to identify the events emitted by an agent:

import asyncio

from strands import Agent
from strands_tools import calculator

def process_event(event):
    """Shared event processor for both async iterators and callback handlers"""
    # Track event loop lifecycle
    if event.get("init_event_loop", False):
        print("🔄 Event loop initialized")
    elif event.get("start_event_loop", False):
        print("▶️ Event loop cycle starting")
    elif "message" in event:
        print(f"📬 New message created: {event['message']['role']}")
    elif "result" in event:
        print("✅ Agent completed with result")
    elif event.get("force_stop", False):
        print(f"🛑 Event loop force-stopped: {event.get('force_stop_reason', 'unknown reason')}")

    # Track tool usage
    if "current_tool_use" in event and event["current_tool_use"].get("name"):
        tool_name = event["current_tool_use"]["name"]
        print(f"🔧 Using tool: {tool_name}")

    # Show text snippets
    if "data" in event:
        data_snippet = event["data"][:20] + ("..." if len(event["data"]) > 20 else "")
        print(f"📟 Text: {data_snippet}")

agent = Agent(tools=[calculator], callback_handler=None)

async def main():
    # async for must run inside a coroutine
    async for event in agent.stream_async("What is the capital of France and what is 42+7?"):
        process_event(event)

asyncio.run(main())
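Because the input of current_tool_use accumulates as streaming occurs, the same tool use may show up in several consecutive events, and a handler that prints on every one of them can repeat itself. A minimal sketch of one way to report each tool use only once is to remember the toolUseId values already seen. ToolUseTracker is a hypothetical helper, not part of the SDK, and the sample events are hand-written.

```python
class ToolUseTracker:
    """Remember which tool uses were already reported, keyed by toolUseId."""

    def __init__(self) -> None:
        self._seen: set = set()

    def is_new(self, event: dict) -> bool:
        """Return True only the first time a named tool use is observed."""
        tool = event.get("current_tool_use") or {}
        tool_id = tool.get("toolUseId")
        if not tool_id or not tool.get("name") or tool_id in self._seen:
            return False
        self._seen.add(tool_id)
        return True


tracker = ToolUseTracker()

# Hand-written events imitating one tool use whose input grows over time
events = [
    {"current_tool_use": {"toolUseId": "t1", "name": "calculator", "input": ""}},
    {"current_tool_use": {"toolUseId": "t1", "name": "calculator", "input": '{"expr'}},
    {"current_tool_use": {"toolUseId": "t1", "name": "calculator", "input": '{"expression": "42+7"}'}},
    {"data": "The answer is 49."},
]
for event in events:
    if tracker.is_new(event):
        print(f"Using tool: {event['current_tool_use']['name']}")
```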

This example combines agents as tools with tool streaming to show how to stream events from sub-agents:

import asyncio
from typing import AsyncIterator
from dataclasses import dataclass

from strands import Agent, tool
from strands_tools import calculator

@dataclass
class SubAgentResult:
    agent: Agent
    event: dict

@tool
async def math_agent(query: str) -> AsyncIterator:
    """Solve math problems using the calculator tool."""
    agent = Agent(
        name="Math Expert",
        system_prompt="You are a math expert. Use the calculator tool for calculations.",
        callback_handler=None,
        tools=[calculator]
    )

    result = None
    async for event in agent.stream_async(query):
        # Forward every sub-agent event to the caller as a tool stream event
        yield SubAgentResult(agent=agent, event=event)
        if "result" in event:
            result = event["result"]

    # The last yielded value is the tool's final output
    yield str(result)

def process_sub_agent_events(event):
    """Shared processor for sub-agent streaming events"""
    tool_stream = event.get("tool_stream_event", {}).get("data")

    if isinstance(tool_stream, SubAgentResult):
        current_tool = tool_stream.event.get("current_tool_use", {})
        tool_name = current_tool.get("name")
        if tool_name:
            print(f"Agent '{tool_stream.agent.name}' using tool '{tool_name}'")

    # Also show regular text output
    if "data" in event:
        print(event["data"], end="")

# Using with async iterators
orchestrator_async_iterator = Agent(
    system_prompt="Route math questions to the math_agent tool.",
    callback_handler=None,
    tools=[math_agent]
)

# With async-iterator (async for must run inside a coroutine)
async def main():
    async for event in orchestrator_async_iterator.stream_async("What is 3+3?"):
        process_sub_agent_events(event)

asyncio.run(main())

# With callback handler
def handle_events(**kwargs):
    process_sub_agent_events(kwargs)

orchestrator_callback = Agent(
    system_prompt="Route math questions to the math_agent tool.",
    callback_handler=handle_events,
    tools=[math_agent]
)

orchestrator_callback("What is 3+3?")