Coverage for node / src / stigmem_node / cli / capability.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Capability-token CLI handlers.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import sys 

7 

8 

9def _cmd_capability_issue(args: argparse.Namespace) -> int: 

10 """Issue a capability token via the local node HTTP API.""" 

11 import json 

12 

13 import httpx 

14 

15 payload = { 

16 "issuer": args.issuer, 

17 "subject": args.subject, 

18 "verb": args.verb, 

19 "object": args.object, 

20 } 

21 if args.ttl_seconds is not None: 

22 payload["ttl_seconds"] = args.ttl_seconds 

23 

24 headers: dict[str, str] = {"Content-Type": "application/json"} 

25 if args.api_key: 

26 headers["Authorization"] = f"Bearer {args.api_key}" 

27 

28 try: 

29 resp = httpx.post( 

30 f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens", 

31 json=payload, 

32 headers=headers, 

33 timeout=15.0, 

34 ) 

35 except Exception as exc: 

36 print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr) 

37 return 1 

38 

39 if resp.status_code == 201: 

40 data = resp.json() 

41 if args.json: 

42 print(json.dumps(data, indent=2)) 

43 else: 

44 print(f"token_id: {data['token_id']}") 

45 print(f"issuer: {data['issuer']}") 

46 print(f"subject: {data['subject']}") 

47 print(f"verb: {data['verb']}") 

48 print(f"object: {data['object']}") 

49 print(f"expiry: {data['expiry']}") 

50 print(f"token_json: {data['token_json']}") 

51 return 0 

52 

53 print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr) 

54 return 1 

55 

56 

57def _cmd_capability_verify(args: argparse.Namespace) -> int: 

58 """Verify a capability token via the local node HTTP API.""" 

59 import json 

60 

61 import httpx 

62 

63 token_json_str = args.token_json 

64 if token_json_str == "-": # nosec B105 — "-" is stdin sentinel, not a password 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true

65 token_json_str = sys.stdin.read().strip() 

66 

67 headers: dict[str, str] = {} 

68 if args.api_key: 

69 headers["Authorization"] = f"Bearer {args.api_key}" 

70 

71 try: 

72 resp = httpx.post( 

73 f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens/verify", 

74 json={"token_json": token_json_str}, 

75 headers=headers, 

76 timeout=15.0, 

77 ) 

78 except Exception as exc: 

79 print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr) 

80 return 1 

81 

82 if resp.status_code == 200: 

83 data = resp.json() 

84 if args.json: 

85 print(json.dumps(data, indent=2)) 

86 else: 

87 valid = data.get("valid", False) 

88 print(f"valid: {valid}") 

89 if not valid: 

90 print(f"reason: {data.get('reason', 'unknown')}") 

91 return 0 

92 

93 # Treat 422/400 as invalid (not an HTTP error — the token itself is invalid) 

94 if resp.status_code in (422, 400): 

95 try: 

96 detail = resp.json().get("detail", resp.text) 

97 except ValueError: 

98 detail = resp.text 

99 print(f"invalid: {detail}", file=sys.stderr) 

100 return 1 

101 

102 print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr) 

103 return 1 

104 

105 

106def _cmd_capability_revoke(args: argparse.Namespace) -> int: 

107 """Revoke a capability token via the local node HTTP API.""" 

108 import json 

109 

110 import httpx 

111 

112 payload: dict[str, str] = {} 

113 if args.reason: 

114 payload["reason"] = args.reason 

115 

116 headers: dict[str, str] = {"Content-Type": "application/json"} 

117 if args.api_key: 

118 headers["Authorization"] = f"Bearer {args.api_key}" 

119 

120 try: 

121 resp = httpx.post( 

122 f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens/{args.token_id}/revoke", 

123 json=payload, 

124 headers=headers, 

125 timeout=15.0, 

126 ) 

127 except Exception as exc: 

128 print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr) 

129 return 1 

130 

131 if resp.status_code == 200: 

132 data = resp.json() 

133 if args.json: 

134 print(json.dumps(data, indent=2)) 

135 else: 

136 print(f"revoked: {data['token_id']} at {data['revoked_at']}") 

137 return 0 

138 

139 if resp.status_code == 404: 

140 print(f"error: token not found: {args.token_id}", file=sys.stderr) 

141 return 1 

142 if resp.status_code == 409: 

143 print(f"error: token already revoked: {args.token_id}", file=sys.stderr) 

144 return 1 

145 

146 print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr) 

147 return 1