main
HuangHai 4 months ago
parent f8f2be7a27
commit fbab6eb7c5

@ -12,7 +12,8 @@ from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory
from WxMini.Utils.TtsUtil import TTS
from WxMini.Utils.MySQLUtil import init_mysql_pool, save_chat_to_mysql, get_chat_log_by_session
from WxMini.Utils.MySQLUtil import init_mysql_pool, save_chat_to_mysql, get_chat_log_by_session, update_risk, \
get_risk_chat_log_page
from WxMini.Utils.EmbeddingUtil import text_to_embedding
# 配置日志
@ -44,6 +45,40 @@ async def lifespan(app: FastAPI):
logger.info("Milvus 和 MySQL 连接池已关闭。")
# 会话结束后,调用检查方法,判断是不是有需要介入的问题出现
async def on_session_end(session_id):
# 获取聊天记录
result = await get_chat_log_by_session(app.state.mysql_pool, session_id, page=1, page_size=100)
# 拼接历史聊天记录
history = ""
for row in result['data']:
history = f"{history}\n问题:{row['user_input']}\n回答:{row['model_response']}"
# 将历史聊天记录发给大模型,让它帮我分析一下
prompt = (
"我将把用户与AI大模型交流的记录发给你帮我分析一下这个用户是否存在心理健康方面的问题"
"参考1、PHQ-9抑郁症筛查量表和2、Beck自杀意念评量表BSI-CV"
"如果没有健康问题请回复: OK否则回复NO换行后再输出是什么问题。"
f"\n\n历史聊天记录:{history}"
)
response = await client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个心理健康分析助手,负责分析用户的心理健康状况。"},
{"role": "user", "content": prompt}
],
max_tokens=1000
)
# 处理分析结果
if response.choices and response.choices[0].message.content:
analysis_result = response.choices[0].message.content.strip()
if analysis_result.startswith("NO"):
await update_risk(app.state.mysql_pool, session_id, analysis_result)
# 初始化 FastAPI 应用
app = FastAPI(lifespan=lifespan)
@ -111,10 +146,10 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
model=MODEL_NAME,
messages=[
{"role": "system",
"content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容"},
"content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容,回复内容不要超过90字"},
{"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"}
],
max_tokens=500
max_tokens=100
),
timeout=60 # 设置超时时间为 60 秒
)
@ -165,6 +200,10 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
await save_chat_to_mysql(app.state.mysql_pool, session_id, prompt, result, url, duration)
logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。")
# 调用会话检查机制
await on_session_end(session_id)
# 返回数据
return {
"success": True,
"url": url,
@ -204,6 +243,26 @@ async def get_chat_log(
raise HTTPException(status_code=500, detail=f"查询聊天记录失败: {str(e)}")
@app.get("/aichat/get_risk_page")
async def get_risk_page(
risk_flag: int = Query(default=1, ge=1, description="1有风险0无风险2:有风险但已处理"),
page: int = Query(default=1, ge=1, description="当前页码"),
page_size: int = Query(default=10, ge=1, le=100, description="每页记录数")
):
"""
查询有风险的聊天记录并按 id 降序分页
:param page: 当前页码
:param page_size: 每页记录数
:return: 分页数据
"""
try:
result = await get_risk_chat_log_page(app.state.mysql_pool, risk_flag, page, page_size)
return result
except Exception as e:
logger.error(f"查询有风险的聊天记录失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"查询有风险的聊天记录失败: {str(e)}")
# 运行 FastAPI 应用
if __name__ == "__main__":
import uvicorn

@ -0,0 +1,67 @@
import asyncio
from openai import AsyncOpenAI
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session
# 初始化异步 OpenAI 客户端
client = AsyncOpenAI(
api_key=MODEL_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
async def main():
# 哪个人员
session_id = 1
# 哪一页
page = 1
# 一页多少个
page_size = 100
# 初始化 MySQL 连接池
mysql_pool = await init_mysql_pool()
# 调用
result = await get_chat_log_by_session(mysql_pool, session_id, page, page_size)
# 我只关心 user_input 与 model_response
# 把这些拼接出问题与回答
history = ""
for row in result['data']: # 注意result 是一个字典,包含 'data' 字段
user_input = row['user_input']
model_response = row['model_response']
history = f"{history}\n问题:{user_input}\n回答:{model_response}"
# 将历史聊天记录发给大模型,让它帮我分析一下
prompt = (
"我将把用户与AI大模型交流的记录发给你帮我分析一下这个用户是否存在心理健康方面的问题"
"参考1、PHQ-9抑郁症筛查量表和2、Beck自杀意念评量表BSI-CV"
"如果没有健康问题请回复: OK否则回复NO换行后再输出是什么问题。"
f"\n\n历史聊天记录:{history}"
)
# 调用大模型进行分析
try:
response = await client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个心理健康分析助手,负责分析用户的心理健康状况。"},
{"role": "user", "content": prompt}
],
max_tokens=1000
)
# 提取大模型的回复
if response.choices and response.choices[0].message.content:
analysis_result = response.choices[0].message.content.strip()
print("大模型分析结果:")
print(analysis_result)
else:
print("大模型未返回有效结果。")
except Exception as e:
print(f"调用大模型失败: {str(e)}")
# 关闭连接池
mysql_pool.close()
await mysql_pool.wait_closed()
if __name__ == '__main__':
asyncio.run(main())

@ -1,4 +1,3 @@
import asyncio
import logging
from aiomysql import create_pool
from WxMini.Milvus.Config.MulvusConfig import *
@ -25,16 +24,17 @@ async def init_mysql_pool():
# 保存聊天记录到 MySQL
async def save_chat_to_mysql(mysql_pool, session_id, prompt, result,audio_url,duration):
async def save_chat_to_mysql(mysql_pool, session_id, prompt, result, audio_url, duration):
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO t_chat_log (session_id, user_input, model_response,audio_url,duration,create_time) VALUES (%s, %s, %s, %s, %s,NOW())",
(session_id, prompt, result,audio_url,duration)
(session_id, prompt, result, audio_url, duration)
)
await conn.commit()
logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。")
# 清空表
async def truncate_chat_log(mysql_pool):
async with mysql_pool.acquire() as conn:
@ -43,8 +43,9 @@ async def truncate_chat_log(mysql_pool):
await conn.commit()
logger.info("表 t_chat_log 已清空。")
# 分页查询聊天记录
async def get_chat_log_by_session(mysql_pool,session_id, page=1, page_size=10):
async def get_chat_log_by_session(mysql_pool, session_id, page=1, page_size=10):
"""
根据 session_id 查询聊天记录并按 id 降序分页
:param session_id: 用户会话 ID
@ -92,4 +93,76 @@ async def get_chat_log_by_session(mysql_pool,session_id, page=1, page_size=10):
"total": total,
"page": page,
"page_size": page_size
}
}
# 更新为危险的记录
async def update_risk(mysql_pool, session_id, risk_memo):
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
# 1. 获取此人员的最后一条记录 id
await cur.execute(
"SELECT id FROM t_chat_log WHERE session_id = %s ORDER BY id DESC LIMIT 1",
(session_id,)
)
result = await cur.fetchone()
if result:
last_id = result[0]
# 2. 更新 risk_flag 和 risk_memo
await cur.execute(
"UPDATE t_chat_log SET risk_flag = 1, risk_memo = %s WHERE id = %s",
(risk_memo.replace('\n', '').replace("NO", ""), last_id)
)
await conn.commit()
print(f"已更新 session_id={session_id} 的最后一条记录 (id={last_id}) 的 risk_flag 和 risk_memo。")
else:
print(f"未找到 session_id={session_id} 的记录。")
async def get_risk_chat_log_page(mysql_pool, risk_flag, page=1, page_size=10):
"""
查询有风险的聊天记录并按 id 降序分页
:param mysql_pool: MySQL 连接池
:param page: 当前页码
:param page_size: 每页记录数
:return: 分页数据
"""
offset = (page - 1) * page_size
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
# 查询总记录数
await cur.execute(
"SELECT COUNT(*) FROM t_chat_log WHERE risk_flag = %s", (risk_flag)
)
total = (await cur.fetchone())[0]
# 查询分页数据
await cur.execute(
"SELECT id, session_id, user_input, model_response, audio_url, duration, create_time, risk_memo "
"FROM t_chat_log WHERE risk_flag = %s ORDER BY id DESC LIMIT %s OFFSET %s",
(risk_flag, page_size, offset)
)
records = await cur.fetchall()
# 将查询结果转换为字典列表
result = [
{
"id": record[0],
"session_id": record[1],
"user_input": record[2],
"model_response": record[3],
"audio_url": record[4],
"duration": record[5],
"create_time": record[6].strftime("%Y-%m-%d %H:%M:%S"),
"risk_memo": record[7]
}
for record in records
]
return {
"data": result,
"total": total,
"page": page,
"page_size": page_size
}

Loading…
Cancel
Save