Agent Framework — Architecture Diagrams

Eight diagrams covering every layer of the system, from individual components to the full end-to-end request flow.


1. Top-Level Component Map

Shows every module and how the modules connect. The frontend talks only to FastAPI routes; routes pull their dependencies from ServerContext via Depends(get_ctx); the agent uses Redis and Postgres for memory; the LLM provider is OpenAI.

graph TB
    subgraph Frontend["🖥️ Next.js Frontend (ai-chatbot-ui)"]
        UI[page.tsx\nChat UI]
        SSEClient[EventSource\nSSE Consumer]
        HIC[HumanInputCard]
        TAC[ToolApprovalCard]
        KB[KanbanPanel]
    end

    subgraph FastAPI["⚙️ FastAPI Server (agent-framework)"]
        subgraph Routes["server/routes/"]
            ChatRoute[POST /chat\nchat.py]
            HITLRoute[POST /chat/respond\nhitl.py]
            CancelRoute[POST /chat/cancel\ncancel.py]
            TasksRoute["GET/PATCH /tasks\ntasks.py"]
            ThreadsRoute[threads.py]
            FilesRoute["GET/POST/DELETE /files\nfiles.py"]
        end

        subgraph Context["server/context.py"]
            ServerCtx[ServerContext\nDI Container]
        end

        subgraph Services["server/services/"]
            AgentSvc[agent_service.py\nAgent factory +\nmemory restore]
            FileSvc[file_service.py\nsave / extract / purge]
        end

        subgraph SSE["server/sse/"]
            EB[EventBus\nevents.py]
            HITL[WebHITLBridge\nbridge.py]
            BR[BridgeRegistry\nbridge.py]
        end

        subgraph Agent["core/agents/"]
            ReAct[ReActAgent\nreact_agent.py]
        end

        subgraph Memory["core/memory/"]
            RM[RedisMemory]
            UBM[UnboundedMemory]
            SWC[SlidingWindowContext]
        end

        subgraph FileSystem["core/storage/"]
            FSAbC[FileStore ABC\nbase.py]
            LocalFS[LocalFileStore\nlocal.py]
            S3FS[S3FileStore\ns3.py]
            EncFS[EncryptedFileStore\nencrypted.py]
            TC[TenantContext\ntenant.py]
            FSFact[create_file_store\nfactory.py]
        end

        subgraph Tools["tools/"]
            AHT[AskHumanTool\nHITL input]
            TMT[TaskManagerTool\nKanban board]
            FMT[FileManagerTool\nagent file CRUD]
            BT[BuiltinTools\ncalculator, time, etc.]
            MCP[MCPTools\nexternal integrations]
        end
    end

    subgraph Storage["💾 Storage"]
        Redis[(Redis 7\nHot session store)]
        PG[(PostgreSQL 16\nCold / persistent store\n+ FileMetadata table)]
        ObjStore[(LocalFS volume\nor MinIO / S3\nFile object store)]
    end

    subgraph LLM["🤖 LLM Provider"]
        OAI[OpenAI API\ngpt-4o-mini]
    end

    UI -->|POST /chat| ChatRoute
    SSEClient -->|SSE stream| ChatRoute
    HIC -->|POST /chat/respond| HITLRoute
    TAC -->|POST /chat/respond| HITLRoute
    KB -->|GET /tasks| TasksRoute
    UI -->|POST /files upload| FilesRoute

    ChatRoute -->|Depends get_ctx| ServerCtx
    HITLRoute -->|Depends get_ctx| ServerCtx
    CancelRoute -->|Depends get_ctx| ServerCtx
    FilesRoute -->|Depends get_ctx| ServerCtx

    ServerCtx --> BR
    ServerCtx --> RM
    ServerCtx --> BT
    ServerCtx --> FSAbC

    ChatRoute --> AgentSvc
    AgentSvc --> ReAct
    AgentSvc --> RM

    FilesRoute --> FileSvc
    FileSvc --> FSAbC
    FileSvc --> PG

    ChatRoute --> FileSvc

    FSFact --> LocalFS
    FSFact --> S3FS
    LocalFS & S3FS --> EncFS
    FSAbC --> TC
    LocalFS -.-> ObjStore
    S3FS -.-> ObjStore

    ReAct -->|run_stream| EB
    ReAct --> OAI
    ReAct --> Tools
    ReAct --> FMT

    FMT --> FSAbC
    FMT --> PG

    EB -->|typed events| ChatRoute
    HITL -->|HITL events| ChatRoute

    BR -->|acquire/release| HITL
    HITLRoute -->|resolve| BR

    RM <--> Redis
    AgentSvc <--> PG
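
The Depends(get_ctx) pattern described above can be illustrated without the framework. This is only a sketch: the ServerContext fields are a trimmed, hypothetical subset, chat_route is an invented handler, and fake_request is a stand-in for a FastAPI Request so the example runs on the standard library alone.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any


@dataclass
class ServerContext:
    """Hypothetical, trimmed-down container; the real one bundles more fields."""
    model_client: Any
    redis_memory: Any
    bridge_registry: Any
    file_store: Any
    tools: dict[str, Any] = field(default_factory=dict)


def get_ctx(request: Any) -> ServerContext:
    # Same lookup the diagrams describe: the context lives on app.state.
    return request.app.state.ctx


def chat_route(request: Any) -> str:
    # Under FastAPI the handler would instead declare a parameter such as
    # `ctx: ServerContext = Depends(get_ctx)` and receive the context directly.
    ctx = get_ctx(request)
    return f"chat uses {ctx.model_client}"


# Stand-in for a FastAPI Request (assumption for this sketch only).
ctx = ServerContext(model_client="gpt-4o-mini", redis_memory=None,
                    bridge_registry=None, file_store=None)
fake_request = SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace(ctx=ctx)))
```

Because every route resolves the same object, swapping a dependency (for example a fake model client in tests) only requires replacing the context attached to the application state.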

2. EventBus — How Events Flow from Agent to SSE

The EventBus is a typed wrapper around asyncio.Queue. Two background tasks write into it:

- agent_worker emits strongly-typed events (TextDeltaEvent, CompletionEvent, etc.)
- hitl_worker drains the bridge's outgoing queue and re-emits as RawDictEvent

The SSE generator polls the bus with a 200 ms timeout. On a TimeoutError it checks for a browser disconnect or an explicit cancel. Calling bus.close() pushes the BUS_CLOSED sentinel, which tells the consumer to stop.

graph LR
    subgraph Producers["Producers (write to bus)"]
        AW[agent_worker\nasyncio Task]
        HW[hitl_worker\nasyncio Task]
    end

    subgraph EventBus["EventBus — asyncio.Queue wrapper"]
        Q[asyncio.Queue\ntyped AgentEvent items]
        SENTINEL[_BUS_DONE sentinel\npublic alias: BUS_CLOSED]
        EMIT[emit — put typed event]
        EMITD[emit_dict — wrap as RawDictEvent]
        POLL[poll timeout 200ms\nraises TimeoutError if empty]
        CLOSE[close — push BUS_CLOSED\nidempotent]
    end

    subgraph EventTypes["Typed Events emitted by agent_worker"]
        TDE[TextDeltaEvent\nstreaming text chunk]
        RDE[ReasoningDeltaEvent\nthinking chunk]
        TCE[ToolCallEvent\ntool requested]
        TRE[ToolResultEvent\ntool completed]
        TARE[ToolApprovalRequestEvent\nwaiting approval]
        HIRE[HumanInputRequestEvent\nwaiting input]
        CE[CompletionEvent\nagent done]
        EE[ErrorEvent\nfailure]
        RDE2[RawDictEvent\nescape hatch for dicts\ntask_updated, etc.]
    end

    subgraph Consumer["Consumer — SSE generator loop in chat.py"]
        LOOP[poll loop\nevery 200 ms]
        DISC[disconnect check\nrequest.is_disconnected]
        CANCEL[cancel check\ncancel_event.is_set]
        SERIALISE[to_sse_line\nor json.dumps item.to_dict]
        SSE[yield SSE bytes\ndata: JSON]
    end

    AW -->|emit| EMIT
    AW -->|emit_dict| EMITD
    HW -->|emit_dict for HITL events| EMITD
    HW -->|close when BRIDGE_DONE| CLOSE
    EMIT --> Q
    EMITD --> Q
    CLOSE --> SENTINEL
    SENTINEL --> Q

    Q --> POLL
    POLL -->|item or TimeoutError| LOOP
    LOOP -->|TimeoutError| DISC
    LOOP -->|TimeoutError| CANCEL
    LOOP -->|BUS_CLOSED| SSE
    LOOP -->|typed event| SERIALISE
    SERIALISE --> SSE

    TDE -.-> Q
    RDE -.-> Q
    TCE -.-> Q
    TRE -.-> Q
    TARE -.-> Q
    HIRE -.-> Q
    CE -.-> Q
    EE -.-> Q
    RDE2 -.-> Q
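
The queue wrapper and the polling consumer can be sketched in a few lines. This is an illustration under assumptions, not the project's events.py: the two event classes are reduced to one field each, consume is a hypothetical stand-in for the SSE generator loop, and the timeout is shortened so the example finishes quickly.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

BUS_CLOSED = object()  # sentinel: tells the consumer to stop


@dataclass
class TextDeltaEvent:
    content: str

    def to_dict(self) -> dict[str, Any]:
        return {"type": "text_delta", "content": self.content}


@dataclass
class RawDictEvent:
    payload: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        return self.payload


class EventBus:
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    async def emit(self, event: Any) -> None:
        await self._queue.put(event)

    async def emit_dict(self, payload: dict[str, Any]) -> None:
        await self._queue.put(RawDictEvent(payload))

    async def poll(self, timeout: float = 0.2) -> Any:
        # Raises asyncio.TimeoutError when nothing arrives within `timeout`.
        return await asyncio.wait_for(self._queue.get(), timeout)

    async def close(self) -> None:
        if not self._closed:  # idempotent: the sentinel is pushed only once
            self._closed = True
            await self._queue.put(BUS_CLOSED)


async def consume(bus: EventBus, is_disconnected) -> list[dict[str, Any]]:
    seen = []
    while True:
        try:
            item = await bus.poll(timeout=0.05)
        except asyncio.TimeoutError:
            if is_disconnected():  # idle window: check disconnect / cancel
                break
            continue
        if item is BUS_CLOSED:
            break
        seen.append(item.to_dict())
    return seen


async def demo() -> list[dict[str, Any]]:
    bus = EventBus()
    await bus.emit(TextDeltaEvent("Hello"))
    await bus.emit_dict({"type": "task_updated"})
    await bus.close()
    await bus.close()  # second close is a no-op
    return await consume(bus, is_disconnected=lambda: False)
```

Running asyncio.run(demo()) returns the two event dicts in emission order; the sentinel itself is never serialised.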

3. WebHITLBridge — HITL Approval Sequence

WebHITLBridge is a two-way async channel. The agent blocks on an asyncio.Future (for up to 300 s in blocking mode), and the request ID is sent over SSE to the frontend, which renders an approval card. When the user clicks Approve or Deny, the frontend POSTs to /chat/respond/{requestId}, which calls future.set_result() and unblocks the agent. On disconnect, cancel_all_pending() settles every pending future immediately so the agent never hangs.

sequenceDiagram
    participant Agent as ReActAgent
    participant Bridge as WebHITLBridge
    participant HW as hitl_worker Task
    participant SSE as SSE Generator
    participant FE as Frontend Browser
    participant HIEP as POST /chat/respond

    Note over Agent,FE: Normal HITL approval flow (BLOCKING mode)

    Agent->>Bridge: tool needs approval\n_handle_approval(request)
    Bridge->>Bridge: create asyncio.Future\nstore in _pending[request_id]
    Bridge->>Bridge: save payload in _pending_payloads
    Bridge->>Bridge: _outgoing.put({type: tool_approval_request, ...})
    Note over Agent: Agent is BLOCKED\nawait asyncio.wait_for(future, 300s)

    HW->>Bridge: get_event() — pulls from _outgoing
    HW->>SSE: bus.emit_dict({type: tool_approval_request})
    SSE->>FE: data: {"type":"tool_approval_request","requestId":"..."}

    FE->>FE: render ToolApprovalCard
    FE->>HIEP: POST /chat/respond/{requestId}\n{"action":"approve"}

    HIEP->>Bridge: BridgeRegistry.resolve(requestId, data)
    Bridge->>Bridge: future.set_result({"action":"approve"})
    Note over Agent: Agent UNBLOCKS\nreceives ToolApprovalResponse

    Agent->>Agent: executes tool\ncontinues ReAct loop

    Note over Agent,FE: Browser disconnect while HITL pending

    FE--xSSE: TCP disconnect
    SSE->>SSE: poll() → TimeoutError\nrequest.is_disconnected() = True
    SSE->>Bridge: cancel_all_pending("session_disconnected")
    Bridge->>Bridge: future.set_result({session_disconnected: True})
    Note over Agent: Agent UNBLOCKS\ngets session_disconnected → DENY
    SSE->>SSE: bridge.signal_done()\nagent_task.cancel()

    Note over Bridge: Bridge KEPT ALIVE in BridgeRegistry\n(has_pending was True during the run)\nUser can reconnect and POST respond later
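
The future-based handshake in the sequence above can be approximated as follows. The class name WebHITLBridgeSketch, the request_approval method, and the public outgoing queue are assumptions made for the example; only the overall shape (a pending-futures dict, resolve, cancel_all_pending) comes from the diagram.

```python
import asyncio
import uuid
from typing import Any


class WebHITLBridgeSketch:
    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}
        self.outgoing: asyncio.Queue = asyncio.Queue()

    async def request_approval(self, tool_name: str,
                               timeout: float = 300.0) -> dict[str, Any]:
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        await self.outgoing.put({"type": "tool_approval_request",
                                 "requestId": request_id, "tool": tool_name})
        try:
            # The caller (the agent) is blocked here until someone settles it.
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(request_id, None)

    def resolve(self, request_id: str, data: dict[str, Any]) -> bool:
        future = self._pending.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(data)
        return True

    def cancel_all_pending(self, reason: str) -> None:
        for future in self._pending.values():
            if not future.done():
                future.set_result({reason: True})


async def demo() -> tuple[dict, dict]:
    bridge = WebHITLBridgeSketch()

    # Normal flow: the "frontend" reads the request id and approves.
    agent = asyncio.create_task(bridge.request_approval("delete_file"))
    event = await bridge.outgoing.get()
    bridge.resolve(event["requestId"], {"action": "approve"})
    approved = await agent

    # Disconnect flow: every pending future is settled so nothing hangs.
    agent = asyncio.create_task(bridge.request_approval("delete_file"))
    await bridge.outgoing.get()
    bridge.cancel_all_pending("session_disconnected")
    denied = await agent
    return approved, denied
```

Settling the future with a marker dict on disconnect, rather than cancelling it, lets the agent side treat the outcome as an ordinary DENY instead of handling an exception.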

4. ReActAgent — ReAct Loop Internals

Shows the full Think → Act → Observe loop: LLM generates text/tool calls, each tool either runs directly or goes through the approval handler, results feed back into memory, and the loop repeats until finish_reason=stop or max_iterations is hit.

flowchart TD
    START([run_stream called\nwith user_content]) --> SEED

    SEED[seed system message\nif memory empty] --> IGSEED

    IGSEED[run input guardrails\nMaxTokenGuardrail etc.] --> ADDUSER

    ADDUSER[memory.add_message\nUserMessage] --> LLM

    LLM[call LLM\nmodel_client.generate_stream\nwith SlidingWindowContext] --> STREAM

    STREAM{LLM response\ntype?}

    STREAM -->|TextDelta chunk| TXT[yield TextDeltaChunk\nto caller]
    TXT --> STREAM

    STREAM -->|ReasoningDelta chunk| RSN[yield ReasoningDeltaChunk\nto caller]
    RSN --> STREAM

    STREAM -->|CompletionChunk\nfinish_reason=stop| DONE_NO_TOOLS

    STREAM -->|CompletionChunk\nfinish_reason=tool_calls| PARSE

    DONE_NO_TOOLS[yield CompletionChunk\nmemory.add_message AssistantMessage\nrun output guardrails] --> RETURN

    PARSE[parse tool calls\nnormalise to _ParsedToolCall list] --> ADDASSIST

    ADDASSIST[memory.add_message\nAssistantMessage with tool_calls] --> ITERTOOLS

    ITERTOOLS[for each tool call...] --> APPROVAL

    APPROVAL{tool in\ntools_requiring_approval?}

    APPROVAL -->|yes| HITL_CHECK[call tool_approval_handler\nwait for future\nblocking / timeout / fire-and-continue]

    HITL_CHECK -->|DENY| SKIP[emit synthetic\nTool Denied result]
    HITL_CHECK -->|APPROVE may modify args| EXEC

    APPROVAL -->|no| EXEC

    EXEC[execute tool\ntool.execute with timeout\ntool_retry_policy on error] --> RESULT

    RESULT[yield ToolExecutionResultMessage\nmemory.add_message result] --> NEXTCALL

    NEXTCALL{more tool calls\nin this iteration?}
    NEXTCALL -->|yes| APPROVAL
    NEXTCALL -->|no| CHECKITER

    SKIP --> NEXTCALL

    CHECKITER{iterations <\nmax_iterations?}
    CHECKITER -->|yes, loop again| LLM
    CHECKITER -->|no, max hit| FORCESTOP[yield ErrorEvent\nmax iterations reached]

    RETURN([AgentRunResult\nwith all steps])
    FORCESTOP --> RETURN
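
A stripped-down, synchronous version of the loop may make the control flow easier to follow. Everything here is a simplification under stated assumptions: make_scripted_model is a hypothetical stand-in for the LLM client, messages are plain dicts, streaming, guardrails, and retries are omitted, and the max-iterations case returns a string where the diagram yields an error event.

```python
from typing import Any, Callable


def make_scripted_model(script: list[dict[str, Any]]) -> Callable[[list], dict[str, Any]]:
    # Stand-in for the LLM: each call returns the next canned completion.
    replies = iter(script)
    return lambda messages: next(replies)


def run_react(model, tools, user_content, needs_approval=(),
              approve=lambda call: True, max_iterations=5):
    memory = [{"role": "system", "content": "You are helpful."},
              {"role": "user", "content": user_content}]
    for _ in range(max_iterations):
        reply = model(memory)                            # Think
        if reply["finish_reason"] == "stop":
            memory.append({"role": "assistant", "content": reply["content"]})
            return reply["content"], memory
        memory.append({"role": "assistant", "tool_calls": reply["tool_calls"]})
        for call in reply["tool_calls"]:                 # Act
            if call["name"] in needs_approval and not approve(call):
                result = "Tool Denied"                   # synthetic result on DENY
            else:
                result = str(tools[call["name"]](**call["arguments"]))
            memory.append({"role": "tool", "name": call["name"],
                           "content": result})           # Observe
    return "max iterations reached", memory


tools = {"calculator": lambda a, b: a * b}
script = [
    {"finish_reason": "tool_calls",
     "tool_calls": [{"name": "calculator", "arguments": {"a": 6, "b": 7}}]},
    {"finish_reason": "stop", "content": "The answer is 42."},
]
answer, history = run_react(make_scripted_model(script), tools, "What is 6 x 7?")
```

Passing needs_approval={"calculator"} with an approve callback that returns False exercises the denial branch: the tool is skipped and a synthetic result is recorded so the next model call still sees an outcome for every tool call.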

5. Memory System — Redis Hot Path + PostgreSQL Cold Store

On every request, RedisMemory.restore() tries Redis first and loads all stored messages into the local in-process list. On a miss, agent_service reads the thread's steps from Postgres, rebuilds the messages, and seeds Redis. During the run, every add_message() appends to the local list and schedules a fire-and-forget background RPUSH to Redis, so no write blocks the agent. SlidingWindowContext is the only layer that limits what the LLM sees: it selects the last model_context_window messages at LLM-call time. (The Redis list itself is trimmed to a max_messages cap by LTRIM.) During the run, Postgres is written inline only for CompletionChunk and ToolExecutionResultMessage, before the corresponding SSE event is emitted.

flowchart TD
    subgraph Request["Per-Request Flow in agent_service.py"]
        START([load_agent_for_thread called]) --> TRY_REDIS

        TRY_REDIS[per_request_mem = RedisMemory.for_session\nshares parent connection pool] --> RESTORE

        RESTORE[await per_request_mem.restore\nloads ALL messages from Redis\nno limit -- full history] --> HIT?

        HIT?{Redis hit?\nmessages found?}

        HIT? -->|yes, hot path| HOT[use per_request_mem directly\nfull history in local list\nalready loaded]

        HIT? -->|no, cold path| COLD[load_messages_for_memory\nfetch ordered steps from PostgreSQL]

        COLD --> REBUILD[_rebuild_messages\nmap step rows to message objects\nSystemMessage UserMessage\nAssistantMessage ToolExecutionResultMessage]

        REBUILD --> SEED_REDIS[add all messages to RedisMemory\nseeds Redis from Postgres cold store]

        SEED_REDIS --> HOT

        HOT --> CTX[wrap in SlidingWindowContext\nmax_messages = context_window e.g. 40\nfilters at LLM-call time -- LLM sees last N\nfull history kept in local list + Redis]

        CTX --> AGENT[create ReActAgent\nwith per_request_mem + SlidingWindowContext\nall add_message calls write-through\nto Redis via fire-and-forget task]
    end

    subgraph WriteThrough["Write-Through During Agent Run"]
        AT[agent.run_stream\nadd_message on every step] --> ADD

        ADD[RedisMemory.add_message\nappends to local list\nschedules background task] --> BG

        BG[asyncio background task\nRPUSH to Redis\nLTRIM to max_messages cap\nEXPIRE TTL refresh]
    end

    subgraph PersistRun["Inline DB Persistence in agent_worker"]
        COMP[CompletionChunk received\nby agent_worker] --> PCOMP

        PCOMP[persist_assistant_message\nPOSTGRES upsert\nBEFORE yielding SSE event]

        TR[ToolExecutionResultMessage\nreceived] --> PTR

        PTR[persist_tool_result\nPOSTGRES insert\nBEFORE yielding SSE event]
    end

    AGENT --> AT
    AGENT --> COMP
    AGENT --> TR

    Redis[("Redis 7\nLIST per session_id\nkey: session:ID:messages\ncap: max_messages e.g. 200\nttl: REDIS_SESSION_TTL")] -.-> RESTORE
    BG -.-> Redis
    PG[(PostgreSQL 16\nsteps table per thread)] -.-> COLD
    PCOMP -.-> PG
    PTR -.-> PG
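
The hot/cold restore and the call-time window can be sketched with plain dictionaries standing in for Redis and Postgres. The function names restore and sliding_window and the dict-based stores are assumptions for illustration; the real classes are asynchronous and talk to actual backends.

```python
from typing import Any

Message = dict[str, Any]


def restore(session_id: str, hot: dict[str, list[Message]],
            cold: dict[str, list[Message]]) -> tuple[list[Message], str]:
    """Return (messages, path) where path is 'hot' or 'cold'."""
    messages = hot.get(session_id)
    if messages:                                   # hot store hit
        return list(messages), "hot"
    messages = list(cold.get(session_id, []))      # fall back to the cold store
    hot[session_id] = list(messages)               # seed the hot store
    return messages, "cold"


def sliding_window(history: list[Message], max_messages: int) -> list[Message]:
    """Select what the LLM sees; the stored history is left untouched."""
    if max_messages <= 0:
        return []
    return history[-max_messages:]


cold_store = {"t1": [{"role": "user", "content": f"msg {i}"} for i in range(5)]}
hot_store: dict[str, list[Message]] = {}

first, first_path = restore("t1", hot_store, cold_store)
second, second_path = restore("t1", hot_store, cold_store)
window = sliding_window(second, max_messages=2)
```

The first call takes the cold path and seeds the hot store, so the second call is served from it. The window is computed from the full list each time, which is why trimming for the prompt never discards stored history.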

6. ServerContext — DI Container, Locks and Cancel Registry

ServerContext is the single DI container, assembled once at startup. The thread_locks dict prevents concurrent streams on the same thread: a second request for a thread that is already streaming gets a 409. The cancel_registry dict maps thread IDs to asyncio.Event objects; the cancel route sets the event and the poll loop detects it within the next 200 ms poll window.

graph TB
    subgraph AppLifecycle["FastAPI Lifespan — app.py startup"]
        OTEL[configure OpenTelemetry\nOTLP → Tempo]
        DBInit[init_db\nSQLAlchemy async engine]
        RedisInit[RedisMemory global\nconnect TCP pool]
        FSInit[create_file_store\nLocalFileStore or S3FileStore\n+ optional EncryptedFileStore]
        ModelClient[OpenAIClient\ngpt-4o-mini]
        AudioClient[OpenAIAudioClient\ntranscription + TTS + realtime]
        BRInit[BridgeRegistry\nper-thread HITL bridge pool]
        ToolReg[ToolRegistry\nregister all tools at startup]
        CTX_BUILT[build ServerContext\nall deps bundled]
        STATE[app.state.ctx = ctx]
    end

    subgraph ServerContext["ServerContext dataclass — context.py"]
        SC[ServerContext]
        SC --> MC[model_client\nOpenAIClient]
        SC --> AC[audio_client\nOpenAIAudioClient]
        SC --> RM2[redis_memory\nglobal RedisMemory]
        SC --> TR2[tools\nToolRegistry]
        SC --> BR2[bridge_registry\nBridgeRegistry]
        SC --> TRA[tools_requiring_approval\nlist of tool names]
        SC --> SI[system_instructions\nloaded from prompts/default_system.md]
        SC --> TT[tool_timeout float]
        SC --> CR[cancel_registry\ndict thread_id → asyncio.Event]
        SC --> TL[thread_locks\ndict thread_id → asyncio.Lock]
        SC --> SF[session_factory\nSQLAlchemy async factory]
        SC --> MCP2[mcp_servers\ndict name → config]
        SC --> CI[ci_client\nCodeInterpreterClient optional]
        SC --> FS2[file_store\nFileStore ABC\nLocalFileStore or S3FileStore]
    end

    subgraph SingleFlight["Single-Flight Lock — thread_locks"]
        TL --> SFL[setdefault: create Lock if missing]
        SFL --> LOCKED{locked?}
        LOCKED -->|yes| HTTP409[raise 409\nstream already running]
        LOCKED -->|no| ACQUIRE[acquire lock\nfor SSE generator lifetime]
        ACQUIRE --> FINREL[finally: release lock\npop from dict on stream end]
    end

    subgraph CancelReg["Cancel Registry — cancel_registry"]
        CR --> CRK[key: str thread_id]
        CRK --> EVT[value: asyncio.Event]
        EVT --> CPOST[POST /chat/cancel\n→ event.set]
        CPOST --> POLLED[poll loop detects\ncancel_event.is_set]
        POLLED --> CANYIELD[yield cancelled SSE]
    end

    OTEL & DBInit & RedisInit & FSInit & ModelClient & AudioClient & BRInit & ToolReg --> CTX_BUILT --> STATE

    subgraph DI["Dependency Injection — get_ctx"]
        GETCTX["def get_ctx(request: Request)\n→ request.app.state.ctx"]
    end

    STATE -.-> GETCTX
    GETCTX -.->|"Depends(get_ctx) in every route"| SC
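
The single-flight lock and the cancel registry can be tried out in isolation. In this sketch StreamAlreadyRunning is a hypothetical stand-in for the HTTP 409 response, stream replaces the SSE generator with a bare polling loop, and the poll interval is shortened so the example finishes quickly.

```python
import asyncio


class StreamAlreadyRunning(Exception):
    """Stand-in for the HTTP 409 response."""


thread_locks: dict[str, asyncio.Lock] = {}
cancel_registry: dict[str, asyncio.Event] = {}


async def stream(thread_id: str, poll_interval: float = 0.01) -> str:
    lock = thread_locks.setdefault(thread_id, asyncio.Lock())
    if lock.locked():
        raise StreamAlreadyRunning(thread_id)
    await lock.acquire()
    cancel_event = cancel_registry.setdefault(thread_id, asyncio.Event())
    try:
        while not cancel_event.is_set():       # checked once per poll window
            await asyncio.sleep(poll_interval)
        return "cancelled"
    finally:
        lock.release()
        thread_locks.pop(thread_id, None)
        cancel_registry.pop(thread_id, None)


def cancel(thread_id: str) -> bool:
    event = cancel_registry.get(thread_id)
    if event is None:
        return False
    event.set()
    return True


async def demo() -> tuple[str, bool, str]:
    first = asyncio.create_task(stream("t1"))
    await asyncio.sleep(0)                     # let the first stream take the lock
    try:
        await stream("t1")
        second = "started"
    except StreamAlreadyRunning:
        second = "409"
    cancelled = cancel("t1")
    return second, cancelled, await first
```

The locked() check followed by acquire() is safe here because acquiring an uncontended asyncio.Lock does not suspend, so no other task can run between the two calls.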

7. Complete End-to-End Flow

The full lifecycle of a user message across all 7 phases: request setup → memory restore → file context build → direct text response → tool call → HITL approval → cancel/disconnect.

EventBus is an in-process asyncio.Queue — not a network service. It is not shown as a participant; instead the agent_worker emits events to it and the SSE generator polls it, both inside the same process.

sequenceDiagram
    autonumber
    participant FE as Frontend\n(page.tsx)
    participant ChatAPI as POST /chat\n(chat.py)
    participant AgSvc as agent_service\nload_agent_for_thread
    participant Redis as Redis 7\nHot Store
    participant PG as PostgreSQL\nCold Store
    participant FileStore as FileStore\n(local / S3)
    participant ReAct as ReActAgent\nreact_agent.py
    participant OAI as OpenAI API
    participant Tools as Tools\n(executor)
    participant Bridge as WebHITLBridge\nbridge.py
    participant HIRE as POST /chat/respond\n(hitl.py)

    Note over FE,HIRE: ═══ STARTUP (once) ═══
    Note over ChatAPI: app.state.ctx = ServerContext\n(model_client, bridge_registry,\nredis_memory, tools, file_store, locks, ...)

    Note over FE,HIRE: ═══ PHASE 1 — Request Setup ═══

    FE->>ChatAPI: POST /chat {thread_id, messages[]}
    ChatAPI->>PG: get_thread(thread_id) — validate exists
    ChatAPI->>ChatAPI: thread_locks.setdefault(thread_id, Lock)\nif locked → 409 Conflict
    ChatAPI->>ChatAPI: thread_lock.acquire()
    ChatAPI->>Bridge: bridge_registry.acquire(thread_id)\nget or create WebHITLBridge
    ChatAPI->>AgSvc: load_agent_for_thread(thread_id, ...)

    Note over FE,HIRE: ═══ PHASE 2 — Memory Restore ═══

    AgSvc->>Redis: RedisMemory.restore()\nLRANGE session:{id}:messages 0 -1
    alt Redis HIT (hot path)
        Redis-->>AgSvc: ALL stored messages (up to max_messages cap)
        Note over AgSvc: full history loaded into local in-process list
    else Redis MISS (cold path)
        Redis-->>AgSvc: empty
        AgSvc->>PG: load_messages_for_memory(thread_id)\nSELECT steps ORDER BY created_at
        PG-->>AgSvc: step rows
        AgSvc->>AgSvc: _rebuild_messages(rows)\nSystemMessage + UserMessage +\nAssistantMessage + ToolExecutionResultMessage
        AgSvc->>Redis: add_message for each — seeds Redis from Postgres
    end
    AgSvc->>AgSvc: wrap in SlidingWindowContext(max=model_context_window)\nfilters at LLM-call time — last N msgs sent to OpenAI\nfull history stays in local list and Redis
    AgSvc-->>ChatAPI: ReActAgent ready

    Note over FE,HIRE: ═══ PHASE 3 — File Context + Persist Setup ═══

    ChatAPI->>PG: get_files_by_ids(file_ids) — query FileMetadata
    PG-->>ChatAPI: FileMetadata records
    ChatAPI->>FileStore: extract_text(store, meta) per attached file
    FileStore-->>ChatAPI: extracted text bytes
    ChatAPI->>ChatAPI: hooks.fire_message(hook_ctx, user_content)
    ChatAPI->>PG: persist_user_message(thread_id, content)
    Note over ChatAPI: EventBus() — create fresh in-process asyncio.Queue\nasyncio.create_task(agent_worker)\nasyncio.create_task(hitl_worker)
    ChatAPI-->>FE: StreamingResponse (SSE)\nContent-Type: text/event-stream

    Note over FE,HIRE: ═══ PHASE 4 — Direct Response (no tools) ═══

    activate ChatAPI
    ChatAPI->>ReAct: agent.run_stream(user_content)
    ReAct->>Redis: memory.add_message(UserMessage)\nfire-and-forget background RPUSH
    ReAct->>OAI: generate_stream(messages, tools)\nSlidingWindowContext builds prompt
    OAI-->>ReAct: TextDelta chunks...
    ReAct-->>ChatAPI: yield TextDeltaChunk
    Note over ChatAPI: agent_worker emits event to in-process EventBus\nSSE generator polls bus and streams to FE
    ChatAPI-->>FE: data: {"type":"text_delta","content":"Hello..."}

    OAI-->>ReAct: CompletionChunk finish_reason=stop
    ReAct->>Redis: memory.add_message(AssistantMessage)
    ReAct-->>ChatAPI: yield CompletionChunk
    ChatAPI->>PG: persist_assistant_message BEFORE emitting
    ChatAPI-->>FE: data: {"type":"completion","content":"..."}

    ChatAPI->>Bridge: agent_worker finally: bridge.signal_done() — BRIDGE_DONE sentinel
    Bridge-->>ChatAPI: hitl_worker gets BRIDGE_DONE → bus.close()
    ChatAPI-->>FE: data: [DONE]
    ChatAPI->>ChatAPI: finally: release lock\npop cancel_registry\nbridge_registry.release_if_idle

    Note over FE,HIRE: ═══ PHASE 5 — Tool Call Path ═══

    OAI-->>ReAct: CompletionChunk finish_reason=tool_calls\n[{name:"calculator", arguments:{...}}]
    ReAct->>ReAct: parse tool calls → _ParsedToolCall
    ReAct->>ReAct: memory.add_message(AssistantMessage with tool_calls)

    alt Tool NOT in tools_requiring_approval
        ReAct->>Tools: tool.execute(**arguments)
        Tools-->>ReAct: ToolResult(content="42")
        ReAct->>Redis: memory.add_message(ToolExecutionResultMessage)
        ReAct-->>ChatAPI: yield ToolExecutionResultMessage
        ChatAPI->>PG: persist_tool_result BEFORE emitting
        ChatAPI-->>FE: data: {"type":"tool_result","tool_name":"calculator","result":"42"}
    end
    ReAct->>OAI: next LLM call with tool result in context

    Note over FE,HIRE: ═══ PHASE 6 — HITL Tool Approval Path ═══

    OAI-->>ReAct: CompletionChunk finish_reason=tool_calls\n[{name:"delete_file", arguments:{...}}]
    Note over ReAct: delete_file IS in tools_requiring_approval

    ReAct->>Bridge: tool_approval_handler._handle_approval(ToolApprovalRequest)
    Bridge->>Bridge: create asyncio.Future()\n_pending[request_id] = future
    Bridge->>Bridge: _outgoing.put({type: tool_approval_request, ...})
    Note over ReAct: BLOCKED — awaiting future.set_result

    activate Bridge
    ChatAPI->>Bridge: hitl_worker: get_event()
    Bridge-->>ChatAPI: {type: tool_approval_request, requestId: "abc"}
    ChatAPI-->>FE: data: {"type":"tool_approval_request","requestId":"abc"}
    FE->>FE: render ToolApprovalCard

    FE->>HIRE: POST /chat/respond/abc\n{"action":"approve"}
    HIRE->>Bridge: BridgeRegistry.resolve("abc", {action:"approve"})
    Bridge->>Bridge: future.set_result({"action":"approve"})
    deactivate Bridge
    Note over ReAct: UNBLOCKED — returns ToolApprovalResponse APPROVE

    ReAct->>Tools: tool.execute(**arguments)
    Tools-->>ReAct: ToolResult
    ReAct-->>ChatAPI: yield ToolExecutionResultMessage
    ChatAPI-->>FE: data: {"type":"tool_result",...}

    Note over FE,HIRE: ═══ PHASE 7 — Cancel / Disconnect ═══

    FE->>ChatAPI: POST /chat/{thread_id}/cancel
    ChatAPI->>ChatAPI: cancel_registry[thread_id].set()
    Note over ChatAPI: poll() → TimeoutError\ncancel_event.is_set() = True
    ChatAPI->>ReAct: agent_task.cancel()
    ChatAPI-->>FE: data: {"type":"cancelled"}
    ChatAPI-->>FE: data: [DONE]
    deactivate ChatAPI
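
The shutdown handshake at the end of Phase 4 (agent_worker signals the bridge, hitl_worker closes the bus, the generator emits [DONE]) is easy to get wrong, so a small model of it may help. This sketch uses bare asyncio.Queue objects in place of EventBus and WebHITLBridge, and sse_stream is a hypothetical, simplified generator without the timeout, disconnect, and cancel checks.

```python
import asyncio
import json

BRIDGE_DONE = object()   # bridge-side sentinel
BUS_CLOSED = object()    # bus-side sentinel


async def agent_worker(bus: asyncio.Queue, bridge_out: asyncio.Queue) -> None:
    try:
        for chunk in ("Hel", "lo"):
            await bus.put({"type": "text_delta", "content": chunk})
        await bus.put({"type": "completion", "content": "Hello"})
    finally:
        await bridge_out.put(BRIDGE_DONE)      # plays the role of bridge.signal_done()


async def hitl_worker(bus: asyncio.Queue, bridge_out: asyncio.Queue) -> None:
    while True:
        event = await bridge_out.get()
        if event is BRIDGE_DONE:
            await bus.put(BUS_CLOSED)          # plays the role of bus.close()
            return
        await bus.put(event)                   # re-emit HITL events onto the bus


async def sse_stream(bus: asyncio.Queue):
    while True:
        item = await bus.get()
        if item is BUS_CLOSED:
            yield "data: [DONE]\n\n"
            return
        yield f"data: {json.dumps(item)}\n\n"


async def demo() -> list[str]:
    bus, bridge_out = asyncio.Queue(), asyncio.Queue()
    workers = [asyncio.create_task(agent_worker(bus, bridge_out)),
               asyncio.create_task(hitl_worker(bus, bridge_out))]
    frames = [frame async for frame in sse_stream(bus)]
    await asyncio.gather(*workers)
    return frames
```

Because the agent puts its last event on the bus before signalling the bridge, and the bus is FIFO, the close sentinel always arrives after the completion event.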

8. File Storage Architecture

Four-zone view of the file storage subsystem: API ingress → service functions → storage abstraction → persistence backends.

flowchart TD
    subgraph APILayer["🌐 API Layer"]
        FilesRoute["FilesRoute\nPOST /files (upload)\nGET /files (list)\nGET /files/{id} (download)\nDELETE /files/{id}"]
        FMT["FileManagerTool\nagent-facing CRUD\ncurrent_thread_id ContextVar"]
    end

    subgraph ServiceLayer["⚙️ Service Layer — file_service.py"]
        SaveFile["save_file()\nwrite bytes → store\ninsert FileMetadata row"]
        ListFiles["list_files()\nSELECT FileMetadata\nby thread / user / org"]
        GetFile["get_file()\nfetch bytes from store\nchecks ownership"]
        DeleteFile["delete_file()\ndelete from store\ndelete FileMetadata row"]
        ExtractText["extract_text()\nplain-text → raw bytes\nPDF/image → placeholder"]
        GetURL["get_file_url()\npresigned URL (S3)\nor local /files/{id}"]
        PurgeThread["purge_thread_files()\ndelete all files for\na thread on cleanup"]
    end

    subgraph StorageLayer["🗂️ Storage Layer — core/storage/"]
        FSAbC["FileStore ABC\nput / get / delete / copy\nlist / exists / make_public_url"]
        LocalFS["LocalFileStore\nlocal disk volume\npath = base_dir/{tenant_path}"]
        S3FS["S3FileStore\naiobotocore async\nbucket/{tenant_path}"]
        EncFS["EncryptedFileStore\nAES-256-GCM wrapper\ntransparent encrypt/decrypt on put/get"]
        TC["TenantContext path builder\norg/{org_id}/user/{user_id}/\nthread/{thread_id}/{scope}/"]
        Factory["create_file_store(settings)\ndispatches on FILE_STORE_BACKEND\n+ wraps EncryptedFileStore\nif FILE_STORE_ENCRYPT=true"]
    end

    subgraph PersistenceLayer["💾 Persistence Layer"]
        PGMeta[("PostgreSQL\nFileMetadata table\nid, thread_id, user_id, org_id\nstore_key, filename, mime_type\nsize_bytes, created_at")]
        ObjStore[("LocalFS volume\nor MinIO / AWS S3\nraw file bytes")]
    end

    FilesRoute --> SaveFile & ListFiles & GetFile & DeleteFile & GetURL
    FMT --> SaveFile & ListFiles & GetFile & DeleteFile

    SaveFile & GetFile & DeleteFile & ExtractText & GetURL & PurgeThread --> FSAbC
    SaveFile & ListFiles & GetFile & DeleteFile --> PGMeta

    FSAbC --> LocalFS & S3FS
    LocalFS & S3FS --> EncFS
    Factory --> LocalFS & S3FS
    FSAbC --> TC

    LocalFS --> ObjStore
    S3FS --> ObjStore
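
The storage layering (tenant path builder, abstract store, optional wrapper, factory) can be sketched with an in-memory backend. Several parts are assumptions: InMemoryFileStore and TransformingFileStore are hypothetical classes, the interface is synchronous and reduced to put/get, the factory takes plain arguments instead of settings, and the XOR step is a placeholder that only demonstrates the wrapper shape. It is not encryption.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class TenantContext:
    org_id: str
    user_id: str
    thread_id: str

    def key(self, scope: str, filename: str) -> str:
        # Layout from the diagram: org/{org_id}/user/{user_id}/thread/{thread_id}/{scope}/
        return (f"org/{self.org_id}/user/{self.user_id}/"
                f"thread/{self.thread_id}/{scope}/{filename}")


class FileStore(ABC):
    @abstractmethod
    def put(self, key: str, data: bytes) -> None: ...

    @abstractmethod
    def get(self, key: str) -> bytes: ...


class InMemoryFileStore(FileStore):
    """Hypothetical backend used here in place of LocalFileStore / S3FileStore."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self.blobs[key] = data

    def get(self, key: str) -> bytes:
        return self.blobs[key]


class TransformingFileStore(FileStore):
    """Wrapper in the shape of EncryptedFileStore. The XOR below is a
    placeholder transform, NOT encryption; the real store uses AES-256-GCM."""

    def __init__(self, inner: FileStore, mask: int = 0x5A) -> None:
        self.inner, self.mask = inner, mask

    def _xor(self, data: bytes) -> bytes:
        return bytes(b ^ self.mask for b in data)

    def put(self, key: str, data: bytes) -> None:
        self.inner.put(key, self._xor(data))

    def get(self, key: str) -> bytes:
        return self._xor(self.inner.get(key))


def create_file_store(backend: str, encrypt: bool) -> FileStore:
    backends = {"memory": InMemoryFileStore}
    if backend not in backends:
        raise ValueError(f"unknown file store backend: {backend}")
    store: FileStore = backends[backend]()
    return TransformingFileStore(store) if encrypt else store


tenant = TenantContext(org_id="acme", user_id="u1", thread_id="t1")
key = tenant.key("uploads", "notes.txt")
store = create_file_store("memory", encrypt=True)
store.put(key, b"hello")
```

Callers only ever see the FileStore interface, so the wrapper can be switched on or off in the factory without touching the service functions.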