Coverage for node / src / stigmem_node / cli / maintenance.py: 100%
25 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Maintenance and migration CLI handlers."""
3from __future__ import annotations
5import argparse
6import sys
def _cmd_decay_sweep(args: argparse.Namespace) -> int:
    """Run a decay sweep and report the outcome on stderr.

    The database path is ``args.db`` when truthy, otherwise
    ``settings.db_path``. Migrations are applied to that path before the
    sweep is invoked with ``args.ttl_seconds``, ``args.min_confidence``,
    ``args.scope`` (falsy values are passed as ``None``) and
    ``args.dry_run``.

    Returns:
        Always ``0``.
    """
    from ..db import apply_migrations
    from ..lifecycle.decay import run_decay_sweep
    from ..settings import settings

    target_db: str = args.db or settings.db_path
    apply_migrations(db_path=target_db)

    # NOTE(review): the resolved path is handed to apply_migrations only;
    # run_decay_sweep receives no path here -- confirm it targets the same
    # database when ``args.db`` overrides the configured one.
    stats = run_decay_sweep(
        ttl_seconds=args.ttl_seconds,
        min_confidence=args.min_confidence,
        scope=args.scope or None,
        dry_run=args.dry_run,
    )

    if args.dry_run:
        # NOTE(review): the dry-run line reports the 'scanned' count as the
        # number that would be decayed -- confirm this is intended.
        summary = f"[dry-run] {stats['scanned']} facts would be decayed"
    else:
        summary = f"{stats['decayed']} facts decayed ({stats['scanned']} scanned)"

    print(summary, file=sys.stderr)
    return 0
def _cmd_migrate_normalize_entities(args: argparse.Namespace) -> int:
    """Run the entity-normalization sweep and report the counts on stderr.

    The database path is ``args.db`` when truthy, otherwise
    ``settings.db_path``. Migrations are applied to that path, then
    ``normalize_entities_sweep`` is called on it with ``args.dry_run``;
    its two return values are reported as the registered and
    already-present alias counts.

    Returns:
        Always ``0``.
    """
    from ..db import apply_migrations
    from ..migrate import normalize_entities_sweep
    from ..settings import settings

    target_db: str = args.db or settings.db_path
    apply_migrations(db_path=target_db)

    newly_registered, preexisting = normalize_entities_sweep(
        target_db, dry_run=args.dry_run
    )

    if not args.dry_run:
        print(
            f"{newly_registered} aliases registered, {preexisting} already present",
            file=sys.stderr,
        )
        return 0

    # Dry run: only the would-be registration count is reported.
    print(
        f"[dry-run] {newly_registered} aliases would be registered",
        file=sys.stderr,
    )
    return 0