Skip to content

Instantly share code, notes, and snippets.

@relic-yuexi
Last active December 3, 2025 17:11
Show Gist options
  • Select an option

  • Save relic-yuexi/f4fb5970812e62540d948d6385baee4a to your computer and use it in GitHub Desktop.

Select an option

Save relic-yuexi/f4fb5970812e62540d948d6385baee4a to your computer and use it in GitHub Desktop.

PDF Processing with Multi-GPU Support / 多GPU支持的PDF处理

中文说明

概述

本项目提供了一个使用多GPU并行处理PDF文件的Python脚本。它基于 MinerU 库,能够将PDF文件转换为 Markdown/JSON 格式,并支持在多GPU环境下高效处理大量文件。

该脚本专为高吞吐量场景设计,通过多进程(Multiprocessing)与 CUDA 上下文隔离技术,实现了稳定的并行解析。

基于 MinerU 2.6.5

主要功能

  • ⚡️ 自动负载均衡:能够根据指定的 GPU 列表(如 0,1,2)和总进程数,自动通过取模运算(Modulo Operation)将任务均匀分配到每张显卡上。
  • 🛡️ 进程安全隔离:使用 Python 的 spawn 启动模式,确保每个 Worker 进程拥有独立的 CUDA 上下文,彻底解决 PyTorch 多进程环境下的显存冲突问题。
  • 📂 目录结构保持:输出文件会智能保留输入文件夹的相对层级结构,方便文件管理。
  • ⏯️ 断点续传:支持 --skip-processed 参数,自动跳过输出目录中已存在的文件,方便在任务中断后快速恢复。
  • 📝 详细日志:基于 loguru 的日志系统,实时显示每个 Worker(及对应的 GPU)的处理状态和错误信息。

安装依赖

在使用本脚本前,请确保已安装 Python 3.8+,并安装以下核心依赖:

pip install loguru mineru
# 同时也需要安装 mineru 所需的 torch 等依赖,具体参考 mineru 官方文档

使用指南

1. 基础用法

单张显卡(GPU 0),启动 4 个进程处理数据:

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --gpus 0 \
  --num-workers 4

2. 多显卡并行 (推荐)

假设你有 4 张显卡 (ID: 0,1,2,3),并希望每张卡跑 3 个进程(共 12 个进程):

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --gpus 0,1,2,3 \
  --num-workers 12

3. 断点续传

如果任务中途意外停止,添加 --skip-processed 参数即可跳过已完成的文件:

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --skip-processed

4. 参数详解

参数 类型 默认值 说明
--input-dir String (必填) PDF 文件的输入根目录
--output-dir String (必填) 结果保存的输出目录
--num-workers Int 4 并行进程数。建议设置为 GPU数量 * (2~4)
--gpus String "0" 可用的 GPU ID 列表,逗号分隔 (例: "0,1,2")
--backend String "pipeline" 解析后端: pipeline (本地) 或 vlm-siliconcloud
--lang String "en" 文档主要语言 (en, zh)
--skip-processed Flag False 是否跳过输出目录中已存在结果的 PDF
--vram Int 8 告诉 MinerU 每个进程可用的虚拟显存大小 (GB)

English Description

Overview

This project provides a Python script for parallel processing of PDF files using multiple GPUs. Built upon the MinerU library, it converts PDFs into Markdown/JSON formats and is optimized for high-throughput batch processing.

It leverages multiprocessing and CUDA context isolation to ensure stable and efficient parallel parsing.

Key Features

  • ⚡️ Automatic Load Balancing: Automatically distributes tasks across specified GPUs using Round-Robin scheduling based on the process ID.
  • 🛡️ Process Isolation: Uses the spawn start method to ensure each worker process has an independent CUDA context, preventing VRAM conflicts common in PyTorch multiprocessing.
  • 📂 Structure Preservation: Maintains the relative directory structure of input files in the output directory.
  • ⏯️ Resume Capability: Supports --skip-processed to automatically skip files that have already been converted, allowing for easy resumption of interrupted tasks.
  • 📝 Detailed Logging: Built with loguru, providing real-time status updates including Worker ID and GPU ID for easier debugging.

Installation

Ensure Python 3.8+ is installed. Install the required packages:

pip install loguru mineru
# Also ensure Mineru's dependencies (torch, etc.) are installed.

Usage Examples

1. Basic Usage

Using a single GPU (ID 0) with 4 parallel workers:

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --gpus 0 \
  --num-workers 4

2. Multi-GPU Parallelism (Recommended)

Using 4 GPUs (0,1,2,3) with 12 total workers (3 workers per GPU):

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --gpus 0,1,2,3 \
  --num-workers 12

3. Resume Mode

If a task was interrupted, use --skip-processed to ignore finished files:

python process_pdf.py \
  --input-dir ./data/pdf_raw \
  --output-dir ./data/pdf_processed \
  --skip-processed

4. Arguments

Argument Type Default Description
--input-dir String (Required) Root directory containing input PDFs
--output-dir String (Required) Directory to save processed results
--num-workers Int 4 Total number of parallel processes.
--gpus String "0" Comma-separated GPU IDs (e.g., "0,1,2")
--backend String "pipeline" Backend: pipeline (local) or vlm-siliconcloud etc.
--lang String "en" Document language (en, zh)
--skip-processed Flag False Skip files that already exist in the output dir
--vram Int 8 Virtual VRAM size (GB) setting for MinerU

License

Apache-2.0 License

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Mineru PDF 多进程并行处理脚本 (Mineru PDF Processing Parallel Pipeline)
=====================================================================
这是一个用于大规模 PDF 处理的工程化脚本。它基于 Python 的多进程 (Multiprocessing) 模块构建,
能够自动将任务分发到多张 GPU 上并行运行。
主要功能:
1. 多显卡并行:自动在指定的 GPU 之间进行负载均衡。
2. 进程隔离:每个 Worker 进程独立运行,避免 CUDA 上下文冲突。
3. 断点续传:支持跳过输出目录中已经存在的文件。
4. 结构保持:输出文件会尝试保持输入文件夹的层级结构。
使用示例 (Usage Examples):
------------------------
1. 基本用法 (使用 0 号卡,启动 2 个进程):
python process_pdf.py --input-dir ./my_pdfs --output-dir ./output --gpus 0 --num-workers 2
2. 多卡并行 (使用 0,1,2,3 四张卡,每张卡跑 2 个进程,共 8 个进程):
python process_pdf.py --input-dir ./data --output-dir ./results --gpus 0,1,2,3 --num-workers 8
3. 指定后端与语言 (使用 VLM 后端处理中文):
python process_pdf.py --input-dir ./pdfs --output-dir ./json --backend vlm-siliconcloud --lang zh
4. 跳过已处理文件 (用于中断后恢复):
python process_pdf.py --input-dir ./data --output-dir ./results --skip-processed
依赖安装:
pip install loguru mineru
Author: relic-yuexi
License: Apache-2.0
"""
import argparse
import json
import logging
import multiprocessing as mp
import os
import sys
import time
import warnings
import queue
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Tuple, Optional, Set
from loguru import logger
# --- 配置与常量 ---
# Loguru 日志格式配置
LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{extra[worker_id]}</cyan> | "
"<level>{message}</level>"
)
# 抑制第三方库的冗余警告
warnings.filterwarnings("ignore", message=".*Cannot set gray non-stroke color.*")
warnings.filterwarnings("ignore", message=".*FontBBox.*")
logging.getLogger("pdfminer").setLevel(logging.ERROR)
logging.getLogger("pypdfium2").setLevel(logging.ERROR)
def setup_worker_env(gpu_id: int, vram_size: int) -> None:
"""
设置 Worker 进程的环境变量。
关键说明:
必须在导入任何 torch 或 mineru 相关库之前调用此函数。
这通过设置 CUDA_VISIBLE_DEVICES 确保该进程只能看到被分配的那张显卡。
"""
os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
os.environ['MINERU_MODEL_SOURCE'] = "modelscope"
# 注意: 因为 CUDA_VISIBLE_DEVICES 已经屏蔽了其他卡,所以对进程来说永远是 device 0
os.environ['MINERU_DEVICE_MODE'] = "cuda:0"
os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram_size)
def process_single_pdf(
pdf_path: str,
output_base_dir: str,
lang: str,
backend: str,
gpu_id: int
) -> Tuple[str, bool, Optional[str]]:
"""
处理单个 PDF 的核心逻辑。
Args:
pdf_path: PDF 文件绝对路径
output_base_dir: 输出根目录
lang: 语言代码 (如 'en', 'zh')
backend: 解析后端 ('pipeline', 'vlm-xxx')
gpu_id: 分配的物理 GPU ID (仅用于日志记录)
Returns:
(file_path, is_success, error_message)
"""
try:
# --- 关键:延迟导入 (Lazy Import) ---
# 必须在子进程内部导入这些库,防止在主进程中过早初始化 CUDA 上下文导致冲突。
from mineru.cli.common import convert_pdf_bytes_to_bytes_by_pypdfium2, prepare_env, read_fn
from mineru.data.data_reader_writer import FileBasedDataWriter
from mineru.utils.draw_bbox import draw_layout_bbox, draw_span_bbox
from mineru.utils.enum_class import MakeMode
from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze
from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze
from mineru.backend.pipeline.pipeline_middle_json_mkcontent import union_make as pipeline_union_make
from mineru.backend.pipeline.model_json_to_middle_json import result_to_middle_json as pipeline_result_to_middle_json
from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
import copy
pdf_path_obj = Path(pdf_path)
# --- 计算输出路径 ---
# 尝试保持相对目录结构。如果没有明确的层级,则保存到输出根目录。
try:
# 这里的逻辑是:如果 PDF 在 input_dir/A/B.pdf,我们希望输出到 output_dir/A/
# 简单起见,这里取父文件夹名作为子目录
rel_path = pdf_path_obj.parent.name
except Exception:
rel_path = "."
target_output_dir = Path(output_base_dir) / rel_path
target_output_dir.mkdir(parents=True, exist_ok=True)
file_name = pdf_path_obj.stem
# --- Mineru 处理逻辑 ---
pdf_bytes = read_fn(str(pdf_path_obj))
# 设置解析参数
parse_method = "auto" if backend == "pipeline" else "vlm"
formula_enable = True
table_enable = True
# 准备环境 (创建 output/images 和 output/md 目录)
local_image_dir, local_md_dir = prepare_env(str(target_output_dir), file_name, parse_method)
image_writer = FileBasedDataWriter(local_image_dir)
md_writer = FileBasedDataWriter(local_md_dir)
# 根据后端选择处理流程
if backend == "pipeline":
infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = pipeline_doc_analyze(
[pdf_bytes], [lang],
parse_method="auto",
formula_enable=formula_enable,
table_enable=table_enable
)
# 提取 Pipeline 结果
model_list = infer_results[0]
images_list = all_image_lists[0]
pdf_doc = all_pdf_docs[0]
middle_json = pipeline_result_to_middle_json(
model_list, images_list, pdf_doc, image_writer, lang_list[0], ocr_enabled_list[0], formula_enable
)
make_func = pipeline_union_make
# model_output = copy.deepcopy(model_list) # 如果需要保存原始模型输出可解开注释
else: # VLM 后端逻辑
vlm_backend_name = backend.replace("vlm-", "") if backend.startswith("vlm-") else backend
middle_json, model_output = vlm_doc_analyze(
pdf_bytes, image_writer=image_writer, backend=vlm_backend_name, server_url=None
)
make_func = vlm_union_make
# --- 生成最终结果文件 ---
pdf_info = middle_json["pdf_info"]
image_dir_name = os.path.basename(local_image_dir)
# 1. 写入 Markdown
md_content = make_func(pdf_info, MakeMode.MM_MD, image_dir_name)
md_writer.write_string(f"{file_name}.md", md_content)
# 2. 写入 Content List (JSON)
content_list = make_func(pdf_info, MakeMode.CONTENT_LIST, image_dir_name)
md_writer.write_string(f"{file_name}_content_list.json", json.dumps(content_list, ensure_ascii=False, indent=4))
# 3. 写入 Middle JSON (包含详细结构信息,方便调试)
md_writer.write_string(f"{file_name}_middle.json", json.dumps(middle_json, ensure_ascii=False, indent=4))
logger.info(f"Success: {local_md_dir}")
return str(pdf_path), True, None
except Exception:
# 捕获所有异常,确保进程不崩溃
err_msg = traceback.format_exc()
# 只记录最后一行错误信息以保持日志整洁,详细堆栈已在 err_msg 中
logger.error(f"Error processing {pdf_path}: {err_msg.splitlines()[-1]}")
return str(pdf_path), False, err_msg
def worker_loop(
task_queue: mp.Queue,
result_queue: mp.Queue,
gpu_id: int,
worker_idx: int,
vram_size: int
):
"""
Worker 进程的主循环。
不断从 task_queue 取任务,处理完后将结果放入 result_queue。
"""
# 1. 初始化环境变量 (最重要的一步)
setup_worker_env(gpu_id, vram_size)
# 2. 配置当前进程的 Logger (带上 Worker ID 和 GPU ID)
logger.configure(extra={"worker_id": f"W-{worker_idx}|GPU-{gpu_id}"})
logger.remove()
logger.add(sys.stderr, format=LOG_FORMAT, level="INFO")
logger.info("Worker started.")
while True:
try:
# 获取任务,设置超时防止死锁
task = task_queue.get(timeout=3.0)
# 如果收到 None,说明是终止信号
if task is None:
break
# 解包任务参数
pdf_path, output_dir, lang, backend = task
# 执行处理
result = process_single_pdf(pdf_path, output_dir, lang, backend, gpu_id)
# 返回结果
result_queue.put(result)
except queue.Empty:
continue
except Exception as e:
logger.critical(f"Worker crashed unexpectedly: {e}")
break
logger.info("Worker shutdown.")
def get_processed_files(output_dir: Path) -> Set[str]:
"""扫描输出目录,获取已经处理完成的 PDF 文件名集合 (基于 .md 文件存在与否)"""
processed = set()
if output_dir.exists():
for f in output_dir.rglob("*.md"):
processed.add(f.stem)
return processed
def main():
# --- 命令行参数定义 ---
parser = argparse.ArgumentParser(description="Mineru 多进程 PDF 并行解析工具")
parser.add_argument("--input-dir", type=str, required=True, help="输入 PDF 文件夹路径")
parser.add_argument("--output-dir", type=str, required=True, help="结果输出文件夹路径")
parser.add_argument("--num-workers", type=int, default=4, help="并行的 Worker 进程总数 (建议每张卡 2-4 个)")
parser.add_argument("--gpus", type=str, default="0", help="使用的 GPU ID 列表,用逗号分隔 (例如: '0,1,2')")
parser.add_argument("--backend", type=str, default="pipeline", choices=["pipeline", "vlm-siliconcloud", "vlm-openai"], help="解析后端选择")
parser.add_argument("--lang", type=str, default="en", help="文档主要语言 (en 或 zh)")
parser.add_argument("--skip-processed", action="store_true", help="是否跳过输出目录中已存在的文件")
parser.add_argument("--vram", type=int, default=8, help="每个进程预设的虚拟显存大小 (GB),仅用于 MinerU 内部配置")
args = parser.parse_args()
# --- 初始化检查 ---
input_path = Path(args.input_dir)
output_path = Path(args.output_dir)
gpu_list = [int(x.strip()) for x in args.gpus.split(",")]
if not input_path.exists():
logger.error(f"输入目录不存在: {input_path}")
sys.exit(1)
# --- 文件扫描 ---
logger.info(f"正在扫描目录: {input_path} ...")
all_pdfs = list(input_path.rglob("*.pdf"))
# 过滤已处理文件
if args.skip_processed:
processed_files = get_processed_files(output_path)
pending_pdfs = [p for p in all_pdfs if p.stem not in processed_files]
logger.info(f"总文件数: {len(all_pdfs)} | 跳过已处理: {len(processed_files)} | 待处理: {len(pending_pdfs)}")
else:
pending_pdfs = all_pdfs
logger.info(f"待处理文件总数: {len(all_pdfs)}")
if not pending_pdfs:
logger.info("没有需要处理的文件。退出。")
return
# --- 多进程启动准备 ---
# 必须使用 'spawn' 模式,否则 CUDA 上下文会在 fork 时被错误复制
ctx = mp.get_context('spawn')
task_queue = ctx.Queue()
result_queue = ctx.Queue()
# 1. 填充任务队列
for pdf in pending_pdfs:
task_queue.put((str(pdf), str(output_path), args.lang, args.backend))
# 2. 填充终止信号 (每个 Worker 需要一个 None)
for _ in range(args.num_workers):
task_queue.put(None)
# 3. 启动 Worker 进程
processes = []
logger.info(f"启动 {args.num_workers} 个 Worker 进程,分布在 GPU: {gpu_list}")
for i in range(args.num_workers):
# --- 核心数学逻辑:轮询分配 (Round-Robin) ---
# 公式: assigned_gpu = gpu_list[i % len(gpu_list)]
# 作用: 无论有多少个进程,都会均匀地依次分配给列表中的 GPU。
assigned_gpu = gpu_list[i % len(gpu_list)]
p = ctx.Process(
target=worker_loop,
args=(task_queue, result_queue, assigned_gpu, i, args.vram)
)
p.start()
processes.append(p)
# --- 结果收集与监控 ---
success_count = 0
fail_count = 0
failed_files = []
start_time = time.time()
total_tasks = len(pending_pdfs)
try:
while success_count + fail_count < total_tasks:
# 检查是否有僵尸进程 (所有 Worker 都挂了但任务没完)
if not any(p.is_alive() for p in processes) and result_queue.empty():
logger.error("所有 Worker 进程均已退出,但任务未完成。强制停止。")
break
try:
# 获取结果,超时时间 5 秒
path, success, msg = result_queue.get(timeout=5)
if success:
success_count += 1
else:
fail_count += 1
failed_files.append((path, msg))
# 每处理 10 个文件或遇到错误时打印进度
if (success_count + fail_count) % 10 == 0 or not success:
logger.info(f"进度: {success_count + fail_count}/{total_tasks} | 成功: {success_count} | 失败: {fail_count}")
except queue.Empty:
continue
except KeyboardInterrupt:
logger.warning("用户中断 (Ctrl+C)。正在终止所有进程...")
for p in processes:
p.terminate()
finally:
# 清理与收尾
logger.info("等待所有子进程退出...")
for p in processes:
p.join()
duration = time.time() - start_time
logger.info(f"任务结束。耗时: {duration:.2f}秒。成功: {success_count}, 失败: {fail_count}")
# 保存失败列表
if failed_files:
log_file = output_path / f"failed_files_{int(time.time())}.txt"
with open(log_file, "w", encoding="utf-8") as f:
for path, msg in failed_files:
f.write(f"{path}\t{msg}\n")
logger.warning(f"失败文件列表已保存至: {log_file}")
if __name__ == "__main__":
# 强制设置全局启动方法为 spawn,确保兼容性
mp.set_start_method('spawn', force=True)
# 配置主进程日志
logger.configure(extra={"worker_id": "MAIN"})
logger.remove()
logger.add(sys.stderr, format=LOG_FORMAT, level="INFO")
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment