import datetime
import logging
import re

import numpy as np
from elasticsearch import Elasticsearch
from tqdm import tqdm

from Config.Config import ES_CONFIG
from Util.EmbeddingUtil import text_to_embedding
from Util.EsMappingUtil import create_vector_index, create_text_index


def split_sentences(text):
    """Split text into sentences."""
    # Split on sentence-ending punctuation (full- and half-width) and newlines.
    sentences = re.split(r'[。！？；!?;\n]', text)
    return [s.strip() for s in sentences if s.strip()]


def save_to_es(text):
    """Embed a sentence and store both the vector and the raw text in ES.

    Uses the module-level `es` client created in the __main__ block below.
    """
    vector = text_to_embedding(text)

    # Skip texts whose embedding is missing or empty.
    if vector is None or (hasattr(vector, 'size') and vector.size == 0):
        logging.warning(f"Skipping text with invalid vector: {text}")
        return None

    # Skip all-zero (or near-zero) vectors, which break cosine similarity.
    if np.all(np.abs(vector) < 1e-6):
        logging.warning(f"Skipping text with zero vector: {text}")
        return None

    # Normalize the vector to avoid cosine-similarity issues.
    norm = np.linalg.norm(vector)
    if norm > 0:
        vector = vector / norm

    doc = {
        'text': text,
        'vector': vector.tolist(),
        'timestamp': datetime.datetime.now().isoformat()
    }
    es.index(index='knowledge_base', body=doc)
    es.index(index='raw_texts', body={'text': text})


def process_file(file_path):
    """Read a text file, split it into sentences, and index each one."""
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    sentences = split_sentences(content)
    # Show a progress bar while indexing.
    for sentence in tqdm(sentences, desc='Indexing', unit='sentence'):
        save_to_es(sentence)
    print(f"\nDone: saved {len(sentences)} sentences")


if __name__ == '__main__':
    es = Elasticsearch(
        hosts=[ES_CONFIG['hosts']],
        basic_auth=ES_CONFIG['basic_auth'],
        verify_certs=ES_CONFIG['verify_certs'],
        ssl_show_warn=ES_CONFIG['ssl_show_warn']
    )
    create_vector_index()
    create_text_index()

    file_path = 'Txt/人口变化趋势对云南教育的影响.txt'
    process_file(file_path)
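

# --- Illustrative only ---
# A minimal sketch of how the vectors indexed above could be queried back,
# using a script_score query with Elasticsearch's built-in cosineSimilarity
# function. The index name 'knowledge_base' and the 'text'/'vector' fields
# match the documents written by save_to_es(); the assumption here is that
# create_vector_index() maps 'vector' as a dense_vector field.
# search_similar() itself is a hypothetical helper, not part of this module.
def search_similar(query_text, top_k=5):
    """Return the top_k indexed sentences most similar to query_text."""
    query_vector = text_to_embedding(query_text)
    # Normalize the query the same way indexed vectors were normalized.
    norm = np.linalg.norm(query_vector)
    if norm > 0:
        query_vector = query_vector / norm
    resp = es.search(
        index='knowledge_base',
        body={
            'size': top_k,
            'query': {
                'script_score': {
                    'query': {'match_all': {}},
                    'script': {
                        # cosineSimilarity returns [-1, 1]; adding 1.0 keeps
                        # scores non-negative, as Elasticsearch requires.
                        'source': "cosineSimilarity(params.query_vector, 'vector') + 1.0",
                        'params': {'query_vector': query_vector.tolist()}
                    }
                }
            }
        }
    )
    return [hit['_source']['text'] for hit in resp['hits']['hits']]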