Coverage for node / src / stigmem_node / utility / net_util.py: 94%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Outbound HTTP safety utilities — SSRF guard (H-SEC-1).""" 

2 

3from __future__ import annotations 

4 

5import ipaddress 

6import socket 

7from urllib.parse import urlparse 

8 

# RFC 1918, loopback, link-local, unspecified, and IPv6 equivalents.
# Cloud IMDS (169.254.169.254) is covered by 169.254.0.0/16.
_BLOCKED_NETS: tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...] = (
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("0.0.0.0/8"),
    # IPv6 unspecified address: the counterpart of 0.0.0.0 above.
    ipaddress.ip_network("::/128"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
)


def assert_safe_url(
    url: str,
    *,
    allow_schemes: frozenset[str] = frozenset({"https"}),
) -> None:
    """Raise ValueError if *url* is unsafe to fetch.

    Checks, in order:
    - the URL scheme is in *allow_schemes*
    - the URL has a hostname
    - the hostname resolves (DNS failure → ValueError)
    - no resolved address falls in ``_BLOCKED_NETS`` (RFC 1918, loopback,
      link-local, unspecified).  IPv4-mapped IPv6 addresses such as
      ``::ffff:127.0.0.1`` are unwrapped to their IPv4 form before the check.

    Args:
        url: The URL that is about to be fetched.
        allow_schemes: Schemes that are acceptable; only ``https`` by default.

    Raises:
        ValueError: If any of the checks above fails.

    Residual risk: DNS rebinding window between this check and the actual
    connection.  Callers MUST also set follow_redirects=False so redirects
    cannot send the connection to a private address after validation.
    """
    parsed = urlparse(url)
    if parsed.scheme not in allow_schemes:
        raise ValueError(f"Disallowed URL scheme: {parsed.scheme!r}")
    hostname = parsed.hostname or ""
    if not hostname:
        raise ValueError(f"URL has no hostname: {url!r}")
    try:
        infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve hostname {hostname!r}: {exc}") from exc
    for info in infos:
        ip = ipaddress.ip_address(info[4][0])
        # An IPv4-mapped IPv6 address is an IPv6Address, so it would never
        # match the IPv4 networks above even though it targets an IPv4 host.
        # Compare the embedded IPv4 address instead.
        mapped = getattr(ip, "ipv4_mapped", None)
        candidate = ip if mapped is None else mapped
        if any(candidate in net for net in _BLOCKED_NETS):
            raise ValueError(f"Blocked private/loopback address for {hostname!r}: {ip}")