import os
import datetime
import logging

from elasticsearch import Elasticsearch

from Config.Config import ES_CONFIG
from Util.EmbeddingUtil import text_to_embedding

# Logger configuration, added at the top of the file
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Create a console handler and attach a standard format
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

def split_paragraphs(text):
    """Split text into paragraphs."""
    # Paragraphs are delimited by two consecutive newlines (blank lines)
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    return paragraphs
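
# Illustrative usage (not part of the original script):
#   split_paragraphs("first paragraph\n\nsecond paragraph\n")
#   -> ["first paragraph", "second paragraph"]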

def save_to_es(text):
    """Save the vectorised text and the raw text to ES."""
    vector = text_to_embedding(text)

    if vector is None:
        logger.warning(f"Skipping text for which no vector could be generated: {text}")
        return

    doc = {
        'text': text,
        'vector': vector,
        'timestamp': datetime.datetime.now().isoformat(),
        # Note: storing 'analyzer' as a document field does not configure
        # analysis; the analyzer must be set in the index mapping.
        'analyzer': 'ik_smart'
    }

    try:
        # `es` is created in the __main__ block below, so this function
        # only works when the module is run as a script.
        es.index(index='knowledge_base', body=doc)
        es.index(index='raw_texts', body={'raw_text': text})
    except Exception as e:
        logger.error(f"Failed to save text to ES: {e}")
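
# A minimal sketch (not part of the original script) of how the two target
# indices could be created up front. The embedding dimension (`dims`) and the
# presence of the IK analysis plugin ('ik_smart') are assumptions about the
# deployment, not facts from this repository.
def ensure_indices(es, dims=768):
    """Create the target indices if they do not already exist (illustrative)."""
    if not es.indices.exists(index='knowledge_base'):
        es.indices.create(
            index='knowledge_base',
            body={
                'mappings': {
                    'properties': {
                        # Analyzers belong in the mapping, not in each document
                        'text': {'type': 'text', 'analyzer': 'ik_smart'},
                        'vector': {'type': 'dense_vector', 'dims': dims},
                        'timestamp': {'type': 'date'},
                    }
                }
            },
        )
    if not es.indices.exists(index='raw_texts'):
        es.indices.create(index='raw_texts')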

def process_directory(dir_path):
    """Process all text files in a directory."""
    total_paragraphs = 0

    # Collect all .txt files and sort them numerically by filename
    # (assumes names like '1.txt', '2.txt', ...)
    files = [f for f in os.listdir(dir_path) if f.endswith('.txt')]
    files.sort(key=lambda x: int(x.split('.')[0]))
    file_count = len(files)

    print(f"Found {file_count} text files to process")

    for i, filename in enumerate(files, 1):
        print(f"Processing file {i}/{file_count}: {filename}")

        file_path = os.path.join(dir_path, filename)
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        paragraphs = split_paragraphs(content)
        total_paragraphs += len(paragraphs)

        for paragraph in paragraphs:
            save_to_es(paragraph)

    print(f"\nProcessing complete: {file_count} files processed, {total_paragraphs} paragraphs saved")


if __name__ == '__main__':
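    # ES_CONFIG is imported from Config.Config, which is not shown in this
    # file. A plausible shape (an assumption, not the actual config) would be:
    #   ES_CONFIG = {
    #       'hosts': 'https://localhost:9200',
    #       'basic_auth': ('elastic', '<password>'),
    #       'verify_certs': False,
    #       'ssl_show_warn': False,
    #   }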
    es = Elasticsearch(
        hosts=[ES_CONFIG['hosts']],
        basic_auth=ES_CONFIG['basic_auth'],
        verify_certs=ES_CONFIG['verify_certs'],
        ssl_show_warn=ES_CONFIG['ssl_show_warn']
    )

    dir_path = '../Txt/processed_chunks'  # directory generated by T2_DocxProcessor.py
    process_directory(dir_path)
process_directory(dir_path) |