You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
dsProject/dsRag/Test/TestReadWordTextAndImage.py

169 lines
6.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import docx
from Util.WordImageUtil import extract_images_from_docx
# Constants
# Prefixes that mark the first paragraph of a new block ("问题" = question, "话题" = topic).
BLOCK_START_KEYWORDS = ('问题', '话题')
# Directory handed to extract_images_from_docx as the image output location.
OUTPUT_IMAGE_DIR = "D:\\dsWork\\dsProject\\dsRag\\static\\Images"
# Directory that receives the per-block .txt files.
OUTPUT_TXT_DIR = "D:\\dsWork\\dsProject\\dsRag\\Txt"
def read_docx(file_path):
    """Load a .docx file and list its non-empty paragraphs.

    Returns a list of ``(paragraph_index, text)`` tuples. The index is the
    paragraph's position in ``doc.paragraphs``; paragraphs whose text is empty
    are skipped, so the indices may have gaps. Any error raised while opening
    or reading the file is printed and an empty list is returned instead.
    """
    try:
        document = docx.Document(file_path)
        indexed_paragraphs = []
        for position, paragraph in enumerate(document.paragraphs):
            if paragraph.text:
                indexed_paragraphs.append((position, paragraph.text))
        return indexed_paragraphs
    except Exception as err:
        # Best-effort reader: report the problem and let the caller see "no content".
        print(f"读取docx文件出错: {str(err)}")
        return []
def is_new_block_start(para_text):
    """Tell whether a paragraph opens a new question/topic block.

    A paragraph qualifies when it begins with one of ``BLOCK_START_KEYWORDS``
    and at least one of its first five characters is a digit.
    """
    if not para_text.startswith(BLOCK_START_KEYWORDS):
        return False
    for char in para_text[:5]:
        if char.isdigit():
            return True
    return False
def _append_block(blocks, indices, lines):
    """Store one finished block in ``blocks`` and print its range, text and indices."""
    blocks.append((indices, '\n'.join(lines)))
    print(
        f"问题/话题 {len(blocks)} 管辖段落范围: {indices[0]} - {indices[-1]} (段落数: {len(lines)})")
    print(f"当前段落块内容: {''.join(lines)}")
    print(f"当前段落块索引: {indices}")


def split_into_blocks(paragraphs):
    """Group paragraphs into question/topic blocks.

    Args:
        paragraphs: iterable of ``(paragraph_index, text)`` tuples, in
            document order.

    Returns:
        A list of ``(indices, text)`` tuples, one per block. ``indices`` lists
        the paragraph indices that belong to the block and ``text`` is the
        block's paragraphs joined with newlines.

    A new block starts at every paragraph accepted by ``is_new_block_start``.
    Content that appears before the first marker is collected into an
    implicit leading block. Previously that leading block was never marked as
    open, so only its first paragraph was recorded and the whole block was
    thrown away when the first marker arrived.
    """
    blocks = []
    current_block = []
    current_indices = []
    in_block = False

    for para_idx, para_text in paragraphs:
        stripped = para_text.strip()

        if is_new_block_start(para_text):
            # Close the block that was being collected before opening the next one.
            if in_block:
                _append_block(blocks, current_indices, current_block)
            current_block = [para_text]
            current_indices = [para_idx]
            in_block = True
            continue

        if not current_block and (stripped or para_idx == 0):
            # Document starts without an explicit marker: open an implicit
            # block and mark it open so following paragraphs are collected
            # and the block is flushed when the first marker appears.
            current_block = [para_text]
            current_indices = [para_idx]
            in_block = True
            continue

        if in_block and stripped:
            # Skip a paragraph whose text merely repeats the previous one,
            # but still record its index as belonging to this block.
            if stripped != current_block[-1].strip():
                current_block.append(para_text)
            if para_idx not in current_indices:
                current_indices.append(para_idx)

    if current_block:
        _append_block(blocks, current_indices, current_block)
    return blocks
def _find_block_for_paragraph(blocks, para_idx):
    """Return the 0-based position of the block that owns ``para_idx``, or None.

    An exact hit in a block's index list wins. Otherwise the block with the
    greatest first index that is still <= ``para_idx`` is chosen, i.e. the
    block whose paragraph range covers that position.
    """
    if not isinstance(para_idx, int):
        return None
    covering_pos = None
    covering_start = None
    for pos, (indices, _text) in enumerate(blocks):
        if para_idx in indices:
            return pos
        if indices and indices[0] <= para_idx:
            if covering_start is None or indices[0] > covering_start:
                covering_pos, covering_start = pos, indices[0]
    return covering_pos


def process_images(images, blocks):
    """Attach each extracted image to the text of the block it belongs to.

    Args:
        images: iterable of dicts carrying ``'image_path'`` and
            ``'location'['paragraph_index']``.
        blocks: list of ``(indices, text)`` tuples; modified in place by
            appending an image tag line to the chosen block's text.

    Blocks only list paragraphs that contain text, so an image in a paragraph
    without text never matches a block exactly. Such an image is assigned to
    the block whose paragraph range covers it instead of always landing in
    the last block. Images that precede every block, or whose paragraph index
    is not an integer, still fall back to the last block.
    """
    for img in images:
        img_path = img['image_path']
        img_name = os.path.basename(img_path)
        img_para_idx = img['location']['paragraph_index']

        if not blocks:
            print(f"图片 {img_name} 未找到匹配的段落块,且没有可用的段落块。")
            continue

        target = _find_block_for_paragraph(blocks, img_para_idx)
        if target is None:
            # No owning block could be determined: keep the image by
            # attaching it to the last block.
            target = len(blocks) - 1
            fallback = True
        else:
            fallback = False

        para_indices, block_text = blocks[target]
        blocks[target] = (para_indices, block_text + f"\n[图片: {img_name}, 路径: {img_path}]")

        if fallback:
            print(f"图片 {img_name} 未找到精确匹配的段落块,已添加到最后一个段落块。")
        else:
            print(f"图片 {img_name} 属于段落块 {target + 1} (段落索引: {para_indices})")
def save_blocks_to_txt(blocks, file_prefix, output_dir):
    """Write every block to its own text file.

    Args:
        blocks: list of ``(indices, text)`` tuples; only the text is written.
        file_prefix: prefix of each output file name.
        output_dir: directory that receives ``{file_prefix}_{n}.txt`` files,
            with ``n`` counting blocks from 1. It is created when missing.

    Returns:
        True when at least one file was written, otherwise False.
    """
    # Without this, every write fails when the target directory is absent.
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as e:
        print(f"无法创建输出目录 {output_dir}: {str(e)}")
        return False

    saved_count = 0
    for block_idx, (_indices, block_text) in enumerate(blocks, 1):
        output_file = os.path.join(output_dir, f"{file_prefix}_{block_idx}.txt")
        if save_to_txt(block_text.strip(), output_file):
            saved_count += 1
    print(f"处理完成,共保存{saved_count}个文件到目录: {output_dir}")
    return saved_count > 0
def process_document(input_path):
    """Convert one .docx file into per-block text files.

    Reads the paragraphs, extracts the images into ``OUTPUT_IMAGE_DIR``,
    splits the paragraphs into blocks, attaches the images to the blocks and
    writes the blocks into ``OUTPUT_TXT_DIR``.

    Args:
        input_path: path of the .docx file.

    Returns:
        True when at least one block file was written; False when the file
        yielded no paragraphs or nothing could be saved.
    """
    paragraphs = read_docx(input_path)
    print("段落块信息:" + str(paragraphs))
    if not paragraphs:
        print("无法读取输入文件内容")
        return False

    os.makedirs(OUTPUT_IMAGE_DIR, exist_ok=True)
    print(f"图片将保存到目录: {OUTPUT_IMAGE_DIR}")
    images = extract_images_from_docx(input_path, OUTPUT_IMAGE_DIR)
    print(f"共提取到{len(images)}张图片")
    for img in images:
        print(f"图片保存至: {img['image_path']}")
        loc = img['location']
        print(f"位置信息: 段落 {loc['paragraph_index']}")

    blocks = split_into_blocks(paragraphs)
    print(f"共分割出{len(blocks)}个段落块")
    # Attach the images to their blocks.
    process_images(images, blocks)

    # The prefix is the last two underscore-separated parts of the file name
    # (text before the first dot). Joining the tail slice also covers names
    # with fewer than two parts, which used to raise IndexError.
    stem = os.path.basename(input_path).split('.')[0]
    file_prefix = '_'.join(stem.split('_')[-2:])
    return save_blocks_to_txt(blocks, file_prefix, OUTPUT_TXT_DIR)
# Low-level writer used by save_blocks_to_txt.
def save_to_txt(content, file_path, mode='w'):
    """Write ``content`` to ``file_path`` as UTF-8 text.

    Args:
        content: text to write.
        file_path: destination file.
        mode: mode string passed to ``open`` (defaults to ``'w'``).

    Returns:
        True on success. On any error the problem is printed and False is
        returned.
    """
    try:
        handle = open(file_path, mode, encoding='utf-8')
        with handle:
            handle.write(content)
    except Exception as err:
        print(f"保存文件{file_path}时出错: {str(err)}")
        return False
    else:
        return True
def process_directory(input_dir):
    """Process every .docx file found directly inside ``input_dir``.

    Args:
        input_dir: directory to scan (not recursive).

    Returns:
        True when at least one file was processed successfully. False when
        the directory cannot be listed, holds no .docx files, or every file
        failed.

    Files are handled in sorted name order. Names starting with ``~$`` are
    skipped: they are the lock files Word leaves next to an open document
    and are not readable documents.
    """
    try:
        entries = os.listdir(input_dir)
    except OSError as e:
        # Missing or unreadable directory: report it instead of crashing.
        print(f"无法读取目录 {input_dir}: {str(e)}")
        return False

    docx_files = sorted(
        name for name in entries
        if name.lower().endswith('.docx') and not name.startswith('~$')
    )
    if not docx_files:
        print(f"目录中没有找到docx文件: {input_dir}")
        return False

    success_count = 0
    for docx_file in docx_files:
        input_path = os.path.join(input_dir, docx_file)
        print(f"正在处理文件: {docx_file}")
        if process_document(input_path):
            success_count += 1
    print(f"处理完成,共处理{success_count}/{len(docx_files)}个文件")
    return success_count > 0
if __name__ == "__main__":
    # Script entry point: convert every .docx under the test directory.
    # The relative path is resolved against the current working directory.
    process_directory('../static/Test')