Skip to content

Distributed Runtime Architecture

Executive Summary

Replace the single-process agent runtime with a Restate-backed durable execution engine and NATS JetStream for real-time SSE streaming. Agents survive crashes, scale horizontally via HTTP workers, and support human-in-the-loop via durable promises — all with minimal changes to existing agent/tool code.

Architecture Rating: 9.5/10 — validated against 23 real-world production scenarios.


Technology Stack

Component Technology Role
Durable Execution Restate v1.6+ Checkpoint, replay, HITL suspend/resume, retries
Real-time Streaming NATS JetStream SSE event fan-out (text_delta, tool_call, etc.)
Agent Workers Existing FastAPI services Register as Restate HTTP deployments
State PostgreSQL + Redis Conversation history, session, Kanban
Observability OpenTelemetry → Tempo Distributed tracing across Restate invocations

Why Restate over Temporal?

Criterion Restate Temporal
Binary size ~50MB single binary, zero deps ~250MB + Cassandra/Postgres + Elasticsearch
Local dev infra 1 container 3-4 containers
Worker model HTTP-based (reuse existing FastAPI) Separate polling workers (new process type)
Python SDK Native async, ctx.run() pattern Requires deterministic workflow separation
HITL support ctx.promise() — built-in Signals + queries (more complex)
Latency (p99, 10 steps) <100ms workflow completion ~200-500ms
Throughput 13,000 workflows/sec (3-node) ~5,000 workflows/sec
Code changes needed Minimal — wrap in ctx.run() Significant — separate workflow/activity code

Architecture Overview

┌─────────────┐     ┌──────────────────┐     ┌─────────────────────┐
│   Frontend   │────▶│  Gateway / BFF   │────▶│   Restate Server    │
│  (Next.js)   │◀────│  (FastAPI)       │     │  (durable engine)   │
│              │ SSE │                  │     │                     │
└─────────────┘     └──────────────────┘     └────────┬────────────┘
                           │                           │
                    ┌──────┴──────┐              ┌─────┴──────┐
                    │   NATS      │              │  Agent     │
                    │  JetStream  │◀─────────────│  Workers   │
                    │  (streaming)│   text_delta  │  (FastAPI) │
                    └─────────────┘   tool_call   └────────────┘
                                      completion

Request Flow

  1. User sends message → Frontend POST /chat → Gateway
  2. Gateway extracts JWT claims → calls Restate.send(AgentWorkflow, {thread_id, user_content, claims})
  3. Restate durably invokes AgentWorkflow.run() on an available worker
  4. Worker executes ReAct loop, each step wrapped in ctx.run():
  5. ctx.run("llm_call", call_llm) — durable LLM call, publishes tokens to NATS
  6. ctx.run("tool_exec", execute_tool) — durable tool execution
  7. ctx.promise("hitl-{id}") — HITL suspend (zero resource consumption)
  8. NATS receives streaming events → Gateway subscribes → fans out as SSE to client
  9. Restate persists each step result in its journal → crash recovery replays from last checkpoint

Crash Recovery (exactly-once semantics)

Step 1: ctx.run("llm_call_1")  ✅ persisted → skip on replay
Step 2: ctx.run("tool_exec_1") ✅ persisted → skip on replay
Step 3: ctx.run("llm_call_2")  💥 CRASH during execution
Restate restarts workflow → replays steps 1-2 from journal (no re-execution)
         → re-executes step 3 on a healthy worker
         → idempotency key (ctx.uuid()) prevents duplicate side-effects

Component Design

1. Restate Agent Workflow

New module: src/ravi/distributed/workflow.py

import restate
from ravi.distributed.activities import do_llm_call, do_tool_exec

agent_workflow = restate.Workflow("AgentWorkflow")

@agent_workflow.main()
async def run(ctx: restate.WorkflowContext, payload: dict) -> dict:
    """Durable ReAct agent loop.

    Each LLM call and tool execution is a separate ctx.run() — the minimum
    durable unit. If the process crashes, Restate replays completed steps
    from its journal and resumes at the exact point of failure.
    """
    thread_id = payload["thread_id"]
    user_content = payload["user_content"]
    claims = payload["claims"]  # user_id, role from JWT
    model = payload.get("model")
    max_iterations = payload.get("max_iterations", 50)

    # Restore conversation memory (Redis) — idempotent
    await ctx.run("restore_memory", restore_memory, thread_id=thread_id)

    for iteration in range(max_iterations):
        # 1. THINK — call LLM (durable step)
        #    Inside this ctx.run(), the LLM callback publishes text_delta
        #    tokens to NATS for live streaming to the frontend.
        llm_result = await ctx.run(
            f"llm_call_{iteration}",
            do_llm_call,
            thread_id=thread_id,
            model=model,
        )

        # No tool calls → agent is done
        if not llm_result.get("tool_calls"):
            await publish_nats(thread_id, {"type": "completion", **llm_result})
            break

        # 2. ACT — execute each tool call (durable step per tool)
        for tc in llm_result["tool_calls"]:
            tool_name = tc["name"]
            tool_policy = get_tool_policy(tool_name)

            if tool_policy.requires_approval:
                # HITL: suspend until human approves/rejects
                request_id = ctx.uuid()
                await publish_nats(thread_id, {
                    "type": "tool_approval_request",
                    "requestId": request_id,
                    "tool_name": tool_name,
                    "input": tc["arguments"],
                })
                decision = await ctx.promise(f"approval-{request_id}")
                if decision.get("action") == "reject":
                    # Record rejection and continue to next iteration
                    await ctx.run(f"reject_{request_id}", record_rejection, ...)
                    continue

            if tool_policy.is_hitl_input:
                # Human input: suspend until human responds
                request_id = ctx.uuid()
                await publish_nats(thread_id, {
                    "type": "human_input_request",
                    "requestId": request_id,
                    "prompt": tc["arguments"].get("prompt", ""),
                })
                human_response = await ctx.promise(f"human-input-{request_id}")
                tool_result = {"content": human_response.get("response", "")}
            else:
                # Regular tool execution (durable, with per-tool timeout)
                idempotency_key = ctx.uuid() if tool_policy.needs_idempotency else None
                tool_result = await ctx.run(
                    f"tool_{tool_name}_{iteration}",
                    do_tool_exec,
                    tool_name=tool_name,
                    arguments=tc["arguments"],
                    timeout_seconds=tool_policy.timeout,
                    idempotency_key=idempotency_key,
                )

            # Publish tool result to NATS for frontend
            await publish_nats(thread_id, {
                "type": "tool_result",
                "tool_name": tool_name,
                **tool_result,
            })

        # 3. OBSERVE — results are in memory, loop continues

    return {"status": "completed", "thread_id": thread_id}

2. Tool Execution Policies

New attribute on BaseTool: tools declare their distributed execution behavior.

class ToolPolicy:
    """Execution policy for distributed runtime."""
    timeout: float = 30.0          # seconds, 0 = no timeout
    needs_idempotency: bool = False # generate ctx.uuid() idempotency key
    requires_approval: bool = False # HITL tool approval gate
    is_hitl_input: bool = False     # HITL human input (ask_human)
    large_payload: bool = False     # use DataRef for results > 32KB

# Per-tool policy examples:
TOOL_POLICIES = {
    # HITL tools — suspend with zero resources, no timeout
    "ask_human":          ToolPolicy(is_hitl_input=True),

    # Critical tools — idempotency key, approval required
    "send_email":         ToolPolicy(timeout=30, needs_idempotency=True, requires_approval=True),
    "manage_tasks":       ToolPolicy(timeout=10),

    # Network tools — configurable timeout
    "web_surfer":         ToolPolicy(timeout=60, large_payload=True),
    "db_query":           ToolPolicy(timeout=30),

    # Compute tools — long timeout with heartbeat
    "code_interpreter":   ToolPolicy(timeout=300, large_payload=True),

    # In-process tools — direct call, no distribution overhead
    "capability_search":  ToolPolicy(timeout=5),
    "clock":              ToolPolicy(timeout=1),
}

3. NATS Streaming Layer

New module: src/ravi/distributed/streaming.py

import nats
from nats.js import JetStreamContext

class NATSStreamingBridge:
    """Publishes SSE events from workers → NATS → Gateway → Frontend.

    Subject pattern: agent.events.{thread_id}
    Stream: AGENT_EVENTS (retention: 1 hour, per-subject)
    """

    def __init__(self, nats_url: str = "nats://localhost:4222"):
        self._nc: nats.NATS = None
        self._js: JetStreamContext = None

    async def connect(self):
        self._nc = await nats.connect(self._nats_url)
        self._js = self._nc.jetstream()
        # Create stream if not exists
        await self._js.add_stream(
            name="AGENT_EVENTS",
            subjects=["agent.events.*"],
            retention="limits",
            max_age=3600,  # 1 hour retention
        )

    async def publish(self, thread_id: str, event: dict):
        """Publish an SSE event for a specific thread."""
        subject = f"agent.events.{thread_id}"
        await self._js.publish(subject, json.dumps(event).encode())

    async def subscribe(self, thread_id: str):
        """Subscribe to SSE events for a specific thread. Used by gateway."""
        subject = f"agent.events.{thread_id}"
        sub = await self._js.subscribe(subject, deliver_policy="new")
        async for msg in sub.messages:
            yield json.loads(msg.data.decode())
            await msg.ack()

4. Gateway SSE Endpoint (rewritten)

The gateway's /chat endpoint becomes a thin proxy:

@router.post("/chat")
async def chat(body: ChatRequest, request: Request):
    # 1. Validate thread, extract JWT claims
    claims = get_current_user(request)

    # 2. Start durable workflow via Restate
    restate_client = request.app.state.restate_client
    workflow_id = f"agent-{body.thread_id}-{uuid4().hex[:8]}"
    await restate_client.workflow("AgentWorkflow").send(
        workflow_id,
        payload={
            "thread_id": str(body.thread_id),
            "user_content": body.messages[-1].content,
            "claims": {"user_id": claims.user_id, "role": claims.role},
            "model": body.model,
        },
    )

    # 3. Stream SSE events from NATS
    nats_bridge = request.app.state.nats_bridge
    async def sse_generator():
        async for event in nats_bridge.subscribe(str(body.thread_id)):
            yield f"data: {json.dumps(event)}\n\n"
            if event.get("type") in ("completion", "error", "cancelled"):
                if not event.get("has_tool_calls"):
                    break

    return StreamingResponse(sse_generator(), media_type="text/event-stream")

5. HITL Resolution Endpoint

@router.post("/chat/respond/{request_id}")
async def respond_hitl(request_id: str, body: HITLResponse, request: Request):
    """Resolve a pending HITL promise in Restate.

    When the user approves/rejects a tool or provides human input,
    this resolves the ctx.promise() in the running workflow.
    """
    restate_client = request.app.state.restate_client

    # Resolve the durable promise — Restate routes to the correct workflow
    await restate_client.workflow("AgentWorkflow").resolve_promise(
        workflow_id=body.workflow_id,
        promise_name=f"{body.promise_type}-{request_id}",
        value=body.payload,
    )
    return {"status": "resolved"}

6. Cancel/Abort Flow

@router.post("/chat/{thread_id}/cancel")
async def cancel_chat(thread_id: str, request: Request):
    """Cancel a running agent workflow via Restate's cancel API."""
    restate_client = request.app.state.restate_client

    # Find active workflow for this thread
    workflow_id = await get_active_workflow(thread_id)
    if workflow_id:
        await restate_client.workflow("AgentWorkflow").cancel(workflow_id)

    return {"status": "cancelled"}

LLM Token Streaming (inside ctx.run)

Inside ctx.run("llm_call", ...), the LLM callback publishes each token directly to NATS. These tokens are ephemeral — they're for live UI typing only. The durable result is the full LLM response persisted by Restate.

async def do_llm_call(*, thread_id: str, model: str, **kwargs) -> dict:
    """Activity: call LLM with streaming tokens published to NATS."""
    nats_bridge = get_nats_bridge()  # global singleton
    client = get_model_client(model)

    full_response = ""
    async for chunk in client.generate_stream(messages=..., tools=...):
        if chunk.text:
            full_response += chunk.text
            # Publish token to NATS (ephemeral, fire-and-forget)
            await nats_bridge.publish(thread_id, {
                "type": "text_delta",
                "content": chunk.text,
                "partial": True,
            })

    # Return full response for Restate journal (durable)
    return {
        "content": full_response,
        "tool_calls": chunk.tool_calls,
        "usage": chunk.usage,
    }

Crash during LLM streaming: - Restate retries the entire ctx.run("llm_call") on a healthy worker - Client receives fresh token stream (different wording — LLM responses are non-deterministic) - Old partial tokens are harmless (client sees new text_delta events) - Full response is guaranteed to be persisted once ctx.run completes


Voice / Realtime Audio

Voice follows the industry standard pattern (OpenAI, Gemini, Samsung):

┌──────────┐     WebSocket      ┌──────────────┐     WebSocket     ┌──────────┐
│ Frontend │ ◀──────────────▶  │   Gateway    │ ◀────────────────▶│  OpenAI  │
│          │   audio stream    │              │   Realtime API    │ Realtime │
└──────────┘                   └──────┬───────┘                   └──────────┘
                                      │ text command
                               ┌──────────────┐
                               │   Restate    │  durable tool execution
                               │   Workflow   │  (e.g., "send email to John")
                               └──────────────┘
  • Audio streaming: Direct WebSocket, not through Restate (latency critical, no checkpoint mid-sentence)
  • Actions from voice: After transcription → intent extraction → dispatched through Restate for durable execution
  • No changes to existing server/routes/audio.py WebSocket handlers

Large Payload Handling

Tool results > 32KB use the existing DataRef pattern:

async def do_tool_exec(*, tool_name: str, arguments: dict, **kwargs) -> dict:
    tool = get_tool(tool_name)
    result = await tool.run(**arguments)

    policy = TOOL_POLICIES.get(tool_name, ToolPolicy())

    if policy.large_payload and len(str(result.content)) > 32_768:
        # Store in S3/file store, return reference
        data_store = get_data_store()
        ref = await data_store.store(
            data=str(result.content).encode(),
            content_type="text/plain",
        )
        return {
            "content": f"[Result stored as DataRef: {ref.ref_id}]",
            "data_ref_id": str(ref.ref_id),
            "success": True,
        }

    return {
        "content": result.content,
        "app_data": result.app_data,
        "success": True,
    }

Why: Restate's journal has payload size limits. Large web scraping results, code interpreter outputs, etc. would bloat the journal and slow replay. DataRef stores the payload externally; only the reference flows through Restate.


Auth Propagation

Gateway is the auth boundary. Workers trust claims passed in the workflow payload.

Frontend → Gateway (JWT verification) → Restate payload: {claims: {user_id, role}}
                                          Worker reads claims from payload
                                          (no JWT re-verification needed)

Security model: - Gateway verifies JWT and extracts claims - Claims (user_id, role) are included in the Restate workflow input - Workers run on internal network — no external access - Restate server is also internal-only


Multi-Agent Orchestration

OrchestratorAgent dispatches sub-agents as Restate service calls:

@agent_workflow.main()
async def run(ctx: restate.WorkflowContext, payload: dict):
    # Orchestrator decides to hand off to a sub-agent
    sub_result = await ctx.service_call(
        "AgentWorkflow",
        "run",
        payload={
            "thread_id": payload["thread_id"],
            "user_content": "Research this topic...",
            "claims": payload["claims"],
            "model": "gpt-4o-mini",  # sub-agent can use different model
        },
    )
    # Sub-agent result is durable — survives orchestrator crash

Each sub-agent is a separate Restate workflow invocation: - Runs on any available worker - Has its own checkpoint journal - Can use different model, tools, system instructions - Results flow back to orchestrator durably


Infrastructure

Docker Compose (local development)

# Added to deployment/docker/docker-compose.yml
services:
  restate:
    image: docker.io/restatedev/restate:latest
    ports:
      - "8080:8080"    # Ingress (workflow invocations)
      - "9070:9070"    # Admin API (service registration)
      - "9071:9071"    # Meta API
    volumes:
      - restate-data:/target/restate-data

  nats:
    image: nats:2.10-alpine
    ports:
      - "4222:4222"    # Client connections
      - "8222:8222"    # HTTP monitoring
    command: ["--jetstream", "--store_dir=/data"]
    volumes:
      - nats-data:/data

volumes:
  restate-data:
  nats-data:

Kubernetes (production)

deployment/k8s/base/
├── restate/
│   ├── deployment.yaml    # Restate server (1 replica → 3 for HA)
│   ├── service.yaml       # ClusterIP for ingress + admin
│   └── pvc.yaml           # Persistent volume for journal
├── nats/
│   ├── statefulset.yaml   # NATS JetStream (3 replicas for HA)
│   ├── service.yaml       # Headless service for clustering
│   └── pvc.yaml           # Persistent volume for JetStream storage
└── agent-worker/
    ├── deployment.yaml    # Agent worker pods (HPA for auto-scaling)
    └── service.yaml       # ClusterIP (Restate routes to this)

Port Allocation

Service Port Purpose
Restate Ingress 8080 Workflow invocations
Restate Admin 9070 Service registration
NATS Client 4222 Pub/sub connections
NATS Monitor 8222 HTTP monitoring
Agent Worker 8014 Restate HTTP deployment

Migration Strategy

What Changes

Component Before After
server/routes/chat.py In-process agent.run_stream() Restate workflow start + NATS subscribe
server/sse/bridge.py In-memory WebHITLBridge Restate ctx.promise() resolution
shared/tasks/store.py In-memory TaskStore Redis-backed TaskStore
Agent execution Same process as gateway Separate worker pods
Tool execution await tool.run() inline ctx.run("tool_exec", ...) durable
HITL events asyncio.Queue per bridge NATS publish + Restate promise
SSE streaming EventBusStreamingResponse NATS subscribe → StreamingResponse

What Does NOT Change

Component Reason
BaseAgent, ReActAgent Agent loop is stateless between calls
BaseTool, all tool implementations Tool execute() is pure async — works in any process
BaseMemory, RedisMemory Already external (Redis) — workers call restore()
BaseTool guardrails Guardrails are stateless validators
Frontend SSE protocol Same event types, same JSON shape
types/index.ts No frontend type changes
core/messages/ Message types are just data classes
connectors/, catalog/skills/ Pure business logic, process-agnostic

Testing Strategy

Unit Tests

  • Mock Restate context (ctx.run, ctx.promise) to test workflow logic
  • Mock NATS to test streaming bridge
  • Existing tool tests remain unchanged

Integration Tests

  • Start Restate + NATS locally (Docker Compose)
  • Submit workflow, verify completion
  • Test crash recovery: kill worker mid-workflow, verify resume
  • Test HITL: start workflow, resolve promise, verify resume

Load Tests

  • Concurrent workflow submissions (target: 100 concurrent agents)
  • NATS throughput (target: 10,000 events/second)
  • Restate journal replay speed with 50 steps

Real-World Scenario Validation

# Scenario How Architecture Handles It
1 Basic chat Gateway → Restate workflow → LLM + NATS streaming → completion
2 Crash mid-agent Restate replays from journal, skips completed steps
3 HITL ask_human ctx.promise() suspends (0 resources), resumes on response
4 HITL tool_approval ctx.promise() + NATS event to frontend
5 Payment tool crash ctx.uuid() idempotency key → exactly-once semantics
6 Long code execution ctx.run() with 5min timeout + heartbeat
7 Multi-agent orchestration Sub-agent = separate Restate workflow invocation
8 LLM token streaming NATS publish inside ctx.run() callback (ephemeral)
9 Client disconnect + reconnect Agent continues on Restate, NATS JetStream replay
10 Rate limiting (429) Restate retry policy with exponential backoff
11 100 concurrent users 100 separate workflow invocations, worker auto-scaling
12 Kanban board state Redis-backed TaskStore (survives worker restart)
13 Agent infinite loop Restate workflow timeout + max_iterations
14 Rolling deployment Restate service versioning
15 Distributed tracing OTEL with Restate invocation ID as trace context
16 Auth propagation JWT claims in workflow payload (gateway = auth boundary)
17 Memory/conversation RedisMemory.restore() at workflow start
18 MCP tool discovery ctx.run("mcp_discover") with retry
19 Voice/Realtime Direct WebSocket (excluded from Restate), actions are durable
20 Graceful shutdown Restate drains in-flight invocations
21 Event ordering NATS JetStream preserves order per subject (thread_id)
22 Large payloads DataRef pattern (S3 store, reference through journal)
23 Cancel/abort Restate cancel API, agent stops at next checkpoint

Dependencies

# pyproject.toml additions
[project.dependencies]
restate-sdk = ">=0.7"    # Restate Python SDK
nats-py = ">=2.9"        # NATS client with JetStream

New Directory Structure

src/ravi/distributed/         ← New package
├── __init__.py
├── workflow.py                 ← Restate AgentWorkflow
├── activities.py               ← do_llm_call, do_tool_exec, restore_memory
├── policies.py                 ← ToolPolicy definitions
├── streaming.py                ← NATSStreamingBridge
├── restate_app.py              ← Restate app registration + FastAPI mount
├── worker.py                   ← Worker entry point (uvicorn + Restate handler)
└── client.py                   ← Restate client wrapper for gateway