Developer Guide · 08

Scheduler (Scheduled Tasks)

8.1 Scheduler Design

  • app/services/scheduler.py::TaskScheduler singleton, asyncio loop checks every 30s
  • Started by main.py startup, gracefully stopped on shutdown

8.2 Task Storage

TypePathPrefix
Admin tasksusers/{uid}/tasks/{task_id}.jsontask_*
Service tasksusers/{uid}/services/{svc}/tasks/{task_id}.jsonstask_*

Each task file contains the last 20 run records.

8.3 Task Types

TypeDescriptionScope
scriptExecute Python script under scripts/Admin only
agentExecute Agent task (prompt + optional document context)Admin + Service

8.4 Schedule Types

TypeDescriptionExample
onceOne-time2026-12-31T09:00:00+08:00 (include timezone suffix)
cronCron expression0 9 * * *
intervalInterval (seconds)3600

8.5 Timezone Handling

  • Each task stores tz_offset_hours field (user's timezone offset at creation time)
  • Cron interpreted in user's timezone: _next_cron UTC now → user local → croniter → back to UTC
  • once must include timezone suffix: _ensure_tz_suffix provides fallback with tz_offset_hours
  • interval not affected by timezone: directly uses second offsets
  • Old tasks without the field use _resolve_task_tz_offset(task) falling back to get_tz_offset(user_id) (consistent with preferences default +8)
  • React Scheduler must include tz_offset_hours: getTzOffset() in create/update task body

8.6 reply_to Routing

Service tasks control result delivery target via reply_to field:

json
{
  "reply_to": {
    "channel": "wechat | inbox | admin_chat",
    "admin_id": "...",
    "service_id": "...",
    "conversation_id": "...",
    "session_id": "wechat_user_xxx"
  }
}
ChannelDelivery Target
wechatdelivery.py::deliver_tool_message delivers to WeChat user
inboxWrites to Admin inbox
admin_chatWrites to Admin's regular conversation

_run_service_agent_task in real-time intercepts send_message tool calls during agent execution loop, sends to WeChat via delivery.py (supports text + media); _deliver_reply only as fallback when agent doesn't use send_message (plain text summary).

8.7 Run Record Steps

Each run record contains steps[]:

Step TypeDescription
startExecution begins
docs_loadedDocuments loaded
loopAgent loop iteration
tool_callTool invocation
tool_resultTool result
ai_messageAI message
auto_approveAuto-approval (HITL)
wechat_warningWeChat client unavailable warning
wechat_errorWeChat delivery failure
finishCompleted
errorError
replyFallback delivery

8.8 Task Result Persistence

After _run_agent_loop finishes, calls save_message / save_consumer_message to write conversation JSON, including:

  • source: "scheduled_task" or "admin_broadcast" marker
  • Complete blocks[]

8.9 Sync→Async Main Loop Bridge

LangChain sync tools (like contact_admin) execute via BaseTool._arun in run_in_executor thread pool, which has no event loop. Fix:

python
# inbox.py / scheduler.py
def set_main_loop(loop: asyncio.AbstractEventLoop):
    global _main_loop
    _main_loop = loop

# When scheduling:
try:
    loop = asyncio.get_running_loop()
    loop.create_task(coro)
except RuntimeError:
    if _main_loop is not None and _main_loop.is_running():
        asyncio.run_coroutine_threadsafe(coro, _main_loop)

8.10 Service Task Tools

Consumer agent via:

  • create_service_schedule_tool injects schedule_task
  • create_service_manage_tasks_tool injects manage_scheduled_tasks (only when "scheduler" in capabilities)
  • Service's manage_scheduled_tasks can only operate tasks for the current conversation_id (permission isolation)

publish_service_task (Admin tool):

  • service_ids supports ID and name matching (case-insensitive), returns available Service list when no match
  • session_ids optional parameter to target specific WeChat sessions
  • run_now scheduling uses _schedule_coro thread-safe mode