Coverage for node / src / stigmem_node / cli / federation.py: 98%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation CLI handlers.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import sys 

7 

8 

def _cmd_federation_register_peer(args: argparse.Namespace) -> int:
    """Register this node as a peer with a remote node (Spec-05-Federation-Trust).

    The command runs three steps:

    1. Fetch the local node's ``/.well-known/stigmem`` document to learn the
       published ``node_id`` and ``federation_pubkey``.
    2. Sign a canonical JSON PeerDeclaration (sorted keys, compact separators)
       with the local Ed25519 private key.
    3. POST the signed declaration to ``<remote_url>/v1/federation/peers``.

    Args:
        args: Parsed CLI arguments. Reads ``local_url``, ``remote_url``,
            ``scopes`` (comma-separated), ``tls_cert``, ``tls_key``,
            ``ca_bundle`` and ``api_key``.

    Returns:
        ``0`` when the remote reports the peer as ``active`` or answers 409
        (already registered); ``1`` on any failure, including a registration
        that was accepted but is not yet active.
    """
    import base64
    import json
    import ssl
    from datetime import UTC, datetime

    import httpx

    from ..db import apply_migrations
    from ..settings import settings

    # Ensure migrations are applied so keypair tables exist.
    apply_migrations()

    # Resolve local node URL: explicit flag > settings.
    local_url = (args.local_url or settings.node_url).rstrip("/")
    remote_url = args.remote_url.rstrip("/")
    allowed_scopes: list[str] = [s.strip() for s in args.scopes.split(",") if s.strip()]

    # A client certificate is only used when both the cert and the key are given.
    cert = (args.tls_cert, args.tls_key) if args.tls_cert and args.tls_key else None
    verify: ssl.SSLContext | str | bool | None = None
    if cert is not None:
        ssl_ctx = ssl.create_default_context(cafile=args.ca_bundle or None)
        ssl_ctx.load_cert_chain(*cert)
        verify = ssl_ctx
    elif args.ca_bundle:
        verify = args.ca_bundle

    # ------------------------------------------------------------------
    # 1. Fetch local /.well-known/stigmem to get our published metadata.
    # ------------------------------------------------------------------
    well_known_url = f"{local_url}/.well-known/stigmem"
    try:
        if verify is not None:
            with httpx.Client(timeout=15.0, trust_env=False, verify=verify) as client:
                wk = client.get(well_known_url)
        else:
            wk = httpx.get(well_known_url, timeout=10.0)
        wk.raise_for_status()
    except Exception as exc:
        print(f"error: cannot reach local node at {local_url}: {exc}", file=sys.stderr)
        return 1

    # A reachable endpoint can still serve something that is not the expected
    # document (e.g. an HTML error page from a proxy). Report that as a normal
    # CLI error instead of letting a decode/lookup exception escape.
    try:
        wk_data = wk.json()
    except ValueError as exc:
        print(
            f"error: local node at {local_url} returned invalid JSON from "
            f"/.well-known/stigmem: {exc}",
            file=sys.stderr,
        )
        return 1
    if not isinstance(wk_data, dict) or "node_id" not in wk_data:
        print(
            f"error: local node at {local_url} returned a /.well-known/stigmem "
            "document without a node_id",
            file=sys.stderr,
        )
        return 1

    local_node_id: str = wk_data["node_id"]
    local_pubkey: str = wk_data.get("federation_pubkey", "")
    if not local_pubkey:
        print(
            "error: local node has no federation_pubkey in /.well-known/stigmem — "
            "set STIGMEM_FEDERATION_ENABLED=true and restart",
            file=sys.stderr,
        )
        return 1

    # ------------------------------------------------------------------
    # 2. Load local private key and sign the PeerDeclaration.
    # ------------------------------------------------------------------
    from ..federation.peer_token import init_federation_keys

    _, priv_b64 = init_federation_keys()

    def _pad(s: str) -> str:
        # Restore the "=" padding that urlsafe_b64decode requires.
        return s + "=" * (-len(s) % 4)

    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    priv_key = Ed25519PrivateKey.from_private_bytes(base64.urlsafe_b64decode(_pad(priv_b64)))

    signed_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
    signed_fields: dict[str, object] = {
        "allowed_scopes": sorted(allowed_scopes),
        "federation_pubkey": local_pubkey,
        "node_id": local_node_id,
        "node_url": local_url,
        "signed_at": signed_at,
    }
    # Canonical form: sorted keys and no whitespace, so the signed bytes are
    # deterministic for a given set of field values.
    canonical = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode("utf-8")
    sig_bytes = priv_key.sign(canonical)
    # The signature is sent as unpadded URL-safe base64.
    declaration_sig = base64.urlsafe_b64encode(sig_bytes).decode().rstrip("=")

    # ------------------------------------------------------------------
    # 3. POST to the remote node.
    # ------------------------------------------------------------------
    payload = {
        "node_id": local_node_id,
        "node_url": local_url,
        "federation_pubkey": local_pubkey,
        "allowed_scopes": sorted(allowed_scopes),
        "signed_at": signed_at,
        "declaration_sig": declaration_sig,
    }

    headers = {"Content-Type": "application/json"}
    if args.api_key:
        headers["Authorization"] = f"Bearer {args.api_key}"

    peers_url = f"{remote_url}/v1/federation/peers"
    try:
        if verify is not None:
            with httpx.Client(timeout=15.0, trust_env=False, verify=verify) as client:
                resp = client.post(peers_url, json=payload, headers=headers)
        else:
            resp = httpx.post(peers_url, json=payload, headers=headers, timeout=15.0)
    except Exception as exc:
        print(f"error: cannot reach remote node at {remote_url}: {exc}", file=sys.stderr)
        return 1

    if resp.status_code in (200, 201):
        # A success status with an unparseable body leaves the peer state
        # unknown; treat it as a failure rather than crashing.
        try:
            result = resp.json()
        except ValueError:
            result = None
        if not isinstance(result, dict):
            print(
                f"error: remote node returned {resp.status_code} but the response "
                f"body is not a JSON object: {resp.text}",
                file=sys.stderr,
            )
            return 1

        peer_status = result.get("status", "unknown")
        peer_id = result.get("peer_id", "")
        if peer_status == "active":
            print(f"peer registered and verified (peer_id={peer_id})")
        else:
            print(
                f"peer registered but not yet active (status={peer_status}, peer_id={peer_id})\n"
                "Check that the remote node can reach this node's /.well-known/stigmem endpoint.",
                file=sys.stderr,
            )
            return 1
    elif resp.status_code == 409:
        print("peer already registered — nothing to do")
    else:
        print(
            f"error: remote node returned {resp.status_code}: {resp.text}",
            file=sys.stderr,
        )
        return 1

    return 0