Backend Architecture
4.1 Directory Structure
app/
├── main.py # FastAPI app entry, router registration, startup/shutdown hooks
├── deps.py # get_current_user / get_service_context dependencies
│
├── core/
│ ├── settings.py # Path constants (ROOT_DIR, USERS_DIR, DATA_DIR)
│ ├── security.py # User auth (register/login/JWT) + _create_user_dirs
│ ├── api_config.py # Route API Key + base_url by capability (supports user_id)
│ ├── user_api_keys.py # AES-256-GCM encrypted per-admin key storage
│ ├── encryption.py # AES-GCM master key (data/encryption.key)
│ ├── path_security.py # safe_join / ensure_within (path traversal protection)
│ ├── fileutil.py # File read/write utilities
│ └── observability.py # Langfuse integration (v3 SDK)
│
├── schemas/
│ ├── requests.py # Admin request models
│ └── service.py # Service / Consumer request/config models
│
├── services/
│ ├── agent.py # Admin Agent factory + model resolution + cache
│ ├── consumer_agent.py # Consumer Agent factory (channel-aware)
│ ├── tools.py # @tool factories + CAPABILITY_PROMPTS
│ ├── ai_tools.py # Multimedia generation (image/tts/video) internals
│ ├── web_tools.py # CloudsWay / Tavily dual provider
│ ├── script_runner.py # Script execution (subprocess + AST check + semaphore queue)
│ ├── _sandbox_wrapper.py # Runtime I/O sandbox (monkey-patch)
│ ├── conversations.py # Admin conversation persistence + attachments
│ ├── prompt.py # System Prompt version management + capability prompts
│ ├── preferences.py # User timezone/UI preferences
│ ├── subagents.py # Subagent config + DEFAULT_SUBAGENTS
│ ├── memory_tools.py # Memory Subagent tools + Soul config + short-term memory
│ ├── published.py # Service CRUD + API Key + Consumer sessions
│ ├── scheduler.py # Scheduled tasks (Admin + Service dual-track)
│ ├── inbox.py # Inbox (contact_admin → Inbox Agent → WeChat forward)
│ └── venv_manager.py # Per-user Python virtual environments
│
├── routes/ # See §4.2 for full route table
│ ├── auth.py
│ ├── conversations.py
│ ├── chat.py
│ ├── files.py
│ ├── scripts.py
│ ├── models.py
│ ├── settings_routes.py # system_prompt / user_profile / subagents / api_keys / soul / capability_prompts
│ ├── batch.py
│ ├── services.py
│ ├── consumer.py # /api/v1/* (Consumer external API)
│ ├── consumer_ui.py # /s/{service_id} (Consumer chat page)
│ ├── scheduler.py
│ ├── inbox.py
│ └── wechat_ui.py # /wc/{service_id} (WeChat scan landing page)
│
├── channels/wechat/
│ ├── client.py # iLink protocol client (getconfig/getupdates/sendmessage/cdn)
│ ├── bridge.py # Service Consumer Bridge (multimodal + send_message interception)
│ ├── admin_bridge.py # Admin self-onboarding Bridge (independent logic)
│ ├── admin_router.py # /api/admin/wechat/* routes
│ ├── router.py # /api/wc/* public routes + Admin service channel management
│ ├── session_manager.py # Session lifecycle + persistence + reconnection
│ ├── media.py # AES-128-ECB + CDN upload/download
│ ├── delivery.py # Unified delivery (with <<FILE:>> tag parsing)
│ └── rate_limiter.py # Per-user/QR/global session rate limiting
│
├── storage/
│ ├── config.py # S3Config + is_s3_mode
│ ├── base.py # StorageService ABC
│ ├── local.py # Local filesystem implementation
│ ├── s3.py # boto3 S3 implementation (MinIO/R2/OSS compatible)
│ ├── s3_backend.py # deepagents BackendProtocol for S3
│ └── __init__.py # get_storage_service / create_agent_backend / create_consumer_backend
│
└── voice/
└── router.py # WebSocket S2S proxy (OpenAI Realtime)4.2 FastAPI Route Reference
app/main.py registers all routes — approximately 70 routes total.
Public / Auth
| Prefix | Route | Description |
|---|---|---|
/api/auth | register / login / me | Register (requires code) + Login + JWT |
Admin (requires get_current_user)
| Prefix | Purpose |
|---|---|
/api/conversations | Conversation CRUD + attachments |
/api/chat | Primary SSE chat + /resume + /stop + /streaming-status |
/api/files | File CRUD + upload + download + media |
/api/scripts | Script execution + audio transcription |
/api/models | Available models list |
/api/system-prompt | Prompt content + version management |
/api/user-profile | User profile + version management |
/api/subagents | Subagent CRUD + available_tools |
/api/capability-prompts | Per-user capability prompt overrides |
/api/soul/config | Soul config (GET/PUT) |
/api/batch | Excel batch execution |
/api/services | Service CRUD + API Key |
/api/scheduler | Admin scheduled tasks + run history |
/api/scheduler/services/... | Service scheduled tasks |
/api/inbox | Inbox (list/get/update/delete) |
/api/packages | Per-user venv package management |
/api/settings/api-keys | Per-admin API Keys (encrypted) |
/api/wc/... | WeChat Service channel (QR + sessions + messages) |
/api/admin/wechat/* | Admin WeChat self-onboarding |
/api/voice/... | WebSocket S2S proxy |
Consumer (requires get_service_context)
| Route | Description |
|---|---|
POST /api/v1/conversations | Create conversation |
GET /api/v1/conversations/{id} | Get conversation history |
GET /api/v1/conversations/{id}/files | List generated files |
GET /api/v1/conversations/{id}/files/{path} | Download generated file (query ?key=) |
GET /api/v1/conversations/{id}/attachments/{path} | Download user attachment |
POST /api/v1/chat | Custom SSE chat |
POST /api/v1/chat/completions | OpenAI-compatible (streaming + non-streaming) |
Static Pages
| Route | Description |
|---|---|
GET /s/{service_id} | Consumer standalone chat page (React multi-entry) |
GET /wc/{service_id} | WeChat scan landing page (HTML template) |
4.3 Dependency Injection & Authentication
app/deps.py provides two core dependencies:
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict
async def get_service_context(authorization: str = Header(...)) -> ServiceContext| Dimension | Admin | Consumer |
|---|---|---|
| Credential | JWT Bearer Token | sk-svc-... Bearer Token |
| Storage | users/users.json (bcrypt + sha256 fallback) | users/{admin}/services/{svc}/keys.json (sha256 hashed) |
| Context | {user_id, username} | ServiceContext{admin_id, service_id, service_config, key_id} |
| Applicable routes | /api/* (except /api/v1/*) | /api/v1/* + /s/{sid} |
Both dependency sets coexist in the same FastAPI instance. Route files must explicitly choose the right one — Consumer routes must not use
get_current_user.
4.4 Agent Engine
4.4.1 Admin Agent (app/services/agent.py)
- Creation:
create_user_agent(user_id, model_id, capabilities, plan_mode, ...) - Cache: per-(user_id, model_id, capabilities, plan_mode, channel) — avoids re-initialization
- Checkpointer:
AsyncSqliteSaver→data/checkpoints.db- Executes
PRAGMA journal_mode=WAL+PRAGMA synchronous=NORMALon startup to reduce lock contention with multiple bridges
- Executes
- Soul injection: injects
memory_subagent/soul_editcapability prompts based onsoul/config.json - Cache invalidation:
clear_agent_cache(user_id)— called when prompt / subagent / API key changes
4.4.2 Model Resolution
# Model ID format
"anthropic:claude-sonnet-4-6-20250929"
"openai:gpt-5.4"
# Thinking models auto-configure extended_thinking
THINKING_MODEL_CONFIG = {
"anthropic:claude-opus-4-6-20250929-thinking": {...},
...
}
# api_key + base_url resolved via api_config + user_api_keys
_resolve_model(model_id, user_id=None) → ChatModel4.4.3 Consumer Agent (app/services/consumer_agent.py)
- Factory:
create_consumer_agent(admin_id, service_id, conv_id, channel="web", ...) - Channel-aware (critical!):
channel="web": Web /s/ +/api/v1/*, does not injectsend_message(messages already flow through SSE)channel="wechat": injectssend_message, results delivered bydelivery.pyto WeChatchannel="scheduler": same as wechat, for scheduled task push
- cache_key: includes
::ch={channel}(for non-web), preventing incorrect cache reuse - Restricted capabilities: only injects tools based on Service
capabilitieslist - Filesystem: isolated to
users/{admin_id}/services/{svc}/conversations/{conv_id}/generated/
4.4.4 Message Timestamp Injection (Refactored 2026-04-13)
Exact time is no longer written to system prompt (daily cache would freeze time). Instead, [YYYY-MM-DD HH:MM:SS] is injected before each user message:
prompt.py::stamp_message(content, user_id)handles bothstrand multimodallistformats- Injection points:
chat.py(Web),admin_bridge.py(Admin WeChat),consumer.py(Consumer × 3),bridge.py(Service WeChat) - Consumer side uses
admin_idto resolve timezone
4.5 Tool System
4.5.1 Built-in Tools (app/services/tools.py)
| Tool | Description | Admin | Consumer Injection Condition |
|---|---|---|---|
read_file / write_file / ls / glob / grep | deepagents built-in | ✅ Always | ✅ Always |
edit_file / write_todos | deepagents built-in | ✅ | ✅ |
task | deepagents subagent dispatch | ✅ | ✅ |
run_python_script | Sandboxed script execution | ✅ | ✅ |
web_search / web_fetch | CloudsWay / Tavily | ✅ Always | web capability |
generate_image / generate_speech / generate_video | OpenAI multimedia | By capability | By capability |
schedule_task / manage_scheduled_tasks | Scheduled task CRUD | ✅ Always | scheduler capability |
publish_service_task | Admin dispatches task to Service | ✅ Always | ❌ |
send_message | Send to WeChat user | Injected in wechat channel | humanchat capability + non-web channel |
contact_admin | Service notifies admin | ❌ | humanchat capability |
soul_list / soul_read / soul_write / soul_delete | Soul file operations | Memory Subagent only (when memory_subagent_enabled) | ❌ |
4.5.2 Multimedia Tools (app/services/ai_tools.py)
Calls OpenAI API under the hood:
generate_image:gpt-image-1, output stored ingenerated/images/generate_speech:tts-1/tts-1-hd(6 voices), stored ingenerated/audio/generate_video:Sora 2, stored ingenerated/videos/
Return convention: Returns the message "Generated ... Please display with <<FILE:/generated/xxx>> to the user". Both frontend markdown.ts and delivery.py::extract_media_tags recognize this tag.
4.5.3 Web Tools (app/services/web_tools.py)
Dual provider, resolves key by user_id:
| Provider | Key Source | Notes |
|---|---|---|
| CloudsWay (preferred) | CLOUDSWAY_SEARCH_KEY | CLOUDSWAY_SEARCH_URL / CLOUDSWAY_READ_URL override endpoints |
| Tavily (fallback) | TAVILY_API_KEY | Automatic fallback |
4.5.4 Capability Prompts
tools.py::CAPABILITY_PROMPTSdefines tool usage rules for each capability- Per-user overrides:
users/{uid}/capability_prompts.json(stores only overridden entries) - API:
GET /api/capability-prompts(withis_custom),PUT /key,DELETE /key - Resolution:
prompt.py::get_resolved_capability_prompt(user_id, key)
4.6 Subagent / Memory / Soul
4.6.1 Subagent (app/services/subagents.py)
- Storage:
users/{uid}/subagents.json - DEFAULT_SUBAGENTS: includes one built-in
memorysubagent (cannot delete, but can disable) - Available tool pool (
SHARED_TOOL_NAMES + MEMORY_TOOL_NAMES):- General:
run_script/web_search/web_fetch/generate_image/generate_speech/generate_video/schedule_task/manage_scheduled_tasks/publish_service_task/send_message - Memory:
list_conversations/read_conversation/list_service_conversations/read_service_conversation/read_inbox/soul_list/soul_read/soul_write/soul_delete
- General:
- On-demand creation:
build_subagent_tools(subagent_config, user_id)only instantiates tools listed in config - API:
GET /api/subagents(withavailable_tools) + CRUD
4.6.2 Memory Subagent
- Admin Memory Subagent: default tools = 5 conversation/inbox read tools
- When
memory_subagent_enabled=true: adds 4 soul write tools - Conversation history is read-only, soul content is read-write
- When
- Consumer Memory Subagent: only
read_my_conversation(own conversations, read-only) - Factories:
create_admin_memory_tools(user_id)→ 5-9 toolscreate_consumer_memory_tools(admin_id, svc_id, conv_id)→ 1 tool
4.6.3 Soul System (app/services/memory_tools.py)
users/{uid}/
├── soul/
│ └── config.json # App-layer config (not agent-accessible)
└── filesystem/soul/ # Agent-readable/writable soul content (notes/personality)
└── *.md / *.json- config.json fields:
memory_enabled: enables short-term memory injectioninclude_consumer_conversations: whether Memory Subagent can read Consumer conversationsmax_recent_messages: default 5memory_subagent_enabled: whether Memory Subagent has soul write permissionsoul_edit_enabled: whether main Agent can directly read/write Soul viafilesystem/soul/
- Path migration:
sync_soul_symlink()removes old symlinks and auto-migrates content tofilesystem/soul/(avoids deepagentsPath.resolve()following symlinks causing escape errors) - Capability prompts:
memory_subagent/soul_editinjected by toggle state inCAPABILITY_PROMPTS
4.6.4 Short-term Memory Injection
scheduler.py::_run_*_agent_task: reads recent N messages from conversation JSON, prepends to promptinbox.py::_trigger_inbox_agent: injects 3 most recent inbox messages- Source tagging:
- Service scheduled task prompt header:
[System Instruction - From Admin] - Inbox agent prompt header:
[System Instruction - Service Inbox Notification]
- Service scheduled task prompt header:
4.7 Storage Layer
4.7.1 Abstract Interface (app/storage/base.py)
class StorageService(ABC):
async def read_file(self, path: str) -> bytes
async def write_file(self, path: str, content: bytes) -> None
async def list_directory(self, path: str) -> list
async def delete(self, path: str) -> None
async def exists(self, path: str) -> bool
async def move(self, src: str, dst: str) -> None4.7.2 Backend Selection
Controlled via STORAGE_BACKEND environment variable:
local(default):LocalStorageService—os.*operations on local disks3:S3StorageService— boto3 API (AWS S3 / MinIO / R2 / Alibaba OSS compatible)
4.7.3 S3 Key Mapping
{prefix}/{user_id}/fs/{path} # Admin filesystem
{prefix}/{admin_id}/svc/{svc_id}/{conv_id}/gen/{path} # Consumer generated filesNote: JSON config files (users.json, conversations, service configs) currently remain on local disk. S3 mode only hosts the filesystem layer.
4.7.4 Media Access Differences
| Mode | /api/files/media Behavior |
|---|---|
local | FileResponse streams file directly |
s3 | Generates presigned URL, returns 302 redirect |
4.7.5 Factory Functions (app/storage/__init__.py)
get_storage_service() → StorageService
create_agent_backend(root_dir, user_id=None) → BackendProtocol # Admin
create_consumer_backend(admin_id, svc_id, conv_id, gen_dir) → BackendProtocol4.7.6 Script Execution
- Local mode: reads scripts directly from
users/{uid}/filesystem/scripts/ - S3 mode: temporarily downloads to local → subprocess execution → uploads results back to S3
Full directory tree, JSON schemas, and 6 message flow sequences: see
docs/filesystem-architecture.md.
4.8 Security Architecture
4.8.1 Path Traversal Protection (app/core/path_security.py)
safe_join(base, user_path) → str # Safe path joining
ensure_within(path, root) → bool # Verify path is within root directoryImplementation: pathlib.Path.resolve() + separator-aware boundary check (startswith(root + os.sep)). Be cautious of mixed-case paths on case-insensitive Windows file systems.
4.8.2 Script Sandbox (Two-Layer Defense-in-Depth)
Layer 1: AST Static Analysis (script_runner._check_script_safety)
- Blocks dangerous modules:
subprocess/pathlib/ctypes/io/pickle/threading/posix/nt/_posixsubprocess - Blocks dangerous builtins:
exec/eval/getattr/setattr/globals - Blocks "absolutely dangerous" os functions:
system/popen/exec*/spawn*/fork/kill/chown/setuid/chroot/chdir - Blocks access to
__builtins__/__subclasses__/__globals__/__dict__/__mro__/__bases__ - Rejects
Subscript/Callforms of function calls (closesos.__dict__['system'](...)/(lambda:...)()attack vectors) - File I/O functions (remove / rename / mkdir / listdir / chmod, etc.) are intentionally not in AST blocklist — they are intercepted at runtime by path whitelist
Layer 2: Runtime Sandbox (_sandbox_wrapper.py)
- Monkey-patches
builtins.open/io.open/os.listdir/os.scandir/os.walk/os.chdir/os.open/os.readlinkto enforce read permissions - Write operations (remove / unlink / rmdir / rename / replace / mkdir / makedirs / chmod / chown / link / symlink / utime / truncate / mkfifo / mknod) go through
_check_writepath whitelist - "Absolutely dangerous" functions (system / popen / exec* / spawn* / posix_spawn* / fork / forkpty / kill / killpg / chown / lchown / setuid / setgid / setres*id / chroot / pipe / pipe2 / dup / dup2) are overwritten with
PermissionError-raising functions — even if attackers obtain references viaos.__dict__[name]/vars(os)[name], calls still raise errors
Sandbox Permission Configuration:
| Role | read | write |
|---|---|---|
| Admin | scripts/ + docs/ | scripts/ + generated/ |
| Consumer | scripts/ + docs/ (filtered by allowed_scripts) | conversation's own generated/ |
| Scheduled Task | task_config.permissions.read_dirs | task_config.permissions.write_dirs |
Auto-exempted sandbox read paths:
_PYTHON_READ_ROOTS:sys.prefix/sys.base_prefix/sys.exec_prefix/site.getsitepackages()— allowsimport matplotlib_SYSTEM_READ_DIRS:/usr/share//etc/fonts//etc/ssl/ macOS font directories_TEMP_DIR(write):tempfile.gettempdir()— library cache writesMPLCONFIGDIRredirected to temp directory
Resource Limits (tuned 2026-04-18):
| Limit | Value | Reason |
|---|---|---|
_MAX_NPROC | 256 | numpy/OpenBLAS crashes with limit of 16 |
_MEMORY_LIMIT_BYTES | 1024 MB | pandas/matplotlib headroom |
OPENBLAS_NUM_THREADS etc. | 2 | Prevents single script consuming all CPU |
_SCRIPT_SEMAPHORE | 4 | Global concurrency (SCRIPT_CONCURRENCY override) |
_QUEUE_TIMEOUT | 180s | Queue timeout (SCRIPT_QUEUE_TIMEOUT override) |
Linux note:
RLIMIT_NPROClimits the total number of processes/threads for the current uid (pthread = LWP). On Windows,preexec_fn=None— noresource.setrlimit.
4.8.3 XSS Protection
- Backend:
consumer_ui.py/wechat_ui.pyusehtml.escape()for template injection _safe_json_for_inline_scriptescapes</to<\/to prevent script breakout- Frontend:
marked.parse()output sanitized byDOMPurify.sanitize(), whitelist includesaudio/video/iframe
4.8.4 Encryption
- Master Key:
data/encryption.key(auto-generated first time, or override withENCRYPTION_KEYenv var) - API Key encryption: AES-256-GCM, stored in
users/{uid}/api_keys.json - WeChat media: AES-128-ECB (required by iLink protocol)
4.9 Per-Admin Python venv
- Module:
app/services/venv_manager.py - Directory:
users/{uid}/venv/, each Admin has an isolated Python virtualenv - Creation:
--system-site-packagesto inherit system pre-installed packages - Persistence: packages installed by users are recorded in
users/{uid}/venv/requirements.txt - Script execution:
tools.py::create_run_script_toolusesget_user_python(user_id); Consumer uses admin's venv - API:
GET /api/packages— list installed packages + venv statusPOST /api/packages/init— initialize user venvPOST /api/packages/install— install package (name cannot contain `;|&$`` injection characters)POST /api/packages/uninstall— uninstall package
- Startup restore:
main.pystartup callsrestore_all_venvs(), autopip install -rfor users withrequirements.txt
4.10 Per-Admin API Keys
4.10.1 Design
- Each Admin can configure their own OpenAI / Anthropic / Tavily / multimedia keys in Settings → General
- AES-256-GCM encrypted storage
- Priority chain:
user config > environment variables > not configured (prompt to set) - Admin's Agents (main Agent / Subagent / Consumer Agent) all use that Admin's keys
4.10.2 Call Chain
agent.py::_resolve_model(model_id, user_id)
└── api_config.get_openai_llm_config(user_id)
└── user_api_keys.get_user_api_keys(user_id)
└── encryption.decrypt(...)
consumer_agent.create_consumer_agent(admin_id, ...)
└── _resolve_model(model_id, user_id=admin_id)
ai_tools.generate_*(user_id=admin_id)
└── api_config.get_api_config("image", user_id)
web_tools.web_search(query, user_id=admin_id)4.10.3 Supported Fields
- Keys:
openai_api_key/anthropic_api_key/tavily_api_key/cloudsway_search_key/image_api_key/tts_api_key/video_api_key/s2s_api_key/stt_api_key - URLs:
openai_base_url/anthropic_base_url/image_base_url/tts_base_url/video_base_url/s2s_base_url/stt_base_url
4.10.4 API Endpoints
| Method | Path | Description |
|---|---|---|
| GET | /api/settings/api-keys | Masked response + *_configured flags |
| PUT | /api/settings/api-keys | Encrypted save (triggers clear_agent_cache + clear_consumer_cache) |
| POST | /api/settings/api-keys/test | Test connectivity (openai / anthropic / tavily / all) |
| GET | /api/settings/api-keys/status | Quick check if any LLM provider is available |
4.10.5 Cache Invalidation
After key update, automatically calls clear_agent_cache(user_id) + clear_consumer_cache(admin_id=user_id) — next Agent request creates a fresh instance.