refactor: simplify _get_ws_url_from_request

Consolidate nested helper functions and reduce complexity:
- Inline header extraction with first_header helper
- Use rpartition for cleaner host:port splitting
- Simplify control flow with loop over header candidates

Addresses REFACTORING.md item about simplifying _get_ws_url_from_request.
This commit is contained in:
GitHub Copilot
2026-01-24 10:25:24 +00:00
parent 8ab263f1a7
commit 0086cf76b0
+23 -24
View File
@@ -579,39 +579,38 @@ class LocalServer:
def _get_ws_url_from_request(self, request: web.Request, route_key: str) -> str: def _get_ws_url_from_request(self, request: web.Request, route_key: str) -> str:
"""Build WebSocket URL honoring reverse proxies and port mapping.""" """Build WebSocket URL honoring reverse proxies and port mapping."""
# Extract forwarded headers (take first value if comma-separated)
def first_header(name: str) -> str:
return request.headers.get(name, "").split(",")[0].strip().lower()
forwarded_proto = request.headers.get("X-Forwarded-Proto", "").split(",")[0].strip().lower() forwarded_proto = first_header("X-Forwarded-Proto")
forwarded_host = request.headers.get("X-Forwarded-Host", "").split(",")[0].strip() forwarded_host = first_header("X-Forwarded-Host")
forwarded_port = request.headers.get("X-Forwarded-Port", "").split(",")[0].strip() forwarded_port = first_header("X-Forwarded-Port")
def _pick_proto() -> str: # Determine WebSocket protocol
if forwarded_proto in ("https", "wss"): if forwarded_proto in ("https", "wss"):
return "wss" ws_proto = "wss"
if forwarded_proto in ("http", "ws"): elif forwarded_proto in ("http", "ws"):
return "ws" ws_proto = "ws"
return "wss" if request.secure else "ws" else:
ws_proto = "wss" if request.secure else "ws"
def _split_host_port(host: str) -> tuple[str, str]: # Determine host and port (priority: forwarded > Host header > server config)
if not host: ws_host, ws_port = "", ""
return "", "" for candidate in (forwarded_host, request.headers.get("Host", "")):
if ":" in host: if candidate:
return host.rsplit(":", 1) ws_host, _, ws_port = candidate.rpartition(":")
return host, "" if not ws_host: # No colon found, entire string is host
ws_host, ws_port = candidate, ""
ws_proto = _pick_proto() break
ws_host, ws_port = _split_host_port(forwarded_host)
if not ws_host:
host_header = request.headers.get("Host", "")
ws_host, ws_port = _split_host_port(host_header)
if not ws_host: if not ws_host:
ws_host = "localhost" if self.host == "0.0.0.0" else self.host ws_host = "localhost" if self.host == "0.0.0.0" else self.host
ws_port = str(self.port) ws_port = str(self.port)
if not ws_port and forwarded_port: ws_port = ws_port or forwarded_port
ws_port = forwarded_port
# Include port in URL only for non-standard ports
if ws_port and ws_port not in ("80", "443"): if ws_port and ws_port not in ("80", "443"):
return f"{ws_proto}://{ws_host}:{ws_port}/ws/{route_key}" return f"{ws_proto}://{ws_host}:{ws_port}/ws/{route_key}"
if not ws_port and self.port not in (80, 443): if not ws_port and self.port not in (80, 443):