| | """ |
| | Semantic deduplication for compressing context content. |
| | """ |
| |
|
| | import logging |
| | from typing import List, Optional, Tuple, Dict, Any |
| |
|
| | import numpy as np |
| | from sklearn.metrics.pairwise import cosine_similarity |
| |
|
| | from efficient_context.compression.base import BaseCompressor |
| | from efficient_context.utils.text import split_into_sentences, get_sentence_importance |
| |
|
| | |
| | logging.basicConfig(level=logging.INFO) |
| | logger = logging.getLogger(__name__) |
| |
|
class SemanticDeduplicator(BaseCompressor):
    """
    Compressor that removes semantically duplicate or redundant content.

    This compressor identifies and removes sentences that are semantically
    similar to others in the content, keeping only the most representative ones.
    It's designed to be CPU-friendly and memory-efficient.
    """

    def __init__(
        self,
        threshold: float = 0.85,
        embedding_model: str = "lightweight",
        min_sentence_length: int = 10,
        importance_weight: float = 0.3,
    ):
        """
        Initialize the SemanticDeduplicator.

        Args:
            threshold: Similarity threshold above which content is considered duplicated (0.0 to 1.0)
            embedding_model: Embedding model to use; "lightweight" selects a small default model,
                any other value is passed through as a SentenceTransformer model name
            min_sentence_length: Minimum sentence length, in words, for a sentence to be deduplicated
            importance_weight: Weight given to sentence importance vs. deduplication
        """
        self.threshold = threshold
        self.embedding_model = embedding_model
        self.min_sentence_length = min_sentence_length
        self.importance_weight = importance_weight

        self._init_embedding_model()

        logger.info("SemanticDeduplicator initialized with threshold: %.2f", threshold)

    def _init_embedding_model(self):
        """Initialize the embedding model based on the selected type."""
        try:
            from sentence_transformers import SentenceTransformer

            if self.embedding_model == "lightweight":
                # Small, CPU-friendly model
                self.model = SentenceTransformer('paraphrase-MiniLM-L3-v2')
            else:
                # Treat the configured value as a SentenceTransformer model name
                self.model = SentenceTransformer(self.embedding_model)

            logger.info(
                "Using embedding model with dimension: %d",
                self.model.get_sentence_embedding_dimension()
            )
        except ImportError:
            logger.warning("SentenceTransformer not available, using TF-IDF fallback (less accurate)")
            self.model = None

    def _get_embeddings(self, sentences: List[str]) -> np.ndarray:
        """
        Get embeddings for a list of sentences.

        Args:
            sentences: List of sentences to embed

        Returns:
            embeddings: Array of sentence embeddings
        """
        if not sentences:
            return np.array([])

        if self.model is not None:
            # Sentence-transformer embeddings
            return self.model.encode(sentences, show_progress_bar=False)
        else:
            # Fallback: TF-IDF vectors (less accurate, but dependency-light)
            from sklearn.feature_extraction.text import TfidfVectorizer
            vectorizer = TfidfVectorizer(max_features=5000)
            return vectorizer.fit_transform(sentences).toarray()

    def _compute_similarity_matrix(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Compute pairwise similarity between embeddings.

        Args:
            embeddings: Array of sentence embeddings

        Returns:
            similarity_matrix: Matrix of pairwise similarities
        """
        if embeddings.shape[0] == 0:
            return np.array([])

        return cosine_similarity(embeddings)

    def _deduplicate_sentences(
        self,
        sentences: List[str],
        importances: Optional[List[float]] = None
    ) -> List[int]:
        """
        Identify non-redundant sentence indices.

        Args:
            sentences: List of sentences to deduplicate
            importances: Optional list of importance scores

        Returns:
            kept_indices: Indices of sentences to keep
        """
        if not sentences:
            return []

        # Only deduplicate sentences that meet the minimum word count
        valid_indices = [i for i, s in enumerate(sentences) if len(s.split()) >= self.min_sentence_length]

        if not valid_indices:
            # Nothing long enough to deduplicate; keep everything
            return list(range(len(sentences)))

        # Embed the candidate sentences
        valid_sentences = [sentences[i] for i in valid_indices]
        embeddings = self._get_embeddings(valid_sentences)

        # Pairwise similarities between candidates
        similarity_matrix = self._compute_similarity_matrix(embeddings)

        # Ignore self-similarity
        np.fill_diagonal(similarity_matrix, 0)

        # Greedy selection: keep a sentence, then drop everything too similar to it
        kept_indices = []
        remaining_indices = set(range(len(valid_indices)))

        # Process candidates in order of importance (or word count as a fallback)
        if importances is not None:
            valid_importances = [importances[i] for i in valid_indices]
            ordered_indices = [i for i, _ in sorted(
                enumerate(valid_importances),
                key=lambda x: x[1],
                reverse=True
            )]
        else:
            ordered_indices = [i for i, _ in sorted(
                enumerate(valid_sentences),
                key=lambda x: len(x[1].split()),
                reverse=True
            )]

        for idx in ordered_indices:
            if idx not in remaining_indices:
                continue

            # Keep this sentence
            kept_indices.append(valid_indices[idx])
            remaining_indices.remove(idx)

            # Drop sentences that are too similar to the one just kept
            similar_indices = [
                i for i in remaining_indices
                if similarity_matrix[idx, i] > self.threshold
            ]
            remaining_indices -= set(similar_indices)

            if not remaining_indices:
                break

        # Short sentences were never deduplication candidates; keep them as-is
        short_indices = [i for i, s in enumerate(sentences) if len(s.split()) < self.min_sentence_length]
        kept_indices.extend(short_indices)

        # Restore original document order
        return sorted(kept_indices)

    def compress(self, content: str, target_size: Optional[int] = None) -> str:
        """
        Compress content by removing semantic duplicates.

        Args:
            content: The content to compress
            target_size: Optional target size in tokens (approximated as whitespace-separated words)

        Returns:
            compressed_content: The compressed content
        """
        sentences = split_into_sentences(content)

        if not sentences:
            return content

        # Score sentences so the most informative ones survive deduplication
        importances = get_sentence_importance(sentences)

        # Remove semantically redundant sentences
        kept_indices = self._deduplicate_sentences(sentences, importances)

        kept_sentences = [sentences[i] for i in kept_indices]
        compressed = " ".join(kept_sentences)

        # If still over the target size, drop the least important sentences
        if target_size and len(compressed.split()) > target_size:
            current_size = len(compressed.split())
            reduction_needed = current_size - target_size

            # Rank kept sentences from least to most important
            sentence_priorities = [(i, importances[i]) for i in kept_indices]
            sorted_priorities = sorted(sentence_priorities, key=lambda x: x[1])

            # Remove sentences until enough tokens have been cut
            tokens_removed = 0
            indices_to_remove = []

            for idx, _ in sorted_priorities:
                tokens_removed += len(sentences[idx].split())
                indices_to_remove.append(idx)

                if tokens_removed >= reduction_needed:
                    break

            final_indices = [i for i in kept_indices if i not in indices_to_remove]

            # Reassemble in original document order
            compressed = " ".join(sentences[i] for i in sorted(final_indices))

        # Log the compression ratio
        original_tokens = len(content.split())
        compressed_tokens = len(compressed.split())
        reduction = (1 - compressed_tokens / original_tokens) * 100 if original_tokens > 0 else 0

        logger.info(
            "Compressed from %d to %d tokens (%.1f%% reduction)",
            original_tokens, compressed_tokens, reduction
        )

        return compressed
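

# Minimal usage sketch (illustrative only, not part of the library's public examples):
# it assumes the efficient_context package is importable so the imports at the top
# of this module resolve, and the sample text below is invented for demonstration.
# The two near-duplicate sentences should collapse to one when their similarity
# exceeds the configured threshold.
if __name__ == "__main__":
    sample = (
        "The quick brown fox jumps over the lazy dog near the river bank. "
        "A quick brown fox leaps over a lazy dog close to the river bank. "
        "Meanwhile, the weather forecast predicts heavy rain for the entire weekend."
    )
    deduplicator = SemanticDeduplicator(threshold=0.85)
    print(deduplicator.compress(sample, target_size=30))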