You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
7.9 KiB

1 month ago
import asyncio
1 month ago
from contextlib import asynccontextmanager
1 month ago
from logging.handlers import RotatingFileHandler
1 month ago
4 weeks ago
import jieba # 导入 jieba 分词库
1 month ago
import uvicorn
4 weeks ago
from fastapi import FastAPI, Request, Body
1 month ago
from openai import OpenAI
4 weeks ago
from sse_starlette.sse import EventSourceResponse
from starlette.staticfiles import StaticFiles
from gensim.models import KeyedVectors
1 month ago
from Config import Config
4 weeks ago
from Milvus.Config.MulvusConfig import *
from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Milvus.Utils.MilvusConnectionPool import *
from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool
1 month ago
1 month ago
# 初始化日志
1 month ago
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
4 weeks ago
handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
1 month ago
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
1 month ago
4 weeks ago
# 1. 加载预训练的 Word2Vec 模型
model_path = MS_MODEL_PATH # 替换为你的 Word2Vec 模型路径
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
1 month ago
@asynccontextmanager
async def lifespan(app: FastAPI):
4 weeks ago
# 初始化Milvus连接池
app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
1 month ago
4 weeks ago
# 初始化集合管理器
app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME)
app.state.collection_manager.load_collection()
1 month ago
# 初始化DeepSeek客户端
app.state.deepseek_client = OpenAI(
api_key=Config.DEEPSEEK_API_KEY,
base_url=Config.DEEPSEEK_URL
)
1 month ago
yield
1 month ago
4 weeks ago
# 关闭Milvus连接池
app.state.milvus_pool.close()
1 month ago
4 weeks ago
app = FastAPI(lifespan=lifespan)
1 month ago
4 weeks ago
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
print(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
print("未找到有效词,返回零向量")
return [0.0] * model.vector_size
async def generate_stream(client, milvus_pool, collection_manager, query):
1 month ago
"""生成SSE流"""
4 weeks ago
# 从连接池获取连接
connection = milvus_pool.get_connection()
1 month ago
4 weeks ago
try:
# 1. 将查询文本转换为向量
current_embedding = text_to_embedding(query)
# 2. 搜索相关数据
search_params = {
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
}
# 7. 将文本转换为嵌入向量
4 weeks ago
results = collection_manager.search(current_embedding, search_params, limit=5) # 返回 2 条结果
4 weeks ago
# 3. 处理搜索结果
print("最相关的历史对话:")
context=""
if results:
for hits in results:
for hit in hits:
try:
# 查询非向量字段
record = collection_manager.query_by_id(hit.id)
print(f"ID: {hit.id}")
print(f"会话 ID: {record['person_id']}")
print(f"用户问题: {record['user_input']}")
context=context+record['user_input']
print(f"大模型回复: {record['model_response']}")
print(f"时间: {record['timestamp']}")
print(f"距离: {hit.distance}")
print("-" * 40) # 分隔线
except Exception as e:
print(f"查询失败: {e}")
else:
print("未找到相关历史对话,请检查查询参数或数据。")
4 weeks ago
prompt = f"""根据以下关于'{query}'的相关信息,# Role: 信息检索与回答助手
## Profile
- language: 中文
- description: 这是一个专门设计来根据提供的材料检索信息并回答相关问题的助手它能够快速准确地从大量文本中提取关键信息并以清晰简洁的方式回答用户的问题
- background: 该助手基于先进的自然语言处理技术能够理解和处理复杂的查询提供准确的信息
- personality: 冷静客观高效
- expertise: 信息检索文本分析问答系统
- target_audience: 需要快速获取信息的用户如研究人员学生专业人士等
## Skills
1. 信息检索
- 文本搜索: 能够在大量文本中快速搜索关键词或短语
- 语义理解: 理解用户的查询意图即使查询语句不完全符合标准格式
- 结果筛选: 从搜索结果中筛选出最相关的信息
2. 信息处理
- 文本摘要: 提供文本的简要摘要帮助用户快速了解主要内容
- 关键信息提取: 提取文本中的关键信息如日期地点人物等
- 数据整合: 将来自不同来源的信息进行整合提供全面的回答
## Rules
1. 基本原则
- 准确性: 提供的信息必须准确无误确保来源可靠
- 客观性: 回答问题时保持客观避免主观判断
- 完整性: 尽可能提供完整的信息满足用户的需求
2. 行为准则
- 及时响应: 快速响应用户的查询提供及时的信息
- 清晰表达: 使用简洁明了的语言确保用户能够理解回答
- 保密性: 严格遵守保密协议不泄露用户的个人信息
3. 限制条件
- 不提供猜测性信息: 只提供有据可查的信息不进行猜测
- 不传播不实信息: 确保提供的信息真实可靠不传播不实信息
- 不涉及敏感内容: 避免回答涉及敏感内容的问题
## Workflows
- 目标: 根据提供的材料回答用户的问题
- 步骤 1: 接收并理解用户的查询确定查询意图
- 步骤 2: 在提供的材料中搜索相关信息筛选出最相关的信息
- 步骤 3: 对搜索到的信息进行处理提取关键信息并进行整合
- 预期结果: 提供准确清晰完整的回答满足用户的需求
## Initialization
作为信息检索与回答助手你必须遵守上述Rules按照Workflows执行任务
1 month ago
相关信息
{context}"""
4 weeks ago
1 month ago
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一个专业的文档整理助手"},
{"role": "user", "content": prompt}
],
temperature=0.3,
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
yield {"data": chunk.choices[0].delta.content}
await asyncio.sleep(0.01)
except Exception as e:
yield {"data": f"生成报告时出错: {str(e)}"}
4 weeks ago
finally:
# 释放连接
milvus_pool.release_connection(connection)
"""
http://10.10.21.22:8000/api/rag?query=小学数学中有哪些模型
"""
4 weeks ago
@app.post("/api/rag")
4 weeks ago
async def rag_stream(request: Request, query: str = Body(...)):
1 month ago
"""RAG+DeepSeek流式接口"""
return EventSourceResponse(
4 weeks ago
generate_stream(
request.app.state.deepseek_client,
request.app.state.milvus_pool,
request.app.state.collection_manager,
query
)
1 month ago
)
1 month ago
app.mount("/static", StaticFiles(directory="Static"), name="static")
1 month ago
if __name__ == "__main__":
1 month ago
uvicorn.run(app, host="0.0.0.0", port=8000)