Coverage for node / src / stigmem_node / lifecycle / decay.py: 95%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Configurable decay sweeper — marks stale facts as expired (Phase 6).""" 

2 

3from __future__ import annotations 

4 

5import uuid 

6from datetime import UTC, datetime, timedelta 

7from typing import Any 

8 

9from ..db import db 

10from ..settings import settings 

11from .immutability import set_fact_validity_override 

12 

13 

14def _resolve_effective_ttl(ttl_seconds: int | None) -> int | None: 

15 """Resolve the effective TTL: explicit arg overrides settings; 0 means "expire all".""" 

16 if ttl_seconds is not None: 

17 return ttl_seconds # 0 is valid: expire everything 

18 if settings.decay_ttl_seconds > 0: 18 ↛ 19line 18 didn't jump to line 19 because the condition on line 18 was never true

19 return settings.decay_ttl_seconds 

20 return None 

21 

22 

23def _resolve_effective_min_conf(min_confidence: float | None) -> float | None: 

24 """Resolve the effective confidence floor: explicit arg overrides settings.""" 

25 if min_confidence is not None and min_confidence > 0.0: 

26 return min_confidence 

27 if settings.decay_min_confidence > 0.0: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 return settings.decay_min_confidence 

29 return None 

30 

31 

32def _select_ttl_candidates( 

33 conn: Any, effective_ttl: int, scope: str | None, now_dt: datetime 

34) -> list[str]: 

35 """Return fact ids whose timestamp is older than (now - effective_ttl).""" 

36 cutoff = (now_dt - timedelta(seconds=effective_ttl)).isoformat() 

37 sql = ( 

38 "SELECT f.id FROM facts f " 

39 "LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id " 

40 "WHERE f.timestamp <= ? " 

41 "AND COALESCE(fvo.valid_until, f.valid_until) IS NULL " 

42 "AND NOT (entity LIKE 'stigmem:%' AND entity NOT LIKE 'stigmem://%') " 

43 "AND NOT (relation LIKE 'stigmem:%' AND relation NOT LIKE 'stigmem://%')" 

44 ) 

45 params: list[Any] = [cutoff] 

46 if scope: 

47 sql += " AND scope = ?" 

48 params.append(scope) 

49 return [r["id"] for r in conn.execute(sql, params).fetchall()] 

50 

51 

52def _select_confidence_candidates( 

53 conn: Any, effective_min_conf: float, scope: str | None, now: str 

54) -> list[str]: 

55 """Return active fact ids whose confidence is below the floor.""" 

56 sql = ( 

57 "SELECT f.id FROM facts f " 

58 "LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id " 

59 "WHERE COALESCE(fvo.confidence, f.confidence) < ? " 

60 "AND COALESCE(fvo.confidence, f.confidence) > 0.0 " 

61 "AND (COALESCE(fvo.valid_until, f.valid_until) IS NULL " 

62 "OR COALESCE(fvo.valid_until, f.valid_until) > ?) " 

63 "AND NOT (entity LIKE 'stigmem:%' AND entity NOT LIKE 'stigmem://%') " 

64 "AND NOT (relation LIKE 'stigmem:%' AND relation NOT LIKE 'stigmem://%')" 

65 ) 

66 params: list[Any] = [effective_min_conf, now] 

67 if scope: 

68 sql += " AND scope = ?" 

69 params.append(scope) 

70 return [r["id"] for r in conn.execute(sql, params).fetchall()] 

71 

72 

73def _apply_decay(conn: Any, candidates: list[str], conf_ids: list[str], now: str) -> None: 

74 """Persist decay: mark candidates expired, log confidence retractions, sync graph.""" 

75 for fact_id in candidates: 

76 set_fact_validity_override( 

77 conn, 

78 fact_id=fact_id, 

79 valid_until=now, 

80 reason="decay_sweep", 

81 updated_by="stigmem:system:decay", 

82 ) 

83 # Append-only retraction log for confidence-floor drops (§24.2.1 c.3) 

84 if conf_ids: 

85 conn.executemany( 

86 "INSERT INTO fact_retractions (id, fact_id, retracted_at, retracted_by) " 

87 "VALUES (?,?,?,?)", 

88 [(str(uuid.uuid4()), fid, now, "stigmem:system:decay") for fid in conf_ids], 

89 ) 

90 # Graph adjacency index (§20.1.2): propagate expiry to entity_edges 

91 from ..recall.graph_index import sync_edge_expiry 

92 

93 sync_edge_expiry(conn, candidates, now) 

94 

95 

96def run_decay_sweep( 

97 ttl_seconds: int | None = None, 

98 min_confidence: float | None = None, 

99 scope: str | None = None, 

100 dry_run: bool = False, 

101) -> dict[str, Any]: 

102 """Mark stale facts as expired by setting valid_until to now. 

103 

104 - TTL decay: non-expiring facts whose timestamp is at or before (now - ttl_seconds). 

105 Passing ttl_seconds=0 expires all non-expiring facts regardless of age. 

106 - Confidence decay: active facts whose confidence is below min_confidence. 

107 - System facts (stigmem: entity/relation, not stigmem://) are never decayed. 

108 - dry_run=True returns counts without writing. 

109 

110 Returns {"scanned": N, "decayed": M, "dry_run": bool}. 

111 """ 

112 now_dt = datetime.now(UTC) 

113 now = now_dt.isoformat() 

114 

115 # Explicit args override settings defaults; settings=0/0.0 means "disabled" 

116 effective_ttl = _resolve_effective_ttl(ttl_seconds) 

117 effective_min_conf = _resolve_effective_min_conf(min_confidence) 

118 

119 ttl_ids: list[str] = [] 

120 conf_ids: list[str] = [] 

121 

122 with db() as conn: 

123 if effective_ttl is not None: 

124 ttl_ids = _select_ttl_candidates(conn, effective_ttl, scope, now_dt) 

125 

126 if effective_min_conf is not None: 

127 conf_ids = _select_confidence_candidates(conn, effective_min_conf, scope, now) 

128 

129 candidates = list({*ttl_ids, *conf_ids}) 

130 

131 if not dry_run and candidates: 

132 _apply_decay(conn, candidates, conf_ids, now) 

133 

134 return { 

135 "scanned": len(candidates), 

136 "decayed": len(candidates) if not dry_run else 0, 

137 "dry_run": dry_run, 

138 }