| | """ |
| | Reading Order Reconstructor Implementation |
| | |
| | Rule-based reading order reconstruction for document elements. |
| | """ |
| |
|
| | import time |
| | from typing import List, Optional, Dict, Any, Tuple |
| | from collections import defaultdict |
| | import numpy as np |
| | from loguru import logger |
| |
|
| | from .base import ReadingOrderReconstructor, ReadingOrderConfig, ReadingOrderResult |
| | from ..schemas.core import BoundingBox, LayoutRegion, OCRRegion, LayoutType |
| |
|
| |
|
class RuleBasedReadingOrder(ReadingOrderReconstructor):
    """
    Rule-based reading order reconstruction.

    Handles:
    - Single column documents
    - Multi-column layouts
    - Mixed layouts (text + figures)
    - Headers and footers

    A region is either an object exposing a ``bbox`` attribute or a bare
    ``BoundingBox``. Regions without a usable bounding box cannot be placed
    and are left out of the computed order. Every index reported in the
    result refers to a position in the ``regions`` list passed by the caller.
    """

    # Fractions of the page height delimiting the zones in which a region is
    # treated as a header (entirely above) or a footer (entirely below) when
    # no layout region classifies it.
    HEADER_ZONE_RATIO = 0.1
    FOOTER_ZONE_RATIO = 0.9

    def initialize(self):
        """Initialize (no model loading needed)."""
        self._initialized = True
        logger.info("Initialized rule-based reading order reconstructor")

    def reconstruct(
        self,
        regions: List[Any],
        layout_regions: Optional[List[LayoutRegion]] = None,
        page_width: Optional[int] = None,
        page_height: Optional[int] = None,
    ) -> ReadingOrderResult:
        """Reconstruct reading order using rule-based approach.

        Args:
            regions: Elements to order; each is an object with a ``bbox``
                attribute or a ``BoundingBox`` itself.
            layout_regions: Optional layout regions used to identify headers
                and footers when ``config.header_footer_separate`` is set.
            page_width: Page width; defaults to the largest ``x_max`` found.
            page_height: Page height; defaults to the largest ``y_max`` found.

        Returns:
            A ``ReadingOrderResult`` whose ``order`` and
            ``column_assignments`` keys are indices into ``regions``. An
            empty successful result is returned when there is nothing to
            order.
        """
        if not self._initialized:
            self.initialize()

        # perf_counter is monotonic, so the elapsed time cannot be distorted
        # by system clock adjustments.
        start_time = time.perf_counter()

        if not regions:
            return ReadingOrderResult(success=True)

        indexed = self._extract_indexed_bboxes(regions)
        if not indexed:
            return ReadingOrderResult(success=True)

        # Keep the original position of every usable region: the sorting
        # helpers work on positions in ``bboxes`` and their output has to be
        # translated back before it is used to index ``regions``.
        region_indices = [i for i, _ in indexed]
        bboxes = [bbox for _, bbox in indexed]

        skipped = len(regions) - len(indexed)
        if skipped:
            logger.debug(
                "Skipping {} region(s) without a bounding box", skipped
            )

        if page_width is None:
            page_width = max(b.x_max for b in bboxes)
        if page_height is None:
            page_height = max(b.y_max for b in bboxes)

        num_columns, local_assignments = self._detect_columns(bboxes, page_width)

        if num_columns == 1:
            local_order = self._sort_single_column(bboxes)
        else:
            local_order = self._sort_multi_column(
                bboxes, local_assignments, num_columns
            )

        # Translate bbox-list positions back to indices into ``regions``.
        order = [region_indices[i] for i in local_order]
        column_assignments = {
            region_indices[i]: col for i, col in local_assignments.items()
        }

        if self.config.header_footer_separate and layout_regions:
            order = self._adjust_for_headers_footers(
                order, regions, layout_regions, page_height
            )

        processing_time = (time.perf_counter() - start_time) * 1000

        return ReadingOrderResult(
            order=order,
            ordered_regions=[regions[i] for i in order],
            num_columns=num_columns,
            column_assignments=column_assignments,
            processing_time_ms=processing_time,
            success=True,
        )

    @staticmethod
    def _region_bbox(region: Any) -> Optional[BoundingBox]:
        """Return the bounding box of a region, or None if it has none."""
        if hasattr(region, 'bbox'):
            return region.bbox
        if isinstance(region, BoundingBox):
            return region
        return None

    def _extract_indexed_bboxes(
        self, regions: List[Any]
    ) -> List[Tuple[int, BoundingBox]]:
        """Pair each usable bounding box with its index in ``regions``."""
        indexed = []
        for i, region in enumerate(regions):
            bbox = self._region_bbox(region)
            if bbox is not None:
                indexed.append((i, bbox))
        return indexed

    def _extract_bboxes(self, regions: List[Any]) -> List[BoundingBox]:
        """Extract bounding boxes from regions.

        Regions without a bounding box are skipped, so positions in the
        returned list do not necessarily match positions in ``regions``.
        """
        return [bbox for _, bbox in self._extract_indexed_bboxes(regions)]

    def _detect_columns(
        self,
        bboxes: List[BoundingBox],
        page_width: int,
    ) -> Tuple[int, Dict[int, int]]:
        """Detect column structure in the document.

        Distinct horizontal box centers are sorted, and every jump between
        neighbours wider than ``page_width * config.column_gap_threshold``
        is taken as a column separator placed midway between the two
        centers. The column count is capped by ``config.max_columns``.

        Returns:
            ``(num_columns, assignments)`` where ``assignments`` maps each
            position in ``bboxes`` to a zero-based column index.
        """
        if not self.config.detect_columns or len(bboxes) < 4:
            return 1, {i: 0 for i in range(len(bboxes))}

        x_centers = [(b.x_min + b.x_max) / 2 for b in bboxes]
        min_gap = page_width * self.config.column_gap_threshold
        sorted_centers = sorted(set(x_centers))

        # Midpoints of the jumps wide enough to separate two columns, in
        # ascending order.
        gaps = [
            (left + right) / 2
            for left, right in zip(sorted_centers, sorted_centers[1:])
            if right - left > min_gap
        ]

        # Never report fewer than one column, even if max_columns is
        # misconfigured as zero or negative.
        num_columns = max(1, min(len(gaps) + 1, self.config.max_columns))

        if num_columns == 1:
            return 1, {i: 0 for i in range(len(bboxes))}

        separators = gaps[:num_columns - 1]

        # A box belongs to the column whose index equals the number of
        # separators at or to the left of its center. Centers left of the
        # first separator (including negative ones) land in column 0, and
        # centers right of the last separator land in the last column.
        assignments = {
            i: sum(1 for separator in separators if center >= separator)
            for i, center in enumerate(x_centers)
        }

        return num_columns, assignments

    def _sort_single_column(self, bboxes: List[BoundingBox]) -> List[int]:
        """Sort regions in single-column layout.

        Boxes are ordered top-to-bottom then left-to-right when
        ``config.vertical_priority`` is set, otherwise left-to-right then
        top-to-bottom. For right-to-left reading the boxes are grouped into
        rows and each row is emitted from right to left.

        Returns:
            Positions in ``bboxes`` in reading order.
        """
        indexed = list(enumerate(bboxes))

        if self.config.vertical_priority:
            indexed.sort(key=lambda x: (x[1].y_min, x[1].x_min))
        else:
            indexed.sort(key=lambda x: (x[1].x_min, x[1].y_min))

        if self.config.reading_direction == "rtl":
            result = []
            for row in self._group_by_y(indexed):
                # Rows come back sorted left-to-right; flip them for RTL.
                row.reverse()
                result.extend(i for i, _ in row)
            return result

        return [i for i, _ in indexed]

    def _sort_multi_column(
        self,
        bboxes: List[BoundingBox],
        column_assignments: Dict[int, int],
        num_columns: int,
    ) -> List[int]:
        """Sort regions in multi-column layout.

        Each column is read top-to-bottom in full before moving to the next
        one. Columns are visited left-to-right for ``"ltr"`` reading
        direction and right-to-left otherwise.

        Returns:
            Positions in ``bboxes`` in reading order.
        """
        columns = defaultdict(list)
        for i, bbox in enumerate(bboxes):
            # Boxes with no assignment default to the first column.
            columns[column_assignments.get(i, 0)].append((i, bbox))

        for members in columns.values():
            members.sort(key=lambda x: (x[1].y_min, x[1].x_min))

        if self.config.reading_direction == "ltr":
            col_order = range(num_columns)
        else:
            col_order = range(num_columns - 1, -1, -1)

        result = []
        for col in col_order:
            result.extend(i for i, _ in columns.get(col, []))

        return result

    def _group_by_y(
        self,
        indexed_bboxes: List[Tuple[int, BoundingBox]],
        tolerance: float = 10.0,
    ) -> List[List[Tuple[int, BoundingBox]]]:
        """Group bboxes into rows by y position.

        A box joins the current row while its ``y_min`` is within
        ``tolerance`` of the ``y_min`` of the box that opened the row.
        Rows are returned top-to-bottom, each sorted by ``x_min``.
        """
        if not indexed_bboxes:
            return []

        sorted_items = sorted(indexed_bboxes, key=lambda x: x[1].y_min)

        rows = []
        current_row = [sorted_items[0]]
        current_y = sorted_items[0][1].y_min

        for item in sorted_items[1:]:
            if abs(item[1].y_min - current_y) <= tolerance:
                current_row.append(item)
            else:
                current_row.sort(key=lambda x: x[1].x_min)
                rows.append(current_row)
                current_row = [item]
                current_y = item[1].y_min

        # Flush the last open row.
        current_row.sort(key=lambda x: x[1].x_min)
        rows.append(current_row)

        return rows

    def _adjust_for_headers_footers(
        self,
        order: List[int],
        regions: List[Any],
        layout_regions: List[LayoutRegion],
        page_height: int,
    ) -> List[int]:
        """Adjust order to put headers first and footers last.

        A region counts as a header or footer when a layout region of that
        type contains its bounding box. Regions not classified that way fall
        back to a position rule: entirely above ``HEADER_ZONE_RATIO`` of the
        page height is a header, entirely below ``FOOTER_ZONE_RATIO`` is a
        footer. A region is never placed in both groups; header wins.

        Args:
            order: Indices into ``regions`` in body reading order.
            regions: The regions ``order`` refers to.
            layout_regions: Layout regions providing header/footer labels.
            page_height: Page height used for the position rule.

        Returns:
            The indices of ``order`` rearranged as headers, body, footers,
            each group keeping its relative order.
        """
        header_indices = set()
        footer_indices = set()

        header_y_threshold = page_height * self.HEADER_ZONE_RATIO
        footer_y_threshold = page_height * self.FOOTER_ZONE_RATIO

        # Resolve each region's bounding box once, skipping regions that
        # have none.
        candidates = [
            (i, bbox)
            for i, bbox in (
                (i, self._region_bbox(r)) for i, r in enumerate(regions)
            )
            if bbox is not None
        ]

        for layout_r in layout_regions:
            if layout_r.type == LayoutType.HEADER:
                target = header_indices
            elif layout_r.type == LayoutType.FOOTER:
                target = footer_indices
            else:
                continue
            for i, bbox in candidates:
                if layout_r.bbox.contains(bbox):
                    target.add(i)

        # Position-based fallback, applied only to regions the layout
        # analysis did not already classify.
        for i, bbox in candidates:
            if i in header_indices or i in footer_indices:
                continue
            if bbox.y_max < header_y_threshold:
                header_indices.add(i)
            elif bbox.y_min > footer_y_threshold:
                footer_indices.add(i)

        # Keep the groups disjoint so no index is emitted twice.
        footer_indices -= header_indices

        headers = [i for i in order if i in header_indices]
        footers = [i for i in order if i in footer_indices]
        body = [
            i for i in order
            if i not in header_indices and i not in footer_indices
        ]

        return headers + body + footers
| |
|
| |
|
| | |
# Module-level slot holding the shared reconstructor; filled on first request.
_reading_order: Optional[ReadingOrderReconstructor] = None


def get_reading_order_reconstructor(
    config: Optional[ReadingOrderConfig] = None,
) -> ReadingOrderReconstructor:
    """Return the shared reading order reconstructor, creating it on demand.

    The instance is built and initialized on the first call only, using
    ``config`` or a default ``ReadingOrderConfig`` when none is given.
    Later calls return the existing instance and do not use ``config``.
    """
    global _reading_order

    if _reading_order is not None:
        return _reading_order

    effective_config = config or ReadingOrderConfig()
    _reading_order = RuleBasedReadingOrder(effective_config)
    _reading_order.initialize()
    return _reading_order
| |
|