diff --git a/AI/WxMini/Start.py b/AI/WxMini/Start.py index 7c82a988..7e058d08 100644 --- a/AI/WxMini/Start.py +++ b/AI/WxMini/Start.py @@ -3,6 +3,7 @@ import base64 import datetime import json import logging +import time import uuid from contextlib import asynccontextmanager from datetime import datetime, timedelta @@ -14,6 +15,7 @@ from alibabacloud_tea_openapi.models import Config from fastapi import Query, Depends, HTTPException, status, Form, FastAPI from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt +from openai import AsyncOpenAI from passlib.context import CryptContext from starlette.responses import StreamingResponse @@ -23,15 +25,14 @@ from WxMini.Milvus.Utils.MilvusConnectionPool import * from WxMini.Utils.EmbeddingUtil import text_to_embedding from WxMini.Utils.ImageUtil import * from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session, get_user_by_login_name, \ - get_chat_logs_by_risk_flag, get_chat_logs_summary + get_chat_logs_by_risk_flag, get_chat_logs_summary, save_chat_to_mysql from WxMini.Utils.MySQLUtil import update_risk, get_last_chat_log_id from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory, hmacsha256 from WxMini.Utils.TianQiUtil import get_weather from WxMini.Utils.TtsUtil import TTS # 配置日志 -logging.basicConfig(level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s") +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # 密码加密上下文 @@ -603,11 +604,11 @@ async def generate_upload_params(current_user: dict = Depends(get_current_user)) @app.get("/aichat/recognize_content") async def web_recognize_content(image_url: str, - #current_user: dict = Depends(get_current_user) + # current_user: dict = Depends(get_current_user) ): - #logger.info(f"current_user:{current_user['login_name']}") - #person_id = current_user['person_id'] - person_id='0F66D9C5-428C-FE03-67EE-E5BEE46CB495' + # logger.info(f"current_user:{current_user['login_name']}") + # person_id = current_user['person_id'] + person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495' # 获取图片宽高 image_width, image_height = getImgWidthHeight(image_url) @@ -666,45 +667,168 @@ async def web_recognize_content(image_url: str, @app.get("/aichat/recognize_text") -async def web_recognize_text(image_url: str, current_user: dict = Depends(get_current_user)): - logger.info(f"current_user:{current_user['login_name']}") - person_id = current_user['person_id'] - try: - async def generate_stream(): - # 假设 recognize_content 是一个异步生成器,逐条返回识别结果 - async for result in recognize_text(client, app.state.mysql_pool, person_id, image_url): - yield f"{str(result)}" # 使用SSE格式 - # 控制输出速度,间隔0.01秒 - await asyncio.sleep(0.01) - - return StreamingResponse( - generate_stream(), - media_type="text/event-stream", # 使用SSE的media_type - headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} # 禁用缓存,保持连接 - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) +async def web_recognize_text(image_url: str, + # current_user: dict = Depends(get_current_user) + ): + # logger.info(f"current_user:{current_user['login_name']}") + # person_id = current_user['person_id'] + person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495' + # 获取图片宽高 + image_width, image_height = getImgWidthHeight(image_url) + + # 调用 AI 模型生成内容(流式输出) + response = await client.chat.completions.create( + model="qwen-vl-ocr", + messages=[ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": image_url, + "min_pixels": 28 * 28 * 4, + "max_pixels": 28 * 28 * 1280 + }, + {"type": "text", "text": "Read all the text in the image."}, + ] + } + ], + stream=True + ) + + # 定义一个生成器函数,用于逐字返回流式结果 + async def generate_stream(): + summary = "" # 用于存储最终拼接的字符串 + try: + # 逐块处理 AI 返回的内容 + async for chunk in response: + if chunk.choices[0].delta.content: + chunk_content = chunk.choices[0].delta.content + # 逐字返回 + for char in chunk_content: + print(char, end="", flush=True) # 打印到控制台 + yield char.encode("utf-8") # 逐字返回 + summary += char # 拼接字符 + + # 流式传输完成后,记录到数据库 + await save_chat_to_mysql(app.state.mysql_pool, person_id, f'{image_url}', summary, "", 0, 2, 2, 1, + image_width, + image_height) + + except asyncio.CancelledError: + # 客户端提前断开连接,无需处理 + print("客户端断开连接") + except Exception as e: + error_response = json.dumps({ + "success": False, + "message": f"生成内容失败: {str(e)}" + }) + print(error_response) + yield error_response.encode("utf-8") + + # 使用 StreamingResponse 返回流式结果 + return StreamingResponse( + generate_stream(), + media_type="text/plain; charset=utf-8", # 明确指定字符编码为 UTF-8 + headers={ + "Cache-Control": "no-cache", # 禁用缓存 + "Content-Type": "text/event-stream; charset=utf-8", # 设置内容类型和字符编码 + "Transfer-Encoding": "chunked", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # 禁用 Nginx 缓冲(如果使用 Nginx) + } + ) @app.get("/aichat/recognize_math") -async def web_recognize_math(image_url: str, current_user: dict = Depends(get_current_user)): - logger.info(f"current_user:{current_user['login_name']}") - person_id = current_user['person_id'] - try: - async def generate_stream(): - # 假设 recognize_content 是一个异步生成器,逐条返回识别结果 - async for result in recognize_math(client, app.state.mysql_pool, person_id, image_url): - yield f"{str(result)}" # 使用SSE格式 - # 控制输出速度,间隔0.01秒 - await asyncio.sleep(0.01) - - return StreamingResponse( - generate_stream(), - media_type="text/event-stream", # 使用SSE的media_type - headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} # 禁用缓存,保持连接 - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) +async def web_recognize_math(image_url: str, + # current_user: dict = Depends(get_current_user) + ): + # logger.info(f"current_user:{current_user['login_name']}") + # person_id = current_user['person_id'] + person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495' + # 获取图片宽高 + image_width, image_height = getImgWidthHeight(image_url) + + client = AsyncOpenAI( + api_key=MODELSCOPE_ACCESS_TOKEN, + base_url="https://api-inference.modelscope.cn/v1" + ) + """ + 识别图片中的数学题,流式输出,并将结果记录到数据库 + :param client: AsyncOpenAI 客户端 + :param pool: 数据库连接池 + :param person_id: 用户 ID + :param image_url: 图片 URL + :return: 最终拼接的字符串 + """ + # 提示词 + prompt = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." + + response = await client.chat.completions.create( + model="Qwen/Qwen2.5-VL-32B-Instruct", + messages=[ + { + "role": "system", + "content": [ + {"type": "text", "text": prompt} + ], + }, + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": {"url": image_url} + }, + {"type": "text", "text": "请使用中文回答:如何作答?"}, + ], + } + ], + stream=True + ) + + # 定义一个生成器函数,用于逐字返回流式结果 + async def generate_stream(): + summary = "" # 用于存储最终拼接的字符串 + try: + # 逐块处理 AI 返回的内容 + async for chunk in response: + if chunk.choices[0].delta.content: + chunk_content = chunk.choices[0].delta.content + # 逐字返回 + for char in chunk_content: + print(char, end="", flush=True) # 打印到控制台 + yield char.encode("utf-8") # 逐字返回 + summary += char # 拼接字符 + + # 流式传输完成后,记录到数据库 + await save_chat_to_mysql(app.state.mysql_pool, person_id, f'{image_url}', summary, "", 0, 2, 2, 1, + image_width, image_height) + + except asyncio.CancelledError: + # 客户端提前断开连接,无需处理 + print("客户端断开连接") + except Exception as e: + error_response = json.dumps({ + "success": False, + "message": f"生成内容失败: {str(e)}" + }) + print(error_response) + yield error_response.encode("utf-8") + + # 使用 StreamingResponse 返回流式结果 + return StreamingResponse( + generate_stream(), + media_type="text/plain; charset=utf-8", # 明确指定字符编码为 UTF-8 + headers={ + "Cache-Control": "no-cache", # 禁用缓存 + "Content-Type": "text/event-stream; charset=utf-8", # 设置内容类型和字符编码 + "Transfer-Encoding": "chunked", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # 禁用 Nginx 缓冲(如果使用 Nginx) + } + ) # 运行 FastAPI 应用 diff --git a/AI/WxMini/Utils/ImageUtil.py b/AI/WxMini/Utils/ImageUtil.py index 7c6b37b6..6d23b304 100644 --- a/AI/WxMini/Utils/ImageUtil.py +++ b/AI/WxMini/Utils/ImageUtil.py @@ -1,116 +1,4 @@ -import time import requests -from openai import OpenAI, AsyncOpenAI -from WxMini.Milvus.Config.MulvusConfig import MODELSCOPE_ACCESS_TOKEN -from WxMini.Utils.MySQLUtil import save_chat_to_mysql - - -async def recognize_text(client, pool, person_id, image_url): - """ - 识别图片中的文字,流式输出,并将结果记录到数据库 - :param client: AsyncOpenAI 客户端 - :param pool: 数据库连接池 - :param person_id: 用户 ID - :param image_url: 图片 URL - :return: 最终拼接的字符串 - """ - completion = await client.chat.completions.create( - model="qwen-vl-ocr", - messages=[ - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": image_url, - "min_pixels": 28 * 28 * 4, - "max_pixels": 28 * 28 * 1280 - }, - {"type": "text", "text": "Read all the text in the image."}, - ] - } - ], - stream=True - ) - - full_text = "" # 用于存储最终拼接的字符串 - async for chunk in completion: - if chunk.choices[0].delta.content is not None: - for char in chunk.choices[0].delta.content: - if char != ' ': - yield char # 流式输出字符 - full_text += char # 拼接字符 - print(char, end='') - time.sleep(0.1) # 控制输出速度 - - # 获取图片宽高 - image_width, image_height = getImgWidthHeight(image_url) - - # 记录到数据库 - try: - await save_chat_to_mysql(pool, person_id, f'{image_url}', full_text, "", 0, 2, 2, 1, image_width, image_height) - except Exception as e: - print(f"记录到数据库时出错:{e}") - - - -async def recognize_math(pool, person_id, image_url): - client = AsyncOpenAI( - api_key=MODELSCOPE_ACCESS_TOKEN, - base_url="https://api-inference.modelscope.cn/v1" - ) - """ - 识别图片中的数学题,流式输出,并将结果记录到数据库 - :param client: AsyncOpenAI 客户端 - :param pool: 数据库连接池 - :param person_id: 用户 ID - :param image_url: 图片 URL - :return: 最终拼接的字符串 - """ - # 提示词 - prompt = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." - - completion = await client.chat.completions.create( - model="Qwen/Qwen2.5-VL-32B-Instruct", - # model="Qwen/Qwen2.5-VL-72B-Instruct", - messages=[ - { - "role": "system", - "content": [ - {"type": "text", "text": prompt} - ], - }, - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": {"url": image_url} - }, - {"type": "text", "text": "请使用中文回答:如何作答?"}, - ], - } - ], - stream=True - ) - - full_text = "" # 用于存储最终拼接的字符串 - async for chunk in completion: - if chunk.choices[0].delta.content is not None: - for char in chunk.choices[0].delta.content: - if char != ' ': - yield char # 流式输出字符 - full_text += char # 拼接字符 - print(char, end='') - time.sleep(0.1) # 控制输出速度 - # 获取图片宽高 - image_width, image_height = getImgWidthHeight(image_url) - - # 记录到数据库 - try: - await save_chat_to_mysql(pool, person_id, f'{image_url}', full_text, "", 0, 2, 2, 1, image_width, image_height) - except Exception as e: - print(f"记录到数据库时出错:{e}") # 获取图片的宽高