Coverage for node / src / stigmem_node / cli_admin_handlers.py: 84%
334 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Admin / migration / audit CLI handlers extracted from cli.py.
3These handlers are imported back into ``stigmem_node.cli`` so that the public
4import surface (``from stigmem_node.cli import _cmd_backfill_cids`` etc.) is
5preserved. No behavioural changes — code was moved verbatim from cli.py.
6"""
8from __future__ import annotations
10import argparse
11import logging
12import os
13import sys
15logger = logging.getLogger("stigmem.cli")
18def _write_owner_only_text(path: str, text: str) -> None:
19 fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
20 try:
21 os.fchmod(fd, 0o600)
22 file_obj = os.fdopen(fd, "w")
23 fd = -1
24 with file_obj as f:
25 f.write(text)
26 finally:
27 if fd != -1: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 os.close(fd)
29 os.chmod(path, 0o600)
def _resolve_scope_prefix_and_label(
    args: argparse.Namespace,
    deployment: str,
    agent_id: str,
) -> tuple[str, str]:
    """Return ``(scope_prefix, label)`` for an instruction-migration run.

    A truthy ``args.role`` selects the role-scoped prefix, built from
    *deployment* and *agent_id*. Otherwise the skill-scoped prefix built from
    ``args.skill`` is used. The label is a short human-readable description of
    the chosen scope.
    """
    from .instruction_migrate import scope_prefix_for_role, scope_prefix_for_skill

    if not args.role:
        prefix = scope_prefix_for_skill(deployment, args.skill)
        label = f"skill:{args.skill} agent:{agent_id}"
    else:
        prefix = scope_prefix_for_role(deployment, agent_id)
        label = f"role:{args.role} agent:{agent_id}"
    return prefix, label
45def _load_existing_state_from_db(
46 db_path: str,
47 stub_diff_uris: set[str],
48 agent_id: str,
49) -> tuple[dict[str, str], set[str]]:
50 """Best-effort: read existing fact values + prior manifest entry names from local SQLite."""
51 import json as _json
52 import sqlite3
54 existing_content: dict[str, str] = {}
55 prev_names: set[str] = set()
56 try:
57 conn = sqlite3.connect(db_path)
58 conn.row_factory = sqlite3.Row
59 for uri in stub_diff_uris:
60 row = conn.execute(
61 "SELECT value_v FROM facts WHERE entity = ? ORDER BY timestamp DESC LIMIT 1",
62 (uri,),
63 ).fetchone()
64 if row: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true
65 existing_content[uri] = str(row["value_v"])
66 row = conn.execute(
67 "SELECT body FROM instruction_manifests"
68 " WHERE agent_id = ? AND superseded_at IS NULL"
69 " ORDER BY created_at DESC LIMIT 1",
70 (agent_id,),
71 ).fetchone()
72 if row: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true
73 prev_names = {e["name"] for e in _json.loads(row["body"])}
74 conn.close()
75 except Exception as exc:
76 print(f"warning: db query failed: {exc}", file=sys.stderr)
77 return existing_content, prev_names
def _load_existing_state_from_api(
    node_url: str,
    api_key: str,
    stub_diff_uris: set[str],
    agent_id: str,
) -> tuple[dict[str, str], set[str]]:
    """Best-effort: fetch existing facts + manifest entries via HTTP.

    For each URI in *stub_diff_uris*, ``GET /v1/facts?entity=<uri>&limit=1`` is
    issued. On a 200 response with at least one fact, its ``value.v`` is
    recorded. The agent's current instruction manifest is then fetched, and its
    entry names are returned as ``prev_names``.

    Every request is best-effort: failures are logged at DEBUG level and
    skipped. If ``httpx`` is not installed, a warning is printed and empty
    results are returned.
    """
    try:
        import httpx
    except ImportError:
        print("warning: httpx not installed — skipping idempotency checks", file=sys.stderr)
        return {}, set()

    existing_content: dict[str, str] = {}
    prev_names: set[str] = set()
    base = node_url.rstrip("/")

    # A single client for all pre-flight requests lets the connection be reused
    # across the per-URI fact lookups instead of reconnecting each time.
    with httpx.Client(
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=10.0,
    ) as client:
        for uri in stub_diff_uris:
            try:
                r = client.get(f"{base}/v1/facts", params={"entity": uri, "limit": 1})
                if r.status_code == 200:
                    facts = r.json().get("facts", [])
                    if facts:
                        existing_content[uri] = str(facts[0]["value"]["v"])
            except Exception as exc:  # nosec B110 — best-effort pre-flight
                logger.debug("instruction migrate pre-flight fact fetch failed: %s", exc)
        try:
            r = client.get(f"{base}/v1/agents/{agent_id}/instruction-manifest")
            if r.status_code == 200:
                prev_names = {e["name"] for e in r.json().get("entries", [])}
        except Exception as exc:  # nosec B110 — best-effort pre-flight
            logger.debug("instruction migrate pre-flight manifest fetch failed: %s", exc)
    return existing_content, prev_names
125def _load_existing_state(
126 args: argparse.Namespace,
127 node_url: str,
128 api_key: str,
129 stub_diff_uris: set[str],
130 agent_id: str,
131) -> tuple[dict[str, str], set[str]]:
132 """Dispatch to the DB-backed or HTTP-backed pre-flight loader, or return empty."""
133 if args.db:
134 return _load_existing_state_from_db(args.db, stub_diff_uris, agent_id)
135 if api_key:
136 return _load_existing_state_from_api(node_url, api_key, stub_diff_uris, agent_id)
137 return {}, set()
140def _confirm_proceed() -> bool:
141 """Interactive [y/N] confirmation. Returns False on abort or non-y answer."""
142 try:
143 answer = input("Proceed? [y/N] ")
144 except (EOFError, KeyboardInterrupt):
145 print("\nAborted.")
146 return False
147 if answer.lower() not in ("y", "yes"):
148 print("Aborted.")
149 return False
150 return True
def _cmd_instruction_migrate(args: argparse.Namespace) -> int:
    """Migrate markdown instruction files to stigmem facts and publish manifest.

    Flow:

    1. Parse instruction chunks under ``args.path``.
    2. Load existing fact content and prior manifest entry names (via ``--db``
       or the node API) so ``compute_diff`` can classify each chunk.
    3. Print a preview of the diff.
    4. Unless ``--dry-run``, write the facts and publish a manifest versioned
       ``<version>-<unix seconds>``.

    Returns 0 on success, on dry-run, or when there is nothing to migrate or
    do. Returns 1 when the path is missing, no API key is available for
    writing, the user aborts, or any fact write / manifest publish fails.
    """
    import time
    from pathlib import Path

    from .instruction_migrate import (
        build_fact_uri,
        compute_diff,
        format_preview,
        parse_instruction_chunks,
        publish_manifest,
        write_facts,
    )

    path = Path(args.path)
    if not path.exists():
        print(f"error: path '{args.path}' does not exist", file=sys.stderr)
        return 1

    api_key = args.api_key or os.environ.get("STIGMEM_API_KEY", "")
    node_url = args.node_url
    deployment = args.deployment
    version = args.version
    agent_id = args.agent_id

    scope_prefix, scope_label = _resolve_scope_prefix_and_label(args, deployment, agent_id)

    chunks = parse_instruction_chunks(path)
    if not chunks:
        print(
            "No instruction chunks found. Check that the path contains .md files.",
            file=sys.stderr,
        )
        return 0

    # Idempotency pre-flight: load existing fact content + prior manifest entry names.
    stub_diff_uris = {build_fact_uri(scope_prefix, c.slug, version) for c in chunks}
    existing_content, prev_names = _load_existing_state(
        args,
        node_url,
        api_key,
        stub_diff_uris,
        agent_id,
    )

    diff = compute_diff(chunks, scope_prefix, version, existing_content, prev_names)
    print(format_preview(diff, scope_label, path, version))

    if args.dry_run:
        print("Dry-run mode — no changes written.")
        return 0

    if not any(d.action in ("CREATE", "UPDATE", "TOMBSTONE") for d in diff):
        print("Nothing to do.")
        return 0

    # Check credentials *before* prompting, so the user is never asked to
    # confirm a run that cannot write anything.
    if not api_key:
        print(
            "error: --api-key or STIGMEM_API_KEY env var required to write facts",
            file=sys.stderr,
        )
        return 1

    if not args.yes and not _confirm_proceed():
        return 1

    written, failed = write_facts(diff, node_url, api_key)
    if failed > 0:
        print(
            f"\n{failed} fact(s) failed to write. Manifest will NOT be published.",
            file=sys.stderr,
        )
        return 1

    # Unique manifest version per run (timestamp suffix)
    manifest_version = f"{version}-{int(time.time())}"
    if not publish_manifest(agent_id, diff, manifest_version, node_url, api_key):
        return 1

    print(f"\nDone. {written} fact(s) written, manifest published as version '{manifest_version}'.")
    print(f"Verify: stigmem recall-instruction via POST /v1/agents/{agent_id}/recall-instruction")
    return 0
239def _cmd_instruction_manifest_generate(args: argparse.Namespace) -> int:
240 """Generate an instruction manifest JSON from a directory of markdown files."""
241 import json
242 import re
243 from pathlib import Path
245 path = Path(args.path)
246 if not path.is_dir():
247 print(f"error: '{args.path}' is not a directory", file=sys.stderr)
248 return 1
250 entries = []
251 md_files = sorted(path.glob("*.md")) + sorted(path.glob("**/*.md"))
253 for md_file in md_files:
254 try:
255 text = md_file.read_text(encoding="utf-8")
256 except Exception as exc:
257 print(f"warning: skipping {md_file}: {exc}", file=sys.stderr)
258 continue
260 # Split at H2/H3 boundaries
261 sections = re.split(r"(?m)^(#{2,3}\s+.+)$", text)
263 # Merge heading with following content
264 chunks: list[tuple[str, str]] = []
265 i = 0
266 while i < len(sections):
267 if re.match(r"^#{2,3}\s+", sections[i].strip()):
268 heading = sections[i].strip()
269 body = sections[i + 1].strip() if i + 1 < len(sections) else ""
270 chunks.append((heading, body))
271 i += 2
272 else:
273 if sections[i].strip():
274 chunks.append(
275 ("# " + md_file.stem.replace("-", " ").title(), sections[i].strip())
276 )
277 i += 1
279 if not chunks: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true
280 chunks = [("# " + md_file.stem.replace("-", " ").title(), text.strip())]
282 for heading, body in chunks:
283 heading_text = re.sub(r"^#{2,3}\s+", "", heading).strip()
284 slug = re.sub(r"[^a-z0-9]+", "-", heading_text.lower()).strip("-")
285 if not slug: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 slug = md_file.stem
287 unit_name = f"{md_file.stem}-{slug}" if md_file.stem not in slug else slug
289 keywords = list(
290 {
291 w.lower()
292 for w in re.findall(r"\b[a-zA-Z]{4,}\b", heading_text + " " + body[:200])
293 }
294 )[:8]
295 token_est = max(1, len(body) // 4)
296 fact_uri = (
297 f"instruction:{args.deployment}/agent/{args.agent_id}/{unit_name}/{args.version}"
298 )
300 entries.append(
301 {
302 "name": unit_name,
303 "description": heading_text[:120],
304 "required_by_task_types": [],
305 "guarantee_load": False,
306 "load_triggers": {
307 "intents": [heading_text.lower()],
308 "keywords": keywords,
309 "task_types": [],
310 },
311 "fact_uri": fact_uri,
312 "path": str(md_file),
313 "token_estimate": token_est,
314 }
315 )
317 result = {
318 "version": args.version,
319 "agent_id": args.agent_id,
320 "deployment": args.deployment,
321 "generated_from": str(path),
322 "entries": entries,
323 }
324 output = json.dumps(result, indent=2)
326 if args.out:
327 _write_owner_only_text(args.out, output)
328 print(f"Wrote {len(entries)} entries to {args.out}")
329 else:
330 print(output)
332 return 0
335def _cmd_audit_discovery(args: argparse.Namespace) -> int:
336 """Print discovery audit metrics from the local database."""
337 import json
338 import sqlite3
339 from datetime import UTC, datetime, timedelta
341 from .settings import settings
343 db_path = args.db or settings.db_path
345 if args.since:
346 try:
347 since_dt = datetime.fromisoformat(args.since.replace("Z", "+00:00"))
348 except ValueError:
349 print(f"error: invalid --since date: {args.since}", file=sys.stderr)
350 return 1
351 else:
352 since_dt = datetime.now(UTC) - timedelta(days=7)
353 since_ms = int(since_dt.timestamp() * 1000)
355 try:
356 conn = sqlite3.connect(db_path)
357 conn.row_factory = sqlite3.Row
358 except Exception as exc:
359 print(f"error: cannot open database {db_path}: {exc}", file=sys.stderr)
360 return 1
362 agent_filter = args.agent
363 rows = conn.execute(
364 "SELECT * FROM instruction_audit WHERE agent_id LIKE ? AND session_start >= ?",
365 (f"%{agent_filter}%", since_ms),
366 ).fetchall()
368 if not rows:
369 print(f"No audit records found for agent '{agent_filter}' since {since_dt.date()}")
370 return 0
372 total = len(rows)
373 recall_at_k_num: float = 0.0
374 hit_at_k_num = 0
375 total_used = 0
376 total_missed = 0
378 for row in rows:
379 loaded = set(json.loads(row["loaded_chunks"]))
380 used = json.loads(row["used_chunks"])
381 missed = json.loads(row["missed_chunks"])
382 used_set = set(used)
383 missed_set = set(missed)
385 if used_set: 385 ↛ 391line 385 didn't jump to line 391 because the condition on line 385 was always true
386 recall_at_k = len(used_set & loaded) / len(used_set)
387 recall_at_k_num += recall_at_k
388 if used_set & loaded: 388 ↛ 391line 388 didn't jump to line 391 because the condition on line 388 was always true
389 hit_at_k_num += 1
391 total_used += len(used_set)
392 total_missed += len(missed_set)
394 recall_at_k_avg = recall_at_k_num / total if total > 0 else 0.0
395 hit_at_k_avg = hit_at_k_num / total if total > 0 else 0.0
396 miss_rate = (
397 total_missed / (total_used + total_missed) if (total_used + total_missed) > 0 else 0.0
398 )
400 report = {
401 "agent": agent_filter,
402 "since": since_dt.isoformat(),
403 "total_events": total,
404 "recall_at_k": round(recall_at_k_avg, 4),
405 "hit_at_k": round(hit_at_k_avg, 4),
406 "miss_rate": round(miss_rate, 4),
407 }
409 if args.json:
410 print(json.dumps(report, indent=2))
411 else:
412 print(f"Discovery audit — agent: {agent_filter} since: {since_dt.date()}")
413 print(f" Total events : {total}")
414 print(f" Recall@k : {recall_at_k_avg:.1%}")
415 print(f" Hit@k : {hit_at_k_avg:.1%}")
416 print(f" Miss rate : {miss_rate:.1%}")
417 if miss_rate > 0.15: 417 ↛ 420line 417 didn't jump to line 420 because the condition on line 417 was always true
418 print(" ALERT: miss_rate > 0.15 — manifest descriptions or triggers need review")
420 conn.close()
421 return 0
def _cmd_identity_rotate_key(args: argparse.Namespace) -> int:
    """Rotate the node or issuer Ed25519 key (spec §22.2).

    Loads this node's stored federation manifest and current private key, then
    hands them to ``rotate_key`` to produce a next-generation keypair and
    rotation record. Outside dry-run, when transparency-log entries come back,
    the new manifest is stored in ``federation_manifests``. The new private key
    seed is printed so it can be injected into a secrets manager.

    The retiring key stays in the dual-trust accept_set for --dual-trust-days
    (default 90), covering in-flight tokens signed under the old key.

    Returns 1 when no node private key is configured, when no manifest is
    stored for this node, or when ``rotate_key`` raises ``ValueError``;
    otherwise 0.
    """
    import json as _json

    from .db import apply_migrations
    from .identity.capability import load_node_private_key
    from .identity.key_rotation import rotate_key
    from .identity.manifest import manifest_from_dict

    if args.db:
        # Replace the module-level settings singleton with one bound to --db.
        # This must happen before ``settings`` is imported just below.
        import stigmem_node.settings as settings_module

        from .settings import Settings

        settings_module.settings = Settings(db_path=args.db)

    from .settings import settings

    apply_migrations(db_path=settings.db_path)

    retiring_key = load_node_private_key()
    if retiring_key is None:
        print(
            "error: STIGMEM_NODE_PRIVATE_KEY is not configured — cannot rotate",
            file=sys.stderr,
        )
        return 1

    node_uri = settings.node_url

    from .db import db as _db_ctx

    with _db_ctx() as conn:
        stored = conn.execute(
            "SELECT manifest_json FROM federation_manifests WHERE entity_uri = ?",
            (node_uri,),
        ).fetchone()

    if stored is None:
        print(
            f"error: no manifest found for {node_uri!r} in federation_manifests\n"
            "Publish a manifest first via PUT /v1/federation/manifest",
            file=sys.stderr,
        )
        return 1

    current_manifest = manifest_from_dict(_json.loads(stored["manifest_json"]))

    try:
        outcome = rotate_key(
            entity_uri=node_uri,
            old_manifest=current_manifest,
            old_private_key=retiring_key,
            dual_trust_days=args.dual_trust_days,
            dry_run=args.dry_run,
        )
    except ValueError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    banner = "[DRY RUN] " if args.dry_run else ""
    for line in (
        f"{banner}Key rotation ({args.kind}) complete",
        f" old key_id : {current_manifest.key_id}",
        f" new key_id : {outcome.new_manifest.key_id}",
        f" dual-trust : {outcome.rotation_log_entry.dual_trust_expires_at}",
    ):
        print(line)

    committed = not args.dry_run and outcome.manifest_log_entry and outcome.rotation_tl_entry
    if committed:
        print(f" manifest TL index : {outcome.manifest_log_entry.log_index}")
        print(f" rotation TL index : {outcome.rotation_tl_entry.log_index}")

        from .identity.trust_store import store_peer_manifest

        store_peer_manifest(node_uri, outcome.new_manifest, outcome.manifest_log_entry)
        print(" manifest stored in federation_manifests")

    print()
    print("ACTION REQUIRED — update your secrets manager with the new private key:")
    print(f" STIGMEM_NODE_PRIVATE_KEY={outcome.new_private_key_b64}")
    print("Then restart the node. Keep the old key value until the dual-trust window closes.")
    return 0
def _cmd_backfill_cids(args: argparse.Namespace) -> int:
    """Compute and persist CIDs for facts that pre-date Phase 13 (spec §25.6.3).

    Processes facts with ``cid IS NULL`` whose ``fact_cid_backfill`` status is
    not ``'complete'``, in batches of ``args.batch_size`` (default 500) ordered
    by ``id``. For each fact, the CID is computed. If another fact already owns
    that CID in ``fact_cid_aliases``, the fact is counted as a collision and
    skipped. Otherwise an alias row is inserted and the fact's backfill status
    is set to ``'complete'``.

    The DB path comes from ``args.db``, else ``$STIGMEM_DB_PATH``, else
    ``stigmem.db``. Progress goes to stderr unless ``args.quiet``. Returns 0.
    """
    import sqlite3 as _sqlite3

    from .cid import compute_cid as _compute_cid
    from .lifecycle.immutability import set_fact_cid_backfill_status

    db_path: str | None = getattr(args, "db", None)
    if db_path is None:
        import os as _os

        db_path = _os.environ.get("STIGMEM_DB_PATH", "stigmem.db")

    batch_size: int = getattr(args, "batch_size", 500)
    quiet: bool = getattr(args, "quiet", False)

    pending_sql = (
        "SELECT f.id, f.entity, f.relation, f.value_type, f.value_v, f.source, "
        "f.scope, f.confidence"
        " FROM facts f"
        " LEFT JOIN fact_cid_backfill fcb ON fcb.fact_id = f.id"
        " WHERE f.cid IS NULL AND COALESCE(fcb.status, 'pending') != 'complete'"
    )

    total_processed = 0
    total_updated = 0
    collision_skipped = 0
    # Keyset cursor over f.id. Collision rows are never marked complete, so
    # without it they would be re-selected on every pass and the loop would
    # never terminate.
    last_id = None

    conn = _sqlite3.connect(db_path)
    conn.row_factory = _sqlite3.Row
    try:
        while True:
            if last_id is None:
                rows = conn.execute(
                    pending_sql + " ORDER BY f.id LIMIT ?",
                    (batch_size,),
                ).fetchall()
            else:
                rows = conn.execute(
                    pending_sql + " AND f.id > ? ORDER BY f.id LIMIT ?",
                    (last_id, batch_size),
                ).fetchall()
            if not rows:
                break

            for row in rows:
                cid = _compute_cid(
                    entity=row["entity"],
                    relation=row["relation"],
                    value_type=row["value_type"],
                    value_v=row["value_v"] or "",
                    source=row["source"],
                    scope=row["scope"],
                    confidence=float(row["confidence"]),
                )
                # Check for CID collision before writing
                existing = conn.execute(
                    "SELECT fact_id FROM fact_cid_aliases WHERE cid = ?", (cid,)
                ).fetchone()
                if existing and existing["fact_id"] != row["id"]:
                    collision_skipped += 1
                    continue

                conn.execute(
                    "INSERT OR IGNORE INTO fact_cid_aliases (fact_id, cid) VALUES (?, ?)",
                    (row["id"], cid),
                )
                set_fact_cid_backfill_status(conn, fact_id=row["id"], status="complete")
                total_updated += 1

            conn.commit()
            last_id = rows[-1]["id"]
            total_processed += len(rows)
            if not quiet:
                print(f"backfill-cids: processed {total_processed} facts…", file=sys.stderr)
    finally:
        conn.close()

    if not quiet:
        print(
            f"backfill-cids: done — {total_updated} facts updated"
            + (f", {collision_skipped} CID collisions skipped" if collision_skipped else ""),
            file=sys.stderr,
        )
    return 0
def _cmd_auth_bootstrap_key(args: argparse.Namespace) -> int:
    """Register a caller-provided admin-scope API key on a fresh install.

    The caller supplies the key value via `--key` or the
    `STIGMEM_BOOTSTRAP_KEY` env var; we hash and store it. This is by
    design: the system is not the credential-generation surface. The
    user keeps full custody of the raw key from the moment it exists.

    Refuses to run if the api_keys table is non-empty — bootstrap is
    one-shot. After bootstrap, additional keys go through
    `POST /v1/auth/keys` authenticated with the bootstrap key.

    ``--permissions`` is a comma-separated list. Surrounding whitespace and
    empty items are ignored, and the default is ``admin,write,read``.

    Returns 2 for invalid input (no key, key shorter than 16 characters, or an
    empty permission list), 1 if keys already exist, and 0 on success.
    """
    from .auth import register_api_key
    from .db import db

    # Resolve key material from the caller. We do NOT generate one.
    key_value: str | None = args.key or os.environ.get("STIGMEM_BOOTSTRAP_KEY")
    if not key_value:
        print(
            "ERROR: no key value provided. Generate one externally and pass via\n"
            " --key VALUE or STIGMEM_BOOTSTRAP_KEY=VALUE\n\n"
            "Example:\n"
            " KEY=$(openssl rand -hex 32)\n"
            ' stigmem auth bootstrap-key --key "$KEY"\n'
            " # then use $KEY as `Authorization: Bearer $KEY` for API calls",
            file=sys.stderr,
        )
        return 2

    # Minimum-length check. Not a substitute for proper entropy
    # validation — we trust the caller used a CSPRNG — but it catches
    # obvious mistakes like `--key admin` or `--key password`.
    if len(key_value) < 16:
        print(
            f"ERROR: key must be at least 16 characters (got {len(key_value)}). "
            "Use `openssl rand -hex 32` or similar to generate sufficient entropy.",
            file=sys.stderr,
        )
        return 2

    # Validate permissions before touching the DB. Without stripping,
    # "admin, read" would register the permission " read", and "," would
    # register empty strings.
    if args.permissions:
        permissions: list[str] = [
            p.strip() for p in args.permissions.split(",") if p.strip()
        ]
        if not permissions:
            print(
                "ERROR: --permissions must list at least one permission (comma-separated).",
                file=sys.stderr,
            )
            return 2
    else:
        permissions = ["admin", "write", "read"]

    with db() as conn:
        row = conn.execute("SELECT COUNT(*) FROM api_keys").fetchone()
        existing = int(row[0]) if row else 0

    if existing > 0:
        print(
            f"ERROR: api_keys table is not empty ({existing} row(s)). "
            "Bootstrap is one-shot.\n"
            "Mint additional keys via `POST /v1/auth/keys` "
            "authenticated with an existing admin key.",
            file=sys.stderr,
        )
        return 1

    # Discard the returned row id — it's UUID bookkeeping the adopter has
    # no use for. Suppressing it also avoids tripping CodeQL's name-based
    # heuristic that treats any variable matching `*key*` as a credential
    # candidate. (The actual raw key value never flows here; this is just
    # naming hygiene to prevent false positives on follow-up scans.)
    register_api_key(
        raw_key=key_value,
        entity_uri=args.entity_uri,
        permissions=permissions,
    )

    # Confirmation: entity + permissions only. The raw value is never
    # printed; the caller already has it from their `--key` / env-var input.
    print(
        f"Registered admin API key for entity={args.entity_uri!r} "
        f"with permissions={permissions!r}.\n"
        "Use your provided value as `Authorization: Bearer <value>` "
        "for subsequent requests.",
        file=sys.stderr,
    )
    return 0