Coverage for node / src / stigmem_node / cli_admin_handlers.py: 84%

334 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Admin / migration / audit CLI handlers extracted from cli.py. 

2 

3These handlers are imported back into ``stigmem_node.cli`` so that the public 

4import surface (``from stigmem_node.cli import _cmd_backfill_cids`` etc.) is 

5preserved. No behavioural changes — code was moved verbatim from cli.py. 

6""" 

7 

8from __future__ import annotations 

9 

10import argparse 

11import logging 

12import os 

13import sys 

14 

15logger = logging.getLogger("stigmem.cli") 

16 

17 

18def _write_owner_only_text(path: str, text: str) -> None: 

19 fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) 

20 try: 

21 os.fchmod(fd, 0o600) 

22 file_obj = os.fdopen(fd, "w") 

23 fd = -1 

24 with file_obj as f: 

25 f.write(text) 

26 finally: 

27 if fd != -1: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 os.close(fd) 

29 os.chmod(path, 0o600) 

30 

31 

32def _resolve_scope_prefix_and_label( 

33 args: argparse.Namespace, 

34 deployment: str, 

35 agent_id: str, 

36) -> tuple[str, str]: 

37 """Pick the role/skill scope-prefix builder and produce a human-readable label.""" 

38 from .instruction_migrate import scope_prefix_for_role, scope_prefix_for_skill 

39 

40 if args.role: 

41 return scope_prefix_for_role(deployment, agent_id), f"role:{args.role} agent:{agent_id}" 

42 return scope_prefix_for_skill(deployment, args.skill), f"skill:{args.skill} agent:{agent_id}" 

43 

44 

45def _load_existing_state_from_db( 

46 db_path: str, 

47 stub_diff_uris: set[str], 

48 agent_id: str, 

49) -> tuple[dict[str, str], set[str]]: 

50 """Best-effort: read existing fact values + prior manifest entry names from local SQLite.""" 

51 import json as _json 

52 import sqlite3 

53 

54 existing_content: dict[str, str] = {} 

55 prev_names: set[str] = set() 

56 try: 

57 conn = sqlite3.connect(db_path) 

58 conn.row_factory = sqlite3.Row 

59 for uri in stub_diff_uris: 

60 row = conn.execute( 

61 "SELECT value_v FROM facts WHERE entity = ? ORDER BY timestamp DESC LIMIT 1", 

62 (uri,), 

63 ).fetchone() 

64 if row: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true

65 existing_content[uri] = str(row["value_v"]) 

66 row = conn.execute( 

67 "SELECT body FROM instruction_manifests" 

68 " WHERE agent_id = ? AND superseded_at IS NULL" 

69 " ORDER BY created_at DESC LIMIT 1", 

70 (agent_id,), 

71 ).fetchone() 

72 if row: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 prev_names = {e["name"] for e in _json.loads(row["body"])} 

74 conn.close() 

75 except Exception as exc: 

76 print(f"warning: db query failed: {exc}", file=sys.stderr) 

77 return existing_content, prev_names 

78 

79 

80def _load_existing_state_from_api( 

81 node_url: str, 

82 api_key: str, 

83 stub_diff_uris: set[str], 

84 agent_id: str, 

85) -> tuple[dict[str, str], set[str]]: 

86 """Best-effort: fetch existing facts + manifest entries via HTTP.""" 

87 try: 

88 import httpx 

89 except ImportError: 

90 print("warning: httpx not installed — skipping idempotency checks", file=sys.stderr) 

91 return {}, set() 

92 

93 existing_content: dict[str, str] = {} 

94 prev_names: set[str] = set() 

95 headers = {"Authorization": f"Bearer {api_key}"} 

96 base = node_url.rstrip("/") 

97 

98 for uri in stub_diff_uris: 

99 try: 

100 r = httpx.get( 

101 f"{base}/v1/facts", 

102 params={"entity": uri, "limit": 1}, 

103 headers=headers, 

104 timeout=10.0, 

105 ) 

106 if r.status_code == 200: 106 ↛ 98line 106 didn't jump to line 98 because the condition on line 106 was always true

107 facts = r.json().get("facts", []) 

108 if facts: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 existing_content[uri] = str(facts[0]["value"]["v"]) 

110 except Exception as exc: # nosec B110 — best-effort pre-flight 

111 logger.debug("instruction migrate pre-flight fact fetch failed: %s", exc) 

112 try: 

113 r = httpx.get( 

114 f"{base}/v1/agents/{agent_id}/instruction-manifest", 

115 headers=headers, 

116 timeout=10.0, 

117 ) 

118 if r.status_code == 200: 118 ↛ 122line 118 didn't jump to line 122 because the condition on line 118 was always true

119 prev_names = {e["name"] for e in r.json().get("entries", [])} 

120 except Exception as exc: # nosec B110 — best-effort pre-flight 

121 logger.debug("instruction migrate pre-flight manifest fetch failed: %s", exc) 

122 return existing_content, prev_names 

123 

124 

125def _load_existing_state( 

126 args: argparse.Namespace, 

127 node_url: str, 

128 api_key: str, 

129 stub_diff_uris: set[str], 

130 agent_id: str, 

131) -> tuple[dict[str, str], set[str]]: 

132 """Dispatch to the DB-backed or HTTP-backed pre-flight loader, or return empty.""" 

133 if args.db: 

134 return _load_existing_state_from_db(args.db, stub_diff_uris, agent_id) 

135 if api_key: 

136 return _load_existing_state_from_api(node_url, api_key, stub_diff_uris, agent_id) 

137 return {}, set() 

138 

139 

140def _confirm_proceed() -> bool: 

141 """Interactive [y/N] confirmation. Returns False on abort or non-y answer.""" 

142 try: 

143 answer = input("Proceed? [y/N] ") 

144 except (EOFError, KeyboardInterrupt): 

145 print("\nAborted.") 

146 return False 

147 if answer.lower() not in ("y", "yes"): 

148 print("Aborted.") 

149 return False 

150 return True 

151 

152 

153def _cmd_instruction_migrate(args: argparse.Namespace) -> int: 

154 """Migrate markdown instruction files to stigmem facts and publish manifest.""" 

155 import os 

156 import time 

157 from pathlib import Path 

158 

159 from .instruction_migrate import ( 

160 build_fact_uri, 

161 compute_diff, 

162 format_preview, 

163 parse_instruction_chunks, 

164 publish_manifest, 

165 write_facts, 

166 ) 

167 

168 path = Path(args.path) 

169 if not path.exists(): 

170 print(f"error: path '{args.path}' does not exist", file=sys.stderr) 

171 return 1 

172 

173 api_key = args.api_key or os.environ.get("STIGMEM_API_KEY", "") 

174 node_url = args.node_url 

175 deployment = args.deployment 

176 version = args.version 

177 agent_id = args.agent_id 

178 

179 scope_prefix, scope_label = _resolve_scope_prefix_and_label(args, deployment, agent_id) 

180 

181 chunks = parse_instruction_chunks(path) 

182 if not chunks: 

183 print( 

184 "No instruction chunks found. Check that the path contains .md files.", 

185 file=sys.stderr, 

186 ) 

187 return 0 

188 

189 # Idempotency pre-flight: load existing fact content + prior manifest entry names. 

190 stub_diff_uris = {build_fact_uri(scope_prefix, c.slug, version) for c in chunks} 

191 existing_content, prev_names = _load_existing_state( 

192 args, 

193 node_url, 

194 api_key, 

195 stub_diff_uris, 

196 agent_id, 

197 ) 

198 

199 diff = compute_diff(chunks, scope_prefix, version, existing_content, prev_names) 

200 print(format_preview(diff, scope_label, path, version)) 

201 

202 if args.dry_run: 

203 print("Dry-run mode — no changes written.") 

204 return 0 

205 

206 work = [d for d in diff if d.action in ("CREATE", "UPDATE", "TOMBSTONE")] 

207 if not work: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 print("Nothing to do.") 

209 return 0 

210 

211 if not args.yes and not _confirm_proceed(): 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 return 1 

213 

214 if not api_key: 

215 print( 

216 "error: --api-key or STIGMEM_API_KEY env var required to write facts", 

217 file=sys.stderr, 

218 ) 

219 return 1 

220 

221 written, failed = write_facts(diff, node_url, api_key) 

222 if failed > 0: 

223 print( 

224 f"\n{failed} fact(s) failed to write. Manifest will NOT be published.", 

225 file=sys.stderr, 

226 ) 

227 return 1 

228 

229 # Unique manifest version per run (timestamp suffix) 

230 manifest_version = f"{version}-{int(time.time())}" 

231 if not publish_manifest(agent_id, diff, manifest_version, node_url, api_key): 

232 return 1 

233 

234 print(f"\nDone. {written} fact(s) written, manifest published as version '{manifest_version}'.") 

235 print(f"Verify: stigmem recall-instruction via POST /v1/agents/{agent_id}/recall-instruction") 

236 return 0 

237 

238 

239def _cmd_instruction_manifest_generate(args: argparse.Namespace) -> int: 

240 """Generate an instruction manifest JSON from a directory of markdown files.""" 

241 import json 

242 import re 

243 from pathlib import Path 

244 

245 path = Path(args.path) 

246 if not path.is_dir(): 

247 print(f"error: '{args.path}' is not a directory", file=sys.stderr) 

248 return 1 

249 

250 entries = [] 

251 md_files = sorted(path.glob("*.md")) + sorted(path.glob("**/*.md")) 

252 

253 for md_file in md_files: 

254 try: 

255 text = md_file.read_text(encoding="utf-8") 

256 except Exception as exc: 

257 print(f"warning: skipping {md_file}: {exc}", file=sys.stderr) 

258 continue 

259 

260 # Split at H2/H3 boundaries 

261 sections = re.split(r"(?m)^(#{2,3}\s+.+)$", text) 

262 

263 # Merge heading with following content 

264 chunks: list[tuple[str, str]] = [] 

265 i = 0 

266 while i < len(sections): 

267 if re.match(r"^#{2,3}\s+", sections[i].strip()): 

268 heading = sections[i].strip() 

269 body = sections[i + 1].strip() if i + 1 < len(sections) else "" 

270 chunks.append((heading, body)) 

271 i += 2 

272 else: 

273 if sections[i].strip(): 

274 chunks.append( 

275 ("# " + md_file.stem.replace("-", " ").title(), sections[i].strip()) 

276 ) 

277 i += 1 

278 

279 if not chunks: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true

280 chunks = [("# " + md_file.stem.replace("-", " ").title(), text.strip())] 

281 

282 for heading, body in chunks: 

283 heading_text = re.sub(r"^#{2,3}\s+", "", heading).strip() 

284 slug = re.sub(r"[^a-z0-9]+", "-", heading_text.lower()).strip("-") 

285 if not slug: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 slug = md_file.stem 

287 unit_name = f"{md_file.stem}-{slug}" if md_file.stem not in slug else slug 

288 

289 keywords = list( 

290 { 

291 w.lower() 

292 for w in re.findall(r"\b[a-zA-Z]{4,}\b", heading_text + " " + body[:200]) 

293 } 

294 )[:8] 

295 token_est = max(1, len(body) // 4) 

296 fact_uri = ( 

297 f"instruction:{args.deployment}/agent/{args.agent_id}/{unit_name}/{args.version}" 

298 ) 

299 

300 entries.append( 

301 { 

302 "name": unit_name, 

303 "description": heading_text[:120], 

304 "required_by_task_types": [], 

305 "guarantee_load": False, 

306 "load_triggers": { 

307 "intents": [heading_text.lower()], 

308 "keywords": keywords, 

309 "task_types": [], 

310 }, 

311 "fact_uri": fact_uri, 

312 "path": str(md_file), 

313 "token_estimate": token_est, 

314 } 

315 ) 

316 

317 result = { 

318 "version": args.version, 

319 "agent_id": args.agent_id, 

320 "deployment": args.deployment, 

321 "generated_from": str(path), 

322 "entries": entries, 

323 } 

324 output = json.dumps(result, indent=2) 

325 

326 if args.out: 

327 _write_owner_only_text(args.out, output) 

328 print(f"Wrote {len(entries)} entries to {args.out}") 

329 else: 

330 print(output) 

331 

332 return 0 

333 

334 

335def _cmd_audit_discovery(args: argparse.Namespace) -> int: 

336 """Print discovery audit metrics from the local database.""" 

337 import json 

338 import sqlite3 

339 from datetime import UTC, datetime, timedelta 

340 

341 from .settings import settings 

342 

343 db_path = args.db or settings.db_path 

344 

345 if args.since: 

346 try: 

347 since_dt = datetime.fromisoformat(args.since.replace("Z", "+00:00")) 

348 except ValueError: 

349 print(f"error: invalid --since date: {args.since}", file=sys.stderr) 

350 return 1 

351 else: 

352 since_dt = datetime.now(UTC) - timedelta(days=7) 

353 since_ms = int(since_dt.timestamp() * 1000) 

354 

355 try: 

356 conn = sqlite3.connect(db_path) 

357 conn.row_factory = sqlite3.Row 

358 except Exception as exc: 

359 print(f"error: cannot open database {db_path}: {exc}", file=sys.stderr) 

360 return 1 

361 

362 agent_filter = args.agent 

363 rows = conn.execute( 

364 "SELECT * FROM instruction_audit WHERE agent_id LIKE ? AND session_start >= ?", 

365 (f"%{agent_filter}%", since_ms), 

366 ).fetchall() 

367 

368 if not rows: 

369 print(f"No audit records found for agent '{agent_filter}' since {since_dt.date()}") 

370 return 0 

371 

372 total = len(rows) 

373 recall_at_k_num: float = 0.0 

374 hit_at_k_num = 0 

375 total_used = 0 

376 total_missed = 0 

377 

378 for row in rows: 

379 loaded = set(json.loads(row["loaded_chunks"])) 

380 used = json.loads(row["used_chunks"]) 

381 missed = json.loads(row["missed_chunks"]) 

382 used_set = set(used) 

383 missed_set = set(missed) 

384 

385 if used_set: 385 ↛ 391line 385 didn't jump to line 391 because the condition on line 385 was always true

386 recall_at_k = len(used_set & loaded) / len(used_set) 

387 recall_at_k_num += recall_at_k 

388 if used_set & loaded: 388 ↛ 391line 388 didn't jump to line 391 because the condition on line 388 was always true

389 hit_at_k_num += 1 

390 

391 total_used += len(used_set) 

392 total_missed += len(missed_set) 

393 

394 recall_at_k_avg = recall_at_k_num / total if total > 0 else 0.0 

395 hit_at_k_avg = hit_at_k_num / total if total > 0 else 0.0 

396 miss_rate = ( 

397 total_missed / (total_used + total_missed) if (total_used + total_missed) > 0 else 0.0 

398 ) 

399 

400 report = { 

401 "agent": agent_filter, 

402 "since": since_dt.isoformat(), 

403 "total_events": total, 

404 "recall_at_k": round(recall_at_k_avg, 4), 

405 "hit_at_k": round(hit_at_k_avg, 4), 

406 "miss_rate": round(miss_rate, 4), 

407 } 

408 

409 if args.json: 

410 print(json.dumps(report, indent=2)) 

411 else: 

412 print(f"Discovery audit — agent: {agent_filter} since: {since_dt.date()}") 

413 print(f" Total events : {total}") 

414 print(f" Recall@k : {recall_at_k_avg:.1%}") 

415 print(f" Hit@k : {hit_at_k_avg:.1%}") 

416 print(f" Miss rate : {miss_rate:.1%}") 

417 if miss_rate > 0.15: 417 ↛ 420line 417 didn't jump to line 420 because the condition on line 417 was always true

418 print(" ALERT: miss_rate > 0.15 — manifest descriptions or triggers need review") 

419 

420 conn.close() 

421 return 0 

422 

423 

424def _cmd_identity_rotate_key(args: argparse.Namespace) -> int: 

425 """Rotate the node or issuer Ed25519 key (spec §22.2). 

426 

427 Generates a next-gen keypair, appends a rotation event to the manifest, 

428 submits updated manifest + KeyRotationLogEntry to the transparency log, 

429 and prints the new private key seed for injection into your secrets manager. 

430 

431 The retiring key stays in the dual-trust accept_set for --dual-trust-days 

432 (default 90), covering all in-flight tokens signed under the old key. 

433 """ 

434 import json as _json 

435 

436 from .db import apply_migrations 

437 from .identity.capability import load_node_private_key 

438 from .identity.key_rotation import rotate_key 

439 from .identity.manifest import manifest_from_dict 

440 

441 if args.db: 441 ↛ 449line 441 didn't jump to line 449 because the condition on line 441 was always true

442 import stigmem_node.settings as settings_module 

443 

444 from .settings import Settings 

445 

446 patched = Settings(db_path=args.db) 

447 settings_module.settings = patched 

448 

449 from .settings import settings 

450 

451 apply_migrations(db_path=settings.db_path) 

452 

453 old_priv = load_node_private_key() 

454 if old_priv is None: 

455 print( 

456 "error: STIGMEM_NODE_PRIVATE_KEY is not configured — cannot rotate", 

457 file=sys.stderr, 

458 ) 

459 return 1 

460 

461 entity_uri = settings.node_url 

462 

463 from .db import db as _db_ctx 

464 

465 with _db_ctx() as conn: 

466 row = conn.execute( 

467 "SELECT manifest_json FROM federation_manifests WHERE entity_uri = ?", 

468 (entity_uri,), 

469 ).fetchone() 

470 

471 if row is None: 471 ↛ 479line 471 didn't jump to line 479 because the condition on line 471 was always true

472 print( 

473 f"error: no manifest found for {entity_uri!r} in federation_manifests\n" 

474 "Publish a manifest first via PUT /v1/federation/manifest", 

475 file=sys.stderr, 

476 ) 

477 return 1 

478 

479 old_manifest = manifest_from_dict(_json.loads(row["manifest_json"])) 

480 

481 try: 

482 result = rotate_key( 

483 entity_uri=entity_uri, 

484 old_manifest=old_manifest, 

485 old_private_key=old_priv, 

486 dual_trust_days=args.dual_trust_days, 

487 dry_run=args.dry_run, 

488 ) 

489 except ValueError as exc: 

490 print(f"error: {exc}", file=sys.stderr) 

491 return 1 

492 

493 tag = "[DRY RUN] " if args.dry_run else "" 

494 print(f"{tag}Key rotation ({args.kind}) complete") 

495 print(f" old key_id : {old_manifest.key_id}") 

496 print(f" new key_id : {result.new_manifest.key_id}") 

497 print(f" dual-trust : {result.rotation_log_entry.dual_trust_expires_at}") 

498 

499 if not args.dry_run and result.manifest_log_entry and result.rotation_tl_entry: 

500 print(f" manifest TL index : {result.manifest_log_entry.log_index}") 

501 print(f" rotation TL index : {result.rotation_tl_entry.log_index}") 

502 

503 from .identity.trust_store import store_peer_manifest 

504 

505 store_peer_manifest(entity_uri, result.new_manifest, result.manifest_log_entry) 

506 print(" manifest stored in federation_manifests") 

507 

508 print() 

509 print("ACTION REQUIRED — update your secrets manager with the new private key:") 

510 print(f" STIGMEM_NODE_PRIVATE_KEY={result.new_private_key_b64}") 

511 print("Then restart the node. Keep the old key value until the dual-trust window closes.") 

512 return 0 

513 

514 

515def _cmd_backfill_cids(args: argparse.Namespace) -> int: 

516 """Compute and persist CIDs for facts that pre-date Phase 13 (spec §25.6.3).""" 

517 import sqlite3 as _sqlite3 

518 

519 from .cid import compute_cid as _compute_cid 

520 from .lifecycle.immutability import set_fact_cid_backfill_status 

521 

522 db_path: str | None = getattr(args, "db", None) 

523 if db_path is None: 

524 import os as _os 

525 

526 db_path = _os.environ.get("STIGMEM_DB_PATH", "stigmem.db") 

527 

528 batch_size: int = getattr(args, "batch_size", 500) 

529 quiet: bool = getattr(args, "quiet", False) 

530 

531 conn = _sqlite3.connect(db_path) 

532 conn.row_factory = _sqlite3.Row 

533 

534 total_updated = 0 

535 collision_skipped = 0 

536 

537 while True: 

538 rows = conn.execute( 

539 "SELECT f.id, f.entity, f.relation, f.value_type, f.value_v, f.source, " 

540 "f.scope, f.confidence" 

541 " FROM facts f" 

542 " LEFT JOIN fact_cid_backfill fcb ON fcb.fact_id = f.id" 

543 " WHERE f.cid IS NULL AND COALESCE(fcb.status, 'pending') != 'complete'" 

544 " LIMIT ?", 

545 (batch_size,), 

546 ).fetchall() 

547 if not rows: 

548 break 

549 

550 for row in rows: 

551 cid = _compute_cid( 

552 entity=row["entity"], 

553 relation=row["relation"], 

554 value_type=row["value_type"], 

555 value_v=row["value_v"] or "", 

556 source=row["source"], 

557 scope=row["scope"], 

558 confidence=float(row["confidence"]), 

559 ) 

560 # Check for CID collision before writing 

561 existing = conn.execute( 

562 "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", (cid,) 

563 ).fetchone() 

564 if existing and existing["fact_id"] != row["id"]: 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true

565 collision_skipped += 1 

566 continue 

567 

568 conn.execute( 

569 "INSERT OR IGNORE INTO fact_cid_aliases (fact_id, cid) VALUES (?, ?)", 

570 (row["id"], cid), 

571 ) 

572 set_fact_cid_backfill_status(conn, fact_id=row["id"], status="complete") 

573 

574 conn.commit() 

575 total_updated += len(rows) 

576 if not quiet: 

577 print(f"backfill-cids: processed {total_updated} facts…", file=sys.stderr) 

578 

579 conn.close() 

580 if not quiet: 

581 print( 

582 f"backfill-cids: done — {total_updated} facts updated" 

583 + (f", {collision_skipped} CID collisions skipped" if collision_skipped else ""), 

584 file=sys.stderr, 

585 ) 

586 return 0 

587 

588 

589def _cmd_auth_bootstrap_key(args: argparse.Namespace) -> int: 

590 """Register a caller-provided admin-scope API key on a fresh install. 

591 

592 The caller supplies the key value via `--key` or the 

593 `STIGMEM_BOOTSTRAP_KEY` env var; we hash and store it. This is by 

594 design: the system is not the credential-generation surface. The 

595 user keeps full custody of the raw key from the moment it exists. 

596 

597 Refuses to run if the api_keys table is non-empty — bootstrap is 

598 one-shot. After bootstrap, additional keys go through 

599 `POST /v1/auth/keys` authenticated with the bootstrap key. 

600 """ 

601 import os 

602 

603 from .auth import register_api_key 

604 from .db import db 

605 

606 # Resolve key material from the caller. We do NOT generate one. 

607 key_value: str | None = args.key or os.environ.get("STIGMEM_BOOTSTRAP_KEY") 

608 if not key_value: 

609 print( 

610 "ERROR: no key value provided. Generate one externally and pass via\n" 

611 " --key VALUE or STIGMEM_BOOTSTRAP_KEY=VALUE\n\n" 

612 "Example:\n" 

613 " KEY=$(openssl rand -hex 32)\n" 

614 ' stigmem auth bootstrap-key --key "$KEY"\n' 

615 " # then use $KEY as `Authorization: Bearer $KEY` for API calls", 

616 file=sys.stderr, 

617 ) 

618 return 2 

619 

620 # Minimum-length check. Not a substitute for proper entropy 

621 # validation — we trust the caller used a CSPRNG — but it catches 

622 # obvious mistakes like `--key admin` or `--key password`. 

623 if len(key_value) < 16: 

624 print( 

625 f"ERROR: key must be at least 16 characters (got {len(key_value)}). " 

626 "Use `openssl rand -hex 32` or similar to generate sufficient entropy.", 

627 file=sys.stderr, 

628 ) 

629 return 2 

630 

631 with db() as conn: 

632 row = conn.execute("SELECT COUNT(*) FROM api_keys").fetchone() 

633 existing = int(row[0]) if row else 0 

634 

635 if existing > 0: 

636 print( 

637 f"ERROR: api_keys table is not empty ({existing} row(s)). " 

638 "Bootstrap is one-shot.\n" 

639 "Mint additional keys via `POST /v1/auth/keys` " 

640 "authenticated with an existing admin key.", 

641 file=sys.stderr, 

642 ) 

643 return 1 

644 

645 permissions: list[str] = ( 

646 args.permissions.split(",") if args.permissions else ["admin", "write", "read"] 

647 ) 

648 

649 # Discard the returned row id — it's UUID bookkeeping the adopter has 

650 # no use for. Suppressing it also avoids tripping CodeQL's name-based 

651 # heuristic that treats any variable matching `*key*` as a credential 

652 # candidate. (The actual raw key value never flows here; this is just 

653 # naming hygiene to prevent false positives on follow-up scans.) 

654 register_api_key( 

655 raw_key=key_value, 

656 entity_uri=args.entity_uri, 

657 permissions=permissions, 

658 ) 

659 

660 # Confirmation: entity + permissions only. The raw value is never 

661 # printed; the caller already has it from their `--key` / env-var input. 

662 print( 

663 f"Registered admin API key for entity={args.entity_uri!r} " 

664 f"with permissions={permissions!r}.\n" 

665 "Use your provided value as `Authorization: Bearer <value>` " 

666 "for subsequent requests.", 

667 file=sys.stderr, 

668 ) 

669 return 0