| | """ |
| | CriticAgent for SPARKNET - LangChain Version |
| | Reviews and validates outputs against VISTA quality standards |
| | Uses LangChain chains for structured validation and feedback |
| | """ |
| |
|
| | from typing import Optional, Dict, Any, List |
| | from loguru import logger |
| | import json |
| |
|
| | from langchain_core.prompts import ChatPromptTemplate |
| | from langchain_core.output_parsers import JsonOutputParser |
| | from langchain_core.messages import HumanMessage, SystemMessage |
| |
|
| | from .base_agent import BaseAgent, Task, Message |
| | from ..llm.langchain_ollama_client import LangChainOllamaClient |
| | from ..workflow.langgraph_state import ValidationResult |
| |
|
| |
|
class CriticAgent(BaseAgent):
    """
    Agent specialized in output validation and quality assurance.

    Uses LangChain chains (prompt | llm | JsonOutputParser) with an
    analysis-complexity model for balanced review, and scores outputs
    against the VISTA quality rubric in ``QUALITY_CRITERIA``.
    """

    # Quality rubric per output type. For each dimension:
    #   weight    - contribution to the weighted overall score
    #               (weights sum to 1.0 within each output type)
    #   threshold - minimum per-dimension score for the output to count as valid
    QUALITY_CRITERIA = {
        'patent_analysis': {
            'completeness': {
                'weight': 0.30,
                'threshold': 0.90,
                'description': 'Must extract >90% of claims and key information',
            },
            'clarity': {
                'weight': 0.25,
                'threshold': 0.85,
                'description': 'Summaries and explanations must be clear and understandable',
            },
            'actionability': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Must include clear next steps and recommendations',
            },
            'accuracy': {
                'weight': 0.20,
                'threshold': 0.90,
                'description': 'Information must be factually correct',
            },
        },
        'legal_review': {
            'accuracy': {
                'weight': 0.35,
                'threshold': 0.95,
                'description': 'Risk identification must be precise',
            },
            'coverage': {
                'weight': 0.30,
                'threshold': 0.90,
                'description': 'Must check all major clauses and sections',
            },
            'compliance': {
                'weight': 0.25,
                'threshold': 1.00,
                'description': 'GDPR/Law 25 compliance must be 100%',
            },
            'actionability': {
                'weight': 0.10,
                'threshold': 0.85,
                'description': 'Must provide clear remediation steps',
            },
        },
        'stakeholder_matching': {
            'relevance': {
                'weight': 0.35,
                'threshold': 0.85,
                'description': 'Matches must be relevant to objectives',
            },
            'diversity': {
                'weight': 0.20,
                'threshold': 0.75,
                'description': 'Should include diverse perspectives',
            },
            'justification': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Must explain why matches are appropriate',
            },
            'actionability': {
                'weight': 0.20,
                'threshold': 0.85,
                'description': 'Must include concrete next steps',
            },
        },
        'general': {
            'completeness': {
                'weight': 0.30,
                'threshold': 0.80,
                'description': 'All required elements present',
            },
            'clarity': {
                'weight': 0.25,
                'threshold': 0.80,
                'description': 'Clear and understandable',
            },
            'accuracy': {
                'weight': 0.25,
                'threshold': 0.85,
                'description': 'Factually correct',
            },
            'actionability': {
                'weight': 0.20,
                'threshold': 0.75,
                'description': 'Provides next steps',
            },
        },
    }

    def __init__(
        self,
        llm_client: LangChainOllamaClient,
        memory_agent: Optional['MemoryAgent'] = None,
        temperature: float = 0.6,
    ):
        """
        Initialize CriticAgent with a LangChain client.

        Args:
            llm_client: LangChain Ollama client used to build the chains.
            memory_agent: Optional memory agent for context ('MemoryAgent'
                is a string forward reference; not imported here).
            temperature: LLM sampling temperature for validation calls.
        """
        # NOTE(review): super().__init__() is intentionally not called,
        # matching the original code — confirm BaseAgent needs no setup.
        self.llm_client = llm_client
        self.memory_agent = memory_agent
        self.temperature = temperature

        # Build the reusable runnables once; they are stateless and shared
        # across all validation calls.
        self.validation_chain = self._create_validation_chain()
        self.feedback_chain = self._create_feedback_chain()

        self.name = "CriticAgent"
        self.description = "Output validation and quality assurance"

        logger.info("Initialized CriticAgent with LangChain (complexity: analysis)")

    def _create_validation_chain(self):
        """
        Create the LangChain chain used for output validation.

        Returns:
            Runnable chain: prompt | llm | JsonOutputParser.
        """
        # Doubled braces {{ }} escape literal JSON braces in the template.
        system_template = """You are a critical analysis agent for research valorization outputs.

Your role is to:
1. Review outputs from other agents objectively
2. Identify errors, inconsistencies, or gaps
3. Assess quality against specific criteria
4. Provide constructive feedback for improvement
5. Ensure alignment with VISTA project objectives

When reviewing output, evaluate:
- Completeness: Are all required elements present?
- Clarity: Is it easy to understand?
- Accuracy: Is the information correct?
- Actionability: Does it provide clear next steps?
- Relevance: Does it address the original task?

Be thorough but fair. Focus on constructive feedback that helps improve quality.

Output your assessment as JSON with this structure:
{{
    "dimension_scores": {{"completeness": 0.85, "clarity": 0.90, ...}},
    "issues": ["Issue 1", "Issue 2"],
    "suggestions": ["Suggestion 1", "Suggestion 2"],
    "details": {{}}
}}"""

        human_template = """Review the following output and assess its quality.

ORIGINAL TASK:
{task_description}

OUTPUT TO REVIEW:
{output_text}

QUALITY CRITERIA:
{criteria_text}

For each criterion, score from 0.0 to 1.0:
- 1.0 = Perfect
- 0.8-0.9 = Good, minor improvements possible
- 0.6-0.7 = Acceptable, some issues
- 0.4-0.5 = Poor, significant issues
- < 0.4 = Unacceptable

Provide:
1. Score for each dimension (dimension_scores)
2. List of specific issues found (issues)
3. Concrete suggestions for improvement (suggestions)
4. Additional details if needed (details)

Output JSON only."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_template),
            ("human", human_template),
        ])

        llm = self.llm_client.get_llm(complexity="analysis", temperature=self.temperature)
        parser = JsonOutputParser()

        return prompt | llm | parser

    def _create_feedback_chain(self):
        """
        Create the LangChain chain for generating constructive feedback.

        Returns:
            Runnable chain: prompt | llm | JsonOutputParser.
        """
        system_template = """You are an expert at providing constructive feedback for improvement.

Your role is to:
1. Analyze validation results and identify key issues
2. Generate specific, actionable improvement suggestions
3. Prioritize suggestions by impact
4. Explain why each suggestion matters
5. Be encouraging while being honest about problems

Focus on feedback that:
- Is specific and concrete
- Can be acted upon immediately
- Addresses root causes, not symptoms
- Builds on strengths while fixing weaknesses"""

        human_template = """Generate constructive feedback for the following output.

VALIDATION RESULTS:
- Overall Score: {overall_score}
- Issues: {issues}
- Dimension Scores: {dimension_scores}

ORIGINAL OUTPUT:
{output_text}

Provide prioritized suggestions for improvement. Output as JSON:
{{
    "priority_suggestions": ["Most important suggestion", "Second priority", ...],
    "strengths": ["What worked well", ...],
    "weaknesses": ["What needs improvement", ...],
    "next_steps": ["Specific action 1", "Specific action 2", ...]
}}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_template),
            ("human", human_template),
        ])

        llm = self.llm_client.get_llm(complexity="analysis", temperature=self.temperature)
        parser = JsonOutputParser()

        return prompt | llm | parser

    async def process_task(self, task: Task) -> Task:
        """
        Process a validation task.

        Args:
            task: Task whose metadata carries 'output_to_validate' and,
                optionally, 'output_type' and 'criteria'.

        Returns:
            The same task, updated in place with the validation result
            (status 'completed') or the error (status 'failed').
        """
        logger.info(f"CriticAgent validating output for task: {task.id}")
        task.status = "in_progress"

        try:
            if not task.metadata or 'output_to_validate' not in task.metadata:
                raise ValueError("No output provided for validation")

            output = task.metadata['output_to_validate']
            output_type = task.metadata.get('output_type', 'general')
            criteria_override = task.metadata.get('criteria')

            validation_result = await self.validate_output(
                output=output,
                task=task,
                output_type=output_type,
                criteria=criteria_override,
            )

            task.result = validation_result
            task.status = "completed"

            logger.info(f"Validation completed: {validation_result.overall_score:.2f} score")

        except Exception as e:
            # Record the failure on the task instead of propagating, so the
            # orchestrating workflow can inspect task.status / task.error.
            logger.error(f"Validation failed: {e}")
            task.status = "failed"
            task.error = str(e)

        return task

    async def validate_output(
        self,
        output: Any,
        task: Task,
        output_type: str = 'general',
        criteria: Optional[Dict[str, Any]] = None,
    ) -> ValidationResult:
        """
        Validate output against quality criteria using LangChain.

        Args:
            output: Output to validate (str, dict, list, or anything
                convertible with str()).
            task: Original task that produced this output.
            output_type: Type of output; selects the rubric from
                QUALITY_CRITERIA (falls back to 'general').
            criteria: Optional custom rubric overriding QUALITY_CRITERIA.

        Returns:
            ValidationResult with weighted overall score, per-dimension
            scores, issues, and suggestions. On LLM/parsing failure a
            ValidationResult with valid=False and score 0.0 is returned
            rather than raising.
        """
        if criteria is None:
            criteria = self.QUALITY_CRITERIA.get(output_type, self.QUALITY_CRITERIA['general'])

        # Serialize structured outputs so the LLM receives readable text.
        if isinstance(output, (dict, list)):
            output_str = json.dumps(output, indent=2)
        else:
            output_str = str(output)

        # Truncate to keep the prompt within a reasonable context budget.
        output_str = output_str[:2000]

        # Render the rubric as human-readable bullets for the prompt.
        criteria_text = "\n".join(
            f"- {dim.capitalize()} (threshold: {props['threshold']:.0%}): {props['description']}"
            for dim, props in criteria.items()
        )

        try:
            result = await self.validation_chain.ainvoke({
                "task_description": task.description,
                "output_text": output_str,
                "criteria_text": criteria_text,
            })

            dimension_scores = result.get('dimension_scores', {})

            # Weighted average over the rubric; dimensions the model did not
            # score count as 0.0.
            total_weight = sum(props['weight'] for props in criteria.values())
            overall_score = sum(
                dimension_scores.get(dim, 0.0) * props['weight']
                for dim, props in criteria.items()
            )
            if total_weight > 0:
                overall_score /= total_weight

            # Valid only if every dimension clears its own threshold.
            valid = all(
                dimension_scores.get(dim, 0.0) >= props['threshold']
                for dim, props in criteria.items()
            )

            return ValidationResult(
                valid=valid,
                overall_score=overall_score,
                dimension_scores=dimension_scores,
                issues=result.get('issues', []),
                suggestions=result.get('suggestions', []),
                details=result.get('details', {}),
            )

        except Exception as e:
            # Degrade to an explicit invalid result so callers can retry or
            # surface the error instead of crashing the workflow.
            logger.error(f"Failed to validate with LangChain: {e}")
            logger.debug(f"Output was: {output_str[:500]}")

            return ValidationResult(
                valid=False,
                overall_score=0.0,
                dimension_scores={},
                issues=[f"Failed to validate: {str(e)}"],
                suggestions=["Re-run validation with clearer output"],
                details={'error': str(e)},
            )

    async def suggest_improvements(
        self,
        validation_result: ValidationResult,
        original_output: Any,
    ) -> List[str]:
        """
        Generate actionable improvement suggestions using LangChain.

        Args:
            validation_result: Previous validation result.
            original_output: The output that was validated.

        Returns:
            List of improvement suggestions. Reuses suggestions already
            present on the validation result; only calls the feedback chain
            when none exist. Falls back to issue/score-derived suggestions
            if the chain fails.
        """
        if validation_result.valid and validation_result.overall_score >= 0.9:
            return ["Output is excellent. No major improvements needed."]

        # Reuse suggestions already produced during validation.
        if validation_result.suggestions:
            return validation_result.suggestions

        try:
            output_str = str(original_output)[:1000]

            result = await self.feedback_chain.ainvoke({
                "overall_score": f"{validation_result.overall_score:.2f}",
                "issues": ", ".join(validation_result.issues),
                "dimension_scores": json.dumps(validation_result.dimension_scores),
                "output_text": output_str,
            })

            return result.get('priority_suggestions', []) + result.get('next_steps', [])

        except Exception as e:
            logger.error(f"Failed to generate suggestions: {e}")

            # Fallback: derive suggestions mechanically from the issues and
            # the dimensions that scored below 0.8.
            suggestions = [f"Address: {issue}" for issue in validation_result.issues]
            suggestions.extend(
                f"Improve {dim}: Current score {score:.2f}, aim for >0.80"
                for dim, score in validation_result.dimension_scores.items()
                if score < 0.8
            )
            return suggestions

    def get_feedback_for_iteration(
        self,
        validation_result: ValidationResult,
    ) -> str:
        """
        Format validation feedback for iterative improvement.

        Args:
            validation_result: Validation result to render.

        Returns:
            Human-readable multi-line feedback string (verdict, per-dimension
            scores, issues, suggestions).
        """
        feedback_parts = []

        # Overall verdict line.
        if validation_result.valid:
            feedback_parts.append(f"✓ Output is VALID (score: {validation_result.overall_score:.2f})")
        else:
            feedback_parts.append(f"✗ Output is INVALID (score: {validation_result.overall_score:.2f})")

        # Per-dimension scores; 0.8 is the display pass-mark used here.
        feedback_parts.append("\nQuality Dimensions:")
        for dim, score in validation_result.dimension_scores.items():
            status = "✓" if score >= 0.8 else "✗"
            feedback_parts.append(f"  {status} {dim.capitalize()}: {score:.2f}")

        if validation_result.issues:
            feedback_parts.append("\nIssues Found:")
            for i, issue in enumerate(validation_result.issues, 1):
                feedback_parts.append(f"  {i}. {issue}")

        if validation_result.suggestions:
            feedback_parts.append("\nSuggestions for Improvement:")
            for i, suggestion in enumerate(validation_result.suggestions, 1):
                feedback_parts.append(f"  {i}. {suggestion}")

        return "\n".join(feedback_parts)

    def get_vista_criteria(self, output_type: str) -> Dict[str, Any]:
        """
        Get VISTA quality criteria for a specific output type.

        Args:
            output_type: Type of output.

        Returns:
            Quality criteria dictionary ('general' rubric if the type is
            unknown).
        """
        return self.QUALITY_CRITERIA.get(output_type, self.QUALITY_CRITERIA['general'])
|