Files
dsProject/dsSchoolBuddy/Start.py

349 lines
14 KiB
Python
Raw Normal View History

2025-08-19 11:56:23 +08:00
# pip install jieba
2025-08-19 07:34:39 +08:00
import json
2025-08-19 10:51:04 +08:00
import logging
2025-08-19 11:56:23 +08:00
import time
import jieba
2025-08-19 07:34:39 +08:00
import fastapi
import uvicorn
2025-08-19 10:52:13 +08:00
from fastapi import FastAPI, HTTPException
2025-08-19 07:34:39 +08:00
from openai import AsyncOpenAI
from sse_starlette import EventSourceResponse
2025-08-19 11:56:23 +08:00
import uuid
2025-08-19 07:34:39 +08:00
from Config import Config
2025-08-19 10:51:04 +08:00
from ElasticSearch.Utils.EsSearchUtil import EsSearchUtil
2025-08-19 07:34:39 +08:00
# 初始化日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
2025-08-19 11:56:23 +08:00
# 初始化停用词表
STOPWORDS = set(['', '', '', '', '', '', '', '', '', '', '', '', '一个', '', '', '', '', '', '', '', '', '', '', '没有', '', '', '自己', ''])
2025-08-19 07:34:39 +08:00
# 初始化异步 OpenAI 客户端
client = AsyncOpenAI(
2025-08-19 10:51:04 +08:00
api_key=Config.ALY_LLM_API_KEY,
2025-08-19 10:58:04 +08:00
base_url=Config.ALY_LLM_BASE_URL
2025-08-19 07:34:39 +08:00
)
2025-08-19 10:51:04 +08:00
# 初始化 ElasticSearch 工具
search_util = EsSearchUtil(Config.ES_CONFIG)
2025-08-19 11:56:23 +08:00
# 存储对话历史的字典键为会话ID值为对话历史列表
conversation_history = {}
2025-08-19 13:22:23 +08:00
# 存储学生信息的字典键为用户ID值为学生信息
student_info = {}
# 年级关键词词典
GRADE_KEYWORDS = {
'一年级': ['一年级', '初一'],
'二年级': ['二年级', '初二'],
'三年级': ['三年级', '初三'],
'四年级': ['四年级'],
'五年级': ['五年级'],
'六年级': ['六年级'],
'七年级': ['七年级', '初一'],
'八年级': ['八年级', '初二'],
'九年级': ['九年级', '初三'],
'高一': ['高一'],
'高二': ['高二'],
'高三': ['高三']
}
2025-08-19 11:56:23 +08:00
# 最大对话历史轮数
MAX_HISTORY_ROUNDS = 10
2025-08-19 07:34:39 +08:00
2025-08-19 13:22:23 +08:00
# 添加函数保存学生信息到ES
def save_student_info_to_es(user_id, info):
"""将学生信息保存到Elasticsearch"""
try:
# 使用用户ID作为文档ID
doc_id = f"student_{user_id}"
# 准备文档内容
doc = {
"user_id": user_id,
"info": info,
"update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
# 从连接池获取连接
es_conn = search_util.es_pool.get_connection()
try:
# 确保索引存在,如果不存在则创建
es_conn.index(index="student_info", id=doc_id, document=doc)
logger.info(f"学生 {user_id} 的信息已保存到ES: {info}")
finally:
# 释放连接回连接池
search_util.es_pool.release_connection(es_conn)
except Exception as e:
logger.error(f"保存学生信息到ES失败: {str(e)}", exc_info=True)
# 添加函数从ES获取学生信息
def get_student_info_from_es(user_id):
"""从Elasticsearch获取学生信息"""
try:
doc_id = f"student_{user_id}"
# 从连接池获取连接
es_conn = search_util.es_pool.get_connection()
try:
# 确保索引存在
if es_conn.indices.exists(index="student_info"):
result = es_conn.get(index="student_info", id=doc_id)
if result and '_source' in result:
logger.info(f"从ES获取到学生 {user_id} 的信息: {result['_source']['info']}")
return result['_source']['info']
else:
logger.info(f"ES中没有找到学生 {user_id} 的信息")
else:
logger.info("student_info索引不存在")
finally:
# 释放连接回连接池
search_util.es_pool.release_connection(es_conn)
except Exception as e:
# 如果文档不存在,返回空字典
if "not_found" in str(e).lower():
logger.info(f"学生 {user_id} 的信息在ES中不存在")
return {}
logger.error(f"从ES获取学生信息失败: {str(e)}", exc_info=True)
return {}
def extract_student_info(text, user_id):
"""使用jieba分词提取学生信息"""
try:
# 提取年级信息
seg_list = jieba.cut(text, cut_all=False) # 精确模式
seg_set = set(seg_list)
# 检查是否已有学生信息如果没有则从ES加载
if user_id not in student_info:
# 从ES加载学生信息
info_from_es = get_student_info_from_es(user_id)
if info_from_es:
student_info[user_id] = info_from_es
logger.info(f"从ES加载用户 {user_id} 的信息: {info_from_es}")
else:
student_info[user_id] = {}
# 提取并更新年级信息
grade_found = False
for grade, keywords in GRADE_KEYWORDS.items():
for keyword in keywords:
if keyword in seg_set:
if 'grade' not in student_info[user_id] or student_info[user_id]['grade'] != grade:
student_info[user_id]['grade'] = grade
logger.info(f"提取到用户 {user_id} 的年级信息: {grade}")
# 保存到ES
save_student_info_to_es(user_id, student_info[user_id])
grade_found = True
break
if grade_found:
break
# 如果文本中明确提到年级,但没有匹配到关键词,尝试直接提取数字
if not grade_found:
import re
# 匹配"我是X年级"格式
match = re.search(r'我是(\d+)年级', text)
if match:
grade_num = match.group(1)
grade = f"{grade_num}年级"
if 'grade' not in student_info[user_id] or student_info[user_id]['grade'] != grade:
student_info[user_id]['grade'] = grade
logger.info(f"通过正则提取到用户 {user_id} 的年级信息: {grade}")
# 保存到ES
save_student_info_to_es(user_id, student_info[user_id])
except Exception as e:
logger.error(f"提取学生信息失败: {str(e)}", exc_info=True)
2025-08-19 10:58:04 +08:00
def get_system_prompt():
"""获取系统提示"""
return """
你是一位平易近人且教学方法灵活的教师通过引导学生自主学习来帮助他们掌握知识
严格遵循以下教学规则
2025-08-19 13:22:23 +08:00
1. 基于学生情况调整教学如果已了解学生的年级水平和知识背景应基于此调整教学内容和难度
2025-08-19 10:58:04 +08:00
2. 基于现有知识构建将新思想与学生已有的知识联系起来
3. 引导而非灌输使用问题提示和小步骤让学生自己发现答案
4. 检查和强化在讲解难点后确认学生能够重述或应用这些概念
5. 变化节奏混合讲解提问和互动活动让教学像对话而非讲座
最重要的是不要直接给出答案而是通过合作和基于学生已有知识的引导帮助学生自己找到答案
"""
2025-08-19 11:02:57 +08:00
async def lifespan(_: FastAPI):
2025-08-19 07:34:39 +08:00
yield
app = FastAPI(lifespan=lifespan)
2025-08-19 10:52:03 +08:00
@app.post("/api/chat")
2025-08-19 10:51:04 +08:00
async def chat(request: fastapi.Request):
"""
2025-08-19 11:56:23 +08:00
根据用户输入的语句查询相关历史对话
2025-08-19 10:51:04 +08:00
然后调用大模型进行回答
"""
2025-08-19 07:34:39 +08:00
try:
2025-08-19 10:51:04 +08:00
data = await request.json()
user_id = data.get('user_id', 'anonymous')
query = data.get('query', '')
2025-08-19 11:56:23 +08:00
session_id = data.get('session_id', str(uuid.uuid4())) # 获取或生成会话ID
include_history = data.get('include_history', True)
2025-08-19 10:51:04 +08:00
if not query:
raise HTTPException(status_code=400, detail="查询内容不能为空")
2025-08-19 13:22:23 +08:00
# 1. 初始化会话历史和学生信息
2025-08-19 11:56:23 +08:00
if session_id not in conversation_history:
conversation_history[session_id] = []
2025-08-19 13:22:23 +08:00
# 检查是否已有学生信息如果没有则从ES加载
if user_id not in student_info:
# 从ES加载学生信息
info_from_es = get_student_info_from_es(user_id)
if info_from_es:
student_info[user_id] = info_from_es
logger.info(f"从ES加载用户 {user_id} 的信息: {info_from_es}")
else:
student_info[user_id] = {}
# 2. 使用jieba分词提取学生信息
extract_student_info(query, user_id)
# 输出调试信息
logger.info(f"当前学生信息: {student_info.get(user_id, {})}")
# 为用户查询生成标签并存储到ES
2025-08-19 11:56:23 +08:00
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
tags = [user_id, f"time:{current_time.split()[0]}", f"session:{session_id}"]
# 提取查询中的关键词作为额外标签 - 使用jieba分词
try:
seg_list = jieba.cut(query, cut_all=False) # 精确模式
keywords = [kw for kw in seg_list if kw.strip() and kw not in STOPWORDS and len(kw) > 1]
keywords = keywords[:5]
tags.extend([f"keyword:{kw}" for kw in keywords])
logger.info(f"使用jieba分词提取的关键词: {keywords}")
except Exception as e:
logger.error(f"分词失败: {str(e)}")
keywords = query.split()[:5]
tags.extend([f"keyword:{kw}" for kw in keywords if kw.strip()])
# 存储查询到ES
try:
search_util.insert_long_text_to_es(query, tags)
logger.info(f"用户 {user_id} 的查询已存储到ES标签: {tags}")
except Exception as e:
logger.error(f"存储用户查询到ES失败: {str(e)}")
# 3. 构建对话历史上下文
history_context = ""
if include_history and session_id in conversation_history:
# 获取最近的几次对话历史
recent_history = conversation_history[session_id][-MAX_HISTORY_ROUNDS:]
if recent_history:
history_context = "\n\n以下是最近的对话历史,可供参考:\n"
for i, (user_msg, ai_msg) in enumerate(recent_history, 1):
history_context += f"[对话 {i}] 用户: {user_msg}\n"
history_context += f"[对话 {i}] 老师: {ai_msg}\n"
2025-08-19 13:22:23 +08:00
# 4. 构建学生信息上下文
student_context = ""
if user_id in student_info and student_info[user_id]:
student_context = "\n\n学生基础信息:\n"
for key, value in student_info[user_id].items():
if key == 'grade':
student_context += f"- 年级: {value}\n"
else:
student_context += f"- {key}: {value}\n"
# 5. 构建提示词
2025-08-19 10:58:04 +08:00
system_prompt = get_system_prompt()
2025-08-19 13:22:23 +08:00
# 添加学生信息到系统提示词
if user_id in student_info and student_info[user_id]:
student_info_str = "\n\n学生基础信息:\n"
for key, value in student_info[user_id].items():
if key == 'grade':
student_info_str += f"- 年级: {value}\n"
else:
student_info_str += f"- {key}: {value}\n"
system_prompt += student_info_str
# 6. 流式调用大模型生成回答
2025-08-19 10:51:04 +08:00
async def generate_response_stream():
try:
2025-08-19 11:56:23 +08:00
# 构建消息列表
2025-08-19 13:22:23 +08:00
messages = [{'role': 'system', 'content': system_prompt.strip()}]
# 添加学生信息(如果有)
if student_context:
messages.append({'role': 'user', 'content': student_context.strip()})
2025-08-19 11:56:23 +08:00
# 添加历史对话(如果有)
if history_context:
messages.append({'role': 'user', 'content': history_context.strip()})
# 添加当前问题
messages.append({'role': 'user', 'content': query})
2025-08-19 10:51:04 +08:00
stream = await client.chat.completions.create(
2025-08-19 11:15:40 +08:00
model=Config.ALY_LLM_MODEL_NAME,
2025-08-19 11:56:23 +08:00
messages=messages,
2025-08-19 10:51:04 +08:00
max_tokens=8000,
stream=True
)
# 收集完整回答用于保存
full_answer = []
async for chunk in stream:
if chunk.choices[0].delta.content:
full_answer.append(chunk.choices[0].delta.content)
yield f"data: {json.dumps({'reply': chunk.choices[0].delta.content}, ensure_ascii=False)}\n\n"
2025-08-19 11:56:23 +08:00
# 保存回答到ES和对话历史
if full_answer:
answer_text = ''.join(full_answer)
2025-08-19 13:22:23 +08:00
extract_student_info(answer_text, user_id)
2025-08-19 11:56:23 +08:00
try:
# 为回答添加标签
answer_tags = [f"{user_id}_answer", f"time:{current_time.split()[0]}", f"session:{session_id}"]
try:
seg_list = jieba.cut(answer_text, cut_all=False)
answer_keywords = [kw for kw in seg_list if kw.strip() and kw not in STOPWORDS and len(kw) > 1]
answer_keywords = answer_keywords[:5]
answer_tags.extend([f"keyword:{kw}" for kw in answer_keywords])
except Exception as e:
logger.error(f"回答分词失败: {str(e)}")
search_util.insert_long_text_to_es(answer_text, answer_tags)
logger.info(f"用户 {user_id} 的回答已存储到ES")
# 更新对话历史
conversation_history[session_id].append((query, answer_text))
# 保持历史记录不超过最大轮数
if len(conversation_history[session_id]) > MAX_HISTORY_ROUNDS:
conversation_history[session_id].pop(0)
except Exception as e:
logger.error(f"存储回答到ES失败: {str(e)}")
2025-08-19 10:51:04 +08:00
except Exception as e:
logger.error(f"大模型调用失败: {str(e)}")
yield f"data: {json.dumps({'error': f'生成回答失败: {str(e)}'})}\n\n"
return EventSourceResponse(generate_response_stream())
except HTTPException as e:
logger.error(f"聊天接口错误: {str(e.detail)}")
raise e
except Exception as e:
logger.error(f"聊天接口异常: {str(e)}")
raise HTTPException(status_code=500, detail=f"处理请求失败: {str(e)}")
2025-08-19 07:34:39 +08:00
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)