# Listing metadata from the code viewer: 220 lines, 7.9 KiB, Python
import os
|
||
import re
|
||
import uuid
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
import Config.Config
|
||
from Util.ObsUtil import ObsUploader
|
||
|
||
# Module-level logger. Bound at import time so the helper functions below can
# log even when this module is imported instead of executed as a script;
# main() rebinds it after the handlers have been configured.
logger = logging.getLogger(__name__)
|
||
|
||
# Configure logging
def setup_logging():
    """Configure root logging and return this module's logger.

    Installs two handlers on the root logger: a file handler writing to
    ``image_processing.log`` in the current working directory and a stream
    handler. Records at INFO level and above are emitted.

    Returns:
        logging.Logger: the logger named after this module.
    """
    root_logger = logging.getLogger()
    # basicConfig() ignores its arguments once the root logger has handlers,
    # but the handler list would still be constructed (and the log file
    # opened) on every call. Skip that work when logging is already set up.
    if not root_logger.handlers:
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Messages are written in Chinese; pin the encoding so the
                # log file does not depend on the platform's locale default.
                logging.FileHandler('image_processing.log', encoding='utf-8'),
                logging.StreamHandler(),
            ],
        )
    return logging.getLogger(__name__)
|
||
|
||
# Initialise the OBS uploader
def init_obs_uploader():
    """Create an ``ObsUploader`` instance.

    Returns:
        The new ``ObsUploader``, or ``None`` when its constructor raises.
        The failure is logged rather than propagated so the caller can decide
        whether to abort.
    """
    try:
        return ObsUploader()
    except Exception as e:
        # Broad catch on purpose: the exception types ObsUploader() may raise
        # are not visible from this module. Keep the traceback in the log so
        # the root cause is not lost.
        logger.error(f"初始化OBS上传器失败: {str(e)}", exc_info=True)
        return None
|
||
|
||
# Generate a UUID-based file name (keeping the original extension)
def generate_uuid_filename(original_filename):
    """Return a random UUID4 file name that keeps the original extension.

    Args:
        original_filename: file name or path whose extension should be kept.

    Returns:
        str: ``"<uuid4><ext>"`` where ``ext`` is the lower-cased extension of
        ``original_filename`` including the leading dot, or an empty string
        when the name has no extension.
    """
    _, extension = os.path.splitext(original_filename)
    return f"{uuid.uuid4()}{extension.lower()}"
|
||
|
||
# Upload a single image file
def upload_image_to_obs(obs_uploader, image_path, target_obs_dir="HuangHai/YunXiao/"):
    """Upload one image under a UUID-based object key and return its URL.

    Args:
        obs_uploader: object exposing ``upload_file(object_key=..., file_path=...)``
            that returns a ``(success, result)`` pair.
        image_path: path of the local image file.
        target_obs_dir: key prefix ("directory") inside the bucket. A missing
            trailing ``/`` is added automatically.

    Returns:
        str | None: the HTTPS URL built from ``Config.Config.OBS_SERVER``,
        ``Config.Config.OBS_BUCKET`` and the object key, or ``None`` when the
        file is missing or the upload reports failure.
    """
    # isfile() rather than exists(): a directory is not an uploadable image.
    if not os.path.isfile(image_path):
        logger.error(f"图片文件不存在: {image_path}")
        return None

    # Make sure the prefix and the file name are separated by exactly one '/'.
    prefix = target_obs_dir or ""
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    # Use a UUID file name for the object key
    uuid_filename = generate_uuid_filename(os.path.basename(image_path))
    obs_object_key = f"{prefix}{uuid_filename}"

    # Upload the image to OBS
    success, result = obs_uploader.upload_file(
        object_key=obs_object_key,
        file_path=image_path,
    )

    if not success:
        # NOTE(review): the failure payload is read as a dict carrying
        # 'errorMessage'; other shapes are tolerated here instead of raising
        # AttributeError — confirm against ObsUploader.upload_file.
        if isinstance(result, dict):
            reason = result.get('errorMessage', '未知错误')
        else:
            reason = result if result else '未知错误'
        logger.error(f"图片上传失败: {image_path} - {reason}")
        return None

    https_url = f"https://{Config.Config.OBS_SERVER}/{Config.Config.OBS_BUCKET}/{obs_object_key}"
    logger.info(f"图片上传成功: {image_path} -> {https_url}")
    return https_url
|
||
|
||
# Replace image references in Markdown content

# Markdown image: ![alt](path optional-title-or-attributes)
_MD_IMAGE_RE = re.compile(r'!\[([^]]*)\]\(([^)\s]+)(\s+[^)]*)?\)')
# HTML image: <img ... src="path" ...>
_HTML_IMG_RE = re.compile(r'<img\s+[^>]*src=["\']([^"\']+)["\'][^>]*>')
# References that can never point at a local file (scheme://, //host, data:).
_REMOTE_REF_RE = re.compile(r'^(?:[a-z][a-z0-9+.-]*:)?//|^data:', re.IGNORECASE)


def _path_tail(path):
    """Return ``path`` as a '/'-separated string without leading '', '.' or '..' parts."""
    parts = os.path.normcase(path).replace("\\", "/").split("/")
    while parts and parts[0] in ("", ".", ".."):
        parts.pop(0)
    return "/".join(parts)


def _find_obs_url(image_ref, md_dir, by_abs_path):
    """Return the uploaded URL for ``image_ref``, or ``None`` when there is none.

    ``by_abs_path`` maps normalised absolute local paths to URLs. An exact
    match of the reference resolved against ``md_dir`` always wins; only when
    that fails is the first mapping entry whose path ends with the reference
    on a path-component boundary used.
    """
    if _REMOTE_REF_RE.match(image_ref):
        return None

    resolved = os.path.normcase(os.path.abspath(os.path.join(md_dir, image_ref)))
    exact = by_abs_path.get(resolved)
    if exact is not None:
        return exact

    tail = _path_tail(image_ref)
    if not tail:
        return None
    for local_path, obs_url in by_abs_path.items():
        candidate = local_path.replace("\\", "/")
        # Require a '/' before the tail so "1.png" cannot match "image11.png".
        if candidate == tail or candidate.endswith("/" + tail):
            return obs_url
    return None


def replace_image_references(md_content, image_mapping, md_file_path):
    """Rewrite local image references in Markdown text to their uploaded URLs.

    Both Markdown images (``![alt](path "title")``) and HTML ``<img src=...>``
    tags are handled. References with no entry in ``image_mapping`` and
    remote references are left untouched.

    Args:
        md_content: the Markdown text.
        image_mapping: dict mapping local image paths to uploaded URLs.
        md_file_path: path of the Markdown file; relative references are
            resolved against its directory.

    Returns:
        The text with every resolvable image reference replaced.
    """
    if not md_content or not image_mapping:
        return md_content

    md_dir = os.path.dirname(md_file_path)
    # Normalise the keys once so lookups are exact-match dictionary hits.
    by_abs_path = {
        os.path.normcase(os.path.abspath(local_path)): obs_url
        for local_path, obs_url in image_mapping.items()
    }

    def md_replacer(match):
        alt_text = match.group(1)
        attrs = match.group(3) or ''  # optional title/attribute part
        obs_url = _find_obs_url(match.group(2), md_dir, by_abs_path)
        if obs_url is None:
            return match.group(0)
        # Rebuild the full image syntax; the attribute part was captured
        # inside the parentheses, so it goes back inside them.
        return f"![{alt_text}]({obs_url}{attrs})"

    def html_replacer(match):
        obs_url = _find_obs_url(match.group(1), md_dir, by_abs_path)
        if obs_url is None:
            return match.group(0)
        # Splice only the src value so identical text in other attributes
        # (alt, title, ...) is left alone.
        tag = match.group(0)
        offset = match.start(0)
        return tag[:match.start(1) - offset] + obs_url + tag[match.end(1) - offset:]

    updated_content = _MD_IMAGE_RE.sub(md_replacer, md_content)
    return _HTML_IMG_RE.sub(html_replacer, updated_content)
|
||
|
||
# Process a single Markdown file
def process_markdown_file(md_file_path, image_mapping, target_dir, current_md_path):
    """Rewrite image references in one Markdown file and save a copy.

    The file is read as UTF-8, passed through ``replace_image_references``
    and written to ``target_dir`` under its original base name. Errors are
    logged, not raised, so one bad file does not stop a batch run.

    Args:
        md_file_path: path of the Markdown file to read.
        image_mapping: dict mapping local image paths to uploaded URLs.
        target_dir: directory receiving the rewritten file; created if missing.
        current_md_path: unused; kept so existing callers keep working.

    NOTE(review): only the base name is used for the output path, so two
    source files with the same name in different directories overwrite each
    other in ``target_dir``.
    """
    try:
        # newline='' on both ends keeps the file's original line endings
        # instead of translating them to the platform default.
        with open(md_file_path, 'r', encoding='utf-8', newline='') as f:
            content = f.read()

        # Rewrite the image references
        new_content = replace_image_references(content, image_mapping, md_file_path)

        # Save the modified content
        os.makedirs(target_dir, exist_ok=True)
        target_file_path = os.path.join(target_dir, os.path.basename(md_file_path))
        with open(target_file_path, 'w', encoding='utf-8', newline='') as f:
            f.write(new_content)
        logger.info(f"已处理并保存文件到: {target_file_path}")

    except Exception as e:
        # Best effort per file: log with traceback and carry on.
        logger.error(f"处理文件 {md_file_path} 时出错: {str(e)}", exc_info=True)
|
||
|
||
# Scan every "media" directory and upload the images found there
def scan_and_upload_media_images(root_directory, obs_uploader):
    """Upload all images found in ``media`` directories below ``root_directory``.

    Args:
        root_directory: directory tree to walk.
        obs_uploader: uploader passed through to ``upload_image_to_obs``.

    Returns:
        dict: absolute local image path -> uploaded URL, containing only the
        images whose upload succeeded.
    """
    # Supported image extensions (lower-case; a set gives O(1) membership)
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.tiff'}

    # Recursively collect every directory named "media"
    media_directories = [
        os.path.join(root, 'media')
        for root, dirs, _files in os.walk(root_directory)
        if 'media' in dirs
    ]
    logger.info(f"找到 {len(media_directories)} 个media目录")

    # Local image path -> OBS URL
    image_mapping = {}

    # Counters for the summary line
    total_images = 0
    uploaded_images = 0

    for media_dir in media_directories:
        logger.info(f"开始处理media目录: {media_dir}")

        # Sorted for a deterministic upload order; entries that are not
        # regular files are skipped even if their name looks like an image.
        image_files = []
        for entry in sorted(os.listdir(media_dir)):
            if os.path.splitext(entry)[1].lower() not in image_extensions:
                continue
            candidate = os.path.join(media_dir, entry)
            if os.path.isfile(candidate):
                image_files.append(candidate)

        logger.info(f"在 {media_dir} 中找到 {len(image_files)} 张图片")
        total_images += len(image_files)

        # Upload each image and record the mapping, keyed by absolute path
        for image_path in image_files:
            abs_image_path = os.path.abspath(image_path)
            if abs_image_path in image_mapping:
                logger.debug(f"图片已上传,跳过: {abs_image_path}")
                continue
            obs_url = upload_image_to_obs(obs_uploader, image_path)
            if obs_url:
                image_mapping[abs_image_path] = obs_url
                uploaded_images += 1

    logger.info(f"总共有 {total_images} 张图片,成功上传 {uploaded_images} 张")
    return image_mapping
|
||
|
||
# Main entry point
def main(root_directory=None, target_dir=None):
    """Upload all media images and write rewritten copies of the Markdown files.

    Args:
        root_directory: directory tree holding the Markdown files and their
            ``media`` directories. Defaults to the project's original
            hard-coded location.
        target_dir: directory receiving the rewritten Markdown files.
            Defaults to ``<current working directory>/target``.
    """
    global logger
    logger = setup_logging()

    # Configuration
    if root_directory is None:
        root_directory = r"D:\dsWork\dsProject\dsLightRag\YunXiao\《万有引力定律》试题"
    if target_dir is None:
        target_dir = os.path.join(os.getcwd(), "target")

    try:
        # Validate the input directory before doing any work: os.walk() on a
        # missing directory silently yields nothing, which would otherwise be
        # reported as "no images found".
        directory = Path(root_directory)
        if not directory.exists():
            logger.error(f"目录不存在: {directory}")
            return

        # Initialise the OBS uploader
        obs_uploader = init_obs_uploader()
        if not obs_uploader:
            logger.error("初始化OBS上传器失败,无法继续执行")
            return

        # Create the target directory
        os.makedirs(target_dir, exist_ok=True)
        logger.info(f"已创建target目录: {target_dir}")

        # Scan and upload the images of every media directory
        image_mapping = scan_and_upload_media_images(root_directory, obs_uploader)
        if image_mapping:
            logger.info(f"共上传了 {len(image_mapping)} 张图片到OBS")
        else:
            logger.warning("未找到任何图片或图片上传失败")

        # Recursively find and process all Markdown files
        logger.info("开始处理Markdown文件...")
        md_files = list(directory.glob('**/*.md'))
        if not md_files:
            logger.warning(f"在 {directory} 中没有找到Markdown文件")
            return

        logger.info(f"找到 {len(md_files)} 个Markdown文件")

        for md_file in md_files:
            process_markdown_file(str(md_file), image_mapping, target_dir, str(md_file))

        logger.info("所有文件处理完成!")

    except Exception as e:
        # Top-level boundary: record the failure with its traceback.
        logger.error(f"程序执行出错: {str(e)}", exc_info=True)
|
||
|
||
if __name__ == "__main__":
    main()