# memory_manager.py import json import logging import uuid # 用于生成短期事件的唯一ID from config import get_config import file_manager # 导入文件管理模块来加载/保存features和memory logger = logging.getLogger(__name__) def build_system_prompt(current_role_id, current_memory_id, current_global_turn_count): """ 构建并返回当前会话的系统提示词。 这个提示词会包含AI的静态特征和动态记忆。 :param current_role_id: 当前激活的角色的ID。 :param current_memory_id: 当前激活的记忆集的ID。 :param current_global_turn_count: 当前对话的总轮次,用于短期记忆的“last_active_turn”参考。 :return: 组合好的系统提示词字符串。 """ try: # 1. 加载当前角色的特征数据 features_data = file_manager.load_active_features(current_role_id) config_data = get_config()["Session"] default_role_name = config_data.get("DEFAULT_ROLE_NAME", "通用AI助手") default_memory_name = config_data.get("DEFAULT_MEMORY_NAME", "通用记忆集") if not features_data: logger.warning(f"无法加载角色 '{current_role_id}' 的特征,将使用默认特征。") features_data = file_manager.DEFAULT_FEATURE_CONTENT.copy() features_data["角色名称"] = default_role_name # 2. 加载当前记忆集的记忆数据 memory_data = file_manager.load_active_memory(current_memory_id) if not memory_data: logger.warning( f"无法加载记忆集 '{current_memory_id}' 的记忆,将使用默认空记忆。" ) memory_data = file_manager.DEFAULT_MEMORY_CONTENT.copy() memory_data["name"] = default_memory_name if memory_data["long_term_facts"]: memory_data["long_term_facts"][ 0 ] = f"此记忆集名为 '{default_memory_name}',目前未包含特定用户或AI的长期事实。" else: memory_data["long_term_facts"].append( f"此记忆集名为 '{default_memory_name}',目前未包含特定用户或AI的长期事实。" ) # 将 Python 对象转换为美化后的 JSON 字符串,确保中文不被转义 features_json_str = json.dumps(features_data, ensure_ascii=False, indent=2) memory_json_str = json.dumps(memory_data, ensure_ascii=False, indent=2) # 组合最终的系统提示词 # 增加一些说明性文字,帮助模型更好理解传入的JSON结构 system_prompt = ( f"以下是你的语言和思维特征,请你严格遵循,并基于以下记忆内容回答问题。\n\n" f"--- 语言和思维特征 (Features) ---\n" f"{features_json_str}\n\n" f"--- 当前记忆 (Memory) ---\n" f"以下是关于你的长期记忆和短期事件的最新整理,请你充分利用这些信息来理解用户意图、提供个性化回复和维持对话连贯性。\n" f"当前总对话轮次: {current_global_turn_count}\n" # 传递当前轮次给模型,作为last_active_turn的参考 f"{memory_json_str}\n\n" f"请记,你的所有回答都应体现并遵循上述特征和记忆。你是一个名为“{features_data.get('角色名称', default_role_name)}”的智能体。始终以专业、清晰、友好的态度进行交流,并优先利用现有记忆来提供更个性化和连贯的回答。" ) logger.debug("系统提示词构建成功。") return system_prompt except Exception as e: logger.error(f"构建系统提示词时发生错误: {e}", exc_info=True) # 降级为通用提示词,并从config中获取默认角色名称 config_data = get_config()["Session"] default_role_name = config_data.get("DEFAULT_ROLE_NAME", "通用AI助手") return f"你是一个名为“{default_role_name}”的AI助手,旨在提供清晰、准确、友好的回答。" def build_memory_update_prompt( recent_chat_history_list, current_memory_data, current_global_turn_count ): """ 构建用于记忆更新的Prompt (V3)。 :param recent_chat_history_list: 最近N轮对话的列表 (Python list of dicts)。 :param current_memory_data: 当前memory.json内容的字典 (Python dict)。 :param current_global_turn_count: 当前全局对话的总轮次。 :return: 记忆更新指令字符串。 """ config = get_config()["Application"] memory_retention_turns = config["MEMORY_RETENTION_TURNS"] # 将Python对象转换为JSON字符串,用于嵌入Prompt recent_chat_history_json_str = json.dumps( recent_chat_history_list, ensure_ascii=False, indent=2 ) current_memory_json_str = json.dumps( current_memory_data, ensure_ascii=False, indent=2 ) prompt = ( "你是一个高级记忆整合与管理系统。你的任务是根据提供的现有记忆和最近的对话," "分析其中的关键信息、用户偏好、重要事件、对话中的新概念以及话题状态。\n" "请你严格按照以下JSON格式和更新规则,输出更新后的长短期记忆。\n\n" "**当前全局对话轮次:** {current_global_turn_count}\n" "**短期记忆保留阈值:** 超过 {memory_retention_turns} 轮未活跃的短期事件应被考虑移除或晋升。\n\n" "**更新规则:**\n" "1. **长期事实 (long_term_facts):**\n" " - 包含所有对用户或AI角色长期重要、不变、或已确认的关键信息和事实(例如用户名字、长期兴趣、职业、已完成的重要事项等)。\n" " - **晋升机制:** 从最近对话和短期事件中识别出新的、应晋升为长期记忆的关键事实。判断标准包括:信息被重复提及、用户明确确认、对用户身份或偏好有根本性影响、或事件已圆满结束且其核心结果是永久性的。请将其添加到此列表中。\n" " - 如果现有长期事实有新的细节补充或需要修正,请更新其内容。\n" " - 请确保每条事实简洁明了,通常为一句话或一个关键事实片段。避免冗余。\n" "2. **短期事件 (short_term_events):**\n" " - 这是一个列表,每个元素代表一个正在进行、最近发生、或仍在讨论中的短期事件或话题。\n" " - **字段要求:** 每个短期事件对象必须包含以下字段:\n" " - `id`: (字符串) 唯一标识符。如果是一个新事件,请生成一个简短且具有描述性的ID (例如 'AI_ethics_discussion', 'phone_recommendation'),要求英文小写,下划线分隔,最大长度30字符。如果模型没有生成ID,或ID不符合格式,后端会为其生成一个UUID。\n" " - `topic`: (字符串) 事件或话题的核心内容,简洁概括,最多20字。\n" " - `summary`: (字符串) 对事件的开始、发展和当前状态(结果)的清晰、简洁描述。请涵盖关键的细节和进展。总结应不超过100字。\n" " - `status`: (字符串) 事件的当前状态,请选择以下之一:'进行中', '待跟进', '已解决', '已结束'。\n" " - `last_active_turn`: (整数) 最近一次该事件或话题被讨论或有实质进展的全局对话轮次编号(基于`current_global_turn_count`更新)。\n" " - `mentions_count`: (整数) 该事件或话题被明确提及或有实质进展的累积次数。新事件从1开始。\n" " - **更新逻辑:**\n" " - **新增:** 从最近对话中识别出新的、重要的短期事件,并添加到列表中。为其生成一个临时的、描述性的`id`,`last_active_turn`设为当前全局轮次,`mentions_count`设为1。\n" " - **更新:** 对于已存在的短期事件,根据最近对话更新其`summary`, `status`。如果它在最近对话中被提及,请更新其`last_active_turn`为当前全局轮次,并增加`mentions_count`。\n" " - **遗忘/删除:** 如果短期事件中的信息已完全整合进长期事实,或该事件明确已解决(`status`为'已解决'或'已结束')且无后续,或者其`last_active_turn`距离`current_global_turn_count`过远(超过`memory_retention_turns`),请将其从`short_term_events`列表中移除。\n" " - **避免重复:** 确保`long_term_facts`和`short_term_events`中的信息不冗余。短期事件应侧重于动态和未完成部分,长期事实则侧重于稳定和核心信息。\n" " - **保持精简:** `short_term_events`列表的长度应控制在合理范围,优先保留最重要的事件。可以删除重要性较低的已结束或不活跃事件。\n\n" "--- 现有记忆 (Current Memory JSON) ---\n" f"{current_memory_json_str}\n\n" "--- 最近对话 (Recent Chat History JSON - 格式: [{'role': '...', 'parts': [{'text': '...'}]}]) ---\n" f"{recent_chat_history_json_str}\n\n" "请现在根据上述信息和规则,输出一个仅包含 'long_term_facts' 和 'short_term_events' 两个键的JSON对象。请严格遵循JSON格式,不要包含其他任何文本或解释。" "```json\n" "{\n" ' "long_term_facts": [\n' ' "用户名是张三。",\n' ' "张三对科幻电影感兴趣,喜欢《银翼杀手》。"\n' " ],\n" ' "short_term_events": [\n' " {\n" ' "id": "phone_recommendation",\n' ' "topic": "新手机推荐需求收集",\n' ' "summary": "用户对安卓手机有偏好,预算4000-6000元,看重拍照和续航。",\n' ' "status": "待跟进",\n' ' "last_active_turn": 10,\n' ' "mentions_count": 3\n' " },\n" " {\n" ' "id": "weekend_plan_discussion",\n' ' "topic": "周末活动计划",\n' ' "summary": "用户提到周末想去公园散步,但天气预报有雨,正在考虑备选方案。",\n' ' "status": "进行中",\n' ' "last_active_turn": 12,\n' ' "mentions_count": 1\n' " }\n" " ]\n" "}\n" "```\n" ) logger.debug("记忆更新提示词构建成功。") return prompt def process_memory_update_response( model_response_text, current_memory_data, current_global_turn_count ): """ 处理Gemini模型返回的记忆更新JSON字符串,并进行后端逻辑修正和合并。 :param model_response_text: Gemini模型返回的JSON字符串。 :param current_memory_data: 当前内存中的记忆数据 (Python dict)。 :param current_global_turn_count: 当前全局对话轮次。 :return: 更新后的记忆数据 (Python dict)。 """ config = get_config()["Application"] memory_retention_turns = config["MEMORY_RETENTION_TURNS"] max_short_term_events = config["MAX_SHORT_TERM_EVENTS"] updated_memory = current_memory_data.copy() # 复制当前记忆作为基准 try: # 尝试解析模型返回的JSON # 有时模型会返回Markdown代码块,需要先提取 if model_response_text.strip().startswith("```json"): model_response_text = model_response_text.strip()[ 7:-3 ].strip() # 移除 ```json 和 ``` parsed_response = json.loads(model_response_text) logger.debug(f"模型返回的记忆更新JSON已解析:{parsed_response}") # 1. 更新长期事实 (long_term_facts) if "long_term_facts" in parsed_response and isinstance( parsed_response["long_term_facts"], list ): updated_memory["long_term_facts"] = list( set(parsed_response["long_term_facts"]) ) # 去重 logger.info( f"长期事实已更新,共 {len(updated_memory['long_term_facts'])} 条。" ) else: logger.warning("模型返回的长期事实格式不正确或缺失,保留原有长期事实。") # 2. 合并和处理短期事件 (short_term_events) model_short_term_events = parsed_response.get("short_term_events", []) if not isinstance(model_short_term_events, list): logger.warning("模型返回的短期事件格式不正确,将忽略。") model_short_term_events = [] existing_events_map = { event["id"]: event for event in current_memory_data.get("short_term_events", []) if "id" in event } new_short_term_events_list = [] # 遍历模型返回的短期事件 for event in model_short_term_events: # 确保事件包含必要字段 if not all( k in event for k in [ "topic", "summary", "status", "last_active_turn", "mentions_count", ] ): logger.warning(f"模型返回的短期事件缺少必要字段,跳过: {event}") continue event_id = event.get("id") is_new_event = False final_event_id = None # 尝试使用模型提供的ID if event_id and isinstance(event_id, str): # 验证ID格式:英文小写,下划线分隔,最大长度30字符 if ( len(event_id) <= 30 and all(c.isalnum() or c == "_" for c in event_id) and event_id.islower() ): if event_id not in existing_events_map: # 模型提供了有效且不冲突的ID final_event_id = event_id is_new_event = True logger.debug(f"使用模型建议的有效新ID: {event_id}") else: # ID冲突,这是现有事件的更新 final_event_id = event_id logger.debug( f"更新现有短期事件: {event.get('topic', '未知')} (ID: {event_id})" ) else: logger.warning( f"模型建议的短期事件ID '{event_id}' 格式不正确,将尝试生成新ID。" ) # 如果模型ID无效或冲突,尝试基于topic生成 if final_event_id is None and event.get("topic"): temp_id = "".join( c if c.isalnum() else "_" for c in event["topic"] ).lower() temp_id = temp_id.strip("_") if len(temp_id) > 30: temp_id = temp_id[:30] # 确保生成的ID是唯一的 if temp_id and temp_id not in existing_events_map: final_event_id = temp_id is_new_event = True logger.debug(f"基于topic生成描述性新ID: {final_event_id}") else: logger.warning( f"基于topic '{event['topic']}' 生成的ID '{temp_id}' 已存在或无效,将生成UUID。" ) # 如果所有尝试都失败,生成UUID if final_event_id is None: final_event_id = str(uuid.uuid4()) is_new_event = True logger.debug(f"无法生成描述性ID,生成新UUID: {final_event_id}") event["id"] = final_event_id if is_new_event: # 确保新事件的活跃轮次和提及次数正确初始化 event["last_active_turn"] = current_global_turn_count event["mentions_count"] = 1 logger.info(f"发现新短期事件: {event['topic']} (ID: {event['id']})") else: # 现有事件:更新其活跃轮次和提及次数 event["last_active_turn"] = current_global_turn_count # 从 existing_events_map 中获取旧的 mentions_count old_event = existing_events_map.get(event["id"]) if old_event and "mentions_count" in old_event: event["mentions_count"] = old_event["mentions_count"] + 1 else: event["mentions_count"] = ( 1 # 如果旧事件没有 mentions_count,则初始化为1 ) logger.debug(f"更新现有短期事件: {event['topic']} (ID: {event['id']})") new_short_term_events_list.append(event) # 从现有事件映射中移除,以便后续处理未被模型提及的旧事件 if final_event_id in existing_events_map: del existing_events_map[final_event_id] # 将模型未提及但仍然活跃的旧事件添加回来 (后端辅助的遗忘机制) for old_event_id, old_event in existing_events_map.items(): if not all( k in old_event for k in [ "topic", "summary", "status", "last_active_turn", "mentions_count", ] ): logger.warning(f"跳过损坏的旧短期事件: {old_event}") continue # 判断是否需要遗忘 if current_global_turn_count - old_event[ "last_active_turn" ] > memory_retention_turns and old_event["status"] in ["已解决", "已结束"]: logger.info( f"遗忘旧短期事件 (不活跃且已解决/结束): {old_event['topic']} (ID: {old_event['id']})" ) continue # 不添加到新列表中,实现遗忘 new_short_term_events_list.append(old_event) logger.debug( f"保留旧短期事件 (仍活跃或未解决): {old_event['topic']} (ID: {old_event['id']})" ) # 强制截断短期事件列表,确保不超过最大数量 if len(new_short_term_events_list) > max_short_term_events: # 优先保留最新的和提及次数多的 new_short_term_events_list.sort( key=lambda x: ( x.get("last_active_turn", 0), x.get("mentions_count", 0), ), reverse=True, ) removed_count = len(new_short_term_events_list) - max_short_term_events logger.warning( f"短期事件列表超出 {max_short_term_events} 个,移除 {removed_count} 个最不活跃的事件。" ) new_short_term_events_list = new_short_term_events_list[ :max_short_term_events ] updated_memory["short_term_events"] = new_short_term_events_list logger.info( f"短期事件已处理,共 {len(updated_memory['short_term_events'])} 条。" ) except json.JSONDecodeError as e: logger.error( f"模型返回的记忆更新响应不是有效JSON,无法解析: {model_response_text[:200]}... - 错误: {e}" ) # 返回原始记忆,不进行更新 return current_memory_data except Exception as e: logger.error(f"处理记忆更新响应时发生未知错误: {e}", exc_info=True) # 返回原始记忆,不进行更新 return current_memory_data return updated_memory