Developer Guide · 08
Scheduler (Scheduled Tasks)
8.1 Scheduler Design
- app/services/scheduler.py::TaskScheduler singleton, asyncio loop checks every 30s
- Started by main.py startup, gracefully stopped on shutdown
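
The singleton-plus-polling design above can be sketched roughly as follows. This is a minimal illustration, not the actual TaskScheduler: the instance(), start(), stop() and _tick() names, the ticks counter, and the configurable interval are all assumptions made for the example.

```python
import asyncio
from typing import Optional


class TaskScheduler:
    """Hypothetical sketch of a singleton scheduler with a polling loop."""

    _instance: Optional["TaskScheduler"] = None

    def __init__(self, interval: float = 30.0) -> None:
        self.interval = interval  # seconds between checks (30s in this guide)
        self.ticks = 0            # demo counter, not part of the real class
        self._task: Optional[asyncio.Task] = None

    @classmethod
    def instance(cls) -> "TaskScheduler":
        # Create the single shared instance lazily.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    async def _tick(self) -> None:
        # Placeholder: a real scheduler would look for due tasks here.
        self.ticks += 1

    async def _loop(self) -> None:
        while True:
            await self._tick()
            await asyncio.sleep(self.interval)

    def start(self) -> None:
        # Must be called from inside a running event loop (e.g. app startup).
        if self._task is None:
            self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        # Graceful shutdown: cancel the loop and wait for it to unwind.
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
```

Awaiting the cancelled task in stop() is what makes the shutdown graceful in this sketch: the caller only continues once the loop has actually exited.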
8.2 Task Storage
| Type | Path | Prefix |
|---|---|---|
| Admin tasks | users/{uid}/tasks/{task_id}.json | task_* |
| Service tasks | users/{uid}/services/{svc}/tasks/{task_id}.json | stask_* |
Each task file contains the last 20 run records.
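
As an illustration of the layout in the table, the sketch below builds both path shapes and caps the run history at 20 entries. The function names and the "runs" key are assumptions for the example; only the path patterns and the 20-record limit come from this guide.

```python
from pathlib import PurePosixPath
from typing import Optional

MAX_RUN_RECORDS = 20  # each task file keeps the last 20 run records


def task_path(uid: str, task_id: str, svc: Optional[str] = None) -> PurePosixPath:
    """Build the relative storage path for an admin or service task."""
    base = PurePosixPath("users") / uid
    if svc is not None:
        # Service tasks live one level deeper, under the service name.
        base = base / "services" / svc
    return base / "tasks" / f"{task_id}.json"


def append_run(task: dict, record: dict) -> None:
    """Append a run record and drop everything but the newest 20."""
    runs = task.setdefault("runs", [])  # "runs" is a hypothetical key name
    runs.append(record)
    del runs[:-MAX_RUN_RECORDS]
```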
8.3 Task Types
| Type | Description | Scope |
|---|---|---|
| script | Execute Python script under scripts/ | Admin only |
| agent | Execute Agent task (prompt + optional document context) | Admin + Service |
8.4 Schedule Types
| Type | Description | Example |
|---|---|---|
| once | One-time | 2026-12-31T09:00:00+08:00 (include timezone suffix) |
| cron | Cron expression | 0 9 * * * |
| interval | Interval (seconds) | 3600 |
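
To make the once and interval rows concrete, here is a small sketch that turns a schedule value into the next UTC run time. The next_run_utc name and its return convention (None once a one-time task is in the past) are assumptions; cron is left out because it depends on croniter and the timezone rules in 8.5.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def next_run_utc(schedule_type: str, value: str, now: datetime) -> Optional[datetime]:
    """Return the next run time in UTC, or None if nothing is left to run.

    `now` is expected to be a timezone-aware datetime.
    """
    if schedule_type == "once":
        when = datetime.fromisoformat(value)
        if when.tzinfo is None:
            raise ValueError("'once' values must carry a timezone suffix")
        when = when.astimezone(timezone.utc)
        return when if when > now else None
    if schedule_type == "interval":
        # Plain second offset from `now`; no timezone math involved.
        return now + timedelta(seconds=int(value))
    raise ValueError(f"unsupported schedule type in this sketch: {schedule_type}")
```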
8.5 Timezone Handling
- Each task stores a tz_offset_hours field (user's timezone offset at creation time)
- Cron is interpreted in the user's timezone: _next_cron converts UTC now → user local → croniter → back to UTC
- once must include a timezone suffix: _ensure_tz_suffix provides a fallback with tz_offset_hours
- interval is not affected by timezone: it directly uses second offsets
- Old tasks without the field use _resolve_task_tz_offset(task), falling back to get_tz_offset(user_id) (consistent with the preferences default of +8)
- React Scheduler must include tz_offset_hours: getTzOffset() in the create/update task body
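
The two timezone rules above can be sketched with the standard library alone. Both functions are hypothetical stand-ins: ensure_tz_suffix only approximates what _ensure_tz_suffix is described as doing, and next_daily_utc replaces the croniter step with a fixed daily HH:MM so the UTC → local → UTC round trip is visible without a third-party dependency.

```python
from datetime import datetime, timedelta, timezone


def ensure_tz_suffix(ts: str, tz_offset_hours: float) -> str:
    """Append the stored offset to an ISO timestamp that lacks one."""
    parsed = datetime.fromisoformat(ts)
    if parsed.tzinfo is not None:
        return ts  # already has a suffix; leave it untouched
    tz = timezone(timedelta(hours=tz_offset_hours))
    return parsed.replace(tzinfo=tz).isoformat()


def next_daily_utc(now_utc: datetime, hour: int, minute: int,
                   tz_offset_hours: float) -> datetime:
    """Next HH:MM in the user's timezone, returned in UTC.

    `now_utc` must be timezone-aware. Steps: UTC now -> user local ->
    next fire time (croniter in the real code) -> back to UTC.
    """
    tz = timezone(timedelta(hours=tz_offset_hours))
    local_now = now_utc.astimezone(tz)
    fire = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if fire <= local_now:
        fire += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return fire.astimezone(timezone.utc)
```

With an offset of +8, a 09:00 daily schedule therefore fires at 01:00 UTC, which is why evaluating the expression directly in UTC would run the task eight hours late.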
8.6 reply_to Routing
Service tasks control the result delivery target via the reply_to field:
```json
{
  "reply_to": {
    "channel": "wechat | inbox | admin_chat",
    "admin_id": "...",
    "service_id": "...",
    "conversation_id": "...",
    "session_id": "wechat_user_xxx"
  }
}
```

| Channel | Delivery Target |
|---|---|
| wechat | delivery.py::deliver_tool_message delivers to WeChat user |
| inbox | Writes to Admin inbox |
| admin_chat | Writes to Admin's regular conversation |
_run_service_agent_task intercepts send_message tool calls in real time during the agent execution loop and sends them to WeChat via delivery.py (text and media are supported). _deliver_reply is used only as a fallback when the agent does not call send_message, in which case it delivers a plain-text summary.
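
The fallback rule can be illustrated with a small channel dispatcher. Everything here is a sketch under assumptions: the handler functions, the HANDLERS table, the used_send_message flag and the in-memory sent list are invented for the example and do not reflect the real _deliver_reply signature.

```python
from typing import Callable, Dict, List, Tuple

sent: List[Tuple[str, str]] = []  # records (target, text) for the demo


def _to_wechat(reply_to: dict, text: str) -> None:
    sent.append((f"wechat:{reply_to['session_id']}", text))


def _to_inbox(reply_to: dict, text: str) -> None:
    sent.append((f"inbox:{reply_to['admin_id']}", text))


def _to_admin_chat(reply_to: dict, text: str) -> None:
    sent.append((f"admin_chat:{reply_to['admin_id']}", text))


HANDLERS: Dict[str, Callable[[dict, str], None]] = {
    "wechat": _to_wechat,
    "inbox": _to_inbox,
    "admin_chat": _to_admin_chat,
}


def deliver_reply(reply_to: dict, summary: str, used_send_message: bool) -> bool:
    """Fallback delivery: only fires when the agent sent nothing itself."""
    if used_send_message:
        return False  # the agent already delivered its own messages
    handler = HANDLERS.get(reply_to.get("channel", ""))
    if handler is None:
        raise ValueError(f"unknown channel: {reply_to.get('channel')!r}")
    handler(reply_to, summary)
    return True
```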
8.7 Run Record Steps
Each run record contains steps[]:
| Step Type | Description |
|---|---|
| start | Execution begins |
| docs_loaded | Documents loaded |
| loop | Agent loop iteration |
| tool_call | Tool invocation |
| tool_result | Tool result |
| ai_message | AI message |
| auto_approve | Auto-approval (HITL) |
| wechat_warning | WeChat client unavailable warning |
| wechat_error | WeChat delivery failure |
| finish | Completed |
| error | Error |
| reply | Fallback delivery |
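
A run record's steps[] list could be built up with a helper like the one below. The step type names are taken from the table; the "type" and "at" field names, the add_step helper and the extra detail fields are assumptions for illustration only.

```python
from datetime import datetime, timezone
from typing import Any

STEP_TYPES = {
    "start", "docs_loaded", "loop", "tool_call", "tool_result", "ai_message",
    "auto_approve", "wechat_warning", "wechat_error", "finish", "error", "reply",
}


def add_step(record: dict, step_type: str, **detail: Any) -> None:
    """Append one step to a run record, rejecting unknown step types."""
    if step_type not in STEP_TYPES:
        raise ValueError(f"unknown step type: {step_type}")
    step = {
        "type": step_type,                             # hypothetical field name
        "at": datetime.now(timezone.utc).isoformat(),  # hypothetical field name
        **detail,
    }
    record.setdefault("steps", []).append(step)
```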
8.8 Task Result Persistence
After _run_agent_loop finishes, the scheduler calls save_message / save_consumer_message to write the conversation JSON, including:
- source: "scheduled_task" or "admin_broadcast" marker
- Complete blocks[]
8.9 Sync→Async Main Loop Bridge
LangChain sync tools (like contact_admin) are executed by BaseTool._arun in a run_in_executor thread pool, and those worker threads have no running event loop. The fix is to remember the main loop and hand coroutines back to it:
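```python
# inbox.py / scheduler.py
import asyncio
from typing import Optional

_main_loop: Optional[asyncio.AbstractEventLoop] = None


def set_main_loop(loop: asyncio.AbstractEventLoop) -> None:
    global _main_loop
    _main_loop = loop


# When scheduling:
def _schedule_coro(coro) -> None:
    try:
        # Called from async code: schedule on the current loop.
        loop = asyncio.get_running_loop()
        loop.create_task(coro)
    except RuntimeError:
        # Called from an executor thread: hand off to the main loop.
        if _main_loop is not None and _main_loop.is_running():
            asyncio.run_coroutine_threadsafe(coro, _main_loop)
```

8.10 Service Task Tools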
The consumer agent receives its scheduling tools via:
- create_service_schedule_tool injects schedule_task
- create_service_manage_tasks_tool injects manage_scheduled_tasks (only when "scheduler" is in capabilities)
- The Service's manage_scheduled_tasks can only operate on tasks for the current conversation_id (permission isolation)
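
The permission isolation in the last bullet amounts to filtering by conversation before any task is listed or modified. A minimal sketch, assuming the conversation ID is read from the task's reply_to block (as in 8.6) and using a hypothetical visible_tasks helper:

```python
from typing import Dict, List


def visible_tasks(tasks: List[Dict], conversation_id: str) -> List[Dict]:
    """Return only the tasks that belong to the given conversation."""
    return [
        t for t in tasks
        # Assumption: ownership is determined by reply_to.conversation_id.
        if t.get("reply_to", {}).get("conversation_id") == conversation_id
    ]
```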
publish_service_task (Admin tool):
- service_ids supports ID and name matching (case-insensitive), returns the available Service list when there is no match
- session_ids is an optional parameter to target specific WeChat sessions
- run_now scheduling uses _schedule_coro in thread-safe mode
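
The service_ids matching rule might look roughly like this. The resolve_service_ids name, the ID-to-name mapping and the returned dict shape are assumptions, as is treating IDs (not only names) as case-insensitive.

```python
from typing import Dict, List


def resolve_service_ids(requested: List[str], services: Dict[str, str]) -> dict:
    """Match requested entries against service IDs or names, ignoring case.

    `services` maps service ID -> display name (hypothetical shape).
    """
    by_key: Dict[str, str] = {}
    for sid in services:
        by_key[sid.lower()] = sid
    for sid, name in services.items():
        by_key.setdefault(name.lower(), sid)  # IDs win over clashing names

    matched: List[str] = []
    for item in requested:
        sid = by_key.get(item.strip().lower())
        if sid is not None and sid not in matched:
            matched.append(sid)

    # When nothing matched, report what is available so the caller can retry.
    available = [] if matched else sorted(services.values())
    return {"matched": matched, "available": available}
```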