Parquet images
What this example does - Streams the `image` column of a parquet file, groups every `doc_size` images into one multi-page TIFF, and runs a batch conversion on each TIFF with Docling.
Requirements
- Python 3.9+
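- Install Docling: pip install docling (the script also imports pyarrow, typer and Pillow; install any that are missing, e.g. pip install pyarrow)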
How to run
- python docs/examples/parquet_images.py [FILE] [--doc-size N] [--batch-size N] [--pipeline standard|vlm|legacy]
The parquet file should be in a format similar to that of the ViDoRe V3 dataset: https://huggingface.co/collections/vidore/vidore-benchmark-v3
For example: - https://huggingface.co/datasets/vidore/vidore_v3_hr/blob/main/corpus/test-00000-of-00001.parquet
Start the model with vLLM (only needed when running with --pipeline vlm)
vllm serve ibm-granite/granite-docling-258M \
--host 127.0.0.1 --port 8000 \
--max-num-seqs 512 \
--max-num-batched-tokens 8192 \
--enable-chunked-prefill \
--gpu-memory-utilization 0.9
import io
import sys
import time
from pathlib import Path
from typing import Annotated, Literal
import pyarrow.parquet as pq
import typer
from PIL import Image
from docling.datamodel import vlm_model_specs
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.base_models import ConversionStatus, DocumentStream, InputFormat
from docling.datamodel.pipeline_options import (
PdfPipelineOptions,
PipelineOptions,
RapidOcrOptions,
VlmPipelineOptions,
)
from docling.datamodel.pipeline_options_vlm_model import ApiVlmOptions, ResponseFormat
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, ImageFormatOption
from docling.pipeline.base_pipeline import ConvertPipeline
from docling.pipeline.legacy_standard_pdf_pipeline import LegacyStandardPdfPipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.pipeline.vlm_pipeline import VlmPipeline
from docling.utils.accelerator_utils import decide_device
def process_document(
    images: list[Image.Image], chunk_idx: int, doc_converter: DocumentConverter
):
    """Pack a chunk of images into one multi-page TIFF and convert it with Docling.

    Every image becomes one page of an in-memory TIFF, which is handed to
    ``doc_converter`` as a ``DocumentStream`` named ``doc_<chunk_idx>.tiff``.
    The conversion time and the resulting pages/s throughput are printed.

    Args:
        images: Page images of this chunk, in page order. Must not be empty.
        chunk_idx: Index of the chunk; used in the log output and stream name.
        doc_converter: Converter that runs the actual conversion.

    Raises:
        ValueError: If ``images`` is empty (there is no first TIFF frame).
        RuntimeError: If the conversion status is not
            ``ConversionStatus.SUCCESS``.
    """
    # Fail with a clear message instead of an IndexError on the first frame.
    if not images:
        raise ValueError(f"Chunk {chunk_idx} contains no images to convert.")

    print(f"\n--- Processing chunk {chunk_idx} with {len(images)} images ---")

    # Convert images to mode RGB (TIFF pages must match). The first image is
    # the base frame; the remaining ones are appended as further pages.
    first, *rest = (im.convert("RGB") for im in images)

    # Create the multi-page TIFF in memory using PIL frames.
    buf = io.BytesIO()
    first.save(
        buf,
        format="TIFF",
        save_all=True,
        append_images=rest,
        compression="tiff_deflate",  # good compression, optional
    )
    buf.seek(0)

    # Docling conversion, timed with a monotonic clock so that wall-clock
    # adjustments cannot distort the measured interval.
    doc_stream = DocumentStream(name=f"doc_{chunk_idx}.tiff", stream=buf)
    start_time = time.perf_counter()
    conv_result = doc_converter.convert(doc_stream)
    runtime = time.perf_counter() - start_time

    # Explicit check rather than `assert`, which is removed under `python -O`.
    if conv_result.status != ConversionStatus.SUCCESS:
        raise RuntimeError(
            f"Chunk {chunk_idx} conversion finished with status "
            f"{conv_result.status}, expected {ConversionStatus.SUCCESS}."
        )

    pages = len(conv_result.pages)
    print(
        f"Chunk {chunk_idx} converted in {runtime:.2f} sec ({pages / runtime:.2f} pages/s)."
    )
def run(
    filename: Annotated[Path, typer.Argument()] = Path(
        "docs/examples/data/vidore_v3_hr-slice.parquet"
    ),
    doc_size: int = 192,
    batch_size: int = 64,
    pipeline: Literal["standard", "vlm", "legacy"] = "standard",
):
    """Convert the images of a parquet file with Docling, ``doc_size`` at a time.

    The ``image`` column of ``filename`` is streamed in record batches. Images
    are collected until ``doc_size`` of them are available, then passed to
    ``process_document`` as one chunk. Any images left over at the end are
    processed as a final, smaller chunk.

    Args:
        filename: Parquet file with an ``image`` column whose cells convert to
            a dict holding the encoded image under the ``"bytes"`` key.
        doc_size: Number of images grouped into one converted document.
        batch_size: Record-batch size used when reading the parquet file. It
            is also used as the OCR/layout batch size (standard, legacy), as
            ``settings.perf.page_batch_size`` (legacy, vlm) and as the API
            concurrency (vlm).
        pipeline: Which Docling pipeline to run on the images.

    Raises:
        RuntimeError: If ``pipeline`` is not one of the supported names.
    """
    acc_opts = AcceleratorOptions()
    device = decide_device(acc_opts.device)

    # Use the torch OCR backend on CUDA devices, and always on Python 3.14+
    # (on Python 3.14 we only have torch); otherwise keep the default options.
    use_torch_ocr = "cuda" in device or sys.version_info >= (3, 14)
    ocr_options = (
        RapidOcrOptions(backend="torch") if use_torch_ocr else RapidOcrOptions()
    )

    # Declared once so that each branch only assigns.
    pipeline_cls: type[ConvertPipeline]
    pipeline_options: PipelineOptions

    if pipeline in ("standard", "legacy"):
        if pipeline == "legacy":
            # Only the legacy (and vlm) branches set the global page batch size.
            settings.perf.page_batch_size = batch_size
            pipeline_cls = LegacyStandardPdfPipeline
        else:
            pipeline_cls = StandardPdfPipeline
        # Both PDF pipelines share the same option set.
        pipeline_options = PdfPipelineOptions(
            ocr_options=ocr_options,
            ocr_batch_size=batch_size,
            layout_batch_size=batch_size,
            table_batch_size=4,
        )
    elif pipeline == "vlm":
        settings.perf.page_batch_size = batch_size
        pipeline_cls = VlmPipeline
        # Work on a copy: the preset is a module-level object shared by every
        # importer, so it must not be mutated in place.
        # NOTE(review): relies on the preset being a pydantic v2 model.
        vlm_options = vlm_model_specs.GRANITEDOCLING_VLLM_API.model_copy(deep=True)
        vlm_options.concurrency = batch_size
        vlm_options.scale = 1.0  # avoid rescaling image inputs
        pipeline_options = VlmPipelineOptions(
            vlm_options=vlm_options,
            enable_remote_services=True,  # required when using a remote inference service.
        )
    else:
        raise RuntimeError(f"Pipeline {pipeline} not available.")

    doc_converter = DocumentConverter(
        format_options={
            InputFormat.IMAGE: ImageFormatOption(
                pipeline_cls=pipeline_cls,
                pipeline_options=pipeline_options,
            )
        }
    )

    # Initialize the pipeline up front so its start-up time is reported
    # separately from the per-chunk conversion times.
    start_time = time.perf_counter()
    doc_converter.initialize_pipeline(InputFormat.IMAGE)
    init_runtime = time.perf_counter() - start_time
    print(f"Pipeline initialized in {init_runtime:.2f} seconds.")

    image_buffer: list[Image.Image] = []  # holds up to doc_size images
    chunk_idx = 0

    # ------------------------------------------------------------
    # Stream batches from parquet; the file is closed when the block exits
    # ------------------------------------------------------------
    with pq.ParquetFile(filename) as pf:
        for batch in pf.iter_batches(batch_size=batch_size, columns=["image"]):
            # Each cell converts to a dict ({"bytes": ..., "path": ...}); the
            # encoded bytes are opened as a PIL image.
            for cell in batch.column("image"):
                img_dict = cell.as_py()
                image_buffer.append(Image.open(io.BytesIO(img_dict["bytes"])))

                # If enough images gathered -> process one doc
                if len(image_buffer) == doc_size:
                    process_document(image_buffer, chunk_idx, doc_converter)
                    image_buffer.clear()
                    chunk_idx += 1

    # ------------------------------------------------------------
    # Process trailing images (last partial chunk)
    # ------------------------------------------------------------
    if image_buffer:
        process_document(image_buffer, chunk_idx, doc_converter)
# Script entry point: only start the CLI when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    # Typer builds the command-line interface from run()'s signature.
    typer.run(run)