Middleware
The problem
There is a long list of things you want to happen around an agent's model calls that have nothing to do with the agent's actual reasoning: cache identical requests, retry on transient failures, validate the output against a schema, redact PII, log every call for audit, rate-limit a tenant. Stuffing all of that into the agent loop would turn a 50-line ReAct loop into a 500-line mess and make every behaviour non-reusable.
Middleware pulls these cross-cutting concerns out into composable layers that wrap the call.
The onion model
Middleware forms an onion around the real work. Each layer can act before the call (inspect/modify the request), pass control inward with call_next(), then act after (inspect/modify the result) as control unwinds.
%%{init: {'theme': 'base', 'themeVariables': {'primaryColor': '#E8EAF6','primaryTextColor': '#1A237E','primaryBorderColor': '#3949AB','lineColor': '#546E7A','fontSize': '13px'}}}%%
graph TB
classDef mw fill:#E8EAF6,stroke:#3949AB,color:#1A237E,font-weight:bold
classDef core fill:#E8F5E9,stroke:#2E7D32,color:#1B5E20,font-weight:bold
subgraph M1["RateLimiter"]
subgraph M2["Cache"]
subgraph M3["Retry"]
subgraph M4["SchemaValidator"]
CORE["the real model call"]:::core
end
end
end
end
A request travels inward through every layer to the core, and the response travels back outward through them in reverse. Any layer can short-circuit (the cache returns a hit without calling inward) or abort (a guardrail raises and the inner layers never run).
The contract
A middleware is anything with a process method. The pipeline calls it with the shared context and a call_next continuation:
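from typing import Awaitable, Callable, Protocol, TypeVar
# Contravariant because the context type only appears as a parameter of process().
ContextT_contra = TypeVar("ContextT_contra", contravariant=True)
class MiddlewareProtocol(Protocol[ContextT_contra]):
    async def process(
        self, context: ContextT_contra, call_next: Callable[[], Awaitable[None]]
    ) -> None: ...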
The shape of a layer is always the same:
import time
class TimingMiddleware:
    async def process(self, context, call_next):
        start = time.monotonic()                                # ── before
        await call_next()                                       # ── go inward
        context.metadata["elapsed"] = time.monotonic() - start  # ── after
- Do work, then await call_next(), then do more work. That's the onion.
- Skip call_next() to short-circuit: the inner layers never run (this is how the cache returns a hit).
- Raise to abort, typically MiddlewareTermination for a guardrail block.
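The short-circuit and abort cases can be sketched with two toy layers. This is a minimal illustration, not the shipped Cache or guardrail code: DemoContext, DictCache, and BlockList are hypothetical names, and MiddlewareTermination is redefined locally as a stand-in for the library's exception, whose real signature is not shown here.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, Set

CallNext = Callable[[], Awaitable[None]]


class MiddlewareTermination(Exception):
    """Local stand-in for the library exception of the same name (assumed)."""


@dataclass
class DemoContext:
    """Hypothetical context: a prompt in, a result and metadata out."""
    prompt: str
    result: Any = None
    metadata: Dict[str, Any] = field(default_factory=dict)


class DictCache:
    """Short-circuit: on a hit, fill the result and return without going inward."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    async def process(self, context: DemoContext, call_next: CallNext) -> None:
        if context.prompt in self._store:
            context.result = self._store[context.prompt]
            context.metadata["cache_hit"] = True
            return  # call_next() is skipped, so the inner layers never run
        await call_next()
        self._store[context.prompt] = context.result


class BlockList:
    """Abort: raise before call_next() so nothing further inward runs."""

    def __init__(self, banned: Set[str]) -> None:
        self._banned = banned

    async def process(self, context: DemoContext, call_next: CallNext) -> None:
        if any(word in context.prompt for word in self._banned):
            raise MiddlewareTermination("prompt contains a banned word")
        await call_next()


async def demo() -> Dict[str, Any]:
    model_calls = 0
    cache = DictCache()

    async def ask(layer: Any, prompt: str) -> DemoContext:
        ctx = DemoContext(prompt)

        async def fake_model() -> None:  # plays the role of the inner layers
            nonlocal model_calls
            model_calls += 1
            ctx.result = prompt.upper()

        await layer.process(ctx, fake_model)
        return ctx

    first = await ask(cache, "hello")
    second = await ask(cache, "hello")  # served from the cache
    try:
        await ask(BlockList({"secret"}), "tell me the secret")
        blocked = False
    except MiddlewareTermination:
        blocked = True
    return {"first": first, "second": second,
            "model_calls": model_calls, "blocked": blocked}


outcome = asyncio.run(demo())
```

In this sketch the fake model runs only once: the second request is answered from the cache, and the blocked request never reaches the model at all.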
The MiddlewarePipeline simply threads the layers together: execute(context, final) builds the chain so that the last call_next() invokes final — the actual model call.
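One way such a chain could be built is sketched below. It is not the actual MiddlewarePipeline source: SketchPipeline, Trace, and fake_model_call are hypothetical names, and the sketch assumes that final takes the context as its only argument, which the text above does not specify.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Sequence

CallNext = Callable[[], Awaitable[None]]


class SketchPipeline:
    """Illustrative only; the real MiddlewarePipeline may differ."""

    def __init__(self, layers: Sequence[Any]) -> None:
        self._layers = list(layers)

    async def execute(
        self, context: Any, final: Callable[[Any], Awaitable[None]]
    ) -> None:
        async def innermost() -> None:
            await final(context)  # assumed signature: final(context)

        call_next: CallNext = innermost
        # Wrap from the last layer back to the first, so the first layer
        # in the list ends up outermost.
        for layer in reversed(self._layers):
            call_next = self._bind(layer, context, call_next)
        await call_next()

    @staticmethod
    def _bind(layer: Any, context: Any, call_next: CallNext) -> CallNext:
        # A helper gives each step its own closure over layer and call_next.
        async def step() -> None:
            await layer.process(context, call_next)

        return step


class Trace:
    """Records when control enters and leaves this layer."""

    def __init__(self, name: str) -> None:
        self._name = name

    async def process(self, context: Any, call_next: CallNext) -> None:
        context.log.append(f"{self._name}:before")
        await call_next()
        context.log.append(f"{self._name}:after")


async def fake_model_call(context: Any) -> None:
    context.log.append("core")


ctx = SimpleNamespace(log=[])
pipeline = SketchPipeline([Trace("outer"), Trace("inner")])
asyncio.run(pipeline.execute(ctx, fake_model_call))
print(ctx.log)
# ['outer:before', 'inner:before', 'core', 'inner:after', 'outer:after']
```

The log shows the onion ordering: layers run in list order on the way in and in reverse on the way out.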
%%{init: {'theme': 'base', 'themeVariables': {'actorBkg': '#E8EAF6','actorBorder': '#3949AB','actorTextColor': '#1A237E','signalColor': '#546E7A','noteBkgColor': '#FFFDE7','noteBorderColor': '#F57F17','fontSize': '12px'}}}%%
sequenceDiagram
autonumber
participant P as Pipeline
participant C as Cache
participant R as Retry
participant F as final() — model call
P->>C: process(ctx, call_next)
C->>C: cache miss → continue
C->>R: call_next()
R->>F: call_next()
F-->>R: LLMResponse
R-->>C: (unwinds)
C->>C: store response in cache
C-->>P: done
The context is the payload
Middleware doesn't return values — it reads and writes a shared context object that flows through the chain. There are three context types depending on what's being wrapped:
| Context | Wraps | Key fields |
|---|---|---|
| ChatContext | A single model call | messages, system_instructions, tools, result: LLMResponse |
| AgentCallContext | A whole agent run | messages, result: AgentRunResult |
| FunctionContext | A single tool call | function_name, arguments, result: ToolExecutionResult |
A layer reads inputs from the context before call_next() and reads or mutates result/metadata after. For example, the schema validator parses context.result and stashes the parsed object in context.metadata["parsed"] rather than mutating the frozen LLMResponse.
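The "write to metadata, leave the frozen result alone" pattern might look like the sketch below. FakeLLMResponse and FakeChatContext are hypothetical stand-ins with only the fields needed here, and RequiredKeysValidator is a deliberately simplified check for required keys, not the shipped SchemaValidator and not full JSON Schema validation.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional

CallNext = Callable[[], Awaitable[None]]


@dataclass(frozen=True)
class FakeLLMResponse:
    """Frozen stand-in for LLMResponse; only the text is modelled."""
    text: str


@dataclass
class FakeChatContext:
    """Stand-in for ChatContext with just the fields this sketch needs."""
    messages: List[str]
    result: Optional[FakeLLMResponse] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


class RequiredKeysValidator:
    """Parses the model output after the call and checks required keys."""

    def __init__(self, required: List[str]) -> None:
        self._required = required

    async def process(self, context: FakeChatContext, call_next: CallNext) -> None:
        await call_next()  # nothing to validate until the model has answered
        parsed = json.loads(context.result.text)
        missing = [key for key in self._required if key not in parsed]
        if missing:
            raise ValueError(f"model output is missing keys: {missing}")
        # The response is frozen, so the parsed object goes into metadata.
        context.metadata["parsed"] = parsed


async def demo() -> FakeChatContext:
    ctx = FakeChatContext(messages=["Weather in Oslo as JSON?"])

    async def fake_model() -> None:
        ctx.result = FakeLLMResponse(text='{"city": "Oslo", "temp_c": 4}')

    await RequiredKeysValidator(["city", "temp_c"]).process(ctx, fake_model)
    return ctx


chat_ctx = asyncio.run(demo())
```

Downstream code can then read chat_ctx.metadata["parsed"] while the original response text stays untouched for logging or auditing.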
What ships in the box
These live in agents/middleware/ and follow the contract above:
| Middleware | What it does |
|---|---|
| Cache | Returns a stored response for identical requests (short-circuits) |
| Retry | Re-attempts the inner call on transient failures with backoff |
| RateLimiter | Caps calls per tenant/window |
| SchemaValidator | Validates/parses model output against a JSON schema |
| AuditLogger | Records every call for compliance |
| Observability | Emits spans/metrics around the call |
| ContentTruncator / HistoryTruncator | Bound payload size before the call |
| FileValidator | Checks file inputs before a tool runs |
| Guardrails | A whole family — see Guardrails |
Compose them in the order you want them to wrap:
from ravi.agents.middleware import MiddlewarePipeline
pipeline = MiddlewarePipeline([
    RateLimiter(...),
    Cache(...),
    Retry(...),
    SchemaValidator(...),
])
agent = ReActAgent("bot", model=model, middleware=pipeline)
The Worker runs the pipeline around the agent run; the agent loop runs it around each model call. Either way the agent's own code stays clean.
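To make one of the built-ins concrete, a retry layer with exponential backoff could be sketched as follows. SketchRetry and its parameters are hypothetical, not the shipped Retry API, and the sketch assumes that call_next() may safely be awaited more than once, which depends on how the pipeline builds its continuations.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Tuple, Type

CallNext = Callable[[], Awaitable[None]]


class SketchRetry:
    """Re-runs the inner layers on selected exceptions, doubling the delay."""

    def __init__(
        self,
        attempts: int = 3,
        base_delay: float = 0.1,
        retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    ) -> None:
        if attempts < 1:
            raise ValueError("attempts must be at least 1")
        self._attempts = attempts
        self._base_delay = base_delay
        self._retry_on = retry_on

    async def process(self, context: Any, call_next: CallNext) -> None:
        for attempt in range(1, self._attempts + 1):
            try:
                await call_next()  # assumed to be re-invocable
                context.metadata["attempts"] = attempt
                return
            except self._retry_on:
                if attempt == self._attempts:
                    raise  # out of attempts: let the error propagate outward
                # Delays grow as base_delay * 1, 2, 4, ...
                await asyncio.sleep(self._base_delay * 2 ** (attempt - 1))


async def demo() -> Any:
    ctx = SimpleNamespace(result=None, metadata={})
    failures_left = 2

    async def flaky_model() -> None:  # fails twice, then succeeds
        nonlocal failures_left
        if failures_left > 0:
            failures_left -= 1
            raise ConnectionError("transient failure")
        ctx.result = "ok"

    await SketchRetry(attempts=3, base_delay=0.0).process(ctx, flaky_model)
    return ctx


retry_ctx = asyncio.run(demo())
```

With the ordering shown in the composition above, Cache wraps Retry, so a cache hit returns before any retry logic runs and only cache misses pay for re-attempts.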
Middleware vs. Hooks: when to use which
Both observe the run, but they differ in power:
- Middleware sits in the call path. It can modify the request, change or replace the result, short-circuit, and abort. Use it when you need to influence behaviour.
- Hooks sit beside the call path. They get read-only notifications and cannot change anything; an exception in a hook is swallowed. Use them for pure observation (metrics, logging) where you must never affect the run.
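The difference in power can be illustrated with a toy hook dispatcher. The hook API is not shown in this section, so everything here is hypothetical: notify_hooks, the hook signature, and the event name are assumptions chosen only to show read-only notification and swallowed exceptions.

```python
import logging
from types import MappingProxyType
from typing import Any, Callable, Iterable, List, Mapping

logger = logging.getLogger("hooks_sketch")

Hook = Callable[[str, Mapping[str, Any]], None]


def notify_hooks(hooks: Iterable[Hook], event: str, payload: Mapping[str, Any]) -> None:
    """Notify each hook with a read-only snapshot; never let a hook break the run."""
    view = MappingProxyType(dict(payload))  # read-only copy of the payload
    for hook in hooks:
        try:
            hook(event, view)
        except Exception as exc:  # swallowed: hooks must not affect the run
            logger.warning("hook %r failed: %s", hook, exc)


seen: List[str] = []


def record(event: str, payload: Mapping[str, Any]) -> None:
    seen.append(f"{event}:{payload['model']}")


def broken(event: str, payload: Mapping[str, Any]) -> None:
    # Attempting to mutate the read-only view raises TypeError,
    # which notify_hooks swallows and logs.
    payload["model"] = "other"  # type: ignore[index]


run_state = {"model": "demo-model"}
notify_hooks([broken, record], "model_call", run_state)
```

The failing hook neither changes run_state nor prevents the next hook from being notified, whereas a middleware layer in the same position could have rewritten the request or aborted the call.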
Where this lives
| Piece | Location |
|---|---|
| MiddlewarePipeline, MiddlewareProtocol | agents/middleware/pipeline.py |
| Context types (ChatContext, …) | agents/middleware/_contracts.py |
| Built-in middlewares | agents/middleware/*.py |
| Guardrail middlewares | agents/middleware/guardrails/ |
Next: Guardrails — the safety-focused middleware family.