Files
gemini_boy/app.py
2025-06-06 16:41:50 +08:00

1033 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app.py
from flask import Flask, jsonify, request, send_from_directory, Response
from flask_cors import CORS # 导入 CORS
import os
import logging
import json
import threading
import time
import requests # 导入 requests 库
from flask import g # 导入g对象用于存储请求上下文中的数据
# 导入自定义模块
from utils import (
setup_logging,
get_current_timestamp,
generate_uuid,
) # 工具函数如日志设置、时间戳、UUID生成
from config import get_config, set_config # 配置管理用于读取和写入config.ini
import file_manager # 文件管理模块,处理角色、记忆、聊天记录等文件的读写
from gemini_client import GeminiClient # 导入Gemini客户端用于与Gemini API交互
import memory_manager # 导入记忆管理模块负责构建Prompt和处理记忆更新逻辑
# --- Flask 应用初始化 ---
# app_config 声明为全局变量,用于存储应用配置
app_config = {}
# 日志配置将在 initialize_app_data 中进行
logger = logging.getLogger(__name__) # 获取app.py的日志器
app = Flask(__name__, static_folder="static")
CORS(app) # 初始化 CORS允许所有来源的跨域请求
# 全局会话状态变量
global_current_role_id: str = "default_role"
global_current_memory_id: str = "default_memory"
global_current_chat_log_id: str = ""
global_turn_counter: int = 0
global_current_chat_history: list = []
global_memory_update_status: str = "idle" # 新增:记忆更新状态
# 初始化Gemini客户端 (全局唯一实例)
gemini_client: GeminiClient | None = None
# --- 辅助函数 ---
def error_response(message: str, status_code: int) -> Response:
"""
标准化错误响应格式。
:param message: 错误信息。
:param status_code: HTTP状态码。
:return: JSON格式的错误响应。
"""
response = jsonify({"error": message})
response.status_code = status_code
return response
@app.before_request
def load_session_context():
"""
在每个请求之前加载会话上下文到 Flask 的 g 对象。
这确保了每个请求都有独立的会话状态。
"""
# 直接使用全局变量来设置 g 对象,确保 g 对象中的会话信息与全局变量保持同步
g.current_role_id = global_current_role_id
g.current_memory_id = global_current_memory_id
g.current_chat_log_id = global_current_chat_log_id
g.turn_counter = global_turn_counter # g.turn_counter 应该反映当前的全局轮次
logger.debug(
f"请求上下文加载角色ID={g.current_role_id}, 记忆ID={g.current_memory_id}, 聊天记录ID={g.current_chat_log_id}, 轮次={g.turn_counter}"
)
# --- 静态文件路由 ---
@app.route("/")
def index():
"""根路由返回前端的index.html文件"""
# 确保 static_folder 不为 None
static_folder_path = (
app.static_folder if app.static_folder is not None else "static"
)
return send_from_directory(static_folder_path, "index.html")
@app.route("/favicon.ico")
def favicon():
"""处理 favicon.ico 请求"""
static_folder_path = (
app.static_folder if app.static_folder is not None else "static"
)
return send_from_directory(
static_folder_path, "favicon.ico", mimetype="image/vnd.microsoft.icon"
)
# --- API 路由定义 ---
@app.route("/api/config", methods=["GET"])
def get_app_config():
"""
获取当前应用程序的所有配置。
:return: JSON格式的配置数据。
"""
current_config = get_config()
# 敏感信息不直接暴露给前端例如API KEY
api_key = current_config["API"].get("GEMINI_API_KEY", "")
# 根据用户要求完整显示API Key
display_config = {
"API": current_config["API"],
"Application": current_config["Application"],
"Session": current_config["Session"],
}
logger.debug("获取配置:成功")
return jsonify(display_config)
@app.route("/api/config", methods=["POST"])
def update_app_config():
"""
更新应用程序配置。
:return: 更新后的配置信息或错误信息。
"""
data = request.json
if data is None:
logger.warning("更新配置:请求数据为空或格式不正确。")
return error_response("请求数据为空或格式不正确", 400)
new_config = get_config() # 获取当前配置的副本
try:
# 允许更新API和Application部分
for section_name, section_data in data.items():
if section_name == "API":
for key, value in section_data.items():
if key == "GEMINI_API_KEY":
new_config["API"][key] = value
else:
new_config["API"][key] = value
elif section_name == "Application":
for key, value in section_data.items():
if key in [
"CONTEXT_WINDOW_SIZE",
"MEMORY_RETENTION_TURNS",
"MAX_SHORT_TERM_EVENTS",
]:
try:
new_config["Application"][key] = int(value)
except ValueError:
logger.warning(
f"配置项 {key} 的值 '{value}' 不是有效数字。"
)
return error_response(
f"配置项 {key} 的值 '{value}' 不是有效数字", 400
)
else:
new_config["Application"][key] = value
# Session 部分由 /api/active_session 控制,不在此处直接修改
else:
logger.warning(f"尝试更新未知配置部分: {section_name}")
set_config(new_config) # 保存到文件并更新内存中的全局配置
logger.info("配置更新:成功。")
# 重新初始化Gemini客户端以使用新的API配置
global gemini_client
api_key = new_config["API"].get("GEMINI_API_KEY")
if api_key and api_key.strip():
gemini_client = GeminiClient()
logger.info("Gemini客户端已重新初始化。")
else:
logger.warning("GEMINI_API_KEY 已被移除或为空Gemini客户端未初始化。")
gemini_client = None
return jsonify({"message": "配置更新成功", "config": new_config["Application"]})
except json.JSONDecodeError:
logger.error("更新配置失败: 请求JSON格式不正确。", exc_info=True)
return error_response("请求JSON格式不正确。", 400)
except KeyError as e:
logger.error(f"更新配置失败: 缺少必要的配置键 {e}", exc_info=True)
return error_response(f"缺少必要的配置键: {e}", 400)
except Exception as e:
logger.error(f"更新配置失败: {e}", exc_info=True)
return error_response(f"更新配置失败: {str(e)}", 500)
@app.route("/api/roles", methods=["GET"])
def get_roles():
"""
获取所有可用的角色列表。
:return: 角色列表JSON。
"""
roles = file_manager.list_roles()
logger.debug(f"获取角色列表:{len(roles)} 个角色。")
return jsonify(roles)
@app.route("/api/roles", methods=["POST"])
def create_new_role():
"""
创建一个新角色。
需要提供 'id''name'
:return: 成功或失败信息。
"""
data = request.json or {} # 确保 data 始终是字典
if not data: # 检查 data 否为空字典或 None
return error_response("请求数据为空或格式不正确", 400)
role_id = data.get("id")
role_name = data.get("name")
if not role_id or not role_name:
logger.warning("创建角色失败: 缺少 'id''name'")
return error_response("需要提供 'id''name' 来创建角色。", 400)
try:
if file_manager.create_role(role_id, role_name):
logger.info(f"创建角色 '{role_name}' ({role_id}) 成功。")
return jsonify({"message": "角色创建成功", "role_id": role_id})
else:
logger.error(f"创建角色 '{role_name}' ({role_id}) 失败可能ID已存在。")
return error_response("创建角色失败可能ID已存在。", 409) # Conflict
except Exception as e:
logger.error(f"创建角色时发生异常: {e}", exc_info=True)
return error_response(f"创建角色失败: {str(e)}", 500)
@app.route("/api/roles/<string:role_id>", methods=["DELETE"])
def delete_existing_role(role_id):
"""
删除指定ID的角色。
:return: 成功或失败信息。
"""
# 检查是否是当前活跃角色,不允许删除
if role_id == g.current_role_id:
logger.warning(f"尝试删除当前活跃角色 '{role_id}'")
return error_response("不能删除当前活跃的角色。", 403) # Forbidden
try:
if file_manager.delete_role(role_id):
logger.info(f"删除角色 '{role_id}' 成功。")
return jsonify({"message": "角色删除成功", "role_id": role_id})
else:
logger.error(f"删除角色 '{role_id}' 失败,可能角色不存在或文件操作失败。")
return error_response("删除角色失败,可能角色不存在或文件操作失败。", 404)
except Exception as e:
logger.error(f"删除角色时发生异常: {e}", exc_info=True)
return error_response(f"删除角色失败: {str(e)}", 500)
@app.route("/api/memories", methods=["GET"])
def get_memories():
"""
获取所有可用的记忆集列表。
:return: 记忆集列表JSON。
"""
memories = file_manager.list_memories()
logger.debug(f"获取记忆列表:{len(memories)} 个记忆集。")
return jsonify(memories)
@app.route("/api/memories", methods=["POST"])
def create_new_memory():
"""
创建一个新记忆集。
需要提供 'id''name'
:return: 成功或失败信息。
"""
data = request.json or {} # 确保 data 始终是字典
if not data: # 检查 data 是否为空字典或 None
return error_response("请求数据为空或格式不正确", 400)
memory_id = data.get("id")
memory_name = data.get("name")
if not memory_id or not memory_name:
logger.warning("创建记忆集失败: 缺少 'id''name'")
return error_response("需要提供 'id''name' 来创建记忆集。", 400)
try:
if file_manager.create_memory(memory_id, memory_name):
logger.info(f"创建记忆集 '{memory_name}' ({memory_id}) 成功。")
return jsonify({"message": "记忆集创建成功", "memory_id": memory_id})
else:
logger.error(
f"创建记忆集 '{memory_name}' ({memory_id}) 失败可能ID已存在。"
)
return error_response("创建记忆集失败可能ID已存在。", 409)
except Exception as e:
logger.error(f"创建记忆集时发生异常: {e}", exc_info=True)
return error_response(f"创建记忆集失败: {str(e)}", 500)
@app.route("/api/memories/<string:memory_id>", methods=["DELETE"])
def delete_existing_memory(memory_id):
"""
删除指定ID的记忆集。
:return: 成功或失败信息。
"""
# 检查是否是当前活跃记忆集,不允许删除
if memory_id == g.current_memory_id:
logger.warning(f"尝试删除当前活跃记忆集 '{memory_id}'")
return error_response("不能删除当前活跃的记忆集。", 403) # Forbidden
try:
if file_manager.delete_memory(memory_id):
logger.info(f"删除记忆集 '{memory_id}' 成功。")
return jsonify({"message": "记忆集删除成功", "memory_id": memory_id})
else:
logger.error(
f"删除记忆集 '{memory_id}' 失败,可能记忆集不存在或文件操作失败。"
)
return error_response("删记忆集失败,可能记忆集不存在或文件操作失败。", 404)
except Exception as e:
logger.error(f"删除记忆集时发生异常: {e}", exc_info=True)
return error_response(f"删除记忆集失败: {str(e)}", 500)
@app.route("/api/active_session", methods=["GET"])
def get_active_session():
"""
获取当前活跃的会话信息包括角色ID、记忆ID和聊天记录ID。
:return: JSON格式的会话信息。
"""
global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter
logger.debug("获取当前活跃会话:成功。")
return jsonify(
{
"role_id": global_current_role_id,
"memory_id": global_current_memory_id,
"chat_log_id": global_current_chat_log_id,
"turn_counter": global_turn_counter,
"memory_status": global_memory_update_status, # 新增:返回记忆更新状态
}
)
@app.route("/api/active_session", methods=["POST"])
def set_active_session():
"""
设置当前活跃的会话(角色和记忆)。
如果提供了新的role_id或memory_id则更新并重置聊天上下文和轮次。
:return: 更新后的会话信息。
"""
global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter, global_current_chat_history
data = request.json or {} # 确保 data 始终是字典
new_role_id = data.get("role_id", global_current_role_id)
new_memory_id = data.get("memory_id", global_current_memory_id)
# 检查新ID是否存在
if not file_manager.role_exists(new_role_id):
logger.warning(f"设置活跃会话失败: 角色ID '{new_role_id}' 不存在。")
return error_response(f"角色ID '{new_role_id}' 不存在。", 404)
if not file_manager.memory_exists(new_memory_id):
logger.warning(f"设置活跃会话失败: 记忆ID '{new_memory_id}' 不存在。")
return error_response(f"记忆ID '{new_memory_id}' 不存在。", 404)
# 只有当角色或记忆发生变化时才重置会话
if (
new_role_id != global_current_role_id
or new_memory_id != global_current_memory_id
):
old_chat_log_id = global_current_chat_log_id # 保存旧的聊天记录ID
global_current_role_id = new_role_id
global_current_memory_id = new_memory_id
global_current_chat_log_id = (
f"{global_current_role_id}_{global_current_memory_id}_{generate_uuid()}"
)
global_turn_counter = 0
global_current_chat_history = [] # 清空聊天历史
# 更新config.ini中的当前激活会话
current_app_config = get_config()
current_app_config["Session"]["CURRENT_ROLE_ID"] = global_current_role_id
current_app_config["Session"]["CURRENT_MEMORY_ID"] = global_current_memory_id
set_config(current_app_config) # 保存到文件
# 删除旧的聊天记录文件(如果存在且与新的不同)
if old_chat_log_id and old_chat_log_id != global_current_chat_log_id:
if file_manager.delete_chat_log(old_chat_log_id):
logger.info(f"旧聊天记录 '{old_chat_log_id}' 已成功删除。")
else:
logger.warning(f"删除旧聊天记录 '{old_chat_log_id}' 失败或文件不存在。")
logger.info(
f"活跃会话已切换至角色 '{global_current_role_id}' 和记忆集 '{global_current_memory_id}'。新聊天记录ID: {global_current_chat_log_id}"
)
else:
logger.info(
f"活跃会话保持不变 (角色: {global_current_role_id}, 记忆: {global_current_memory_id})。"
)
return jsonify(
{
"message": "活跃会话已更新",
"role_id": global_current_role_id,
"memory_id": global_current_memory_id,
"chat_log_id": global_current_chat_log_id,
"turn_counter": global_turn_counter,
}
)
@app.route("/api/features_content", methods=["GET"])
def get_features_content():
"""
获取当前活跃角色的特征文件内容。
:return: JSON格式的特征内容。
"""
features_data = file_manager.load_active_features(global_current_role_id)
if features_data:
logger.debug(f"获取角色 '{global_current_role_id}' 的特征内容:成功。")
return jsonify(features_data)
else:
logger.error(f"获取角色 '{global_current_role_id}' 的特征内容失败。")
return error_response("未能加载角色特征内容。", 500)
@app.route("/api/features_content", methods=["POST"])
def update_features_content():
"""
更新当前活跃角色的特征文内容。
:return: 成功或失败信息。
"""
data = request.json
if data is None:
return error_response("请求数据为空或格式不正确", 400)
# 确保保存的JSON中包含"角色名称"字段,并与实际角色名称一致
# 从文件管理器获取所有角色,找到当前角色的名称
roles = file_manager.list_roles()
current_role_name = next(
(r["name"] for r in roles if r["id"] == global_current_role_id),
global_current_role_id,
)
data["角色名称"] = current_role_name
if file_manager.save_active_features(global_current_role_id, data):
logger.info(f"角色 '{global_current_role_id}' 的特征内容更新成功。")
return jsonify({"message": "特征内容更新成功"})
else:
logger.error(f"角色 '{global_current_role_id}' 的特征内容更新失败。")
return error_response("特征内容更新失败。", 500)
@app.route("/api/memory_content", methods=["GET"])
def get_memory_content():
"""
获取当前活跃记忆集的记忆文件内容。
:return: JSON格式的记忆内容。
"""
memory_data = file_manager.load_active_memory(global_current_memory_id)
if memory_data:
logger.debug(f"获取记忆集 '{global_current_memory_id}' 的记忆内容:成功。")
return jsonify(memory_data)
else:
logger.error(f"获取记忆集 '{global_current_memory_id}' 的记忆内容失败。")
return error_response("未能加载记忆内容。", 500)
@app.route("/api/memory_content", methods=["POST"])
def update_memory_content():
"""
更新当前活跃记忆集的记忆文件内容。
:return: 成功或失败信息。
"""
data = request.json
if data is None:
return error_response("请求数据为空或格式不正确", 400)
# 确保保存的JSON中包含'long_term_facts'和'short_term_events'字段
if "long_term_facts" not in data or "short_term_events" not in data:
return error_response(
"记忆内容需要包含 'long_term_facts''short_term_events' 字段。", 400
)
if file_manager.save_active_memory(global_current_memory_id, data):
logger.info(f"记忆集 '{global_current_memory_id}' 的记忆内容更新成功。")
return jsonify({"message": "记忆内容更新成功"})
else:
logger.error(f"记忆集 '{global_current_memory_id}' 的记忆内容更新失败。")
return error_response("记忆内容更新失败。", 500)
@app.route("/api/chat_log", methods=["GET"])
def get_chat_log():
"""
获取当前活跃会话的聊天记录。
:param limit: 可选,限制返回的记录数量。
:return: JSON格式聊天记录列表。
"""
limit = request.args.get("limit", type=int)
logs = file_manager.read_chat_log(global_current_chat_log_id, limit=limit)
logger.debug(f"获取聊天记录:{len(logs)} 条。")
return jsonify(logs)
@app.route("/api/chat_log", methods=["DELETE"])
def delete_chat_log_route():
"""
删除当前活跃会话的聊天记录文件。
:return: 成功或失败信息。
"""
# 这里我们只删除当前 session 对应的聊天记录文件
if file_manager.delete_chat_log(global_current_chat_log_id):
# 删除后,清空内存中的聊天历史
global global_current_chat_history, global_turn_counter
global_current_chat_history = []
global_turn_counter = 0
logger.info(
f"聊天记录 '{global_current_chat_log_id}' 删除成功,内存历史已清空。"
)
return jsonify({"message": "聊天记录删除成功,会话历史已清空"})
else:
logger.error(f"聊天记录 '{global_current_chat_log_id}' 删除失败。")
return error_response("聊天记录删除失败。", 500)
@app.route("/api/proxy_models", methods=["GET"])
def proxy_get_available_models():
"""
通过后端代理获取Gemini API可用的模型列表。
:return: JSON格式的模型列表。
"""
current_config = get_config()
gemini_api_base_url = current_config["API"].get("GEMINI_API_BASE_URL")
if not gemini_api_base_url:
logger.warning("GEMINI_API_BASE_URL 未配置,无法获取模型列表。")
return jsonify({"models": [], "error": "GEMINI_API_BASE_URL 未配置。"})
# 确保 GEMINI_API_BASE_URL 以 /v1beta 结尾,并拼接 /models
# 移除末尾的斜杠,然后添加 /v1beta/models
base_url_cleaned = gemini_api_base_url.rstrip("/")
if not base_url_cleaned.endswith("/v1beta"):
base_url_cleaned += "/v1beta"
models_url = f"{base_url_cleaned}/models"
headers = {}
# 如果有API Key也传递给上游API
api_key = current_config["API"].get("GEMINI_API_KEY")
if api_key and api_key.strip():
headers["x-goog-api-key"] = api_key # 根据Gemini API的要求添加API Key头
try:
response = requests.get(models_url, headers=headers, timeout=10) # 设置超时
response.raise_for_status() # 如果状态码不是2xx则抛出HTTPError
models_data = response.json()
logger.debug("通过代理成功获取模型列表。")
return jsonify(models_data)
except requests.exceptions.Timeout:
logger.error(f"获取模型列表超时: {models_url}")
return error_response("获取模型列表超时。", 504) # Gateway Timeout
except requests.exceptions.RequestException as e:
logger.error(f"通过代理获取模型列表失败: {e}", exc_info=True)
return error_response(f"获取模型列表失败: {str(e)}", 500)
except json.JSONDecodeError:
logger.error(f"无法解析模型列表响应为JSON: {response.text}", exc_info=True)
return error_response("无法解析模型列表响应。", 500)
@app.route("/api/logs", methods=["GET"])
def get_app_logs():
"""
获取应用程序的日志内容。
:return: 日志文件内容或错误信息。
"""
log_file_path = get_config()["Application"]["APP_LOG_FILE_PATH"]
try:
if not os.path.exists(log_file_path):
logger.warning(f"日志文件不存在: {log_file_path}")
return error_response("日志文件不存在。", 404)
with open(log_file_path, "r", encoding="utf-8") as f:
logs = f.read()
return Response(logs, mimetype="text/plain")
except Exception as e:
logger.error(f"读取日志文件失败: {e}", exc_info=True)
return error_response(f"读取日志文件失败: {str(e)}", 500)
# --- 主对话路由 ---
@app.route("/api/chat", methods=["POST"])
def chat_with_gemini():
"""
与Gemini模型进行对话。
:return: AI回复和当前的会话状态。
"""
global global_turn_counter, global_current_chat_history, global_current_role_id, global_current_memory_id
# 检查是否请求流式响应
use_stream = request.args.get("stream", "false").lower() == "true"
data = request.json
if data is None:
return error_response("请求数据为空或格式不正确", 400)
user_message = data.get("message")
if not user_message:
return error_response("消息内容不能为空。", 400)
# 1. 记录用户消息并更新全局聊天历史
user_entry = {
"timestamp": get_current_timestamp(),
"role": "user",
"content": user_message,
"id": f"user-{time.time()}-{generate_uuid()[:8]}", # 添加消息ID
}
file_manager.append_chat_log(g.current_chat_log_id, user_entry)
global_current_chat_history.append(
{"role": "user", "parts": [{"text": user_message}]}
)
logger.info(f"用户消息已记录。当前轮次: {g.turn_counter}")
# 2. 构建系统提示词 (包含特征和记忆)
system_instruction = memory_manager.build_system_prompt(
g.current_role_id,
g.current_memory_id,
g.turn_counter, # 传递当前轮次给Prompt
)
# 3. 管理上下文长度:只保留最近 CONTEXT_WINDOW_SIZE * 2 条消息 (用户+AI)
context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"]
# 从全局聊天历史中获取用于API调用的部分
chat_history_for_api = global_current_chat_history[-context_window_size * 2 :]
# 4. 调用 Gemini API 进行对话
if gemini_client is None:
logger.error("Gemini客户端未初始化无法进行对话。")
return error_response("AI助手未初始化请检查API配置。", 500)
model_name = app_config["API"]["DEFAULT_GEMINI_MODEL"]
# 创建AI回复的消息ID
ai_message_id = f"assistant-{time.time()}-{generate_uuid()[:8]}"
# 根据请求类型选择处理方式
if use_stream:
# 流式响应处理
def generate_stream():
try:
# 确保gemini_client不为None
if gemini_client is None:
logger.error("Gemini客户端未初始化无法进行流式对话。")
yield f"data: {json.dumps({'error': 'AI助手未初始化请检查API配置。'})}\n\n"
return
# 调用流式API
stream_response = gemini_client.generate_content(
model_name=model_name,
contents=chat_history_for_api,
system_instruction=system_instruction,
stream=True, # 启用流式响应
)
full_response = ""
if isinstance(stream_response, tuple) and len(stream_response) == 2:
# 如果返回错误信息
error_message, status_code = stream_response
logger.error(
f"Gemini API 流式响应返回错误: {error_message} (状态码: {status_code})"
)
yield f"data: {json.dumps({'error': error_message})}\n\n"
return
# 处理流式响应
for chunk in stream_response:
if chunk:
full_response += chunk
# 发送数据块
yield f"data: {json.dumps({'chunk': chunk, 'id': ai_message_id})}\n\n"
# 流结束后,保存完整回复到数据库
# 获取当前请求中的聊天记录ID(避免使用g对象)
current_chat_log_id = global_current_chat_log_id
assistant_entry = {
"timestamp": get_current_timestamp(),
"role": "assistant",
"content": full_response,
"id": ai_message_id,
}
# 使用全局变量而不是g对象
file_manager.append_chat_log(current_chat_log_id, assistant_entry)
global_current_chat_history.append(
{"role": "assistant", "parts": [{"text": full_response}]}
)
# 更新全局对话轮次
global global_turn_counter
global_turn_counter += 1
# 发送结束标记
yield f"data: {json.dumps({'end': True, 'turn_counter': global_turn_counter})}\n\n"
# 在另一个线程中检查并触发记忆更新,避免阻塞流式响应
def delayed_memory_update():
try:
with app.app_context(): # 创建应用上下文
logger.info("在新线程中触发记忆更新")
check_and_trigger_memory_update()
except Exception as e:
logger.error(f"延迟记忆更新失败: {e}", exc_info=True)
# 启动延迟更新线程
threading.Thread(target=delayed_memory_update).start()
except Exception as e:
logger.error(f"流式生成内容时发生错误: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response(generate_stream(), mimetype="text/event-stream")
else:
# 标准响应处理
try:
ai_response = gemini_client.generate_content(
model_name=model_name,
contents=chat_history_for_api,
system_instruction=system_instruction,
)
# 检查 ai_response 是否为元组 (错误信息, 状态码)
if isinstance(ai_response, tuple) and len(ai_response) == 2:
error_message: str = ai_response[0]
status_code: int = ai_response[1]
logger.error(
f"Gemini API 返回错误: {error_message} (状态码: {status_code})"
)
return error_response(error_message, status_code)
# 如果不是元组,则预期它是一个字符串
if not isinstance(ai_response, str):
logger.error(
f"Gemini API 返回了非字符串/元组类型结果: {type(ai_response)} - {ai_response}"
)
return error_response("AI助手返回了非预期的结果类型。", 500)
ai_response_text: str = ai_response
except Exception as e:
logger.error(f"调用Gemini API失败: {e}", exc_info=True)
return error_response(f"AI助手调用失败: {str(e)}", 500)
logger.info(f"Gemini API 回复: {ai_response_text[:100]}...")
# 5. 记录AI回复并更新全局聊天历史
assistant_entry = {
"timestamp": get_current_timestamp(),
"role": "assistant",
"content": ai_response_text,
"id": ai_message_id,
}
file_manager.append_chat_log(g.current_chat_log_id, assistant_entry)
global_current_chat_history.append(
{"role": "assistant", "parts": [{"text": ai_response_text}]}
)
# 6. 更新全局对话轮次
global global_turn_counter
global_turn_counter += 1
# 7. 检查并触发记忆更新 (异步)
check_and_trigger_memory_update()
# 8. 返回AI回复和当前会话状态
response_data = {
"success": True,
"response": ai_response_text,
"turn_counter": global_turn_counter,
"active_session": {
"role_id": g.current_role_id,
"memory_id": g.current_memory_id,
"chat_log_id": g.current_chat_log_id,
},
"id": ai_message_id, # 返回消息ID便于前端追踪
}
# 如果触发记忆更新,则在响应中添加状态
if global_turn_counter > 0 and global_turn_counter % context_window_size == 0:
response_data["memory_status"] = (
"memory_updating" # 保持与 /api/active_session 一致
)
return jsonify(response_data)
def check_and_trigger_memory_update():
"""检查是否需要触发记忆更新,如果需要则在后台线程中执行"""
global global_turn_counter, global_current_chat_history, global_current_role_id, global_current_memory_id
context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"]
if global_turn_counter > 0 and global_turn_counter % context_window_size == 0:
logger.info(f"对话轮次达到 {global_turn_counter},触发异步记忆更新。")
# 使用全局变量而不是g对象
try:
recent_chat_for_memory_update = global_current_chat_history[
-context_window_size * 2 :
]
current_memory_data_for_update = file_manager.load_active_memory(
global_current_memory_id # 使用全局变量替代g.current_memory_id
)
if current_memory_data_for_update:
update_thread = threading.Thread(
target=_async_memory_update_task,
args=(
current_memory_data_for_update,
recent_chat_for_memory_update,
global_turn_counter,
global_current_role_id, # 使用全局变量替代g.current_role_id
global_current_memory_id, # 使用全局变量替代g.current_memory_id
),
)
update_thread.start()
logger.info(
f"记忆更新线程已启动角色ID: {global_current_role_id}, 记忆ID: {global_current_memory_id}"
)
else:
logger.error(f"无法加载记忆数据记忆ID: {global_current_memory_id}")
except Exception as e:
logger.error(f"触发记忆更新时发生错误: {e}", exc_info=True)
def _async_memory_update_task(
initial_memory_data,
recent_chat_history,
current_trigger_turn_count,
role_id: str,
memory_id: str,
):
"""
异步执行记忆更新任务。
:param initial_memory_data: 触发更新时的原始记忆数据。
:param recent_chat_history: 最近 N 轮对话历史。
:param current_trigger_turn_count: 触发本次更新时的全局对话轮次。
:param role_id: 当前活跃的角色ID。
:param memory_id: 当前活跃的记忆ID。
"""
global global_memory_update_status # 声明使用全局变量
# 在新线程中手动创建应用上下文
with app.app_context():
logger.info(
f"异步记忆更新任务开始,角色: {role_id}, 记忆: {memory_id}, 触发轮次: {current_trigger_turn_count}"
)
global_memory_update_status = "updating" # 设置状态为更新中
try:
# 1. 构建记忆更新的Prompt
update_prompt = memory_manager.build_memory_update_prompt(
recent_chat_history, initial_memory_data, current_trigger_turn_count
)
# 2. 调用Gemini API进行记忆更新
if gemini_client is None:
logger.error("Gemini客户端未初始化无法执行记忆更新。请检查API配置。")
return
# 确保 app_config 在上下文中可用,或者重新获取
current_app_config = get_config() # 重新获取配置,确保在上下文中
update_model_name = current_app_config["API"]["MEMORY_UPDATE_MODEL"]
try:
# 构造一个符合Gemini API预期的请求体
# 使用单条消息将prompt作为文本内容
gemini_response = gemini_client.generate_content(
model_name=update_model_name,
contents=[{"role": "user", "parts": [{"text": update_prompt}]}],
system_instruction=None, # 不使用system_instruction直接在contents中包含prompt
)
# 检查 gemini_response 是否为元组 (错误信息, 状态码)
if isinstance(gemini_response, tuple) and len(gemini_response) == 2:
error_message: str = gemini_response[0]
status_code: int = gemini_response[1]
logger.error(
f"Gemini模型在记忆更新时返回错误: {error_message} (状态码: {status_code})"
)
return # 无法继续,直接返回
# 如果不是元组,则预期它是一个字符串
if not isinstance(gemini_response, str):
logger.error(
f"Gemini模型在记忆更新时返回了非字符串/元组类型结果: {type(gemini_response)} - {gemini_response}"
)
return # 无法继续,直接返回
gemini_response_text: str = gemini_response
except Exception as e:
logger.error(f"调用Gemini API进行记忆更新失败: {e}", exc_info=True)
return
# 3. 处理模型返回的记忆更新JSON
updated_memory_data = memory_manager.process_memory_update_response(
gemini_response_text,
initial_memory_data,
current_trigger_turn_count,
)
# 4. 保存更新后的记忆数据
if file_manager.save_active_memory(memory_id, updated_memory_data):
logger.info(f"记忆集 '{memory_id}' (角色 '{role_id}') 记忆更新成功。")
global_memory_update_status = "completed" # 更新成功
else:
logger.error(f"记忆集 '{memory_id}' (角色 '{role_id}') 记忆保存失败。")
global_memory_update_status = "error" # 保存失败
except Exception as e:
logger.error(f"异步记忆更新任务发生致命错误: {e}", exc_info=True)
global_memory_update_status = "error" # 发生错误
finally:
# 无论成功或失败,最终都将状态设置为 idle除非有其他更新正在进行
# 这里需要更复杂的逻辑来判断是否真的空闲,暂时简化处理
if global_memory_update_status != "updating": # 避免覆盖正在进行的更新
global_memory_update_status = "idle"
# --- 主运行部分 ---
# --- 应用启动前处理 ---
def initialize_app_data():
"""
在Flask应用启动时执行。
用于确保必要的默认数据(如默认角色和记忆)存在,并初始化会话状态。
"""
global app_config, gemini_client
global global_current_role_id, global_current_memory_id, global_current_chat_log_id, global_turn_counter, global_current_chat_history
# 在所有模块加载和配置读取完成后配置日志
app_config = get_config() # 确保获取到最新的配置
setup_logging(app_config["Application"]["APP_LOG_FILE_PATH"])
logger.info("应用数据初始化开始...")
file_manager.ensure_initial_data()
# 从 config.ini 加载初始会话状态
session_config = app_config["Session"]
# 检查并设置当前角色ID和记忆ID并更新config.ini如果需要
configured_role_id = session_config.get("CURRENT_ROLE_ID", "default_role")
configured_memory_id = session_config.get("CURRENT_MEMORY_ID", "default_memory")
# 确保配置中的角色和记忆ID是有效的如果无效则回退到默认值
if not file_manager.role_exists(configured_role_id):
logger.warning(
f"配置的角色ID '{configured_role_id}' 不存在,回退到 'default_role'"
)
global_current_role_id = "default_role"
else:
global_current_role_id = configured_role_id
if not file_manager.memory_exists(configured_memory_id):
logger.warning(
f"配置的记忆ID '{configured_memory_id}' 不存在,回退到 'default_memory'"
)
global_current_memory_id = "default_memory"
else:
global_current_memory_id = configured_memory_id
# 如果回退了需要更新config.ini
if (
configured_role_id != global_current_role_id
or configured_memory_id != global_current_memory_id
):
current_app_config = get_config()
current_app_config["Session"]["CURRENT_ROLE_ID"] = global_current_role_id
current_app_config["Session"]["CURRENT_MEMORY_ID"] = global_current_memory_id
set_config(current_app_config)
logger.info("config.ini 中的会话配置已更新为有效值。")
app_config = get_config() # 重新加载配置到 app_config
# 聊天记录ID每次应用启动时生成一个新的唯一ID
global_current_chat_log_id = (
f"{global_current_role_id}_{global_current_memory_id}_{generate_uuid()}"
)
global_turn_counter = 0
global_current_chat_history = [] # 清空聊天历史
logger.info(f"应用启动已创建新的聊天记录ID: {global_current_chat_log_id}")
logger.info("应用数据初始化完成。")
# 初始化Gemini客户端
api_key = app_config["API"].get("GEMINI_API_KEY")
if api_key and api_key.strip(): # 确保API Key不为空或只包含空格
gemini_client = GeminiClient()
logger.info("Gemini客户端已初始化。")
else:
logger.warning("未配置或API KEY为空Gemini客户端未初始化。")
gemini_client = None
@app.route("/api/memory/trigger_update", methods=["POST"])
def trigger_memory_update_route():
"""
手动触发记忆更新的API接口。
"""
global global_current_role_id, global_current_memory_id, global_turn_counter, global_current_chat_history
logger.info("收到手动触发记忆更新的请求。")
# 获取当前记忆数据和最近的聊天历史
current_memory_data_for_update = file_manager.load_active_memory(
global_current_memory_id
)
# 记忆更新通常需要最近的对话历史,这里可以根据需要调整截取长度
context_window_size = app_config["Application"]["CONTEXT_WINDOW_SIZE"]
recent_chat_for_memory_update = global_current_chat_history[
-context_window_size * 2 :
]
if not current_memory_data_for_update:
return error_response("无法加载当前记忆数据,无法触发更新。", 500)
# 异步执行记忆更新任务
update_thread = threading.Thread(
target=_async_memory_update_task,
args=(
current_memory_data_for_update,
recent_chat_for_memory_update,
global_turn_counter,
global_current_role_id, # 传递当前角色ID
global_current_memory_id, # 传递当前记忆ID
),
)
update_thread.start()
return jsonify({"message": "记忆更新任务已在后台触发。"})
if __name__ == "__main__":
# 应用启动时初始化数据
initialize_app_data()
# 运行Flask应用
# debug=True 仅用于开发环境,生产环境请勿使用
app.run(debug=True, host="0.0.0.0", port=5000)