Agent Types
What this is
The kernel (L0) defines what an agent is in a single sentence — an object with an id and one method, async run(ctx, inbox). But a Protocol can't think. It's a shape, an empty promise. This page is about the classes that actually fill that shape with intelligence — the concrete agents that live at layer L1 (agents/).
Every agent here is a normal Python class that satisfies the kernel Agent Protocol. There is no base class to inherit, no @agent decorator. If your class has an id attribute and an async def run(self, ctx, inbox), the runtime will happily run it. The classes on this page are the batteries-included ones Ravi ships with.
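As a minimal sketch of that structural typing: the `AgentLike` Protocol below is a stand-in that mirrors the shape described above, not the kernel's actual definition. `EchoAgent`, the plain-string `id`, and the `dict` context are assumptions made purely for illustration.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AgentLike(Protocol):
    """Stand-in for the kernel Agent Protocol (hypothetical, for illustration)."""

    id: str

    async def run(self, ctx: Any, inbox: list) -> None: ...


class EchoAgent:
    """No base class, no decorator: just an id and an async run()."""

    def __init__(self, agent_id: str) -> None:
        self.id = agent_id
        self.seen: list = []

    async def run(self, ctx: Any, inbox: list) -> None:
        for msg in inbox:
            self.seen.append(msg)


agent = EchoAgent("echo-1")
# runtime_checkable only verifies that the members exist, not their signatures.
is_agent = isinstance(agent, AgentLike)
asyncio.run(agent.run(ctx={}, inbox=["hello"]))
```

`EchoAgent` never mentions `AgentLike`, yet it passes the check because it has the right members. That is the property the runtime relies on.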
This page is the implementation; the kernel page is the contract
The Runtime Contracts page defines the Agent Protocol and the ctx your run() receives. The Agent Model tells the story: why agents are addresses, not objects. This page is the concrete middle: the real classes, real method names, real code. Read those two for the "why" and the "rules"; read this for "what Ravi actually gives you and how each one works."
These classes may import from kernel (the layer below) but never from capabilities or fabric (the layers above). That's the one architectural rule that keeps the agent loop reusable everywhere.
The cast, at a glance
| Agent | Role (plain English) | When to reach for it |
|---|---|---|
| ReActAgent | A worker who thinks, picks a tool, uses it, looks at the result, repeats | The default. Any chatbot or task-doer that uses tools. The centerpiece. |
| OrchestratorAgent | A manager who breaks a job up and delegates to specialists | A task that needs several different sub-agents coordinated |
| UserProxyAgent | A stand-in for the human at the keyboard | Pausing a run to ask a person a question (HITL) |
| InformationAgent | A newsroom that summarizes a source and broadcasts it | Demo: producer side of a feed (RSS / YouTube monitor) |
| PersonalFeedAgent | A subscriber who curates what the newsroom broadcasts | Demo: consumer side of a feed (per-user curation) |
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E8EAF6','primaryTextColor': '#1A237E','primaryBorderColor': '#3949AB','lineColor': '#546E7A','fontSize': '13px'}}}%%
classDiagram
class Agent {
<<Protocol — kernel L0>>
+AgentId id
+run(ctx, inbox) None
}
class ReActAgent {
+AgentId id
+run(ctx, inbox)
-_handle_message(ctx, msg)
-_react_loop(...)
}
class OrchestratorAgent {
+AgentId id
+run(ctx, inbox)
-_handle_message(ctx, msg)
-_build_tools()
}
class UserProxyAgent {
+AgentId id
+run(ctx, inbox)
}
class InformationAgent {
+AgentId id
+run(ctx, inbox)
}
class PersonalFeedAgent {
+AgentId id
+run(ctx, inbox)
}
Agent <|.. ReActAgent
Agent <|.. OrchestratorAgent
Agent <|.. UserProxyAgent
Agent <|.. InformationAgent
Agent <|.. PersonalFeedAgent
They all share the same front door
Notice every class has the exact same public surface: an id and a run(ctx, inbox). The Worker calls run and doesn't care which class it got. That uniformity is the whole point — an orchestrator can spawn a ReActAgent, a ReActAgent can ask a UserProxyAgent, all through the same address-and-message machinery.
ReActAgent: the centerpiece
What & why: ReActAgent runs the ReAct pattern — Reason + Act. The model reasons about the task, decides to act by calling a tool, sees the tool's result, reasons again, and keeps going until it has a final answer (or runs out of its iteration budget).
Analogy — a careful worker at a workbench
Picture someone fixing a bike. They think ("the chain is loose"), pick a tool (a wrench), use it, look at the result ("still loose"), think again, pick another tool, and repeat — until the bike is fixed or they've tried everything they're allowed to. The model is the brain; the tools are the wrench set; the loop is the patient back-and-forth.
From run() to the loop
A run touches three methods, each smaller and more focused than the last:
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E8EAF6','primaryTextColor': '#1A237E','primaryBorderColor': '#3949AB','lineColor': '#546E7A','fontSize': '13px'}}}%%
flowchart TD
classDef agent fill:#E8EAF6,stroke:#3949AB,color:#1A237E,font-weight:bold
classDef runtime fill:#E3F2FD,stroke:#1565C0,color:#0D47A1
classDef terminal fill:#E8F5E9,stroke:#2E7D32,color:#1B5E20,font-weight:bold
WK(["Worker leases the run"]):::runtime --> RUN["run(ctx, inbox)<br/>loop over each Message"]:::agent
RUN --> HM["_handle_message(ctx, msg)<br/>load history, build messages"]:::agent
HM --> RL["_react_loop(...)<br/>the Reason plus Act cycle"]:::agent
RL --> DONE(["persist turns and deliver the answer"]):::terminal
run(ctx, inbox) is tiny. It stamps a couple of context variables (so tools like the Kanban board know which agent and label they belong to), then walks the inbox one message at a time. Before each message it calls ctx.check(), the cooperative cancellation point that lets a parent or a deadline interrupt it cleanly.
async def run(self, ctx: RunContext, inbox: list[Message]) -> None:
    _task_agent_id.set(str(self.id))
    _task_agent_label.set(self.name)
    for msg in inbox:
        ctx.check()
        await self._handle_message(ctx, msg)
_handle_message(ctx, msg) sets up the conversation. It derives a session_id (the conversation thread — falls back to ctx.run_id if the message carries no correlation_id), loads the prior history, converts the incoming message to a chat turn, and hands the assembled messages list to the loop.
async def _handle_message(self, ctx: RunContext, msg: Message) -> None:
    session_id = msg.correlation_id or ctx.run_id
    _task_thread_id.set(session_id)
    _task_parent_agent_id.set(msg.metadata.get("parent_agent_id") or None)
    history_messages = await load_history(self._context, self.id, session_id)
    user_turn = message_to_chat(msg)
    messages: list[ChatMessage] = history_messages + [user_turn]
    await self._react_loop(ctx, msg, session_id, messages, len(history_messages))
Why the ContextVars are stamped here, inside the Worker task
The Worker runs in a different asyncio context from the code that submitted the message. ContextVars set outside don't cross that boundary — so _task_thread_id and _task_parent_agent_id are set inside _handle_message, which is the code actually running in the Worker task. This is a real footgun the team hit and fixed; don't move these.
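The boundary is easy to reproduce with the standard library alone. In this sketch (the `thread_id` variable and the queue-based hand-off are stand-ins, not Ravi's Worker), the task's context is copied when the task is created, so a value the submitter sets afterwards never reaches it:

```python
import asyncio
import contextvars

thread_id = contextvars.ContextVar("thread_id", default=None)


async def worker(queue: asyncio.Queue, seen: dict) -> None:
    session_id = await queue.get()
    seen["before"] = thread_id.get()   # what the worker inherited
    thread_id.set(session_id)          # stamp it inside the worker task
    seen["after"] = thread_id.get()


async def main() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    seen: dict = {}
    # The task snapshots the current context here, while thread_id is unset.
    task = asyncio.create_task(worker(queue, seen))
    thread_id.set("set-by-submitter")  # too late: the worker has its own copy
    await queue.put("session-1")
    await task
    seen["submitter"] = thread_id.get()
    return seen


result = asyncio.run(main())
```

The worker sees the default until it sets the variable itself, and its own write does not leak back to the submitter. That is why the stamping has to happen in code that runs inside the Worker task.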
The ReAct loop itself
_react_loop is the engine. It iterates up to max_iterations times. Each pass:
- Compact a view of the messages (more on this below).
- Call the model via await ctx.llm(...).
- Record the assistant turn and, if an execution budget is configured, charge it.
- Look for tool calls. No tool calls → the model is done; break out.
- Run each tool via await ctx.tool(name, **args), collect the results, append them as a TOOL turn, and loop again.
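The steps above can be sketched without any of Ravi's types. Everything here is a stand-in: `fake_llm` and `fake_tool` play the roles of ctx.llm() and ctx.tool(), messages are plain `(role, content)` tuples, and compaction and budget accounting are left out to keep the control flow visible.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Reply:
    """Hypothetical model response: text plus zero or more tool calls."""
    text: str = ""
    tool_calls: list = field(default_factory=list)  # [(tool_name, kwargs), ...]


async def fake_llm(messages: list) -> Reply:
    # Scripted stand-in for ctx.llm(): ask for one tool call, then answer.
    if not any(role == "tool" for role, _ in messages):
        return Reply(tool_calls=[("add", {"a": 2, "b": 3})])
    last_results = messages[-1][1]
    return Reply(text=f"The sum is {last_results[0]}")


async def fake_tool(name: str, **kwargs) -> str:
    # Stand-in for ctx.tool(): only knows how to add.
    if name == "add":
        return str(kwargs["a"] + kwargs["b"])
    raise ValueError(f"unknown tool: {name}")


async def react_loop(messages: list, max_iterations: int = 5) -> str:
    for _ in range(max_iterations):
        reply = await fake_llm(messages)
        messages.append(("assistant", reply.text))
        if not reply.tool_calls:
            return reply.text  # no tool calls: this is the final answer
        results = [await fake_tool(name, **kw) for name, kw in reply.tool_calls]
        messages.append(("tool", results))  # one TOOL turn per iteration
    raise RuntimeError("iteration budget exhausted")


transcript = [("user", "What is 2 + 3?")]
answer = asyncio.run(react_loop(transcript))
```

The transcript ends up with four turns (user, assistant tool request, tool result, assistant answer), which is the shape the real loop persists.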
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E3F2FD','primaryTextColor': '#0D47A1','primaryBorderColor': '#1565C0','lineColor': '#546E7A','fontSize': '13px'}}}%%
flowchart TD
classDef process fill:#E8EAF6,stroke:#3949AB,color:#1A237E
classDef decision fill:#FFF3E0,stroke:#E65100,color:#BF360C,font-weight:bold
classDef terminal fill:#E8F5E9,stroke:#2E7D32,color:#1B5E20,font-weight:bold
classDef error fill:#FFEBEE,stroke:#C62828,color:#B71C1C
START(["enter loop — iteration counter at 0"]):::terminal --> CHK["ctx.check() — honor cancellation"]:::process
CHK --> COMP["compact a VIEW of messages<br/>(pipeline.compact)"]:::process
COMP --> LLM["resp = await ctx.llm(llm_messages, options)"]:::process
LLM --> BUD["consume execution budget<br/>(tokens plus 1 turn)"]:::process
BUD --> APP["append assistant turn to messages"]:::process
APP --> Q{"any ToolUseBlock<br/>in the response?"}:::decision
Q -->|"no — final answer"| FINI["persist new turns then deliver"]:::process
FINI --> DONE(["run complete"]):::terminal
Q -->|"yes"| TOOLS["for each call — await ctx.tool(name, args)<br/>collect ToolResultBlocks"]:::process
TOOLS --> APP2["append TOOL turn to messages"]:::process
APP2 --> CAP{"iterations exhausted?"}:::decision
CAP -->|"no"| CHK
CAP -->|"yes"| BUDX["raise BudgetExhaustedError"]:::error
Compaction runs before every model call
This is the detail people miss. Tool results can be huge (a 5,000-line file, a web page), and they pile up across iterations. So before each ctx.llm() call, the agent compacts:
# Compact a *view* of messages here and keep the full list intact for persistence.
llm_messages = await self._context.pipeline.compact(messages)
resp = await ctx.llm(llm_messages, options=options)
The key word is view. compact() returns a trimmed copy that goes to the model; the full untrimmed messages list is what gets persisted afterwards. The model sees a manageable window; the conversation record stays complete. See the compaction pipeline for the strategies.
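To make the view-versus-record distinction concrete, here is a toy sliding window. It is not Ravi's pipeline: `compact_view` and its `keep_last` parameter are invented for this sketch, and it is synchronous where the real compact() is awaited.

```python
def compact_view(messages: list, keep_last: int = 3) -> list:
    """Return a trimmed copy: the first (system) message plus the last few turns."""
    if len(messages) <= keep_last + 1:
        return list(messages)  # still a copy, never the caller's list
    return [messages[0]] + messages[-keep_last:]


history = ["system"] + [f"turn-{i}" for i in range(10)]
view = compact_view(history)
```

`view` is what would go to the model; `history` is untouched and is what would be persisted.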
initial_tool_choice forces the first move only
If you pass initial_tool_choice, the loop applies it to the very first LLM call (e.g. "you must call the search tool first"), then immediately reverts to the unconstrained base_options so later iterations are free to answer with text or call other tools.
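One way to picture the "first move only" rule, as a sketch under assumptions: `Options` stands in for the real generation options, and `options_for` is a hypothetical helper rather than a method on ReActAgent.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Options:
    """Hypothetical stand-in for the generation options passed to ctx.llm()."""
    tool_choice: Optional[str] = None


def options_for(iteration: int, base: Options, initial_tool_choice: Optional[str]) -> Options:
    # Constrain only the very first model call; every later call uses the base options.
    if iteration == 0 and initial_tool_choice is not None:
        return replace(base, tool_choice=initial_tool_choice)
    return base


base_options = Options()
choices = [options_for(i, base_options, "search").tool_choice for i in range(3)]
```

Because `replace` builds a new object, the base options are never mutated, so there is nothing to undo after the first iteration.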
Tool dispatch goes through ctx.tool()
The agent never calls a tool's execute() directly. It asks the context:
tool_calls = [b for b in resp.content if isinstance(b, ToolUseBlock)]
if not tool_calls:
    break
results: list[ToolResultBlock] = []
for tc in tool_calls:
    ctx.check()
    inv_result = await ctx.tool(tc.tool_name, **tc.arguments)
    results.append(
        ToolResultBlock(
            call_id=tc.call_id,
            content=[TextBlock(text=inv_result.text or "")],
            is_error=inv_result.status != "ok",
        )
    )
messages.append(ChatMessage(role=Role.TOOL, content=results))
Routing through ctx.tool() is what makes a tool call journaled — recorded once so that on a crash-and-replay it returns the cached result instead of re-running. That's the durability guarantee from Durability, and the agent gets it for free just by calling ctx.tool() instead of the tool object.
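A toy model of journaling shows why the indirection matters. This is an assumption-heavy sketch: `JournaledContext` is invented here, the journal is a plain list rather than durable storage, and results are matched to calls by position, which may not be how Ravi keys its journal.

```python
import asyncio


class JournaledContext:
    """Toy stand-in for a run context whose tool calls are journaled."""

    def __init__(self, tools: dict, journal: list) -> None:
        self._tools = tools
        self._journal = journal  # durable in a real system; a plain list here
        self._cursor = 0         # position of the next call within this run

    async def tool(self, name: str, **kwargs):
        if self._cursor < len(self._journal):
            result = self._journal[self._cursor]        # replay: reuse the record
        else:
            result = await self._tools[name](**kwargs)  # first time: run and record
            self._journal.append(result)
        self._cursor += 1
        return result


executions: list = []


async def charge_card(amount: int) -> str:
    executions.append(amount)  # a side effect that must not happen twice
    return f"charged {amount}"


async def demo() -> tuple:
    tools = {"charge_card": charge_card}
    journal: list = []
    first = await JournaledContext(tools, journal).tool("charge_card", amount=10)
    # Simulate crash-and-replay: a fresh context over the same journal.
    replayed = await JournaledContext(tools, journal).tool("charge_card", amount=10)
    return first, replayed


first, replayed = asyncio.run(demo())
```

Both calls return the same result, but the side effect ran once. Calling `charge_card` directly would have bypassed the journal and charged twice.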
The iteration cap is a hard budget
The loop is a Python for ... range(self._max_iterations). If it completes all iterations without breaking (i.e. the model kept calling tools and never settled on an answer), the for/else fires:
for _ in range(self._max_iterations):
    ...
else:
    from ravi.kernel.core.errors import BudgetExhaustedError
    raise BudgetExhaustedError(
        f"Agent reached max iterations limit ({self._max_iterations})"
    )
A runaway agent fails loudly with BudgetExhaustedError rather than looping forever and burning tokens. When the loop does break normally, the agent persists the new turns and delivers the answer:
new_turns = messages[n_loaded:]
await persist_turns(self._context, self.id, session_id, ctx.run_id, new_turns)
ans = final_text(messages)
await deliver(ctx, msg, {"text": ans}, sender=self.id, output_topic=self._output_topic)
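The for/else cap from this section can be exercised in isolation. `BudgetExhausted` is a local stand-in for BudgetExhaustedError (nothing is imported from Ravi), and the list of booleans is a hypothetical script of whether the model asks for a tool on each pass.

```python
class BudgetExhausted(Exception):
    """Local stand-in for BudgetExhaustedError (not imported from Ravi)."""


def run_capped(wants_tool_per_step: list, max_iterations: int) -> int:
    """Return how many model calls were made before the model stopped calling tools."""
    steps = iter(wants_tool_per_step)
    for used in range(1, max_iterations + 1):
        wants_tool = next(steps, True)  # past the end of the script, keep asking for tools
        if not wants_tool:
            break                       # final answer: the else clause is skipped
    else:
        # Reached only when the loop ran out of iterations without a break.
        raise BudgetExhausted(f"reached max iterations limit ({max_iterations})")
    return used
```

A model that answers on its second call returns normally; one that never stops calling tools raises instead of looping forever.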
A run, end to end
%%{init: {'theme': 'base', 'themeVariables': {'actorBkg': '#E8EAF6','actorBorder': '#3949AB','actorTextColor': '#1A237E','noteBkgColor': '#FFFDE7','noteBorderColor': '#F57F17','signalColor': '#546E7A','fontSize': '12px'}}}%%
sequenceDiagram
autonumber
participant WK as Worker
participant AG as ReActAgent
participant CTX as RunContext
participant LLM as Model (ctx.llm)
participant TL as Tool (ctx.tool)
WK->>AG: run(ctx, inbox)
activate AG
AG->>AG: _handle_message — load history, build messages
loop until final answer or max_iterations
AG->>CTX: compact a view of messages
AG->>LLM: ctx.llm(llm_messages, options)
activate LLM
LLM-->>AG: assistant turn (maybe tool calls)
deactivate LLM
alt response has tool calls
AG->>TL: ctx.tool(name, args)
activate TL
TL-->>AG: ToolExecutionResult
deactivate TL
AG->>AG: append TOOL turn, loop again
else no tool calls
AG->>AG: break — this is the final answer
end
end
AG->>CTX: persist_turns then deliver(answer)
AG-->>WK: returns None
deactivate AG
What gets configured at construction
ReActAgent.__init__ accepts the model, tools (a ToolRegistry or a plain list, auto-wrapped in a Toolbox), a ContextConfig (history provider + compaction pipeline), system_instructions, max_iterations, approval_handler / approval_required_risk (for tool approval gates), an ExecutionTracker budget, lifecycle hooks, and a middleware pipeline. Most of these are optional — the minimal agent in the agent model sets only model, tools, context, and instructions.
OrchestratorAgent: the manager
What & why: Some jobs are too big or too varied for one agent. An OrchestratorAgent holds a roster of sub-agents, each declared as a SubAgentConfig. The model decides which specialist to delegate to by calling a tool whose name maps to that sub-agent. The orchestrator then spawns the sub-agent as its own run and asks it for the answer.
Analogy — a project manager
A PM doesn't write the code, design the logo, and run the ads themselves. They break the project into pieces and hand each to a specialist, then collect the results and write the summary. The orchestrator is that PM; the sub-agents are the specialists; ctx.spawn is "hey, start on this" and ctx.ask is "are you done? what did you find?"
Sub-agents become delegation tools
Each SubAgentConfig carries the sub-agent, a description, an ask_timeout, and a Priority:
@dataclass
class SubAgentConfig:
    agent: Agent
    description: str = ""
    ask_timeout: float = 120.0
    priority: Priority = Priority.NORMAL
At message time, _build_tools() synthesizes one _DelegateTool per sub-agent, named handoff_<key>. These are fake tools — their execute() deliberately raises if called. They exist only so the model sees them in GenerationOptions(tools=...) and can pick one. The actual work is done by the orchestrator, not the tool.
def _build_tools(self) -> list[AnyTool]:
    return [
        _DelegateTool(
            name=f"handoff_{cfg.agent.id.key}",
            description=cfg.description or f"Delegate to the {cfg.agent.id.key} sub-agent",
        )
        for cfg in self._sub_agents
    ]
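The "advertised but never executed" idea can be sketched in a few lines. `DelegateTool`, `build_tools`, and `resolve` below are illustrative stand-ins; in particular, how the real orchestrator maps a handoff name back to its SubAgentConfig is not shown on this page, so `resolve` is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

PREFIX = "handoff_"


@dataclass(frozen=True)
class DelegateTool:
    """Stand-in for _DelegateTool: visible to the model, never meant to run."""
    name: str
    description: str

    def execute(self, **kwargs):
        raise RuntimeError(f"{self.name} is a routing marker; the orchestrator handles it")


def build_tools(sub_agents: dict) -> list:
    """sub_agents maps a sub-agent key to its (possibly empty) description."""
    return [
        DelegateTool(name=f"{PREFIX}{key}", description=desc or f"Delegate to the {key} sub-agent")
        for key, desc in sub_agents.items()
    ]


def resolve(tool_name: str, sub_agents: dict) -> Optional[str]:
    """Map a tool name chosen by the model back to a sub-agent key, or None."""
    if not tool_name.startswith(PREFIX):
        return None
    key = tool_name[len(PREFIX):]
    return key if key in sub_agents else None


roster = {"researcher": "Finds and summarizes sources", "writer": ""}
tools = build_tools(roster)
```

The model only ever sees the names and descriptions; the orchestrator intercepts the call before anything could reach `execute`.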
Spawn, ask, and the SpawnTracker
The orchestrator runs its own ReAct-style loop. When the model emits a delegation call, the orchestrator:
- Looks up the matching SubAgentConfig.
- Calls spawn_tracker.acquire(...). The SpawnTracker enforces the SpawnBudget (a headcount cap with priority preemption) so you can't fork an unbounded swarm.
- Builds a boot Message for the sub-agent, stamping its own id as parent_agent_id so the sub-agent's task board nests under it in the UI.
- Calls await ctx.spawn(...) to start the sub-agent run, then await ctx.ask(handle, ..., timeout=...) to wait for the reply.
- Releases the tracker slot in a finally, so the slot is freed even on error.
spawn_tracker.acquire(cfg.agent.id, priority=cfg.priority)
task_text = dispatch.arguments.get("task", str(dispatch.arguments))
boot_msg = Message(
    target=cfg.agent.id, sender=self.id,
    payload=ChatPayload(message=ChatMessage(role=Role.USER,
        content=[TextBlock(text=str(task_text))])),
    correlation_id=session_id,
    metadata={"parent_agent_id": str(self.id)},
)
try:
    handle = await ctx.spawn(cfg.agent.id, boot=boot_msg)
    outcome = await ctx.ask(handle, boot_msg, timeout=cfg.ask_timeout)
finally:
    spawn_tracker.release(cfg.agent.id)
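The acquire / try / finally shape is worth seeing with a failure in the middle. `HeadcountTracker` is a deliberately reduced stand-in for SpawnTracker: it enforces a headcount cap only and leaves out priority preemption entirely.

```python
class SpawnLimitReached(Exception):
    """Raised by this sketch when the headcount cap is hit (hypothetical name)."""


class HeadcountTracker:
    """Reduced stand-in for SpawnTracker: a cap on live sub-agents, no preemption."""

    def __init__(self, max_live: int) -> None:
        self._max_live = max_live
        self._live: set = set()

    def acquire(self, agent_id: str) -> None:
        if len(self._live) >= self._max_live:
            raise SpawnLimitReached(f"cap of {self._max_live} live sub-agents reached")
        self._live.add(agent_id)

    def release(self, agent_id: str) -> None:
        self._live.discard(agent_id)

    @property
    def live(self) -> int:
        return len(self._live)


def delegate(tracker: HeadcountTracker, agent_id: str, work):
    tracker.acquire(agent_id)  # outside the try: nothing to release if this fails
    try:
        return work()
    finally:
        tracker.release(agent_id)  # runs on success and on error alike
```

Acquiring before the `try` mirrors the snippet above: if the cap rejects the spawn, there is no slot to give back.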
Branch on outcome.kind — never assume it replied
ctx.ask returns an AskOutcome. The orchestrator checks outcome.kind == "replied" before reading outcome.result. Any other outcome (timed_out, target_failed, target_cancelled) becomes an error ToolResultBlock fed back to the model — it does not crash the orchestrator. This is the canonical "don't treat a timeout as a failure" discipline from the runtime contracts.
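In code, that discipline amounts to a single branch. `Outcome` is a stand-in for AskOutcome, and the returned dict is a simplification of ToolResultBlock; the wording of the error text is invented for this sketch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Outcome:
    """Hypothetical stand-in for AskOutcome."""
    kind: str  # "replied", "timed_out", "target_failed", or "target_cancelled"
    result: Any = None


def to_tool_result(call_id: str, outcome: Outcome) -> dict:
    # Only a reply carries a result; every other kind becomes an error the model can read.
    if outcome.kind == "replied":
        return {"call_id": call_id, "text": str(outcome.result), "is_error": False}
    return {
        "call_id": call_id,
        "text": f"sub-agent did not reply: {outcome.kind}",
        "is_error": True,
    }


ok = to_tool_result("call-1", Outcome("replied", "three sources found"))
late = to_tool_result("call-2", Outcome("timed_out"))
```

Because the non-reply case is returned rather than raised, the model gets a chance to retry, pick another specialist, or explain the gap in its answer.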
%%{init: {'theme': 'base', 'themeVariables': {'actorBkg': '#E8EAF6','actorBorder': '#3949AB','actorTextColor': '#1A237E','noteBkgColor': '#FFFDE7','noteBorderColor': '#F57F17','signalColor': '#546E7A','fontSize': '12px'}}}%%
sequenceDiagram
autonumber
participant LLM as Model
participant OR as OrchestratorAgent
participant ST as SpawnTracker
participant CTX as RunContext
participant SUB as Sub-agent run
OR->>LLM: ctx.llm(messages, tools = handoff_*)
LLM-->>OR: ToolUseBlock — handoff_researcher
OR->>ST: acquire(researcher, priority)
OR->>CTX: ctx.spawn(researcher, boot)
CTX-->>OR: RunHandle
OR->>CTX: ctx.ask(handle, boot, timeout)
activate SUB
Note over SUB: sub-agent runs its own ReAct loop
SUB-->>CTX: replies with result
deactivate SUB
CTX-->>OR: AskOutcome(kind = replied, result)
OR->>ST: release(researcher)
OR->>LLM: feed result back, loop or finish
The orchestrator is itself just an Agent
It has the same id + run(ctx, inbox) shape as everything else, and it can be spawned by another orchestrator. Delegation is recursive because there's nothing special about being a parent — it's all the same message-passing, one level up. See Supervision & Budgets for how SpawnBudget and priority preemption bound the tree.
UserProxyAgent: the human in the loop
What & why: Sometimes an agent needs a person, not a model — "should I really send this $10,000 invoice?" The UserProxyAgent is an agent that represents the human. It has no model and no tools. Its whole job is to receive a question, surface it, go to sleep for free, and wake up when a human answers.
Analogy — a receptionist with a callback slip
A coworker asks the receptionist a question for the boss. The receptionist writes it on a slip, pins it to the board (logs it), and stops waiting at the desk — they go do other things at zero cost. When the boss finally answers, a bell rings (a signal), and the receptionist relays the answer back to whoever asked.
async def _handle_message(self, ctx: RunContext, msg: Message) -> None:
    cid = msg.correlation_id
    question_text = self._extract_text(msg)
    await ctx._log("hitl.question", {"correlation_id": cid, "text": question_text})
    signal_name = f"human_reply:{cid}"
    human_payload = await ctx.sleep_until_signal(signal_name)
    if msg.reply_to:
        await ctx.reply(msg, human_payload)
The magic line is await ctx.sleep_until_signal(...). The run goes SUSPENDED — zero RAM, zero CPU, just rows in storage. The serving layer surfaces the logged hitl.question to the user (over SSE), and when the human replies it calls SignalBus.signal(run_id, "human_reply:<cid>", {...}). That signal wakes the proxy, which relays the answer back to the agent that asked via ctx.reply.
Waiting is free — that's the point
A proxy parked for three hours waiting on a human costs nothing because SUSPENDED runs are not threads, they're table rows. The full story is in Human-in-the-Loop.
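The wake-by-name handshake can be imitated in memory. This sketch shows only the naming and wake-up flow: `InMemorySignals` holds a live future, whereas the runtime described above persists the suspended run as rows and holds nothing in RAM. All names here are invented for illustration.

```python
import asyncio


class InMemorySignals:
    """In-memory stand-in for the signal mechanism (the real one is durable)."""

    def __init__(self) -> None:
        self._waiters: dict = {}

    async def sleep_until_signal(self, name: str):
        fut = asyncio.get_running_loop().create_future()
        self._waiters[name] = fut
        return await fut  # parked until someone signals this name

    def signal(self, name: str, payload) -> bool:
        fut = self._waiters.pop(name, None)
        if fut is None:
            return False  # nobody is waiting on that name
        fut.set_result(payload)
        return True


async def proxy(signals: InMemorySignals, cid: str, asked: list):
    asked.append(f"question for {cid}")  # stands in for logging hitl.question
    return await signals.sleep_until_signal(f"human_reply:{cid}")


async def demo() -> tuple:
    signals = InMemorySignals()
    asked: list = []
    task = asyncio.create_task(proxy(signals, "c-1", asked))
    await asyncio.sleep(0)  # let the proxy run until it parks
    delivered = signals.signal("human_reply:c-1", {"text": "yes, send it"})
    return asked, delivered, await task


asked, delivered, reply = asyncio.run(demo())
```

The correlation id in the signal name is what lets many parked proxies coexist: each human answer wakes exactly the run that asked.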
InformationAgent and PersonalFeedAgent: the demo pair
These two are demo-grade, not production agents
InformationAgent and PersonalFeedAgent exist to prove a use case: the social / pub-sub model (an agent that follows another agent's broadcasts). They're deliberately simple — no history, no tools, no compaction — and are best read as worked examples of the FollowGraph + fan-out machinery, not as something you'd ship as-is.
Together they demonstrate a producer/consumer feed with no polling:
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E8EAF6','primaryTextColor': '#1A237E','primaryBorderColor': '#3949AB','lineColor': '#546E7A','fontSize': '13px'}}}%%
flowchart LR
classDef agent fill:#E8EAF6,stroke:#3949AB,color:#1A237E,font-weight:bold
classDef runtime fill:#E3F2FD,stroke:#1565C0,color:#0D47A1,font-weight:bold
classDef store fill:#F3E5F5,stroke:#6A1B9A,color:#4A148C,font-weight:bold
classDef external fill:#FFF3E0,stroke:#E65100,color:#BF360C
SRC(["new source item<br/>(RSS / YouTube / X)"]):::external --> IA["InformationAgent<br/>summarize via ctx.llm"]:::agent
IA -->|"ctx.emit(topic, summary)"| FAN["fan-out via FollowGraph"]:::runtime
FAN -->|"deliver"| PFA1["PersonalFeedAgent<br/>(user 42)"]:::agent
FAN -->|"deliver"| PFA2["PersonalFeedAgent<br/>(user 99)"]:::agent
PFA1 -->|"ctx._log feed.curated"| LOG[("EventLog")]:::store
PFA2 -->|"ctx._log feed.curated"| LOG
InformationAgent is the producer. On boot it summarizes any inbox item via the shared summarize helper, emits the result to its output_topic, then sleeps in a loop waiting for the next new_source_item signal:
async def run(self, ctx: RunContext, inbox: list[Message]) -> None:
    for msg in inbox:
        ctx.check()
        await self._process_item(ctx, msg)
    while True:
        ctx.check()
        item_payload = await ctx.sleep_until_signal(self._source_signal)
        await self._process_item_from_signal(ctx, item_payload)
PersonalFeedAgent is the consumer. On its first wake it subscribes to its topics via ctx.follow(topic) (the subscription is durable in the FollowGraph, so it survives across runs), curates each delivered item against the user's preferences, logs a feed.curated entry, and sleeps until the next delivery:
async def run(self, ctx: RunContext, inbox: list[Message]) -> None:
    if not self._subscribed:
        for topic in self._follow_topics:
            await ctx.follow(topic)
        self._subscribed = True
    for msg in inbox:
        ctx.check()
        await self._curate(ctx, msg)
    await ctx.sleep_until_signal("new_feed_item")
The takeaway: producers emit, consumers follow, and the runtime's fan-out does the delivery — the same Inbox / FollowGraph contracts described in the runtime page.
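Stripped of durability and scheduling, the fan-out is a lookup from topic to followers. `FollowGraphSketch` is an in-memory toy written for this page, not the runtime's FollowGraph, and the agent and topic names are made up.

```python
from collections import defaultdict


class FollowGraphSketch:
    """In-memory toy: topic -> followers, plus one inbox per follower."""

    def __init__(self) -> None:
        self._followers = defaultdict(set)
        self.inboxes = defaultdict(list)

    def follow(self, follower: str, topic: str) -> None:
        self._followers[topic].add(follower)

    def emit(self, topic: str, payload) -> int:
        """Deliver payload to every follower of topic; return how many received it."""
        followers = sorted(self._followers.get(topic, ()))
        for follower in followers:
            self.inboxes[follower].append(payload)
        return len(followers)


graph = FollowGraphSketch()
graph.follow("feed:user-42", "tech-news")
graph.follow("feed:user-99", "tech-news")
delivered = graph.emit("tech-news", "summary-1")
```

The producer never learns who is listening. It emits to a topic, and the graph decides where the item lands.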
The shared helpers: _loop.py
Every agent above repeats the same conversation chores: turn a Message into a chat turn, load history, persist new turns, extract the final answer, deliver the result. Rather than copy-paste, those live in one small module, agents/core/_loop.py, and are imported by all the agents.
| Helper | What it does |
|---|---|
| message_to_chat(msg) | Converts an inbox Message (ChatPayload / DataPayload / other) into a USER ChatMessage |
| load_history(cfg, agent_id, session_id) | Reads session history from the provider and runs the compaction pipeline over it |
| persist_turns(cfg, agent_id, session_id, run_id, new_turns) | Appends the new turns back to the history provider via append_many |
| final_text(messages) | Walks the list backwards and returns the text of the last ASSISTANT turn |
| deliver(ctx, src_msg, result, *, sender, output_topic=None) | If the source had a reply_to, calls ctx.reply; else emits to output_topic (if given) |
| summarize(ctx, text, *, instructions) | A one-shot journaled ctx.llm() summarization pass, used by the demo agents |
deliver chooses reply-vs-emit for you
A request/response call (someone used ctx.ask, so reply_to is set) gets ctx.reply. A fire-and-forget broadcast (no reply_to, but an output_topic) gets ctx.emit. Same helper, two routing modes — that's how ReActAgent answers a user and how InformationAgent broadcasts to a topic with the same code path.
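The routing decision itself is small enough to write out. `route` is a hypothetical pure function that returns which path deliver would take; the "drop" case, where neither a reply_to nor an output_topic is present, is inferred from the "(if given)" wording in the table rather than confirmed.

```python
from typing import Optional, Tuple


def route(reply_to: Optional[str], output_topic: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return (mode, destination) for a finished result."""
    if reply_to is not None:
        return ("reply", reply_to)     # someone asked and is waiting
    if output_topic is not None:
        return ("emit", output_topic)  # fire-and-forget broadcast
    return ("drop", None)              # assumed: nothing to deliver to


ask_route = route(reply_to="agent:caller", output_topic="news")
broadcast_route = route(reply_to=None, output_topic="news")
```

Note that reply_to wins when both are set: a waiting caller takes priority over the broadcast.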
The factory: convenience constructors
Wiring a ReActAgent by hand means assembling a Toolbox, a ContextConfig, and a CompactionPipeline. agents/factory.py packages the common recipes so serving code (monolith and the distributed path) builds agents the same way.
- create_assistant_agent(...): the everyday constructor. Give it a model_client, optional tools, system_instructions, and a memory provider; it builds the Toolbox, defaults the compaction pipeline to a SlidingWindowCompaction, and returns a ready (but unregistered) ReActAgent. You still call await runtime.register(agent) yourself; the factory stays decoupled from the runtime so it's testable without live infra.
- load_session_memory(...): the cold-start helper. Given a session_id and a load_persisted_steps callback, it checks whether a HistoryProvider already has that session; on a miss it seeds the session from cold storage (rebuilding ChatMessage turns from persisted step rows) and returns the provider. This is how a conversation resumes after a restart without re-reading everything every turn.
- rebuild_messages_from_steps(...): the lower-level converter that turns persisted step rows (user_message, assistant_message, tool_result, …) back into the unified ChatMessage list the agent loop expects.
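The cold-start pattern is "check the warm store, seed once on a miss". In this sketch the row schema (`kind` and `text` keys), the dict-based cache, and both function names are assumptions; the real helpers work against a HistoryProvider and ChatMessage objects.

```python
ROLE_BY_KIND = {
    "user_message": "user",
    "assistant_message": "assistant",
    "tool_result": "tool",
}


def rebuild_messages(steps: list) -> list:
    """Turn persisted step rows into chat turns, skipping kinds this sketch doesn't know."""
    return [
        {"role": ROLE_BY_KIND[step["kind"]], "text": step["text"]}
        for step in steps
        if step["kind"] in ROLE_BY_KIND
    ]


def load_session(cache: dict, session_id: str, load_persisted_steps) -> list:
    """Return the session's turns, reading cold storage only on a cache miss."""
    if session_id not in cache:
        cache[session_id] = rebuild_messages(load_persisted_steps(session_id))
    return cache[session_id]


cold_reads: list = []


def fake_cold_storage(session_id: str) -> list:
    cold_reads.append(session_id)  # track how often cold storage is hit
    return [
        {"kind": "user_message", "text": "hi"},
        {"kind": "run_started", "text": ""},  # not a chat turn; skipped
        {"kind": "assistant_message", "text": "hello"},
    ]


cache: dict = {}
first_load = load_session(cache, "s-1", fake_cold_storage)
second_load = load_session(cache, "s-1", fake_cold_storage)
```

The second load is served from the warm store, which is the "without re-reading everything every turn" property described above.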
There's also rebuild_agent(spec, ...)
For full cold resume, rebuild_agent reconstructs a whole ReActAgent from the spec dict saved at submit time (model, system_instructions, tool_names, max_iterations, session_id, …) — pairing with load_session_memory to bring a crashed conversation fully back to life.
Where this lives
| Concept | Source file |
|---|---|
| ReActAgent | agents/core/react.py |
| OrchestratorAgent, SubAgentConfig | agents/core/orchestrator.py |
| UserProxyAgent | agents/core/proxy.py |
| InformationAgent | agents/core/information_agent.py |
| PersonalFeedAgent | agents/core/personal_feed_agent.py |
| Shared helpers (message_to_chat, load_history, persist_turns, final_text, deliver, summarize) | agents/core/_loop.py |
| create_assistant_agent, load_session_memory, rebuild_messages_from_steps, rebuild_agent | agents/factory.py |
| SpawnTracker, SpawnBudget | agents/supervision/budget.py, kernel/agent/supervision.py |
| ExecutionTracker (budget) | agents/resources/budget.py |
| The Agent Protocol these implement | kernel/runtime/agent.py |
Next: The In-Process Runtime — the Runtime facade and Worker that actually call run(ctx, inbox), lease runs, and drive the durable loop these agents ride on.