main
HuangHai 4 weeks ago
parent c024386037
commit 7ff869e4a5

@@ -50,7 +50,7 @@ search_params = {
     "params": {"nprobe": MS_NPROBE}  # nprobe setting for the IVF_FLAT index
 }
 start_time = time.time()
-results = collection_manager.search(current_embedding, search_params, limit=2)  # return 2 results
+results = collection_manager.search(current_embedding, search_params, limit=10)  # return 10 results
 end_time = time.time()
 # 9. Print the search results
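Note on step 9: with limit=10 the search now returns up to ten hits per query vector. A minimal sketch of iterating the result set, assuming the wrapper returns pymilvus-style SearchResult objects (the collection_manager implementation itself is not part of this diff):

    # Sketch: print the hits returned by the search above.
    # Assumes pymilvus-style results: one Hits object per query vector.
    for hits in results:
        for hit in hits:
            # Each Hit exposes an id and a distance; for IVF_FLAT with an
            # L2 metric, smaller distances mean closer matches.
            print(f"id={hit.id}, distance={hit.distance:.4f}")
    print(f"Search took {end_time - start_time:.3f}s")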

@@ -1,100 +0,0 @@
from elasticsearch import Elasticsearch
from Config.Config import ES_CONFIG

# Initialize the ES connection
es = Elasticsearch(
    hosts=ES_CONFIG["hosts"],
    basic_auth=ES_CONFIG["basic_auth"],
    verify_certs=ES_CONFIG["verify_certs"],
    ssl_show_warn=ES_CONFIG["ssl_show_warn"]
)

def get_vector_mapping(dims=200):
    """Return the mapping for the vector index."""
    return {
        "properties": {
            "content": {
                "type": "text",
                "analyzer": "ik_smart",
                "search_analyzer": "ik_smart",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 8192
                    }
                }
            },
            "vector": {
                "type": "dense_vector",
                "dims": dims,
                "index": True,
                "similarity": "cosine"
            },
            "timestamp": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis"
            }
        }
    }

def get_text_mapping():
    """Return the mapping for the raw-text index."""
    return {
        "properties": {
            "raw_text": {
                "type": "text",
                "analyzer": "ik_smart",
                "fielddata": True
            },
            "timestamp": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis"
            }
        }
    }

def manage_index(action, index_type="vector", index_name=None, dims=200):
    """Create or delete an Elasticsearch index."""
    if index_name is None:
        index_name = "knowledge_base" if index_type == "vector" else "raw_texts"
    if action == "create":
        mapping = get_vector_mapping(dims) if index_type == "vector" else get_text_mapping()
        try:
            if es.indices.exists(index=index_name):
                print(f"Index {index_name} already exists")
                return False
            es.indices.create(index=index_name, body={"mappings": mapping})
            print(f"Index {index_name} created (using the ik_smart analyzer)")
            return True
        except Exception as e:
            print(f"Failed to create index: {str(e)}")
            raise
    elif action == "delete":
        try:
            if not es.indices.exists(index=index_name):
                print(f"Index {index_name} does not exist")
                return False
            es.indices.delete(index=index_name)
            print(f"Index {index_name} deleted")
            return True
        except Exception as e:
            print(f"Failed to delete index: {str(e)}")
            raise
    else:
        raise ValueError("action must be 'create' or 'delete'")

# Usage example
if __name__ == "__main__":
    # Delete the existing indexes first (if present)
    manage_index("delete", "vector")
    manage_index("delete", "text")
    # Create a fresh vector index
    manage_index("create", "vector", dims=200)
    # Create a fresh raw-text index
    manage_index("create", "text")

@@ -1,82 +0,0 @@
import os
import datetime
import logging
from elasticsearch import Elasticsearch
from Config.Config import ES_CONFIG
from Util.EmbeddingUtil import text_to_embedding

# Logger configuration, added at the top of the file
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Console handler with a standard format
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

def split_paragraphs(text):
    """Split text into paragraphs on blank lines."""
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    return paragraphs

def save_to_es(text):
    """Save the embedded text and the raw text to ES (relies on the `es` client created in __main__)."""
    vector = text_to_embedding(text)
    if vector is None:
        logger.warning(f"Skipping text that could not be embedded: {text}")
        return
    doc = {
        'text': text,
        'vector': vector,
        'timestamp': datetime.datetime.now().isoformat(),
        'analyzer': 'ik_smart'
    }
    try:
        es.index(index='knowledge_base', body=doc)
        es.index(index='raw_texts', body={'raw_text': text})
    except Exception as e:
        logger.error(f"Failed to save text to ES: {e}")

def process_directory(dir_path):
    """Process every text file in a directory."""
    total_paragraphs = 0
    # Collect all .txt files and sort them numerically by filename
    files = [f for f in os.listdir(dir_path) if f.endswith('.txt')]
    files.sort(key=lambda x: int(x.split('.')[0]))
    file_count = len(files)
    print(f"Found {file_count} text files to process")
    for i, filename in enumerate(files, 1):
        print(f"Processing file {i}/{file_count}: {filename}")
        file_path = os.path.join(dir_path, filename)
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        paragraphs = split_paragraphs(content)
        total_paragraphs += len(paragraphs)
        for paragraph in paragraphs:
            save_to_es(paragraph)
    print(f"\nDone: processed {file_count} files and saved {total_paragraphs} paragraphs")

if __name__ == '__main__':
    es = Elasticsearch(
        hosts=ES_CONFIG['hosts'],  # passed through as-is, matching the index-management script
        basic_auth=ES_CONFIG['basic_auth'],
        verify_certs=ES_CONFIG['verify_certs'],
        ssl_show_warn=ES_CONFIG['ssl_show_warn']
    )
    dir_path = '../Txt/processed_chunks'  # directory produced by T2_DocxProcessor.py
    process_directory(dir_path)
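Indexing one document per HTTP round-trip is the slow path here. A hedged sketch of the same write batched through the standard elasticsearch.helpers.bulk helper (field names as in save_to_es above; save_paragraphs_bulk is a hypothetical name, not part of this codebase):

    from elasticsearch.helpers import bulk

    def save_paragraphs_bulk(es, paragraphs):
        """Sketch: index many paragraphs in one bulk request instead of one es.index() call each."""
        actions = []
        for text in paragraphs:
            vector = text_to_embedding(text)
            if vector is None:
                continue  # same skip rule as save_to_es
            actions.append({
                "_index": "knowledge_base",
                "_source": {
                    "text": text,
                    "vector": vector,
                    "timestamp": datetime.datetime.now().isoformat(),
                },
            })
            actions.append({"_index": "raw_texts", "_source": {"raw_text": text}})
        success, _ = bulk(es, actions)
        return success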

@@ -1,115 +0,0 @@
"""
conda activate rag
pip install openai
"""
from elasticsearch import Elasticsearch
from openai import OpenAI
from Config import Config

# Initialize the ES connection
es = Elasticsearch(
    hosts=Config.ES_CONFIG['hosts'],
    basic_auth=Config.ES_CONFIG['basic_auth'],
    verify_certs=Config.ES_CONFIG['verify_certs']
)

# Initialize the DeepSeek client
client = OpenAI(
    api_key=Config.DEEPSEEK_API_KEY,
    base_url=Config.DEEPSEEK_URL
)

def generate_report(query, context):
    """Generate a report with DeepSeek."""
    prompt = f"""Based on the following information about '{query}', produce a structured report.
Requirements:
1. Organize the content into sections
2. Include the key data and facts
3. Keep the language concise and professional
Related information:
{context}"""
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[
                {"role": "system", "content": "You are a professional document-organizing assistant"},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            stream=True
        )
        # Handle the streamed output
        full_response = ""
        for chunk in response:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                print(content, end="", flush=True)
                full_response += content
        return full_response
    except Exception as e:
        print(f"Error while generating the report: {str(e)}")
        return ""

def process_query(query):
    """Handle a user query and generate the report."""
    print(f"Searching for data related to '{query}'...")
    context = search_related_data(query)
    num_entries = len(context.split('\n\n'))
    print(f"Found {num_entries} related entries")
    print("Generating the report...")
    report = generate_report(query, context)
    return report

def search_related_data(query):
    """Search for data related to the query."""
    # Full-text search on the knowledge-base index (a match query; despite the
    # variable name, this is not a kNN/vector search)
    vector_results = es.search(
        index=Config.ES_CONFIG['default_index'],
        body={
            "query": {
                "match": {
                    "content": {
                        "query": query,
                        "analyzer": "ik_smart"  # analyzer applied at query time
                    }
                }
            },
            "size": 5
        }
    )
    # Full-text search on the raw-text index (the raw_texts mapping defines no
    # keyword subfield, so matching on "text.keyword" would return nothing;
    # match on the raw_text field instead)
    text_results = es.search(
        index="raw_texts",
        body={
            "query": {
                "match": {
                    "raw_text": query
                }
            },
            "size": 5
        }
    )
    # Merge the results
    context = ""
    for hit in vector_results['hits']['hits']:
        context += f"Similarity result (score={hit['_score']}):\n{hit['_source']['text']}\n\n"
    for hit in text_results['hits']['hits']:
        context += f"Text match result (score={hit['_score']}):\n{hit['_source']['raw_text']}\n\n"
    return context

if __name__ == "__main__":
    # user_query = input("Enter your query: ")
    user_query = "小学数学中有哪些模型?"  # "Which models appear in primary-school mathematics?"
    report = process_query(user_query)
    print(report)
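The knowledge_base mapping declares a dense_vector field with cosine similarity, yet the search above never touches it. For reference, a hedged sketch of what a true vector query could look like with the standard knn search option (requires Elasticsearch 8.x; assumes text_to_embedding from Util.EmbeddingUtil produces the same 200-dim vectors that were indexed; vector_search is a hypothetical helper, not part of this codebase):

    from Util.EmbeddingUtil import text_to_embedding

    def vector_search(es, query, k=5):
        """Sketch: kNN search against the dense_vector field defined in the mapping."""
        query_vector = text_to_embedding(query)
        resp = es.search(
            index="knowledge_base",
            knn={
                "field": "vector",
                "query_vector": query_vector,
                "k": k,
                "num_candidates": 10 * k,  # wider candidate pool improves recall
            },
        )
        return [hit["_source"]["text"] for hit in resp["hits"]["hits"]]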