开发者指南 · 08
定时任务(Scheduler)
8.1 调度器设计
app/services/scheduler.py::TaskScheduler单例,asyncio 循环每 30s 检查main.pystartup 启动,shutdown 优雅停止
8.2 任务存储
| 类型 | 路径 | 前缀 |
|---|---|---|
| Admin 任务 | users/{uid}/tasks/{task_id}.json | task_* |
| Service 任务 | users/{uid}/services/{svc}/tasks/{task_id}.json | stask_* |
每个任务文件含最近 20 条运行记录。
8.3 任务类型
| 类型 | 说明 | 支持范围 |
|---|---|---|
script | 执行 scripts/ 下的 Python 脚本 | 仅 Admin |
agent | 执行 Agent 任务(prompt + 可选文档上下文) | Admin + Service |
8.4 调度类型
| 类型 | 说明 | 示例 |
|---|---|---|
once | 一次性 | 2026-12-31T09:00:00+08:00(建议带时区后缀) |
cron | Cron 表达式 | 0 9 * * * |
interval | 间隔(秒) | 3600 |
8.5 时区处理
- 每个任务存
tz_offset_hours字段(创建时的用户时区偏移) - cron 按用户时区解释:
_next_cronUTC now → 用户本地 → croniter → 转回 UTC - once 必须带时区后缀:无 tz 时按
tz_offset_hours补充,工具层_ensure_tz_suffix兜底 - interval 不受时区影响:直接按秒数偏移
- 缺字段的旧任务用
_resolve_task_tz_offset(task)回退get_tz_offset(user_id)(与 preferences 默认 +8 一致) - React Scheduler 创建/更新任务时 body 必须带
tz_offset_hours: getTzOffset()
8.6 reply_to 路由
Service 任务通过 reply_to 字段控制结果推送目标:
json
{
"reply_to": {
"channel": "wechat | inbox | admin_chat",
"admin_id": "...",
"service_id": "...",
"conversation_id": "...",
"session_id": "wechat_user_xxx"
}
}| channel | 推送目标 |
|---|---|
wechat | delivery.py::deliver_tool_message 投递到 WeChat 用户 |
inbox | 写入 Admin inbox |
admin_chat | 写入 Admin 普通对话 |
_run_service_agent_task 在 agent 执行循环中实时拦截 send_message 工具调用,通过 delivery.py 发送到 WeChat(支持文本+媒体);_deliver_reply 仅在 agent 未使用 send_message 时作为兜底(纯文本摘要)。
8.7 运行记录步骤
每条 run record 含 steps[]:
| 步骤类型 | 说明 |
|---|---|
start | 开始执行 |
docs_loaded | 文档加载完成 |
loop | Agent 循环迭代 |
tool_call | 工具调用 |
tool_result | 工具结果 |
ai_message | AI 消息 |
auto_approve | 自动审批(HITL) |
wechat_warning | WeChat client 不可用警告 |
wechat_error | WeChat 投递失败 |
finish | 完成 |
error | 错误 |
reply | 兜底推送 |
8.8 任务结果持久化
_run_agent_loop 结束后调用 save_message / save_consumer_message 写入对话 JSON,含:
source: "scheduled_task"或"admin_broadcast"标记- 完整
blocks[]
8.9 sync→async 主循环桥接
LangChain sync tool(如 contact_admin)通过 BaseTool._arun 在 run_in_executor 线程池执行,无 event loop。修复:
python
# inbox.py / scheduler.py
def set_main_loop(loop: asyncio.AbstractEventLoop):
global _main_loop
_main_loop = loop
# 调度时:
try:
loop = asyncio.get_running_loop()
loop.create_task(coro)
except RuntimeError:
if _main_loop is not None and _main_loop.is_running():
asyncio.run_coroutine_threadsafe(coro, _main_loop)8.10 Service 任务专用工具
Consumer agent 通过:
create_service_schedule_tool注入schedule_taskcreate_service_manage_tasks_tool注入manage_scheduled_tasks(仅"scheduler"在 capabilities 中时)- Service 的
manage_scheduled_tasks仅能操作当前 conversation_id 的任务(权限隔离)
publish_service_task(Admin 工具):
service_ids支持 ID 和名称匹配(大小写不敏感),未匹配时返回可用 Service 列表session_ids可选参数,精确到单个微信会话run_now调度使用_schedule_coro线程安全模式