from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams, Distance, PointStruct,
    Filter, FieldCondition, MatchValue,
)
import numpy as np
from typing import List, Dict, Optional, Tuple
from sentence_transformers import SentenceTransformer
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import re
import os
from dotenv import load_dotenv


load_dotenv()


class MultiCollectionChapterRetrieval:
    def __init__(self, use_cloud: bool = True):
        """
        Initialize with a Qdrant Cloud or local connection.

        Args:
            use_cloud: If True, connects to Qdrant Cloud using environment variables.
        """
        if use_cloud:
            self.client = self._create_cloud_client()
        else:
            self.client = QdrantClient("http://localhost:6333")

        # Sentence transformer is loaded lazily on first use
        self.encoder = None

        # ICD-10 chapter descriptions, keyed by chapter id
        self.chapter_info = {
            "chapter_1_I": "Certain infectious and parasitic diseases",
            "chapter_2_II": "Neoplasms",
            "chapter_3_III": "Diseases of the blood and blood-forming organs and certain disorders involving the immune mechanism",
            "chapter_4_IV": "Endocrine, nutritional and metabolic diseases",
            "chapter_5_V": "Mental and behavioural disorders",
            "chapter_6_VI": "Diseases of the nervous system",
            "chapter_7_VII": "Diseases of the eye and adnexa",
            "chapter_8_VIII": "Diseases of the ear and mastoid process",
            "chapter_9_IX": "Diseases of the circulatory system",
            "chapter_10_X": "Diseases of the respiratory system",
            "chapter_11_XI": "Diseases of the digestive system",
            "chapter_12_XII": "Diseases of the skin and subcutaneous tissue",
            "chapter_13_XIII": "Diseases of the musculoskeletal system and connective tissue",
            "chapter_14_XIV": "Diseases of the genitourinary system",
            "chapter_15_XV": "Pregnancy, childbirth and the puerperium",
            "chapter_16_XVI": "Certain conditions originating in the perinatal period",
            "chapter_17_XVII": "Congenital malformations, deformations and chromosomal abnormalities",
            "chapter_18_XVIII": "Symptoms, signs and abnormal clinical and laboratory findings, not elsewhere classified",
            "chapter_19_XIX": "Injury, poisoning and certain other consequences of external causes",
            "chapter_20_XX": "External causes of morbidity and mortality",
            "chapter_21_XXI": "Factors influencing health status and contact with health services",
            "chapter_22_XXII": "Codes for special purposes"
        }

        # Cache for discovered chapter collections
        self._chapter_collections = None

    def _create_cloud_client(self) -> QdrantClient:
        """Create a Qdrant Cloud client with authentication."""
        qdrant_url = os.getenv('QDRANT_URL')
        qdrant_api_key = os.getenv('QDRANT_API_KEY')

        if not qdrant_url or not qdrant_api_key:
            raise ValueError(
                "Qdrant Cloud credentials not found in environment variables.\n"
                "Please set QDRANT_URL and QDRANT_API_KEY in your .env file:\n"
                "QDRANT_URL=https://your-cluster-id.region.aws.cloud.qdrant.io:6333\n"
                "QDRANT_API_KEY=your-api-key-here"
            )

        print(f"🔌 Connecting to Qdrant Cloud: {qdrant_url}")

        try:
            client = QdrantClient(
                url=qdrant_url,
                api_key=qdrant_api_key,
                timeout=60,
                prefer_grpc=True,  # gRPC generally gives better throughput
            )

            # Smoke test: listing collections verifies the credentials work
            collections = client.get_collections()
            print(f"✅ Connected successfully! Found {len(collections.collections)} collections")

            return client

        except Exception as e:
            print(f"❌ Failed to connect to Qdrant Cloud: {e}")
            print("Please check your QDRANT_URL and QDRANT_API_KEY in the .env file")
            raise

    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences on terminal punctuation.

        Example: "Chest pain. Shortness of breath!" ->
        ["Chest pain", "Shortness of breath"]
        """
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def load_encoder(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the sentence transformer model (no-op if already loaded)."""
        if self.encoder is None:
            print(f"📥 Loading encoder: {model_name}")
            self.encoder = SentenceTransformer(model_name)
            print("✅ Encoder loaded successfully")

    def encode_query(self, query: str) -> List[float]:
        """Encode a diagnostic string into a dense vector (384-dim for all-MiniLM-L6-v2)."""
        if self.encoder is None:
            self.load_encoder()
        return self.encoder.encode([query])[0].tolist()

    def get_chapter_collections(self) -> Dict[str, str]:
        """
        Get a mapping of chapter_id -> collection_name.
        Discovers collections automatically based on naming patterns.
        """
        if self._chapter_collections is not None:
            return self._chapter_collections

        try:
            collections = self.client.get_collections()
            chapter_collections = {}

            print("🔍 Discovering chapter collections...")

            for collection in collections.collections:
                collection_name = collection.name
                chapter_match = None

                # Pattern 1: names like "chapter_9_IX" (number plus Roman numeral)
                pattern1 = re.search(r'chapter[_-]?(\d+)[_-]?([IVX]+)', collection_name, re.IGNORECASE)
                if pattern1:
                    chapter_num = pattern1.group(1)
                    roman = pattern1.group(2)
                    chapter_match = f"chapter_{chapter_num}_{roman}"

                # Pattern 2: a single unified collection holding all chapters
                elif 'all' in collection_name.lower() and ('chapter' in collection_name.lower() or 'icd' in collection_name.lower()):
                    print(f"  📦 Found unified collection: {collection_name}")
                    chapter_collections['unified_collection'] = collection_name
                    continue

                # Pattern 3: "chapter" plus a bare number and/or Roman numeral
                elif 'chapter' in collection_name.lower():
                    numbers = re.findall(r'\d+', collection_name)
                    romans = re.findall(r'[IVX]+', collection_name)

                    if numbers and romans:
                        chapter_match = f"chapter_{numbers[0]}_{romans[0]}"
                    elif numbers:
                        # Map the bare chapter number to its Roman numeral
                        num = int(numbers[0])
                        roman_map = {1: 'I', 2: 'II', 3: 'III', 4: 'IV', 5: 'V', 6: 'VI', 7: 'VII',
                                     8: 'VIII', 9: 'IX', 10: 'X', 11: 'XI', 12: 'XII', 13: 'XIII',
                                     14: 'XIV', 15: 'XV', 16: 'XVI', 17: 'XVII', 18: 'XVIII', 19: 'XIX',
                                     20: 'XX', 21: 'XXI', 22: 'XXII'}
                        if num in roman_map:
                            chapter_match = f"chapter_{num}_{roman_map[num]}"

                if chapter_match:
                    chapter_collections[chapter_match] = collection_name
                    print(f"  ✅ {chapter_match} -> {collection_name}")

            print(f"📊 Found {len(chapter_collections)} chapter collections")

            if len(chapter_collections) == 1 and 'unified_collection' in chapter_collections:
                print("⚠️ Only a unified collection was found. Searches will use chapter filtering.")

            self._chapter_collections = chapter_collections
            return chapter_collections

        except Exception as e:
            print(f"❌ Error discovering collections: {e}")
            return {}

    def search_single_collection(
        self,
        collection_name: str,
        query_vector: List[float],
        limit: int = 20,
        score_threshold: float = 0.3,
        chapter_filter: Optional[str] = None
    ) -> List[Dict]:
        """Search a single collection and return formatted results.

        When chapter_filter is given (unified-collection mode), results are
        restricted to that chapter via a payload filter.
        """
        try:
            search_params = {
                "collection_name": collection_name,
                "query_vector": query_vector,
                "limit": limit,
                "score_threshold": score_threshold
            }

            # NOTE: this assumes each point stores its chapter id in a
            # "chapter" payload field; adjust the key to match your schema.
            if chapter_filter:
                search_params["query_filter"] = Filter(
                    must=[FieldCondition(key="chapter", match=MatchValue(value=chapter_filter))]
                )

            results = self.client.search(**search_params)

            return [
                {
                    'collection': collection_name,
                    'score': result.score,
                    'id': result.id,
                    'payload': result.payload
                }
                for result in results
            ]

        except Exception as e:
            print(f"❌ Error searching {collection_name}: {e}")
            if "timeout" in str(e).lower():
                print("  This might be due to network issues. Retrying with a lower limit...")
                try:
                    # Retry once with a smaller, stricter request
                    search_params["limit"] = min(limit, 10)
                    search_params["score_threshold"] = max(score_threshold, 0.5)
                    results = self.client.search(**search_params)
                    return [
                        {
                            'collection': collection_name,
                            'score': result.score,
                            'id': result.id,
                            'payload': result.payload
                        }
                        for result in results
                    ]
                except Exception:
                    pass
            return []

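    # Shape of each item returned by search_single_collection (values are
    # hypothetical; payload keys depend on your ingestion schema):
    #   {"collection": "chapter_9_IX_codes", "score": 0.78, "id": 123,
    #    "payload": {"code": "I20.0", "title": "Unstable angina",
    #                "chapter": "chapter_9_IX"}}
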
    def analyze_chapters_parallel(
        self,
        diagnostic_string: str,
        sample_size_per_chapter: int = 15,
        score_threshold: float = 0.3,
        max_workers: int = 4
    ) -> Dict[str, Dict]:
        """
        Analyze all chapter collections in parallel to determine relevance.
        Optimized for cloud performance.
        """
        query_vector = self.encode_query(diagnostic_string)
        chapter_collections = self.get_chapter_collections()

        if not chapter_collections:
            print("❌ No chapter collections found!")
            return {}

        print(f"\n🔍 Analyzing diagnostic: '{diagnostic_string}'")

        # With only a unified collection available, fall back to sequential
        # per-chapter filtered search instead of parallel per-collection search
        if len(chapter_collections) == 1 and 'unified_collection' in chapter_collections:
            return self._analyze_unified_collection(
                diagnostic_string, query_vector,
                chapter_collections['unified_collection'],
                sample_size_per_chapter, score_threshold
            )

        print(f"🚀 Searching {len(chapter_collections)} collections in parallel...")

        chapter_analysis = {}

        def search_chapter(chapter_id: str, collection_name: str) -> Tuple[str, List[Dict]]:
            """Search one chapter collection, retrying once on transient errors."""
            max_retries = 2
            for attempt in range(max_retries):
                try:
                    results = self.search_single_collection(
                        collection_name, query_vector, sample_size_per_chapter, score_threshold
                    )
                    return chapter_id, results
                except Exception as e:
                    if attempt < max_retries - 1:
                        print(f"  ⚠️ Retry {attempt + 1} for {chapter_id}: {e}")
                        time.sleep(1)
                    else:
                        print(f"  ❌ Failed {chapter_id} after {max_retries} attempts: {e}")
            return chapter_id, []

        start_time = time.time()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit one search task per chapter collection
            future_to_chapter = {
                executor.submit(search_chapter, chapter_id, collection_name): chapter_id
                for chapter_id, collection_name in chapter_collections.items()
                if chapter_id != 'unified_collection'
            }

            # Collect results as they complete
            for future in as_completed(future_to_chapter):
                chapter_id = future_to_chapter[future]
                try:
                    chapter_id, results = future.result(timeout=30)

                    if results:
                        scores = [r['score'] for r in results]

                        chapter_analysis[chapter_id] = {
                            'collection_name': chapter_collections[chapter_id],
                            'match_count': len(results),
                            'max_score': max(scores),
                            'avg_score': np.mean(scores),
                            'median_score': np.median(scores),
                            'min_score': min(scores),
                            'score_std': np.std(scores),
                            'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5],
                            'all_results': results
                        }

                        # Composite relevance: weighted blend of average quality,
                        # best match, hit rate, and score consistency
                        relevance = (
                            chapter_analysis[chapter_id]['avg_score'] * 0.4 +
                            chapter_analysis[chapter_id]['max_score'] * 0.3 +
                            min(len(results) / sample_size_per_chapter, 1.0) * 0.2 +
                            (1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1
                        )

                        chapter_analysis[chapter_id]['relevance_score'] = relevance
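
                        # Worked example of the blend above (hypothetical numbers):
                        # avg=0.62, max=0.78, 12/15 matches, std=0.05 gives
                        # 0.62*0.4 + 0.78*0.3 + 0.8*0.2 + (1/1.05)*0.1 ~= 0.74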

                except Exception as e:
                    print(f"  ❌ {chapter_id}: Error - {e}")

        elapsed = time.time() - start_time
        print(f"⏱️ Parallel analysis completed in {elapsed:.2f} seconds")

        # Rank chapters by composite relevance, best first
        sorted_analysis = dict(sorted(
            chapter_analysis.items(),
            key=lambda x: x[1]['relevance_score'],
            reverse=True
        ))

        return sorted_analysis

    def _analyze_unified_collection(
        self,
        diagnostic_string: str,
        query_vector: List[float],
        collection_name: str,
        sample_size_per_chapter: int,
        score_threshold: float
    ) -> Dict[str, Dict]:
        """Analyze a unified collection by searching once per chapter with a payload filter."""
        print(f"🔍 Analyzing unified collection: {collection_name}")

        chapter_analysis = {}

        for chapter_id in self.chapter_info.keys():
            try:
                results = self.search_single_collection(
                    collection_name, query_vector, sample_size_per_chapter,
                    score_threshold, chapter_filter=chapter_id
                )

                if results:
                    scores = [r['score'] for r in results]

                    chapter_analysis[chapter_id] = {
                        'collection_name': collection_name,
                        'match_count': len(results),
                        'max_score': max(scores),
                        'avg_score': np.mean(scores),
                        'median_score': np.median(scores),
                        'min_score': min(scores),
                        'score_std': np.std(scores),
                        'top_matches': sorted(results, key=lambda x: x['score'], reverse=True)[:5],
                        'all_results': results
                    }

                    # Same composite relevance blend as the parallel path
                    relevance = (
                        chapter_analysis[chapter_id]['avg_score'] * 0.4 +
                        chapter_analysis[chapter_id]['max_score'] * 0.3 +
                        min(len(results) / sample_size_per_chapter, 1.0) * 0.2 +
                        (1.0 / (1.0 + chapter_analysis[chapter_id]['score_std'])) * 0.1
                    )

                    chapter_analysis[chapter_id]['relevance_score'] = relevance
                    print(f"  ✅ {chapter_id}: {len(results)} matches, relevance: {relevance:.4f}")
                else:
                    print(f"  ❌ {chapter_id}: No matches above threshold")

                # Brief pause between sequential requests to the cloud endpoint
                time.sleep(0.1)

            except Exception as e:
                print(f"  ❌ {chapter_id}: Error - {e}")

        # Rank chapters by composite relevance, best first
        return dict(sorted(
            chapter_analysis.items(),
            key=lambda x: x[1]['relevance_score'],
            reverse=True
        ))

    def get_top_chapters(
        self,
        diagnostic_string: str,
        top_n: int = 5,
        min_relevance: float = 0.1
    ) -> List[Tuple[str, float, str]]:
        """
        Get the top N most relevant chapters for a diagnostic string.
        Returns: [(chapter_id, relevance_score, description)]
        """
        analysis = self.analyze_chapters_parallel(diagnostic_string)

        top_chapters = []
        for chapter_id, stats in analysis.items():
            if len(top_chapters) >= top_n:
                break  # analysis is already sorted by relevance, best first
            if stats['relevance_score'] >= min_relevance:
                description = self.chapter_info.get(chapter_id, "Unknown chapter")
                top_chapters.append((chapter_id, stats['relevance_score'], description))

        return top_chapters

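    # Hypothetical output for "severe chest pain with shortness of breath":
    #   [("chapter_9_IX", 0.74, "Diseases of the circulatory system"),
    #    ("chapter_10_X", 0.69, "Diseases of the respiratory system")]
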
    def search_targeted_chapters(
        self,
        diagnostic_string: str,
        target_chapters: List[str] = None,
        results_per_sentence: int = 3,
        chapters_per_sentence: int = 2
    ) -> Dict[str, Dict[str, List[Dict]]]:
        """
        Search pre-specified chapters, or auto-identify the most relevant
        chapters for each sentence individually. In auto mode, only the top
        chapters for each specific sentence are searched.
        """
        print("\n=== STARTING search_targeted_chapters ===")
        print("Input parameters:")
        print(f"  diagnostic_string: '{diagnostic_string[:100]}{'...' if len(diagnostic_string) > 100 else ''}'")
        print(f"  target_chapters: {target_chapters}")
        print(f"  results_per_sentence: {results_per_sentence}")
        print(f"  chapters_per_sentence: {chapters_per_sentence}")

        print("\n--- SENTENCE SPLITTING ---")
        sentences = self.split_into_sentences(diagnostic_string)
        print(f"Split into {len(sentences)} sentences:")
        for i, sentence in enumerate(sentences):
            print(f"  [{i+1}]: '{sentence}'")

        print("\n--- GETTING CHAPTER COLLECTIONS ---")
        chapter_collections = self.get_chapter_collections()
        print(f"Available chapter collections: {len(chapter_collections)} total")
        print(f"Chapter IDs: {list(chapter_collections.keys())}")

        results = {}

        if target_chapters is None:
            print("\n=== AUTO-IDENTIFICATION MODE ===")
            print("Auto-identifying the most relevant chapters for each sentence individually...")

            for i, sentence in enumerate(sentences):
                if sentence.strip():
                    sentence_key = f"sentence_{i+1}"
                    print(f"\n--- Processing sentence {i+1} ---")
                    print(f"Sentence: '{sentence}'")
                    print(f"Sentence key: {sentence_key}")

                    # Rank chapters for this sentence alone
                    print(f"Getting top {chapters_per_sentence} chapters for this sentence...")
                    try:
                        sentence_top_chapters = self.get_top_chapters(
                            sentence,
                            top_n=chapters_per_sentence,
                            min_relevance=0.05
                        )
                        print(f"Found {len(sentence_top_chapters)} relevant chapters:")
                        for j, (ch_id, rel, desc) in enumerate(sentence_top_chapters):
                            print(f"  [{j+1}] {ch_id}: {rel:.4f} - {desc}")
                    except Exception as e:
                        print(f"ERROR in get_top_chapters: {e}")
                        sentence_top_chapters = []

                    # Search only the chapters selected for this sentence
                    print(f"Searching in {len(sentence_top_chapters)} selected chapters...")
                    for chapter_id, relevance, description in sentence_top_chapters:
                        print(f"\n  >> Searching chapter: {chapter_id} (relevance: {relevance:.4f})")

                        if chapter_id in chapter_collections:
                            collection_name = chapter_collections[chapter_id]
                            print(f"     Collection name: {collection_name}")

                            if chapter_id not in results:
                                results[chapter_id] = {}
                                print(f"     Initialized results dict for chapter {chapter_id}")

                            try:
                                print("     Encoding query for sentence...")
                                query_vector = self.encode_query(sentence)
                                print(f"     Query vector dimensions: {len(query_vector)}")

                                print(f"     Searching collection '{collection_name}' for top {results_per_sentence} results...")
                                sentence_results = self.search_single_collection(
                                    collection_name, query_vector, results_per_sentence
                                )
                                print(f"     Raw search returned {len(sentence_results)} results")

                            except Exception as e:
                                print(f"     ERROR during search: {e}")
                                sentence_results = []

                            if sentence_results:
                                results[chapter_id][sentence_key] = {
                                    'text': sentence,
                                    'chapter_relevance': relevance,
                                    'results': sentence_results
                                }
                                print(f"     ✅ Stored {len(sentence_results)} results for {chapter_id}[{sentence_key}]")

                                top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]]
                                print(f"     Top 3 scores: {top_scores}")
                            else:
                                print(f"     ❌ No results above threshold for {chapter_id}")
                        else:
                            print(f"     ERROR: Chapter {chapter_id} collection not found in available collections")
                else:
                    print(f"\n--- Skipping empty sentence {i+1} ---")

        else:
            print("\n=== PRE-SPECIFIED CHAPTERS MODE ===")
            print(f"Using pre-specified chapters: {target_chapters}")

            # Separate valid chapter ids from unknown ones
            valid_chapters = []
            invalid_chapters = []
            for chapter_id in target_chapters:
                if chapter_id in chapter_collections:
                    valid_chapters.append(chapter_id)
                else:
                    invalid_chapters.append(chapter_id)

            print(f"Valid chapters: {valid_chapters}")
            if invalid_chapters:
                print(f"WARNING: Invalid chapters (will be skipped): {invalid_chapters}")

            for chapter_id in valid_chapters:
                collection_name = chapter_collections[chapter_id]
                print(f"\n--- Searching chapter: {chapter_id} ---")
                print(f"Collection name: {collection_name}")

                chapter_results = {}

                # Search this chapter with every sentence
                for i, sentence in enumerate(sentences):
                    if sentence.strip():
                        sentence_key = f"sentence_{i+1}"
                        print(f"\n  >> Processing sentence {i+1} in {chapter_id}")
                        print(f"     Sentence: '{sentence}'")

                        try:
                            print("     Encoding query...")
                            query_vector = self.encode_query(sentence)
                            print(f"     Query vector dimensions: {len(query_vector)}")

                            print(f"     Searching for top {results_per_sentence} results...")
                            sentence_results = self.search_single_collection(
                                collection_name, query_vector, results_per_sentence
                            )
                            print(f"     Found {len(sentence_results)} results")

                        except Exception as e:
                            print(f"     ERROR during search: {e}")
                            sentence_results = []

                        if sentence_results:
                            chapter_results[sentence_key] = {
                                'text': sentence,
                                'chapter_relevance': None,
                                'results': sentence_results
                            }
                            print(f"     ✅ Stored results for sentence {i+1}")

                            top_scores = [r.get('score', 'N/A') for r in sentence_results[:3]]
                            print(f"     Top 3 scores: {top_scores}")
                        else:
                            print(f"     ❌ No results found for sentence {i+1}")
                    else:
                        print(f"  >> Skipping empty sentence {i+1}")

                if chapter_results:
                    results[chapter_id] = chapter_results
                    print(f"\n  ✅ Chapter {chapter_id}: Stored results for {len(chapter_results)} sentences")
                else:
                    print(f"\n  ❌ Chapter {chapter_id}: No results found")

        # Final summary
        print("\n=== SEARCH COMPLETE ===")
        print("Results summary:")
        total_results = 0
        for chapter_id, chapter_data in results.items():
            sentence_count = len(chapter_data)
            result_count = sum(len(sent_data.get('results', [])) for sent_data in chapter_data.values())
            total_results += result_count
            print(f"  {chapter_id}: {sentence_count} sentences, {result_count} total results")

        print(f"Grand total: {len(results)} chapters, {total_results} results")
        print("=== END search_targeted_chapters ===\n")

        return results

    def format_chapter_analysis(self, diagnostic_string: str, detailed: bool = True) -> str:
        """Format a comprehensive chapter analysis report."""
        analysis = self.analyze_chapters_parallel(diagnostic_string)

        if not analysis:
            return "❌ No relevant chapters found."

        output = []
        output.append(f"\n{'='*90}")
        output.append("📊 CHAPTER RELEVANCE ANALYSIS")
        output.append(f"📝 Diagnostic: '{diagnostic_string}'")
        output.append(f"{'='*90}")

        rank = 0
        for chapter_id, stats in analysis.items():
            if stats['relevance_score'] < 0.05:
                continue  # skip barely-relevant chapters without consuming a rank
            rank += 1

            description = self.chapter_info.get(chapter_id, "Unknown chapter")

            output.append(f"\n{rank}. 📖 {chapter_id.upper()}")
            output.append(f"   🏷️ Collection: {stats['collection_name']}")
            output.append(f"   📝 Description: {description}")
            output.append(f"   ⭐ Relevance Score: {stats['relevance_score']:.4f}")
            output.append("   📊 Statistics:")
            output.append(f"      • Matches: {stats['match_count']}")
            output.append(f"      • Max Score: {stats['max_score']:.4f}")
            output.append(f"      • Avg Score: {stats['avg_score']:.4f}")
            output.append(f"      • Score Range: {stats['min_score']:.4f} - {stats['max_score']:.4f}")

            if detailed:
                output.append("\n   🎯 Top Matches:")
                for j, match in enumerate(stats['top_matches'][:3], 1):
                    code = match['payload'].get('code', 'N/A')
                    title = match['payload'].get('title', 'N/A')
                    score = match['score']
                    output.append(f"      {j}. {code} - {title}")
                    output.append(f"         🎯 Similarity: {score:.4f}")

            output.append("-" * 90)

        return "\n".join(output)


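# ---------------------------------------------------------------------------
# Illustrative sketch (not called by the pipeline above): one way to create
# and seed a chapter collection that get_chapter_collections() can discover.
# The vector size matches all-MiniLM-L6-v2 (384 dimensions, cosine distance),
# and the payload keys ("code", "title", "description", "chapter") mirror the
# fields the formatters above read. The collection name and example rows are
# assumptions, not part of the original pipeline.
# ---------------------------------------------------------------------------

def seed_example_chapter_collection(client: QdrantClient, encoder: SentenceTransformer) -> None:
    """Create and populate a tiny demo collection named 'chapter_9_IX_codes'."""
    collection_name = "chapter_9_IX_codes"  # matches the "chapter_<num>_<roman>" pattern

    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=384, distance=Distance.COSINE),
    )

    # Hypothetical ICD-10 rows; the title text is what gets embedded
    rows = [
        {"code": "I20.0", "title": "Unstable angina",
         "description": "", "chapter": "chapter_9_IX"},
        {"code": "I21.9", "title": "Acute myocardial infarction, unspecified",
         "description": "", "chapter": "chapter_9_IX"},
    ]

    points = [
        PointStruct(id=idx, vector=encoder.encode(row["title"]).tolist(), payload=row)
        for idx, row in enumerate(rows)
    ]
    client.upsert(collection_name=collection_name, points=points)

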
def analyze_diagnostic_chapters(diagnostic_string: str, detailed: bool = True, use_cloud: bool = True) -> str:
    """
    Main entry point: analyze which chapters are most relevant for a diagnostic.
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    return retriever.format_chapter_analysis(diagnostic_string, detailed)


def get_relevant_chapters(diagnostic_string: str, top_n: int = 5, use_cloud: bool = True) -> List[str]:
    """
    Get the most relevant chapter IDs for a diagnostic string.
    Returns: ['chapter_9_IX', 'chapter_10_X', ...]
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)
    top_chapters = retriever.get_top_chapters(diagnostic_string, top_n)
    return [chapter_id for chapter_id, _, _ in top_chapters]


def smart_diagnostic_search(
    diagnostic_string: str,
    auto_select_chapters: bool = True,
    target_chapters: List[str] = None,
    results_per_sentence: int = 3,
    use_cloud: bool = True
) -> Dict[str, Dict[str, List[Dict]]]:
    """
    Intelligent diagnostic search that processes each sentence separately.
    Optimized for Qdrant Cloud.
    """
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)

    # Passing target_chapters=None triggers per-sentence auto-identification
    if auto_select_chapters:
        target_chapters = None
    return retriever.search_targeted_chapters(
        diagnostic_string, target_chapters, results_per_sentence=results_per_sentence
    )
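
# Example usage of smart_diagnostic_search (hypothetical output; assumes the
# chapter collections above are populated):
#   results = smart_diagnostic_search("severe chest pain. shortness of breath")
#   # -> {"chapter_9_IX": {"sentence_1": {"text": "severe chest pain",
#   #                                     "chapter_relevance": 0.74,
#   #                                     "results": [...]}},
#   #     ...}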


def format_smart_search_results(
    diagnostic_string: str,
    search_results: Dict[str, Dict[str, List[Dict]]],
    use_cloud: bool = True
) -> str:
    """Format the results from the sentence-based smart_diagnostic_search."""

    if not search_results:
        return "❌ No results found."

    # Instantiated only to read the chapter_info descriptions
    retriever = MultiCollectionChapterRetrieval(use_cloud=use_cloud)

    output = []
    output.append(f"\n{'='*90}")
    output.append("🔍 SENTENCE-BASED DIAGNOSTIC SEARCH RESULTS")
    output.append(f"🎯 Query: '{diagnostic_string}'")
    output.append(f"{'='*90}")

    # Aggregate counts for the summary line
    total_results = 0
    total_sentences = 0
    for chapter_results in search_results.values():
        total_sentences += len(chapter_results)
        for sentence_data in chapter_results.values():
            total_results += len(sentence_data['results'])

    output.append(f"📊 Total results: {total_results} across {len(search_results)} chapters and {total_sentences} sentences")

    for chapter_id, chapter_data in search_results.items():
        description = retriever.chapter_info.get(chapter_id, "Unknown chapter")

        output.append(f"\n📖 {chapter_id.upper()}")
        output.append(f"   📝 {description}")
        output.append(f"   📄 {len(chapter_data)} sentences processed")
        output.append("-" * 60)

        for sentence_key, sentence_data in chapter_data.items():
            sentence_text = sentence_data['text']
            results = sentence_data['results']

            output.append(f"\n   📝 {sentence_key.replace('_', ' ').title()}: \"{sentence_text}\"")
            output.append(f"   🎯 Top {len(results)} matches:")
            output.append("")

            for i, result in enumerate(results, 1):
                payload = result['payload']
                code = payload.get('code', 'N/A')
                title = payload.get('title', 'N/A')
                score = result['score']

                output.append(f"      {i}. {code} - {title}")
                output.append(f"         🎯 Score: {score:.4f}")

                # Include a short description preview when available
                desc = payload.get('description', '')
                if desc:
                    desc_preview = desc[:100] + "..." if len(desc) > 100 else desc
                    output.append(f"         📄 {desc_preview}")

                output.append("")

    output.append("=" * 90)

    return "\n".join(output)


def example_multi_collection_analysis(use_cloud: bool = True):
    """Example of using the multi-collection chapter analysis."""

    test_cases = [
        "severe chest pain with shortness of breath",
        "type 2 diabetes with kidney complications",
        "depression and anxiety disorder",
        "broken wrist from falling",
        "acute appendicitis with fever",
        "skin cancer melanoma",
        "pregnancy complications in third trimester"
    ]

    for diagnostic in test_cases:
        print(f"\n{'='*100}")
        print(f"🔍 ANALYZING: {diagnostic}")
        print(f"{'='*100}")

        try:
            # Full chapter relevance report
            analysis = analyze_diagnostic_chapters(diagnostic, detailed=False, use_cloud=use_cloud)
            print(analysis)

            # Shortlist of chapter IDs
            top_chapters = get_relevant_chapters(diagnostic, top_n=3, use_cloud=use_cloud)
            print(f"\n📌 Top 3 relevant chapters: {top_chapters}")

            # Sentence-level targeted search
            search_results = smart_diagnostic_search(
                diagnostic,
                results_per_sentence=5,
                use_cloud=use_cloud
            )
            formatted_results = format_smart_search_results(
                diagnostic,
                search_results,
                use_cloud=use_cloud
            )
            print(formatted_results)

        except Exception as e:
            print(f"❌ Error processing '{diagnostic}': {e}")
            continue


def test_cloud_connection():
    """Test the Qdrant Cloud connection and basic functionality."""
    print("🧪 Testing Qdrant Cloud Connection...")

    try:
        retriever = MultiCollectionChapterRetrieval(use_cloud=True)

        test_query = "heart disease"
        print(f"\n🔎 Testing with query: '{test_query}'")

        # Collection discovery
        collections = retriever.get_chapter_collections()
        print(f"📊 Available collections: {len(collections)}")

        if collections:
            # Quick end-to-end check: rank chapters for the test query
            top_chapters = retriever.get_top_chapters(test_query, top_n=3)
            print(f"🎯 Top chapters for '{test_query}': {[ch[0] for ch in top_chapters]}")

            print("✅ Cloud connection test successful!")
            return True
        else:
            print("⚠️ No collections found")
            return False

    except Exception as e:
        print(f"❌ Cloud connection test failed: {e}")
        return False


if __name__ == "__main__":
    # Verify connectivity before running the full example suite
    if test_cloud_connection():
        print("\n" + "=" * 100)
        print("🚀 Running example analysis with Qdrant Cloud...")
        print("=" * 100)

        example_multi_collection_analysis(use_cloud=True)
    else:
        print("❌ Skipping examples due to connection issues")