Coverage for node / src / stigmem_node / cli / capability.py: 98%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Capability-token CLI handlers."""
3from __future__ import annotations
5import argparse
6import sys
def _cmd_capability_issue(args: argparse.Namespace) -> int:
    """Issue a capability token via the local node HTTP API.

    POSTs the issuer/subject/verb/object (plus ``ttl_seconds`` when it is not
    ``None``) to ``/v1/federation/capability-tokens`` under ``args.node_url``
    and prints the created token.

    Args:
        args: Parsed CLI arguments. Reads ``issuer``, ``subject``, ``verb``,
            ``object``, ``ttl_seconds``, ``api_key``, ``node_url`` and
            ``json``.

    Returns:
        0 when the node answers 201 with a usable JSON body; 1 when the
        request fails, the status is anything else, or the body cannot be
        rendered.
    """
    import json

    import httpx

    payload: dict[str, object] = {
        "issuer": args.issuer,
        "subject": args.subject,
        "verb": args.verb,
        "object": args.object,
    }
    if args.ttl_seconds is not None:
        payload["ttl_seconds"] = args.ttl_seconds

    headers: dict[str, str] = {"Content-Type": "application/json"}
    if args.api_key:
        headers["Authorization"] = f"Bearer {args.api_key}"

    try:
        resp = httpx.post(
            f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens",
            json=payload,
            headers=headers,
            timeout=15.0,
        )
    except Exception as exc:
        # CLI boundary: any failure to complete the request is reported as a
        # one-line error instead of a traceback.
        print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr)
        return 1

    if resp.status_code != 201:
        print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr)
        return 1

    # A 201 whose body is not JSON must not crash the CLI.
    try:
        data = resp.json()
    except ValueError:
        print(f"error: node returned a non-JSON response: {resp.text}", file=sys.stderr)
        return 1

    if args.json:
        print(json.dumps(data, indent=2))
        return 0

    field_names = (
        "token_id",
        "issuer",
        "subject",
        "verb",
        "object",
        "expiry",
        "token_json",
    )
    # Build every line before printing so an incomplete body never produces
    # a half-written token record on stdout.
    try:
        lines = [f"{name}: {data[name]}" for name in field_names]
    except (KeyError, TypeError) as exc:
        print(f"error: unexpected response from node: {exc!r}", file=sys.stderr)
        return 1

    for line in lines:
        print(line)
    return 0
def _cmd_capability_verify(args: argparse.Namespace) -> int:
    """Verify a capability token via the local node HTTP API.

    POSTs ``{"token_json": ...}`` to
    ``/v1/federation/capability-tokens/verify`` under ``args.node_url``.
    When ``args.token_json`` is ``"-"`` the token is read from stdin and
    stripped of surrounding whitespace.

    Args:
        args: Parsed CLI arguments. Reads ``token_json``, ``api_key``,
            ``node_url`` and ``json``.

    Returns:
        0 when the node answers 200 with a usable body (whatever the
        reported ``valid`` flag is); 1 when the request fails, the node
        answers 400/422, or any other status or malformed body is received.
    """
    import json

    import httpx

    token_json_str = args.token_json
    if token_json_str == "-":  # nosec B105 — "-" is stdin sentinel, not a password
        token_json_str = sys.stdin.read().strip()

    headers: dict[str, str] = {}
    if args.api_key:
        headers["Authorization"] = f"Bearer {args.api_key}"

    try:
        resp = httpx.post(
            f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens/verify",
            json={"token_json": token_json_str},
            headers=headers,
            timeout=15.0,
        )
    except Exception as exc:
        # CLI boundary: any failure to complete the request is reported as a
        # one-line error instead of a traceback.
        print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr)
        return 1

    if resp.status_code == 200:
        # A 200 whose body is not JSON must not crash the CLI.
        try:
            data = resp.json()
        except ValueError:
            print(f"error: node returned a non-JSON response: {resp.text}", file=sys.stderr)
            return 1

        if args.json:
            print(json.dumps(data, indent=2))
            return 0

        # The text rendering needs an object to read "valid"/"reason" from.
        if not isinstance(data, dict):
            print(f"error: unexpected response from node: {resp.text}", file=sys.stderr)
            return 1

        valid = data.get("valid", False)
        print(f"valid: {valid}")
        if not valid:
            print(f"reason: {data.get('reason', 'unknown')}")
        # NOTE(review): a 200 carrying valid=False still exits 0, while a
        # 400/422 rejection below exits 1 — confirm this asymmetry is wanted.
        return 0

    # Treat 422/400 as invalid (not an HTTP error — the token itself is invalid)
    if resp.status_code in (422, 400):
        try:
            body = resp.json()
        except ValueError:
            body = None
        # Only an object body can carry "detail"; anything else (array,
        # string, unparsable) falls back to the raw response text.
        detail = body.get("detail", resp.text) if isinstance(body, dict) else resp.text
        print(f"invalid: {detail}", file=sys.stderr)
        return 1

    print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr)
    return 1
def _cmd_capability_revoke(args: argparse.Namespace) -> int:
    """Revoke a capability token via the local node HTTP API.

    POSTs to ``/v1/federation/capability-tokens/<token_id>/revoke`` under
    ``args.node_url``, sending ``{"reason": ...}`` when a reason is given and
    an empty JSON object otherwise.

    Args:
        args: Parsed CLI arguments. Reads ``token_id``, ``reason``,
            ``api_key``, ``node_url`` and ``json``.

    Returns:
        0 when the node answers 200 with a usable JSON body; 1 when the
        request fails, the token is not found (404), is already revoked
        (409), or any other status or malformed body is received.
    """
    import json
    from urllib.parse import quote

    import httpx

    payload: dict[str, str] = {}
    if args.reason:
        payload["reason"] = args.reason

    headers: dict[str, str] = {"Content-Type": "application/json"}
    if args.api_key:
        headers["Authorization"] = f"Bearer {args.api_key}"

    # Percent-encode the id so characters such as "/", "?" or "#" stay inside
    # a single path segment and cannot redirect the request elsewhere.
    token_segment = quote(str(args.token_id), safe="")

    try:
        resp = httpx.post(
            f"{args.node_url.rstrip('/')}/v1/federation/capability-tokens/{token_segment}/revoke",
            json=payload,
            headers=headers,
            timeout=15.0,
        )
    except Exception as exc:
        # CLI boundary: any failure to complete the request is reported as a
        # one-line error instead of a traceback.
        print(f"error: cannot reach node at {args.node_url}: {exc}", file=sys.stderr)
        return 1

    if resp.status_code == 200:
        # A 200 whose body is not JSON must not crash the CLI.
        try:
            data = resp.json()
        except ValueError:
            print(f"error: node returned a non-JSON response: {resp.text}", file=sys.stderr)
            return 1

        if args.json:
            print(json.dumps(data, indent=2))
            return 0

        try:
            summary = f"revoked: {data['token_id']} at {data['revoked_at']}"
        except (KeyError, TypeError) as exc:
            print(f"error: unexpected response from node: {exc!r}", file=sys.stderr)
            return 1
        print(summary)
        return 0

    if resp.status_code == 404:
        print(f"error: token not found: {args.token_id}", file=sys.stderr)
        return 1
    if resp.status_code == 409:
        print(f"error: token already revoked: {args.token_id}", file=sys.stderr)
        return 1

    print(f"error: {resp.status_code}: {resp.text}", file=sys.stderr)
    return 1