Coverage for node / src / stigmem_node / utility / net_util.py: 94%
21 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Outbound HTTP safety utilities — SSRF guard (H-SEC-1)."""
3from __future__ import annotations
5import ipaddress
6import socket
7from urllib.parse import urlparse
9# RFC 1918, loopback, link-local, and IPv6 equivalents.
10# Cloud IMDS (169.254.169.254) is covered by 169.254.0.0/16.
11_BLOCKED_NETS: tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...] = (
12 ipaddress.ip_network("127.0.0.0/8"),
13 ipaddress.ip_network("10.0.0.0/8"),
14 ipaddress.ip_network("172.16.0.0/12"),
15 ipaddress.ip_network("192.168.0.0/16"),
16 ipaddress.ip_network("169.254.0.0/16"),
17 ipaddress.ip_network("0.0.0.0/8"),
18 ipaddress.ip_network("::1/128"),
19 ipaddress.ip_network("fc00::/7"),
20 ipaddress.ip_network("fe80::/10"),
21)
24def assert_safe_url(
25 url: str,
26 *,
27 allow_schemes: frozenset[str] = frozenset({"https"}),
28) -> None:
29 """Raise ValueError if *url* is unsafe to fetch.
31 Checks:
32 - scheme is in *allow_schemes*
33 - hostname resolves (DNS failure → ValueError)
34 - no resolved address falls in RFC 1918, loopback, or link-local ranges
36 Residual risk: DNS rebinding window between this check and the actual
37 connection. Callers MUST also set follow_redirects=False so redirects
38 cannot send the connection to a private address after validation.
39 """
40 parsed = urlparse(url)
41 if parsed.scheme not in allow_schemes:
42 raise ValueError(f"Disallowed URL scheme: {parsed.scheme!r}")
43 hostname = parsed.hostname or ""
44 if not hostname: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 raise ValueError(f"URL has no hostname: {url!r}")
46 try:
47 infos = socket.getaddrinfo(hostname, None)
48 except socket.gaierror as exc:
49 raise ValueError(f"Cannot resolve hostname {hostname!r}: {exc}") from exc
50 for info in infos:
51 ip = ipaddress.ip_address(info[4][0])
52 for net in _BLOCKED_NETS:
53 if ip in net:
54 raise ValueError(f"Blocked private/loopback address for {hostname!r}: {ip}")