You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1356 lines
56 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# type: ignore
"""
MinerU Document Parser Utility.

This module provides functionality for parsing PDF and image documents using the
MinerU 2.0 library, and converts the parsing results into Markdown and JSON formats.

Note: MinerU 2.0 no longer includes the LibreOffice document conversion module.
For Office documents (.doc, .docx, .ppt, .pptx), please convert them to PDF format first.
"""
from __future__ import annotations
__all__ = ["MineruParser"]
import argparse
import json
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)
T = TypeVar("T")
class MineruParser:
    """
    MinerU 2.0 document parsing utility class.

    Parses PDF and image documents by running the ``mineru`` command line tool
    and returns the generated content list (JSON) together with the Markdown
    text. Office documents and text files are first converted to PDF (with
    LibreOffice and reportlab respectively) and then parsed like any other PDF.

    Every method is static; the class only serves as a namespace.
    """

    __slots__ = ()

    # Image formats handed to MinerU 2.0 unchanged.
    _NATIVE_IMAGE_FORMATS = frozenset({".png", ".jpeg", ".jpg"})
    # Image formats converted to PNG with Pillow before parsing.
    _CONVERTIBLE_IMAGE_FORMATS = frozenset({".bmp", ".tiff", ".tif", ".gif", ".webp"})
    # Office formats converted to PDF with LibreOffice before parsing.
    _OFFICE_FORMATS = frozenset({".doc", ".docx", ".ppt", ".pptx", ".xls", ".xlsx"})
    # Text formats rendered to PDF with reportlab before parsing.
    _TEXT_FORMATS = frozenset({".txt", ".md"})

    def __init__(self) -> None:
        """Initialize MineruParser (there is no instance state)."""

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _escape_markup(text: str) -> str:
        """Escape ``&``, ``<`` and ``>`` so *text* is safe inside a reportlab Paragraph."""
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

    @staticmethod
    def _prepare_output_dir(
        output_dir: Optional[Union[str, Path]], fallback_parent: Path
    ) -> Path:
        """Return the output directory (``<fallback_parent>/mineru_output`` by default), creating it."""
        base_output_dir = (
            Path(output_dir) if output_dir else fallback_parent / "mineru_output"
        )
        base_output_dir.mkdir(parents=True, exist_ok=True)
        return base_output_dir

    @staticmethod
    def _run_mineru_command(
        input_path: Union[str, Path],
        output_dir: Union[str, Path],
        method: str = "auto",
        lang: Optional[str] = None,
        backend: str = "pipeline",
        start_page: Optional[int] = None,
        end_page: Optional[int] = None,
        formula: bool = True,
        table: bool = True,
        device: Optional[str] = None,
        source: str = "huggingface",
    ) -> None:
        """
        Run the mineru command line tool.

        Args:
            input_path: Path to input file or directory
            output_dir: Output directory path
            method: Parsing method (auto, txt, ocr)
            lang: Document language for OCR optimization
            backend: Parsing backend
            start_page: Starting page number (0-based)
            end_page: Ending page number (0-based)
            formula: Enable formula parsing
            table: Enable table parsing
            device: Inference device
            source: Model source

        Raises:
            subprocess.CalledProcessError: mineru exited with a non-zero status.
            RuntimeError: the ``mineru`` executable could not be found.
        """
        # NOTE: MinerU has to download its models. They can also be fetched from
        # a mirror inside mainland China; see the "local deployment" section of
        # https://github.com/opendatalab/MinerU/blob/master/README_zh-CN.md
        # and the `mineru-models-download` tool (it also fetches the
        # OCR/paddleocr_torch/ models).
        cmd = [
            "mineru",
            "-p",
            str(input_path),
            "-o",
            str(output_dir),
            "-m",
            method,
            "-b",
            backend,
            "--source",
            source,
        ]
        if lang:
            cmd.extend(["-l", lang])
        if start_page is not None:
            cmd.extend(["-s", str(start_page)])
        if end_page is not None:
            cmd.extend(["-e", str(end_page)])
        # Formula and table parsing are only mentioned when being switched off.
        if not formula:
            cmd.extend(["-f", "false"])
        if not table:
            cmd.extend(["-t", "false"])
        if device:
            cmd.extend(["-d", device])

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                encoding="utf-8",
                errors="ignore",
            )
        except subprocess.CalledProcessError as e:
            print(f"Error running mineru command: {e}")
            if e.stderr:
                print(f"Error details: {e.stderr}")
            raise
        except FileNotFoundError as e:
            raise RuntimeError(
                "mineru command not found. Please ensure MinerU 2.0 is properly installed:\n"
                "pip install -U 'mineru[core]' or uv pip install -U 'mineru[core]'"
            ) from e

        print("MinerU command executed successfully")
        if result.stdout:
            print(f"Output: {result.stdout}")

    @staticmethod
    def _read_output_files(
        output_dir: Path, file_stem: str
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Read the output files generated by mineru.

        Files named after *file_stem* are looked up directly in *output_dir*,
        then in ``output_dir/file_stem`` and finally anywhere below
        *output_dir*. Only when nothing matching the stem yields any content
        is an arbitrary ``*.md`` / ``*_content_list.json`` used, so results of
        another document sharing the output directory are not picked up by
        accident.

        Args:
            output_dir: Output directory
            file_stem: File name without extension

        Returns:
            Tuple containing (content list JSON, Markdown text); missing parts
            are returned as ``[]`` / ``""``.
        """
        output_dir = Path(output_dir)
        searchable = output_dir.is_dir()

        def candidates(name: str, pattern: str) -> List[Path]:
            # Conventional locations first, then name matches at any depth.
            direct = [output_dir / name, output_dir / file_stem / name]
            nested: List[Path] = []
            if searchable:
                nested = sorted(
                    p
                    for p in output_dir.rglob(pattern)
                    if p.name == name and p not in direct
                )
            return direct + nested

        def read_markdown(path: Path) -> str:
            try:
                return path.read_text(encoding="utf-8")
            except (OSError, UnicodeError) as e:
                print(f"Warning: Could not read markdown file {path}: {e}")
                return ""

        def read_content_list(path: Path) -> List[Dict[str, Any]]:
            try:
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode errors
                print(f"Warning: Could not read JSON file {path}: {e}")
                return []
            if not isinstance(data, list):
                print(f"Warning: JSON file {path} does not contain a content list")
                return []
            return data

        md_content = ""
        for path in candidates(f"{file_stem}.md", "*.md"):
            if path.is_file():
                md_content = read_markdown(path)
                if md_content:
                    break

        content_list: List[Dict[str, Any]] = []
        for path in candidates(f"{file_stem}_content_list.json", "*_content_list.json"):
            if path.is_file():
                content_list = read_content_list(path)
                if content_list:
                    break

        # Last resort: nothing named after the stem was usable, take any output.
        if not md_content and not content_list and searchable:
            for path in sorted(output_dir.rglob("*.md")):
                md_content = read_markdown(path)
                if md_content:
                    break
            for path in sorted(output_dir.rglob("*_content_list.json")):
                content_list = read_content_list(path)
                if content_list:
                    break

        return content_list, md_content

    # ------------------------------------------------------------------
    # PDF
    # ------------------------------------------------------------------
    @staticmethod
    def parse_pdf(
        pdf_path: Union[str, Path],
        output_dir: Optional[str] = None,
        method: str = "auto",
        lang: Optional[str] = None,
        **kwargs,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Parse PDF document using MinerU 2.0.

        Args:
            pdf_path: Path to the PDF file
            output_dir: Output directory path (defaults to ``mineru_output``
                next to the PDF)
            method: Parsing method (auto, txt, ocr)
            lang: Document language for OCR optimization
            **kwargs: Additional parameters for mineru command

        Returns:
            Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)

        Raises:
            FileNotFoundError: *pdf_path* does not exist.
        """
        try:
            pdf_path = Path(pdf_path)
            if not pdf_path.exists():
                raise FileNotFoundError(f"PDF file does not exist: {pdf_path}")

            base_output_dir = MineruParser._prepare_output_dir(
                output_dir, pdf_path.parent
            )
            MineruParser._run_mineru_command(
                input_path=pdf_path,
                output_dir=base_output_dir,
                method=method,
                lang=lang,
                **kwargs,
            )
            return MineruParser._read_output_files(base_output_dir, pdf_path.stem)
        except Exception as e:
            print(f"Error in parse_pdf: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------
    @staticmethod
    def _convert_image_to_png(image_path: Path, target: Path) -> None:
        """Write *image_path* to *target* as PNG, flattening transparency onto white."""
        try:
            from PIL import Image
        except ImportError as e:
            raise RuntimeError(
                "PIL/Pillow is required for image format conversion. "
                "Please install it using: pip install Pillow"
            ) from e

        try:
            with Image.open(image_path) as img:
                # Palette and grey+alpha images are normalised to RGBA so that
                # their transparency is honoured when flattening.
                if img.mode in ("P", "LA"):
                    img = img.convert("RGBA")
                if img.mode == "RGBA":
                    background = Image.new("RGB", img.size, (255, 255, 255))
                    background.paste(img, mask=img.split()[-1])  # alpha as mask
                    img = background
                elif img.mode not in ("RGB", "L"):
                    img = img.convert("RGB")
                img.save(target, "PNG", optimize=True)
        except Exception as e:
            raise RuntimeError(
                f"Failed to convert image {image_path.name}: {str(e)}"
            ) from e

    @staticmethod
    def parse_image(
        image_path: Union[str, Path],
        output_dir: Optional[str] = None,
        lang: Optional[str] = None,
        **kwargs,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Parse image document using MinerU 2.0.

        Note: .png, .jpeg and .jpg files are passed to MinerU unchanged.
        Other formats (.bmp, .tiff, .tif, .gif, .webp) are converted to a
        temporary .png first; the temporary directory is always removed.

        Args:
            image_path: Path to the image file
            output_dir: Output directory path
            lang: Document language for OCR optimization
            **kwargs: Additional parameters for mineru command. A ``method``
                entry is ignored because images are always parsed with "ocr".

        Returns:
            Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)

        Raises:
            FileNotFoundError: *image_path* does not exist.
            ValueError: the file extension is not a supported image format.
            RuntimeError: Pillow is missing or the conversion failed.
        """
        try:
            image_path = Path(image_path)
            if not image_path.exists():
                raise FileNotFoundError(f"Image file does not exist: {image_path}")

            supported = (
                MineruParser._NATIVE_IMAGE_FORMATS
                | MineruParser._CONVERTIBLE_IMAGE_FORMATS
            )
            ext = image_path.suffix.lower()
            if ext not in supported:
                raise ValueError(
                    f"Unsupported image format: {ext}. Supported formats: {', '.join(sorted(supported))}"
                )

            # The method is fixed below; a caller-supplied one would otherwise
            # clash with it as a duplicate keyword argument.
            kwargs.pop("method", None)

            actual_image_path = image_path
            temp_dir: Optional[Path] = None
            try:
                if ext not in MineruParser._NATIVE_IMAGE_FORMATS:
                    print(f"Converting {ext} image to PNG for MinerU compatibility...")
                    temp_dir = Path(tempfile.mkdtemp())
                    actual_image_path = temp_dir / f"{image_path.stem}_converted.png"
                    MineruParser._convert_image_to_png(image_path, actual_image_path)
                    print(
                        f"Successfully converted {image_path.name} to PNG ({actual_image_path.stat().st_size / 1024:.1f} KB)"
                    )

                base_output_dir = MineruParser._prepare_output_dir(
                    output_dir, image_path.parent
                )
                MineruParser._run_mineru_command(
                    input_path=actual_image_path,
                    output_dir=base_output_dir,
                    method="ocr",  # Images require OCR method
                    lang=lang,
                    **kwargs,
                )
                # Look the results up under the name of the file that was
                # actually parsed (the converted file carries a suffix).
                return MineruParser._read_output_files(
                    base_output_dir, actual_image_path.stem
                )
            finally:
                if temp_dir is not None:
                    shutil.rmtree(temp_dir, ignore_errors=True)
        except Exception as e:
            print(f"Error in parse_image: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Office documents
    # ------------------------------------------------------------------
    @staticmethod
    def _convert_office_to_pdf(doc_path: Path, target_dir: Path, timeout: int = 60) -> Path:
        """
        Convert *doc_path* to PDF inside *target_dir* using LibreOffice.

        ``soffice`` is tried first, then ``libreoffice``.

        Returns:
            Path of the generated PDF.

        Raises:
            RuntimeError: LibreOffice is not installed, the conversion failed,
                or the generated PDF is missing or suspiciously small.
        """
        print(f"Converting {doc_path.name} to PDF using LibreOffice...")
        launcher_found = False
        converted = False
        for cmd in ("soffice", "libreoffice"):
            convert_cmd = [
                cmd,
                "--headless",
                "--convert-to",
                "pdf",
                "--outdir",
                str(target_dir),
                str(doc_path),
            ]
            try:
                result = subprocess.run(
                    convert_cmd,
                    capture_output=True,
                    text=True,
                    timeout=timeout,
                    encoding="utf-8",
                    errors="ignore",
                )
            except FileNotFoundError:
                print(f"LibreOffice command '{cmd}' not found")
                continue
            except subprocess.TimeoutExpired:
                launcher_found = True
                print(f"LibreOffice command '{cmd}' timed out")
                continue
            except Exception as e:
                print(f"LibreOffice command '{cmd}' failed with exception: {e}")
                continue

            launcher_found = True
            if result.returncode == 0:
                converted = True
                print(f"Successfully converted {doc_path.name} to PDF")
                break
            print(f"LibreOffice command '{cmd}' failed: {result.stderr}")

        if not launcher_found:
            raise RuntimeError(
                "LibreOffice is required for Office document conversion but was not found.\n"
                "Please install LibreOffice:\n"
                "- Windows: Download from https://www.libreoffice.org/download/download/\n"
                "- macOS: brew install --cask libreoffice\n"
                "- Ubuntu/Debian: sudo apt-get install libreoffice\n"
                "- CentOS/RHEL: sudo yum install libreoffice\n"
                "Alternatively, convert the document to PDF manually.\n"
                "MinerU 2.0 no longer includes built-in Office document conversion."
            )
        if not converted:
            raise RuntimeError(
                f"LibreOffice conversion failed for {doc_path.name}. "
                f"Please check if the file is corrupted or try converting manually."
            )

        # Prefer the PDF named after the document; fall back to any PDF.
        pdf_path = target_dir / f"{doc_path.stem}.pdf"
        if not pdf_path.exists():
            pdf_files = sorted(target_dir.glob("*.pdf"))
            if not pdf_files:
                raise RuntimeError(
                    f"PDF conversion failed for {doc_path.name} - no PDF file generated. "
                    f"Please check LibreOffice installation or try manual conversion."
                )
            pdf_path = pdf_files[0]

        pdf_size = pdf_path.stat().st_size
        print(f"Generated PDF: {pdf_path.name} ({pdf_size} bytes)")
        if pdf_size < 100:  # Very small file, likely empty
            raise RuntimeError(
                "Generated PDF appears to be empty or corrupted. "
                "Original file may have issues or LibreOffice conversion failed."
            )
        return pdf_path

    @staticmethod
    def parse_office_doc(
        doc_path: Union[str, Path], output_dir: Optional[str] = None, **kwargs
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Parse office document by first converting to PDF, then parsing with MinerU 2.0.

        Note: This method requires LibreOffice to be installed separately for PDF conversion.
        MinerU 2.0 no longer includes built-in Office document conversion.
        Supported formats: .doc, .docx, .ppt, .pptx, .xls, .xlsx

        Args:
            doc_path: Path to the document file (.doc, .docx, .ppt, .pptx, .xls, .xlsx)
            output_dir: Output directory path (defaults to ``mineru_output``
                next to the document)
            **kwargs: Additional parameters for mineru command

        Returns:
            Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)

        Raises:
            FileNotFoundError: *doc_path* does not exist.
            ValueError: the file extension is not a supported office format.
            RuntimeError: the LibreOffice conversion failed.
        """
        try:
            doc_path = Path(doc_path)
            if not doc_path.exists():
                raise FileNotFoundError(f"Document file does not exist: {doc_path}")
            if doc_path.suffix.lower() not in MineruParser._OFFICE_FORMATS:
                raise ValueError(f"Unsupported office format: {doc_path.suffix}")

            with tempfile.TemporaryDirectory() as temp_dir:
                pdf_path = MineruParser._convert_office_to_pdf(doc_path, Path(temp_dir))
                # The intermediate PDF lives in a directory that is deleted on
                # return, so MinerU's output must not default to a folder
                # beside it; anchor the default next to the source document.
                target_dir = (
                    output_dir if output_dir else doc_path.parent / "mineru_output"
                )
                return MineruParser.parse_pdf(
                    pdf_path=pdf_path, output_dir=target_dir, **kwargs
                )
        except Exception as e:
            print(f"Error in parse_office_doc: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Text / Markdown files
    # ------------------------------------------------------------------
    @staticmethod
    def _read_text_with_fallback(text_path: Path) -> str:
        """
        Read *text_path* as UTF-8 (dropping a BOM), falling back to legacy encodings.

        ``latin-1`` accepts every byte sequence, so it is tried last.
        """
        for encoding in ("utf-8-sig", "gbk", "cp1252", "latin-1"):
            try:
                content = text_path.read_text(encoding=encoding)
            except UnicodeDecodeError:
                continue
            if encoding != "utf-8-sig":
                print(f"Successfully read file with {encoding} encoding")
            return content
        raise RuntimeError(
            f"Could not decode text file {text_path.name} with any supported encoding"
        )

    @staticmethod
    def _register_cjk_font(*styles: Any) -> None:
        """
        Best effort: switch *styles* to a font that supports Chinese characters.

        Failures are ignored and the default fonts stay in place.
        """
        import platform

        font_candidates = {
            "Windows": ("SimSun", "SimHei", "Microsoft YaHei"),
            "Darwin": ("STSong-Light", "STHeiti"),
        }.get(platform.system(), ())
        if not font_candidates:
            return
        try:
            from reportlab.pdfbase import pdfmetrics
            from reportlab.pdfbase.cidfonts import UnicodeCIDFont
        except Exception:
            return
        # NOTE(review): UnicodeCIDFont presumably only accepts reportlab's
        # built-in CID face names, in which case the Windows names never
        # register - confirm against the reportlab documentation.
        for font_name in font_candidates:
            try:
                pdfmetrics.registerFont(UnicodeCIDFont(font_name))
            except Exception:
                continue
            for style in styles:
                style.fontName = font_name
            return

    @staticmethod
    def _plain_text_to_story(text_content: str, normal_style: Any) -> List[Any]:
        """Turn plain text into reportlab flowables, one paragraph per non-empty line."""
        from reportlab.platypus import Paragraph, Spacer

        print(f"Processing plain text file with {len(text_content)} characters...")
        story: List[Any] = []
        lines = text_content.split("\n")
        for raw_line in lines:
            line = raw_line.rstrip()
            if not line.strip():
                story.append(Spacer(1, 6))
                continue
            # Escape special characters for ReportLab
            story.append(Paragraph(MineruParser._escape_markup(line), normal_style))
            story.append(Spacer(1, 3))
        print(f"Added {len(lines)} lines to PDF")
        return story

    @staticmethod
    def _markdown_to_story(
        text_content: str, base_dir: Path, normal_style: Any, heading_style: Any
    ) -> List[Any]:
        """
        Turn Markdown text into reportlab flowables.

        Handles fenced code blocks, pipe tables, headers, horizontal rules,
        images, block quotes, ordered/unordered lists and paragraphs. Relative
        image paths are resolved against *base_dir*.
        """
        from reportlab.lib import colors
        from reportlab.lib.styles import ParagraphStyle
        from reportlab.lib.units import inch
        from reportlab.platypus import (
            HRFlowable,
            Image as RLImage,
            Paragraph,
            Spacer,
            Table,
            TableStyle,
        )

        inline = MineruParser._process_inline_markdown
        escape = MineruParser._escape_markup
        story: List[Any] = []

        code_style = ParagraphStyle(
            name="Code",
            parent=normal_style,
            fontName="Courier",
            fontSize=9,
            backgroundColor=colors.lightgrey,
            borderColor=colors.grey,
            borderWidth=1,
            borderPadding=6,
            leftIndent=12,
            rightIndent=12,
        )
        quote_style = ParagraphStyle(
            name="Quote",
            parent=normal_style,
            leftIndent=24,
            rightIndent=24,
            fontSize=10,
            textColor=colors.darkgrey,
            borderColor=colors.grey,
            borderWidth=0,
            borderPadding=8,
            backgroundColor=colors.lightgrey,
        )
        caption_style = ParagraphStyle(
            name="Caption",
            parent=normal_style,
            fontSize=9,
            textColor=colors.grey,
            alignment=1,  # Center
        )
        table_style = TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, 0), colors.grey),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke),
                ("ALIGN", (0, 0), (-1, -1), "LEFT"),
                ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
                ("FONTSIZE", (0, 0), (-1, 0), 10),
                ("BOTTOMPADDING", (0, 0), (-1, 0), 12),
                ("BACKGROUND", (0, 1), (-1, -1), colors.beige),
                ("GRID", (0, 0), (-1, -1), 1, colors.black),
            ]
        )
        # A header separator row consists solely of pipes, dashes/equals,
        # colons and whitespace (e.g. "|---|:--:|").
        separator_row = re.compile(r"^\s*\|[\s|:=-]*[-=][\s|:=-]*\|\s*$")

        code_lines: List[str] = []
        table_lines: List[str] = []

        def flush_code() -> None:
            if code_lines:
                # Code is literal text: escape it and keep the line breaks.
                code_text = "<br/>".join(escape(code_line) for code_line in code_lines)
                story.append(Paragraph(code_text, code_style))
                story.append(Spacer(1, 12))
            del code_lines[:]

        def flush_table() -> None:
            rows = list(table_lines)
            del table_lines[:]
            table_data = []
            for row in rows:
                if separator_row.match(row):
                    continue  # Skip separator line
                cells = [cell.strip() for cell in row.split("|")[1:-1]]
                if cells:
                    table_data.append(cells)
            if len(rows) >= 2 and table_data:
                try:
                    # Pad ragged rows so every row has the same number of cells.
                    width = max(len(cells) for cells in table_data)
                    padded = [cells + [""] * (width - len(cells)) for cells in table_data]
                    table = Table(padded)
                    table.setStyle(table_style)
                except Exception:
                    pass  # fall through to the plain-text rendering below
                else:
                    story.append(table)
                    story.append(Spacer(1, 12))
                    return
            # Fallback: keep the content as ordinary paragraphs.
            for row in rows:
                story.append(Paragraph(inline(row), normal_style))

        def add_image(line: str) -> None:
            match = re.search(r"!\[([^\]]*)\]\(([^)]+)\)", line)
            if not match:
                return
            alt_text, img_src = match.group(1), match.group(2)
            img_path = Path(img_src)
            if not img_path.is_absolute():
                img_path = base_dir / img_src
            if not img_path.exists():
                story.append(
                    Paragraph(
                        escape(f"[Image not found: {alt_text} - {img_src}]"),
                        normal_style,
                    )
                )
                print(f" ⚠️ Image not found: {img_src}")
                return
            try:
                from PIL import Image as PILImage

                with PILImage.open(img_path) as pil_img:
                    img_width, img_height = pil_img.size
                # Shrink (never enlarge) to fit into a 5 x 4 inch box.
                scale = min(5 * inch / img_width, 4 * inch / img_height, 1.0)
                flowables: List[Any] = [
                    RLImage(
                        str(img_path),
                        width=img_width * scale,
                        height=img_height * scale,
                    )
                ]
                if alt_text:
                    flowables.append(
                        Paragraph(escape(f"Image: {alt_text}"), caption_style)
                    )
                flowables.append(Spacer(1, 12))
            except Exception as e:
                story.append(
                    Paragraph(
                        escape(f"[Image loading failed: {alt_text}]"), normal_style
                    )
                )
                print(f" ⚠️ Failed to load image {img_path}: {e}")
                return
            story.extend(flowables)
            print(f" 📷 Added image: {img_path.name}")

        in_code_block = False
        for raw_line in text_content.split("\n"):
            line = raw_line.rstrip()
            stripped = line.strip()

            # Inside a fenced code block everything up to the closing fence is code.
            if in_code_block:
                if line.startswith("```"):
                    flush_code()
                    in_code_block = False
                else:
                    code_lines.append(line)
                continue

            # Table rows are collected until the first non-table line.
            if stripped.startswith("|") and stripped.endswith("|"):
                table_lines.append(line)
                continue
            if table_lines:
                flush_table()

            if line.startswith("```"):
                in_code_block = True
                continue

            if not stripped:
                story.append(Spacer(1, 12))
            elif line.startswith("#"):
                level = len(line) - len(line.lstrip("#"))
                header_text = line.lstrip("#").strip()
                if header_text:
                    header_style = ParagraphStyle(
                        name=f"Heading{level}",
                        parent=heading_style,
                        fontSize=max(16 - level, 10),
                        spaceAfter=8,
                        spaceBefore=16 if level <= 2 else 12,
                    )
                    story.append(Paragraph(inline(header_text), header_style))
            elif re.match(r"^---+$|^\*\*\*+$|^___+$", line):
                story.append(
                    HRFlowable(
                        width="100%",
                        thickness=1,
                        lineCap="round",
                        color=colors.grey,
                    )
                )
                story.append(Spacer(1, 12))
            elif line.startswith("![") and "](" in line and ")" in line:
                add_image(line)
            elif line.startswith(">"):
                quote_text = line.lstrip(">").strip()
                story.append(Paragraph(inline(quote_text), quote_style))
                story.append(Spacer(1, 6))
            elif re.match(r"^[\s]*[-\*\+]\s+", line):
                indent_level = len(line) - len(line.lstrip())
                list_text = re.sub(r"^[\s]*[-\*\+]\s+", "", line)
                list_style = ParagraphStyle(
                    name="List",
                    parent=normal_style,
                    leftIndent=12 + indent_level,
                    bulletIndent=6 + indent_level,
                    bulletFontName="Symbol",
                )
                story.append(Paragraph(inline(list_text), list_style))
                story.append(Spacer(1, 3))
            elif re.match(r"^[\s]*\d+\.\s+", line):
                indent_level = len(line) - len(line.lstrip())
                match = re.match(r"^[\s]*(\d+)\.\s+(.+)", line)
                if match:
                    list_style = ParagraphStyle(
                        name="OrderedList",
                        parent=normal_style,
                        leftIndent=12 + indent_level,
                        bulletIndent=6 + indent_level,
                    )
                    story.append(
                        Paragraph(
                            f"{match.group(1)}. {inline(match.group(2))}",
                            list_style,
                        )
                    )
                    story.append(Spacer(1, 3))
            else:
                story.append(Paragraph(inline(line), normal_style))
                story.append(Spacer(1, 6))

        # Do not lose a code block or table that runs to the end of the file.
        if in_code_block:
            flush_code()
        if table_lines:
            flush_table()
        return story

    @staticmethod
    def _text_to_pdf(text_content: str, text_path: Path, pdf_path: Path) -> None:
        """
        Render *text_content* (read from *text_path*) to an A4 PDF at *pdf_path*.

        Raises:
            RuntimeError: reportlab is not installed or rendering failed.
        """
        try:
            from reportlab.lib.pagesizes import A4
            from reportlab.lib.styles import getSampleStyleSheet
            from reportlab.lib.units import inch
            from reportlab.platypus import Paragraph, SimpleDocTemplate
        except ImportError as e:
            raise RuntimeError(
                "reportlab is required for text-to-PDF conversion. "
                "Please install it using: pip install reportlab"
            ) from e

        try:
            doc = SimpleDocTemplate(
                str(pdf_path),
                pagesize=A4,
                leftMargin=inch,
                rightMargin=inch,
                topMargin=inch,
                bottomMargin=inch,
            )
            styles = getSampleStyleSheet()
            normal_style = styles["Normal"]
            heading_style = styles["Heading1"]
            MineruParser._register_cjk_font(normal_style, heading_style)

            if not text_content.strip():
                story: List[Any] = []
            elif text_path.suffix.lower() == ".md":
                story = MineruParser._markdown_to_story(
                    text_content, text_path.parent, normal_style, heading_style
                )
            else:
                story = MineruParser._plain_text_to_story(text_content, normal_style)

            # If no content was added, add a placeholder
            if not story:
                story.append(Paragraph("(Empty text file)", normal_style))
            doc.build(story)
        except Exception as e:
            raise RuntimeError(
                f"Failed to convert text file {text_path.name} to PDF: {str(e)}"
            ) from e

    @staticmethod
    def parse_text_file(
        text_path: Union[str, Path], output_dir: Optional[str] = None, **kwargs
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Parse text file by first converting to PDF, then parsing with MinerU 2.0.

        Supported formats: .txt, .md

        Args:
            text_path: Path to the text file (.txt, .md)
            output_dir: Output directory path (defaults to ``mineru_output``
                next to the text file)
            **kwargs: Additional parameters for mineru command

        Returns:
            Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)

        Raises:
            FileNotFoundError: *text_path* does not exist.
            ValueError: the file extension is not a supported text format.
            RuntimeError: the file could not be decoded or converted to PDF.
        """
        try:
            text_path = Path(text_path)
            if not text_path.exists():
                raise FileNotFoundError(f"Text file does not exist: {text_path}")
            if text_path.suffix.lower() not in MineruParser._TEXT_FORMATS:
                raise ValueError(f"Unsupported text format: {text_path.suffix}")

            text_content = MineruParser._read_text_with_fallback(text_path)

            with tempfile.TemporaryDirectory() as temp_dir:
                pdf_path = Path(temp_dir) / f"{text_path.stem}.pdf"
                print(f"Converting {text_path.name} to PDF...")
                MineruParser._text_to_pdf(text_content, text_path, pdf_path)

                if not pdf_path.exists() or pdf_path.stat().st_size < 100:
                    raise RuntimeError(
                        f"PDF conversion failed for {text_path.name} - generated PDF is empty or corrupted."
                    )
                print(
                    f"Successfully converted {text_path.name} to PDF ({pdf_path.stat().st_size / 1024:.1f} KB)"
                )

                # The intermediate PDF lives in a directory that is deleted on
                # return; keep MinerU's default output next to the source file.
                target_dir = (
                    output_dir if output_dir else text_path.parent / "mineru_output"
                )
                return MineruParser.parse_pdf(
                    pdf_path=pdf_path, output_dir=target_dir, **kwargs
                )
        except Exception as e:
            print(f"Error in parse_text_file: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Dispatch and diagnostics
    # ------------------------------------------------------------------
    @staticmethod
    def parse_document(
        file_path: Union[str, Path],
        method: str = "auto",
        output_dir: Optional[str] = None,
        lang: Optional[str] = None,
        **kwargs,
    ) -> Tuple[List[Dict[str, Any]], str]:
        """
        Parse document using MinerU 2.0 based on file extension.

        Args:
            file_path: Path to the file to be parsed
            method: Parsing method (auto, txt, ocr); images always use "ocr"
            output_dir: Output directory path
            lang: Document language for OCR optimization
            **kwargs: Additional parameters for mineru command

        Returns:
            Tuple[List[Dict[str, Any]], str]: Tuple containing (content list JSON, Markdown text)

        Raises:
            FileNotFoundError: *file_path* does not exist.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File does not exist: {file_path}")

        ext = file_path.suffix.lower()
        if ext == ".pdf":
            return MineruParser.parse_pdf(file_path, output_dir, method, lang, **kwargs)
        if (
            ext in MineruParser._NATIVE_IMAGE_FORMATS
            or ext in MineruParser._CONVERTIBLE_IMAGE_FORMATS
        ):
            return MineruParser.parse_image(file_path, output_dir, lang, **kwargs)
        if ext in MineruParser._OFFICE_FORMATS:
            print(
                f"Warning: Office document detected ({ext}). "
                f"MinerU 2.0 requires conversion to PDF first."
            )
            # method/lang travel through **kwargs to the final parse_pdf call.
            return MineruParser.parse_office_doc(
                file_path, output_dir, method=method, lang=lang, **kwargs
            )
        if ext in MineruParser._TEXT_FORMATS:
            return MineruParser.parse_text_file(
                file_path, output_dir, method=method, lang=lang, **kwargs
            )

        # For unsupported file types, try as PDF
        print(
            f"Warning: Unsupported file extension '{ext}', "
            f"attempting to parse as PDF"
        )
        return MineruParser.parse_pdf(file_path, output_dir, method, lang, **kwargs)

    @staticmethod
    def check_installation() -> bool:
        """
        Check if MinerU 2.0 is properly installed.

        Returns:
            bool: True if ``mineru --version`` runs successfully, False otherwise
        """
        try:
            result = subprocess.run(
                ["mineru", "--version"],
                capture_output=True,
                text=True,
                check=True,
                encoding="utf-8",
                errors="ignore",
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing as well as a non-executable binary.
            print(
                "MinerU 2.0 is not properly installed. "
                "Please install it using: pip install -U 'mineru[core]'"
            )
            return False
        print(f"MinerU version: {result.stdout.strip()}")
        return True

    @staticmethod
    def _process_inline_markdown(text: str) -> str:
        """
        Process inline markdown formatting (bold, italic, code, links, strikethrough).

        Inline code and link URLs are set aside before emphasis is processed,
        so ``*``/``_`` characters inside them are left untouched.

        Args:
            text: Raw text with markdown formatting

        Returns:
            Text with ReportLab markup
        """
        protected: List[str] = []

        def protect(markup: str) -> str:
            # Park finished markup behind a NUL-delimited index placeholder.
            protected.append(markup)
            return f"\x00{len(protected) - 1}\x00"

        # NUL is reserved for the placeholders; escape special characters for ReportLab.
        text = MineruParser._escape_markup(text.replace("\x00", ""))

        # Inline code: `code`
        text = re.sub(
            r"`([^`]+?)`",
            lambda m: protect(
                f'<font name="Courier" size="9" color="darkred">{m.group(1)}</font>'
            ),
            text,
        )

        # Links: [text](url) - the link text still receives inline formatting
        def link_replacer(match):
            url = match.group(2).replace('"', "&quot;")
            return (
                protect(f'<link href="{url}" color="blue"><u>')
                + match.group(1)
                + protect("</u></link>")
            )

        text = re.sub(r"\[([^\]]+?)\]\(([^)]+?)\)", link_replacer, text)

        # Bold text: **text** or __text__
        text = re.sub(r"\*\*(.*?)\*\*", r"<b>\1</b>", text)
        text = re.sub(r"__(.*?)__", r"<b>\1</b>", text)
        # Italic text: *text* or _text_ (but not in the middle of words)
        text = re.sub(r"(?<!\w)\*([^*\n]+?)\*(?!\w)", r"<i>\1</i>", text)
        text = re.sub(r"(?<!\w)_([^_\n]+?)_(?!\w)", r"<i>\1</i>", text)
        # Strikethrough: ~~text~~
        text = re.sub(r"~~(.*?)~~", r"<strike>\1</strike>", text)

        return re.sub(r"\x00(\d+)\x00", lambda m: protected[int(m.group(1))], text)
def main():
    """
    Command line entry point for the MinerU 2.0 parser.

    Parses the command line, optionally checks the MinerU installation, parses
    the requested document and prints a short summary (plus per-type content
    statistics when ``--stats`` is given).

    Returns:
        0 on success, 1 when the installation check or the parsing failed.
    """
    # (flags, keyword arguments) for every option, in help-output order.
    option_specs = [
        (("file_path",), {"help": "Path to the document to parse"}),
        (("--output", "-o"), {"help": "Output directory path"}),
        (
            ("--method", "-m"),
            {
                "choices": ["auto", "txt", "ocr"],
                "default": "auto",
                "help": "Parsing method (auto, txt, ocr)",
            },
        ),
        (
            ("--lang", "-l"),
            {"help": "Document language for OCR optimization (e.g., ch, en, ja)"},
        ),
        (
            ("--backend", "-b"),
            {
                "choices": [
                    "pipeline",
                    "vlm-transformers",
                    "vlm-sglang-engine",
                    "vlm-sglang-client",
                ],
                "default": "pipeline",
                "help": "Parsing backend",
            },
        ),
        (
            ("--device", "-d"),
            {"help": "Inference device (e.g., cpu, cuda, cuda:0, npu, mps)"},
        ),
        (
            ("--source",),
            {
                "choices": ["huggingface", "modelscope", "local"],
                "default": "huggingface",
                "help": "Model source",
            },
        ),
        (("--no-formula",), {"action": "store_true", "help": "Disable formula parsing"}),
        (("--no-table",), {"action": "store_true", "help": "Disable table parsing"}),
        (("--stats",), {"action": "store_true", "help": "Display content statistics"}),
        (("--check",), {"action": "store_true", "help": "Check MinerU installation"}),
    ]
    cli = argparse.ArgumentParser(description="Parse documents using MinerU 2.0")
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)
    opts = cli.parse_args()

    # --check short-circuits: report the installation state and stop.
    if opts.check:
        installed = MineruParser.check_installation()
        if not installed:
            print("❌ MinerU 2.0 installation check failed")
            return 1
        print("✅ MinerU 2.0 is properly installed")
        return 0

    try:
        blocks, markdown = MineruParser.parse_document(
            file_path=opts.file_path,
            method=opts.method,
            output_dir=opts.output,
            lang=opts.lang,
            backend=opts.backend,
            device=opts.device,
            source=opts.source,
            formula=not opts.no_formula,
            table=not opts.no_table,
        )
        print(f"✅ Successfully parsed: {opts.file_path}")
        print(f"📝 Generated {len(markdown)} characters of markdown")
        print(f"📊 Extracted {len(blocks)} content blocks")

        if opts.stats:
            print("\n📈 Document Statistics:")
            print(f"Total content blocks: {len(blocks)}")
            # Tally the blocks per "type" entry; non-dict items are skipped.
            tally = {}
            for block in blocks:
                if not isinstance(block, dict):
                    continue
                kind = block.get("type", "unknown")
                if kind in tally:
                    tally[kind] += 1
                else:
                    tally[kind] = 1
            if tally:
                print("\n📋 Content Type Distribution:")
                for kind, count in sorted(tally.items()):
                    print(f"{kind}: {count}")
    except Exception as e:
        print(f"❌ Error: {str(e)}")
        return 1
    return 0
if __name__ == "__main__":
    # Hand main()'s status code to the shell. SystemExit is raised directly
    # because the ``exit`` helper is installed by the ``site`` module and is
    # not guaranteed to exist (e.g. when Python runs with ``-S``).
    raise SystemExit(main())