main
HuangHai 4 weeks ago
parent 2fbffe2fd4
commit 04955cfb56

@ -1,126 +0,0 @@
from pymilvus import Collection, utility, CollectionSchema
class MilvusCollectionManager:
def __init__(self, collection_name):
"""
初始化集合管理器
:param collection_name: 集合名称
"""
self.collection_name = collection_name
self.collection = None
self._load_collection_if_exists()
def _load_collection_if_exists(self):
"""
如果集合存在则加载集合
"""
if utility.has_collection(self.collection_name):
self.collection = Collection(name=self.collection_name)
# print(f"集合 '{self.collection_name}' 已加载。")
else:
print(f"集合 '{self.collection_name}' 不存在。")
def create_collection(self, fields, schema_description):
"""
创建集合
:param fields: 字段列表
:param schema_description: 集合描述
"""
if utility.has_collection(self.collection_name):
utility.drop_collection(self.collection_name) # 如果集合已存在,先删除
schema = CollectionSchema(fields, description=schema_description)
self.collection = Collection(name=self.collection_name, schema=schema)
print(f"集合 '{self.collection_name}' 创建成功。")
def create_index(self, field_name, index_params):
"""
创建索引
:param field_name: 字段名称
:param index_params: 索引参数
"""
if self.collection is None:
raise Exception("集合未加载,请检查集合是否存在。")
self.collection.create_index(field_name, index_params)
print("索引创建成功。")
def insert_data(self, entities):
"""
插入数据
:param entities: 数据实体格式为 [texts, embeddings]
"""
if self.collection is None:
raise Exception("集合未加载,请检查集合是否存在。")
self.collection.insert(entities)
def load_collection(self):
"""
加载集合到内存
"""
if self.collection is None:
raise Exception("集合未加载,请检查集合是否存在。")
self.collection.load()
def query_by_id(self, id):
"""
根据 ID 查询非向量字段
:param id: 记录的 ID
:return: 包含非向量字段的字典
"""
try:
# 使用 Milvus 的 query 方法查询指定 ID 的记录
results = self.collection.query(
expr=f"id == {id}", # 查询条件
output_fields=["id", "tags", "user_input", "timestamp"] # 返回的字段
)
if results:
return results[0] # 返回第一条记录
else:
return None
except Exception as e:
print(f"查询失败: {e}")
return None
def search(self, data, search_params, expr=None, limit=5):
"""
在集合中搜索与输入向量最相似的数据
:param data: 输入向量
:param search_params: 搜索参数
:param expr: 过滤条件可选
:param limit: 返回结果的数量
:return: 搜索结果
"""
try:
# 构建搜索参数
search_result = self.collection.search(
data=[data], # 输入向量
anns_field="embedding", # 向量字段名称
param=search_params, # 搜索参数
limit=limit, # 返回结果的数量
expr=expr # 过滤条件
)
return search_result
except Exception as e:
print(f"搜索失败: {e}")
return None
def query_text_by_id(self, id):
"""
根据 ID 查询对话文本
:param id: 数据 ID
:return: 对话文本
"""
if self.collection is None:
raise Exception("集合未加载,请检查集合是否存在。")
# 检查集合的字段定义
schema = self.collection.schema
field_names = [field.name for field in schema.fields]
if "text" not in field_names:
raise Exception(f"集合 '{self.collection_name}' 中不存在 'text' 字段,请检查集合定义。")
result = self.collection.query(expr=f"id == {id}", output_fields=["text"])
if result:
return result[0]["text"]
else:
return None

@ -1,68 +0,0 @@
import logging
import threading
from queue import Queue
from pymilvus import connections
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# 1. 手动实现 Milvus 连接池
class MilvusConnectionPool:
def __init__(self, host, port, max_connections=5):
"""
初始化 Milvus 连接池
:param host: Milvus 主机地址
:param port: Milvus 端口
:param max_connections: 最大连接数
"""
self.host = host
self.port = port
self.max_connections = max_connections
self._pool = Queue(max_connections)
self._lock = threading.Lock()
# 初始化连接池
for _ in range(max_connections):
self._pool.put(self._create_connection())
def _create_connection(self):
"""
创建一个新的 Milvus 连接
:return: Milvus 连接对象
"""
return connections.connect(host=self.host, port=self.port, alias="default")
def get_connection(self):
logger.info(f"获取连接,当前可用连接数: {self._pool.qsize()}")
"""
从连接池中获取一个连接
:return: Milvus 连接对象
"""
with self._lock:
if not self._pool.empty():
return self._pool.get()
else:
raise Exception("连接池已满,无法获取连接")
def release_connection(self, connection):
"""
释放连接将其放回连接池
:param connection: Milvus 连接对象
"""
with self._lock:
if self._pool.qsize() < self.max_connections:
self._pool.put(connection)
else:
connections.disconnect("default")
logger.info(f"释放连接,当前可用连接数: {self._pool.qsize()}")
def close(self):
"""
关闭连接池释放所有连接
"""
with self._lock:
while not self._pool.empty():
connection = self._pool.get()
connections.disconnect("default")

@ -1,48 +0,0 @@
"""
pip install pymilvus gensim
"""
from pymilvus import FieldSchema, DataType, utility
from Config.Config import MS_HOST, MS_PORT, MS_MAX_CONNECTIONS, MS_COLLECTION_NAME, MS_DIMENSION
from Backup.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 判断集合是否存在,存在则删除
if utility.has_collection(collection_name):
print(f"集合 '{collection_name}' 已存在,正在删除...")
utility.drop_collection(collection_name)
print(f"集合 '{collection_name}' 已删除。")
# 5. 定义集合的字段和模式
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="tags", dtype=DataType.JSON), # 改为JSON类型存储多个标签
FieldSchema(name="user_input", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="timestamp", dtype=DataType.VARCHAR, max_length=32),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=MS_DIMENSION)
]
schema_description = "Chat records collection with tags , user_input, and timestamp"
# 6. 创建集合
print(f"正在创建集合 '{collection_name}'...")
collection_manager.create_collection(fields, schema_description)
print(f"集合 '{collection_name}' 创建成功。")
# 7. 释放连接
milvus_pool.release_connection(connection)
# 8. 关闭连接池
milvus_pool.close()

@ -1,27 +0,0 @@
from Backup.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Config.Config import *
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 创建索引
index_params = {
"index_type": "IVF_FLAT", # 使用 IVF_FLAT 索引类型
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nlist": 128} # 设置 IVF_FLAT 的 nlist 参数
}
collection_manager.create_index("embedding", index_params) # 为 embedding 字段创建索引
print(f"集合 '{collection_name}''embedding' 字段索引创建成功。")
# 5. 释放连接
milvus_pool.release_connection(connection)
# 6. 关闭连接池
milvus_pool.close()

@ -1,71 +0,0 @@
import os
from Util.SplitDocxUtil import SplitDocxUtil
def split_into_blocks(text):
"""按行遍历文本,发现'问题X''话题X'时开始分割,但去除这些前缀字符串"""
blocks = []
current_block = []
in_block = False
for line in text.splitlines():
if line.startswith(('问题', '话题')) and any(c.isdigit() for c in line[:5]):
if in_block:
blocks.append('\n'.join(current_block))
current_block = []
in_block = True
# 去除前缀字符串
line = line[line.find(' ')+1:] if ' ' in line else ''
if in_block and line: # 只添加非空行
current_block.append(line)
if current_block:
blocks.append('\n'.join(current_block))
return [(i+1, block) for i, block in enumerate(blocks)]
def process_document(input_path, output_dir):
"""处理文档主函数"""
text = SplitDocxUtil.read_docx(input_path)
if not text:
print("无法读取输入文件内容")
return False
# 确保输出目录存在并清空目录
if os.path.exists(output_dir):
for file in os.listdir(output_dir):
os.remove(os.path.join(output_dir, file))
os.makedirs(output_dir, exist_ok=True)
chunks = split_into_blocks(text)
print(f"共分割出{len(chunks)}个段落块")
saved_count = 0
for chunk_num, chunk in chunks:
chunk = chunk.strip() # 确保去除空白字符
output_file = os.path.join(output_dir, f"{chunk_num}.txt")
if save_to_txt(chunk, output_file, mode='w'):
saved_count += 1
print(f"处理完成,共保存{saved_count}个文件到目录: {output_dir}")
return saved_count > 0
# 保留原有的save_to_txt函数
def save_to_txt(content, file_path, mode='w'):
"""将内容保存到文本文件"""
try:
with open(file_path, mode, encoding='utf-8') as f:
f.write(content)
return True
except Exception as e:
print(f"保存文件{file_path}时出错: {str(e)}")
return False
if __name__ == "__main__":
input_file = '../../static/Txt/小学数学教学中的若干问题_MATH_1.docx'
#input_file = '../static/Txt/小学数学知识点_MATH_2.docx'
#input_file = '../static/Txt/高中文言文_CHINESE_1.docx'
output_dir = '../Txt/processed_chunks'
process_document(input_file, output_dir)

@ -1,72 +0,0 @@
from Config.Config import *
from Backup.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from gensim.models import KeyedVectors
import jieba
import os
import time
# 需要进行标记的标签
selectedTags = ["CHINESE_DATA_1", "高中语文文言文"]
# 1. 加载预训练的 Word2Vec 模型
model_path = MS_MODEL_PATH
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
# 功能:将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text)
embeddings = [model[word] for word in words if word in model]
if embeddings:
return sum(embeddings) / len(embeddings)
return [0.0] * model.vector_size
# 2. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 处理processed_chunks目录下的所有文件
txt_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'Txt', 'processed_chunks')
for filename in os.listdir(txt_dir):
if filename.endswith('.txt'):
filepath = os.path.join(txt_dir, filename)
with open(filepath, 'r', encoding='utf-8') as f:
# 只读取第一行作为向量计算
first_line = f.readline().strip()
# 读取全部内容用于后续查询
full_content = first_line + '\n' + f.read()
if not first_line:
print(f"跳过空文件: {filename}")
continue
print(f"正在处理文件: {filename}")
# 5. 获取当前时间和会话ID
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
tags = {"tags": selectedTags, "full_content": full_content} # 添加完整内容
# 6. 将第一行文本转换为嵌入向量
embedding = text_to_embedding(first_line)
# 7. 插入数据
entities = [
[tags], # tags
[first_line], # user_input
[timestamp], # timestamp
[embedding] # embedding
]
collection_manager.insert_data(entities)
print(f"文件 {filename} 数据插入成功")
# 8. 释放连接 (移出循环外)
milvus_pool.release_connection(connection)
milvus_pool.close()
print("所有文件处理完成")

@ -1,52 +0,0 @@
from Backup.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Config.Config import *
# 1. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 2. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 3. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 4. 加载集合到内存
collection_manager.load_collection()
print(f"集合 '{collection_name}' 已加载到内存。")
# 5. 直接在代码中指定要查询的标签
#query_tag = "MATH_DATA_1" # 可以修改为MATH_DATA_2或其他需要的标签
query_tag = "MATH_DATA_2" # 可以修改为MATH_DATA_2或其他需要的标签
expr = f"array_contains(tags['tags'], '{query_tag}')"
print(f"查询表达式: {expr}")
# 6. 查询数据
try:
results = collection_manager.collection.query(
expr=expr,
output_fields=["id", "tags", "user_input", "timestamp", "embedding"],
limit=1000
)
print(f"查询标签 '{query_tag}' 结果:")
if results:
for result in results:
try:
print(f"ID: {result['id']}")
print(f"标签: {result['tags']}")
print(f"用户问题: {result['user_input']}")
print(f"时间: {result['timestamp']}")
print(f"向量: {result['embedding'][:5]}...")
print("-" * 40)
except Exception as e:
print(f"处理结果失败: {e}")
else:
print(f"未找到标签为 '{query_tag}' 的数据。")
except Exception as e:
print(f"查询失败: {e}")
# 7. 释放连接
milvus_pool.release_connection(connection)
# 8. 关闭连接池
milvus_pool.close()

@ -1,92 +0,0 @@
import time
import jieba # 导入 jieba 分词库
from Backup.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Config.Config import *
from gensim.models import KeyedVectors
# 1. 加载预训练的 Word2Vec 模型
model_path = MS_MODEL_PATH # 替换为你的 Word2Vec 模型路径
model = KeyedVectors.load_word2vec_format(model_path, binary=False, limit=MS_MODEL_LIMIT)
print(f"模型加载成功,词向量维度: {model.vector_size}")
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
print(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
print(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
print("未找到有效词,返回零向量")
return [0.0] * model.vector_size
# 2. 使用连接池管理 Milvus 连接
milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 3. 从连接池中获取一个连接
connection = milvus_pool.get_connection()
# 4. 初始化集合管理器
collection_name = MS_COLLECTION_NAME
collection_manager = MilvusCollectionManager(collection_name)
# 5. 加载集合到内存
collection_manager.load_collection()
print(f"集合 '{collection_name}' 已加载到内存。")
# 6. 输入一句话
input_text = "小学数学中有哪些模型?"
# 7. 将文本转换为嵌入向量
current_embedding = text_to_embedding(input_text)
# 8. 查询与当前对话最相关的历史对话
start_time = time.time()
search_params = {
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
}
# 哪些文档查询,哪些不查询,我说了算!
# 这样的话,我就可以打多个标签了!
expr = "array_contains(tags['tags'], 'MATH_DATA_1')"
results = collection_manager.search(
current_embedding,
search_params,
expr=expr, # 使用in操作符
limit=5
)
end_time = time.time()
# 9. 输出查询结果
print("最相关的历史对话:")
if results:
for hits in results:
for hit in hits:
try:
# 查询非向量字段
record = collection_manager.query_by_id(hit.id)
print(f"ID: {hit.id}")
print(f"标签: {record['tags']}")
print(f"用户问题: {record['user_input']}")
print(f"时间: {record['timestamp']}")
print(f"距离: {hit.distance}")
print("-" * 40) # 分隔线
except Exception as e:
print(f"查询失败: {e}")
else:
print("未找到相关历史对话,请检查查询参数或数据。")
# 10. 输出查询耗时
print(f"查询耗时: {end_time - start_time:.4f}")
# 11. 释放连接
milvus_pool.release_connection(connection)
# 12. 关闭连接池
milvus_pool.close()

@ -1,265 +0,0 @@
import os
import subprocess
import tempfile
import urllib.parse
import uuid
from contextlib import asynccontextmanager
from io import BytesIO
from logging.handlers import RotatingFileHandler
from typing import List
import jieba # 导入 jieba 分词库
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from gensim.models import KeyedVectors
from pydantic import BaseModel, Field, ValidationError
from starlette.responses import StreamingResponse
from Config.Config import MS_MODEL_PATH, MS_MODEL_LIMIT, MS_HOST, MS_PORT, MS_MAX_CONNECTIONS, MS_NPROBE, \
MS_COLLECTION_NAME
from Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from Milvus.Utils.MilvusConnectionPool import *
from Milvus.Utils.MilvusConnectionPool import MilvusConnectionPool
from Util.ALiYunUtil import ALiYunUtil
# 初始化日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = RotatingFileHandler('../Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# 1. 加载预训练的 Word2Vec 模型
model = KeyedVectors.load_word2vec_format(MS_MODEL_PATH, binary=False, limit=MS_MODEL_LIMIT)
logger.info(f"模型加载成功,词向量维度: {model.vector_size}")
# 将HTML文件转换为Word文件
def html_to_word_pandoc(html_file, output_file):
subprocess.run(['pandoc', html_file, '-o', output_file])
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化Milvus连接池
app.state.milvus_pool = MilvusConnectionPool(host=MS_HOST, port=MS_PORT, max_connections=MS_MAX_CONNECTIONS)
# 初始化集合管理器
app.state.collection_manager = MilvusCollectionManager(MS_COLLECTION_NAME)
app.state.collection_manager.load_collection()
# 初始化阿里云大模型工具
app.state.aliyun_util = ALiYunUtil()
yield
# 关闭Milvus连接池
app.state.milvus_pool.close()
app = FastAPI(lifespan=lifespan)
# 挂载静态文件目录
app.mount("../static", StaticFiles(directory="Static"), name="static")
# 将文本转换为嵌入向量
def text_to_embedding(text):
words = jieba.lcut(text) # 使用 jieba 分词
print(f"文本: {text}, 分词结果: {words}")
embeddings = [model[word] for word in words if word in model]
logger.info(f"有效词向量数量: {len(embeddings)}")
if embeddings:
avg_embedding = sum(embeddings) / len(embeddings)
logger.info(f"生成的平均向量: {avg_embedding[:5]}...") # 打印前 5 维
return avg_embedding
else:
logger.warning("未找到有效词,返回零向量")
return [0.0] * model.vector_size
async def generate_stream(client, milvus_pool, collection_manager, query, documents):
# 从连接池获取连接
connection = milvus_pool.get_connection()
try:
# 1. 将查询文本转换为向量
current_embedding = text_to_embedding(query)
# 2. 搜索相关数据
search_params = {
"metric_type": "L2", # 使用 L2 距离度量方式
"params": {"nprobe": MS_NPROBE} # 设置 IVF_FLAT 的 nprobe 参数
}
# 动态生成expr表达式
if documents:
conditions = [f"array_contains(tags['tags'], '{doc}')" for doc in documents]
expr = " OR ".join(conditions)
else:
expr = "" # 如果没有选择文档,返回空字符串
# 7. 将文本转换为嵌入向量
results = collection_manager.search(current_embedding,
search_params,
expr=expr, # 使用in操作符
limit=5) # 返回 5 条结果
# 3. 处理搜索结果
logger.info("最相关的知识库内容:")
context = ""
if results:
for hits in results:
for hit in hits:
try:
# 查询非向量字段
record = collection_manager.query_by_id(hit.id)
if hit.distance < 0.88: # 设置距离阈值
logger.info(f"ID: {hit.id}")
logger.info(f"标签: {record['tags']}")
logger.info(f"用户问题: {record['user_input']}")
logger.info(f"时间: {record['timestamp']}")
logger.info(f"距离: {hit.distance}")
logger.info("-" * 40) # 分隔线
# 获取完整内容
full_content = record['tags'].get('full_content', record['user_input'])
context = context + full_content
else:
logger.warning(f"距离太远,忽略此结果: {hit.id}")
logger.info(f"标签: {record['tags']}")
logger.info(f"用户问题: {record['user_input']}")
logger.info(f"时间: {record['timestamp']}")
logger.info(f"距离: {hit.distance}")
continue
except Exception as e:
logger.error(f"查询失败: {e}")
else:
logger.warning("未找到相关历史对话,请检查查询参数或数据。")
prompt = f"""
信息检索与回答助手
根据以下关于'{query}'的相关信息
基本信息
- 语言: 中文
- 描述: 根据提供的材料检索信息并回答问题
- 特点: 快速准确提取关键信息清晰简洁地回答
相关信息
{context}
回答要求
1. 依托给定的资料快速准确地回答问题可以添加一些额外的信息但请勿重复内容
2. 如果未提供相关信息请不要回答
3. 如果发现相关信息与原来的问题契合度低也不要回答
4. 使用HTML格式返回包含适当的段落列表和标题标签
5. 确保内容结构清晰便于前端展示
"""
# 调用阿里云大模型
if len(context) > 0:
html_content = client.chat(prompt)
yield {"data": html_content}
else:
yield {"data": "没有在知识库中找到相关的信息,无法回答此问题。"}
except Exception as e:
yield {"data": f"生成报告时出错: {str(e)}"}
finally:
# 释放连接
milvus_pool.release_connection(connection)
"""
http://10.10.21.22:8000/static/ai.html
知识库中有的内容
小学数学中有哪些模型
帮我写一下 如何理解点线的教学设计
知识库中没有的内容
你知道黄海是谁吗
"""
class QueryRequest(BaseModel):
query: str = Field(..., description="用户查询的问题")
documents: List[str] = Field(..., description="用户上传的文档")
class SaveWordRequest(BaseModel):
html: str = Field(..., description="要保存为Word的HTML内容")
@app.post("/api/save-word")
async def save_to_word(request: Request):
temp_html = None
output_file = None
try:
# Parse request data
try:
data = await request.json()
html_content = data.get('html_content', '')
if not html_content:
raise ValueError("Empty HTML content")
except Exception as e:
logger.error(f"Request parsing failed: {str(e)}")
raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}")
# 创建临时HTML文件
temp_html = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + ".html")
with open(temp_html, "w", encoding="utf-8") as f:
f.write(html_content)
# 使用pandoc转换
output_file = os.path.join(tempfile.gettempdir(), "小学数学问答.docx")
subprocess.run(['pandoc', temp_html, '-o', output_file], check=True)
# 读取生成的Word文件
with open(output_file, "rb") as f:
stream = BytesIO(f.read())
# 返回响应
encoded_filename = urllib.parse.quote("小学数学问答.docx")
return StreamingResponse(
stream,
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# 清理临时文件
try:
if temp_html and os.path.exists(temp_html):
os.remove(temp_html)
if output_file and os.path.exists(output_file):
os.remove(output_file)
except Exception as e:
logger.warning(f"Failed to clean up temp files: {str(e)}")
@app.post("/api/rag")
async def rag_stream(request: Request):
try:
data = await request.json()
query_request = QueryRequest(**data)
except ValidationError as e:
logger.error(f"请求体验证失败: {e.errors()}")
raise HTTPException(status_code=422, detail=e.errors())
except Exception as e:
logger.error(f"请求解析失败: {str(e)}")
raise HTTPException(status_code=400, detail="无效的请求格式")
"""RAG+ALiYun接口"""
async for chunk in generate_stream(
request.app.state.aliyun_util,
request.app.state.milvus_pool,
request.app.state.collection_manager,
query_request.query,
query_request.documents
):
return chunk
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

@ -1,4 +1,4 @@
# MYSQL配置信息
# MYSQL配置信息 【暂未使用到】
MYSQL_HOST = "10.10.14.210"
MYSQL_PORT = 22066
MYSQL_USER = "root"
@ -11,31 +11,9 @@ ES_CONFIG = {
"basic_auth": ("elastic", "jv9h8uwRrRxmDi1dq6u8"),
"verify_certs": False,
"ssl_show_warn": False,
"index_name": "knowledge_base"
"index_name": "ds_kb"
}
# Milvus 服务器的主机地址
MS_HOST = "10.10.14.207"
# Milvus 服务器的端口号
MS_PORT = "19530"
# Milvus 集合的名称
MS_COLLECTION_NAME = "ds_collection"
# Milvus 连接池的最大连接数
MS_MAX_CONNECTIONS = 50
# 腾讯 AI Lab 中文词向量模型的路径
MS_MODEL_PATH = "D:/Tencent_AILab_ChineseEmbedding/Tencent_AILab_ChineseEmbedding.txt"
# 加载词向量模型时限制的词汇数量
MS_MODEL_LIMIT = 10000
# 词向量的维度(腾讯 AI Lab 中文词向量模型的维度为 200
MS_DIMENSION = 200
# Milvus 搜索时的 nprobe 参数,用于控制搜索的精度和性能
MS_NPROBE = 100
# DeepSeek
DEEPSEEK_API_KEY = 'sk-44ae895eeb614aa1a9c6460579e322f1'
DEEPSEEK_URL = 'https://api.deepseek.com'
# 阿里云中用来调用 deepseek v3 的密钥【驿来特】
MODEL_API_KEY = "sk-f6da0c787eff4b0389e4ad03a35a911f"
MODEL_NAME = "qwen-plus"

Loading…
Cancel
Save