fix(loop): update _cumulative_tokens in _save_turn and preserve it in compression methods
This commit is contained in:
@@ -211,14 +211,14 @@ class AgentLoop:
|
|||||||
session.metadata["_compressed_until"] = compressed_until
|
session.metadata["_compressed_until"] = compressed_until
|
||||||
# 兼容旧版本:一旦迁移出连续边界,就可以清理旧字段
|
# 兼容旧版本:一旦迁移出连续边界,就可以清理旧字段
|
||||||
session.metadata.pop("_compressed_ranges", None)
|
session.metadata.pop("_compressed_ranges", None)
|
||||||
session.metadata.pop("_cumulative_tokens", None)
|
# 注意:不要删除 _cumulative_tokens,压缩逻辑需要它来跟踪累积 token 计数
|
||||||
return compressed_until
|
return compressed_until
|
||||||
|
|
||||||
def _set_compressed_until(self, session: Session, idx: int) -> None:
|
def _set_compressed_until(self, session: Session, idx: int) -> None:
|
||||||
"""Persist a contiguous compressed boundary."""
|
"""Persist a contiguous compressed boundary."""
|
||||||
session.metadata["_compressed_until"] = max(0, min(int(idx), len(session.messages)))
|
session.metadata["_compressed_until"] = max(0, min(int(idx), len(session.messages)))
|
||||||
session.metadata.pop("_compressed_ranges", None)
|
session.metadata.pop("_compressed_ranges", None)
|
||||||
session.metadata.pop("_cumulative_tokens", None)
|
# 注意:不要删除 _cumulative_tokens,压缩逻辑需要它来跟踪累积 token 计数
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _estimate_message_tokens(message: dict[str, Any]) -> int:
|
def _estimate_message_tokens(message: dict[str, Any]) -> int:
|
||||||
@@ -362,7 +362,6 @@ class AgentLoop:
|
|||||||
if len(chunk) < 2:
|
if len(chunk) < 2:
|
||||||
return
|
return
|
||||||
|
|
||||||
before_msg_count = len(session.messages)
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Compression chunk {}: msgs {}-{} (count={}, est~{}, need~{})",
|
"Compression chunk {}: msgs {}-{} (count={}, est~{}, need~{})",
|
||||||
session.key,
|
session.key,
|
||||||
@@ -384,13 +383,12 @@ class AgentLoop:
|
|||||||
self._set_compressed_until(session, end_idx)
|
self._set_compressed_until(session, end_idx)
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
|
|
||||||
after_msg_count = len(session.messages)
|
|
||||||
after_tokens, after_source = self._estimate_session_prompt_tokens(session)
|
after_tokens, after_source = self._estimate_session_prompt_tokens(session)
|
||||||
after_ratio = after_tokens / budget if budget else 0.0
|
after_ratio = after_tokens / budget if budget else 0.0
|
||||||
reduced = max(0, current_tokens - after_tokens)
|
reduced = max(0, current_tokens - after_tokens)
|
||||||
reduced_ratio = (reduced / current_tokens) if current_tokens > 0 else 0.0
|
reduced_ratio = (reduced / current_tokens) if current_tokens > 0 else 0.0
|
||||||
logger.info(
|
logger.info(
|
||||||
"Compression done {}: {}/{} ({:.1%}) via {}, reduced={} ({:.1%}), history: {} -> {}",
|
"Compression done {}: {}/{} ({:.1%}) via {}, reduced={} ({:.1%})",
|
||||||
session.key,
|
session.key,
|
||||||
after_tokens,
|
after_tokens,
|
||||||
budget,
|
budget,
|
||||||
@@ -398,8 +396,6 @@ class AgentLoop:
|
|||||||
after_source,
|
after_source,
|
||||||
reduced,
|
reduced,
|
||||||
reduced_ratio,
|
reduced_ratio,
|
||||||
before_msg_count,
|
|
||||||
after_msg_count,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _schedule_background_compression(self, session_key: str) -> None:
|
def _schedule_background_compression(self, session_key: str) -> None:
|
||||||
@@ -855,14 +851,14 @@ class AgentLoop:
|
|||||||
channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
|
channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
|
||||||
))
|
))
|
||||||
|
|
||||||
final_content, _, all_msgs, _, _ = await self._run_agent_loop(
|
final_content, _, all_msgs, total_tokens_this_turn, token_source = await self._run_agent_loop(
|
||||||
initial_messages, on_progress=on_progress or _bus_progress,
|
initial_messages, on_progress=on_progress or _bus_progress,
|
||||||
)
|
)
|
||||||
|
|
||||||
if final_content is None:
|
if final_content is None:
|
||||||
final_content = "I've completed processing but have no response to give."
|
final_content = "I've completed processing but have no response to give."
|
||||||
|
|
||||||
self._save_turn(session, all_msgs, 1 + len(history))
|
self._save_turn(session, all_msgs, 1 + len(history), total_tokens_this_turn)
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
self._schedule_background_compression(session.key)
|
self._schedule_background_compression(session.key)
|
||||||
|
|
||||||
@@ -876,7 +872,7 @@ class AgentLoop:
|
|||||||
metadata=msg.metadata or {},
|
metadata=msg.metadata or {},
|
||||||
)
|
)
|
||||||
|
|
||||||
def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None:
|
def _save_turn(self, session: Session, messages: list[dict], skip: int, total_tokens_this_turn: int = 0) -> None:
|
||||||
"""Save new-turn messages into session, truncating large tool results."""
|
"""Save new-turn messages into session, truncating large tool results."""
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
for m in messages[skip:]:
|
for m in messages[skip:]:
|
||||||
@@ -911,6 +907,14 @@ class AgentLoop:
|
|||||||
session.messages.append(entry)
|
session.messages.append(entry)
|
||||||
session.updated_at = datetime.now()
|
session.updated_at = datetime.now()
|
||||||
|
|
||||||
|
# Update cumulative token count for compression tracking
|
||||||
|
if total_tokens_this_turn > 0:
|
||||||
|
current_cumulative = session.metadata.get("_cumulative_tokens", 0)
|
||||||
|
if isinstance(current_cumulative, (int, float)):
|
||||||
|
session.metadata["_cumulative_tokens"] = int(current_cumulative) + total_tokens_this_turn
|
||||||
|
else:
|
||||||
|
session.metadata["_cumulative_tokens"] = total_tokens_this_turn
|
||||||
|
|
||||||
async def process_direct(
|
async def process_direct(
|
||||||
self,
|
self,
|
||||||
content: str,
|
content: str,
|
||||||
|
|||||||
Reference in New Issue
Block a user