Files
dsProject/dsLightRag/YunXiao/action.py
2025-08-27 16:50:47 +08:00

220 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import uuid
import logging
from pathlib import Path
import Config.Config
from Util.ObsUtil import ObsUploader
# Module-level logger shared by every function in this file.
# It is bound to a real Logger at import time (rather than None) so the helper
# functions can log even when they are called from an importing module before
# main() has run. main() rebinds this name to the logger returned by
# setup_logging() once handlers are configured.
logger = logging.getLogger(__name__)
# 配置日志
def setup_logging():
    """Configure root logging and return this module's logger.

    Calls ``logging.basicConfig`` with level INFO, a
    ``time - level - message`` format, and two handlers: a file handler
    writing to ``image_processing.log`` (relative to the current working
    directory) and a stream handler. Per the standard library,
    ``basicConfig`` leaves the root logger untouched when it already has
    handlers.

    Returns:
        logging.Logger: ``logging.getLogger(__name__)``.
    """
    # The messages logged by this file contain non-ASCII text and file paths,
    # so open the log file as UTF-8 explicitly instead of relying on the
    # platform default encoding.
    file_handler = logging.FileHandler('image_processing.log', encoding='utf-8')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, logging.StreamHandler()],
    )
    return logging.getLogger(__name__)
# 初始化OBS上传器
def init_obs_uploader():
    """Create the OBS uploader used for image uploads.

    Returns:
        A new ``ObsUploader`` instance, or ``None`` when constructing it
        raises; the failure is logged at ERROR level.
    """
    try:
        uploader = ObsUploader()
    except Exception as exc:
        logger.error(f"初始化OBS上传器失败: {str(exc)}")
        uploader = None
    return uploader
# 生成UUID文件名保留原始扩展名
def generate_uuid_filename(original_filename):
    """Build a random file name that keeps the original file's extension.

    Args:
        original_filename: Name (or path) whose extension should be kept.

    Returns:
        str: A fresh ``uuid.uuid4()`` followed by the extension reported by
        ``os.path.splitext`` for *original_filename*, lower-cased. The
        extension part is empty when the name has none.
    """
    _, extension = os.path.splitext(original_filename)
    return str(uuid.uuid4()) + extension.lower()
# 处理单个图片文件上传
def upload_image_to_obs(obs_uploader, image_path, target_obs_dir="HuangHai/YunXiao/"):
    """Upload one local image to OBS under a UUID-based object key.

    Args:
        obs_uploader: Object exposing ``upload_file(object_key=..., file_path=...)``
            that returns a ``(success, result)`` pair.
        image_path: Path of the local image file.
        target_obs_dir: Prefix prepended verbatim to the generated file name
            to form the object key.

    Returns:
        The HTTPS URL built from ``Config.Config.OBS_SERVER``,
        ``Config.Config.OBS_BUCKET`` and the object key on success; ``None``
        when the file does not exist or the upload reports failure.
    """
    # Guard: nothing to upload if the path is missing.
    if not os.path.exists(image_path):
        logger.error(f"图片文件不存在: {image_path}")
        return None

    # Object key = target prefix + random name with the original extension.
    random_name = generate_uuid_filename(os.path.basename(image_path))
    object_key = f"{target_obs_dir}{random_name}"

    ok, outcome = obs_uploader.upload_file(object_key=object_key, file_path=image_path)
    if not ok:
        logger.error(f"图片上传失败: {image_path} - {outcome.get('errorMessage', '未知错误')}")
        return None

    public_url = f"https://{Config.Config.OBS_SERVER}/{Config.Config.OBS_BUCKET}/{object_key}"
    logger.info(f"图片上传成功: {image_path} -> {public_url}")
    return public_url
# 替换Markdown文件中的图片引用
# Markdown image: ![alt](target "optional title"). Group 1 is the alt text,
# group 2 the target, group 3 the optional whitespace-led remainder that sits
# INSIDE the parentheses (e.g. a title). Trailing ``{attrs}`` after the closing
# parenthesis is not part of the match and is therefore left untouched.
_MD_IMAGE_RE = re.compile(r'!\[([^]]*)\]\(([^)\s]+)(\s+[^)]*)?\)')
# HTML image tag; group 1 is the quoted value of the src attribute.
_HTML_IMAGE_RE = re.compile(r'<img\s+[^>]*src=["\']([^"\']+)["\'][^>]*>')


def _path_key(path):
    """Return *path* normalised for comparison ('/' separators, OS case rules)."""
    return os.path.normcase(os.path.normpath(path)).replace('\\', '/')


def replace_image_references(md_content, image_mapping, md_file_path):
    """Rewrite local image references in Markdown text to their uploaded URLs.

    Both Markdown images (``![alt](path "title")``) and HTML ``<img src="...">``
    tags are handled. A reference is resolved in two steps:

    1. Exact match: the reference is resolved relative to the directory of
       *md_file_path* and looked up in *image_mapping*.
    2. Fallback: the first mapping entry whose path ends with the reference
       on a path-component boundary (so ``ic.png`` never matches ``pic.png``).

    The exact match is always tried first, so a document's own
    ``media/image1.png`` is never replaced by another document's image of the
    same relative name. References containing ``://`` or starting with
    ``data:`` are left as they are, as is anything that cannot be resolved.

    Args:
        md_content: Markdown text to process.
        image_mapping: Dict mapping local image paths to uploaded URLs.
        md_file_path: Path of the Markdown file the text came from; used to
            resolve relative image references.

    Returns:
        str: The text with every resolvable image reference replaced.
    """
    md_dir = os.path.dirname(md_file_path)
    # Normalise the keys once so lookups are O(1) and independent of
    # separator style (and of letter case on Windows).
    url_by_key = {_path_key(local): url for local, url in image_mapping.items()}

    def resolve(reference):
        """Return the uploaded URL for *reference*, or None if unknown."""
        if '://' in reference or reference.startswith('data:'):
            return None  # already remote / inline data, not a local file
        ref = reference.replace('\\', '/')

        # Step 1: exact path relative to the Markdown file.
        exact_key = _path_key(os.path.abspath(os.path.join(md_dir, ref)))
        url = url_by_key.get(exact_key)
        if url is not None:
            return url

        # Step 2: suffix match on whole path components.
        tail = _path_key(ref)
        while tail.startswith('../'):
            tail = tail[3:]
        tail = tail.lstrip('/')
        if tail in ('', '.', '..'):
            return None
        suffix = '/' + tail
        for local_key, candidate_url in url_by_key.items():
            if local_key.endswith(suffix):
                return candidate_url
        return None

    def md_replacer(match):
        url = resolve(match.group(2))
        if url is None:
            return match.group(0)
        # Keep the optional title where Markdown expects it: inside the
        # parentheses, after the target.
        title = match.group(3) or ''
        return f"![{match.group(1)}]({url}{title})"

    def html_replacer(match):
        url = resolve(match.group(1))
        if url is None:
            return match.group(0)
        # Splice only the src value; other attributes that happen to contain
        # the same text (e.g. alt) must stay unchanged.
        tag = match.group(0)
        start = match.start(1) - match.start(0)
        end = match.end(1) - match.start(0)
        return tag[:start] + url + tag[end:]

    updated_content = _MD_IMAGE_RE.sub(md_replacer, md_content)
    updated_content = _HTML_IMAGE_RE.sub(html_replacer, updated_content)
    return updated_content
# 处理单个Markdown文件
def process_markdown_file(md_file_path, image_mapping, target_dir, current_md_path):
    """Rewrite one Markdown file's image links and save the result.

    The file is read as UTF-8, passed through ``replace_image_references``,
    and written as UTF-8 to *target_dir* under the same base name. Any
    exception is caught and logged at ERROR level; nothing is raised.

    Args:
        md_file_path: Path of the Markdown file to read.
        image_mapping: Dict mapping local image paths to uploaded URLs.
        target_dir: Directory that receives the rewritten file.
        current_md_path: Not used by this function.
    """
    try:
        with open(md_file_path, 'r', encoding='utf-8') as source:
            original_text = source.read()

        rewritten_text = replace_image_references(original_text, image_mapping, md_file_path)

        # Output keeps only the base name, so the target directory is flat.
        target_file_path = os.path.join(target_dir, os.path.basename(md_file_path))
        with open(target_file_path, 'w', encoding='utf-8') as destination:
            destination.write(rewritten_text)

        logger.info(f"已处理并保存文件到: {target_file_path}")
    except Exception as exc:
        logger.error(f"处理文件 {md_file_path} 时出错: {str(exc)}")
# 扫描并上传所有media目录中的图片
def scan_and_upload_media_images(root_directory, obs_uploader):
    """Upload every image found in ``media`` directories below a root.

    Walks *root_directory* with ``os.walk``, collects each directory named
    ``media``, and uploads the files directly inside it whose lower-cased
    extension is one of the supported image extensions.

    Args:
        root_directory: Directory tree to search.
        obs_uploader: Uploader passed through to ``upload_image_to_obs``.

    Returns:
        dict: Absolute local image path -> uploaded URL, containing only the
        images whose upload returned a URL.
    """
    supported_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.tiff')

    # Every directory in the tree that has a child called "media".
    media_directories = [
        os.path.join(parent, 'media')
        for parent, subdirs, _ in os.walk(root_directory)
        if 'media' in subdirs
    ]
    logger.info(f"找到 {len(media_directories)} 个media目录")

    image_mapping = {}
    total_images = 0
    uploaded_images = 0

    for media_dir in media_directories:
        logger.info(f"开始处理media目录: {media_dir}")

        image_files = [
            os.path.join(media_dir, entry)
            for entry in os.listdir(media_dir)
            if os.path.splitext(entry)[1].lower() in supported_extensions
        ]
        logger.info(f"{media_dir} 中找到 {len(image_files)} 张图片")
        total_images += len(image_files)

        for image_path in image_files:
            # The absolute path is the mapping key.
            abs_image_path = os.path.abspath(image_path)
            if abs_image_path in image_mapping:
                logger.debug(f"图片已上传,跳过: {abs_image_path}")
                continue
            obs_url = upload_image_to_obs(obs_uploader, image_path)
            if obs_url:
                image_mapping[abs_image_path] = obs_url
                uploaded_images += 1

    logger.info(f"总共有 {total_images} 张图片,成功上传 {uploaded_images}")
    return image_mapping
# 主处理函数
# Default input tree, kept from the original script so that a bare main()
# call behaves as before.
DEFAULT_ROOT_DIRECTORY = r"D:\dsWork\dsProject\dsLightRag\YunXiao\《万有引力定律》试题"


def main(root_directory=DEFAULT_ROOT_DIRECTORY, target_dir=None):
    """Upload all media images under a directory and rewrite its Markdown files.

    Steps: configure logging, validate the input directory, create the OBS
    uploader and the output directory, upload the images found in ``media``
    directories, then rewrite every ``*.md`` file below the root into
    *target_dir*. Any unexpected exception is logged with its traceback and
    not re-raised.

    Args:
        root_directory: Directory tree holding the Markdown files and their
            ``media`` directories. Defaults to the original hard-coded path.
        target_dir: Directory receiving the rewritten Markdown files.
            Defaults to ``<current working directory>/target``.
    """
    global logger
    logger = setup_logging()

    if target_dir is None:
        target_dir = os.path.join(os.getcwd(), "target")

    try:
        # Fail fast: check the input directory before creating the uploader,
        # making the output directory, or scanning for images.
        directory = Path(root_directory)
        if not directory.is_dir():
            logger.error(f"目录不存在: {directory}")
            return

        obs_uploader = init_obs_uploader()
        if not obs_uploader:
            logger.error("初始化OBS上传器失败无法继续执行")
            return

        os.makedirs(target_dir, exist_ok=True)
        logger.info(f"已创建target目录: {target_dir}")

        # Upload images first so the Markdown pass has the path -> URL mapping.
        image_mapping = scan_and_upload_media_images(root_directory, obs_uploader)
        if not image_mapping:
            # Not fatal: the Markdown files are still written to target_dir.
            logger.warning("未找到任何图片或图片上传失败")
        else:
            logger.info(f"共上传了 {len(image_mapping)} 张图片到OBS")

        logger.info("开始处理Markdown文件...")
        md_files = list(directory.glob('**/*.md'))
        if not md_files:
            logger.warning(f"{directory} 中没有找到Markdown文件")
            return
        logger.info(f"找到 {len(md_files)} 个Markdown文件")

        for md_file in md_files:
            md_path = str(md_file)
            process_markdown_file(md_path, image_mapping, target_dir, md_path)

        logger.info("所有文件处理完成!")
    except Exception as e:
        logger.error(f"程序执行出错: {str(e)}", exc_info=True)


if __name__ == "__main__":
    main()