| | """ |
| | Workflow execution and monitoring endpoints |
| | """ |
| |
|
| | from fastapi import APIRouter, BackgroundTasks, HTTPException, WebSocket, WebSocketDisconnect |
| | from pydantic import BaseModel |
| | from typing import Dict, List |
| | import uuid |
| | from datetime import datetime |
| | import asyncio |
| | from loguru import logger |
| |
|
| | router = APIRouter() |
| |
|
| | class WorkflowRequest(BaseModel): |
| | """Request to start a workflow""" |
| | patent_id: str |
| | scenario: str = "patent_wakeup" |
| |
|
| | class WorkflowResponse(BaseModel): |
| | """Workflow execution response""" |
| | workflow_id: str |
| | status: str |
| | message: str |
| |
|
| | @router.post("/execute", response_model=WorkflowResponse) |
| | async def execute_workflow( |
| | request: WorkflowRequest, |
| | background_tasks: BackgroundTasks |
| | ): |
| | """ |
| | Start Patent Wake-Up workflow execution. |
| | |
| | Args: |
| | request: Workflow execution request |
| | |
| | Returns: |
| | Workflow ID for tracking progress |
| | """ |
| | from api.main import app_state |
| |
|
| | |
| | if request.patent_id not in app_state["patents"]: |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"Patent not found: {request.patent_id}" |
| | ) |
| |
|
| | |
| | workflow_id = str(uuid.uuid4()) |
| |
|
| | |
| | workflow_state = { |
| | "id": workflow_id, |
| | "patent_id": request.patent_id, |
| | "scenario": request.scenario, |
| | "status": "queued", |
| | "progress": 0, |
| | "current_step": None, |
| | "started_at": datetime.utcnow().isoformat(), |
| | "completed_at": None, |
| | "execution_time_seconds": None, |
| | "result": None, |
| | "error": None, |
| | "steps": [] |
| | } |
| |
|
| | app_state["workflows"][workflow_id] = workflow_state |
| |
|
| | |
| | app_state["patents"][request.patent_id]["status"] = "analyzing" |
| | app_state["patents"][request.patent_id]["workflow_id"] = workflow_id |
| |
|
| | logger.info(f"🚀 Starting workflow {workflow_id} for patent {request.patent_id}") |
| |
|
| | |
| | background_tasks.add_task( |
| | run_workflow, |
| | workflow_id, |
| | request.patent_id, |
| | request.scenario |
| | ) |
| |
|
| | return WorkflowResponse( |
| | workflow_id=workflow_id, |
| | status="queued", |
| | message="Workflow started successfully" |
| | ) |
| |
|
| | async def run_workflow(workflow_id: str, patent_id: str, scenario: str): |
| | """ |
| | Background task to execute workflow. |
| | |
| | Args: |
| | workflow_id: Unique workflow identifier |
| | patent_id: Patent to analyze |
| | scenario: Workflow scenario type |
| | """ |
| | from api.main import app_state |
| | from src.workflow.langgraph_state import ScenarioType |
| |
|
| | workflow_state = app_state["workflows"][workflow_id] |
| | patent = app_state["patents"][patent_id] |
| |
|
| | start_time = datetime.utcnow() |
| |
|
| | try: |
| | logger.info(f"📊 Executing workflow {workflow_id}...") |
| |
|
| | |
| | workflow_state["status"] = "running" |
| | workflow_state["progress"] = 10 |
| | workflow_state["current_step"] = "initializing" |
| |
|
| | |
| | scenario_map = { |
| | "patent_wakeup": ScenarioType.PATENT_WAKEUP |
| | } |
| | scenario_type = scenario_map.get(scenario, ScenarioType.PATENT_WAKEUP) |
| |
|
| | |
| | logger.info(f"Analyzing patent: {patent['filename']}") |
| |
|
| | workflow_state["current_step"] = "document_analysis" |
| | workflow_state["progress"] = 25 |
| |
|
| | result = await app_state["workflow"].run( |
| | task_description=f"Analyze patent: {patent['filename']} and create valorization roadmap", |
| | scenario=scenario_type, |
| | input_data={"patent_path": patent["path"]}, |
| | task_id=workflow_id |
| | ) |
| |
|
| | |
| | end_time = datetime.utcnow() |
| | execution_time = (end_time - start_time).total_seconds() |
| |
|
| | |
| | workflow_state["status"] = "completed" |
| | workflow_state["progress"] = 100 |
| | workflow_state["current_step"] = "completed" |
| | workflow_state["completed_at"] = end_time.isoformat() |
| | workflow_state["execution_time_seconds"] = execution_time |
| |
|
| | |
| | workflow_state["result"] = { |
| | "success": result.success, |
| | "quality_score": result.quality_score, |
| | "iterations_used": result.iterations_used, |
| | "status_value": result.status.value, |
| |
|
| | |
| | "document_analysis": result.agent_outputs.get("document_analysis"), |
| |
|
| | |
| | "market_analysis": result.agent_outputs.get("market_analysis"), |
| |
|
| | |
| | "matches": result.agent_outputs.get("matches", []), |
| |
|
| | |
| | "brief": result.agent_outputs.get("brief"), |
| |
|
| | |
| | "executor_output": result.agent_outputs.get("executor", {}) |
| | } |
| |
|
| | |
| | patent["status"] = "analyzed" |
| |
|
| | logger.success(f"✅ Workflow {workflow_id} completed in {execution_time:.1f}s") |
| |
|
| | except Exception as e: |
| | logger.error(f"❌ Workflow {workflow_id} failed: {e}") |
| |
|
| | workflow_state["status"] = "failed" |
| | workflow_state["error"] = str(e) |
| | workflow_state["completed_at"] = datetime.utcnow().isoformat() |
| |
|
| | |
| | patent["status"] = "failed" |
| |
|
| | import traceback |
| | traceback.print_exc() |
| |
|
| | @router.get("/{workflow_id}", response_model=Dict) |
| | async def get_workflow(workflow_id: str): |
| | """ |
| | Get workflow status and results. |
| | |
| | Args: |
| | workflow_id: Unique workflow identifier |
| | |
| | Returns: |
| | Workflow state including results if completed |
| | """ |
| | from api.main import app_state |
| |
|
| | if workflow_id not in app_state["workflows"]: |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"Workflow not found: {workflow_id}" |
| | ) |
| |
|
| | return app_state["workflows"][workflow_id] |
| |
|
| | @router.get("/", response_model=List[Dict]) |
| | async def list_workflows( |
| | status: str = None, |
| | limit: int = 100, |
| | offset: int = 0 |
| | ): |
| | """ |
| | List all workflows. |
| | |
| | Args: |
| | status: Filter by status (queued, running, completed, failed) |
| | limit: Maximum number of results |
| | offset: Pagination offset |
| | |
| | Returns: |
| | List of workflow states |
| | """ |
| | from api.main import app_state |
| |
|
| | workflows = list(app_state["workflows"].values()) |
| |
|
| | |
| | if status: |
| | workflows = [w for w in workflows if w["status"] == status] |
| |
|
| | |
| | workflows.sort(key=lambda x: x["started_at"], reverse=True) |
| |
|
| | |
| | workflows = workflows[offset:offset + limit] |
| |
|
| | return workflows |
| |
|
| | @router.websocket("/{workflow_id}/stream") |
| | async def stream_workflow(websocket: WebSocket, workflow_id: str): |
| | """ |
| | WebSocket endpoint for real-time workflow updates. |
| | |
| | Args: |
| | websocket: WebSocket connection |
| | workflow_id: Workflow to stream |
| | """ |
| | from api.main import app_state |
| |
|
| | await websocket.accept() |
| |
|
| | logger.info(f"📡 WebSocket connected for workflow {workflow_id}") |
| |
|
| | if workflow_id not in app_state["workflows"]: |
| | await websocket.send_json({"error": "Workflow not found"}) |
| | await websocket.close() |
| | return |
| |
|
| | try: |
| | |
| | while True: |
| | workflow_state = app_state["workflows"].get(workflow_id) |
| |
|
| | if not workflow_state: |
| | await websocket.send_json({"error": "Workflow removed"}) |
| | break |
| |
|
| | |
| | await websocket.send_json(workflow_state) |
| |
|
| | |
| | if workflow_state["status"] in ["completed", "failed"]: |
| | logger.info(f"Workflow {workflow_id} finished, closing WebSocket") |
| | break |
| |
|
| | |
| | await asyncio.sleep(1) |
| |
|
| | except WebSocketDisconnect: |
| | logger.info(f"WebSocket disconnected for workflow {workflow_id}") |
| | except Exception as e: |
| | logger.error(f"WebSocket error: {e}") |
| | finally: |
| | await websocket.close() |
| |
|
| | @router.get("/{workflow_id}/brief/download") |
| | async def download_brief(workflow_id: str): |
| | """ |
| | Download the generated valorization brief. |
| | |
| | Args: |
| | workflow_id: Workflow identifier |
| | |
| | Returns: |
| | PDF file |
| | """ |
| | from api.main import app_state |
| | from fastapi.responses import FileResponse |
| | from pathlib import Path |
| |
|
| | if workflow_id not in app_state["workflows"]: |
| | raise HTTPException( |
| | status_code=404, |
| | detail="Workflow not found" |
| | ) |
| |
|
| | workflow = app_state["workflows"][workflow_id] |
| |
|
| | if workflow["status"] != "completed": |
| | raise HTTPException( |
| | status_code=400, |
| | detail="Workflow not yet completed" |
| | ) |
| |
|
| | |
| | result = workflow.get("result") or {} |
| | brief = result.get("brief") or {} |
| | pdf_path = brief.get("pdf_path") if isinstance(brief, dict) else None |
| |
|
| | if not pdf_path: |
| | raise HTTPException( |
| | status_code=404, |
| | detail="Valorization brief not found" |
| | ) |
| |
|
| | file_path = Path(pdf_path) |
| |
|
| | if not file_path.exists(): |
| | raise HTTPException( |
| | status_code=404, |
| | detail="Brief file not found on disk" |
| | ) |
| |
|
| | return FileResponse( |
| | path=file_path, |
| | media_type="application/pdf", |
| | filename=file_path.name |
| | ) |
| |
|