Coverage for node / src / stigmem_node / migrate.py: 95%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Entity alias population sweep — spec §2.6.6 migration guide.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import sqlite3 

7from datetime import UTC, datetime 

8 

9from .utility.entity_normalizer import NormalizationError, normalize_entity_uri 

10 

11logger = logging.getLogger(__name__) 

12 

13 

14def normalize_entities_sweep( 

15 db_path: str, 

16 *, 

17 dry_run: bool = False, 

18) -> tuple[int, int]: 

19 """Scan facts for non-canonical entity/source URIs and populate entity_aliases. 

20 

21 Collects every distinct entity and source URI from the facts table, normalizes 

22 each via normalize_entity_uri, and inserts raw→canonical rows into entity_aliases 

23 for any URI that differs from its canonical form. 

24 

25 Safe to re-run: uses INSERT OR IGNORE so existing rows are skipped. 

26 

27 Returns: 

28 (registered, already_present) — counts of newly inserted vs skipped aliases. 

29 When dry_run=True, prints the would-be pairs to stdout and returns 

30 (would_register, 0) without touching the database. 

31 """ 

32 conn = sqlite3.connect(db_path) 

33 conn.row_factory = sqlite3.Row 

34 try: 

35 raw_uris: set[str] = set() 

36 for row in conn.execute("SELECT DISTINCT entity FROM facts"): 

37 raw_uris.add(row[0]) 

38 for row in conn.execute("SELECT DISTINCT source FROM facts"): 

39 raw_uris.add(row[0]) 

40 

41 pairs: list[tuple[str, str]] = [] 

42 for raw in sorted(raw_uris): 

43 try: 

44 canonical = normalize_entity_uri(raw) 

45 except NormalizationError as exc: 

46 logger.warning("skipping non-normalizable entity/source URI %r: %s", raw, exc) 

47 continue 

48 if raw != canonical: 

49 pairs.append((raw, canonical)) 

50 

51 if dry_run: 

52 for raw, canonical in pairs: 

53 print(f"{raw!r} → {canonical!r}") 

54 return len(pairs), 0 

55 

56 now = datetime.now(UTC).isoformat() 

57 registered = 0 

58 already_present = 0 

59 for raw, canonical in pairs: 

60 cur = conn.execute( 

61 "INSERT OR IGNORE INTO entity_aliases (raw_uri, canonical_uri, created_at)" 

62 " VALUES (?, ?, ?)", 

63 (raw, canonical, now), 

64 ) 

65 if cur.rowcount > 0: 

66 registered += 1 

67 else: 

68 already_present += 1 

69 

70 conn.commit() 

71 return registered, already_present 

72 finally: 

73 conn.close()