From 5056cfe2c279df6403d057a58c3439c480136358 Mon Sep 17 00:00:00 2001
From: HuangHai <10402852@qq.com>
Date: Fri, 27 Jun 2025 14:20:24 +0800
Subject: [PATCH] 'commit'

---
 dsRag/ElasticSearch/T6_DeepSeekRag.py     | 114 +++++++++++++++++++
 dsRag/ElasticSearch/T7_XiangLiangQuery.py | 127 ++++++++++++++++++++++
 dsRag/StartEs.py                          |  78 ++++++++++---
 3 files changed, 304 insertions(+), 15 deletions(-)
 create mode 100644 dsRag/ElasticSearch/T6_DeepSeekRag.py
 create mode 100644 dsRag/ElasticSearch/T7_XiangLiangQuery.py

diff --git a/dsRag/ElasticSearch/T6_DeepSeekRag.py b/dsRag/ElasticSearch/T6_DeepSeekRag.py
new file mode 100644
index 00000000..9542e0f5
--- /dev/null
+++ b/dsRag/ElasticSearch/T6_DeepSeekRag.py
@@ -0,0 +1,114 @@
+"""
+conda activate rag
+pip install openai
+"""
+from elasticsearch import Elasticsearch
+from openai import OpenAI
+from Config import Config
+
+# Initialize the Elasticsearch connection
+es = Elasticsearch(
+    hosts=Config.ES_CONFIG['hosts'],
+    basic_auth=Config.ES_CONFIG['basic_auth'],
+    verify_certs=Config.ES_CONFIG['verify_certs']
+)
+
+# Initialize the DeepSeek client (OpenAI-compatible API)
+client = OpenAI(
+    api_key=Config.DEEPSEEK_API_KEY,
+    base_url=Config.DEEPSEEK_URL
+)
+
+def generate_report(query, context):
+    """Generate a report with DeepSeek"""
+    prompt = f"""Based on the following information about '{query}', write a structured report.
+Requirements:
+1. Organize the content into sections
+2. Include key facts and figures
+3. Keep the language concise and professional
+
+Related information:
+{context}"""
+
+    try:
+        response = client.chat.completions.create(
+            model="deepseek-chat",
+            messages=[
+                {"role": "system", "content": "You are a professional document-writing assistant"},
+                {"role": "user", "content": prompt}
+            ],
+            temperature=0.3,
+            stream=True
+        )
+
+        # Print the streamed response as it arrives
+        full_response = ""
+        for chunk in response:
+            if chunk.choices[0].delta.content:
+                content = chunk.choices[0].delta.content
+                print(content, end="", flush=True)
+                full_response += content
+
+        return full_response
+    except Exception as e:
+        print(f"Error while generating the report: {str(e)}")
+        return ""
+
+def process_query(query):
+    """Handle a user query and generate a report"""
+    print(f"Searching for data related to '{query}'...")
+    context = search_related_data(query)
+    print("Found %d related passages" % context.count("\n\n"))
+
+    print("Generating the report...")
+    report = generate_report(query, context)
+
+    return report
+
+def search_related_data(query):
+    """Search for data related to the query"""
+    # Full-text search (a BM25 match query; no embeddings are used here)
+    vector_results = es.search(
+        index=Config.ES_CONFIG['index_name'],
+        body={
+            "query": {
+                "match": {
+                    "content": {
+                        "query": query,
+                        "analyzer": "ik_smart"  # use the ik_smart Chinese analyzer
+                    }
+                }
+            },
+            "size": 5
+        }
+    )
+
+    # Exact text search
+    text_results = es.search(
+        index="raw_texts",
+        body={
+            "query": {
+                "match": {
+                    "text.keyword": query
+                }
+            },
+            "size": 5
+        }
+    )
+
+    # Merge the two result sets
+    context = ""
+    for hit in vector_results['hits']['hits']:
+        context += f"Full-text match (score={hit['_score']}):\n{hit['_source']['text']}\n\n"
+
+    for hit in text_results['hits']['hits']:
+        context += f"Exact text match (score={hit['_score']}):\n{hit['_source']['text']}\n\n"
+
+    return context
+
+if __name__ == "__main__":
+    # user_query = input("Enter your query: ")
+    user_query = "小学数学"  # "elementary school mathematics"
+    report = process_query(user_query)
+    print("\n=== Generated Report ===\n")
+    print(report)
\ No newline at end of file
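
A note on search_related_data above: despite the variable name, the first query is a plain BM25 match query (full text, ik_smart analyzer); no embeddings are involved. A true vector arm would rank documents against a query embedding the way T7_XiangLiangQuery.py below does with a script_score query. A minimal sketch, assuming the index carries an `embedding` dense_vector field and a text_to_embedding() helper like T7's is available (both are assumptions, neither is part of T6 itself):

    def vector_search(es, index_name, query_vector, size=5):
        """Rank documents by cosine similarity to the query vector."""
        return es.search(
            index=index_name,
            body={
                "query": {
                    "script_score": {
                        "query": {"match_all": {}},
                        "script": {
                            # cosineSimilarity can go negative; clamp to 0 because
                            # Elasticsearch rejects negative scores
                            "source": "double s = cosineSimilarity(params.query_vector, 'embedding'); return s >= 0 ? s : 0",
                            "params": {"query_vector": query_vector}
                        }
                    }
                },
                "size": size
            }
        )
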
diff --git a/dsRag/ElasticSearch/T7_XiangLiangQuery.py b/dsRag/ElasticSearch/T7_XiangLiangQuery.py
new file mode 100644
index 00000000..56fed378
--- /dev/null
+++ b/dsRag/ElasticSearch/T7_XiangLiangQuery.py
@@ -0,0 +1,127 @@
+import os
+import logging
+from logging.handlers import RotatingFileHandler
+
+import jieba
+from gensim.models import KeyedVectors
+
+from Config.Config import ES_CONFIG, MS_MODEL_PATH, MS_MODEL_LIMIT
+from ElasticSearch.Utils.ElasticsearchConnectionPool import ElasticsearchConnectionPool
+
+# Configure logging
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+# Make sure the log directory exists
+os.makedirs('Logs', exist_ok=True)
+handler = RotatingFileHandler('Logs/start.log', maxBytes=1024 * 1024, backupCount=5)
+handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
+logger.addHandler(handler)
+
+# 1. Load the pre-trained Word2Vec model
+model = KeyedVectors.load_word2vec_format(MS_MODEL_PATH, binary=False, limit=MS_MODEL_LIMIT)
+logger.info(f"Model loaded, vector dimension: {model.vector_size}")
+
+
+
+def init_es_pool():
+    # Initialize the Elasticsearch connection pool
+    es_pool = ElasticsearchConnectionPool(
+        hosts=ES_CONFIG["hosts"],
+        basic_auth=ES_CONFIG["basic_auth"],
+        verify_certs=ES_CONFIG["verify_certs"],
+        max_connections=50
+    )
+    logger.info("Elasticsearch connection pool initialized")
+    return es_pool
+
+
+# Convert text to an embedding vector by mean-pooling its word vectors
+def text_to_embedding(text):
+    words = jieba.lcut(text)  # tokenize with jieba
+    print(f"Text: {text}, tokens: {words}")
+    try:
+        embeddings = [model[word] for word in words if word in model]
+        logger.info(f"Words with vectors: {len(embeddings)}")
+        if embeddings:
+            avg_embedding = sum(embeddings) / len(embeddings)
+            logger.info(f"Averaged vector: {avg_embedding[:5]}...")  # first 5 dimensions
+            return avg_embedding.tolist()  # plain list, so it serializes cleanly into the ES query
+        else:
+            logger.warning("No known words found, returning a zero vector")
+            return [0.0] * model.vector_size
+    except Exception as e:
+        logger.error(f"Vectorization failed: {str(e)}")
+        return [0.0] * model.vector_size
+
+
+def main():
+    # Initialize the ES connection pool
+    es_pool = init_es_pool()
+
+    # Test query
+    query = "小学数学教学中的若干问题"  # "several questions in elementary school mathematics teaching"
+    print("\n=== Running query ===")
+    print(f"Raw query text: {query}")
+
+    # Run the hybrid search
+    es_conn = es_pool.get_connection()
+    try:
+        # Vector search
+        print("\n=== Vector search phase ===")
+        print("1. Tokenizing and vectorizing the text...")
+        query_embedding = text_to_embedding(query)
+        print(f"2. Query vector dimension: {len(query_embedding)}")
+        print(f"3. First 5 dimensions: {query_embedding[:5]}")
+
+        print("4. Running the Elasticsearch vector search...")
+        vector_results = es_conn.search(
+            index=ES_CONFIG['index_name'],
+            body={
+                "query": {
+                    "script_score": {
+                        "query": {"match_all": {}},
+                        "script": {
+                            "source": "double score = cosineSimilarity(params.query_vector, 'embedding'); return score >= 0 ? score : 0",
+                            "params": {"query_vector": query_embedding}
+                        }
+                    }
+                },
+                "size": 5
+            }
+        )
+        print(f"5. Vector search hits: {len(vector_results['hits']['hits'])}")
+
+        # Exact text search
+        print("\n=== Exact text search phase ===")
+        print("1. Running the Elasticsearch exact text search...")
+        text_results = es_conn.search(
+            index="raw_texts",
+            body={
+                "query": {
+                    "match": {
+                        "text.keyword": query
+                    }
+                },
+                "size": 5
+            }
+        )
+        print(f"2. Text search hits: {len(text_results['hits']['hits'])}")
+
+        # Print detailed results
+        print("\n=== Final results ===")
+        print("Vector search results:")
+        for i, hit in enumerate(vector_results['hits']['hits']):
+            print(f"  {i+1}. Doc ID: {hit['_id']}, similarity score: {hit['_score']:.2f}")
+
+        print("\nExact text search results:")
+        for i, hit in enumerate(text_results['hits']['hits']):
+            print(f"  {i+1}. Doc ID: {hit['_id']}, match score: {hit['_score']:.2f}")
+
+    finally:
+        es_pool.release_connection(es_conn)
+
+    # Close the connection pool
+    es_pool.close()
+
+if __name__ == "__main__":
+    main()
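
T7's main() pairs get_connection() with release_connection() in a hand-written try/finally. A context-manager wrapper keeps the same guarantee while shortening call sites; this is a sketch of a hypothetical helper (not part of the project), built only on the get_connection/release_connection methods used above:

    from contextlib import contextmanager

    @contextmanager
    def pooled_connection(es_pool):
        """Borrow a connection from the pool and always return it."""
        conn = es_pool.get_connection()
        try:
            yield conn
        finally:
            es_pool.release_connection(conn)

    # Usage inside main():
    # with pooled_connection(es_pool) as es_conn:
    #     vector_results = es_conn.search(index=ES_CONFIG['index_name'], body=...)
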
diff --git a/dsRag/StartEs.py b/dsRag/StartEs.py
index 2dfd8d78..94b9972c 100644
--- a/dsRag/StartEs.py
+++ b/dsRag/StartEs.py
@@ -79,14 +79,18 @@ app.mount("/static", StaticFiles(directory="Static"), name="static")
 def text_to_embedding(text):
     words = jieba.lcut(text)  # tokenize with jieba
     print(f"Text: {text}, tokens: {words}")
-    embeddings = [model[word] for word in words if word in model]
-    logger.info(f"Words with vectors: {len(embeddings)}")
-    if embeddings:
-        avg_embedding = sum(embeddings) / len(embeddings)
-        logger.info(f"Averaged vector: {avg_embedding[:5]}...")  # first 5 dimensions
-        return avg_embedding
-    else:
-        logger.warning("No known words found, returning a zero vector")
+    try:
+        embeddings = [model[word] for word in words if word in model]
+        logger.info(f"Words with vectors: {len(embeddings)}")
+        if embeddings:
+            avg_embedding = sum(embeddings) / len(embeddings)
+            logger.info(f"Averaged vector: {avg_embedding[:5]}...")  # first 5 dimensions
+            return avg_embedding.tolist()  # plain list, so the isinstance check downstream holds
+        else:
+            logger.warning("No known words found, returning a zero vector")
+            return [0.0] * model.vector_size
+    except Exception as e:
+        logger.error(f"Vectorization failed: {str(e)}")
         return [0.0] * model.vector_size
 
 
@@ -111,13 +115,54 @@ async def generate_stream(client, es_pool, collection_manager, query, documents)
     expr = None
 
     # 3. Run the hybrid search
-    results = collection_manager.search(
-        es_conn,
-        current_embedding,
-        search_params={"k": 5},  # return 5 results
-        expr=expr,
-        limit=5
+    if not isinstance(current_embedding, list):
+        logger.error(f"current_embedding has wrong type: {type(current_embedding)}")
+        current_embedding = [0.0] * model.vector_size
+
+    # Full-text search (a BM25 match query; the embedding itself is not used here)
+    vector_results = es_conn.search(
+        index=ES_CONFIG['index_name'],
+        body={
+            "query": {
+                "match": {
+                    "content": {
+                        "query": query,
+                        "analyzer": "ik_smart"  # use the ik_smart Chinese analyzer
+                    }
+                }
+            },
+            "size": 5
+        }
+    )
+
+    # Exact text search
+    text_results = es_conn.search(
+        index="raw_texts",
+        body={
+            "query": {
+                "match": {
+                    "text.keyword": query
+                }
+            },
+            "size": 5
+        }
     )
+
+    # Merge the two result sets
+    results = []
+    for hit in vector_results['hits']['hits']:
+        results.append({
+            "_source": hit['_source'],
+            "_score": hit['_score'],
+            "_id": hit['_id']
+        })
+
+    for hit in text_results['hits']['hits']:
+        results.append({
+            "_source": hit['_source'],
+            "_score": hit['_score'],
+            "_id": hit['_id']
+        })
     logger.info(f"Number of search results: {len(results) if results else 0}")
 
     # 4. Process the search results
@@ -138,7 +183,10 @@ async def generate_stream(client, es_pool, collection_manager, query, documents)
             logger.info("-" * 40)
 
             full_content = source['tags'].get('full_content', source['user_input'])
-            context = context + full_content
+            if isinstance(full_content, str):
+                context = context + full_content
+            else:
+                logger.warning(f"Unexpected content type: {type(full_content)}")
         else:
             logger.warning(f"Score too low, skipping result: {hit['_id']}")
             continue
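
A closing note on the merge step added to generate_stream: it concatenates the two hit lists, so a document matched by both queries is appended twice, and BM25 scores and keyword-match scores are interleaved even though they are not on a comparable scale. A minimal de-duplication sketch (hypothetical helper, not part of the patch) that keeps each document once with its best score:

    def merge_hits(vector_hits, text_hits):
        """Merge two Elasticsearch hit lists, de-duplicating on _id."""
        merged = {}
        for hit in vector_hits + text_hits:
            doc_id = hit['_id']
            if doc_id not in merged or hit['_score'] > merged[doc_id]['_score']:
                merged[doc_id] = {
                    "_source": hit['_source'],
                    "_score": hit['_score'],
                    "_id": doc_id,
                }
        # Best-scoring documents first; the kept score is a per-document best,
        # not normalized across the two retrievers.
        return sorted(merged.values(), key=lambda h: h['_score'], reverse=True)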