Coverage for node / src / stigmem_node / cli / maintenance.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Maintenance and migration CLI handlers.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import sys 

7 

8 

9def _cmd_decay_sweep(args: argparse.Namespace) -> int: 

10 from ..db import apply_migrations 

11 from ..lifecycle.decay import run_decay_sweep 

12 from ..settings import settings 

13 

14 db_path: str = args.db or settings.db_path 

15 apply_migrations(db_path=db_path) 

16 

17 result = run_decay_sweep( 

18 ttl_seconds=args.ttl_seconds, 

19 min_confidence=args.min_confidence, 

20 scope=args.scope or None, 

21 dry_run=args.dry_run, 

22 ) 

23 

24 if args.dry_run: 

25 print(f"[dry-run] {result['scanned']} facts would be decayed", file=sys.stderr) 

26 else: 

27 print(f"{result['decayed']} facts decayed ({result['scanned']} scanned)", file=sys.stderr) 

28 return 0 

29 

30 

31def _cmd_migrate_normalize_entities(args: argparse.Namespace) -> int: 

32 from ..db import apply_migrations 

33 from ..migrate import normalize_entities_sweep 

34 from ..settings import settings 

35 

36 db_path: str = args.db or settings.db_path 

37 apply_migrations(db_path=db_path) 

38 

39 registered, already_present = normalize_entities_sweep(db_path, dry_run=args.dry_run) 

40 

41 if args.dry_run: 

42 print( 

43 f"[dry-run] {registered} aliases would be registered", 

44 file=sys.stderr, 

45 ) 

46 else: 

47 print( 

48 f"{registered} aliases registered, {already_present} already present", 

49 file=sys.stderr, 

50 ) 

51 return 0