From fbab6eb7c50eeeab2242a49328d2aade5145c955 Mon Sep 17 00:00:00 2001 From: HuangHai <10402852@qq.com> Date: Tue, 25 Mar 2025 14:51:15 +0800 Subject: [PATCH] 'commit' --- AI/WxMini/Start.py | 65 +++++++++++++- AI/WxMini/Test/TongJi.py | 67 ++++++++++++++ AI/WxMini/Utils/MySQLUtil.py | 83 ++++++++++++++++-- .../__pycache__/MySQLUtil.cpython-310.pyc | Bin 3109 -> 3090 bytes 4 files changed, 207 insertions(+), 8 deletions(-) create mode 100644 AI/WxMini/Test/TongJi.py diff --git a/AI/WxMini/Start.py b/AI/WxMini/Start.py index 9074be94..894bfe25 100644 --- a/AI/WxMini/Start.py +++ b/AI/WxMini/Start.py @@ -12,7 +12,8 @@ from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager from WxMini.Milvus.Utils.MilvusConnectionPool import * from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory from WxMini.Utils.TtsUtil import TTS -from WxMini.Utils.MySQLUtil import init_mysql_pool, save_chat_to_mysql, get_chat_log_by_session +from WxMini.Utils.MySQLUtil import init_mysql_pool, save_chat_to_mysql, get_chat_log_by_session, update_risk, \ + get_risk_chat_log_page from WxMini.Utils.EmbeddingUtil import text_to_embedding # 配置日志 @@ -44,6 +45,40 @@ async def lifespan(app: FastAPI): logger.info("Milvus 和 MySQL 连接池已关闭。") +# 会话结束后,调用检查方法,判断是不是有需要介入的问题出现 +async def on_session_end(session_id): + # 获取聊天记录 + result = await get_chat_log_by_session(app.state.mysql_pool, session_id, page=1, page_size=100) + + # 拼接历史聊天记录 + history = "" + for row in result['data']: + history = f"{history}\n问题:{row['user_input']}\n回答:{row['model_response']}" + + # 将历史聊天记录发给大模型,让它帮我分析一下 + prompt = ( + "我将把用户与AI大模型交流的记录发给你,帮我分析一下这个用户是否存在心理健康方面的问题," + "参考:1、PHQ-9抑郁症筛查量表和2、Beck自杀意念评量表(BSI-CV)。" + "如果没有健康问题请回复: OK;否则回复:NO,换行后再输出是什么问题。" + f"\n\n历史聊天记录:{history}" + ) + + response = await client.chat.completions.create( + model=MODEL_NAME, + messages=[ + {"role": "system", "content": "你是一个心理健康分析助手,负责分析用户的心理健康状况。"}, + {"role": "user", "content": prompt} + ], + max_tokens=1000 + ) + + # 处理分析结果 + if response.choices and response.choices[0].message.content: + analysis_result = response.choices[0].message.content.strip() + if analysis_result.startswith("NO"): + await update_risk(app.state.mysql_pool, session_id, analysis_result) + + # 初始化 FastAPI 应用 app = FastAPI(lifespan=lifespan) @@ -111,10 +146,10 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)): model=MODEL_NAME, messages=[ {"role": "system", - "content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容。"}, + "content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容,回复内容不要超过90字。"}, {"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"} ], - max_tokens=500 + max_tokens=100 ), timeout=60 # 设置超时时间为 60 秒 ) @@ -165,6 +200,10 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)): await save_chat_to_mysql(app.state.mysql_pool, session_id, prompt, result, url, duration) logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。") + # 调用会话检查机制 + await on_session_end(session_id) + + # 返回数据 return { "success": True, "url": url, @@ -204,6 +243,26 @@ async def get_chat_log( raise HTTPException(status_code=500, detail=f"查询聊天记录失败: {str(e)}") +@app.get("/aichat/get_risk_page") +async def get_risk_page( + risk_flag: int = Query(default=1, ge=1, description="1:有风险,0:无风险,2:有风险但已处理"), + page: int = Query(default=1, ge=1, description="当前页码"), + page_size: int = Query(default=10, ge=1, le=100, description="每页记录数") +): + """ + 查询有风险的聊天记录,并按 id 降序分页 + :param page: 当前页码 + :param page_size: 每页记录数 + :return: 分页数据 + """ + try: + result = await get_risk_chat_log_page(app.state.mysql_pool, risk_flag, page, page_size) + return result + except Exception as e: + logger.error(f"查询有风险的聊天记录失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"查询有风险的聊天记录失败: {str(e)}") + + # 运行 FastAPI 应用 if __name__ == "__main__": import uvicorn diff --git a/AI/WxMini/Test/TongJi.py b/AI/WxMini/Test/TongJi.py new file mode 100644 index 00000000..628714f3 --- /dev/null +++ b/AI/WxMini/Test/TongJi.py @@ -0,0 +1,67 @@ +import asyncio +from openai import AsyncOpenAI +from WxMini.Milvus.Config.MulvusConfig import * +from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session + +# 初始化异步 OpenAI 客户端 +client = AsyncOpenAI( + api_key=MODEL_API_KEY, + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", +) + +async def main(): + # 哪个人员 + session_id = 1 + # 哪一页 + page = 1 + # 一页多少个 + page_size = 100 + + # 初始化 MySQL 连接池 + mysql_pool = await init_mysql_pool() + + # 调用 + result = await get_chat_log_by_session(mysql_pool, session_id, page, page_size) + + # 我只关心 user_input 与 model_response + # 把这些拼接出问题与回答 + history = "" + for row in result['data']: # 注意:result 是一个字典,包含 'data' 字段 + user_input = row['user_input'] + model_response = row['model_response'] + history = f"{history}\n问题:{user_input}\n回答:{model_response}" + + # 将历史聊天记录发给大模型,让它帮我分析一下 + prompt = ( + "我将把用户与AI大模型交流的记录发给你,帮我分析一下这个用户是否存在心理健康方面的问题," + "参考:1、PHQ-9抑郁症筛查量表和2、Beck自杀意念评量表(BSI-CV)。" + "如果没有健康问题请回复: OK;否则回复:NO,换行后再输出是什么问题。" + f"\n\n历史聊天记录:{history}" + ) + + # 调用大模型进行分析 + try: + response = await client.chat.completions.create( + model=MODEL_NAME, + messages=[ + {"role": "system", "content": "你是一个心理健康分析助手,负责分析用户的心理健康状况。"}, + {"role": "user", "content": prompt} + ], + max_tokens=1000 + ) + # 提取大模型的回复 + if response.choices and response.choices[0].message.content: + analysis_result = response.choices[0].message.content.strip() + print("大模型分析结果:") + print(analysis_result) + else: + print("大模型未返回有效结果。") + except Exception as e: + print(f"调用大模型失败: {str(e)}") + + # 关闭连接池 + mysql_pool.close() + await mysql_pool.wait_closed() + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/AI/WxMini/Utils/MySQLUtil.py b/AI/WxMini/Utils/MySQLUtil.py index 8cb2c96d..70911e0a 100644 --- a/AI/WxMini/Utils/MySQLUtil.py +++ b/AI/WxMini/Utils/MySQLUtil.py @@ -1,4 +1,3 @@ -import asyncio import logging from aiomysql import create_pool from WxMini.Milvus.Config.MulvusConfig import * @@ -25,16 +24,17 @@ async def init_mysql_pool(): # 保存聊天记录到 MySQL -async def save_chat_to_mysql(mysql_pool, session_id, prompt, result,audio_url,duration): +async def save_chat_to_mysql(mysql_pool, session_id, prompt, result, audio_url, duration): async with mysql_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "INSERT INTO t_chat_log (session_id, user_input, model_response,audio_url,duration,create_time) VALUES (%s, %s, %s, %s, %s,NOW())", - (session_id, prompt, result,audio_url,duration) + (session_id, prompt, result, audio_url, duration) ) await conn.commit() logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。") + # 清空表 async def truncate_chat_log(mysql_pool): async with mysql_pool.acquire() as conn: @@ -43,8 +43,9 @@ async def truncate_chat_log(mysql_pool): await conn.commit() logger.info("表 t_chat_log 已清空。") + # 分页查询聊天记录 -async def get_chat_log_by_session(mysql_pool,session_id, page=1, page_size=10): +async def get_chat_log_by_session(mysql_pool, session_id, page=1, page_size=10): """ 根据 session_id 查询聊天记录,并按 id 降序分页 :param session_id: 用户会话 ID @@ -92,4 +93,76 @@ async def get_chat_log_by_session(mysql_pool,session_id, page=1, page_size=10): "total": total, "page": page, "page_size": page_size - } \ No newline at end of file + } + + +# 更新为危险的记录 +async def update_risk(mysql_pool, session_id, risk_memo): + async with mysql_pool.acquire() as conn: + async with conn.cursor() as cur: + # 1. 获取此人员的最后一条记录 id + await cur.execute( + "SELECT id FROM t_chat_log WHERE session_id = %s ORDER BY id DESC LIMIT 1", + (session_id,) + ) + result = await cur.fetchone() + + if result: + last_id = result[0] + # 2. 更新 risk_flag 和 risk_memo + await cur.execute( + "UPDATE t_chat_log SET risk_flag = 1, risk_memo = %s WHERE id = %s", + (risk_memo.replace('\n', '').replace("NO", ""), last_id) + ) + await conn.commit() + print(f"已更新 session_id={session_id} 的最后一条记录 (id={last_id}) 的 risk_flag 和 risk_memo。") + else: + print(f"未找到 session_id={session_id} 的记录。") + + +async def get_risk_chat_log_page(mysql_pool, risk_flag, page=1, page_size=10): + """ + 查询有风险的聊天记录,并按 id 降序分页 + :param mysql_pool: MySQL 连接池 + :param page: 当前页码 + :param page_size: 每页记录数 + :return: 分页数据 + """ + offset = (page - 1) * page_size + async with mysql_pool.acquire() as conn: + async with conn.cursor() as cur: + # 查询总记录数 + await cur.execute( + "SELECT COUNT(*) FROM t_chat_log WHERE risk_flag = %s", (risk_flag) + ) + total = (await cur.fetchone())[0] + + # 查询分页数据 + await cur.execute( + "SELECT id, session_id, user_input, model_response, audio_url, duration, create_time, risk_memo " + "FROM t_chat_log WHERE risk_flag = %s ORDER BY id DESC LIMIT %s OFFSET %s", + (risk_flag, page_size, offset) + ) + records = await cur.fetchall() + + # 将查询结果转换为字典列表 + result = [ + { + "id": record[0], + "session_id": record[1], + "user_input": record[2], + "model_response": record[3], + "audio_url": record[4], + "duration": record[5], + "create_time": record[6].strftime("%Y-%m-%d %H:%M:%S"), + "risk_memo": record[7] + } + for record in records + ] + + return { + "data": result, + "total": total, + "page": page, + "page_size": page_size + } diff --git a/AI/WxMini/Utils/__pycache__/MySQLUtil.cpython-310.pyc b/AI/WxMini/Utils/__pycache__/MySQLUtil.cpython-310.pyc index 8731b166748eead131cdbe259042f3637d08bbc3..90ecbe7d8e0ab56b4b7d4c9863ec8aa8e019027f 100644 GIT binary patch delta 278 zcmZ1~F-d|qpO=@50SG)D9;H8=$SYf40OX`Fq%h_%M1g3g9L8LxC?-Y_n>mL$gdvq7 ziv@^TQ&>`1dx7Gq3=7y&*%vZKainsla;0*o@}%;nu%)o4aLi$k;!EL7;c8)s;!ojD z;Q`VDDZD9sKw2f0 zBcs~pHrCV3yir`G#i>Q{nRx}JB~g>(IHMVTChy{0DQgJSS!4txZn5R3r4^@^6v=}` z6hMT*pF delta 300 zcmbOvu~dRLpO=@50SML$JWBsKkyp090?0{WNMX!jh=S0JQH(%7Qw~!ua}+ZpNQ^m$ zC4?cBF^d(5*-}_iSbLcmf&2yRsT>O#qc~H!Qn^!kQh8JPQrJ@1Q#j_ZNAag{rf{_| zL_?R~S8+7-crgvBWbn zs&B4iJtIawJ1I_ub{N#*5okGXhz@3n>bgp83P?$WHLFCON`NIas`(qqyFSg zT#nLKKw(X$A~uk$GKej5i#@TpGA}tZe=;lg0Y=I+j2w&t%v_92 TEKH0D$o7+ihl7=an~wtk8gW6D