| | """ |
| | Document Cache |
| | |
| | Caches rendered page images and document metadata for performance. |
| | """ |
| |
|
| | import hashlib |
| | import os |
| | from pathlib import Path |
| | from typing import Dict, Optional, Tuple |
| | from dataclasses import dataclass |
| | from datetime import datetime, timedelta |
| | from loguru import logger |
| |
|
| | import numpy as np |
| | from PIL import Image |
| |
|
| | from cachetools import TTLCache, LRUCache |
| |
|
| |
|


@dataclass
class CacheEntry:
    """A cached page image entry."""
    document_id: str
    page_number: int
    dpi: int
    image: np.ndarray
    created_at: datetime
    size_bytes: int


class DocumentCache:
    """
    In-memory cache for rendered document pages.

    Uses TTL-based expiry with LRU eviction, an approximate memory budget,
    and optional disk persistence.
    """

    def __init__(
        self,
        max_pages: int = 100,
        max_memory_mb: int = 1024,
        ttl_seconds: int = 3600,
        disk_cache_dir: Optional[str] = None,
    ):
        """
        Initialize the document cache.

        Args:
            max_pages: Maximum number of pages to cache in memory
            max_memory_mb: Maximum memory usage in MB
            ttl_seconds: Time-to-live for cache entries, in seconds
            disk_cache_dir: Optional directory for disk caching
        """
        self.max_pages = max_pages
        self.max_memory_mb = max_memory_mb
        self.ttl_seconds = ttl_seconds
        self.disk_cache_dir = disk_cache_dir

        # In-memory page cache: entries expire after ttl_seconds and are evicted
        # LRU-style when more than max_pages entries are stored.
        self._cache: TTLCache = TTLCache(maxsize=max_pages, ttl=ttl_seconds)

        # Approximate number of bytes held by cached images.
        self._memory_used_bytes = 0

        # Hit/miss counters for the stats property.
        self._hits = 0
        self._misses = 0

        if disk_cache_dir:
            self._disk_cache_path = Path(disk_cache_dir)
            self._disk_cache_path.mkdir(parents=True, exist_ok=True)
        else:
            self._disk_cache_path = None

        logger.debug(f"Initialized DocumentCache (max_pages={max_pages}, max_memory={max_memory_mb}MB)")

    def _make_key(self, document_id: str, page_number: int, dpi: int) -> str:
        """Generate the cache key for a (document, page, DPI) combination."""
        return f"{document_id}:p{page_number}:d{dpi}"

    def get(
        self,
        document_id: str,
        page_number: int,
        dpi: int = 300,
    ) -> Optional[np.ndarray]:
        """
        Get a cached page image.

        Args:
            document_id: Document identifier
            page_number: Page number
            dpi: Rendering DPI

        Returns:
            Cached image array, or None on a cache miss
        """
        key = self._make_key(document_id, page_number, dpi)

        # Fast path: serve from the in-memory cache.
        entry = self._cache.get(key)
        if entry is not None:
            self._hits += 1
            return entry.image

        # Slow path: fall back to the disk cache.
        if self._disk_cache_path:
            disk_path = self._disk_cache_path / f"{key}.npy"
            if disk_path.exists():
                try:
                    image = np.load(disk_path)
                    # Promote the disk hit back into the in-memory cache.
                    self._put_memory(key, document_id, page_number, dpi, image)
                    self._hits += 1
                    return image
                except Exception as e:
                    logger.warning(f"Failed to load from disk cache: {e}")

        self._misses += 1
        return None

    def put(
        self,
        document_id: str,
        page_number: int,
        dpi: int,
        image: np.ndarray,
        persist_to_disk: bool = False,
    ):
        """
        Cache a page image.

        Args:
            document_id: Document identifier
            page_number: Page number
            dpi: Rendering DPI
            image: Page image as a numpy array
            persist_to_disk: Whether to also persist the image to disk
        """
        key = self._make_key(document_id, page_number, dpi)

        self._put_memory(key, document_id, page_number, dpi, image)

        if persist_to_disk and self._disk_cache_path:
            self._put_disk(key, image)

    def _put_memory(
        self,
        key: str,
        document_id: str,
        page_number: int,
        dpi: int,
        image: np.ndarray,
    ):
        """Put an entry in the in-memory cache."""
        size_bytes = image.nbytes

        # If we are overwriting an existing key, release its accounted bytes first.
        old_entry = self._cache.get(key)
        if old_entry is not None:
            self._memory_used_bytes -= old_entry.size_bytes

        # Evict oldest entries if this image would push us over the memory budget.
        max_bytes = self.max_memory_mb * 1024 * 1024
        if self._memory_used_bytes + size_bytes > max_bytes:
            self._evict_to_fit(size_bytes)

        entry = CacheEntry(
            document_id=document_id,
            page_number=page_number,
            dpi=dpi,
            image=image,
            created_at=datetime.utcnow(),
            size_bytes=size_bytes,
        )

        self._cache[key] = entry
        self._memory_used_bytes += size_bytes

    def _put_disk(self, key: str, image: np.ndarray):
        """Persist an entry to the disk cache."""
        if not self._disk_cache_path:
            return

        try:
            disk_path = self._disk_cache_path / f"{key}.npy"
            np.save(disk_path, image)
        except Exception as e:
            logger.warning(f"Failed to write to disk cache: {e}")

    def _evict_to_fit(self, needed_bytes: int):
        """Evict oldest entries until the new entry fits within the memory budget."""
        max_bytes = self.max_memory_mb * 1024 * 1024
        target = max_bytes - needed_bytes

        # Snapshot the entries (oldest first) so we can delete from the cache
        # while iterating.
        entries = list(self._cache.items())

        for key, entry in entries:
            if self._memory_used_bytes <= target:
                break
            self._memory_used_bytes -= entry.size_bytes
            del self._cache[key]

    def invalidate(self, document_id: str, page_number: Optional[int] = None):
        """
        Invalidate cache entries for a document.

        Args:
            document_id: Document to invalidate
            page_number: Optional specific page (None = all pages)
        """
        keys_to_remove = []

        for key in list(self._cache.keys()):
            if key.startswith(f"{document_id}:"):
                if page_number is None or f":p{page_number}:" in key:
                    keys_to_remove.append(key)

        for key in keys_to_remove:
            entry = self._cache.pop(key, None)
            if entry:
                self._memory_used_bytes -= entry.size_bytes

        # Also delete matching files from the disk cache, including pages that
        # are no longer (or were never) in memory, so get() cannot revive them.
        if self._disk_cache_path:
            pattern = (
                f"{document_id}:*.npy"
                if page_number is None
                else f"{document_id}:p{page_number}:*.npy"
            )
            for disk_path in self._disk_cache_path.glob(pattern):
                disk_path.unlink()

    def clear(self):
        """Clear all cache entries."""
        self._cache.clear()
        self._memory_used_bytes = 0

        if self._disk_cache_path:
            for f in self._disk_cache_path.glob("*.npy"):
                f.unlink()

        logger.info("Document cache cleared")

    @property
    def stats(self) -> Dict:
        """Get cache statistics."""
        total = self._hits + self._misses
        hit_rate = (self._hits / total * 100) if total > 0 else 0.0

        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": f"{hit_rate:.1f}%",
            "entries": len(self._cache),
            "memory_used_mb": self._memory_used_bytes / (1024 * 1024),
            "max_memory_mb": self.max_memory_mb,
        }


# Module-level singleton, created lazily by get_document_cache().
_document_cache: Optional[DocumentCache] = None


def get_document_cache() -> DocumentCache:
    """Get or create the global document cache."""
    global _document_cache
    if _document_cache is None:
        _document_cache = DocumentCache()
    return _document_cache
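

# Minimal usage sketch (illustrative only). The document id, page dimensions,
# and cache limits below are made-up placeholders, and the random array merely
# stands in for a page image produced by a real renderer.
if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        cache = DocumentCache(max_pages=10, max_memory_mb=64, disk_cache_dir=tmp_dir)

        # Stand-in for a rendered page at 300 DPI (RGB uint8 array).
        fake_page = np.random.randint(0, 255, size=(1100, 850, 3), dtype=np.uint8)

        cache.put("example-doc", page_number=1, dpi=300, image=fake_page, persist_to_disk=True)

        # In-memory hit, then a miss for a page that was never cached.
        assert cache.get("example-doc", page_number=1, dpi=300) is not None
        assert cache.get("example-doc", page_number=2, dpi=300) is None

        logger.info(f"Cache stats: {cache.stats}")

        cache.invalidate("example-doc")
        cache.clear()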