Files
TrisolarisServer/scripts/generate_api_docs.py
androidlover5842 35680287d4
All checks were successful
build-and-deploy / build-deploy (push) Successful in 17s
Generate behavior-first API catalog from controllers
2026-02-04 12:21:48 +05:30

889 lines
30 KiB
Python

#!/usr/bin/env python3
"""Generate API docs from Spring controller source."""
from __future__ import annotations
import glob
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable, List, Sequence, Set, Tuple
# Parent of the directory that contains this script (the project root when the
# script lives in a first-level folder such as scripts/).
ROOT = Path(__file__).resolve().parents[1]
# Directory scanned recursively for Kotlin (*.kt) controller sources.
CONTROLLER_ROOT = ROOT / "src/main/kotlin/com/android/trisolarisserver/controller"
# Markdown file written by write_reference().
REFERENCE_OUTPUT = ROOT / "docs/API_REFERENCE.md"
# Markdown file written by write_catalog().
CATALOG_OUTPUT = ROOT / "docs/API_CATALOG.md"
# Mapping-annotation name (without the leading "@") -> HTTP verb reported for
# the handler. Consulted by method_from_annotations().
HTTP_BY_ANNOTATION = {
"GetMapping": "GET",
"PostMapping": "POST",
"PutMapping": "PUT",
"DeleteMapping": "DELETE",
"PatchMapping": "PATCH",
}
# HttpStatus constant name -> numeric status code (as a string).
# http_code() returns the name unchanged for anything not listed here.
HTTP_STATUS_CODE = {
"CONTINUE": "100",
"SWITCHING_PROTOCOLS": "101",
"OK": "200",
"CREATED": "201",
"ACCEPTED": "202",
"NO_CONTENT": "204",
"MOVED_PERMANENTLY": "301",
"FOUND": "302",
"BAD_REQUEST": "400",
"UNAUTHORIZED": "401",
"FORBIDDEN": "403",
"NOT_FOUND": "404",
"METHOD_NOT_ALLOWED": "405",
"CONFLICT": "409",
"UNPROCESSABLE_ENTITY": "422",
"TOO_MANY_REQUESTS": "429",
"INTERNAL_SERVER_ERROR": "500",
"BAD_GATEWAY": "502",
"SERVICE_UNAVAILABLE": "503",
}
# Helper function name -> list of (HttpStatus name, message) pairs.
# collect_function_metadata() adds these errors to an endpoint whenever the
# handler, or a function it calls in the same file, invokes the helper.
# NOTE(review): the helpers themselves are not parsed by this script, so this
# table is presumably maintained by hand to mirror them — keep it in sync.
KNOWN_HELPER_ERRORS: Dict[str, List[Tuple[str, str]]] = {
"requirePrincipal": [("UNAUTHORIZED", "Missing principal")],
"requireUser": [("UNAUTHORIZED", "User not found")],
"requireMember": [
("UNAUTHORIZED", "Missing principal"),
("FORBIDDEN", "Property membership required"),
],
"requireRole": [
("UNAUTHORIZED", "Missing principal"),
("FORBIDDEN", "Required property role not granted"),
],
"requireSuperAdmin": [
("UNAUTHORIZED", "User not found"),
("FORBIDDEN", "Super admin only"),
],
"requireProperty": [("NOT_FOUND", "Property not found")],
"requirePropertyGuest": [
("NOT_FOUND", "Property or guest not found"),
("BAD_REQUEST", "Guest not in property"),
],
"requireRoomStayForProperty": [("NOT_FOUND", "Room stay not found for property")],
"requireOpenRoomStayForProperty": [
("NOT_FOUND", "Room stay not found for property"),
("CONFLICT", "Room stay is already closed"),
],
"parseOffset": [("BAD_REQUEST", "Invalid timestamp")],
"parseDate": [("BAD_REQUEST", "Invalid date format")],
}
# (substring, description) pairs. resolve_side_effects() reports the
# description when the substring occurs anywhere in a handler's body text;
# duplicate descriptions are collapsed there, so several substrings may share
# one description.
KNOWN_SIDE_EFFECTS = (
("bookingEvents.emit", "Emits booking SSE updates."),
("roomBoardEvents.emit", "Emits room board SSE updates."),
("roomBoardEvents.emitRoom", "Emits room board SSE updates."),
("subscribe(", "Streams SSE events."),
("logStayAudit(", "Writes room-stay audit log."),
("roomStayAuditLogRepo.save", "Writes room-stay audit log."),
("guestDocumentRepo.save", "Stores/updates guest document metadata."),
("guestDocumentRepo.delete", "Deletes guest document metadata."),
("storageService.store", "Stores file payload on configured storage."),
("storageService.delete", "Deletes file payload from configured storage."),
)
# Identifiers that can be followed by "(" but are not treated as function
# calls: extract_calls() skips every name listed here.
KEYWORDS = {
"if",
"for",
"while",
"when",
"return",
"throw",
"catch",
"try",
"else",
"do",
"super",
"this",
"listOf",
"mapOf",
"setOf",
"mutableListOf",
"mutableMapOf",
"mutableSetOf",
"arrayOf",
"require",
"check",
"println",
}
@dataclass
class DtoField:
    """One constructor property of a Kotlin ``data class`` found by parse_data_classes()."""

    # Property name as written after ``val``/``var``.
    name: str
    # Declared Kotlin type text (may include a trailing "?").
    type_name: str
    # True when the type contains "?" or the property declares a default value.
    optional: bool
@dataclass
class FunctionInfo:
    """Facts extracted from one Kotlin ``fun`` declaration by parse_controller_file()."""

    # Function name captured from the ``fun <name>(`` line.
    name: str
    # 1-based line number of the ``fun`` line within its file.
    line: int
    # Stripped annotation lines that immediately preceded the function.
    annotations: List[str]
    # Raw text between the parentheses of the parameter list.
    param_blob: str
    # Declared return type text, or "Unit" when none was found.
    response_type: str
    # Body text as returned by parse_function_body().
    body: str
    # Names followed by "(" inside the body (see extract_calls()).
    calls: Set[str]
    # (HttpStatus name, message) pairs thrown directly in the body.
    errors: List[Tuple[str, str]]
    # ``Role.X`` constant names referenced in the body.
    roles: Set[str]
    # True when a parameter carries ``@AuthenticationPrincipal``.
    has_principal: bool
@dataclass
class Endpoint:
    """One documented HTTP endpoint (a single method/path pair of a handler)."""

    # HTTP verb, or "ANY" for a @RequestMapping without explicit methods.
    method: str
    # Class-level base path joined with the handler's own path.
    path: str
    # "name:type" entries for @PathVariable parameters.
    path_params: List[str]
    # "name:type (required|optional)" entries for @RequestParam parameters.
    query_params: List[str]
    # Type of the @RequestBody parameter, or "-" when there is none.
    body_type: str
    # Declared return type of the handler.
    response_type: str
    # Status code string chosen by default_status().
    status: str
    # Sentence derived from the handler name by behavior_from_name().
    behavior: str
    # Handler source file, relative to ROOT.
    handler_file: str
    # Handler function name.
    handler_name: str
    # 1-based line number of the handler's ``fun`` line.
    handler_line: int
    # The fields below keep these defaults until enrich_endpoints() fills them.
    auth: str = "-"
    validation_notes: List[str] = field(default_factory=list)
    common_errors: List[Tuple[str, List[str]]] = field(default_factory=list)
    side_effects: List[str] = field(default_factory=list)
    body_shape: str = "-"
def split_params(param_blob: str) -> List[str]:
    """Split a Kotlin parameter list on its top-level commas.

    Commas nested inside ``<>``, ``()``, ``[]`` or ``{}`` do not split, and
    neither does anything inside a double-quoted string literal (so a default
    value such as ``"a,b"`` or ``"("`` stays intact). Each part is trimmed and
    empty parts are dropped.

    Args:
        param_blob: Text between the parentheses of a parameter list.

    Returns:
        The trimmed, non-empty parameter segments in source order.
    """
    openers = {"<": 0, "(": 0, "[": 0, "{": 0}
    closer_to_opener = {">": "<", ")": "(", "]": "[", "}": "{"}
    parts: List[str] = []
    buf: List[str] = []
    in_string = False
    escaped = False

    def flush() -> None:
        part = "".join(buf).strip()
        if part:
            parts.append(part)
        buf.clear()

    for ch in param_blob:
        if in_string:
            # Inside a string literal nothing is structural; only look for
            # the closing quote, honouring backslash escapes.
            buf.append(ch)
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in openers:
            openers[ch] += 1
        elif ch in closer_to_opener:
            # Clamp at zero so a stray closer (e.g. the ">" of "->") cannot
            # push the depth negative.
            opener = closer_to_opener[ch]
            openers[opener] = max(0, openers[opener] - 1)
        elif ch == "," and not any(openers.values()):
            flush()
            continue
        buf.append(ch)
    flush()
    return parts
def normalize_space(value: str) -> str:
    """Collapse every whitespace run in *value* to one space and trim both ends."""
    tokens = value.split()
    return " ".join(tokens)
def http_code(value: str) -> str:
    """Translate an HttpStatus constant name into its numeric code string.

    A value made only of digits is returned as is; a name missing from
    HTTP_STATUS_CODE is also returned unchanged.
    """
    return value if value.isdigit() else HTTP_STATUS_CODE.get(value, value)
def method_from_annotations(annotations: Sequence[str]) -> List[str]:
    """Return the HTTP methods declared by the first mapping annotation.

    Shortcut annotations (``@GetMapping`` and friends) yield a single verb via
    HTTP_BY_ANNOTATION. ``@RequestMapping`` yields every ``RequestMethod.X``
    it names — any constant, including HEAD, OPTIONS and TRACE — or
    ``["ANY"]`` when it names none.

    Args:
        annotations: Annotation source lines, e.g. ``'@GetMapping("/x")'``.

    Returns:
        The list of verbs, or an empty list when no mapping annotation exists.
    """
    for ann in annotations:
        m = re.match(r"@(\w+)", ann.strip())
        if not m:
            continue
        name = m.group(1)
        if name == "RequestMapping":
            # Accept every RequestMethod constant rather than a fixed subset,
            # so a HEAD/OPTIONS/TRACE mapping is not misreported as "ANY".
            methods = re.findall(r"RequestMethod\.([A-Z]+)", ann)
            return methods or ["ANY"]
        if name in HTTP_BY_ANNOTATION:
            return [HTTP_BY_ANNOTATION[name]]
    return []
def mapping_path(annotations: Sequence[str]) -> str:
    """Return the path declared by the first ``*Mapping`` annotation that has one.

    The path is taken from an explicit ``value =`` / ``path =`` argument when
    present, otherwise from a leading positional string. Strings that belong
    to other attributes (``produces``, ``consumes``, ``name``, ...) are never
    mistaken for the path. When several paths are listed, the first is used.

    Args:
        annotations: Annotation source lines.

    Returns:
        The path text, or ``""`` when no mapping annotation declares a path.
    """
    array_open = r"(?:\[|arrayOf\s*\()?"
    named = re.compile(r'\b(?:value|path)\s*=\s*' + array_open + r'\s*"([^"]*)"')
    positional = re.compile(r'@\w+\s*\(\s*' + array_open + r'\s*"([^"]*)"')
    for ann in annotations:
        if "Mapping" not in ann:
            continue
        text = ann.strip()
        # A named value/path wins; the positional form only matches when the
        # string is the very first argument of the annotation.
        found = named.search(text) or positional.match(text)
        if found:
            return found.group(1)
    return ""
def extract_types_from_params(param_blob: str) -> Tuple[List[str], List[str], str, bool]:
    """Classify handler parameters by their Spring binding annotation.

    Args:
        param_blob: Text between the parentheses of the handler's parameter list.

    Returns:
        A tuple ``(path_params, query_params, body_type, has_principal)``:
        ``path_params`` holds ``"name:type"`` for each ``@PathVariable``;
        ``query_params`` holds ``"name:type (required|optional)"`` for each
        ``@RequestParam``; ``body_type`` is the type of the last
        ``@RequestBody`` parameter or ``"-"``; ``has_principal`` is True when
        any parameter carries ``@AuthenticationPrincipal``.
    """
    path_params: List[str] = []
    query_params: List[str] = []
    body_type = "-"
    has_principal = False
    for raw in split_params(param_blob):
        segment = normalize_space(raw)
        name_match = re.search(r"(\w+)\s*:", segment)
        param_name = name_match.group(1) if name_match else "param"
        type_match = re.search(r":\s*([^=]+)", segment)
        param_type = type_match.group(1).strip() if type_match else "Unknown"
        if "@AuthenticationPrincipal" in segment:
            has_principal = True
        if "@PathVariable" in segment:
            path_params.append(f"{param_name}:{param_type}")
        elif "@RequestParam" in segment:
            # A request parameter is optional when required=false is given
            # (with any spacing), when a defaultValue is supplied, or when
            # the Kotlin type is nullable.
            is_optional = (
                re.search(r"\brequired\s*=\s*false\b", segment) is not None
                or "defaultValue" in segment
                or param_type.endswith("?")
            )
            required = "optional" if is_optional else "required"
            query_params.append(f"{param_name}:{param_type} ({required})")
        elif "@RequestBody" in segment:
            body_type = param_type
    return path_params, query_params, body_type, has_principal
def explicit_status_from_annotations(annotations: Sequence[str]) -> str | None:
    """Return the status code declared by a ``@ResponseStatus`` annotation.

    Only annotations whose stripped text starts with ``@ResponseStatus`` are
    considered. The substrings ``CREATED`` and ``NO_CONTENT`` are checked
    first (in that order); otherwise the first ``HttpStatus.X`` constant is
    converted with http_code(). Returns ``None`` when nothing matches.
    """
    shortcuts = (("CREATED", "201"), ("NO_CONTENT", "204"))
    for raw in annotations:
        if not raw.strip().startswith("@ResponseStatus"):
            continue
        for marker, code in shortcuts:
            if marker in raw:
                return code
        constant = re.search(r"HttpStatus\.([A-Z_]+)", raw)
        if constant:
            return http_code(constant.group(1))
    return None
def default_status(method: str, response_type: str, explicit: str | None) -> str:
    """Pick the success status for an endpoint.

    A non-empty *explicit* status wins. Otherwise a ``DELETE`` handler that
    returns ``Unit`` is reported as ``"204"`` and everything else as ``"200"``.
    """
    if explicit:
        return explicit
    is_bodyless_delete = (method, response_type) == ("DELETE", "Unit")
    return "204" if is_bodyless_delete else "200"
def behavior_from_name(name: str) -> str:
    """Turn a camelCase handler name into a one-sentence behavior description.

    The name is split into lower-case words; a known prefix selects a label
    that is followed by the words in parentheses. Names without a known
    prefix are rendered as the capitalized words followed by a period.
    """
    words = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", name).lower()
    # Checked in this order; the first matching prefix decides the label.
    labels_by_prefix = (
        ("list", "List resources"),
        ("create", "Create resource"),
        ("update", "Update resource"),
        ("delete", "Delete resource"),
        ("get", "Get resource"),
        ("stream", "Stream events/data"),
        ("checkOut", "Check out flow"),
        ("checkIn", "Check in flow"),
        ("cancel", "Cancel flow"),
        ("noShow", "No-show flow"),
    )
    for prefix, label in labels_by_prefix:
        if name.startswith(prefix):
            return f"{label} ({words})."
    return f"{words.capitalize()}."
def join_paths(base: str, rel: str) -> str:
    """Join a class-level base path and a handler path into one absolute path.

    Leading and trailing slashes of both parts are ignored, and a part that is
    empty or consists only of slashes contributes nothing, so the result never
    contains an empty segment such as ``//x`` or a dangling ``/api/``.

    Args:
        base: Base path from the class-level mapping (may be empty).
        rel: Path from the handler's mapping (may be empty).

    Returns:
        The joined path starting with "/"; "/" alone when both parts are empty.
    """
    segments = (part.strip("/") for part in (base, rel) if part)
    return "/" + "/".join(segment for segment in segments if segment)
def parse_return_type(tail: str) -> str:
    """Extract the declared return type from the text after a parameter list.

    The type is only recognised when the tail *starts* with ``:`` (after
    optional whitespace). A colon further along — an elvis operator ``?:`` in
    an expression body, or a typed local inside a block body — is not a
    return type and yields ``"Unit"``.

    Args:
        tail: Source text that follows the closing ")" of the signature.

    Returns:
        The return type text up to the first "{" or "=", or "Unit".
    """
    m = re.match(r"\s*:\s*([^{=]+)", tail)
    if not m:
        return "Unit"
    return m.group(1).strip() or "Unit"
def find_matching_paren(text: str, open_index: int) -> int:
    """Return the index of the ")" that closes the "(" at *open_index*.

    Parentheses inside double-quoted string literals are ignored, so a value
    such as ``")"`` does not end the group early.

    Args:
        text: Source text to scan.
        open_index: Index of the opening parenthesis.

    Returns:
        Index of the matching ")", or -1 when the group is never closed.
    """
    depth = 0
    in_string = False
    escaped = False
    for idx in range(open_index, len(text)):
        ch = text[idx]
        if in_string:
            # Skip string content; a backslash protects the next character.
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                return idx
    return -1
def extract_parenthesized(text: str, open_index: int) -> str:
    """Return the text between the "(" at *open_index* and its matching ")".

    Returns an empty string when find_matching_paren() reports no match.
    """
    close_index = find_matching_paren(text, open_index)
    return "" if close_index == -1 else text[open_index + 1 : close_index]
def strip_leading_annotations(segment: str) -> str:
    """Remove every annotation that precedes a declaration.

    Handles plain (``@Valid``), qualified (``@jakarta.validation.Valid``),
    use-site targeted (``@field:NotNull``) and parameterised
    (``@Size(max = 3)``) annotations. Scanning stops — returning what is left —
    when an annotation's parentheses are unbalanced or when "@" is not
    followed by a name, so the loop always terminates.

    Args:
        segment: One parameter/property declaration, possibly annotated.

    Returns:
        The trimmed declaration without its leading annotations.
    """
    value = segment.strip()
    while value.startswith("@"):
        # "@", optional use-site target ("field:"), then a dotted name.
        head = re.match(r"@(?:\w+:)?[\w.]*", value)
        name_end = head.end() if head else 1
        if name_end == 1:
            # A bare "@" cannot be consumed; stop instead of spinning forever.
            break
        if value.startswith("(", name_end):
            close_index = find_matching_paren(value, name_end)
            if close_index == -1:
                break
            value = value[close_index + 1 :].strip()
        else:
            value = value[name_end:].strip()
    return value
def extract_calls(body: str) -> Set[str]:
    """Collect the names that are directly followed by "(" in *body*.

    Names listed in KEYWORDS are left out.
    """
    candidates = re.findall(r"\b([A-Za-z_][A-Za-z0-9_]*)\s*\(", body)
    return {name for name in candidates if name not in KEYWORDS}
def extract_errors(body: str) -> List[Tuple[str, str]]:
    """Find ``ResponseStatusException(HttpStatus.X, "message")`` constructions.

    Returns one ``(status name, message)`` pair per occurrence, in source
    order, with whitespace runs inside the message collapsed to one space.
    """
    pattern = re.compile(
        r"ResponseStatusException\(\s*HttpStatus\.([A-Z_]+)\s*,\s*\"([^\"]+)\"",
        re.DOTALL,
    )
    return [
        (found.group(1), " ".join(found.group(2).split()))
        for found in pattern.finditer(body)
    ]
def extract_roles(body: str) -> Set[str]:
    """Return the distinct ``X`` names of every ``Role.X`` reference in *body*."""
    return {found.group(1) for found in re.finditer(r"Role\.([A-Z_]+)", body)}
def parse_function_signature(lines: List[str], start_line: int) -> Tuple[str, str, int, int, str]:
    """Read a (possibly multi-line) Kotlin function signature.

    Scanning starts at the first "(" on ``lines[start_line]`` and follows
    nested parentheses until the parameter list closes.

    Args:
        lines: All lines of the source file.
        start_line: 0-based index of the line containing ``fun name(``.

    Returns:
        A tuple ``(param_blob, return_type, end_line, end_col, tail)``:
        the trimmed text between the outer parentheses, the declared return
        type (``"Unit"`` if none), the line and column of the closing ")",
        and the text that follows it up to the start of the body. When the
        start line holds no "(", the result is
        ``("", "Unit", start_line, len(line), "")``. When the list is never
        closed, ``end_line`` equals ``len(lines)``.
    """
    current_line = lines[start_line]
    open_col = current_line.find("(")
    if open_col == -1:
        return "", "Unit", start_line, len(current_line), ""

    depth = 1
    param_chars: List[str] = []
    line_idx = start_line
    close_col = open_col
    while line_idx < len(lines):
        line = lines[line_idx]
        first_col = open_col + 1 if line_idx == start_line else 0
        for col in range(first_col, len(line)):
            ch = line[col]
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    close_col = col
                    break
            param_chars.append(ch)
        if depth == 0:
            break
        # Keep tokens from consecutive lines apart; the original line break
        # would otherwise vanish when the characters are joined.
        param_chars.append(" ")
        line_idx += 1

    tail_parts: List[str] = []
    body_started = False
    if line_idx < len(lines):
        remainder = lines[line_idx][close_col + 1 :]
        tail_parts.append(remainder)
        # The body begins on the signature line itself: following lines are
        # body statements and must not leak into the tail.
        body_started = "{" in remainder or "=" in remainder
    look = line_idx + 1
    while not body_started and look < len(lines):
        trimmed = lines[look].strip()
        if not trimmed:
            tail_parts.append(" ")
            look += 1
            continue
        if trimmed.startswith("@"):
            break
        if trimmed.startswith(("fun ", "private fun ", "internal fun ")):
            break
        tail_parts.append(" " + trimmed)
        if "{" in trimmed or "=" in trimmed:
            break
        look += 1
    tail = "".join(tail_parts).strip()
    return "".join(param_chars).strip(), parse_return_type(tail), line_idx, close_col, tail
def parse_function_body(lines: List[str], signature_end_line: int, signature_end_col: int) -> Tuple[str, int]:
    """Extract the body text of a function whose signature has been parsed.

    The search for the body starts right after the signature's closing ")"
    and stops at the first "{" or "=":

    * "=" (expression body): the rest of that line, trimmed, is the body.
    * "{" (block body): characters are collected until the matching "}";
      every brace character is left out and each line that is fully consumed
      is followed by a newline.

    Returns:
        ``(body, end_line)`` where ``end_line`` is the 0-based line on which
        the body ends. ``("", signature_end_line)`` when no body start is
        found; the last line index when a block is never closed.
    """
    body_start = re.compile(r"[{=]")
    marker = ""
    marker_line = marker_col = -1
    for line_idx in range(signature_end_line, len(lines)):
        search_from = signature_end_col + 1 if line_idx == signature_end_line else 0
        hit = body_start.search(lines[line_idx], search_from)
        if hit:
            marker, marker_line, marker_col = hit.group(), line_idx, hit.start()
            break

    if not marker:
        return "", signature_end_line
    if marker == "=":
        return lines[marker_line][marker_col + 1 :].strip(), marker_line

    collected: List[str] = []
    depth = 1
    for li in range(marker_line, len(lines)):
        first_col = marker_col + 1 if li == marker_line else 0
        for ch in lines[li][first_col:]:
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return "".join(collected), li
            else:
                collected.append(ch)
        collected.append("\n")
    return "".join(collected), len(lines) - 1
def parse_data_classes() -> Dict[str, List[DtoField]]:
    """Index the constructor properties of every ``data class`` under CONTROLLER_ROOT.

    Every ``*.kt`` file is read (in sorted path order) and each
    ``data class Name(...)`` header is split into its ``val``/``var``
    properties. Classes without any recognised property are skipped; when a
    class name occurs more than once, the last occurrence wins.

    Returns:
        Mapping of class name to its list of DtoField entries.
    """
    header = re.compile(r"\bdata class\s+(\w+)\s*\(")
    prop = re.compile(r"(?:val|var)\s+(\w+)\s*:\s*([^=]+?)(?:\s*=\s*.+)?$")
    dto_map: Dict[str, List[DtoField]] = {}
    for file_name in sorted(glob.glob(str(CONTROLLER_ROOT / "**/*.kt"), recursive=True)):
        text = Path(file_name).read_text(encoding="utf-8")
        for match in header.finditer(text):
            # The header pattern ends with "(", so that is the list opener.
            blob = extract_parenthesized(text, match.end() - 1)
            fields: List[DtoField] = []
            for raw in split_params(blob):
                cleaned = strip_leading_annotations(raw)
                found = prop.search(cleaned) if cleaned else None
                if not found:
                    continue
                type_name = found.group(2).strip()
                # Nullable type or default value => caller may omit it.
                optional = "?" in type_name or "=" in cleaned
                fields.append(DtoField(found.group(1), type_name, optional))
            if fields:
                dto_map[match.group(1)] = fields
    return dto_map
def parse_controller_file(file_path: Path) -> Tuple[List[Endpoint], Dict[str, FunctionInfo]]:
    """Parse one Kotlin file into endpoints and per-function metadata.

    Files that mention neither ``@RestController`` nor ``@Controller`` are
    skipped. Otherwise the file is walked line by line:

    * lines starting with "@" are remembered as pending annotations;
    * a class declaration consumes them, and a ``@RequestMapping`` among
      them sets the base path for the handlers that follow;
    * a ``fun`` declaration consumes them, is recorded in the function map,
      and — when one of its annotations is a mapping — produces one Endpoint
      per HTTP method;
    * any other non-blank, non-``//`` line discards the pending annotations.

    Only real class declarations (optional modifiers followed by
    ``class Name``) count as class lines; a comment or expression that merely
    contains the word "class" does not discard a handler's annotations.

    Args:
        file_path: Path of the ``.kt`` file to read (UTF-8).

    Returns:
        ``(endpoints, functions)`` where ``functions`` maps a function name
        to its FunctionInfo (a later overload replaces an earlier one).
    """
    text = file_path.read_text(encoding="utf-8")
    if "@RestController" not in text and "@Controller" not in text:
        return [], {}

    # Modifiers ("data", "open", "private", ...) followed by "class Name".
    class_decl = re.compile(r"(?:\w+\s+)*class\s+\w+")
    fun_decl = re.compile(r"\bfun\s+(\w+)\s*\(")

    lines = text.splitlines()
    endpoints: List[Endpoint] = []
    functions: Dict[str, FunctionInfo] = {}
    pending_annotations: List[str] = []
    current_base_path = ""
    i = 0
    while i < len(lines):
        stripped = lines[i].strip()
        if stripped.startswith("@"):
            pending_annotations.append(stripped)
            i += 1
            continue

        if class_decl.match(stripped):
            mapped_base = ""
            for ann in pending_annotations:
                if ann.startswith("@RequestMapping"):
                    mapped_base = mapping_path([ann])
                    break
            # A class without its own mapping keeps the previous base path.
            if mapped_base:
                current_base_path = mapped_base
            pending_annotations = []
            i += 1
            continue

        fun_match = fun_decl.search(stripped)
        if fun_match:
            name = fun_match.group(1)
            function_annotations = pending_annotations[:]
            pending_annotations = []
            param_blob, return_type, sig_end_line, sig_end_col, _ = parse_function_signature(lines, i)
            body, body_end_line = parse_function_body(lines, sig_end_line, sig_end_col)
            path_params, query_params, body_type, has_principal = extract_types_from_params(param_blob)
            functions[name] = FunctionInfo(
                name=name,
                line=i + 1,
                annotations=function_annotations,
                param_blob=param_blob,
                response_type=return_type,
                body=body,
                calls=extract_calls(body),
                errors=extract_errors(body),
                roles=extract_roles(body),
                has_principal=has_principal,
            )
            method_names = method_from_annotations(function_annotations)
            if method_names:
                rel_path = mapping_path(function_annotations)
                full_path = join_paths(current_base_path, rel_path)
                explicit_status = explicit_status_from_annotations(function_annotations)
                behavior = behavior_from_name(name)
                rel_file = os.path.relpath(file_path, ROOT)
                for method in method_names:
                    endpoints.append(
                        Endpoint(
                            method=method,
                            path=full_path,
                            path_params=path_params,
                            query_params=query_params,
                            body_type=body_type,
                            response_type=return_type,
                            status=default_status(method, return_type, explicit_status),
                            behavior=behavior,
                            handler_file=rel_file,
                            handler_name=name,
                            handler_line=i + 1,
                        )
                    )
            # Resume after the body so nested code is not parsed again.
            i = body_end_line + 1
            continue

        if stripped and not stripped.startswith("//"):
            pending_annotations = []
        i += 1
    return endpoints, functions
def collect_function_metadata(
    function_name: str,
    functions: Dict[str, FunctionInfo],
    visiting: Set[str] | None = None,
) -> Tuple[List[Tuple[str, str]], Set[str], Set[str], Set[str]]:
    """Gather errors, roles, calls and auth hints for a function and its callees.

    Callees are followed only when they are defined in *functions* (the same
    file). Each function is expanded at most once per traversal: *visiting*
    records the names already expanded and is shared by the recursive calls.

    Args:
        function_name: Name of the function to start from.
        functions: Functions of the file, keyed by name.
        visiting: Names already expanded; a fresh set is used when omitted.

    Returns:
        ``(errors, roles, calls, auth_hints)``. All four are empty when the
        function is unknown or has already been expanded.
    """
    seen = set() if visiting is None else visiting
    func = None if function_name in seen else functions.get(function_name)
    if not func:
        return [], set(), set(), set()
    seen.add(function_name)

    errors = list(func.errors)
    roles = set(func.roles)
    calls = set(func.calls)

    hint_rules = (
        ("SUPER_ADMIN", "requireSuperAdmin" in calls),
        ("PROPERTY_MEMBER", "requireMember" in calls),
        ("ROLE_BASED", "requireRole" in calls and bool(roles)),
        ("AUTHENTICATED", bool(func.has_principal)),
    )
    auth_hints: Set[str] = {hint for hint, applies in hint_rules if applies}

    # Iterate a snapshot: the set grows while callee results are merged in.
    for callee in tuple(calls):
        if callee in KNOWN_HELPER_ERRORS:
            errors.extend(KNOWN_HELPER_ERRORS[callee])
        if callee != function_name and callee in functions:
            sub_errors, sub_roles, sub_calls, sub_hints = collect_function_metadata(
                callee, functions, seen
            )
            errors.extend(sub_errors)
            roles |= sub_roles
            calls |= sub_calls
            auth_hints |= sub_hints
    return errors, roles, calls, auth_hints
def unique_messages(values: Iterable[str]) -> List[str]:
    """Trim each value, drop blanks and duplicates, and keep first-seen order."""
    trimmed = (value.strip() for value in values)
    # dict keys preserve insertion order, giving an ordered de-duplication.
    return list(dict.fromkeys(text for text in trimmed if text))
def resolve_auth_label(
    roles: Set[str],
    calls: Set[str],
    auth_hints: Set[str],
    has_principal: bool,
) -> str:
    """Choose the human-readable auth requirement for an endpoint.

    Precedence: super admin, then explicit roles (sorted), then property
    membership, then any authenticated principal, else public/unspecified.
    *calls* is accepted for signature compatibility but not consulted.
    """
    if "SUPER_ADMIN" in auth_hints:
        label = "SUPER_ADMIN"
    elif roles:
        label = "Roles: " + ", ".join(sorted(roles))
    elif "PROPERTY_MEMBER" in auth_hints:
        label = "Any property member"
    elif has_principal or "AUTHENTICATED" in auth_hints:
        label = "Authenticated user (Firebase)"
    else:
        label = "Public/unspecified"
    return label
def summarize_errors(errors: Sequence[Tuple[str, str]]) -> List[Tuple[str, List[str]]]:
    """Group error messages by numeric status code.

    Codes are ordered 401, 403, 404, 400, 409, 422, 429, 500; any other code
    follows, sorted as text. Each code keeps at most its first four distinct
    messages.
    """
    by_code: Dict[str, List[str]] = {}
    for status, message in errors:
        by_code.setdefault(http_code(status), []).append(message)

    preferred = ("401", "403", "404", "400", "409", "422", "429", "500")
    rank = {code: position for position, code in enumerate(preferred)}
    ordered_codes = sorted(by_code, key=lambda code: (rank.get(code, 999), code))
    return [(code, unique_messages(by_code[code])[:4]) for code in ordered_codes]
def resolve_body_shape(body_type: str, dto_fields: Dict[str, List[DtoField]]) -> str:
    """Describe a request body type by listing the fields of its DTO.

    Lookup candidates, in order: the type with every "?" removed, its last
    dotted component, the text between the first "<" and the last ">", and
    that text's last dotted component. The first candidate with known fields
    is rendered as ``Name { field:type, other:type (optional) }``.

    Returns:
        "-" for no body, the rendered shape when a DTO is found, otherwise
        *body_type* unchanged.
    """
    if body_type == "-":
        return "-"

    bare = body_type.strip().replace("?", "")
    lookups = [bare]
    if "." in bare:
        lookups.append(bare.rsplit(".", 1)[-1])
    if "<" in bare and ">" in bare:
        inner = bare[bare.find("<") + 1 : bare.rfind(">")].strip()
        lookups.append(inner)
        if "." in inner:
            lookups.append(inner.rsplit(".", 1)[-1])

    for type_name in lookups:
        members = dto_fields.get(type_name)
        if members:
            rendered = ", ".join(
                f"{member.name}:{member.type_name}" + (" (optional)" if member.optional else "")
                for member in members
            )
            return f"{type_name} {{ {rendered} }}"
    return body_type
def resolve_side_effects(body: str, response_type: str) -> List[str]:
    """List the side-effect descriptions that apply to a handler.

    A response type mentioning ``SseEmitter`` adds the SSE note; after that,
    each KNOWN_SIDE_EFFECTS substring found in *body* adds its description.
    Duplicates are removed, keeping first-seen order.
    """
    found = ["Streams SSE events."] if "SseEmitter" in response_type else []
    found.extend(description for needle, description in KNOWN_SIDE_EFFECTS if needle in body)
    return unique_messages(found)
def enrich_endpoints(
    endpoints: List[Endpoint],
    file_function_map: Dict[str, Dict[str, FunctionInfo]],
    dto_fields: Dict[str, List[DtoField]],
) -> List[Endpoint]:
    """Fill in auth, errors, validation notes, body shape and side effects.

    Each endpoint is updated in place using the functions parsed from its
    handler file. An endpoint whose handler is not in *file_function_map* is
    passed through unchanged.

    Returns:
        A new list holding the same Endpoint objects in the same order.
    """
    guard_codes = {"400", "409", "422"}
    result: List[Endpoint] = []
    for endpoint in endpoints:
        file_functions = file_function_map.get(endpoint.handler_file, {})
        info = file_functions.get(endpoint.handler_name)
        if info is not None:
            errors, roles, calls, auth_hints = collect_function_metadata(
                endpoint.handler_name, file_functions
            )
            summary = summarize_errors(errors)
            endpoint.auth = resolve_auth_label(roles, calls, auth_hints, info.has_principal)
            endpoint.common_errors = summary
            # Only client-input style failures are listed as validation notes.
            endpoint.validation_notes = [
                f"{code}: {message}"
                for code, messages in summary
                if code in guard_codes
                for message in messages
            ]
            endpoint.body_shape = resolve_body_shape(endpoint.body_type, dto_fields)
            endpoint.side_effects = resolve_side_effects(info.body, endpoint.response_type)
        result.append(endpoint)
    return result
def write_reference(endpoints: Sequence[Endpoint]) -> None:
    """Write the endpoint table to REFERENCE_OUTPUT as Markdown.

    The file starts with a short header and a curl usage template, followed
    by one table row per endpoint in the order given. The output directory is
    created when missing and the file is written as UTF-8.
    """

    def errors_cell(common_errors: Sequence[Tuple[str, List[str]]]) -> str:
        # "code (msg1; msg2)" per status, at most two messages each.
        if not common_errors:
            return "-"
        return ", ".join(
            f"{code} ({'; '.join(messages[:2])})" if messages else code
            for code, messages in common_errors
        )

    lines = [
        "# API Reference",
        "",
        "Generated from controller source.",
        "",
        f"- Total endpoints: **{len(endpoints)}**",
        "- Auth: Firebase Bearer token unless endpoint is public.",
        "- Regenerate: `python scripts/generate_api_docs.py`",
        "",
        "## Usage Template",
        "",
        "```bash",
        "curl -X <METHOD> \"https://api.hoteltrisolaris.in<PATH>\" \\",
        " -H \"Authorization: Bearer <FIREBASE_ID_TOKEN>\" \\",
        " -H \"Content-Type: application/json\" \\",
        " -d '<REQUEST_BODY_JSON>'",
        "```",
        "",
        "| Method | Path | Path Params | Query Params | Body Type | Response Type | Status | Auth | Common Errors | Behavior | Handler |",
        "|---|---|---|---|---|---|---|---|---|---|---|",
    ]
    for endpoint in endpoints:
        path_params = ", ".join(endpoint.path_params) or "-"
        query_params = ", ".join(endpoint.query_params) or "-"
        handler = f"`{endpoint.handler_file}:{endpoint.handler_line}` (`{endpoint.handler_name}`)"
        errors_text = errors_cell(endpoint.common_errors)
        lines.append(
            f"| `{endpoint.method}` | `{endpoint.path}` | `{path_params}` | `{query_params}` | `{endpoint.body_type}` | `{endpoint.response_type}` | `{endpoint.status}` | {endpoint.auth} | {errors_text} | {endpoint.behavior} | {handler} |"
        )
    REFERENCE_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    REFERENCE_OUTPUT.write_text("\n".join(lines) + "\n", encoding="utf-8")
def write_catalog(endpoints: Sequence[Endpoint]) -> None:
    """Write the behavior-first catalog to CATALOG_OUTPUT as Markdown.

    Endpoints are grouped under one heading per handler file (files sorted by
    name) and ordered by path, method and handler name within a file. The
    output directory is created when missing and the file is written as UTF-8
    with a single trailing newline.
    """
    by_file: Dict[str, List[Endpoint]] = {}
    for endpoint in endpoints:
        by_file.setdefault(endpoint.handler_file, []).append(endpoint)

    out = [
        "# API Catalog",
        "",
        "Behavior-first catalog generated from controller source.",
        "",
        f"- Total endpoints: **{len(endpoints)}**",
        "- Notes: validations/errors are extracted from explicit `ResponseStatusException` checks and shared helper guards.",
        "- Regenerate: `python scripts/generate_api_docs.py`",
        "",
    ]
    for handler_file in sorted(by_file):
        out += [f"## `{handler_file}`", ""]
        ordered = sorted(by_file[handler_file], key=lambda e: (e.path, e.method, e.handler_name))
        for endpoint in ordered:
            out += [
                f"### `{endpoint.method} {endpoint.path}`",
                "",
                f"- Handler: `{endpoint.handler_name}` (`{endpoint.handler_file}:{endpoint.handler_line}`)",
                f"- Behavior: {endpoint.behavior}",
            ]
            if endpoint.path_params:
                out.append(f"- Path params: {', '.join(endpoint.path_params)}")
            if endpoint.query_params:
                out.append(f"- Query params: {', '.join(endpoint.query_params)}")
            if endpoint.body_type == "-":
                out.append("- Body: none")
            else:
                out.append(f"- Body: {endpoint.body_shape}")
            if endpoint.side_effects:
                out.append(f"- Side effects: {' '.join(endpoint.side_effects)}")
            out.append(f"- Auth: {endpoint.auth}")
            out.append(f"- Response: `{endpoint.status}` `{endpoint.response_type}`")
            if endpoint.validation_notes:
                out.append("- Validation/guard checks:")
                out.extend(f" - {note}" for note in endpoint.validation_notes)
            if endpoint.common_errors:
                # "code (msg1; msg2)" per status, at most two messages each.
                summary = [
                    f"{code} ({'; '.join(messages[:2])})" if messages else code
                    for code, messages in endpoint.common_errors
                ]
                out.append(f"- Common errors: {', '.join(summary)}")
            else:
                out.append("- Common errors: none observed in controller checks")
            out.append("")
    CATALOG_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    CATALOG_OUTPUT.write_text("\n".join(out).rstrip() + "\n", encoding="utf-8")
def main() -> None:
    """Parse every controller file and regenerate both Markdown documents.

    Endpoints are de-duplicated by (method, path, handler file, handler name)
    — the last occurrence wins — then enriched, sorted by path, method, file
    and handler name, and written with write_reference() and write_catalog().
    """
    dto_fields = parse_data_classes()
    all_endpoints: List[Endpoint] = []
    file_function_map: Dict[str, Dict[str, FunctionInfo]] = {}
    kotlin_files = sorted(glob.glob(str(CONTROLLER_ROOT / "**/*.kt"), recursive=True))
    for file_name in kotlin_files:
        file_path = Path(file_name)
        endpoints, functions = parse_controller_file(file_path)
        all_endpoints.extend(endpoints)
        if functions:
            file_function_map[os.path.relpath(file_path, ROOT)] = functions

    uniq: Dict[Tuple[str, str, str, str], Endpoint] = {
        (e.method, e.path, e.handler_file, e.handler_name): e for e in all_endpoints
    }
    enriched = enrich_endpoints(list(uniq.values()), file_function_map, dto_fields)
    ordered = sorted(enriched, key=lambda e: (e.path, e.method, e.handler_file, e.handler_name))

    write_reference(ordered)
    write_catalog(ordered)
    print(
        f"Wrote {REFERENCE_OUTPUT} and {CATALOG_OUTPUT} ({len(ordered)} endpoints)"
    )


if __name__ == "__main__":
    main()