# app.py from flask import Flask, jsonify, request, send_from_directory, Response from flask_cors import CORS # 导入 CORS import os import logging import json import threading import time import requests # 导入 requests 库 from flask import g # 导入g对象,用于存储请求上下文中的数据 # 导入自定义模块 from utils import ( setup_logging, get_current_timestamp, generate_uuid, ) # 工具函数,如日志设置、时间戳、UUID生成 from config import get_config, set_config # 配置管理,用于读取和写入config.ini import file_manager # 文件管理模块,处理角色、记忆、聊天记录等文件的读写 from gemini_client import GeminiClient # 导入Gemini客户端,用于与Gemini API交互 import memory_manager # 导入记忆管理模块,负责构建Prompt和处理记忆更新逻辑 # --- Flask 应用初始化 --- # app_config 声明为全局变量,用于存储应用配置 app_config = {} # 日志配置将在 initialize_app_data 中进行 logger = logging.getLogger(__name__) # 获取app.py的日志器 app = Flask(__name__, static_folder="static") CORS(app) # 初始化 CORS,允许所有来源的跨域请求 # 全局会话状态变量 global_current_role_id: str = "default_role" global_current_memory_id: str = "default_memory" global_current_chat_log_id: str = "" global_turn_counter: int = 0 global_current_chat_history: list = [] global_memory_update_status: str = "idle" # 新增:记忆更新状态 # 初始化Gemini客户端 (全局唯一实例) gemini_client: GeminiClient | None = None # --- 辅助函数 --- def error_response(message: str, status_code: int) -> Response: """ 标准化错误响应格式。 :param message: 错误信息。 :param status_code: HTTP状态码。 :return: JSON格式的错误响应。 """ response = jsonify({"error": message}) response.status_code = status_code return response @app.before_request def load_session_context(): """ 在每个请求之前加载会话上下文到 Flask 的 g 对象。 这确保了每个请求都有独立的会话状态。 """ # 直接使用全局变量来设置 g 对象,确保 g 对象中的会话信息与全局变量保持同步 g.current_role_id = global_current_role_id g.current_memory_id = global_current_memory_id g.current_chat_log_id = global_current_chat_log_id g.turn_counter = global_turn_counter # g.turn_counter 应该反映当前的全局轮次 logger.debug( f"请求上下文加载:角色ID={g.current_role_id}, 记忆ID={g.current_memory_id}, 聊天记录ID={g.current_chat_log_id}, 轮次={g.turn_counter}" ) # --- 静态文件路由 --- @app.route("/") def index(): """根路由,返回前端的index.html文件""" # 确保 static_folder 不为 None static_folder_path = ( app.static_folder if app.static_folder is not None else "static" ) return send_from_directory(static_folder_path, "index.html") @app.route("/favicon.ico") def favicon(): """处理 favicon.ico 请求""" static_folder_path = ( app.static_folder if app.static_folder is not None else "static" ) return send_from_directory( static_folder_path, "favicon.ico", mimetype="image/vnd.microsoft.icon" ) # --- API 路由定义 --- @app.route("/api/config", methods=["GET"]) def get_app_config(): """ 获取当前应用程序的所有配置。 :return: JSON格式的配置数据。 """ current_config = get_config() # 敏感信息不直接暴露给前端,例如API KEY api_key = current_config["API"].get("GEMINI_API_KEY", "") # 根据用户要求,完整显示API Key display_config = { "API": current_config["API"], "Application": current_config["Application"], "Session": current_config["Session"], } logger.debug("获取配置:成功") return jsonify(display_config) @app.route("/api/config", methods=["POST"]) def update_app_config(): """ 更新应用程序配置。 :return: 更新后的配置信息或错误信息。 """ data = request.json if data is None: logger.warning("更新配置:请求数据为空或格式不正确。") return error_response("请求数据为空或格式不正确", 400) new_config = get_config() # 获取当前配置的副本 try: # 允许更新API和Application部分 for section_name, section_data in data.items(): if section_name == "API": for key, value in section_data.items(): if key == "GEMINI_API_KEY": new_config["API"][key] = value else: new_config["API"][key] = value elif section_name == "Application": for key, value in section_data.items(): if key in [ "CONTEXT_WINDOW_SIZE", "MEMORY_RETENTION_TURNS", "MAX_SHORT_TERM_EVENTS", ]: try: new_config["Application"][key] = int(value) except ValueError: logger.warning( f"配置项 {key} 的值 '{value}' 不是有效数字。" ) return error_response( f"配置项 {key} 的值 '{value}' 不是有效数字", 400 ) else: new_config["Application"][key] = value # Session 部分由 /api/active_session 控制,不在此处直接修改 else: logger.warning(f"尝试更新未知配置部分: {section_name}") set_config(new_config) # 保存到文件并更新内存中的全局配置 logger.info("配置更新:成功。") # 重新初始化Gemini客户端以使用新的API配置 global gemini_client api_key = new_config["API"].get("GEMINI_API_KEY") if api_key and api_key.strip(): gemini_client = GeminiClient() logger.info("Gemini客户端已重新初始化。") else: logger.warning("GEMINI_API_KEY 已被移除或为空,Gemini客户端未初始化。") gemini_client = None return jsonify({"message": "配置更新成功", "config": new_config["Application"]}) except json.JSONDecodeError: logger.error("更新配置失败: 请求JSON格式不正确。", exc_info=True) return error_response("请求JSON格式不正确。", 400) except KeyError as e: logger.error(f"更新配置失败: 缺少必要的配置键 {e}", exc_info=True) return error_response(f"缺少必要的配置键: {e}", 400) except Exception as e: logger.error(f"更新配置失败: {e}", exc_info=True) return error_response(f"更新配置失败: {str(e)}", 500) @app.route("/api/roles", methods=["GET"]) def get_roles(): """ 获取所有可用的角色列表。 :return: 角色列表JSON。 """ roles = file_manager.list_roles() logger.debug(f"获取角色列表:{len(roles)} 个角色。") return jsonify(roles) @app.route("/api/roles", methods=["POST"]) def create_new_role(): """ 创建一个新角色。 需要提供 'id' 和 'name'。 :return: 成功或失败信息。 """ data = request.json or {} # 确保 data 始终是字典 if not data: # 检查 data 否为空字典或 None return error_response("请求数据为空或格式不正确", 400) role_id = data.get("id") role_name = data.get("name") if not role_id or not role_name: logger.warning("创建角色失败: 缺少 'id' 或 'name'。") return error_response("需要提供 'id' 和 'name' 来创建角色。", 400) try: if file_manager.create_role(role_id, role_name): logger.info(f"创建角色 '{role_name}' ({role_id}) 成功。") return jsonify({"message": "角色创建成功", "role_id": role_id}) else: logger.error(f"创建角色 '{role_name}' ({role_id}) 失败,可能ID已存在。") return error_response("创建角色失败,可能ID已存在。", 409) # Conflict except Exception as e: logger.error(f"创建角色时发生异常: {e}", exc_info=True) return error_response(f"创建角色失败: {str(e)}", 500) @app.route("/api/roles/", methods=["DELETE"]) def delete_existing_role(role_id): """ 删除指定ID的角色。 :return: 成功或失败信息。 """ # 检查是否是当前活跃角色,不允许删除 if role_id == g.current_role_id: logger.warning(f"尝试删除当前活跃角色 '{role_id}'。") return error_response("不能删除当前活跃的角色。", 403) # Forbidden try: if file_manager.delete_role(role_id): logger.info(f"删除角色 '{role_id}' 成功。") return jsonify({"message": "角色删除成功", "role_id": role_id}) else: logger.error(f"删除角色 '{role_id}' 失败,可能角色不存在或文件操作失败。") return error_response("删除角色失败,可能角色不存在或文件操作失败。", 404) except Exception as e: logger.error(f"删除角色时发生异常: {e}", exc_info=True) return error_response(f"删除角色失败: {str(e)}", 500) @app.route("/api/memories", methods=["GET"]) def get_memories(): """ 获取所有可用的记忆集列表。 :return: 记忆集列表JSON。 """ memories = file_manager.list_memories() logger.debug(f"获取记忆列表:{len(memories)} 个记忆集。") return jsonify(memories) @app.route("/api/memories", methods=["POST"]) def create_new_memory(): """ 创建一个新记忆集。 需要提供 'id' 和 'name'。 :return: 成功或失败信息。 """ data = request.json or {} # 确保 data 始终是字典 if not data: # 检查 data 是否为空字典或 None return error_response("请求数据为空或格式不正确", 400) memory_id = data.get("id") memory_name = data.get("name") if not memory_id or not memory_name: logger.warning("创建记忆集失败: 缺少 'id' 或 'name'。") return error_response("需要提供 'id' 和 'name' 来创建记忆集。", 400) try: if file_manager.create_memory(memory_id, memory_name): logger.info(f"创建记忆集 '{memory_name}' ({memory_id}) 成功。") return jsonify({"message": "记忆集创建成功", "memory_id": memory_id}) else: logger.error( f"创建记忆集 '{memory_name}' ({memory_id}) 失败,可能ID已存在。" ) return error_response("创建记忆集失败,可能ID已存在。", 409) except Exception as e: logger.error(f"创建记忆集时发生异常: {e}", exc_info=True) return error_response(f"创建记忆集失败: {str(e)}", 500) @app.route("/api/memories/", methods=["DELETE"]) def delete_existing_memory(memory_id): """ 删除指定ID的记忆集。 :return: 成功或失败信息。 """ # 检查是否是当前活跃记忆集,不允许删除 if memory_id == g.current_memory_id: logger.warning(f"尝试删除当前活跃记忆集 '{memory_id}'。") return error_response("不能删除当前活跃的记忆集。", 403) # Forbidden try: if file_manager.delete_memory(memory_id): logger.info(f"删除记忆集 '{memory_id}' 成功。") return jsonify({"message": "记忆集删除成功", "memory_id": memory_id}) else: logger.error( f"删除记忆集 '{memory_id}' 失败,可能记忆集不存在或文件操作失败。" ) return error_response("删记忆集失败,可能记忆集不存在或文件操作失败。", 404) except Exception as e: logger.error(f"删除记忆集时发生异常: {e}", exc_info=True) return error_response(f"删除记忆集失败: {str(e)}", 500) @app.route("/api/active_session", methods=["GET"]) def get_active_session(): """ 获取当前活跃的会话信息,包括角色ID、记忆ID和聊天记录ID。 :return: JSON格式的会话信息。 """ global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter logger.debug("获取当前活跃会话:成功。") return jsonify( { "role_id": global_current_role_id, "memory_id": global_current_memory_id, "chat_log_id": global_current_chat_log_id, "turn_counter": global_turn_counter, "memory_status": global_memory_update_status, # 新增:返回记忆更新状态 } ) @app.route("/api/active_session", methods=["POST"]) def set_active_session(): """ 设置当前活跃的会话(角色和记忆)。 如果提供了新的role_id或memory_id,则更新并重置聊天上下文和轮次。 :return: 更新后的会话信息。 """ global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter, global_current_chat_history data = request.json or {} # 确保 data 始终是字典 new_role_id = data.get("role_id", global_current_role_id) new_memory_id = data.get("memory_id", global_current_memory_id) # 检查新ID是否存在 if not file_manager.role_exists(new_role_id): logger.warning(f"设置活跃会话失败: 角色ID '{new_role_id}' 不存在。") return error_response(f"角色ID '{new_role_id}' 不存在。", 404) if not file_manager.memory_exists(new_memory_id): logger.warning(f"设置活跃会话失败: 记忆ID '{new_memory_id}' 不存在。") return error_response(f"记忆ID '{new_memory_id}' 不存在。", 404) # 只有当角色或记忆发生变化时才重置会话 if ( new_role_id != global_current_role_id or new_memory_id != global_current_memory_id ): old_chat_log_id = global_current_chat_log_id # 保存旧的聊天记录ID global_current_role_id = new_role_id global_current_memory_id = new_memory_id global_current_chat_log_id = ( f"{global_current_role_id}_{global_current_memory_id}_{generate_uuid()}" ) global_turn_counter = 0 global_current_chat_history = [] # 清空聊天历史 # 更新config.ini中的当前激活会话 current_app_config = get_config() current_app_config["Session"]["CURRENT_ROLE_ID"] = global_current_role_id current_app_config["Session"]["CURRENT_MEMORY_ID"] = global_current_memory_id set_config(current_app_config) # 保存到文件 # 删除旧的聊天记录文件(如果存在且与新的不同) if old_chat_log_id and old_chat_log_id != global_current_chat_log_id: if file_manager.delete_chat_log(old_chat_log_id): logger.info(f"旧聊天记录 '{old_chat_log_id}' 已成功删除。") else: logger.warning(f"删除旧聊天记录 '{old_chat_log_id}' 失败或文件不存在。") logger.info( f"活跃会话已切换至角色 '{global_current_role_id}' 和记忆集 '{global_current_memory_id}'。新聊天记录ID: {global_current_chat_log_id}" ) else: logger.info( f"活跃会话保持不变 (角色: {global_current_role_id}, 记忆: {global_current_memory_id})。" ) return jsonify( { "message": "活跃会话已更新", "role_id": global_current_role_id, "memory_id": global_current_memory_id, "chat_log_id": global_current_chat_log_id, "turn_counter": global_turn_counter, } ) @app.route("/api/features_content", methods=["GET"]) def get_features_content(): """ 获取当前活跃角色的特征文件内容。 :return: JSON格式的特征内容。 """ features_data = file_manager.load_active_features(global_current_role_id) if features_data: logger.debug(f"获取角色 '{global_current_role_id}' 的特征内容:成功。") return jsonify(features_data) else: logger.error(f"获取角色 '{global_current_role_id}' 的特征内容失败。") return error_response("未能加载角色特征内容。", 500) @app.route("/api/features_content", methods=["POST"]) def update_features_content(): """ 更新当前活跃角色的特征文内容。 :return: 成功或失败信息。 """ data = request.json if data is None: return error_response("请求数据为空或格式不正确", 400) # 确保保存的JSON中包含"角色名称"字段,并与实际角色名称一致 # 从文件管理器获取所有角色,找到当前角色的名称 roles = file_manager.list_roles() current_role_name = next( (r["name"] for r in roles if r["id"] == global_current_role_id), global_current_role_id, ) data["角色名称"] = current_role_name if file_manager.save_active_features(global_current_role_id, data): logger.info(f"角色 '{global_current_role_id}' 的特征内容更新成功。") return jsonify({"message": "特征内容更新成功"}) else: logger.error(f"角色 '{global_current_role_id}' 的特征内容更新失败。") return error_response("特征内容更新失败。", 500) @app.route("/api/memory_content", methods=["GET"]) def get_memory_content(): """ 获取当前活跃记忆集的记忆文件内容。 :return: JSON格式的记忆内容。 """ memory_data = file_manager.load_active_memory(global_current_memory_id) if memory_data: logger.debug(f"获取记忆集 '{global_current_memory_id}' 的记忆内容:成功。") return jsonify(memory_data) else: logger.error(f"获取记忆集 '{global_current_memory_id}' 的记忆内容失败。") return error_response("未能加载记忆内容。", 500) @app.route("/api/memory_content", methods=["POST"]) def update_memory_content(): """ 更新当前活跃记忆集的记忆文件内容。 :return: 成功或失败信息。 """ data = request.json if data is None: return error_response("请求数据为空或格式不正确", 400) # 确保保存的JSON中包含'long_term_facts'和'short_term_events'字段 if "long_term_facts" not in data or "short_term_events" not in data: return error_response( "记忆内容需要包含 'long_term_facts' 和 'short_term_events' 字段。", 400 ) if file_manager.save_active_memory(global_current_memory_id, data): logger.info(f"记忆集 '{global_current_memory_id}' 的记忆内容更新成功。") return jsonify({"message": "记忆内容更新成功"}) else: logger.error(f"记忆集 '{global_current_memory_id}' 的记忆内容更新失败。") return error_response("记忆内容更新失败。", 500) @app.route("/api/chat_log", methods=["GET"]) def get_chat_log(): """ 获取当前活跃会话的聊天记录。 :param limit: 可选,限制返回的记录数量。 :return: JSON格式聊天记录列表。 """ limit = request.args.get("limit", type=int) logs = file_manager.read_chat_log(global_current_chat_log_id, limit=limit) logger.debug(f"获取聊天记录:{len(logs)} 条。") return jsonify(logs) @app.route("/api/chat_log", methods=["DELETE"]) def delete_chat_log_route(): """ 删除当前活跃会话的聊天记录文件。 :return: 成功或失败信息。 """ # 这里我们只删除当前 session 对应的聊天记录文件 if file_manager.delete_chat_log(global_current_chat_log_id): # 删除后,清空内存中的聊天历史 global global_current_chat_history, global_turn_counter global_current_chat_history = [] global_turn_counter = 0 logger.info( f"聊天记录 '{global_current_chat_log_id}' 删除成功,内存历史已清空。" ) return jsonify({"message": "聊天记录删除成功,会话历史已清空"}) else: logger.error(f"聊天记录 '{global_current_chat_log_id}' 删除失败。") return error_response("聊天记录删除失败。", 500) @app.route("/api/proxy_models", methods=["GET"]) def proxy_get_available_models(): """ 通过后端代理获取Gemini API可用的模型列表。 :return: JSON格式的模型列表。 """ current_config = get_config() gemini_api_base_url = current_config["API"].get("GEMINI_API_BASE_URL") if not gemini_api_base_url: logger.warning("GEMINI_API_BASE_URL 未配置,无法获取模型列表。") return jsonify({"models": [], "error": "GEMINI_API_BASE_URL 未配置。"}) # 确保 GEMINI_API_BASE_URL 以 /v1beta 结尾,并拼接 /models # 移除末尾的斜杠,然后添加 /v1beta/models base_url_cleaned = gemini_api_base_url.rstrip("/") if not base_url_cleaned.endswith("/v1beta"): base_url_cleaned += "/v1beta" models_url = f"{base_url_cleaned}/models" headers = {} # 如果有API Key,也传递给上游API api_key = current_config["API"].get("GEMINI_API_KEY") if api_key and api_key.strip(): headers["x-goog-api-key"] = api_key # 根据Gemini API的要求添加API Key头 try: response = requests.get(models_url, headers=headers, timeout=10) # 设置超时 response.raise_for_status() # 如果状态码不是2xx,则抛出HTTPError models_data = response.json() logger.debug("通过代理成功获取模型列表。") return jsonify(models_data) except requests.exceptions.Timeout: logger.error(f"获取模型列表超时: {models_url}") return error_response("获取模型列表超时。", 504) # Gateway Timeout except requests.exceptions.RequestException as e: logger.error(f"通过代理获取模型列表失败: {e}", exc_info=True) return error_response(f"获取模型列表失败: {str(e)}", 500) except json.JSONDecodeError: logger.error(f"无法解析模型列表响应为JSON: {response.text}", exc_info=True) return error_response("无法解析模型列表响应。", 500) @app.route("/api/logs", methods=["GET"]) def get_app_logs(): """ 获取应用程序的日志内容。 :return: 日志文件内容或错误信息。 """ log_file_path = get_config()["Application"]["APP_LOG_FILE_PATH"] try: if not os.path.exists(log_file_path): logger.warning(f"日志文件不存在: {log_file_path}") return error_response("日志文件不存在。", 404) with open(log_file_path, "r", encoding="utf-8") as f: logs = f.read() return Response(logs, mimetype="text/plain") except Exception as e: logger.error(f"读取日志文件失败: {e}", exc_info=True) return error_response(f"读取日志文件失败: {str(e)}", 500) # --- 主对话路由 --- @app.route("/api/chat", methods=["POST"]) def chat_with_gemini(): """ 与Gemini模型进行对话。 :return: AI回复和当前的会话状态。 """ global global_turn_counter, global_current_chat_history, global_current_role_id, global_current_memory_id # 检查是否请求流式响应 use_stream = request.args.get("stream", "false").lower() == "true" data = request.json if data is None: return error_response("请求数据为空或格式不正确", 400) user_message = data.get("message") if not user_message: return error_response("消息内容不能为空。", 400) # 1. 记录用户消息并更新全局聊天历史 user_entry = { "timestamp": get_current_timestamp(), "role": "user", "content": user_message, "id": f"user-{time.time()}-{generate_uuid()[:8]}", # 添加消息ID } file_manager.append_chat_log(g.current_chat_log_id, user_entry) global_current_chat_history.append( {"role": "user", "parts": [{"text": user_message}]} ) logger.info(f"用户消息已记录。当前轮次: {g.turn_counter}") # 2. 构建系统提示词 (包含特征和记忆) system_instruction = memory_manager.build_system_prompt( g.current_role_id, g.current_memory_id, g.turn_counter, # 传递当前轮次给Prompt ) # 3. 管理上下文长度:只保留最近 CONTEXT_WINDOW_SIZE * 2 条消息 (用户+AI) context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"] # 从全局聊天历史中获取用于API调用的部分 chat_history_for_api = global_current_chat_history[-context_window_size * 2 :] # 4. 调用 Gemini API 进行对话 if gemini_client is None: logger.error("Gemini客户端未初始化,无法进行对话。") return error_response("AI助手未初始化,请检查API配置。", 500) model_name = app_config["API"]["DEFAULT_GEMINI_MODEL"] # 创建AI回复的消息ID ai_message_id = f"assistant-{time.time()}-{generate_uuid()[:8]}" # 根据请求类型选择处理方式 if use_stream: # 流式响应处理 def generate_stream(): try: # 确保gemini_client不为None if gemini_client is None: logger.error("Gemini客户端未初始化,无法进行流式对话。") yield f"data: {json.dumps({'error': 'AI助手未初始化,请检查API配置。'})}\n\n" return # 调用流式API stream_response = gemini_client.generate_content( model_name=model_name, contents=chat_history_for_api, system_instruction=system_instruction, stream=True, # 启用流式响应 ) full_response = "" if isinstance(stream_response, tuple) and len(stream_response) == 2: # 如果返回错误信息 error_message, status_code = stream_response logger.error( f"Gemini API 流式响应返回错误: {error_message} (状态码: {status_code})" ) yield f"data: {json.dumps({'error': error_message})}\n\n" return # 处理流式响应 for chunk in stream_response: if chunk: full_response += chunk # 发送数据块 yield f"data: {json.dumps({'chunk': chunk, 'id': ai_message_id})}\n\n" # 流结束后,保存完整回复到数据库 # 获取当前请求中的聊天记录ID(避免使用g对象) current_chat_log_id = global_current_chat_log_id assistant_entry = { "timestamp": get_current_timestamp(), "role": "assistant", "content": full_response, "id": ai_message_id, } # 使用全局变量而不是g对象 file_manager.append_chat_log(current_chat_log_id, assistant_entry) global_current_chat_history.append( {"role": "assistant", "parts": [{"text": full_response}]} ) # 更新全局对话轮次 global global_turn_counter global_turn_counter += 1 # 发送结束标记 yield f"data: {json.dumps({'end': True, 'turn_counter': global_turn_counter})}\n\n" # 在另一个线程中检查并触发记忆更新,避免阻塞流式响应 def delayed_memory_update(): try: with app.app_context(): # 创建应用上下文 logger.info("在新线程中触发记忆更新") check_and_trigger_memory_update() except Exception as e: logger.error(f"延迟记忆更新失败: {e}", exc_info=True) # 启动延迟更新线程 threading.Thread(target=delayed_memory_update).start() except Exception as e: logger.error(f"流式生成内容时发生错误: {e}", exc_info=True) yield f"data: {json.dumps({'error': str(e)})}\n\n" return Response(generate_stream(), mimetype="text/event-stream") else: # 标准响应处理 try: ai_response = gemini_client.generate_content( model_name=model_name, contents=chat_history_for_api, system_instruction=system_instruction, ) # 检查 ai_response 是否为元组 (错误信息, 状态码) if isinstance(ai_response, tuple) and len(ai_response) == 2: error_message: str = ai_response[0] status_code: int = ai_response[1] logger.error( f"Gemini API 返回错误: {error_message} (状态码: {status_code})" ) return error_response(error_message, status_code) # 如果不是元组,则预期它是一个字符串 if not isinstance(ai_response, str): logger.error( f"Gemini API 返回了非字符串/元组类型结果: {type(ai_response)} - {ai_response}" ) return error_response("AI助手返回了非预期的结果类型。", 500) ai_response_text: str = ai_response except Exception as e: logger.error(f"调用Gemini API失败: {e}", exc_info=True) return error_response(f"AI助手调用失败: {str(e)}", 500) logger.info(f"Gemini API 回复: {ai_response_text[:100]}...") # 5. 记录AI回复并更新全局聊天历史 assistant_entry = { "timestamp": get_current_timestamp(), "role": "assistant", "content": ai_response_text, "id": ai_message_id, } file_manager.append_chat_log(g.current_chat_log_id, assistant_entry) global_current_chat_history.append( {"role": "assistant", "parts": [{"text": ai_response_text}]} ) # 6. 更新全局对话轮次 global global_turn_counter global_turn_counter += 1 # 7. 检查并触发记忆更新 (异步) check_and_trigger_memory_update() # 8. 返回AI回复和当前会话状态 response_data = { "success": True, "response": ai_response_text, "turn_counter": global_turn_counter, "active_session": { "role_id": g.current_role_id, "memory_id": g.current_memory_id, "chat_log_id": g.current_chat_log_id, }, "id": ai_message_id, # 返回消息ID,便于前端追踪 } # 如果触发记忆更新,则在响应中添加状态 if global_turn_counter > 0 and global_turn_counter % context_window_size == 0: response_data["memory_status"] = ( "memory_updating" # 保持与 /api/active_session 一致 ) return jsonify(response_data) def check_and_trigger_memory_update(): """检查是否需要触发记忆更新,如果需要则在后台线程中执行""" global global_turn_counter, global_current_chat_history, global_current_role_id, global_current_memory_id context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"] if global_turn_counter > 0 and global_turn_counter % context_window_size == 0: logger.info(f"对话轮次达到 {global_turn_counter},触发异步记忆更新。") # 使用全局变量而不是g对象 try: recent_chat_for_memory_update = global_current_chat_history[ -context_window_size * 2 : ] current_memory_data_for_update = file_manager.load_active_memory( global_current_memory_id # 使用全局变量替代g.current_memory_id ) if current_memory_data_for_update: update_thread = threading.Thread( target=_async_memory_update_task, args=( current_memory_data_for_update, recent_chat_for_memory_update, global_turn_counter, global_current_role_id, # 使用全局变量替代g.current_role_id global_current_memory_id, # 使用全局变量替代g.current_memory_id ), ) update_thread.start() logger.info( f"记忆更新线程已启动,角色ID: {global_current_role_id}, 记忆ID: {global_current_memory_id}" ) else: logger.error(f"无法加载记忆数据,记忆ID: {global_current_memory_id}") except Exception as e: logger.error(f"触发记忆更新时发生错误: {e}", exc_info=True) def _async_memory_update_task( initial_memory_data, recent_chat_history, current_trigger_turn_count, role_id: str, memory_id: str, ): """ 异步执行记忆更新任务。 :param initial_memory_data: 触发更新时的原始记忆数据。 :param recent_chat_history: 最近 N 轮对话历史。 :param current_trigger_turn_count: 触发本次更新时的全局对话轮次。 :param role_id: 当前活跃的角色ID。 :param memory_id: 当前活跃的记忆ID。 """ global global_memory_update_status # 声明使用全局变量 # 在新线程中手动创建应用上下文 with app.app_context(): logger.info( f"异步记忆更新任务开始,角色: {role_id}, 记忆: {memory_id}, 触发轮次: {current_trigger_turn_count}" ) global_memory_update_status = "updating" # 设置状态为更新中 try: # 1. 构建记忆更新的Prompt update_prompt = memory_manager.build_memory_update_prompt( recent_chat_history, initial_memory_data, current_trigger_turn_count ) # 2. 调用Gemini API进行记忆更新 if gemini_client is None: logger.error("Gemini客户端未初始化,无法执行记忆更新。请检查API配置。") return # 确保 app_config 在上下文中可用,或者重新获取 current_app_config = get_config() # 重新获取配置,确保在上下文中 update_model_name = current_app_config["API"]["MEMORY_UPDATE_MODEL"] try: # 构造一个符合Gemini API预期的请求体 # 使用单条消息,将prompt作为文本内容 gemini_response = gemini_client.generate_content( model_name=update_model_name, contents=[{"role": "user", "parts": [{"text": update_prompt}]}], system_instruction=None, # 不使用system_instruction,直接在contents中包含prompt ) # 检查 gemini_response 是否为元组 (错误信息, 状态码) if isinstance(gemini_response, tuple) and len(gemini_response) == 2: error_message: str = gemini_response[0] status_code: int = gemini_response[1] logger.error( f"Gemini模型在记忆更新时返回错误: {error_message} (状态码: {status_code})" ) return # 无法继续,直接返回 # 如果不是元组,则预期它是一个字符串 if not isinstance(gemini_response, str): logger.error( f"Gemini模型在记忆更新时返回了非字符串/元组类型结果: {type(gemini_response)} - {gemini_response}" ) return # 无法继续,直接返回 gemini_response_text: str = gemini_response except Exception as e: logger.error(f"调用Gemini API进行记忆更新失败: {e}", exc_info=True) return # 3. 处理模型返回的记忆更新JSON updated_memory_data = memory_manager.process_memory_update_response( gemini_response_text, initial_memory_data, current_trigger_turn_count, ) # 4. 保存更新后的记忆数据 if file_manager.save_active_memory(memory_id, updated_memory_data): logger.info(f"记忆集 '{memory_id}' (角色 '{role_id}') 记忆更新成功。") global_memory_update_status = "completed" # 更新成功 else: logger.error(f"记忆集 '{memory_id}' (角色 '{role_id}') 记忆保存失败。") global_memory_update_status = "error" # 保存失败 except Exception as e: logger.error(f"异步记忆更新任务发生致命错误: {e}", exc_info=True) global_memory_update_status = "error" # 发生错误 finally: # 无论成功或失败,最终都将状态设置为 idle,除非有其他更新正在进行 # 这里需要更复杂的逻辑来判断是否真的空闲,暂时简化处理 if global_memory_update_status != "updating": # 避免覆盖正在进行的更新 global_memory_update_status = "idle" # --- 主运行部分 --- # --- 应用启动前处理 --- def initialize_app_data(): """ 在Flask应用启动时执行。 用于确保必要的默认数据(如默认角色和记忆)存在,并初始化会话状态。 """ global app_config, gemini_client global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter, global_current_chat_history # 在所有模块加载和配置读取完成后配置日志 app_config = get_config() # 确保获取到最新的配置 setup_logging(app_config["Application"]["APP_LOG_FILE_PATH"]) logger.info("应用数据初始化开始...") file_manager.ensure_initial_data() # 从 config.ini 加载初始会话状态 session_config = app_config["Session"] # 检查并设置当前角色ID和记忆ID,并更新config.ini(如果需要) configured_role_id = session_config.get("CURRENT_ROLE_ID", "default_role") configured_memory_id = session_config.get("CURRENT_MEMORY_ID", "default_memory") # 确保配置中的角色和记忆ID是有效的,如果无效则回退到默认值 if not file_manager.role_exists(configured_role_id): logger.warning( f"配置的角色ID '{configured_role_id}' 不存在,回退到 'default_role'。" ) global_current_role_id = "default_role" else: global_current_role_id = configured_role_id if not file_manager.memory_exists(configured_memory_id): logger.warning( f"配置的记忆ID '{configured_memory_id}' 不存在,回退到 'default_memory'。" ) global_current_memory_id = "default_memory" else: global_current_memory_id = configured_memory_id # 如果回退了,需要更新config.ini if ( configured_role_id != global_current_role_id or configured_memory_id != global_current_memory_id ): current_app_config = get_config() current_app_config["Session"]["CURRENT_ROLE_ID"] = global_current_role_id current_app_config["Session"]["CURRENT_MEMORY_ID"] = global_current_memory_id set_config(current_app_config) logger.info("config.ini 中的会话配置已更新为有效值。") app_config = get_config() # 重新加载配置到 app_config # 聊天记录ID:每次应用启动时生成一个新的唯一ID global_current_chat_log_id = ( f"{global_current_role_id}_{global_current_memory_id}_{generate_uuid()}" ) global_turn_counter = 0 global_current_chat_history = [] # 清空聊天历史 logger.info(f"应用启动,已创建新的聊天记录ID: {global_current_chat_log_id}。") logger.info("应用数据初始化完成。") # 初始化Gemini客户端 api_key = app_config["API"].get("GEMINI_API_KEY") if api_key and api_key.strip(): # 确保API Key不为空或只包含空格 gemini_client = GeminiClient() logger.info("Gemini客户端已初始化。") else: logger.warning("未配置或API KEY为空,Gemini客户端未初始化。") gemini_client = None @app.route("/api/memory/trigger_update", methods=["POST"]) def trigger_memory_update_route(): """ 手动触发记忆更新的API接口。 """ global global_current_role_id, global_current_memory_id, global_turn_counter, global_current_chat_history logger.info("收到手动触发记忆更新的请求。") # 获取当前记忆数据和最近的聊天历史 current_memory_data_for_update = file_manager.load_active_memory( global_current_memory_id ) # 记忆更新通常需要最近的对话历史,这里可以根据需要调整截取长度 context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"] recent_chat_for_memory_update = global_current_chat_history[ -context_window_size * 2 : ] if not current_memory_data_for_update: return error_response("无法加载当前记忆数据,无法触发更新。", 500) # 异步执行记忆更新任务 update_thread = threading.Thread( target=_async_memory_update_task, args=( current_memory_data_for_update, recent_chat_for_memory_update, global_turn_counter, global_current_role_id, # 传递当前角色ID global_current_memory_id, # 传递当前记忆ID ), ) update_thread.start() return jsonify({"message": "记忆更新任务已在后台触发。"}) if __name__ == "__main__": # 应用启动时初始化数据 initialize_app_data() # 运行Flask应用 # debug=True 仅用于开发环境,生产环境请勿使用 app.run(debug=True, host="0.0.0.0", port=5000)