openapi-parser / app /parser.py
Moibe's picture
Normalización de paths de endpoints externos
66aba9d
import httpx
from .schemas import (
EndpointInfo,
Parameter,
RequestBody,
Response,
)
JSON_TYPE_MAP = {
"string": "string",
"integer": "integer",
"number": "number",
"boolean": "boolean",
"array": "array",
"object": "object",
}
def _resolve_ref(spec: dict, ref: str) -> dict:
"""Resolve a $ref pointer like '#/components/schemas/MyModel'."""
parts = ref.lstrip("#/").split("/")
node = spec
for part in parts:
node = node[part]
return node
def _get_type(schema: dict, spec: dict) -> str:
if "$ref" in schema:
schema = _resolve_ref(spec, schema["$ref"])
if "type" in schema:
return JSON_TYPE_MAP.get(schema["type"], schema["type"])
if "anyOf" in schema:
types = [_get_type(s, spec) for s in schema["anyOf"] if s.get("type") != "null"]
return types[0] if len(types) == 1 else " | ".join(types)
return "unknown"
def _extract_fields(schema: dict, spec: dict) -> dict[str, str]:
"""Extract field_name -> type from an object schema."""
if "$ref" in schema:
schema = _resolve_ref(spec, schema["$ref"])
properties = schema.get("properties", {})
return {name: _get_type(prop, spec) for name, prop in properties.items()}
def _parse_parameters(params: list[dict], spec: dict) -> list[Parameter]:
result = []
for p in params:
if "$ref" in p:
p = _resolve_ref(spec, p["$ref"])
schema = p.get("schema", {})
result.append(
Parameter(
name=p["name"],
location=p["in"],
type=_get_type(schema, spec),
required=p.get("required", False),
description=p.get("description"),
)
)
return result
def _parse_request_body(body: dict | None, spec: dict) -> RequestBody | None:
if not body:
return None
if "$ref" in body:
body = _resolve_ref(spec, body["$ref"])
content = body.get("content", {})
for content_type, media in content.items():
schema = media.get("schema", {})
fields = _extract_fields(schema, spec)
return RequestBody(content_type=content_type, fields=fields)
return None
def _parse_responses(responses: dict, spec: dict) -> list[Response]:
result = []
for status_code, resp in responses.items():
if "$ref" in resp:
resp = _resolve_ref(spec, resp["$ref"])
content = resp.get("content", {})
if content:
for content_type, media in content.items():
schema = media.get("schema", {})
fields = _extract_fields(schema, spec)
result.append(
Response(
status_code=str(status_code),
description=resp.get("description"),
content_type=content_type,
fields=fields,
)
)
break
else:
result.append(
Response(
status_code=str(status_code),
description=resp.get("description"),
fields={},
)
)
return result
def parse_endpoint(spec: dict, path: str, method: str, operation: dict) -> EndpointInfo:
return EndpointInfo(
path=path,
method=method.upper(),
summary=operation.get("summary"),
description=operation.get("description"),
operation_id=operation.get("operationId"),
parameters=_parse_parameters(operation.get("parameters", []), spec),
request_body=_parse_request_body(operation.get("requestBody"), spec),
responses=_parse_responses(operation.get("responses", {}), spec),
)
def _normalize_path(p: str) -> str:
"""Strip trailing slashes for consistent comparison, but keep root '/'."""
return p.rstrip("/") or "/"
async def fetch_and_parse(spec_url: str, path: str | None = None, method: str | None = None) -> list[EndpointInfo]:
async with httpx.AsyncClient() as client:
resp = await client.get(str(spec_url), follow_redirects=True)
resp.raise_for_status()
spec = resp.json()
normalized_path = _normalize_path(path) if path else None
endpoints: list[EndpointInfo] = []
for ep_path, methods in spec.get("paths", {}).items():
if normalized_path and _normalize_path(ep_path) != normalized_path:
continue
for ep_method, operation in methods.items():
if ep_method in ("parameters", "summary", "description", "servers"):
continue
if method and ep_method.upper() != method.upper():
continue
endpoints.append(parse_endpoint(spec, ep_path, ep_method, operation))
return endpoints