Alibrown's picture
Update app/mcp.py
78519ba verified
# =============================================================================
# root/app/mcp.py
# 14.03.2026
# Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan Kücükbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY started by app/app.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# TRANSPORT:
# Primary: Streamable HTTP (MCP spec 2025-11-25) → single /mcp endpoint
# Configured via HUB_TRANSPORT = "streamable-http" in .pyfun [HUB]
# ASGI-App via get_asgi_app() → mounted by app/app.py
#
# Fallback: SSE (legacy, deprecated per spec) → /mcp route via Quart
# Configured via HUB_TRANSPORT = "sse" in .pyfun [HUB]
# handle_request() called directly by app/app.py Quart route
#
# All MCP traffic (both transports) passes through app/app.py first —
# auth checks, rate limiting, logging can be added there before reaching MCP.
#
# TOOL REGISTRATION PRINCIPLE:
# Tools are registered via tools.py — NOT hardcoded here.
# No key = no provider = no tool = no crash.
# Server always starts, just with fewer tools.
# Adding a new tool = update .pyfun + providers.py only. Never touch mcp.py.
#
# DEPENDENCY CHAIN (app/* only, no fundaments!):
# config.py → parses app/.pyfun — single source of truth
# providers.py → LLM + Search provider registry + fallback chain
# models.py → model limits, costs, capabilities from .pyfun [MODELS]
# tools.py → tool registry + execution — reads .pyfun [TOOLS]
# db_sync.py → internal SQLite IPC (app/* state) — NOT postgresql.py!
# mcp.py → registers tools only, delegates all logic to tools.py
# =============================================================================
import logging
from typing import Any, Dict, Optional

from . import config as app_config
from . import providers
from . import models
from . import tools
# Module-level logger; the name 'mcp' is fixed rather than derived from __name__.
logger = logging.getLogger('mcp')
# =============================================================================
# Globals — set once during initialize(), never touched elsewhere
# All three stay None until initialize() has run.
# =============================================================================
_mcp = None  # FastMCP instance
_transport = None  # "streamable-http" | "sse" — from .pyfun [HUB] HUB_TRANSPORT
_stateless = None  # True = HF Spaces / horizontal scaling safe
# =============================================================================
# Initialization — called exclusively by app/app.py
# =============================================================================
async def initialize() -> None:
    """
    Initialize the MCP instance and register all tools.

    Called once by app/app.py during the startup sequence. No fundaments are
    passed in — fully sandboxed. Reads HUB_TRANSPORT and HUB_STATELESS from
    .pyfun [HUB] via app_config.get_hub() and stores the results, together
    with the FastMCP instance, in the module globals.

    Transport modes:
        streamable-http → get_asgi_app() returns ASGI app → app.py mounts it
        sse             → handle_request() used by Quart route in app.py

    Any other HUB_TRANSPORT value is stored unchanged but reported with a
    warning, because get_asgi_app() treats everything that is not
    "streamable-http" as SSE.

    Registration order:
        1. LLM tools    → via tools.py + providers.py (key-gated)
        2. Search tools → via tools.py + providers.py (key-gated)
        3. System tools → always registered, no key required
        4. DB tools     → uncomment when db_sync.py is ready

    Raises:
        ImportError: If the FastMCP package cannot be imported
            (logged as critical, then re-raised).
    """
    global _mcp, _transport, _stateless

    hub_cfg = app_config.get_hub()

    # str() + strip(): a non-string value (e.g. a real bool) or a value with
    # stray whitespace must not crash startup with AttributeError or silently
    # fail the comparisons below.
    _transport = str(hub_cfg.get("HUB_TRANSPORT", "streamable-http")).strip().lower()
    _stateless = str(hub_cfg.get("HUB_STATELESS", "true")).strip().lower() == "true"

    logger.info(f"MCP Hub initializing (transport: {_transport}, stateless: {_stateless})...")

    # Surface typos early: the value is not rejected, only reported, so the
    # existing fallback behaviour in get_asgi_app() stays untouched.
    if _transport not in ("streamable-http", "sse"):
        logger.warning(
            "Unknown HUB_TRANSPORT %r; get_asgi_app() will treat it as SSE.",
            _transport,
        )

    try:
        from mcp.server.fastmcp import FastMCP
    except ImportError:
        logger.critical("FastMCP not installed. Run: pip install mcp")
        raise

    _mcp = FastMCP(
        name=hub_cfg.get("HUB_NAME", "Universal MCP Hub"),
        instructions=(
            f"{hub_cfg.get('HUB_DESCRIPTION', 'Universal MCP Hub on PyFundaments')} "
            "Use list_active_tools to see what is currently available."
        ),
        stateless_http=_stateless,  # True = no session state, HF Spaces safe
    )

    # --- Initialize registries ---
    providers.initialize()
    models.initialize()
    tools.initialize()

    # --- Register MCP tools ---
    _register_llm_tools(_mcp)
    _register_search_tools(_mcp)
    _register_system_tools(_mcp)
    # _register_db_tools(_mcp)  # uncomment when db_sync.py is ready

    logger.info(f"MCP Hub initialized. Transport: {_transport}")
# =============================================================================
# ASGI App — used by app/app.py for Streamable HTTP transport
# =============================================================================
def get_asgi_app():
    """
    Build the ASGI app that matches the configured transport.

    Meant to be called by app/app.py AFTER initialize(); the result is
    mounted as an ASGI sub-app.

    Streamable HTTP: returns _mcp.streamable_http_app() — single /mcp
        endpoint for all MCP traffic.
    Anything else (SSE fallback): returns _mcp.sse_app() for legacy client
        compatibility.

    NOTE: For SSE transport, app/app.py uses the Quart route +
    handle_request() instead — get_asgi_app() is only called for
    streamable-http.

    Raises:
        RuntimeError: If initialize() has not created the MCP instance yet.
    """
    if _mcp is None:
        raise RuntimeError("MCP not initialized β€” call initialize() first.")

    if _transport != "streamable-http":
        # SSE as ASGI app — only reached if app.py mounts it directly
        # (normally app.py uses the Quart route + handle_request() for SSE).
        logger.info("MCP ASGI app: SSE (legacy) β†’ /sse")
        return _mcp.sse_app()

    logger.info("MCP ASGI app: Streamable HTTP β†’ /mcp")
    return _mcp.streamable_http_app()
# =============================================================================
# Request Handler — Quart /mcp route entry point (SSE legacy transport only)
# =============================================================================
async def handle_request(request) -> Any:
    """
    Handle an incoming MCP SSE request coming from the Quart /mcp route.

    Only active when HUB_TRANSPORT = "sse" in .pyfun [HUB]. For Streamable
    HTTP transport this function is NOT called — app/app.py mounts the ASGI
    app from get_asgi_app() directly.

    Interceptor point for SSE traffic:
        Add auth, rate limiting, logging here before reaching MCP.

    Args:
        request: The request object handed over by the Quart route.

    Returns:
        Whatever _mcp.handle_sse(request) returns, or a
        (JSON error body, 503) tuple when the hub is not initialized.
    """
    if _mcp is None:
        logger.error("MCP not initialized β€” call initialize() first.")
        # quart is imported only on this error path; presumably to keep the
        # module importable without quart when SSE is unused — confirm.
        from quart import jsonify
        return jsonify({"error": "MCP not initialized"}), 503

    # --- Interceptor hooks (uncomment as needed) ---
    # logger.debug(f"MCP SSE request: {request.method} {request.path}")
    # await _check_auth(request)
    # await _rate_limit(request)
    # await _log_payload(request)

    # NOTE(review): verify that the installed FastMCP version actually exposes
    # handle_sse(); if it does not, this line raises AttributeError at runtime.
    return await _mcp.handle_sse(request)
# =============================================================================
# Tool Registration — delegates all logic to tools.py
# =============================================================================
def _register_llm_tools(mcp) -> None:
    """
    Register the LLM completion tool if any LLM provider is active.

    All logic is delegated to tools.py → providers.py.
    Adding a new LLM provider = update .pyfun + providers.py. Never touch this.

    Args:
        mcp: The FastMCP instance whose tool() decorator performs the
            registration.
    """
    if not providers.list_active_llm():
        logger.info("No active LLM providers β€” llm_complete tool skipped.")
        return

    # Optional[str] instead of a bare `str` with a None default: the tool's
    # input schema is derived from these annotations, so `None` has to be a
    # declared, valid value for clients that send an explicit null.
    @mcp.tool()
    async def llm_complete(
        prompt: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        max_tokens: int = 1024,
    ) -> str:
        """
        Send a prompt to any configured LLM provider.
        Automatically follows the fallback chain defined in .pyfun if a provider fails.

        Args:
            prompt: The input text to send to the model.
            provider: Provider name (e.g. 'anthropic', 'gemini', 'openrouter', 'huggingface').
                Defaults to default_provider from .pyfun [TOOL.llm_complete].
            model: Model name override. Defaults to provider's default_model in .pyfun.
            max_tokens: Maximum tokens in the response. Default: 1024.

        Returns:
            Model response as plain text string.
        """
        return await tools.run(
            tool_name="llm_complete",
            prompt=prompt,
            provider_name=provider,
            model=model,
            max_tokens=max_tokens,
        )

    logger.info(f"Tool registered: llm_complete (active providers: {providers.list_active_llm()})")
def _register_search_tools(mcp) -> None:
    """
    Register the web search tool if any search provider is active.

    All logic is delegated to tools.py → providers.py.
    Adding a new search provider = update .pyfun + providers.py. Never touch this.

    Args:
        mcp: The FastMCP instance whose tool() decorator performs the
            registration.
    """
    if not providers.list_active_search():
        logger.info("No active search providers β€” web_search tool skipped.")
        return

    # Optional[str] instead of a bare `str` with a None default: the tool's
    # input schema is derived from these annotations, so `None` has to be a
    # declared, valid value for clients that send an explicit null.
    @mcp.tool()
    async def web_search(
        query: str,
        provider: Optional[str] = None,
        max_results: int = 5,
    ) -> str:
        """
        Search the web via any configured search provider.
        Automatically follows the fallback chain defined in .pyfun if a provider fails.

        Args:
            query: Search query string.
            provider: Provider name (e.g. 'brave', 'tavily').
                Defaults to default_provider from .pyfun [TOOL.web_search].
            max_results: Maximum number of results to return. Default: 5.

        Returns:
            Formatted search results as plain text string.
        """
        # The query travels under the generic `prompt` keyword of tools.run().
        return await tools.run(
            tool_name="web_search",
            prompt=query,
            provider_name=provider,
            max_results=max_results,
        )

    logger.info(f"Tool registered: web_search (active providers: {providers.list_active_search()})")
def _register_system_tools(mcp) -> None:
    """
    Register the system tools: list_active_tools, health_check, get_model_info.

    Unlike the LLM and search tools there is no provider check here — the
    three tools are registered unconditionally, each followed by one log line.

    Args:
        mcp: The FastMCP instance whose tool() decorator performs the
            registration.
    """

    @mcp.tool()
    def list_active_tools() -> Dict[str, Any]:
        """
        List all active providers and registered tools.
        Shows ENV key names only — never exposes values or secrets.

        Returns:
            Dict with hub info, active LLM providers, active search providers,
            available tools and model names.
        """
        # Hub config is read on every call, not cached at registration time.
        hub_cfg = app_config.get_hub()
        hub_name = hub_cfg.get("HUB_NAME", "Universal MCP Hub")
        hub_version = hub_cfg.get("HUB_VERSION", "")
        llm_providers = providers.list_active_llm()
        search_providers = providers.list_active_search()
        registered_tools = tools.list_all()
        known_models = models.list_all()
        return {
            "hub": hub_name,
            "version": hub_version,
            "transport": _transport,
            "active_llm_providers": llm_providers,
            "active_search_providers": search_providers,
            "active_tools": registered_tools,
            "available_models": known_models,
        }

    logger.info("Tool registered: list_active_tools")

    @mcp.tool()
    def health_check() -> Dict[str, str]:
        """
        Health check endpoint for HuggingFace Spaces and monitoring systems.

        Returns:
            Dict with service status and active transport.
        """
        # The service name is a fixed literal here (not read from HUB_NAME).
        report = {
            "status": "ok",
            "service": "Universal MCP Hub",
            "transport": _transport,
        }
        return report

    logger.info("Tool registered: health_check")

    @mcp.tool()
    def get_model_info(model_name: str) -> Dict[str, Any]:
        """
        Get limits, costs, and capabilities for a specific model.

        Args:
            model_name: Model name as defined in .pyfun [MODELS] (e.g. 'claude-sonnet-4-6').

        Returns:
            Dict with context size, max output tokens, rate limits, costs, and capabilities.
            Returns empty dict if model is not configured in .pyfun.
        """
        # Pure pass-through: the lookup result of models.get() is returned as-is.
        model_entry = models.get(model_name)
        return model_entry

    logger.info("Tool registered: get_model_info")
# =============================================================================
# DB Tools — uncomment when db_sync.py is ready
# =============================================================================
# def _register_db_tools(mcp) -> None:
# """
# Register internal SQLite query tool.
# Uses db_sync.py (app/* internal SQLite) — NOT postgresql.py (Guardian-only)!
#
# SECURITY: Only SELECT queries are permitted.
# Enforced at application level in db_sync.query() — not just in docs.
# Tables accessible: hub_state, tool_cache (app/* only)
# Tables blocked: users, sessions (Guardian-only, different owner)
#
# To enable:
# 1. Uncomment this function
# 2. Uncomment _register_db_tools(_mcp) in initialize()
# 3. Make sure db_sync.initialize() is called in app/app.py before mcp.initialize()
# """
# from . import db_sync
#
# @mcp.tool()
# async def db_query(sql: str) -> list:
# """
# Execute a read-only SELECT query on the internal hub state database.
#
# Only SELECT statements are permitted — all write operations are blocked
# at the db_sync layer (not just by convention).
#
# Accessible tables:
# hub_state — current hub runtime state (tool status, uptime, etc.)
# tool_cache — cached tool results for repeated queries
#
# NOT accessible (Guardian-only):
# users — managed by fundaments/user_handler.py
# sessions — managed by fundaments/user_handler.py
#
# Args:
# sql: SQL SELECT statement. Example: "SELECT * FROM hub_state LIMIT 10"
#
# Returns:
# List of result rows as dicts. Empty list if no results.
#
# Raises:
# ValueError: If statement is not a SELECT query.
# RuntimeError: If db_sync is not initialized.
# """
# return await db_sync.query(sql)
#
# logger.info("Tool registered: db_query (SQLite SELECT-only, app/* tables)")
# =============================================================================
# Direct execution guard
# =============================================================================
# Running this module as a script does nothing except print a hint.
if __name__ == "__main__":
    print("WARNING: Run via main.py β†’ app.py, not directly.")