Distributed Runtime — Implementation Plan
Phase Overview
| Phase | Focus | Deliverable |
|---|---|---|
| Phase 1 | Foundation | NATS + Restate infra, distributed/ package skeleton |
| Phase 2 | Core Workflow | Durable ReAct loop in Restate, LLM streaming via NATS |
| Phase 3 | Tool Execution | Per-tool policies, idempotency, DataRef integration |
| Phase 4 | HITL | Durable promises for ask_human + tool_approval |
| Phase 5 | Gateway Rewrite | Replace monolith chat route with Restate + NATS |
| Phase 6 | Multi-Agent | OrchestratorAgent via Restate service calls |
| Phase 7 | Production Hardening | Cancel, observability, error handling, TaskStore |
| Phase 8 | Deployment | Docker Compose + k8s manifests, worker scaling |
Phase 1: Foundation
1.1 Add dependencies
# pyproject.toml
restate-sdk = ">=0.7"
nats-py = ">=2.9"
1.2 Docker Compose infra
- Add restate service (port 8080/9070)
- Add nats service with JetStream (port 4222/8222)
- Add volumes for persistence
1.3 Package skeleton
src/ravi/distributed/
├── __init__.py
├── workflow.py (stub)
├── activities.py (stub)
├── policies.py (stub)
├── streaming.py (NATSStreamingBridge)
├── restate_app.py (Restate app setup)
├── worker.py (worker entry point)
└── client.py (Restate client wrapper)
1.4 NATS streaming bridge
- NATSStreamingBridge.connect() / disconnect() / publish() / subscribe()
- JetStream stream AGENT_EVENTS with subject pattern agent.events.{thread_id}
- 1-hour retention per subject
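The bridge interface listed above can be sketched without a broker. The class below is an in-memory stand-in, not the nats-py client: it keeps the four method names and the agent.events.{thread_id} subject pattern from the plan, while InMemoryStreamingBridge, subject_for, and the queue-based delivery are assumptions made for illustration. A real implementation would presumably hold a NATS connection and a JetStream context where this one holds a dictionary.

```python
import asyncio
import json
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List

SUBJECT_PREFIX = "agent.events"  # subject pattern from the plan: agent.events.{thread_id}


def subject_for(thread_id: str) -> str:
    """Build the per-thread subject name."""
    return f"{SUBJECT_PREFIX}.{thread_id}"


class InMemoryStreamingBridge:
    """Hypothetical stand-in for NATSStreamingBridge; no network involved."""

    def __init__(self) -> None:
        self._connected = False
        self._queues: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    async def connect(self) -> None:
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False
        self._queues.clear()

    async def publish(self, thread_id: str, event: Dict[str, Any]) -> None:
        if not self._connected:
            raise RuntimeError("bridge is not connected")
        payload = json.dumps(event).encode()  # events travel as JSON bytes
        for queue in self._queues[subject_for(thread_id)]:
            queue.put_nowait(payload)

    async def subscribe(self, thread_id: str) -> asyncio.Queue:
        if not self._connected:
            raise RuntimeError("bridge is not connected")
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[subject_for(thread_id)].append(queue)
        return queue


async def demo() -> Dict[str, Any]:
    bridge = InMemoryStreamingBridge()
    await bridge.connect()
    inbox = await bridge.subscribe("t-1")
    await bridge.publish("t-1", {"type": "text_delta", "text": "hi"})
    received = json.loads(await inbox.get())
    await bridge.disconnect()
    return received
```

A stand-in like this is also a convenient fake for the unit tests in 1.6, since it exercises callers without a running NATS server.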
1.5 Settings
# configs/settings.py additions
RESTATE_INGRESS_URL: str = "http://localhost:8080"
RESTATE_ADMIN_URL: str = "http://localhost:9070"
NATS_URL: str = "nats://localhost:4222"
1.6 Verification
- Restate server starts and accepts registrations
- NATS JetStream accepts publish/subscribe
- Unit tests for NATSStreamingBridge
Phase 2: Core Workflow
2.1 Restate AgentWorkflow
- workflow.py: AgentWorkflow with run() main handler
- Durable ReAct loop: ctx.run("llm_call") → ctx.run("tool_exec") → repeat
- Max iterations from payload
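The loop shape can be illustrated with a toy journal. The sketch below does not use the Restate SDK: JournalContext is a hypothetical stand-in whose run() records each step result in call order and replays it on a second pass, which is the property the durable loop relies on. The reply format ({"text", "tool"}) and the fake LLM and tool are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple


class JournalContext:
    """Toy durable context: step results are journaled in call order and replayed."""

    def __init__(self, journal: Optional[List[Tuple[str, Any]]] = None) -> None:
        self.journal: List[Tuple[str, Any]] = journal if journal is not None else []
        self._cursor = 0

    async def run(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if self._cursor < len(self.journal):
            _, result = self.journal[self._cursor]  # replay: skip the side effect
        else:
            result = await fn()  # first execution: run and record
            self.journal.append((name, result))
        self._cursor += 1
        return result


async def react_loop(ctx: JournalContext, llm, tool, max_iterations: int) -> str:
    observation: Optional[str] = None
    for _ in range(max_iterations):
        reply = await ctx.run("llm_call", lambda: llm(observation))
        if reply.get("tool") is None:
            return reply["text"]  # the model answered without requesting a tool
        observation = await ctx.run("tool_exec", lambda: tool(reply["tool"]))
    return "max iterations reached"


calls = {"llm": 0, "tool": 0}  # counts real executions, not replays


async def fake_llm(observation: Optional[str]) -> dict:
    calls["llm"] += 1
    if observation is None:
        return {"text": "", "tool": "search"}
    return {"text": f"answer based on {observation}", "tool": None}


async def fake_tool(name: str) -> str:
    calls["tool"] += 1
    return f"{name}-result"


async def demo() -> Tuple[str, str]:
    first = JournalContext()
    answer = await react_loop(first, fake_llm, fake_tool, max_iterations=5)
    # Simulate a worker restart: a fresh context replays the saved journal.
    replayed = await react_loop(
        JournalContext(first.journal), fake_llm, fake_tool, max_iterations=5
    )
    return answer, replayed
```

Running demo() executes the LLM twice and the tool once; the replayed pass returns the same answer without calling either again.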
2.2 LLM Activity
- activities.py: do_llm_call(): call LLM, publish text_delta to NATS
- Restore memory from Redis at start
- Handle streaming callback → NATS publish (ephemeral tokens)
- Return full response for journal persistence
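The split between ephemeral tokens and the journaled response might look like the sketch below. The streaming client is faked with an async generator, and the do_llm_call signature (a prompt plus a publish callback) is an assumption; only the function name and the text_delta event name come from the plan. Memory restore from Redis is left out.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Tuple

Publish = Callable[[Dict[str, Any]], Awaitable[None]]


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Placeholder for a streaming LLM client; yields tokens one at a time."""
    for token in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield token


async def do_llm_call(prompt: str, publish: Publish) -> str:
    parts: List[str] = []
    async for token in fake_llm_stream(prompt):
        # Tokens are published as they arrive and are never journaled.
        await publish({"type": "text_delta", "text": token})
        parts.append(token)
    # Only the assembled response is returned, so only it lands in the journal.
    return "".join(parts)


async def demo() -> Tuple[str, List[Dict[str, Any]]]:
    published: List[Dict[str, Any]] = []

    async def publish(event: Dict[str, Any]) -> None:
        published.append(event)

    full = await do_llm_call("hi", publish)
    return full, published
```

One consequence of this design is that a retried call would publish its tokens again, so consumers may need to tolerate repeated deltas after a worker crash.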
2.3 Restate App Registration
- restate_app.py: Create Restate app, bind AgentWorkflow
- worker.py: Entry point that serves the Restate handler via hypercorn/uvicorn
- Register with Restate admin API on startup
2.4 Verification
- Start worker, submit workflow via Restate HTTP API
- Verify LLM call executes and tokens appear in NATS
- Kill worker mid-LLM-call, verify retry on restart
Phase 3: Tool Execution
3.1 Tool Policies
- policies.py: ToolPolicy dataclass + TOOL_POLICIES dict
- Derive from existing BaseTool.risk and BaseTool.hitl_mode attributes
- Fields: timeout, needs_idempotency, requires_approval, is_hitl_input, large_payload
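A minimal sketch of the policy table follows. The five field names come from the plan; the default timeout values, the risk and hitl_mode string values, the mapping rules in derive_policy, and the send_payment tool are all assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class ToolPolicy:
    timeout: float = 30.0  # seconds; the default is an assumption
    needs_idempotency: bool = False
    requires_approval: bool = False
    is_hitl_input: bool = False
    large_payload: bool = False


def derive_policy(risk: str, hitl_mode: str, large_payload: bool = False) -> ToolPolicy:
    """Map a tool's risk and hitl_mode to a policy (the mapping rules are assumed)."""
    return ToolPolicy(
        timeout=120.0 if large_payload else 30.0,
        needs_idempotency=risk == "critical",
        requires_approval=hitl_mode == "approval",
        is_hitl_input=hitl_mode == "input",
        large_payload=large_payload,
    )


TOOL_POLICIES: Dict[str, ToolPolicy] = {
    "web_surfer": derive_policy(risk="low", hitl_mode="none", large_payload=True),
    "ask_human": derive_policy(risk="low", hitl_mode="input"),
    # Hypothetical tool, included only to show the critical + approval path.
    "send_payment": derive_policy(risk="critical", hitl_mode="approval"),
}
```

Keeping the dataclass frozen means a policy cannot be mutated by one workflow and leak into another.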
3.2 Tool Activity
- activities.py: do_tool_exec(): lookup tool, apply policy, execute
- Idempotency key via ctx.uuid() for critical tools
- DataRef pattern for large results (>32KB → S3/file store)
- Timeout enforcement via asyncio.wait_for()
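The timeout and DataRef steps can be sketched with the standard library alone. In the example below the do_tool_exec signature, the DataRef fields, the content-hash key, and the dictionary used as a store are assumptions; a real version would upload to S3 or a file store and would also handle the idempotency key, which is omitted here.

```python
import asyncio
import hashlib
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Union

DATAREF_THRESHOLD = 32 * 1024  # bytes; the plan's >32KB cutoff


@dataclass(frozen=True)
class DataRef:
    key: str   # where the payload was stored
    size: int  # payload size in bytes


async def do_tool_exec(
    tool_fn: Callable[[], Awaitable[str]],
    timeout: float,
    store: Dict[str, bytes],
) -> Union[str, DataRef]:
    # Raises asyncio.TimeoutError if the tool exceeds its policy timeout.
    result = await asyncio.wait_for(tool_fn(), timeout=timeout)
    data = result.encode()
    if len(data) > DATAREF_THRESHOLD:
        key = hashlib.sha256(data).hexdigest()
        store[key] = data  # stand-in for an S3 or file-store upload
        return DataRef(key=key, size=len(data))
    return result  # small results are returned inline


async def small_tool() -> str:
    return "ok"


async def big_tool() -> str:
    return "x" * 40_000  # larger than the 32 KB threshold


async def slow_tool() -> str:
    await asyncio.sleep(1)
    return "late"
```

Returning a reference for large results keeps the journal entry small, since only the key and size are persisted with the step.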
3.3 Tool Events
- Publish tool_call event to NATS before execution
- Publish tool_result event to NATS after execution
- Include risk/color metadata from tool schema
3.4 Verification
- Execute workflow with tool-calling agent
- Verify tool results appear in NATS
- Test DataRef for large tool output (web_surfer HTML)
- Test idempotency: kill worker after tool exec, verify no re-execution
Phase 4: HITL
4.1 Tool Approval
- In workflow: ctx.promise(f"approval-{request_id}") suspends execution
- Publish tool_approval_request to NATS
- Frontend shows ToolApprovalCard
- User response → API call → resolves Restate promise
4.2 Ask Human
- In workflow: ctx.promise(f"human-input-{request_id}") suspends execution
- Publish human_input_request to NATS
- Frontend shows HumanInputCard
- User response → API call → resolves Restate promise
4.3 HITL Resolution Endpoint
- New route: POST /chat/respond/{request_id} → resolves Restate promise
- Payload: {workflow_id, promise_type, payload}
- Works identically to existing endpoint but backed by Restate
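The suspend-and-resolve handshake can be sketched in-process. PromiseRegistry below is a hypothetical stand-in built on asyncio futures; unlike a Restate promise it is not durable and does not survive a restart. The promise names follow the approval-{request_id} and human-input-{request_id} patterns used in this phase, while the promise_type values "tool_approval" and "ask_human" are assumptions.

```python
import asyncio
from typing import Any, Dict


class PromiseRegistry:
    """In-process stand-in for durable promises, keyed by promise name."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def _get(self, name: str) -> asyncio.Future:
        if name not in self._futures:
            self._futures[name] = asyncio.get_running_loop().create_future()
        return self._futures[name]

    async def wait(self, name: str) -> Any:
        """Workflow side: suspend until the promise is resolved."""
        return await self._get(name)

    def resolve(self, name: str, payload: Any) -> None:
        """Endpoint side: deliver the user's response."""
        future = self._get(name)
        if not future.done():  # resolving twice is a no-op in this sketch
            future.set_result(payload)


def promise_name(promise_type: str, request_id: str) -> str:
    prefix = {"tool_approval": "approval", "ask_human": "human-input"}[promise_type]
    return f"{prefix}-{request_id}"


async def demo() -> Any:
    registry = PromiseRegistry()
    waiter = asyncio.create_task(registry.wait(promise_name("tool_approval", "r1")))
    await asyncio.sleep(0)  # let the workflow reach its suspension point
    # What POST /chat/respond/{request_id} would do with its payload:
    registry.resolve(promise_name("tool_approval", "r1"), {"approved": True})
    return await waiter
```

Because resolve() creates the future when it is missing, a response that arrives before the workflow starts waiting is not lost in this sketch.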
4.4 Verification
- Start workflow that triggers ask_human
- Verify workflow suspends (no worker resources held while waiting)
- Resolve promise via API
- Verify workflow resumes with human input
- Kill worker while HITL is pending — verify promise survives restart
Phase 5: Gateway Rewrite
5.1 New Chat Route
- Rewrite server/routes/chat.py:
- Start Restate workflow (non-blocking)
- Subscribe to NATS subject for thread_id
- Stream events as SSE to frontend
- Remove in-process agent execution
- Remove WebHITLBridge (replaced by Restate promises)
5.2 NATS SSE Fan-out
- Gateway subscribes to agent.events.{thread_id}
- Each SSE event = one NATS message
- Handle client disconnect: unsubscribe from NATS (agent continues)
- Handle client reconnect: NATS JetStream replay from last sequence
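One way to connect the one-message-per-event rule to replay on reconnect is to carry the stream sequence in the SSE id field, so the browser sends it back as Last-Event-ID. The sketch below assumes that approach; the frame layout shown is illustrative and would have to match the existing SSE format, which this plan requires to stay identical.

```python
import json
from typing import Any, Dict, Optional


def to_sse(sequence: int, event: Dict[str, Any]) -> str:
    """Render one message as one SSE frame; id carries the stream sequence."""
    data = json.dumps(event, separators=(",", ":"))
    return f"id: {sequence}\nevent: {event['type']}\ndata: {data}\n\n"


def resume_sequence(last_event_id: Optional[str]) -> int:
    """First sequence to replay after a reconnect (1 when no valid header was sent)."""
    if last_event_id is None or not last_event_id.isdecimal():
        return 1
    return int(last_event_id) + 1


if __name__ == "__main__":
    print(to_sse(7, {"type": "text_delta", "text": "hi"}), end="")
    print(resume_sequence("7"))
```

The gateway would then start its consumer at resume_sequence(...) for the thread's subject and leave the agent itself untouched.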
5.3 Persistence
- Worker persists messages to Postgres (inside ctx.run)
- Gateway does NOT persist (read-only proxy)
- Existing persist_assistant_message, persist_tool_result move to activities
5.4 Verification
- Full end-to-end: frontend → gateway → Restate → worker → NATS → SSE → frontend
- Compare SSE event format with existing (must be identical)
- Test disconnect/reconnect
- Test cancel via POST /chat/{thread_id}/cancel
Phase 6: Multi-Agent
6.1 Sub-Agent Dispatch
- OrchestratorAgent hand-off → ctx.service_call("AgentWorkflow", payload)
- Each sub-agent is a separate Restate workflow invocation
- Results flow back to orchestrator durably
6.2 Cross-Agent Communication
- Sub-agents share thread_id (same conversation memory)
- Each sub-agent publishes to same NATS subject
- Frontend sees unified SSE stream
6.3 Verification
- OrchestratorAgent dispatches to 2 sub-agents
- Kill sub-agent worker → verify restart + result delivery
- Verify orchestrator receives sub-agent results after crash recovery
Phase 7: Production Hardening
7.1 Cancel/Abort
- POST /chat/{thread_id}/cancel → Restate cancel API
- Agent stops at next ctx.run checkpoint
- Publish cancelled event to NATS
7.2 Distributed Tracing
- Propagate OTEL trace context through Restate invocations
- Restate invocation ID as span attribute
- NATS message headers carry trace context
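Carrying trace context in message headers usually means a W3C traceparent header. The sketch below builds and parses that header by hand to show what travels with each message; in practice an OpenTelemetry propagator would normally do this, and the function names here are assumptions.

```python
import re
from typing import Dict, Optional, Tuple

# W3C Trace Context, version 00: version-traceid-parentid-flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject_trace(
    headers: Dict[str, str], trace_id: str, span_id: str, sampled: bool = True
) -> Dict[str, str]:
    """Add a traceparent header to the headers of an outgoing message."""
    flags = "01" if sampled else "00"
    headers["traceparent"] = f"00-{trace_id}-{span_id}-{flags}"
    return headers


def extract_trace(headers: Dict[str, str]) -> Optional[Tuple[str, str, bool]]:
    """Return (trace_id, parent_span_id, sampled), or None if absent or malformed."""
    match = TRACEPARENT_RE.match(headers.get("traceparent", ""))
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    return trace_id, span_id, bool(int(flags, 16) & 1)  # bit 0 is the sampled flag
```

The Restate invocation ID would be attached separately as a span attribute on the receiving side; it is not part of the traceparent header.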
7.3 Error Handling
- Retry policies per activity (LLM: 3 retries, tool: 2 retries)
- Dead letter handling for permanently failed workflows
- Error events published to NATS for frontend display
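The per-activity retry counts can be made concrete with a small helper. This is a sketch of the counting only, under the assumption that "N retries" means N attempts after the first; in the actual system the retries would more likely be configured on the durable runtime than hand-rolled. run_with_retries, RETRIES, and make_flaky are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

RETRIES: Dict[str, int] = {"llm_call": 3, "tool_exec": 2}  # retry counts from the plan


async def run_with_retries(
    activity: str, fn: Callable[[], Awaitable[Any]], base_delay: float = 0.0
) -> Any:
    attempts = 1 + RETRIES[activity]  # assumption: retries come after the first try
    for attempt in range(attempts):
        try:
            return await fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # exhausted: the caller routes this to dead-letter handling
            await asyncio.sleep(base_delay * (2 ** attempt))  # exponential backoff


def make_flaky(failures: int) -> Tuple[Callable[[], Awaitable[str]], Dict[str, int]]:
    """Build an activity that fails `failures` times before succeeding."""
    state = {"calls": 0}

    async def fn() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError("transient failure")
        return "ok"

    return fn, state
```

With these counts, a tool that fails twice still succeeds on its third attempt, while an LLM call that keeps failing is abandoned after four attempts.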
7.4 Redis-Backed TaskStore
- Migrate shared/tasks/store.py from in-memory to Redis
- TaskStore reads/writes via Redis hashes
- Kanban state survives worker restart
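Since Redis hash fields hold flat string values, a task has to be serialized per field before it is written. The sketch below shows one possible mapping; the Task fields, the task:{id} key format, and JSON-encoding each value are assumptions, and it expects a client that returns strings rather than bytes.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class Task:
    id: str
    title: str
    status: str = "todo"  # hypothetical Kanban column
    tags: List[str] = field(default_factory=list)


def task_key(task_id: str) -> str:
    """Redis key for one task hash (key format is an assumption)."""
    return f"task:{task_id}"


def to_hash(task: Task) -> Dict[str, str]:
    """Flatten a task into field -> JSON string, suitable for a hash write."""
    return {name: json.dumps(value) for name, value in asdict(task).items()}


def from_hash(fields: Dict[str, str]) -> Task:
    """Rebuild a task from the string fields read back from the hash."""
    return Task(**{name: json.loads(raw) for name, raw in fields.items()})
```

JSON-encoding every field keeps lists and strings round-trippable with the same code path, at the cost of quoted strings in the stored values.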
7.5 Verification
- Chaos testing: random worker kills during various workflow stages
- Verify no lost messages, no duplicate executions
- Load test: 100 concurrent workflows
Phase 8: Deployment
8.1 Docker Compose
- Update docker-compose.yml with Restate + NATS services
- Worker service with uv run python -m ravi.distributed.worker
- Health checks for all services
8.2 Kubernetes
- Restate Deployment (1 replica dev, 3 HA prod)
- NATS StatefulSet (3 replicas with JetStream clustering)
- Agent Worker Deployment with HPA (auto-scale on CPU/memory)
- Ingress rules for Restate admin (internal only)
8.3 Settings & Env Vars
- RESTATE_INGRESS_URL (internal, not NEXT_PUBLIC)
- NATS_URL (internal)
- Worker-specific: WORKER_CONCURRENCY, WORKER_TASK_QUEUE
8.4 Verification
- deploy.py updated for new services
- Smoke test: end-to-end chat in Kind cluster
- Rolling update: deploy new worker version, verify zero downtime