diff --git a/dsSchoolBuddy/ElasticSearch/T2_Vector.py b/dsSchoolBuddy/ElasticSearch/T2_Vector.py
index 500524b1..774e3599 100644
--- a/dsSchoolBuddy/ElasticSearch/T2_Vector.py
+++ b/dsSchoolBuddy/ElasticSearch/T2_Vector.py
@@ -1,5 +1,5 @@
 # pip install pydantic requests
-from Util.VectorUtil import text_to_vector_db, query_vector_db
+from ElasticSearch.Utils.VectorUtil import text_to_vector_db, query_vector_db
 
 
 def main():
diff --git a/dsSchoolBuddy/ElasticSearch/T7_XiangLiangQuery.py b/dsSchoolBuddy/ElasticSearch/T7_XiangLiangQuery.py
index 5c2eda10..1bf022f4 100644
--- a/dsSchoolBuddy/ElasticSearch/T7_XiangLiangQuery.py
+++ b/dsSchoolBuddy/ElasticSearch/T7_XiangLiangQuery.py
@@ -1,76 +1,14 @@
 import logging
 import warnings
-import json
-import requests
 from typing import List, Tuple, Dict
-from elasticsearch import Elasticsearch
-
-from Config import Config
-from Config.Config import ES_CONFIG, EMBED_MODEL_NAME, EMBED_BASE_URL, EMBED_API_KEY, RERANK_MODEL, RERANK_BASE_URL, RERANK_BINDING_API_KEY
-from langchain_openai import OpenAIEmbeddings
-from pydantic import SecretStr
+from Config.Config import ES_CONFIG
+from ElasticSearch.Utils.EsSearchUtil import EsSearchUtil
 
 # Initialize logging
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
-# Suppress HTTPS-related warnings
-warnings.filterwarnings('ignore', message='Connecting to .* using TLS with verify_certs=False is insecure')
-warnings.filterwarnings('ignore', message='Unverified HTTPS request is being made to host')
-
-
-def text_to_embedding(text: str) -> List[float]:
-    """
-    Convert text to an embedding vector
-    """
-    embeddings = OpenAIEmbeddings(
-        model=EMBED_MODEL_NAME,
-        base_url=EMBED_BASE_URL,
-        api_key=SecretStr(EMBED_API_KEY)
-    )
-    return embeddings.embed_query(text)
-
-
-def rerank_results(query: str, results: List[Dict]) -> List[Tuple[Dict, float]]:
-    """
-    Rerank the search results
-    """
-    if len(results) <= 1:
-        return [(doc, 1.0) for doc in results]
-
-    # Prepare the rerank request payload
-    rerank_data = {
-        "model": RERANK_MODEL,
-        "query": query,
-        "documents": [doc['_source']['user_input'] for doc in results],
-        "top_n": len(results)
-    }
-
-    # Call the SiliconFlow API to rerank
-    headers = {
-        "Content-Type": "application/json",
-        "Authorization": f"Bearer {RERANK_BINDING_API_KEY}"
-    }
-
-    try:
-        response = requests.post(RERANK_BASE_URL, headers=headers, data=json.dumps(rerank_data))
-        response.raise_for_status()
-        rerank_result = response.json()
-
-        # Process the rerank results
-        reranked_docs_with_scores = []
-        if "results" in rerank_result:
-            for item in rerank_result["results"]:
-                doc_idx = item.get("index")
-                score = item.get("relevance_score", 0.0)
-                if 0 <= doc_idx < len(results):
-                    reranked_docs_with_scores.append((results[doc_idx], score))
-        return reranked_docs_with_scores
-    except Exception as e:
-        logger.error(f"Rerank failed: {str(e)}")
-        return [(doc, 1.0) for doc in results]
-
 
 
 def merge_results(keyword_results: List[Tuple[Dict, float]], vector_results: List[Tuple[Dict, float]]) -> List[Tuple[Dict, float, str]]:
     """
@@ -97,11 +35,7 @@ def merge_results(keyword_results: List[Tuple[Dict, float]], vector_results: Lis
 
 if __name__ == "__main__":
     # Initialize EsSearchUtil
-    esClient = Elasticsearch(
-        hosts=Config.ES_CONFIG['hosts'],
-        basic_auth=Config.ES_CONFIG['basic_auth'],
-        verify_certs=False
-    )
+    search_util = EsSearchUtil(ES_CONFIG)
 
     # Get user input
    user_query = input("Enter a query (e.g. high-performance concrete): ")
@@ -114,83 +48,34 @@ if __name__ == "__main__":
     print(f"\n=== Starting query execution ===")
     print(f"Original query text: {user_query}")
 
-    # Execute the search
-    es_conn = esClient.es_pool.get_connection()
     try:
         # 1. Vector search
         print("\n=== Vector search phase ===")
         print("1. Vectorizing the query text...")
-        query_embedding = text_to_embedding(user_query)
+        query_embedding = search_util.get_query_embedding(user_query)
         print(f"2. Generated query vector dimension: {len(query_embedding)}")
         print(f"3. First 3 vector values: {query_embedding[:3]}")
         print("4. Running Elasticsearch vector search...")
-        vector_results = es_conn.search(
-            index=ES_CONFIG['index_name'],
-            body={
-                "query": {
-                    "script_score": {
-                        "query": {
-                            "bool": {
-                                "should": [
-                                    {
-                                        "terms": {
-                                            "tags.tags": query_tags
-                                        }
-                                    }
-                                ] if query_tags else {"match_all": {}},
-                                "minimum_should_match": 1 if query_tags else 0
-                            }
-                        },
-                        "script": {
-                            "source": "double score = cosineSimilarity(params.query_vector, 'embedding'); return score >= 0 ? score : 0",
-                            "params": {"query_vector": query_embedding}
-                        }
-                    }
-                },
-                "size": 5
-            }
-        )
+        vector_results = search_util.search_by_vector(query_embedding, k=5)
         vector_hits = vector_results['hits']['hits']
         print(f"5. Vector search hit count: {len(vector_hits)}")
 
         # Rerank the vector results
         print("6. Reranking vector results...")
-        reranked_vector_results = rerank_results(user_query, vector_hits)
+        reranked_vector_results = search_util.rerank_results(user_query, vector_hits)
         print(f"7. Reranked vector result count: {len(reranked_vector_results)}")
 
         # 2. Keyword search
         print("\n=== Keyword search phase ===")
         print("1. Running Elasticsearch keyword search...")
-        keyword_results = es_conn.search(
-            index=ES_CONFIG['index_name'],
-            body={
-                "query": {
-                    "bool": {
-                        "must": [
-                            {
-                                "match": {
-                                    "user_input": user_query
-                                }
-                            }
-                        ] + ([
-                            {
-                                "terms": {
-                                    "tags.tags": query_tags
-                                }
-                            }
-                        ] if query_tags else [])
-                    }
-                },
-                "size": 5
-            }
-        )
+        keyword_results = search_util.text_search(user_query, size=5)
         keyword_hits = keyword_results['hits']['hits']
         print(f"2. Keyword search hit count: {len(keyword_hits)}")
 
         # 3. Merge results
         print("\n=== Merging search results ===")
-        # Assign a default score of 1.0 to keyword results
+        # Attach scores to keyword results
         keyword_results_with_scores = [(doc, doc['_score']) for doc in keyword_hits]
         merged_results = merge_results(keyword_results_with_scores, reranked_vector_results)
         print(f"Unique result count after merging: {len(merged_results)}")
 
@@ -205,5 +90,3 @@ if __name__ == "__main__":
     except Exception as e:
         logger.error(f"Error during search: {str(e)}")
         print(f"Search failed: {str(e)}")
-    finally:
-        esClient.es_pool.release_connection(es_conn)
diff --git a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
index c8a7473c..808bd36f 100644
--- a/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
+++ b/dsSchoolBuddy/ElasticSearch/Utils/EsSearchUtil.py
@@ -13,7 +13,7 @@ from langchain_text_splitters import RecursiveCharacterTextSplitter
 from langchain_openai import OpenAIEmbeddings
 from pydantic import SecretStr
 from Config import Config
-
+from typing import List, Tuple, Dict
 # Initialize logging
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
@@ -285,72 +285,29 @@ class EsSearchUtil:
         query_embedding = embeddings.embed_query(query)
         return query_embedding
 
-    def search_by_vector(self, query_embedding: list, k: int = 10) -> list:
+    def rerank_results(self, query: str, results: List[Dict]) -> List[Tuple[Dict, float]]:
         """
-        Vector search in Elasticsearch
-
-        Args:
-            query_embedding: the query vector
-            k: number of results to return
-
-        Returns:
-            list: search results
-        """
-        # Get a connection from the pool
-        conn = self.es_pool.get_connection()
-
-        try:
-            # Build the vector query DSL
-            query = {
-                "query": {
-                    "script_score": {
-                        "query": {"match_all": {}},
-                        "script": {
-                            "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
-                            "params": {
-                                "query_vector": query_embedding
-                            }
-                        }
-                    }
-                },
-                "size": k
-            }
-
-            # Execute the query
-            response = conn.search(index=self.es_config['index_name'], body=query)
-            return response['hits']['hits']
-        except Exception as e:
-            logger.error(f"Vector query failed: {e}")
-            print(f"Vector query failed: {e}")
-            return []
-        finally:
-            # Release the connection back to the pool
-            self.es_pool.release_connection(conn)
-
-
-    def rerank_results(self, query: str, results: list) -> list:
-        """
-        Sort results using the rerank model
+        Rerank the search results
 
         Args:
             query: the query text
-            results: initial search results
+            results: list of search results
 
         Returns:
-            list: reranked results
+            list: reranked results as (document, score) tuples
         """
         if len(results) <= 1:
-            # Too few results; no rerank needed
-            return [(result, 1.0) for result in results]
+            return [(doc, 1.0) for doc in results]
 
         # Prepare the rerank request payload
         rerank_data = {
             "model": Config.RERANK_MODEL,
             "query": query,
-            "documents": [result['_source']['user_input'] for result in results],
+            "documents": [doc['_source']['user_input'] for doc in results],
             "top_n": len(results)
         }
 
-        # Call the rerank API
+        # Call the API to rerank
         headers = {
             "Content-Type": "application/json",
             "Authorization": f"Bearer {Config.RERANK_BINDING_API_KEY}"
@@ -361,45 +318,78 @@ class EsSearchUtil:
             response.raise_for_status()
             rerank_result = response.json()
 
-            # Validate the response structure
-            if 'results' not in rerank_result:
-                logger.error(f"Malformed rerank API response, missing 'results' field: {rerank_result}")
-                print(f"Malformed rerank API response, missing 'results' field")
-                return [(result, 1.0) for result in results]
+            # Process the rerank results
+            reranked_docs_with_scores = []
+            if "results" in rerank_result:
+                for item in rerank_result["results"]:
+                    # Try the 'index' and 'relevance_score' fields first
+                    doc_idx = item.get("index")
+                    score = item.get("relevance_score", 0.0)
+
+                    # Fall back to the 'document' and 'score' fields if absent
+                    if doc_idx is None:
+                        doc_idx = item.get("document")
+                    if score == 0.0:
+                        score = item.get("score", 0.0)
 
-            # Build the reranked result list
-            reranked_pairs = []
-            for item in rerank_result['results']:
-                # Get the document index, preferring the 'index' field over 'document'
-                doc_idx = item.get('index', item.get('document', -1))
-                if doc_idx == -1:
-                    logger.error(f"Rerank result item lacks a valid index field: {item}")
-                    print(f"Malformed rerank result item")
-                    continue
-
-                # Get the score, preferring 'relevance_score' over 'score'
-                score = item.get('relevance_score', item.get('score', 1.0))
-
-                # Check that the index is valid
-                if 0 <= doc_idx < len(results):
-                    reranked_pairs.append((results[doc_idx], score))
-                else:
-                    logger.error(f"Document index {doc_idx} out of range")
-                    print(f"Document index out of range")
+                    if doc_idx is not None and 0 <= doc_idx < len(results):
+                        reranked_docs_with_scores.append((results[doc_idx], score))
+                        logger.debug(f"Rerank result: doc index={doc_idx}, score={score}")
+                    else:
+                        logger.warning(f"Invalid index in rerank result item: {doc_idx}")
 
             # If there are no valid reranked results, return the originals
-            if not reranked_pairs:
-                logger.warning("No valid reranked results; returning the originals")
-                return [(result, 1.0) for result in results]
+            if not reranked_docs_with_scores:
+                logger.warning("No valid reranked results obtained; returning the originals")
+                return [(doc, 1.0) for doc in results]
 
-            # Sort by score in descending order
-            reranked_pairs.sort(key=lambda x: x[1], reverse=True)
-            return reranked_pairs
+            return reranked_docs_with_scores
         except Exception as e:
             logger.error(f"Rerank failed: {str(e)}")
-            print(f"Rerank failed: {e}")
-            # Return the original results when rerank fails
-            return [(result, 1.0) for result in results]
+            return [(doc, 1.0) for doc in results]
+
+    def search_by_vector(self, query_embedding: list, k: int = 10) -> dict:
+        """
+        Vector search in Elasticsearch
+
+        Args:
+            query_embedding: the query vector
+            k: number of results to return
+
+        Returns:
+            dict: the raw search response
+        """
+        # Get a connection from the pool
+        conn = self.es_pool.get_connection()
+        try:
+            # Build the vector search query
+            query = {
+                "query": {
+                    "script_score": {
+                        "query": {
+                            "bool": {
+                                "should": [],
+                                "minimum_should_match": 0
+                            }
+                        },
+                        "script": {
+                            "source": "double score = cosineSimilarity(params.query_vector, 'embedding'); return score >= 0 ? score : 0",
+                            "params": {"query_vector": query_embedding}
+                        }
+                    }
+                },
+                "size": k
+            }
+
+            # Execute the query
+            response = conn.search(index=self.es_config['index_name'], body=query)
+            return response
+        except Exception as e:
+            logger.error(f"Vector search failed: {str(e)}")
+            raise
+        finally:
+            # Release the connection back to the pool
+            self.es_pool.release_connection(conn)
 
     def display_results(self, results: list, show_score: bool = True) -> None:
         """
diff --git a/dsSchoolBuddy/Util/VectorUtil.py b/dsSchoolBuddy/ElasticSearch/Utils/VectorUtil.py
similarity index 100%
rename from dsSchoolBuddy/Util/VectorUtil.py
rename to dsSchoolBuddy/ElasticSearch/Utils/VectorUtil.py
diff --git a/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc b/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc
index 6dd2f28b..5dd71999 100644
Binary files a/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc and b/dsSchoolBuddy/ElasticSearch/Utils/__pycache__/EsSearchUtil.cpython-310.pyc differ
diff --git a/dsSchoolBuddy/Util/__pycache__/VectorUtil.cpython-310.pyc b/dsSchoolBuddy/Util/__pycache__/VectorUtil.cpython-310.pyc
deleted file mode 100644
index 74028cc1..00000000
Binary files a/dsSchoolBuddy/Util/__pycache__/VectorUtil.cpython-310.pyc and /dev/null differ