| | """ |
| | Evidence Building and Management |
| | |
| | Creates and manages evidence references for extracted data. |
| | Links every extraction to its visual source. |
| | """ |
| |
|
import hashlib
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
| |
|
| | from ..chunks.models import ( |
| | BoundingBox, |
| | DocumentChunk, |
| | EvidenceRef, |
| | TableChunk, |
| | ChartChunk, |
| | ) |
| |
|
| |
|
@dataclass
class EvidenceConfig:
    """Settings that control how evidence references are built."""

    # --- Crop image handling ---
    # Master switch: when False, supplied crop images are never written out.
    crop_enabled: bool = True
    # Directory that receives crop files; with None nothing is written.
    crop_output_dir: Optional[Path] = None
    # File extension used when naming a saved crop.
    crop_format: str = "png"
    # NOTE(review): not read anywhere in this module; presumably consumed by
    # the code that produces the crop images -- confirm against callers.
    crop_padding_percent: float = 0.02

    # --- Text snippet handling ---
    # When False, snippets built from chunk text are the empty string.
    include_snippet: bool = True
    # Upper bound on the number of characters kept in a snippet.
    max_snippet_length: int = 200
    # When True, a snippet is a window of text around the extracted value.
    include_context: bool = True
    # Characters of surrounding text kept on each side of the value.
    context_chars: int = 50
| |
|
| |
|
class EvidenceBuilder:
    """
    Builds evidence references for extractions.

    Creates links between extracted values and their visual sources in the
    document, optionally writing a cropped image of the source region to
    ``config.crop_output_dir``.
    """

    def __init__(self, config: Optional[EvidenceConfig] = None):
        """
        Initialise the builder.

        Args:
            config: Evidence settings; a default ``EvidenceConfig`` is used
                when omitted.
        """
        self.config = config or EvidenceConfig()
        # Kept for backward compatibility; nothing in this class reads it.
        self._crop_counter = 0

    def create_evidence(
        self,
        chunk: DocumentChunk,
        value: Any,
        field_name: Optional[str] = None,
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create an evidence reference from a chunk.

        Args:
            chunk: Source chunk
            value: Extracted value; its ``str()`` form is searched for in the
                chunk text to centre the snippet
            field_name: Optional field name being extracted (accepted for
                API compatibility; not used by this method)
            crop_image: Optional cropped image for this evidence

        Returns:
            EvidenceRef linking to the source
        """
        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop(crop_image, chunk)

        snippet = self._create_snippet(chunk.text, str(value))

        # Table and chart chunks get fixed labels; any other chunk reports
        # the value of its own chunk_type.
        if isinstance(chunk, TableChunk):
            source_type = "table"
        elif isinstance(chunk, ChartChunk):
            source_type = "chart"
        else:
            source_type = chunk.chunk_type.value

        return EvidenceRef(
            chunk_id=chunk.chunk_id,
            doc_id=chunk.doc_id,
            page=chunk.page,
            bbox=chunk.bbox,
            source_type=source_type,
            snippet=snippet,
            confidence=chunk.confidence,
            crop_path=crop_path,
        )

    def create_evidence_from_bbox(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        source_text: str,
        confidence: float = 1.0,
        source_type: str = "region",
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create evidence from a bounding box.

        Args:
            doc_id: Document ID
            page: Page number
            bbox: Bounding box of evidence
            source_text: Text content
            confidence: Confidence score
            source_type: Type of source (text, table, chart, etc.)
            crop_image: Optional cropped image

        Returns:
            EvidenceRef for the region
        """
        # The region has no chunk of its own, so derive an ID from its location.
        chunk_id = self._generate_region_id(doc_id, page, bbox)

        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop_direct(
                crop_image,
                doc_id,
                page,
                chunk_id,
            )

        return EvidenceRef(
            chunk_id=chunk_id,
            doc_id=doc_id,
            page=page,
            bbox=bbox,
            source_type=source_type,
            snippet=self._clip_snippet(source_text),
            confidence=confidence,
            crop_path=crop_path,
        )

    def create_table_cell_evidence(
        self,
        table_chunk: TableChunk,
        row: int,
        col: int,
        crop_image: Optional[Any] = None,
    ) -> Optional[EvidenceRef]:
        """
        Create evidence for a specific table cell.

        Args:
            table_chunk: Source table
            row: Cell row (0-indexed)
            col: Cell column (0-indexed)
            crop_image: Optional cropped cell image

        Returns:
            EvidenceRef for the cell, or None if cell not found
        """
        cell = table_chunk.get_cell(row, col)
        if cell is None:
            return None

        cell_id = f"r{row}c{col}"

        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            # Include the cell position so crops of different cells of the
            # same table do not overwrite each other.
            crop_path = self._save_crop_direct(
                crop_image,
                table_chunk.doc_id,
                table_chunk.page,
                f"{table_chunk.chunk_id}_{cell_id}",
            )

        return EvidenceRef(
            chunk_id=table_chunk.chunk_id,
            doc_id=table_chunk.doc_id,
            page=table_chunk.page,
            bbox=cell.bbox,
            source_type="table_cell",
            snippet=self._clip_snippet(cell.text),
            confidence=cell.confidence,
            cell_id=cell_id,
            crop_path=crop_path,
        )

    def merge_evidence(
        self,
        evidence_list: List[EvidenceRef],
    ) -> List[EvidenceRef]:
        """
        Merge overlapping evidence references.

        Two references are grouped when they share document and page and
        their bounding boxes have an IoU above 0.5. Grouping is greedy: each
        not-yet-grouped reference collects the later references that overlap
        *it*; overlap is not propagated transitively.

        Args:
            evidence_list: References to merge; the list is not modified

        Returns:
            A new list with one entry per group, in order of each group's
            first member
        """
        if len(evidence_list) <= 1:
            # Always hand back a fresh list so callers never alias the input.
            return list(evidence_list)

        merged = []
        consumed = set()

        for i, anchor in enumerate(evidence_list):
            if i in consumed:
                continue

            group = [anchor]
            for j, candidate in enumerate(evidence_list[i + 1:], start=i + 1):
                if j in consumed:
                    continue

                if (
                    anchor.doc_id == candidate.doc_id
                    and anchor.page == candidate.page
                    and anchor.bbox.iou(candidate.bbox) > 0.5
                ):
                    group.append(candidate)
                    consumed.add(j)

            if len(group) == 1:
                merged.append(anchor)
            else:
                merged.append(self._merge_evidence_group(group))

        return merged

    def _merge_evidence_group(
        self,
        group: List[EvidenceRef],
    ) -> EvidenceRef:
        """
        Merge a group of overlapping evidence into one reference.

        Identity fields (chunk, document, page, source type, cell, crop) come
        from the most confident member; the bounding box is the union of all
        members' boxes; snippets are de-duplicated and joined with " | ".
        """
        best = max(group, key=lambda e: e.confidence)

        # Smallest box that encloses every member's box.
        merged_bbox = BoundingBox(
            x_min=min(e.bbox.x_min for e in group),
            y_min=min(e.bbox.y_min for e in group),
            x_max=max(e.bbox.x_max for e in group),
            y_max=max(e.bbox.y_max for e in group),
            normalized=best.bbox.normalized,
        )

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # joined snippet is the same on every run (a set would not be).
        snippets = list(dict.fromkeys(e.snippet for e in group if e.snippet))
        combined_snippet = " | ".join(snippets)[:self.config.max_snippet_length]

        return EvidenceRef(
            chunk_id=best.chunk_id,
            doc_id=best.doc_id,
            page=best.page,
            bbox=merged_bbox,
            source_type=best.source_type,
            snippet=combined_snippet,
            confidence=best.confidence,
            cell_id=best.cell_id,
            crop_path=best.crop_path,
        )

    def _clip_snippet(self, text: Optional[str]) -> str:
        """Return *text* cut to the configured length, or "" if snippets are off or text is empty."""
        if not self.config.include_snippet or not text:
            return ""
        return text[:self.config.max_snippet_length]

    def _create_snippet(
        self,
        full_text: str,
        value: str,
    ) -> str:
        """
        Create a text snippet centred on the value.

        Args:
            full_text: Text to take the snippet from (None is treated as "")
            value: Value to look for, matched case-insensitively

        Returns:
            "" when snippets are disabled. Otherwise, if context is enabled
            and the value occurs in the text, a window of ``context_chars``
            characters on each side of the first occurrence, with "..."
            marking each side that was cut; in every other case the start of
            the text. The result never exceeds ``max_snippet_length``.
        """
        if not self.config.include_snippet:
            return ""

        full_text = full_text or ""
        limit = self.config.max_snippet_length

        # An empty value "matches" at index 0 and would produce a meaningless
        # window, so it falls through to the plain leading-text snippet.
        if value and self.config.include_context:
            # Search the original text rather than a lower-cased copy:
            # lower-casing can change the string length, which would shift
            # the match position relative to full_text.
            match = re.search(re.escape(value), full_text, re.IGNORECASE)
            if match is not None:
                start = max(0, match.start() - self.config.context_chars)
                end = min(len(full_text), match.end() + self.config.context_chars)

                snippet = full_text[start:end]
                if start > 0:
                    snippet = "..." + snippet
                if end < len(full_text):
                    snippet = snippet + "..."

                return snippet[:limit]

        return full_text[:limit]

    def _generate_region_id(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
    ) -> str:
        """Generate a stable 16-hex-character ID from document, page and box coordinates."""
        content = f"{doc_id}_{page}_{bbox.xyxy}"
        # MD5 is used only as a fingerprint here, not for security.
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def _save_crop(
        self,
        image: Any,
        chunk: DocumentChunk,
    ) -> Optional[str]:
        """Save a crop image for a chunk, named after the chunk's document, page and ID."""
        return self._save_crop_direct(
            image,
            chunk.doc_id,
            chunk.page,
            chunk.chunk_id,
        )

    def _save_crop_direct(
        self,
        image: Any,
        doc_id: str,
        page: int,
        identifier: str,
    ) -> Optional[str]:
        """
        Save a crop image directly.

        Saving is best-effort: a failure is logged and reported as None
        rather than raised.

        Args:
            image: A PIL image or a numpy array; other types are ignored
            doc_id: Document ID (used in the file name)
            page: Page number (used in the file name)
            identifier: Chunk/region/cell identifier (used in the file name)

        Returns:
            Path of the written file as a string, or None when no output
            directory is configured, the image type is unsupported, Pillow
            is unavailable, or writing failed
        """
        if self.config.crop_output_dir is None:
            return None

        log = logging.getLogger(__name__)

        try:
            from PIL import Image
        except ImportError:
            log.warning("Pillow is not installed; cannot save evidence crop %s", identifier)
            return None

        try:
            if isinstance(image, Image.Image):
                pil_image = image
            else:
                # numpy is only needed for array input, so a missing numpy
                # must not prevent saving PIL images.
                try:
                    import numpy as np
                except ImportError:
                    return None
                if not isinstance(image, np.ndarray):
                    return None
                pil_image = Image.fromarray(image)

            output_dir = Path(self.config.crop_output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)

            # IDs may contain path separators (e.g. a doc_id derived from a
            # file path); flatten them so the crop stays inside output_dir.
            stem = f"{doc_id}_{page}_{identifier}".replace("/", "_").replace("\\", "_")
            output_path = output_dir / f"{stem}.{self.config.crop_format}"

            pil_image.save(output_path)
            return str(output_path)

        except Exception:
            # Deliberately broad: a failed crop must not abort extraction,
            # but the failure should be visible in the logs.
            log.warning(
                "Failed to save evidence crop %s (doc %s, page %s)",
                identifier,
                doc_id,
                page,
                exc_info=True,
            )
            return None
| |
|
| |
|
class EvidenceTracker:
    """
    Collects evidence references produced during extraction.

    Every reference is kept in insertion order and additionally indexed by
    the chunk it came from and, when given, by the field it supports.
    Query methods return new lists, so callers cannot alter the tracker's
    internal state through them.
    """

    def __init__(self):
        # Flat record of everything added, in insertion order.
        self._evidence: List[EvidenceRef] = []
        # field name -> references registered for that field.
        self._by_field: Dict[str, List[EvidenceRef]] = {}
        # chunk_id -> references originating from that chunk.
        self._by_chunk: Dict[str, List[EvidenceRef]] = {}

    def add(
        self,
        evidence: EvidenceRef,
        field_name: Optional[str] = None,
    ) -> None:
        """
        Record an evidence reference.

        Args:
            evidence: Reference to store
            field_name: Field the reference supports; when empty or None the
                reference is not indexed under any field
        """
        self._evidence.append(evidence)
        self._by_chunk.setdefault(evidence.chunk_id, []).append(evidence)

        if not field_name:
            return
        self._by_field.setdefault(field_name, []).append(evidence)

    def get_all(self) -> List[EvidenceRef]:
        """Return a copy of every recorded reference, in insertion order."""
        return list(self._evidence)

    def get_for_field(self, field_name: str) -> List[EvidenceRef]:
        """Return the references recorded for a field (empty list if none)."""
        recorded = self._by_field.get(field_name, [])
        return list(recorded)

    def get_for_chunk(self, chunk_id: str) -> List[EvidenceRef]:
        """Return the references that came from a chunk (empty list if none)."""
        recorded = self._by_chunk.get(chunk_id, [])
        return list(recorded)

    def get_by_page(self, page: int) -> List[EvidenceRef]:
        """Return the references located on the given page."""
        on_page = []
        for ref in self._evidence:
            if ref.page == page:
                on_page.append(ref)
        return on_page

    def get_high_confidence(self, threshold: float = 0.8) -> List[EvidenceRef]:
        """Return the references whose confidence is at least *threshold*."""
        confident = []
        for ref in self._evidence:
            if ref.confidence >= threshold:
                confident.append(ref)
        return confident

    def validate_field(
        self,
        field_name: str,
        min_evidence: int = 1,
        min_confidence: float = 0.5,
    ) -> bool:
        """
        Check whether a field is backed by enough evidence.

        Args:
            field_name: Field to check
            min_evidence: Required number of references for the field
            min_confidence: Confidence that the field's most confident
                reference must reach

        Returns:
            True when the field has at least ``min_evidence`` references and
            the highest confidence among them (0 if there are none) is at
            least ``min_confidence``
        """
        refs = self._by_field.get(field_name, [])

        if len(refs) < min_evidence:
            return False

        # Only the single strongest reference is compared to the threshold.
        if refs:
            strongest = max(ref.confidence for ref in refs)
        else:
            strongest = 0
        return strongest >= min_confidence

    def clear(self) -> None:
        """Discard all recorded evidence and both indexes."""
        self._evidence, self._by_field, self._by_chunk = [], {}, {}
| |
|