Execution Pipeline
End-to-end trace of a user message through the Agent Framework — from browser keystroke to streamed SSE token — with component-level box boundaries.
Component Boundaries
| Box | Components inside |
|---|---|
| Browser | React UI (page.tsx), EventSource SSE consumer |
| Next.js BFF | app/api/chat/route.ts, app/api/chat/respond/[id]/route.ts |
| FastAPI Server | server/app.py, server/routes/chat.py, server/routes/workflows.py, WebHITLBridge |
| Restate Cluster | Restate ingress (HTTP), durable journal, promise store |
| Worker | integrations/runtime/restate/worker.py, activities.py, ReActAgent loop |
| Infrastructure | Redis (conversation memory), PostgreSQL (thread/message persistence), NATS (pub/sub fan-out), OpenAI API |
Normal Execution Flow
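The ReAct loop at the centre of this flow can be sketched in a few lines of Python. This is a minimal illustration, not the framework's code: the `AssistantMessage` fields, the `react_loop` signature, and the `max_turns` guard are all assumptions, and journalling, Redis, and PostgreSQL persistence are left out. It only shows the alternation between an LLM call and a tool execution that the diagram describes, ending when the model stops requesting tools.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class AssistantMessage:
    """Hypothetical shape of an LLM reply; the field names are assumptions."""
    text: str = ""
    stop_reason: str = "end_turn"  # "tool_use" means the model wants a tool call
    tool_name: Optional[str] = None
    tool_args: dict = field(default_factory=dict)


def react_loop(messages: list, llm_call: Callable, tools: dict,
               publish: Callable[[str, Any], None], max_turns: int = 8) -> str:
    """Alternate LLM calls and tool runs until the model stops asking for tools."""
    for _ in range(max_turns):
        reply = llm_call(messages)  # stands in for do_llm_call
        messages.append({"role": "assistant", "content": reply.text})
        if reply.stop_reason != "tool_use":
            publish("completion", reply.text)
            return reply.text
        publish("tool_call", reply.tool_name)
        result = tools[reply.tool_name](**reply.tool_args)  # stands in for do_tool_exec
        messages.append({"role": "tool", "name": reply.tool_name, "content": str(result)})
        publish("tool_result", result)
    raise RuntimeError("ReAct loop exceeded max_turns")


# Scripted replies replace the real model so the sketch runs offline.
scripted = iter([
    AssistantMessage(stop_reason="tool_use", tool_name="add", tool_args={"a": 2, "b": 3}),
    AssistantMessage(text="The answer is 5."),
])
events = []
answer = react_loop(
    [{"role": "user", "content": "What is 2 + 3?"}],
    llm_call=lambda messages: next(scripted),
    tools={"add": lambda a, b: a + b},
    publish=lambda kind, payload: events.append((kind, payload)),
)
```

In the real pipeline each `llm_call` and tool result would be journalled by Restate before the loop proceeds, so a replay after a crash would not repeat the call.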
sequenceDiagram
box Browser
participant UI as React UI
participant SSE as EventSource
end
box Next.js BFF
participant BFF as /api/chat route
participant HITLProxy as /api/chat/respond
end
box FastAPI Server
participant API as FastAPI
participant Bridge as WebHITLBridge
participant WFClient as RestateWorkflowClient
end
box Restate Cluster
participant Restate as Restate Ingress
participant Journal as Durable Journal
end
box Worker
participant Worker as Worker Process
participant Act as Activities
participant Agent as ReActAgent
end
box Infrastructure
participant Redis as Redis
participant PG as PostgreSQL
participant NATS as NATS
participant LLM as OpenAI API
end
UI->>BFF: POST /api/chat {message, threadId}
BFF->>API: POST /chat (proxied)
API->>Bridge: register SSE channel (conversationId)
API->>WFClient: start_agent_workflow(conv_id, message)
WFClient->>Restate: POST /invoke/AgentWorkflow/{conv_id}/agent_run
Restate-->>WFClient: 200 OK (workflow accepted)
WFClient-->>API: AgentWorkflowHandle
API-->>BFF: 200 + SSE stream open
BFF-->>SSE: SSE stream open
SSE-->>UI: stream connected
Restate->>Worker: dispatch agent_run(input)
Worker->>Act: configure() [already done at startup]
Act->>Redis: restore_memory(conv_id)
Redis-->>Act: message history
loop ReAct Loop
Act->>LLM: do_llm_call(messages)
LLM-->>Act: AssistantMessage (text or tool_use)
Journal->>Journal: persist llm result
alt stop_reason == tool_use
Act->>Act: do_tool_exec(tool_name, args)
Journal->>Journal: persist tool result
Act->>Redis: persist_message(tool_result)
Act->>PG: persist_message(tool_result)
end
Act->>NATS: publish_event(text_delta / tool_call / tool_result)
NATS->>Bridge: fan-out to SSE channel
Bridge-->>SSE: SSE event
SSE-->>UI: render token / tool bubble
end
Act->>NATS: publish_event(completion)
NATS->>Bridge: fan-out
Bridge-->>SSE: completion event
SSE-->>UI: finalize assistant turn
HITL (Human-In-The-Loop) Approval Flow
Triggered when a tool has requires_approval=True or the agent calls ask_human.
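The suspend-and-resume behaviour can be illustrated with an in-memory stand-in for the promise store. The sketch below uses `asyncio.Future` in place of a durable promise, so nothing here survives a restart and none of it is the Restate SDK; `PromiseStore` and `run_tool_with_approval` are hypothetical names. It shows the ordering that matters: publish the request, await the promise named `hitl-{requestId}`, then run the tool only if the resolved value approves it.

```python
import asyncio


class PromiseStore:
    """In-memory stand-in for a durable promise store (illustration only)."""

    def __init__(self):
        self._futures = {}

    def promise(self, name):
        # Created lazily, so resolve() may arrive before or after the await.
        if name not in self._futures:
            self._futures[name] = asyncio.get_running_loop().create_future()
        return self._futures[name]

    def resolve(self, name, value):
        future = self.promise(name)
        if not future.done():
            future.set_result(value)


async def run_tool_with_approval(store, request_id, tool, args, publish):
    publish({"type": "tool_approval_request", "requestId": request_id, "input": args})
    decision = await store.promise(f"hitl-{request_id}")  # execution suspends here
    if not decision.get("approved"):
        return {"status": "rejected"}
    return {"status": "ok", "result": tool(**args)}


async def demo():
    store, events = PromiseStore(), []
    task = asyncio.create_task(run_tool_with_approval(
        store, "r1", lambda to: f"sent to {to}", {"to": "a@example.com"}, events.append))
    await asyncio.sleep(0)  # let the task publish its request and suspend
    store.resolve("hitl-r1", {"approved": True})  # what the respond endpoint would trigger
    return await task, events


result, events = asyncio.run(demo())
```

A durable promise differs from this future in one important way: the worker process can exit while waiting, and the workflow resumes on whichever worker Restate dispatches to once the promise is resolved.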
sequenceDiagram
box Browser
participant UI as React UI
participant SSE as EventSource
end
box Next.js BFF
participant HITLProxy as /api/chat/respond/[id]
end
box FastAPI Server
participant API as FastAPI /hitl/respond
participant WFClient as RestateWorkflowClient
end
box Restate Cluster
participant Restate as Restate Ingress
participant Promise as Durable Promise Store
end
box Worker
participant Act as Activities
end
Act->>Promise: ctx.promise("hitl-{requestId}")
Act->>Act: publish_event(tool_approval_request / human_input_request)
Note over Act: worker suspends — thread freed
Act-->>SSE: SSE: tool_approval_request {requestId, tool_name, input}
SSE-->>UI: render ToolApprovalCard / HumanInputCard
UI->>HITLProxy: POST /api/chat/respond/{requestId} {approved, value}
HITLProxy->>API: POST /hitl/respond/{requestId}
API->>WFClient: resolve_promise(AgentWorkflow, conv_id, "hitl-{requestId}", value)
WFClient->>Restate: POST /invoke/AgentWorkflow/{conv_id}/resolve_approval
Restate->>Promise: resolve("hitl-{requestId}", value)
Note over Restate: workflow resumes
Restate->>Act: return resolved value to awaiting ctx.promise().get()
Act->>Act: continue tool execution with approved inputs
HITL Human Input Flow
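Uses the same promise mechanism as the approval flow above, but is triggered by the ask_human tool: the promise resolves with the user's typed answer rather than an approval decision.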
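The last step of this flow, turning the resolved payload into a message for the ReAct loop, might look like the sketch below. `ToolExecutionResultMessage` is named in the diagram, but the fields given to it here are assumptions, as is the `to_tool_result` helper and its validation rule.

```python
from dataclasses import dataclass


@dataclass
class ToolExecutionResultMessage:
    """Hypothetical fields; the framework's real class may differ."""
    tool_name: str
    request_id: str
    content: str


def to_tool_result(request_id: str, payload: dict) -> ToolExecutionResultMessage:
    """Convert the resolved promise payload into a message for the ReAct loop."""
    answer = payload.get("response")
    if not isinstance(answer, str) or not answer.strip():
        raise ValueError("human input payload needs a non-empty 'response' string")
    return ToolExecutionResultMessage("ask_human", request_id, answer.strip())


msg = to_tool_result("r2", {"response": "  Use the staging cluster  "})
```

Rejecting an empty answer at this boundary keeps a blank submission from being fed to the model as if it were a real reply.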
sequenceDiagram
box Browser
participant UI as React UI
end
box Next.js BFF
participant HITLProxy as /api/chat/respond/[id]
end
box FastAPI Server
participant API as FastAPI /hitl/respond
participant WFClient as RestateWorkflowClient
end
box Restate Cluster
participant Promise as Durable Promise Store
end
box Worker
participant Act as Activities
end
Act->>Promise: ctx.promise("hitl-{requestId}") [ask_human variant]
Act-->>UI: SSE: human_input_request {requestId, prompt, options}
UI->>UI: render HumanInputCard
UI->>HITLProxy: POST /api/chat/respond/{requestId} {response: "user typed answer"}
HITLProxy->>API: POST /hitl/respond/{requestId}
API->>WFClient: resolve_promise(..., "hitl-{requestId}", {response: "..."})
WFClient->>Promise: resolve
Promise-->>Act: ctx.promise().get() returns user answer
Act->>Act: feed user answer back into ReAct loop as ToolExecutionResultMessage
Pipeline Workflow Execution
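Each pipeline step receives inputs in which `$prev.field` and `$step[n].field` references have been replaced by outputs of earlier steps. A resolver for that syntax could be sketched as follows. The function name, the zero-based step index, and the rule that only whole-string values count as references are assumptions made for illustration.

```python
import re
from typing import Any

# Matches "$prev.field" or "$step[n].field" as a whole string value.
_REF = re.compile(r"^\$(prev|step\[(\d+)\])\.(\w+)$")


def resolve_refs(inputs: dict, outputs: list) -> dict:
    """Replace reference strings in `inputs` with fields from earlier step outputs."""
    resolved: dict = {}
    for key, value in inputs.items():
        match = _REF.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal value, passed through unchanged
            continue
        # "$prev" means the most recent output; "$step[n]" is assumed zero-based.
        index = len(outputs) - 1 if match.group(1) == "prev" else int(match.group(2))
        if not 0 <= index < len(outputs):
            raise KeyError(f"{value!r} refers to a step that has not run yet")
        resolved[key] = outputs[index][match.group(3)]
    return resolved


outputs = [{"url": "https://example.com"}, {"summary": "short text"}]
step_inputs = {"body": "$prev.summary", "link": "$step[0].url", "to": "ops@example.com"}
resolved = resolve_refs(step_inputs, outputs)
```

Resolving references in the workflow rather than inside the adapter keeps adapters unaware of pipeline position, so the same adapter can run as the first step or the tenth.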
sequenceDiagram
box FastAPI Server
participant API as FastAPI
participant WFClient as RestateWorkflowClient
end
box Restate Cluster
participant Restate as Restate Ingress
participant Journal as Durable Journal
end
box Worker
participant PW as PipelineWorkflow
participant Act as Activities
end
box Infrastructure
participant Adapter as External Adapters
end
API->>WFClient: start_pipeline_workflow(id, steps, context)
WFClient->>Restate: POST /invoke/PipelineWorkflow/{id}/pipeline_run
Restate->>PW: dispatch pipeline_run(steps)
loop For each step
PW->>Act: execute_adapter_step(step, resolved_inputs)
Note over PW,Act: $prev.field / $step[n].field refs resolved
Act->>Adapter: call adapter (web_surfer / code_interpreter / email_sender …)
Adapter-->>Act: step result
Journal->>Journal: persist step result
Act-->>PW: step_output
end
PW-->>Restate: pipeline complete
Restate-->>WFClient: workflow finished
Startup Sequence (Worker)
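The worker calls `configure()` once at startup, and activities rely on it afterwards. One common way to implement that pattern is a module-level holder with an accessor that fails loudly when the startup order is wrong. The sketch below is an assumption about the shape of such a module, not the contents of activities.py; the `Deps` fields mirror the keyword arguments named in the startup call, and `deps()` is a hypothetical accessor.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Deps:
    """Shared clients handed to activities; the field set is an assumption."""
    streaming: Any
    model_client: Any
    tools: dict
    redis_memory: Any


_state: dict = {}  # module-level holder, filled exactly once


def configure(**kwargs: Any) -> None:
    """Install shared dependencies; intended to run once in main()."""
    if "deps" in _state:
        raise RuntimeError("configure() was already called")
    _state["deps"] = Deps(**kwargs)


def deps() -> Deps:
    """Accessor for activities; raises if configure() has not run yet."""
    if "deps" not in _state:
        raise RuntimeError("configure() must run before any activity executes")
    return _state["deps"]


configure(streaming="nats-bridge", model_client="openai-client",
          tools={"add": lambda a, b: a + b}, redis_memory="redis-memory")
total = deps().tools["add"](2, 3)
```

Running `configure()` before the deployment is registered matters for ordering: once Restate knows the worker URL it may dispatch an invocation immediately, and that invocation would hit the accessor.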
sequenceDiagram
box Worker
participant Main as worker.py main()
participant Act as activities.configure()
end
box Infrastructure
participant Redis as Redis
participant PG as PostgreSQL
participant NATS as NATS
participant LLM as OpenAI (client init)
end
box Restate Cluster
participant Admin as Restate Admin API
end
Main->>LLM: OpenAIClient(api_key, model)
Main->>Redis: RedisMemory.connect()
Main->>PG: engine connect (asyncpg)
Main->>NATS: NATSBridge.connect()
Main->>Act: configure(streaming, model_client, tools, redis_memory, …)
Main->>Admin: register_deployment(worker_url)
Admin-->>Main: 200 OK — Restate knows activity endpoints
Note over Main: Worker ready, awaiting workflow invocations dispatched by Restate
Key Data Flows Summary
User keystroke
→ Next.js BFF (POST /api/chat)
→ FastAPI (register SSE, start workflow)
→ RestateWorkflowClient (HTTP POST to Restate ingress)
→ Restate (persist in journal, dispatch)
→ Worker (run ReAct loop via Activities)
→ Redis (memory restore/persist)
→ OpenAI (LLM call — result journalled)
→ Tool (execute — result journalled)
→ PG (persist message)
→ NATS (publish SSE event)
→ WebHITLBridge (fan-out)
→ EventSource (Browser SSE stream)
→ React UI (render token)
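The final hops of this chain carry events as Server-Sent Events frames. A frame is plain text: an `event:` line, one or more `data:` lines, and a blank line as terminator. The sketch below formats and parses such a frame. The event name `text_delta` comes from the diagrams above, while the payload field `delta` and both helper functions are hypothetical.

```python
import json
from typing import Optional, Tuple


def format_sse(event: str, data: dict, event_id: Optional[str] = None) -> str:
    """Serialize one event as an SSE frame terminated by a blank line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # json.dumps without indent escapes newlines, so the payload fits on one data line.
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


def parse_sse(frame: str) -> Tuple[str, dict]:
    """Parse a single frame back into (event name, payload)."""
    event, data_lines = "message", []  # "message" is the default event type
    for line in frame.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
    return event, json.loads("\n".join(data_lines))


frame = format_sse("text_delta", {"delta": "Hel"}, event_id="7")
event, payload = parse_sse(frame)
```

On the browser side, an `EventSource` delivers frames with a named event only to listeners registered for that name via `addEventListener`, not to `onmessage`, so the UI needs one listener per event type it renders.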