开发者指南 · 08

定时任务(Scheduler)

8.1 调度器设计

  • app/services/scheduler.py::TaskScheduler 单例,asyncio 循环每 30s 检查
  • main.py startup 启动,shutdown 优雅停止

8.2 任务存储

类型路径前缀
Admin 任务users/{uid}/tasks/{task_id}.jsontask_*
Service 任务users/{uid}/services/{svc}/tasks/{task_id}.jsonstask_*

每个任务文件含最近 20 条运行记录。

8.3 任务类型

类型说明支持范围
script执行 scripts/ 下的 Python 脚本仅 Admin
agent执行 Agent 任务(prompt + 可选文档上下文)Admin + Service

8.4 调度类型

类型说明示例
once一次性2026-12-31T09:00:00+08:00(建议带时区后缀)
cronCron 表达式0 9 * * *
interval间隔(秒)3600

8.5 时区处理

  • 每个任务存 tz_offset_hours 字段(创建时的用户时区偏移)
  • cron 按用户时区解释_next_cron UTC now → 用户本地 → croniter → 转回 UTC
  • once 必须带时区后缀:无 tz 时按 tz_offset_hours 补充,工具层 _ensure_tz_suffix 兜底
  • interval 不受时区影响:直接按秒数偏移
  • 缺字段的旧任务用 _resolve_task_tz_offset(task) 回退 get_tz_offset(user_id)(与 preferences 默认 +8 一致)
  • React Scheduler 创建/更新任务时 body 必须带 tz_offset_hours: getTzOffset()

8.6 reply_to 路由

Service 任务通过 reply_to 字段控制结果推送目标:

json
{
  "reply_to": {
    "channel": "wechat | inbox | admin_chat",
    "admin_id": "...",
    "service_id": "...",
    "conversation_id": "...",
    "session_id": "wechat_user_xxx"
  }
}
channel推送目标
wechatdelivery.py::deliver_tool_message 投递到 WeChat 用户
inbox写入 Admin inbox
admin_chat写入 Admin 普通对话

_run_service_agent_task 在 agent 执行循环中实时拦截 send_message 工具调用,通过 delivery.py 发送到 WeChat(支持文本+媒体);_deliver_reply 仅在 agent 未使用 send_message 时作为兜底(纯文本摘要)。

8.7 运行记录步骤

每条 run record 含 steps[]

步骤类型说明
start开始执行
docs_loaded文档加载完成
loopAgent 循环迭代
tool_call工具调用
tool_result工具结果
ai_messageAI 消息
auto_approve自动审批(HITL)
wechat_warningWeChat client 不可用警告
wechat_errorWeChat 投递失败
finish完成
error错误
reply兜底推送

8.8 任务结果持久化

_run_agent_loop 结束后调用 save_message / save_consumer_message 写入对话 JSON,含:

  • source: "scheduled_task""admin_broadcast" 标记
  • 完整 blocks[]

8.9 sync→async 主循环桥接

LangChain sync tool(如 contact_admin)通过 BaseTool._arunrun_in_executor 线程池执行,无 event loop。修复:

python
# inbox.py / scheduler.py
def set_main_loop(loop: asyncio.AbstractEventLoop):
    global _main_loop
    _main_loop = loop

# 调度时:
try:
    loop = asyncio.get_running_loop()
    loop.create_task(coro)
except RuntimeError:
    if _main_loop is not None and _main_loop.is_running():
        asyncio.run_coroutine_threadsafe(coro, _main_loop)

8.10 Service 任务专用工具

Consumer agent 通过:

  • create_service_schedule_tool 注入 schedule_task
  • create_service_manage_tasks_tool 注入 manage_scheduled_tasks(仅 "scheduler" 在 capabilities 中时)
  • Service 的 manage_scheduled_tasks 仅能操作当前 conversation_id 的任务(权限隔离)

publish_service_task(Admin 工具):

  • service_ids 支持 ID 和名称匹配(大小写不敏感),未匹配时返回可用 Service 列表
  • session_ids 可选参数,精确到单个微信会话
  • run_now 调度使用 _schedule_coro 线程安全模式