File size: 4,841 Bytes
08040eb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66aba9d
 
 
 
 
08040eb
 
 
 
 
 
66aba9d
08040eb
 
 
66aba9d
08040eb
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import httpx

from .schemas import (
    EndpointInfo,
    Parameter,
    RequestBody,
    Response,
)

# Lookup table from a schema "type" value to the type name reported by this
# module. Every entry currently maps a name to itself.
JSON_TYPE_MAP: dict[str, str] = {
    type_name: type_name
    for type_name in (
        "string",
        "integer",
        "number",
        "boolean",
        "array",
        "object",
    )
}


def _resolve_ref(spec: dict, ref: str) -> dict:
    """Resolve a $ref pointer like '#/components/schemas/MyModel'."""
    parts = ref.lstrip("#/").split("/")
    node = spec
    for part in parts:
        node = node[part]
    return node


def _get_type(schema: dict, spec: dict) -> str:
    if "$ref" in schema:
        schema = _resolve_ref(spec, schema["$ref"])
    if "type" in schema:
        return JSON_TYPE_MAP.get(schema["type"], schema["type"])
    if "anyOf" in schema:
        types = [_get_type(s, spec) for s in schema["anyOf"] if s.get("type") != "null"]
        return types[0] if len(types) == 1 else " | ".join(types)
    return "unknown"


def _extract_fields(schema: dict, spec: dict) -> dict[str, str]:
    """Extract field_name -> type from an object schema."""
    if "$ref" in schema:
        schema = _resolve_ref(spec, schema["$ref"])
    properties = schema.get("properties", {})
    return {name: _get_type(prop, spec) for name, prop in properties.items()}


def _parse_parameters(params: list[dict], spec: dict) -> list[Parameter]:
    result = []
    for p in params:
        if "$ref" in p:
            p = _resolve_ref(spec, p["$ref"])
        schema = p.get("schema", {})
        result.append(
            Parameter(
                name=p["name"],
                location=p["in"],
                type=_get_type(schema, spec),
                required=p.get("required", False),
                description=p.get("description"),
            )
        )
    return result


def _parse_request_body(body: dict | None, spec: dict) -> RequestBody | None:
    if not body:
        return None
    if "$ref" in body:
        body = _resolve_ref(spec, body["$ref"])
    content = body.get("content", {})
    for content_type, media in content.items():
        schema = media.get("schema", {})
        fields = _extract_fields(schema, spec)
        return RequestBody(content_type=content_type, fields=fields)
    return None


def _parse_responses(responses: dict, spec: dict) -> list[Response]:
    result = []
    for status_code, resp in responses.items():
        if "$ref" in resp:
            resp = _resolve_ref(spec, resp["$ref"])
        content = resp.get("content", {})
        if content:
            for content_type, media in content.items():
                schema = media.get("schema", {})
                fields = _extract_fields(schema, spec)
                result.append(
                    Response(
                        status_code=str(status_code),
                        description=resp.get("description"),
                        content_type=content_type,
                        fields=fields,
                    )
                )
                break
        else:
            result.append(
                Response(
                    status_code=str(status_code),
                    description=resp.get("description"),
                    fields={},
                )
            )
    return result


def parse_endpoint(spec: dict, path: str, method: str, operation: dict) -> EndpointInfo:
    """Assemble an ``EndpointInfo`` for one operation.

    The HTTP method is upper-cased; summary, description and operation id
    are copied from ``operation`` when present; parameters, request body and
    responses are parsed with ``spec`` available for ``$ref`` resolution.
    """
    verb = method.upper()
    summary = operation.get("summary")
    description = operation.get("description")
    operation_id = operation.get("operationId")
    parameters = _parse_parameters(operation.get("parameters", []), spec)
    request_body = _parse_request_body(operation.get("requestBody"), spec)
    responses = _parse_responses(operation.get("responses", {}), spec)

    return EndpointInfo(
        path=path,
        method=verb,
        summary=summary,
        description=description,
        operation_id=operation_id,
        parameters=parameters,
        request_body=request_body,
        responses=responses,
    )


def _normalize_path(p: str) -> str:
    """Strip trailing slashes for consistent comparison, but keep root '/'."""
    return p.rstrip("/") or "/"


async def fetch_and_parse(spec_url: str, path: str | None = None, method: str | None = None) -> list[EndpointInfo]:
    """Download a JSON OpenAPI document and parse its operations.

    Args:
        spec_url: Location of the document; fetched with redirects followed.
        path: If given, only path entries equal to it (ignoring trailing
            slashes) are returned.
        method: If given, only operations with this HTTP method
            (case-insensitive) are returned.

    Returns:
        One ``EndpointInfo`` per matching operation, in document order.

    Raises:
        Whatever ``resp.raise_for_status()`` raises for an error status, and
        whatever ``resp.json()`` raises if the body is not valid JSON.
    """
    # Path Item keys that denote operations. Every other key (``parameters``,
    # ``summary``, ``description``, ``servers``, ``$ref``, ``x-*`` extensions)
    # is metadata and must not be handed to parse_endpoint.
    http_methods = frozenset(
        {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
    )

    async with httpx.AsyncClient() as client:
        resp = await client.get(str(spec_url), follow_redirects=True)
        resp.raise_for_status()
        spec = resp.json()

    normalized_path = _normalize_path(path) if path else None
    wanted_method = method.lower() if method else None
    endpoints: list[EndpointInfo] = []

    # ``or {}`` also covers a document whose "paths" is explicitly null.
    for ep_path, path_item in (spec.get("paths") or {}).items():
        if normalized_path and _normalize_path(ep_path) != normalized_path:
            continue
        if not isinstance(path_item, dict):
            continue
        for ep_method, operation in path_item.items():
            verb = ep_method.lower()
            if verb not in http_methods or not isinstance(operation, dict):
                continue
            if wanted_method and verb != wanted_method:
                continue
            endpoints.append(parse_endpoint(spec, ep_path, ep_method, operation))

    return endpoints