refactor: simplify message tool suppress and inline consolidation locks
This commit is contained in:
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
|
⚡️ Delivers core agent functionality in just **~4,000** lines of code — **99% smaller** than Clawdbot's 430k+ lines.
|
||||||
|
|
||||||
📏 Real-time line count: **3,966 lines** (run `bash core_agent_lines.sh` to verify anytime)
|
📏 Real-time line count: **3,932 lines** (run `bash core_agent_lines.sh` to verify anytime)
|
||||||
|
|
||||||
## 📢 News
|
## 📢 News
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,8 @@ class AgentLoop:
|
|||||||
5. Sends responses back
|
5. Sends responses back
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_TOOL_RESULT_MAX_CHARS = 500
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
bus: MessageBus,
|
bus: MessageBus,
|
||||||
@@ -145,17 +147,10 @@ class AgentLoop:
|
|||||||
|
|
||||||
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
|
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
|
||||||
"""Update context for all tools that need routing info."""
|
"""Update context for all tools that need routing info."""
|
||||||
if message_tool := self.tools.get("message"):
|
for name in ("message", "spawn", "cron"):
|
||||||
if isinstance(message_tool, MessageTool):
|
if tool := self.tools.get(name):
|
||||||
message_tool.set_context(channel, chat_id, message_id)
|
if hasattr(tool, "set_context"):
|
||||||
|
tool.set_context(channel, chat_id, *([message_id] if name == "message" else []))
|
||||||
if spawn_tool := self.tools.get("spawn"):
|
|
||||||
if isinstance(spawn_tool, SpawnTool):
|
|
||||||
spawn_tool.set_context(channel, chat_id)
|
|
||||||
|
|
||||||
if cron_tool := self.tools.get("cron"):
|
|
||||||
if isinstance(cron_tool, CronTool):
|
|
||||||
cron_tool.set_context(channel, chat_id)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _strip_think(text: str | None) -> str | None:
|
def _strip_think(text: str | None) -> str | None:
|
||||||
@@ -315,18 +310,6 @@ class AgentLoop:
|
|||||||
self._running = False
|
self._running = False
|
||||||
logger.info("Agent loop stopping")
|
logger.info("Agent loop stopping")
|
||||||
|
|
||||||
def _get_consolidation_lock(self, session_key: str) -> asyncio.Lock:
|
|
||||||
lock = self._consolidation_locks.get(session_key)
|
|
||||||
if lock is None:
|
|
||||||
lock = asyncio.Lock()
|
|
||||||
self._consolidation_locks[session_key] = lock
|
|
||||||
return lock
|
|
||||||
|
|
||||||
def _prune_consolidation_lock(self, session_key: str, lock: asyncio.Lock) -> None:
|
|
||||||
"""Drop lock entry if no longer in use."""
|
|
||||||
if not lock.locked():
|
|
||||||
self._consolidation_locks.pop(session_key, None)
|
|
||||||
|
|
||||||
async def _process_message(
|
async def _process_message(
|
||||||
self,
|
self,
|
||||||
msg: InboundMessage,
|
msg: InboundMessage,
|
||||||
@@ -362,7 +345,7 @@ class AgentLoop:
|
|||||||
# Slash commands
|
# Slash commands
|
||||||
cmd = msg.content.strip().lower()
|
cmd = msg.content.strip().lower()
|
||||||
if cmd == "/new":
|
if cmd == "/new":
|
||||||
lock = self._get_consolidation_lock(session.key)
|
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
|
||||||
self._consolidating.add(session.key)
|
self._consolidating.add(session.key)
|
||||||
try:
|
try:
|
||||||
async with lock:
|
async with lock:
|
||||||
@@ -383,7 +366,8 @@ class AgentLoop:
|
|||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
self._consolidating.discard(session.key)
|
self._consolidating.discard(session.key)
|
||||||
self._prune_consolidation_lock(session.key, lock)
|
if not lock.locked():
|
||||||
|
self._consolidation_locks.pop(session.key, None)
|
||||||
|
|
||||||
session.clear()
|
session.clear()
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
@@ -397,7 +381,7 @@ class AgentLoop:
|
|||||||
unconsolidated = len(session.messages) - session.last_consolidated
|
unconsolidated = len(session.messages) - session.last_consolidated
|
||||||
if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
|
if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
|
||||||
self._consolidating.add(session.key)
|
self._consolidating.add(session.key)
|
||||||
lock = self._get_consolidation_lock(session.key)
|
lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
|
||||||
|
|
||||||
async def _consolidate_and_unlock():
|
async def _consolidate_and_unlock():
|
||||||
try:
|
try:
|
||||||
@@ -405,7 +389,8 @@ class AgentLoop:
|
|||||||
await self._consolidate_memory(session)
|
await self._consolidate_memory(session)
|
||||||
finally:
|
finally:
|
||||||
self._consolidating.discard(session.key)
|
self._consolidating.discard(session.key)
|
||||||
self._prune_consolidation_lock(session.key, lock)
|
if not lock.locked():
|
||||||
|
self._consolidation_locks.pop(session.key, None)
|
||||||
_task = asyncio.current_task()
|
_task = asyncio.current_task()
|
||||||
if _task is not None:
|
if _task is not None:
|
||||||
self._consolidation_tasks.discard(_task)
|
self._consolidation_tasks.discard(_task)
|
||||||
@@ -454,8 +439,6 @@ class AgentLoop:
|
|||||||
metadata=msg.metadata or {},
|
metadata=msg.metadata or {},
|
||||||
)
|
)
|
||||||
|
|
||||||
_TOOL_RESULT_MAX_CHARS = 500
|
|
||||||
|
|
||||||
def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None:
|
def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None:
|
||||||
"""Save new-turn messages into session, truncating large tool results."""
|
"""Save new-turn messages into session, truncating large tool results."""
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|||||||
@@ -812,7 +812,7 @@ class TestConsolidationDeduplicationGuard:
|
|||||||
loop.sessions.save(session)
|
loop.sessions.save(session)
|
||||||
|
|
||||||
# Ensure lock exists before /new.
|
# Ensure lock exists before /new.
|
||||||
_ = loop._get_consolidation_lock(session.key)
|
loop._consolidation_locks.setdefault(session.key, asyncio.Lock())
|
||||||
assert session.key in loop._consolidation_locks
|
assert session.key in loop._consolidation_locks
|
||||||
|
|
||||||
async def _ok_consolidate(sess, archive_all: bool = False) -> bool:
|
async def _ok_consolidate(sess, archive_all: bool = False) -> bool:
|
||||||
|
|||||||
Reference in New Issue
Block a user