"""
FastAPI REST API for the Computer-Using Agent.

Provides HTTP and WebSocket endpoints for agent control and interaction.
"""
| |
|
import asyncio
from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from pydantic import BaseModel

from .cua_agent import ComputerUsingAgent
| |
|
| | |
# Application instance; title, description and version are passed to FastAPI
# and show up in the generated API documentation.
app = FastAPI(
    title="Computer-Using Agent API",
    description="REST API for controlling the computer-using agent",
    version="1.0.0",
)

# CORS: any origin may call the API, but only WITHOUT credentials.
# Wildcard origins must not be combined with allow_credentials=True: browsers
# reject "*" on credentialed requests, and a server that echoes the caller's
# origin instead lets any website drive this API with the user's cookies.
# No endpoint in this module reads cookies or auth headers, so credentials are
# switched off. To support credentialed browser clients, replace the wildcard
# with an explicit origin list and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single module-level agent instance shared by every endpoint below.
agent = ComputerUsingAgent()
| |
|
| | |
# Request body accepted by POST /agent/execute.
class TaskRequest(BaseModel):
    # Natural-language description of the task the agent should perform.
    task: str
| | |
# Response body of POST /agent/execute. It is built by unpacking the dict
# returned by agent.execute_task(), so that dict must supply these keys.
class TaskResponse(BaseModel):
    # presumably True when the task completed - confirm against the agent
    success: bool
    message: str
    # Optional; defaults to None when the agent supplies no screenshot.
    # NOTE(review): likely base64-encoded image data - confirm against the agent.
    screenshot: Optional[str] = None
    task: str
| |
|
# Response body of GET /agent/status. It is built by unpacking the dict
# returned by agent.get_status().
class StatusResponse(BaseModel):
    status: str
    # Explicit None default: without it Pydantic v2 treats an Optional field
    # as required, so a status dict lacking "current_task" would fail
    # validation and surface as a 500. This matches how
    # TaskResponse.screenshot declares its optional field.
    current_task: Optional[str] = None
    # NOTE(review): presumably the display identifier the agent works on - confirm.
    display: str
    # Free-form mapping describing the active window; its keys are defined by
    # the agent and are not visible in this module.
    active_window: Dict[str, Any]
| |
|
# Response body of POST /agent/screenshot.
class ScreenshotResponse(BaseModel):
    # Value returned by agent.get_screenshot_base64().
    screenshot: str
    # ISO-8601 string produced with datetime.now().isoformat() at capture time.
    timestamp: str
| |
|
| | |
| |
|
@app.get("/")
async def root():
    """Describe the API: name, version, run state and a map of endpoint paths."""
    endpoint_paths = {
        "status": "/agent/status",
        "execute": "/agent/execute",
        "screenshot": "/agent/screenshot",
        "stop": "/agent/stop",
        "docs": "/docs",
    }
    return {
        "name": "Computer-Using Agent API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_paths,
    }
| |
|
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a fixed "healthy" payload."""
    payload = {"status": "healthy"}
    return payload
| |
|
@app.get("/agent/status", response_model=StatusResponse)
async def get_status():
    """
    Report the agent's current status.

    Returns:
        StatusResponse built from the dict returned by agent.get_status().

    Raises:
        HTTPException: 500 with the error text if the agent call or the
            response construction fails.
    """
    try:
        snapshot = agent.get_status()
        response = StatusResponse(**snapshot)
    except Exception as exc:
        logger.error(f"Error getting status: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return response
| |
|
# Created lazily so the lock is bound to the server's running event loop
# rather than to whatever loop (if any) exists at import time.
_execute_lock: Optional[asyncio.Lock] = None


def _get_execute_lock() -> asyncio.Lock:
    """Return the module-wide lock that keeps task execution one-at-a-time."""
    global _execute_lock
    if _execute_lock is None:
        _execute_lock = asyncio.Lock()
    return _execute_lock


@app.post("/agent/execute", response_model=TaskResponse)
async def execute_task(request: TaskRequest):
    """
    Execute a task using the computer-using agent.

    agent.execute_task() is a synchronous call. Running it directly inside
    this coroutine would block the event loop for the whole task, so no other
    request (including /agent/stop and /health) could be served meanwhile.
    The call is therefore handed to the loop's default thread pool, while a
    lock keeps executions strictly sequential, as they were before.

    Args:
        request: Task request with natural language description

    Returns:
        Task execution result with screenshot

    Raises:
        HTTPException: 500 with the error text if the task or the response
            construction fails.
    """
    try:
        logger.info(f"Received task: {request.task}")
        loop = asyncio.get_running_loop()
        async with _get_execute_lock():
            result = await loop.run_in_executor(
                None, agent.execute_task, request.task
            )
        return TaskResponse(**result)
    except Exception as e:
        logger.error(f"Error executing task: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
@app.post("/agent/screenshot", response_model=ScreenshotResponse)
async def capture_screenshot():
    """
    Capture a screenshot of the desktop.

    Returns:
        ScreenshotResponse carrying the value of agent.get_screenshot_base64()
        and an ISO-8601 timestamp taken when the response is built.

    Raises:
        HTTPException: 500 "Failed to capture screenshot" when the agent
            returns an empty value; 500 with the error text on any other
            failure.
    """
    import datetime

    try:
        screenshot_b64 = agent.get_screenshot_base64()

        if not screenshot_b64:
            logger.error("Error capturing screenshot: Failed to capture screenshot")
            raise HTTPException(status_code=500, detail="Failed to capture screenshot")

        return ScreenshotResponse(
            screenshot=screenshot_b64,
            timestamp=datetime.datetime.now().isoformat(),
        )
    except HTTPException:
        # Already a well-formed HTTP error; let it through untouched so the
        # generic handler below does not replace its detail with str(exc).
        raise
    except Exception as e:
        logger.error(f"Error capturing screenshot: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
@app.post("/agent/stop")
async def stop_agent():
    """
    Ask the agent to stop whatever it is currently doing.

    Returns:
        A fixed confirmation payload once agent.stop() has returned.

    Raises:
        HTTPException: 500 with the error text if agent.stop() fails.
    """
    try:
        agent.stop()
    except Exception as exc:
        logger.error(f"Error stopping agent: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": "Agent stopped", "status": "stopped"}
| |
|
@app.websocket("/ws/agent")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time agent updates.

    After accepting the connection, sends the dict returned by
    agent.get_status() as JSON every 2 seconds until the connection ends.
    Nothing is ever read from the client.
    """
    await websocket.accept()
    logger.info("WebSocket client connected")

    try:
        while True:
            status = agent.get_status()
            await websocket.send_json(status)
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        # The client going away is the normal way this loop ends, not an
        # error; the disconnect itself is logged in the finally clause.
        # NOTE(review): whether send_json raises WebSocketDisconnect on a
        # closed socket depends on the Starlette version; other exception
        # types still reach the generic handler below.
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        logger.info("WebSocket client disconnected")
| |
|
| | |
@app.on_event("startup")
async def startup_event():
    """Log the startup and make sure the log directory exists."""
    import os

    log_dir = "/app/logs"
    logger.info("Agent API starting up")
    # exist_ok=True makes this a no-op when the directory is already there.
    os.makedirs(log_dir, exist_ok=True)
| |
|
@app.on_event("shutdown")
async def shutdown_event():
    """Log the shutdown, then ask the shared agent to stop."""
    logger.info("Agent API shutting down")
    # Stop the agent last so the shutdown is logged before it runs.
    agent.stop()
| |
|
if __name__ == "__main__":
    # Direct-run entry point: serve the app with uvicorn on all interfaces,
    # port 8000.
    # NOTE(review): this module uses a relative import (.cua_agent), so it
    # probably has to be launched as a package module (python -m ...) for
    # this guard to be reachable - confirm.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)
| |
|