后端架构
4.1 目录结构
app/
├── main.py # FastAPI 应用入口、路由注册、startup/shutdown 钩子
├── deps.py # get_current_user / get_service_context 依赖
│
├── core/
│ ├── settings.py # 路径常量(ROOT_DIR、USERS_DIR、DATA_DIR)
│ ├── security.py # 用户认证(注册/登录/JWT)+ _create_user_dirs
│ ├── api_config.py # 按能力路由 API Key + base_url(支持 user_id)
│ ├── user_api_keys.py # AES-256-GCM 加密的 per-admin key 存储
│ ├── encryption.py # AES-GCM master key(data/encryption.key)
│ ├── path_security.py # safe_join / ensure_within(路径遍历防护)
│ ├── fileutil.py # 文件读写工具
│ └── observability.py # Langfuse 集成(v3 SDK)
│
├── schemas/
│ ├── requests.py # Admin 请求模型
│ └── service.py # Service / Consumer 请求/配置模型
│
├── services/
│ ├── agent.py # Admin Agent 工厂 + 模型解析 + cache
│ ├── consumer_agent.py # Consumer Agent 工厂(channel-aware)
│ ├── tools.py # @tool 工厂 + CAPABILITY_PROMPTS
│ ├── ai_tools.py # 多媒体生成(image/tts/video)底层
│ ├── web_tools.py # CloudsWay / Tavily 双 provider
│ ├── script_runner.py # 脚本执行(subprocess + AST 检查 + 信号量队列)
│ ├── _sandbox_wrapper.py # 运行时 I/O 沙箱(monkey-patch)
│ ├── conversations.py # Admin 对话持久化 + 附件
│ ├── prompt.py # System Prompt 版本管理 + capability prompts
│ ├── preferences.py # 用户时区/界面偏好
│ ├── subagents.py # Subagent 配置 + DEFAULT_SUBAGENTS
│ ├── memory_tools.py # Memory Subagent 工具 + Soul 配置 + 短期记忆
│ ├── published.py # Service CRUD + API Key + Consumer 会话
│ ├── scheduler.py # 定时任务(Admin + Service 双轨)
│ ├── inbox.py # 收件箱(contact_admin → Inbox Agent → WeChat 转发)
│ └── venv_manager.py # per-user Python 虚拟环境
│
├── routes/ # 详见 §4.2 路由总表
│ ├── auth.py
│ ├── conversations.py
│ ├── chat.py
│ ├── files.py
│ ├── scripts.py
│ ├── models.py
│ ├── settings_routes.py # system_prompt / user_profile / subagents / api_keys / soul / capability_prompts
│ ├── batch.py
│ ├── services.py
│ ├── consumer.py # /api/v1/*(Consumer 对外)
│ ├── consumer_ui.py # /s/{service_id}(Consumer 聊天页)
│ ├── scheduler.py
│ ├── inbox.py
│ └── wechat_ui.py # /wc/{service_id}(WeChat 扫码中间页)
│
├── channels/wechat/
│ ├── client.py # iLink 协议客户端(getconfig / getupdates / sendmessage / cdn)
│ ├── bridge.py # Service Consumer Bridge(multimodal + send_message 拦截)
│ ├── admin_bridge.py # Admin 自接入 Bridge(独立逻辑)
│ ├── admin_router.py # /api/admin/wechat/* 路由
│ ├── router.py # /api/wc/* 公开路由 + Admin 服务渠道管理
│ ├── session_manager.py # 会话生命周期 + 持久化 + 重连
│ ├── media.py # AES-128-ECB + CDN 上传/下载
│ ├── delivery.py # 统一投递(含 <<FILE:>> 标签解析)
│ └── rate_limiter.py # 单用户/QR/全局 session 频率限制
│
├── storage/
│ ├── config.py # S3Config + is_s3_mode
│ ├── base.py # StorageService ABC
│ ├── local.py # 本地文件系统实现
│ ├── s3.py # boto3 S3 实现(兼容 MinIO/R2/OSS)
│ ├── s3_backend.py # deepagents BackendProtocol for S3
│ └── __init__.py # get_storage_service / create_agent_backend / create_consumer_backend
│
└── voice/
└── router.py # WebSocket S2S 代理(OpenAI Realtime)4.2 FastAPI 路由总表
app/main.py 中统一注册,当前约 70 条路由。
公开 / 认证
| 前缀 | 路由 | 说明 |
|---|---|---|
/api/auth | register / login / me | 用户注册(需注册码)+ 登录 + JWT |
Admin(需 get_current_user)
| 前缀 | 用途 |
|---|---|
/api/conversations | 对话 CRUD + 附件 |
/api/chat | 主 SSE 聊天 + /resume + /stop + /streaming-status |
/api/files | 文件 CRUD + 上传 + 下载 + media |
/api/scripts | 脚本执行 + audio 转写 |
/api/models | 可用模型列表 |
/api/system-prompt | Prompt 内容 + 版本管理 |
/api/user-profile | 用户画像 + 版本管理 |
/api/subagents | Subagent CRUD + available_tools |
/api/capability-prompts | 能力提示词 per-user 覆盖 |
/api/soul/config | Soul 配置(GET/PUT) |
/api/batch | Excel 批量执行 |
/api/services | Service CRUD + API Key |
/api/scheduler | Admin 定时任务 + 运行记录 |
/api/scheduler/services/... | Service 定时任务 |
/api/inbox | 收件箱(list/get/update/delete) |
/api/packages | per-user venv 包管理 |
/api/settings/api-keys | per-admin API Key(加密) |
/api/wc/... | WeChat Service 渠道(QR + sessions + messages) |
/api/admin/wechat/* | Admin 自接入微信 |
/api/voice/... | WebSocket S2S 代理 |
Consumer(需 get_service_context)
| 路由 | 说明 |
|---|---|
POST /api/v1/conversations | 创建对话 |
GET /api/v1/conversations/{id} | 获取对话历史 |
GET /api/v1/conversations/{id}/files | 列出生成文件 |
GET /api/v1/conversations/{id}/files/{path} | 下载生成文件(query ?key=) |
GET /api/v1/conversations/{id}/attachments/{path} | 下载用户附件 |
POST /api/v1/chat | 自定义 SSE 聊天 |
POST /api/v1/chat/completions | OpenAI 兼容(流式 + 非流式) |
静态页面
| 路由 | 说明 |
|---|---|
GET /s/{service_id} | Consumer 独立聊天页(React multi-entry) |
GET /wc/{service_id} | WeChat 扫码中间页(HTML 模板) |
4.3 依赖注入与认证
app/deps.py 提供两个核心依赖:
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict
async def get_service_context(authorization: str = Header(...)) -> ServiceContext| 维度 | Admin | Consumer |
|---|---|---|
| 凭证 | JWT Bearer Token | sk-svc-... Bearer Token |
| 存储 | users/users.json(bcrypt + sha256 fallback) | users/{admin}/services/{svc}/keys.json(sha256 hashed) |
| 上下文 | {user_id, username} | ServiceContext{admin_id, service_id, service_config, key_id} |
| 适用路由 | /api/*(除 /api/v1/*) | /api/v1/* + /s/{sid} |
同一台 FastAPI 内两套依赖共存,路由文件需明确选择,Consumer 路由不要错用
get_current_user。
4.4 Agent 引擎
4.4.1 Admin Agent (app/services/agent.py)
- 创建:
create_user_agent(user_id, model_id, capabilities, plan_mode, ...) - 缓存:per-(user_id, model_id, capabilities, plan_mode, channel) 缓存,避免重复初始化
- Checkpointer:
AsyncSqliteSaver→data/checkpoints.db- 启动时执行
PRAGMA journal_mode=WAL+PRAGMA synchronous=NORMAL,降低多 bridge 并发的锁冲突
- 启动时执行
- Soul 注入:根据
soul/config.json决定是否注入memory_subagent/soul_edit能力提示词 - 缓存失效:
clear_agent_cache(user_id)— 在 prompt / subagent / API key 变更时调用
4.4.2 模型解析
# 模型 ID 格式
"anthropic:claude-sonnet-4-6-20250929"
"openai:gpt-5.4"
# Thinking 模型自动配置 extended_thinking
THINKING_MODEL_CONFIG = {
"anthropic:claude-opus-4-6-20250929-thinking": {...},
...
}
# api_key + base_url 由 _resolve_model 通过 api_config + user_api_keys 解析
_resolve_model(model_id, user_id=None) → ChatModel4.4.3 Consumer Agent (app/services/consumer_agent.py)
- 工厂:
create_consumer_agent(admin_id, service_id, conv_id, channel="web", ...) - channel 感知(重要!):
channel="web":Web /s/ +/api/v1/*,不注入send_message(消息已直接走 SSE)channel="wechat":注入send_message,结果由delivery.py反向投递到 WeChatchannel="scheduler":同 wechat,定时任务推送结果
- cache_key:包含
::ch={channel}(非 web 时),避免错误复用 - 能力受限:仅根据 Service
capabilities列表注入工具 - 文件系统:
users/{admin_id}/services/{svc}/conversations/{conv_id}/generated/隔离
4.4.4 消息时间戳注入(2026-04-13 重构)
精确时间不再写入 system prompt(按天缓存会冻结时间),改为每条用户消息前注入 [YYYY-MM-DD HH:MM:SS]:
prompt.py::stamp_message(content, user_id)处理str和 multimodallist两种格式- 注入点:
chat.py(Web)、admin_bridge.py(Admin WeChat)、consumer.py(Consumer × 3 处)、bridge.py(Service WeChat) - Consumer 端使用
admin_id获取时区
4.5 工具系统
4.5.1 内置工具 (app/services/tools.py)
| 工具 | 说明 | Admin | Consumer 注入条件 |
|---|---|---|---|
read_file / write_file / ls / glob / grep | deepagents 内置 | ✅ 始终 | ✅ 始终 |
edit_file / write_todos | deepagents 内置 | ✅ | ✅ |
task | deepagents subagent 调度 | ✅ | ✅ |
run_python_script | 沙箱脚本执行 | ✅ | ✅ |
web_search / web_fetch | CloudsWay / Tavily | ✅ 始终注入 | web capability |
generate_image / generate_speech / generate_video | OpenAI 多媒体 | 按 capability | 按 capability |
schedule_task / manage_scheduled_tasks | 定时任务 CRUD | ✅ 始终 | scheduler capability |
publish_service_task | Admin 给 Service 派发任务 | ✅ 始终 | ❌ |
send_message | 发送给微信用户 | wechat 渠道注入 | humanchat capability + 非 web channel |
contact_admin | Service 通知管理员 | ❌ | humanchat capability |
soul_list / soul_read / soul_write / soul_delete | Soul 文件操作 | Memory Subagent 内(且需 memory_subagent_enabled) | ❌ |
4.5.2 多媒体工具 (app/services/ai_tools.py)
底层调用 OpenAI API:
generate_image:gpt-image-1,输出存到generated/images/generate_speech:tts-1/tts-1-hd(6 种音色),存到generated/audio/generate_video:Sora 2,存到generated/videos/
返回约定:返回提示语 "已生成 ... 请使用 <<FILE:/generated/xxx>> 展示给用户",前端 markdown.ts 和 delivery.py::extract_media_tags 都识别此标签。
4.5.3 联网工具 (app/services/web_tools.py)
双 provider,按 user_id 解析 key:
| Provider | Key 来源 | 备注 |
|---|---|---|
| CloudsWay(优先) | CLOUDSWAY_SEARCH_KEY | CLOUDSWAY_SEARCH_URL / CLOUDSWAY_READ_URL 可覆盖端点 |
| Tavily(备选) | TAVILY_API_KEY | 自动 fallback |
4.5.4 Capability Prompts
tools.py::CAPABILITY_PROMPTS定义每个能力的工具使用规则- per-user 覆盖:
users/{uid}/capability_prompts.json(仅存覆盖项) - API:
GET /api/capability-prompts(含is_custom)、PUT /key、DELETE /key - 解析:
prompt.py::get_resolved_capability_prompt(user_id, key)
4.6 Subagent / Memory / Soul
4.6.1 Subagent (app/services/subagents.py)
- 存储:
users/{uid}/subagents.json - DEFAULT_SUBAGENTS:包含一个内置
memorysubagent(不可删,但可禁用) - 可用工具池(
SHARED_TOOL_NAMES + MEMORY_TOOL_NAMES):- 通用:
run_script/web_search/web_fetch/generate_image/generate_speech/generate_video/schedule_task/manage_scheduled_tasks/publish_service_task/send_message - 记忆:
list_conversations/read_conversation/list_service_conversations/read_service_conversation/read_inbox/soul_list/soul_read/soul_write/soul_delete
- 通用:
- 按需创建:
build_subagent_tools(subagent_config, user_id)仅实例化配置中列出的工具 - API:
GET /api/subagents(含available_tools)+ CRUD
4.6.2 Memory Subagent
- Admin Memory Subagent:默认工具 = 5 个对话/inbox 读工具
memory_subagent_enabled=true时追加 4 个 soul 写工具- 对话历史只读,soul 内容可读写
- Consumer Memory Subagent:仅
read_my_conversation(自身对话只读) - 工厂:
create_admin_memory_tools(user_id)→ 5~9 个工具create_consumer_memory_tools(admin_id, svc_id, conv_id)→ 1 个工具
4.6.3 Soul 系统 (app/services/memory_tools.py)
users/{uid}/
├── soul/
│ └── config.json # 应用层配置(agent 不可访问)
└── filesystem/soul/ # agent 可读写的 Soul 内容(笔记/人格)
└── *.md / *.json- config.json 字段:
memory_enabled:开启短期记忆注入include_consumer_conversations:Memory subagent 是否能读 Consumer 对话max_recent_messages:默认 5 条memory_subagent_enabled:是否给 Memory Subagent soul 写权限soul_edit_enabled:主 Agent 是否能直接通过filesystem/soul/读写 Soul
- 路径迁移:
sync_soul_symlink()会移除旧 symlink、自动迁移内容到filesystem/soul/(避免 deepagentsPath.resolve()跟随符号链接的逃逸报错) - 能力提示词:
memory_subagent/soul_edit在CAPABILITY_PROMPTS中按开关注入
4.6.4 短期记忆注入
scheduler.py::_run_*_agent_task:从对话 JSON 读最近 N 条消息拼入 prompt 前缀inbox.py::_trigger_inbox_agent:注入最近 3 条 inbox 历史- 消息来源标注:
- Service 定时任务 prompt 头部
[系统指令 - 来自管理员] - Inbox agent prompt 头部
[系统指令 - Service 收件箱通知]
- Service 定时任务 prompt 头部
4.7 存储层
4.7.1 抽象接口 (app/storage/base.py)
class StorageService(ABC):
async def read_file(self, path: str) -> bytes
async def write_file(self, path: str, content: bytes) -> None
async def list_directory(self, path: str) -> list
async def delete(self, path: str) -> None
async def exists(self, path: str) -> bool
async def move(self, src: str, dst: str) -> None4.7.2 后端选择
通过 STORAGE_BACKEND 环境变量:
local(默认):LocalStorageService—os.*操作本地磁盘s3:S3StorageService— boto3 API(兼容 AWS S3 / MinIO / R2 / 阿里 OSS)
4.7.3 S3 键映射
{prefix}/{user_id}/fs/{path} # Admin 文件系统
{prefix}/{admin_id}/svc/{svc_id}/{conv_id}/gen/{path} # Consumer 生成文件注意:JSON 配置文件(users.json、conversations、services config 等)目前仍走本地盘。S3 模式仅托管文件系统层。
4.7.4 媒体访问差异
| 模式 | /api/files/media 行为 |
|---|---|
local | FileResponse 直接流文件 |
s3 | 生成 presigned URL,返回 302 重定向 |
4.7.5 工厂函数 (app/storage/__init__.py)
get_storage_service() → StorageService
create_agent_backend(root_dir, user_id=None) → BackendProtocol # Admin
create_consumer_backend(admin_id, svc_id, conv_id, gen_dir) → BackendProtocol4.7.6 脚本执行
- 本地模式:直接读
users/{uid}/filesystem/scripts/下的脚本 - S3 模式:临时下载到本地 → 子进程执行 → 结果上传回 S3
完整的目录树、JSON Schema 和 6 大消息流详见
docs/filesystem-architecture.md。
4.8 安全架构
4.8.1 路径遍历防护 (app/core/path_security.py)
safe_join(base, user_path) → str # 安全路径拼接
ensure_within(path, root) → bool # 验证路径在根目录内实现:pathlib.Path.resolve() + 分隔符感知的边界检查(startswith(root + os.sep))。Windows 大小写不敏感场景下混合大小写路径理论上可能误判,需注意。
4.8.2 脚本沙箱(双层 Defense-in-Depth)
第一层:AST 静态分析 (script_runner._check_script_safety)
- 禁止危险模块:
subprocess/pathlib/ctypes/io/pickle/threading/posix/nt/_posixsubprocess - 禁止危险内置:
exec/eval/getattr/setattr/globals - 禁止"绝对危险"的 os 函数:
system/popen/exec*/spawn*/fork/kill/chown/setuid/chroot/chdir - 禁止访问
__builtins__/__subclasses__/__globals__/__dict__/__mro__/__bases__ - 拒绝
Subscript/Call形式的函数调用(封堵os.__dict__['system'](...)/(lambda:...)()) - 文件 I/O 函数(remove / rename / mkdir / listdir / chmod 等)故意不在 AST 黑名单,仅在运行时按路径白名单拦截
第二层:运行时沙箱 (_sandbox_wrapper.py)
- Monkey-patch
builtins.open/io.open/os.listdir/os.scandir/os.walk/os.chdir/os.open/os.readlink强制读权限 - 写操作(remove / unlink / rmdir / rename / replace / mkdir / makedirs / chmod / chown / link / symlink / utime / truncate / mkfifo / mknod)走
_check_write路径白名单 - "绝对危险"函数(system / popen / exec* / spawn* / posix_spawn* / fork / forkpty / kill / killpg / chown / lchown / setuid / setgid / setres*id / chroot / pipe / pipe2 / dup / dup2)直接覆盖为
PermissionError报错函数,即使攻击者通过os.__dict__[name]/vars(os)[name]拿到引用,调用时仍然抛错
沙箱权限配置:
| 角色 | read | write |
|---|---|---|
| Admin | scripts/ + docs/ | scripts/ + generated/ |
| Consumer | scripts/ + docs/(按 allowed_scripts 过滤) | conversation 自己的 generated/ |
| 定时任务 | task_config.permissions.read_dirs | task_config.permissions.write_dirs |
Sandbox read 豁免(自动):
_PYTHON_READ_ROOTS:sys.prefix/sys.base_prefix/sys.exec_prefix/site.getsitepackages()— 允许import matplotlib_SYSTEM_READ_DIRS:/usr/share//etc/fonts//etc/ssl/ macOS 字体目录_TEMP_DIR(write):tempfile.gettempdir()— 库缓存写入MPLCONFIGDIR重定向到 temp 目录
资源限制(2026-04-18 调优):
| 限制 | 值 | 说明 |
|---|---|---|
_MAX_NPROC | 256 | numpy/OpenBLAS 在 16 配额下崩溃 |
_MEMORY_LIMIT_BYTES | 1024 MB | pandas/matplotlib 余量 |
OPENBLAS_NUM_THREADS 等 | 2 | 防止单脚本占满 CPU |
_SCRIPT_SEMAPHORE | 4 | 全局并发数(SCRIPT_CONCURRENCY 覆盖) |
_QUEUE_TIMEOUT | 180s | 排队超时(SCRIPT_QUEUE_TIMEOUT 覆盖) |
Linux 注意:
RLIMIT_NPROC限制的是当前 uid 的进程/线程总数(pthread = LWP)。Windows 上preexec_fn=None,无resource.setrlimit。
4.8.3 XSS 防护
- 后端:
consumer_ui.py/wechat_ui.py使用html.escape()处理模板注入 _safe_json_for_inline_script把</转义为<\/防 script breakout- 前端:
marked.parse()输出经DOMPurify.sanitize()清洗,白名单含audio/video/iframe
4.8.4 加密
- Master Key:
data/encryption.key(首次生成,或ENCRYPTION_KEY环境变量覆盖) - API Key 加密:AES-256-GCM,存储在
users/{uid}/api_keys.json - WeChat 媒体:AES-128-ECB(iLink 协议要求)
4.9 Per-Admin Python venv
- 模块:
app/services/venv_manager.py - 目录:
users/{uid}/venv/,每个 Admin 独立的 Python 虚拟环境 - 创建:
--system-site-packages继承系统预装包 - 持久化:用户安装的包记录到
users/{uid}/venv/requirements.txt - 脚本执行:
tools.py::create_run_script_tool使用get_user_python(user_id);Consumer 使用 admin 的 venv - API:
GET /api/packages— 列出已安装包 + venv 状态POST /api/packages/init— 初始化用户 venvPOST /api/packages/install— 安装包(包名禁止;|&$`等注入字符)POST /api/packages/uninstall— 卸载包
- 启动恢复:
main.pystartup 调用restore_all_venvs(),对有requirements.txt的用户自动pip install -r
4.10 Per-Admin API Key
4.10.1 设计
- 每个 Admin 在 Settings → General 配置自己的 OpenAI / Anthropic / Tavily / 多媒体 key
- AES-256-GCM 加密存储
- 优先级链:
用户配置 > 环境变量 > 未配置(提醒设置) - Admin 的所有 Agent(主 Agent / Subagent / Consumer Agent)统一使用该 Admin 的 key
4.10.2 调用链
agent.py::_resolve_model(model_id, user_id)
└── api_config.get_openai_llm_config(user_id)
└── user_api_keys.get_user_api_keys(user_id)
└── encryption.decrypt(...)
consumer_agent.create_consumer_agent(admin_id, ...)
└── _resolve_model(model_id, user_id=admin_id)
ai_tools.generate_*(user_id=admin_id)
└── api_config.get_api_config("image", user_id)
web_tools.web_search(query, user_id=admin_id)4.10.3 支持的字段
- 密钥:
openai_api_key/anthropic_api_key/tavily_api_key/cloudsway_search_key/image_api_key/tts_api_key/video_api_key/s2s_api_key/stt_api_key - URL:
openai_base_url/anthropic_base_url/image_base_url/tts_base_url/video_base_url/s2s_base_url/stt_base_url
4.10.4 API 端点
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/settings/api-keys | 脱敏返回 + *_configured 标记 |
| PUT | /api/settings/api-keys | 加密保存(触发 clear_agent_cache + clear_consumer_cache) |
| POST | /api/settings/api-keys/test | 测试连通性(openai / anthropic / tavily / all) |
| GET | /api/settings/api-keys/status | 快速检查是否有 LLM provider 可用 |
4.10.5 缓存失效
Key 更新后自动调用 clear_agent_cache(user_id) + clear_consumer_cache(admin_id=user_id),下次 Agent 请求重新创建。