Spaces:
Running
Running
File size: 4,841 Bytes
08040eb 66aba9d 08040eb 66aba9d 08040eb 66aba9d 08040eb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | import httpx
from .schemas import (
EndpointInfo,
Parameter,
RequestBody,
Response,
)
# Lookup table from JSON Schema primitive type names to the label reported
# for them. Every known name maps to itself; _get_type falls back to the raw
# name for anything not listed here.
JSON_TYPE_MAP = {
    type_name: type_name
    for type_name in ("string", "integer", "number", "boolean", "array", "object")
}
def _resolve_ref(spec: dict, ref: str) -> dict:
"""Resolve a $ref pointer like '#/components/schemas/MyModel'."""
parts = ref.lstrip("#/").split("/")
node = spec
for part in parts:
node = node[part]
return node
def _get_type(schema: dict, spec: dict) -> str:
if "$ref" in schema:
schema = _resolve_ref(spec, schema["$ref"])
if "type" in schema:
return JSON_TYPE_MAP.get(schema["type"], schema["type"])
if "anyOf" in schema:
types = [_get_type(s, spec) for s in schema["anyOf"] if s.get("type") != "null"]
return types[0] if len(types) == 1 else " | ".join(types)
return "unknown"
def _extract_fields(schema: dict, spec: dict) -> dict[str, str]:
"""Extract field_name -> type from an object schema."""
if "$ref" in schema:
schema = _resolve_ref(spec, schema["$ref"])
properties = schema.get("properties", {})
return {name: _get_type(prop, spec) for name, prop in properties.items()}
def _parse_parameters(params: list[dict], spec: dict) -> list[Parameter]:
    """Turn a list of parameter objects into ``Parameter`` records.

    Entries written as ``$ref`` are resolved against ``spec`` first. ``name``
    and ``in`` are read with direct indexing, so a parameter lacking either
    raises ``KeyError``. ``required`` defaults to ``False``, ``description``
    to ``None``, and a missing ``schema`` is treated as an empty schema.
    """

    def _build(entry):
        # Work on the referenced definition when the entry is only a pointer.
        definition = _resolve_ref(spec, entry["$ref"]) if "$ref" in entry else entry
        return Parameter(
            name=definition["name"],
            location=definition["in"],
            type=_get_type(definition.get("schema", {}), spec),
            required=definition.get("required", False),
            description=definition.get("description"),
        )

    return [_build(entry) for entry in params]
def _parse_request_body(body: dict | None, spec: dict) -> RequestBody | None:
    """Describe an operation's request body, or return ``None`` if it has none.

    A ``$ref`` body is resolved against ``spec``. Only the first media type
    listed under ``content`` is reported; a body with no ``content`` entries
    yields ``None``.
    """
    if not body:
        return None

    resolved = _resolve_ref(spec, body["$ref"]) if "$ref" in body else body
    first_entry = next(iter(resolved.get("content", {}).items()), None)
    if first_entry is None:
        return None

    content_type, media = first_entry
    fields = _extract_fields(media.get("schema", {}), spec)
    return RequestBody(content_type=content_type, fields=fields)
def _parse_responses(responses: dict, spec: dict) -> list[Response]:
    """Build one ``Response`` record per status code of an operation.

    Each response object is resolved against ``spec`` when it is a ``$ref``.
    When the response declares ``content``, only its first media type is
    reported together with the fields of that media type's schema. Otherwise
    the record carries an empty field mapping and no content type argument.
    """
    parsed = []
    for status_code, entry in responses.items():
        definition = _resolve_ref(spec, entry["$ref"]) if "$ref" in entry else entry
        content = definition.get("content", {})

        if not content:
            # Body-less response: status and description only.
            parsed.append(
                Response(
                    status_code=str(status_code),
                    description=definition.get("description"),
                    fields={},
                )
            )
            continue

        # Only the first declared media type is taken into account.
        content_type, media = next(iter(content.items()))
        fields = _extract_fields(media.get("schema", {}), spec)
        parsed.append(
            Response(
                status_code=str(status_code),
                description=definition.get("description"),
                content_type=content_type,
                fields=fields,
            )
        )
    return parsed
def parse_endpoint(spec: dict, path: str, method: str, operation: dict) -> EndpointInfo:
    """Assemble the ``EndpointInfo`` for a single operation.

    Args:
        spec: Full specification, passed to the helpers for ``$ref`` lookups.
        path: Path template the operation is declared under.
        method: HTTP method name; stored upper-cased.
        operation: The operation object taken from the specification.

    Returns:
        An ``EndpointInfo`` carrying the operation's summary, description,
        operation id, parsed parameters, request body and responses.
    """
    verb = method.upper()
    parameters = _parse_parameters(operation.get("parameters", []), spec)
    request_body = _parse_request_body(operation.get("requestBody"), spec)
    responses = _parse_responses(operation.get("responses", {}), spec)

    return EndpointInfo(
        path=path,
        method=verb,
        summary=operation.get("summary"),
        description=operation.get("description"),
        operation_id=operation.get("operationId"),
        parameters=parameters,
        request_body=request_body,
        responses=responses,
    )
def _normalize_path(p: str) -> str:
"""Strip trailing slashes for consistent comparison, but keep root '/'."""
return p.rstrip("/") or "/"
# Keys of a Path Item Object that denote operations (OpenAPI 3.0 / 3.1).
# Every other key ("parameters", "summary", "$ref", "x-..." extensions, ...)
# is metadata and must not be parsed as an operation.
_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)


def _merge_parameters(path_params: list, op_params: list, spec: dict) -> list:
    """Combine path-level and operation-level parameters.

    Parameters are keyed by ``(name, in)``; an operation-level parameter
    replaces a path-level one with the same key.
    """
    merged: dict = {}
    for entry in (*path_params, *op_params):
        # Resolve only to read the identifying key; the original entry
        # (possibly a $ref) is what gets passed on.
        resolved = _resolve_ref(spec, entry["$ref"]) if "$ref" in entry else entry
        merged[(resolved.get("name"), resolved.get("in"))] = entry
    return list(merged.values())


async def fetch_and_parse(spec_url: str, path: str | None = None, method: str | None = None) -> list[EndpointInfo]:
    """Download a JSON specification and parse its operations.

    Args:
        spec_url: Location of the specification; fetched with redirects
            followed and decoded as JSON.
        path: If given, only the path equal to it (ignoring trailing
            slashes) is parsed.
        method: If given, only operations with this HTTP method
            (case-insensitive) are parsed.

    Returns:
        One ``EndpointInfo`` per matching operation. Parameters declared on
        the path item are included in each of its operations.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status.
    """
    async with httpx.AsyncClient() as client:
        resp = await client.get(str(spec_url), follow_redirects=True)
        resp.raise_for_status()
        spec = resp.json()

    normalized_path = _normalize_path(path) if path else None
    wanted_method = method.lower() if method else None

    endpoints: list[EndpointInfo] = []
    for ep_path, path_item in spec.get("paths", {}).items():
        if normalized_path and _normalize_path(ep_path) != normalized_path:
            continue
        if not isinstance(path_item, dict):
            continue

        shared_params = path_item.get("parameters") or []
        for ep_method, operation in path_item.items():
            # Allow-list of methods: skips "$ref", "x-*" and any other
            # non-operation key instead of crashing on its value.
            if ep_method.lower() not in _HTTP_METHODS or not isinstance(operation, dict):
                continue
            if wanted_method and ep_method.lower() != wanted_method:
                continue
            if shared_params:
                # Copy rather than mutate so the parsed spec stays untouched.
                operation = {
                    **operation,
                    "parameters": _merge_parameters(
                        shared_params, operation.get("parameters") or [], spec
                    ),
                }
            endpoints.append(parse_endpoint(spec, ep_path, ep_method, operation))
    return endpoints
|