main
HuangHai 4 months ago
parent f4daa9d587
commit d7b4fc92cb

@ -3,18 +3,51 @@ import uuid
import time
import jieba
from fastapi import FastAPI, Form, HTTPException
from openai import OpenAI
from openai import AsyncOpenAI # 使用异步客户端
from gensim.models import KeyedVectors
from contextlib import asynccontextmanager
from TtsConfig import *
from WxMini.OssUtil import upload_mp3_to_oss
from WxMini.TtsUtil import TTS
from WxMini.Utils.OssUtil import upload_mp3_to_oss
from WxMini.Utils.TtsUtil import TTS
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.Config.MulvusConfig import *
import asyncio # 引入异步支持
import logging # 增加日志记录
import jieba.analyse
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
from aiomysql import create_pool
# MySQL 配置
MYSQL_CONFIG = {
"host": MYSQL_HOST,
"port": MYSQL_PORT,
"user": MYSQL_USER,
"password": MYSQL_PASSWORD,
"db": MYSQL_DB_NAME,
"minsize": 1,
"maxsize": 20,
}
# 保存聊天记录到 MySQL
async def save_chat_to_mysql(mysql_pool, session_id, prompt, result):
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO t_chat_log (session_id, user_input, model_response, create_time) VALUES (%s, %s, %s, NOW())",
(session_id, prompt, result)
)
await conn.commit()
# 初始化 MySQL 连接池
async def init_mysql_pool():
return await create_pool(**MYSQL_CONFIG)
# 提取用户输入的关键词
def extract_keywords(text, topK=3):
"""
@ -26,10 +59,11 @@ def extract_keywords(text, topK=3):
keywords = jieba.analyse.extract_tags(text, topK=topK)
return keywords
# 初始化 Word2Vec 模型
model_path = MS_MODEL_PATH
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
logger.info(f"模型加载成功,词向量维度: {model.vector_size}")
# 初始化 Milvus 连接池
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
@ -38,40 +72,49 @@ milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=M
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
logger.info(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
print(f"有效词向量数量: {len(embeddings)}")
logger.info(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
logger.info(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
print("未找到有效词,返回零向量")
logger.warning("未找到有效词,返回零向量")
return [0.0] * model.vector_size
# 使用 Lifespan Events 处理应用启动和关闭逻辑
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时加载集合到内存
collection_manager.load_collection()
print(f"集合 '{collection_name}' 已加载到内存。")
logger.info(f"集合 '{collection_name}' 已加载到内存。")
# 初始化 MySQL 连接池
app.state.mysql_pool = await init_mysql_pool()
logger.info("MySQL 连接池已初始化。")
yield
# 应用关闭时释放连接池
milvus_pool.close()
print("Milvus 连接池已关闭。")
app.state.mysql_pool.close()
await app.state.mysql_pool.wait_closed()
logger.info("Milvus 和 MySQL 连接池已关闭。")
# 初始化 FastAPI 应用
app = FastAPI(lifespan=lifespan)
# 初始化 OpenAI 客户端
client = OpenAI(
# 初始化异步 OpenAI 客户端
client = AsyncOpenAI(
api_key=MODEL_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
@app.post("/reply")
async def reply(session_id: str = Form(...), prompt: str = Form(...)):
"""
@ -81,7 +124,7 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
:return: 大模型的回复
"""
try:
print(f"收到用户输入: {prompt}")
logger.info(f"收到用户输入: {prompt}")
# 从连接池中获取一个连接
connection = milvus_pool.get_connection()
@ -94,7 +137,8 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
}
start_time = time.time()
results = collection_manager.search(
results = await asyncio.to_thread( # 将阻塞操作放到线程池中执行
collection_manager.search,
data=current_embedding, # 输入向量
search_params=search_params, # 搜索参数
expr=f"session_id == '{session_id}'", # 按 session_id 过滤
@ -109,30 +153,40 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
for hit in hits:
try:
# 查询非向量字段
record = collection_manager.query_by_id(hit.id)
record = await asyncio.to_thread(collection_manager.query_by_id, hit.id)
if record:
print(f"查询到的记录: {record}")
logger.info(f"查询到的记录: {record}")
# 添加历史交互
history_prompt += f"用户: {record['user_input']}\n大模型: {record['model_response']}\n"
except Exception as e:
print(f"查询失败: {e}")
logger.error(f"查询失败: {e}")
print(f"历史交互提示词: {history_prompt}")
# 限制历史交互提示词长度
history_prompt = history_prompt[:2000]
logger.info(f"历史交互提示词: {history_prompt}")
# 调用大模型,将历史交互作为提示词
response = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容。"},
{"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"} # 将历史交互和当前输入一起发送
],
max_tokens=500
)
try:
response = await asyncio.wait_for(
client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system",
"content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容。"},
{"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"}
],
max_tokens=500
),
timeout=60 # 设置超时时间为 30 秒
)
except asyncio.TimeoutError:
logger.error("大模型调用超时")
raise HTTPException(status_code=500, detail="大模型调用超时")
# 提取生成的回复
if response.choices and response.choices[0].message.content:
result = response.choices[0].message.content.strip()
print(f"大模型回复: {result}")
logger.info(f"大模型回复: {result}")
# 记录用户输入和大模型反馈到向量数据库
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@ -143,22 +197,31 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
[timestamp], # timestamp
[current_embedding] # embedding
]
collection_manager.insert_data(entities)
print("用户输入和大模型反馈已记录到向量数据库。")
if len(prompt) > 500:
logger.warning(f"用户输入被截断,原始长度: {len(prompt)}")
if len(result) > 500:
logger.warning(f"大模型回复被截断,原始长度: {len(result)}")
await asyncio.to_thread(collection_manager.insert_data, entities)
logger.info("用户输入和大模型反馈已记录到向量数据库。")
# 记录聊天数据到 MySQL
await save_chat_to_mysql(app.state.mysql_pool, session_id, prompt, result)
logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。")
# 调用 TTS 生成 MP3
uuid_str = str(uuid.uuid4())
tts_file = "audio/" + uuid_str + ".mp3"
timestamp = int(time.time())
tts_file = f"audio/{uuid_str}_{timestamp}.mp3"
t = TTS(tts_file)
t.start(result)
await asyncio.to_thread(t.start, result) # 将 TTS 生成放到线程池中执行
# 文件上传到 OSS
upload_mp3_to_oss(tts_file, tts_file)
await asyncio.to_thread(upload_mp3_to_oss, tts_file, tts_file)
# 删除临时文件
try:
os.remove(tts_file)
print(f"临时文件 {tts_file} 已删除")
logger.info(f"临时文件 {tts_file} 已删除")
except Exception as e:
print(f"删除临时文件失败: {e}")
logger.error(f"删除临时文件失败: {e}")
# 完整的 URL
url = 'https://ylt.oss-cn-hangzhou.aliyuncs.com/' + tts_file
return {
@ -170,13 +233,15 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
else:
raise HTTPException(status_code=500, detail="大模型未返回有效结果")
except Exception as e:
logger.error(f"调用大模型失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"调用大模型失败: {str(e)}")
finally:
# 释放连接
milvus_pool.release_connection(connection)
# 运行 FastAPI 应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5600)
uvicorn.run("Start:app", host="0.0.0.0", port=5600, workers=1)

@ -20,4 +20,33 @@ MS_MODEL_LIMIT = 10000
MS_DIMENSION = 200
# Milvus 搜索时的 nprobe 参数,用于控制搜索的精度和性能
MS_NPROBE = 100
MS_NPROBE = 100
# 驿来特账号的AK,SK
ACCESS_KEY_ID = 'LTAI5t5jxkgJtRK8wew8fnbq'
ACCESS_KEY_SECRET = 'b8HXNGz7IkI3Dhv7BZx9BNBEZy1uku'
ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'
BUCKET_NAME = 'ylt'
# Redis 配置
REDIS_HOST = '10.10.14.14' # Redis 服务器地址
REDIS_PORT = 18890 # Redis 端口
REDIS_DB = 0 # Redis 数据库编号
REDIS_PASSWORD = None # Redis 密码(如果没有密码,设置为 None
# 阿里云中用来调用 deepseek v3 的密钥
MODEL_API_KEY = "sk-01d13a39e09844038322108ecdbd1bbc"
#MODEL_NAME = "qwen-plus"
MODEL_NAME = "deepseek-v3"
# TTS的APPKEY
APPKEY = "90RJcqjlN4ZqymGd" # 获取Appkey请前往控制台https://nls-portal.console.aliyun.com/applist
# MYSQL配置信息
MYSQL_HOST= "10.10.14.210"
MYSQL_PORT= 22066
MYSQL_USER= "root"
MYSQL_PASSWORD="DsideaL147258369"
MYSQL_DB_NAME="ai_db"

@ -2,6 +2,5 @@
gensim==4.3.3
jieba==0.42.1
pymilvus==2.5.6
aiomysql==0.2.0
numpy==1.23.5

@ -1,22 +0,0 @@
import oss2
from TtsConfig import *
# 初始化 OSS Bucket
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)
# 上传 MP3 文件到 OSS
def upload_mp3_to_oss(file_path, oss_object_name):
"""
上传 MP3 文件到 OSS
:param file_path: 本地 MP3 文件路径
:param oss_object_name: OSS 中存储的文件名
"""
try:
# 上传文件
with open(file_path, 'rb') as file:
bucket.put_object(oss_object_name, file)
print(f"文件 {file_path} 已成功上传到 OSS存储为 {oss_object_name}")
except Exception as e:
print(f"上传失败: {e}")

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS `t_chat_log`;
CREATE TABLE `t_chat_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`session_id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户人员编号',
`user_input` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户提出的问题',
`model_response` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '大模型的反馈',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_session_id` (`session_id`) USING BTREE,
INDEX `idx_create_time` (`create_time`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

@ -6,9 +6,8 @@ from fastapi import FastAPI, Form, HTTPException
from openai import AsyncOpenAI # 使用异步客户端
from gensim.models import KeyedVectors
from contextlib import asynccontextmanager
from TtsConfig import *
from WxMini.OssUtil import upload_mp3_to_oss
from WxMini.TtsUtil import TTS
from WxMini.Utils.OssUtil import upload_mp3_to_oss, upload_mp3_to_oss_from_memory
from WxMini.Utils.TtsUtil import TTS
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.Config.MulvusConfig import *
@ -21,6 +20,35 @@ import jieba.analyse
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
from aiomysql import create_pool
# MySQL 配置
MYSQL_CONFIG = {
"host": MYSQL_HOST,
"port": MYSQL_PORT,
"user": MYSQL_USER,
"password": MYSQL_PASSWORD,
"db": MYSQL_DB_NAME,
"minsize": 1,
"maxsize": 20,
}
# 保存聊天记录到 MySQL
async def save_chat_to_mysql(mysql_pool, session_id, prompt, result):
async with mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO t_chat_log (session_id, user_input, model_response, create_time) VALUES (%s, %s, %s, NOW())",
(session_id, prompt, result)
)
await conn.commit()
# 初始化 MySQL 连接池
async def init_mysql_pool():
return await create_pool(**MYSQL_CONFIG)
# 提取用户输入的关键词
def extract_keywords(text, topK=3):
@ -68,10 +96,15 @@ async def lifespan(app: FastAPI):
# 应用启动时加载集合到内存
collection_manager.load_collection()
logger.info(f"集合 '{collection_name}' 已加载到内存。")
# 初始化 MySQL 连接池
app.state.mysql_pool = await init_mysql_pool()
logger.info("MySQL 连接池已初始化。")
yield
# 应用关闭时释放连接池
milvus_pool.close()
logger.info("Milvus 连接池已关闭。")
app.state.mysql_pool.close()
await app.state.mysql_pool.wait_closed()
logger.info("Milvus 和 MySQL 连接池已关闭。")
# 初始化 FastAPI 应用
@ -130,18 +163,27 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
except Exception as e:
logger.error(f"查询失败: {e}")
# 限制历史交互提示词长度
history_prompt = history_prompt[:2000]
logger.info(f"历史交互提示词: {history_prompt}")
# 调用大模型,将历史交互作为提示词
response = await client.chat.completions.create( # 使用异步调用
model=MODEL_NAME,
messages=[
{"role": "system",
"content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容。"},
{"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"} # 将历史交互和当前输入一起发送
],
max_tokens=500
)
try:
response = await asyncio.wait_for(
client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system",
"content": "你是一个私人助理,负责回答用户的问题。请根据用户的历史对话和当前问题,提供准确且简洁的回答。不要提及你是通义千问或其他无关信息,也不可以回复与本次用户问题不相关的历史对话记录内容。"},
{"role": "user", "content": f"历史对话记录:{history_prompt},本次用户问题: {prompt}"}
],
max_tokens=500
),
timeout=60 # 设置超时时间为 60 秒
)
except asyncio.TimeoutError:
logger.error("大模型调用超时")
raise HTTPException(status_code=500, detail="大模型调用超时")
# 提取生成的回复
if response.choices and response.choices[0].message.content:
@ -157,22 +199,31 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
[timestamp], # timestamp
[current_embedding] # embedding
]
if len(prompt) > 500:
logger.warning(f"用户输入被截断,原始长度: {len(prompt)}")
if len(result) > 500:
logger.warning(f"大模型回复被截断,原始长度: {len(result)}")
await asyncio.to_thread(collection_manager.insert_data, entities)
logger.info("用户输入和大模型反馈已记录到向量数据库。")
# 记录聊天数据到 MySQL
await save_chat_to_mysql(app.state.mysql_pool, session_id, prompt, result)
logger.info("用户输入和大模型反馈已记录到 MySQL 数据库。")
# 调用 TTS 生成 MP3
uuid_str = str(uuid.uuid4())
tts_file = "audio/" + uuid_str + ".mp3"
t = TTS(tts_file)
await asyncio.to_thread(t.start, result) # 将 TTS 生成放到线程池中执行
# 文件上传到 OSS
await asyncio.to_thread(upload_mp3_to_oss, tts_file, tts_file)
# 删除临时文件
try:
os.remove(tts_file)
logger.info(f"临时文件 {tts_file} 已删除")
except Exception as e:
logger.error(f"删除临时文件失败: {e}")
timestamp = int(time.time())
tts_file = f"audio/{uuid_str}_{timestamp}.mp3"
# 生成 TTS 音频数据(不落盘)
t = TTS(None) # 传入 None 表示不保存到本地文件
audio_data = await asyncio.to_thread(t.generate_audio, result) # 假设 TTS 类有一个 generate_audio 方法返回音频数据
# 将音频数据直接上传到 OSS
await asyncio.to_thread(upload_mp3_to_oss_from_memory, tts_file,
audio_data) # 假设 upload_mp3_to_oss_from_memory 支持从内存上传
logger.info(f"TTS 文件已直接上传到 OSS: {tts_file}")
# 完整的 URL
url = 'https://ylt.oss-cn-hangzhou.aliyuncs.com/' + tts_file
return {
@ -195,4 +246,4 @@ async def reply(session_id: str = Form(...), prompt: str = Form(...)):
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5600, workers=4)
uvicorn.run("Start:app", host="0.0.0.0", port=5800, workers=1)

@ -1,21 +0,0 @@
# 驿来特账号的AK,SK
ACCESS_KEY_ID = 'LTAI5t5jxkgJtRK8wew8fnbq'
ACCESS_KEY_SECRET = 'b8HXNGz7IkI3Dhv7BZx9BNBEZy1uku'
ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'
BUCKET_NAME = 'ylt'
# Redis 配置
REDIS_HOST = '10.10.14.14' # Redis 服务器地址
REDIS_PORT = 18890 # Redis 端口
REDIS_DB = 0 # Redis 数据库编号
REDIS_PASSWORD = None # Redis 密码(如果没有密码,设置为 None
# 阿里云中用来调用 deepseek v3 的密钥
MODEL_API_KEY = "sk-01d13a39e09844038322108ecdbd1bbc"
#MODEL_NAME = "qwen-plus"
MODEL_NAME = "deepseek-v3"
# TTS的APPKEY
APPKEY = "90RJcqjlN4ZqymGd" # 获取Appkey请前往控制台https://nls-portal.console.aliyun.com/applist

@ -1,37 +0,0 @@
# -*- coding: utf-8 -*-
import nls
from GetToken import *
from TtsConfig import *
URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
class TTS:
def __init__(self, _file):
self._file = _file
self._f = None
def start(self, text):
self._text = text
# 确保目录存在
# os.makedirs(os.path.dirname(self._file), exist_ok=True)
self._f = open(self._file, "wb")
TOKEN = getToken() # 参考https://help.aliyun.com/document_detail/450255.html获取token
# 初始化 TTS
tts = nls.NlsSpeechSynthesizer(
url=URL,
token=TOKEN,
appkey=APPKEY,
on_data=self.on_data,
on_close=self.on_close
)
# 同步执行 TTS 生成
tts.start(self._text, voice="xiaobei", aformat="mp3")
def on_close(self, *args):
if self._f:
self._f.close()
print("TTS 生成完成,文件已关闭")
def on_data(self, data, *args):
if self._f:
self._f.write(data)

@ -0,0 +1,35 @@
import oss2
from WxMini.Milvus.Config.MulvusConfig import *
# 初始化 OSS Bucket
auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)
def upload_mp3_to_oss(file_path, oss_object_name):
"""
上传本地 MP3 文件到 OSS
:param file_path: 本地 MP3 文件路径
:param oss_object_name: OSS 中存储的文件名
"""
try:
# 上传文件
with open(file_path, 'rb') as file:
bucket.put_object(oss_object_name, file)
print(f"文件 {file_path} 已成功上传到 OSS存储为 {oss_object_name}")
except Exception as e:
print(f"上传失败: {e}")
def upload_mp3_to_oss_from_memory(oss_object_name, audio_data):
"""
从内存上传 MP3 数据到 OSS
:param oss_object_name: OSS 中存储的文件名
:param audio_data: 音频数据bytes 类型
"""
try:
# 上传音频数据
bucket.put_object(oss_object_name, audio_data)
print(f"音频数据已成功上传到 OSS存储为 {oss_object_name}")
except Exception as e:
print(f"上传失败: {e}")

@ -1,5 +1,6 @@
import redis
from TtsConfig import *
from WxMini.Milvus.Config.MulvusConfig import *
# 初始化 Redis 连接池
pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)

@ -1,12 +1,11 @@
#! /usr/bin/env python
# coding=utf-8
import json
from datetime import datetime
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from TtsConfig import *
from WxMini.RedisUtil import set_tts_token, get_tts_token
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Utils.RedisUtil import set_tts_token, get_tts_token
def getToken():
@ -32,16 +31,16 @@ def getToken():
jss = json.loads(response)
if 'Token' in jss and 'Id' in jss['Token']:
token = jss['Token']['Id']
#expireTime = jss['Token']['ExpireTime']
expireTime = jss['Token']['ExpireTime']
# 转换为本地时间
#expire_date = datetime.fromtimestamp(expireTime)
expire_date = datetime.fromtimestamp(expireTime)
# 格式化输出
#formatted_date = expire_date.strftime("%Y-%m-%d %H:%M:%S")
#print("过期时间:", formatted_date)
formatted_date = expire_date.strftime("%Y-%m-%d %H:%M:%S")
print("过期时间:", formatted_date)
# 计算时间差(秒数)
#now = datetime.now()
#time_diff = (expire_date - now).total_seconds()
#print("距离过期还有(秒):", time_diff)
now = datetime.now()
time_diff = (expire_date - now).total_seconds()
print("距离过期还有(秒):", time_diff)
# 设置 TTS Token
set_tts_token(token)
return token

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
import nls
from WxMini.Utils.TokenUtil import *
from WxMini.Milvus.Config.MulvusConfig import *
URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
class TTS:
def __init__(self, _file=None):
self._file = _file
self._f = None
self._audio_data = bytearray() # 用于存储生成的音频数据
def start(self, text):
"""
生成 TTS 并保存到文件如果 _file 不为 None
"""
self._text = text
if self._file:
# 如果指定了文件路径,则保存到文件
self._f = open(self._file, "wb")
TOKEN = getToken() # 参考https://help.aliyun.com/document_detail/450255.html获取token
# 初始化 TTS
tts = nls.NlsSpeechSynthesizer(
url=URL,
token=TOKEN,
appkey=APPKEY,
on_data=self.on_data,
on_close=self.on_close
)
# 同步执行 TTS 生成
tts.start(self._text, voice="xiaobei", aformat="mp3")
def generate_audio(self, text):
"""
生成 TTS 并返回音频数据不保存到文件
"""
self._text = text
self._audio_data = bytearray() # 重置音频数据
TOKEN = getToken() # 参考https://help.aliyun.com/document_detail/450255.html获取token
# 初始化 TTS
tts = nls.NlsSpeechSynthesizer(
url=URL,
token=TOKEN,
appkey=APPKEY,
on_data=self.on_data,
on_close=self.on_close
)
# 同步执行 TTS 生成
tts.start(self._text, voice="xiaobei", aformat="mp3")
return bytes(self._audio_data) # 返回生成的音频数据
def on_close(self, *args):
"""
TTS 生成完成时的回调
"""
if self._f:
self._f.close()
print("TTS 生成完成,文件已关闭")
else:
print("TTS 生成完成,音频数据已返回")
def on_data(self, data, *args):
"""
接收 TTS 数据的回调
"""
if self._f:
self._f.write(data) # 如果指定了文件路径,则写入文件
self._audio_data.extend(data) # 将数据存储到内存中
Loading…
Cancel
Save