fix(cron): isolate cron-execution guard with contextvars
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
"""Cron tool for scheduling reminders and tasks."""
|
"""Cron tool for scheduling reminders and tasks."""
|
||||||
|
|
||||||
|
from contextvars import ContextVar
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from nanobot.agent.tools.base import Tool
|
from nanobot.agent.tools.base import Tool
|
||||||
@@ -14,16 +15,20 @@ class CronTool(Tool):
|
|||||||
self._cron = cron_service
|
self._cron = cron_service
|
||||||
self._channel = ""
|
self._channel = ""
|
||||||
self._chat_id = ""
|
self._chat_id = ""
|
||||||
self._in_cron_context = False
|
self._in_cron_context: ContextVar[bool] = ContextVar("cron_in_context", default=False)
|
||||||
|
|
||||||
def set_context(self, channel: str, chat_id: str) -> None:
|
def set_context(self, channel: str, chat_id: str) -> None:
|
||||||
"""Set the current session context for delivery."""
|
"""Set the current session context for delivery."""
|
||||||
self._channel = channel
|
self._channel = channel
|
||||||
self._chat_id = chat_id
|
self._chat_id = chat_id
|
||||||
|
|
||||||
def set_cron_context(self, active: bool) -> None:
|
def set_cron_context(self, active: bool):
|
||||||
"""Mark whether the tool is executing inside a cron job callback."""
|
"""Mark whether the tool is executing inside a cron job callback."""
|
||||||
self._in_cron_context = active
|
return self._in_cron_context.set(active)
|
||||||
|
|
||||||
|
def reset_cron_context(self, token) -> None:
|
||||||
|
"""Restore previous cron context."""
|
||||||
|
self._in_cron_context.reset(token)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self) -> str:
|
def name(self) -> str:
|
||||||
@@ -77,7 +82,7 @@ class CronTool(Tool):
|
|||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> str:
|
) -> str:
|
||||||
if action == "add":
|
if action == "add":
|
||||||
if self._in_cron_context:
|
if self._in_cron_context.get():
|
||||||
return "Error: cannot schedule new jobs from within a cron job execution"
|
return "Error: cannot schedule new jobs from within a cron job execution"
|
||||||
return self._add_job(message, every_seconds, cron_expr, tz, at)
|
return self._add_job(message, every_seconds, cron_expr, tz, at)
|
||||||
elif action == "list":
|
elif action == "list":
|
||||||
|
|||||||
@@ -306,8 +306,9 @@ def gateway(
|
|||||||
|
|
||||||
# Prevent the agent from scheduling new cron jobs during execution
|
# Prevent the agent from scheduling new cron jobs during execution
|
||||||
cron_tool = agent.tools.get("cron")
|
cron_tool = agent.tools.get("cron")
|
||||||
|
cron_token = None
|
||||||
if isinstance(cron_tool, CronTool):
|
if isinstance(cron_tool, CronTool):
|
||||||
cron_tool.set_cron_context(True)
|
cron_token = cron_tool.set_cron_context(True)
|
||||||
try:
|
try:
|
||||||
response = await agent.process_direct(
|
response = await agent.process_direct(
|
||||||
reminder_note,
|
reminder_note,
|
||||||
@@ -316,8 +317,8 @@ def gateway(
|
|||||||
chat_id=job.payload.to or "direct",
|
chat_id=job.payload.to or "direct",
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
if isinstance(cron_tool, CronTool):
|
if isinstance(cron_tool, CronTool) and cron_token is not None:
|
||||||
cron_tool.set_cron_context(False)
|
cron_tool.reset_cron_context(cron_token)
|
||||||
|
|
||||||
message_tool = agent.tools.get("message")
|
message_tool = agent.tools.get("message")
|
||||||
if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
|
if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
|
||||||
|
|||||||
Reference in New Issue
Block a user