main
HuangHai 4 weeks ago
parent cbb5b56623
commit 5056cfe2c2

@ -0,0 +1,114 @@
"""
conda activate rag
pip install openai
"""
from elasticsearch import Elasticsearch
from openai import OpenAI
from Config import Config
# 初始化ES连接
es = Elasticsearch(
hosts=Config.ES_CONFIG['hosts'],
basic_auth=Config.ES_CONFIG['basic_auth'],
verify_certs=Config.ES_CONFIG['verify_certs']
)
# 初始化DeepSeek客户端
client = OpenAI(
api_key=Config.DEEPSEEK_API_KEY,
base_url=Config.DEEPSEEK_URL
)
def generate_report(query, context):
    """Generate a structured report with DeepSeek."""
    prompt = f"""Using the following information about '{query}', produce a structured report.
Requirements:
1. Organise the content into sections.
2. Include key data and facts.
3. Keep the language concise and professional.
Related information:
{context}"""
    try:
        response = client.chat.completions.create(
            model="deepseek-chat",
            messages=[
                {"role": "system", "content": "You are a professional document-organisation assistant."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            stream=True
        )
        # Consume the streamed response chunk by chunk
        full_response = ""
        for chunk in response:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                print(content, end="", flush=True)
                full_response += content
        return full_response
    except Exception as e:
        print(f"Error while generating the report: {str(e)}")
        return ""
def process_query(query):
    """Handle a user query end-to-end and generate a report."""
    print(f"Searching for data related to '{query}'...")
    context = search_related_data(query)
    # Count only non-empty blocks; a bare split on "\n\n" also counts the trailing blank chunk
    blocks = [b for b in context.split("\n\n") if b.strip()]
    print(f"Found {len(blocks)} related entries")
    print("Generating the report...")
    report = generate_report(query, context)
    return report
def search_related_data(query):
    """Search Elasticsearch for data related to the query."""
    # "Vector" search (implemented here as a match query on the content
    # index, segmented with the ik_smart Chinese analyzer)
    vector_results = es.search(
        index=Config.ES_CONFIG['index_name'],
        body={
            "query": {
                "match": {
                    "content": {
                        "query": query,
                        "analyzer": "ik_smart"  # Chinese word segmentation
                    }
                }
            },
            "size": 5
        }
    )
    # Exact keyword match on the raw-text index
    text_results = es.search(
        index="raw_texts",
        body={
            "query": {
                "match": {
                    "text.keyword": query
                }
            },
            "size": 5
        }
    )
    # Merge both result sets into a single context string
    context = ""
    for hit in vector_results['hits']['hits']:
        context += f"Vector similarity result (score={hit['_score']}):\n{hit['_source']['text']}\n\n"
    for hit in text_results['hits']['hits']:
        context += f"Exact text match result (score={hit['_score']}):\n{hit['_source']['text']}\n\n"
    return context
if __name__ == "__main__":
    # user_query = input("Enter your query: ")
    user_query = "小学数学"  # test query: "primary-school mathematics"
    report = process_query(user_query)
    print("\n=== Generated report ===\n")
    print(report)
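Note: the script reads every connection setting from a Config module that is not part of this diff. A minimal sketch of the attributes the code above relies on, with placeholder values (an assumption about the real module, not its actual contents):

# Config.py -- illustrative placeholder; only the attributes referenced above are shown.
class Config:
    ES_CONFIG = {
        "hosts": ["https://localhost:9200"],    # assumed cluster address
        "basic_auth": ("elastic", "changeme"),  # assumed credentials
        "verify_certs": False,
        "index_name": "knowledge_base",         # assumed name of the content index
    }
    DEEPSEEK_API_KEY = "sk-..."                 # replace with a real key
    DEEPSEEK_URL = "https://api.deepseek.com"   # DeepSeek's OpenAI-compatible endpoint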

@ -0,0 +1,127 @@
import os
import logging
from logging.handlers import RotatingFileHandler

import jieba
from gensim.models import KeyedVectors

from Config.Config import ES_CONFIG, MS_MODEL_PATH, MS_MODEL_LIMIT
from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool

# Initialise logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Make sure the log directory exists
os.makedirs('Logs', exist_ok=True)
handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

# 1. Load the pre-trained Word2Vec model
model = KeyedVectors.load_word2vec_format(MS_MODEL_PATH, binary=False, limit=MS_MODEL_LIMIT)
logger.info(f"Model loaded, vector dimension: {model.vector_size}")
def init_es_pool():
    """Initialise the Elasticsearch connection pool."""
    es_pool = ElasticsearchConnectionPool(
        hosts=ES_CONFIG["hosts"],
        basic_auth=ES_CONFIG["basic_auth"],
        verify_certs=ES_CONFIG["verify_certs"],
        max_connections=50
    )
    logger.info("Elasticsearch connection pool initialised")
    return es_pool
# Convert text to an embedding vector
def text_to_embedding(text):
    words = jieba.lcut(text)  # segment the text with jieba
    print(f"Text: {text}, tokens: {words}")
    try:
        embeddings = [model[word] for word in words if word in model]
        logger.info(f"Number of valid word vectors: {len(embeddings)}")
        if embeddings:
            avg_embedding = sum(embeddings) / len(embeddings)
            logger.info(f"Averaged vector (first 5 dims): {avg_embedding[:5]}...")
            # Return a plain list: numpy arrays are not JSON-serialisable for the ES client
            return avg_embedding.tolist()
        else:
            logger.warning("No valid words found; returning a zero vector")
            return [0.0] * model.vector_size
    except Exception as e:
        logger.error(f"Vectorisation failed: {str(e)}")
        return [0.0] * model.vector_size
def main():
    # Initialise the ES connection pool
    es_pool = init_es_pool()
    # Test query
    query = "小学数学教学中的若干问题"  # "selected issues in primary-school maths teaching"
    print("\n=== Running the query ===")
    print(f"Original query text: {query}")
    # Run the hybrid search
    es_conn = es_pool.get_connection()
    try:
        # Vector search
        print("\n=== Vector search stage ===")
        print("1. Tokenising and vectorising the text...")
        query_embedding = text_to_embedding(query)
        print(f"2. Query vector dimension: {len(query_embedding)}")
        print(f"3. First 5 dimensions: {query_embedding[:5]}")
        print("4. Running the Elasticsearch vector search...")
        vector_results = es_conn.search(
            index=ES_CONFIG['index_name'],
            body={
                "query": {
                    "script_score": {
                        "query": {"match_all": {}},
                        "script": {
                            "source": "double score = cosineSimilarity(params.query_vector, 'embedding'); return score >= 0 ? score : 0",
                            "params": {"query_vector": query_embedding}
                        }
                    }
                },
                "size": 5
            }
        )
        print(f"5. Vector search hits: {len(vector_results['hits']['hits'])}")
        # Exact text search
        print("\n=== Exact text search stage ===")
        print("1. Running the Elasticsearch exact text search...")
        text_results = es_conn.search(
            index="raw_texts",
            body={
                "query": {
                    "match": {
                        "text.keyword": query
                    }
                },
                "size": 5
            }
        )
        print(f"2. Text search hits: {len(text_results['hits']['hits'])}")
        # Print the detailed results
        print("\n=== Final search results ===")
        print("Vector search results:")
        for i, hit in enumerate(vector_results['hits']['hits']):
            print(f"  {i+1}. Doc ID: {hit['_id']}, similarity score: {hit['_score']:.2f}")
        print("\nExact text search results:")
        for i, hit in enumerate(text_results['hits']['hits']):
            print(f"  {i+1}. Doc ID: {hit['_id']}, match score: {hit['_score']:.2f}")
    finally:
        es_pool.release_connection(es_conn)

    # Close the connection pool
    es_pool.close()

if __name__ == "__main__":
    main()
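Note: ElasticsearchConnectionPool comes from the project's own utilities and is also outside this diff. Judging only from its usage above (the constructor arguments plus get_connection, release_connection and close), a compatible stand-in could look like the sketch below; the real implementation may differ.

# Illustrative stand-in for ElasticSearch.Utils.ElasticsearchConnectionPool.
from queue import Queue
from elasticsearch import Elasticsearch

class ElasticsearchConnectionPool:
    def __init__(self, hosts, basic_auth, verify_certs, max_connections=50):
        # Pre-create the clients; a production pool would likely create them lazily.
        self._pool = Queue(maxsize=max_connections)
        for _ in range(max_connections):
            self._pool.put(Elasticsearch(hosts=hosts, basic_auth=basic_auth,
                                         verify_certs=verify_certs))

    def get_connection(self):
        return self._pool.get()

    def release_connection(self, conn):
        self._pool.put(conn)

    def close(self):
        while not self._pool.empty():
            self._pool.get().close()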

@ -79,14 +79,18 @@ app.mount("/static", StaticFiles(directory="Static"), name="static")
def text_to_embedding(text):
    words = jieba.lcut(text)  # segment the text with jieba
    print(f"Text: {text}, tokens: {words}")
    embeddings = [model[word] for word in words if word in model]
    logger.info(f"Number of valid word vectors: {len(embeddings)}")
    if embeddings:
        avg_embedding = sum(embeddings) / len(embeddings)
        logger.info(f"Averaged vector (first 5 dims): {avg_embedding[:5]}...")
        return avg_embedding
    else:
        logger.warning("No valid words found; returning a zero vector")
    try:
        embeddings = [model[word] for word in words if word in model]
        logger.info(f"Number of valid word vectors: {len(embeddings)}")
        if embeddings:
            avg_embedding = sum(embeddings) / len(embeddings)
            logger.info(f"Averaged vector (first 5 dims): {avg_embedding[:5]}...")
            # Plain list: keeps the return type consistent with the fallback paths
            return avg_embedding.tolist()
        else:
            logger.warning("No valid words found; returning a zero vector")
            return [0.0] * model.vector_size
    except Exception as e:
        logger.error(f"Vectorisation failed: {str(e)}")
        return [0.0] * model.vector_size
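Note: with the list normalisation suggested above, text_to_embedding always returns a plain Python list, so the isinstance(current_embedding, list) guard in the next hunk accepts valid embeddings instead of replacing them with zeros. A quick illustrative check (assumes the module's model and text_to_embedding are in scope; not part of the commit):

# Sanity check for both return paths of text_to_embedding (illustrative only).
emb = text_to_embedding("小学数学")   # normal path: averaged word vectors
assert isinstance(emb, list) and len(emb) == model.vector_size
zero = text_to_embedding("")          # no usable tokens -> zero-vector fallback
assert all(v == 0.0 for v in zero)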
@ -111,13 +115,54 @@ async def generate_stream(client, es_pool, collection_manager, query, documents)
        expr = None

    # 3. Run the hybrid search
    results = collection_manager.search(
        es_conn,
        current_embedding,
        search_params={"k": 5},  # return 5 results
        expr=expr,
        limit=5
    )
    if not isinstance(current_embedding, list):
        logger.error(f"current_embedding has the wrong type: {type(current_embedding)}")
        current_embedding = [0.0] * model.vector_size
    # Vector search (here a match query on the content field with the ik_smart analyzer)
    vector_results = es_conn.search(
        index=ES_CONFIG['index_name'],
        body={
            "query": {
                "match": {
                    "content": {
                        "query": query,
                        "analyzer": "ik_smart"  # Chinese word segmentation
                    }
                }
            },
            "size": 5
        }
    )
    # Exact text search
    text_results = es_conn.search(
        index="raw_texts",
        body={
            "query": {
                "match": {
                    "text.keyword": query
                }
            },
            "size": 5
        }
    )
    # Merge the results
    results = []
    for hit in vector_results['hits']['hits']:
        results.append({
            "_source": hit['_source'],
            "_score": hit['_score'],
            "_id": hit['_id']
        })
    for hit in text_results['hits']['hits']:
        results.append({
            "_source": hit['_source'],
            "_score": hit['_score'],
            "_id": hit['_id']
        })

    logger.info(f"Search returned {len(results) if results else 0} results")

    # 3. Process the search results
@ -138,7 +183,10 @@ async def generate_stream(client, es_pool, collection_manager, query, documents)
logger.info("-" * 40)
full_content = source['tags'].get('full_content', source['user_input'])
context = context + full_content
if isinstance(full_content, str):
context = context + full_content
else:
logger.warning(f"Unexpected content type: {type(full_content)}")
else:
logger.warning(f"分数太低,忽略此结果: {hit['_id']}")
continue
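Note on the merge step in the previous hunk: vector hits and keyword hits are concatenated as-is, so a document matched by both queries appears twice and the combined list is not ordered by score. If a single ranked list is wanted, one possible post-processing step (a sketch, not part of this commit):

# De-duplicate by document ID, keep the best score, then rank (illustrative).
# Caveat: BM25 and cosine-similarity scores live on different scales; a real
# ranker would normalise them before comparing.
best = {}
for hit in results:
    prev = best.get(hit["_id"])
    if prev is None or hit["_score"] > prev["_score"]:
        best[hit["_id"]] = hit
results = sorted(best.values(), key=lambda h: h["_score"], reverse=True)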
