main
HuangHai 4 months ago
parent 75f2431009
commit 7e9128a3f1

@@ -3,6 +3,7 @@ import base64
import datetime
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
@@ -14,6 +15,7 @@ from alibabacloud_tea_openapi.models import Config
from fastapi import Query, Depends, HTTPException, status, Form, FastAPI
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from openai import AsyncOpenAI
from passlib.context import CryptContext
from starlette.responses import StreamingResponse
@@ -23,15 +25,14 @@ from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Utils.EmbeddingUtil import text_to_embedding
from WxMini.Utils.ImageUtil import *
from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session, get_user_by_login_name, \
get_chat_logs_by_risk_flag, get_chat_logs_summary
get_chat_logs_by_risk_flag, get_chat_logs_summary, save_chat_to_mysql
from WxMini.Utils.MySQLUtil import update_risk, get_last_chat_log_id
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory, hmacsha256
from WxMini.Utils.TianQiUtil import get_weather
from WxMini.Utils.TtsUtil import TTS
# Configure logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s")
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Password hashing context
@@ -603,11 +604,11 @@ async def generate_upload_params(current_user: dict = Depends(get_current_user))
@app.get("/aichat/recognize_content")
async def web_recognize_content(image_url: str,
#current_user: dict = Depends(get_current_user)
# current_user: dict = Depends(get_current_user)
):
#logger.info(f"current_user:{current_user['login_name']}")
#person_id = current_user['person_id']
person_id='0F66D9C5-428C-FE03-67EE-E5BEE46CB495'
# logger.info(f"current_user:{current_user['login_name']}")
# person_id = current_user['person_id']
person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495'
# Get the image width and height
image_width, image_height = getImgWidthHeight(image_url)
@@ -666,45 +667,168 @@
@app.get("/aichat/recognize_text")
async def web_recognize_text(image_url: str, current_user: dict = Depends(get_current_user)):
logger.info(f"current_user:{current_user['login_name']}")
person_id = current_user['person_id']
try:
async def generate_stream():
# recognize_text is an async generator that yields recognition results one by one
async for result in recognize_text(client, app.state.mysql_pool, person_id, image_url):
yield f"{str(result)}"  # SSE-style output
# Throttle output: 0.01 s between results
await asyncio.sleep(0.01)
return StreamingResponse(
generate_stream(),
media_type="text/event-stream",  # SSE media type
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}  # disable caching, keep the connection alive
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def web_recognize_text(image_url: str,
# current_user: dict = Depends(get_current_user)
):
# logger.info(f"current_user:{current_user['login_name']}")
# person_id = current_user['person_id']
person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495'
# Get the image width and height
image_width, image_height = getImgWidthHeight(image_url)
# Call the AI model to generate content (streaming output)
response = await client.chat.completions.create(
model="qwen-vl-ocr",
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": image_url,
"min_pixels": 28 * 28 * 4,
"max_pixels": 28 * 28 * 1280
},
{"type": "text", "text": "Read all the text in the image."},
]
}
],
stream=True
)
# Generator that streams the result back character by character
async def generate_stream():
summary = ""  # accumulates the full recognized text
try:
# Process the AI response chunk by chunk
async for chunk in response:
if chunk.choices[0].delta.content:
chunk_content = chunk.choices[0].delta.content
# Yield character by character
for char in chunk_content:
print(char, end="", flush=True)  # echo to the console
yield char.encode("utf-8")  # stream one character
summary += char  # accumulate the full text
# After streaming finishes, persist the result to the database
await save_chat_to_mysql(app.state.mysql_pool, person_id, f'{image_url}', summary, "", 0, 2, 2, 1,
image_width, image_height)
except asyncio.CancelledError:
# The client disconnected early; nothing to clean up
print("Client disconnected")
except Exception as e:
error_response = json.dumps({
"success": False,
"message": f"生成内容失败: {str(e)}"
})
print(error_response)
yield error_response.encode("utf-8")
# Return the streaming result via StreamingResponse
return StreamingResponse(
generate_stream(),
media_type="text/plain; charset=utf-8", # 明确指定字符编码为 UTF-8
headers={
"Cache-Control": "no-cache", # 禁用缓存
"Content-Type": "text/event-stream; charset=utf-8", # 设置内容类型和字符编码
"Transfer-Encoding": "chunked",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲(如果使用 Nginx
}
)
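For illustration only, a caller can consume this endpoint by reading the body incrementally as it is produced by generate_stream(). The sketch below is an assumption rather than part of this commit: it presumes the service is reachable at http://localhost:8000 and that image_url points to an accessible image.

import asyncio
import httpx

async def read_recognized_text(image_url: str) -> str:
    """Stream /aichat/recognize_text and return the full recognized text (illustrative sketch)."""
    text = ""
    async with httpx.AsyncClient(timeout=None) as http:
        # Read the chunked response as it arrives instead of waiting for the whole body
        async with http.stream(
            "GET",
            "http://localhost:8000/aichat/recognize_text",  # assumed host and port
            params={"image_url": image_url},
        ) as resp:
            resp.raise_for_status()
            async for chunk in resp.aiter_text():
                print(chunk, end="", flush=True)  # display characters as they arrive
                text += chunk
    return text

if __name__ == "__main__":
    asyncio.run(read_recognized_text("https://example.com/sample.jpg"))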
@app.get("/aichat/recognize_math")
async def web_recognize_math(image_url: str, current_user: dict = Depends(get_current_user)):
logger.info(f"current_user:{current_user['login_name']}")
person_id = current_user['person_id']
try:
async def generate_stream():
# recognize_math is an async generator that yields recognition results one by one
async for result in recognize_math(client, app.state.mysql_pool, person_id, image_url):
yield f"{str(result)}"  # SSE-style output
# Throttle output: 0.01 s between results
await asyncio.sleep(0.01)
return StreamingResponse(
generate_stream(),
media_type="text/event-stream",  # SSE media type
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}  # disable caching, keep the connection alive
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def web_recognize_math(image_url: str,
# current_user: dict = Depends(get_current_user)
):
# logger.info(f"current_user:{current_user['login_name']}")
# person_id = current_user['person_id']
person_id = '0F66D9C5-428C-FE03-67EE-E5BEE46CB495'
# Get the image width and height
image_width, image_height = getImgWidthHeight(image_url)
client = AsyncOpenAI(
api_key=MODELSCOPE_ACCESS_TOKEN,
base_url="https://api-inference.modelscope.cn/v1"
)
"""
识别图片中的数学题流式输出并将结果记录到数据库
:param client: AsyncOpenAI 客户端
:param pool: 数据库连接池
:param person_id: 用户 ID
:param image_url: 图片 URL
:return: 最终拼接的字符串
"""
# Prompt
prompt = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step."
response = await client.chat.completions.create(
model="Qwen/Qwen2.5-VL-32B-Instruct",
messages=[
{
"role": "system",
"content": [
{"type": "text", "text": prompt}
],
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": image_url}
},
{"type": "text", "text": "请使用中文回答:如何作答?"},
],
}
],
stream=True
)
# Generator that streams the result back character by character
async def generate_stream():
summary = ""  # accumulates the full recognized text
try:
# Process the AI response chunk by chunk
async for chunk in response:
if chunk.choices[0].delta.content:
chunk_content = chunk.choices[0].delta.content
# Yield character by character
for char in chunk_content:
print(char, end="", flush=True)  # echo to the console
yield char.encode("utf-8")  # stream one character
summary += char  # accumulate the full text
# After streaming finishes, persist the result to the database
await save_chat_to_mysql(app.state.mysql_pool, person_id, f'{image_url}', summary, "", 0, 2, 2, 1,
image_width, image_height)
except asyncio.CancelledError:
# The client disconnected early; nothing to clean up
print("Client disconnected")
except Exception as e:
error_response = json.dumps({
"success": False,
"message": f"生成内容失败: {str(e)}"
})
print(error_response)
yield error_response.encode("utf-8")
# Return the streaming result via StreamingResponse
return StreamingResponse(
generate_stream(),
media_type="text/plain; charset=utf-8", # 明确指定字符编码为 UTF-8
headers={
"Cache-Control": "no-cache", # 禁用缓存
"Content-Type": "text/event-stream; charset=utf-8", # 设置内容类型和字符编码
"Transfer-Encoding": "chunked",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲(如果使用 Nginx
}
)
# Run the FastAPI application
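The routes above call getImgWidthHeight from WxMini.Utils.ImageUtil, whose implementation is not shown in this diff. Below is a minimal sketch of what such a helper might look like, assuming the image is fetched over HTTP and measured with Pillow; it is an assumption for illustration, not the actual project code.

import io
import requests
from PIL import Image

def getImgWidthHeight(image_url: str) -> tuple[int, int]:
    # Download the image and read its dimensions with Pillow (assumed behaviour)
    resp = requests.get(image_url, timeout=10)
    resp.raise_for_status()
    with Image.open(io.BytesIO(resp.content)) as img:
        return img.width, img.height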

@@ -1,116 +1,4 @@
import time
import requests
from openai import OpenAI, AsyncOpenAI
from WxMini.Milvus.Config.MulvusConfig import MODELSCOPE_ACCESS_TOKEN
from WxMini.Utils.MySQLUtil import save_chat_to_mysql
async def recognize_text(client, pool, person_id, image_url):
"""
Recognize the text in an image (streaming output) and record the result to the database
:param client: AsyncOpenAI client
:param pool: database connection pool
:param person_id: user ID
:param image_url: image URL
:return: the final concatenated string
"""
completion = await client.chat.completions.create(
model="qwen-vl-ocr",
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": image_url,
"min_pixels": 28 * 28 * 4,
"max_pixels": 28 * 28 * 1280
},
{"type": "text", "text": "Read all the text in the image."},
]
}
],
stream=True
)
full_text = ""  # accumulates the final concatenated string
async for chunk in completion:
if chunk.choices[0].delta.content is not None:
for char in chunk.choices[0].delta.content:
if char != ' ':
yield char  # stream the character
full_text += char  # accumulate
print(char, end='')
time.sleep(0.1)  # throttle output speed
# Get the image width and height
image_width, image_height = getImgWidthHeight(image_url)
# Record to the database
try:
await save_chat_to_mysql(pool, person_id, f'{image_url}', full_text, "", 0, 2, 2, 1, image_width, image_height)
except Exception as e:
print(f"记录到数据库时出错:{e}")
async def recognize_math(pool, person_id, image_url):
client = AsyncOpenAI(
api_key=MODELSCOPE_ACCESS_TOKEN,
base_url="https://api-inference.modelscope.cn/v1"
)
"""
Recognize the math problem in an image (streaming output) and record the result to the database
:param client: AsyncOpenAI client
:param pool: database connection pool
:param person_id: user ID
:param image_url: image URL
:return: the final concatenated string
"""
# Prompt
prompt = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step."
completion = await client.chat.completions.create(
model="Qwen/Qwen2.5-VL-32B-Instruct",
# model="Qwen/Qwen2.5-VL-72B-Instruct",
messages=[
{
"role": "system",
"content": [
{"type": "text", "text": prompt}
],
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": image_url}
},
{"type": "text", "text": "请使用中文回答:如何作答?"},
],
}
],
stream=True
)
full_text = ""  # accumulates the final concatenated string
async for chunk in completion:
if chunk.choices[0].delta.content is not None:
for char in chunk.choices[0].delta.content:
if char != ' ':
yield char  # stream the character
full_text += char  # accumulate
print(char, end='')
time.sleep(0.1)  # throttle output speed
# Get the image width and height
image_width, image_height = getImgWidthHeight(image_url)
# Record to the database
try:
await save_chat_to_mysql(pool, person_id, f'{image_url}', full_text, "", 0, 2, 2, 1, image_width, image_height)
except Exception as e:
print(f"记录到数据库时出错:{e}")
# Get the width and height of an image
