Coverage for node / src / stigmem_node / lifecycle / immutability.py: 100%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""ADR-016 L1 append-only journal and projection helpers.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import uuid 

7from datetime import UTC, datetime 

8from typing import Any 

9 

10 

11def utc_now_iso() -> str: 

12 return datetime.now(UTC).replace(microsecond=0).isoformat() 

13 

14 

15def write_fact_journal( 

16 conn: Any, 

17 *, 

18 fact_id: str, 

19 event_type: str, 

20 tenant_id: str, 

21 actor_uri: str | None, 

22 source: str | None, 

23 scope: str | None, 

24 cid: str | None, 

25 body: dict[str, Any], 

26) -> None: 

27 """Append one fact event to the immutable L1 journal.""" 

28 conn.execute( 

29 "INSERT INTO fact_journal " 

30 "(id, fact_id, event_type, event_ts, tenant_id, actor_uri, source, scope, cid, body_json) " 

31 "VALUES (?,?,?,?,?,?,?,?,?,?)", 

32 ( 

33 str(uuid.uuid4()), 

34 fact_id, 

35 event_type, 

36 utc_now_iso(), 

37 tenant_id, 

38 actor_uri, 

39 source, 

40 scope, 

41 cid, 

42 json.dumps(body, sort_keys=True, separators=(",", ":")), 

43 ), 

44 ) 

45 

46 

47def set_embedding_status( 

48 conn: Any, 

49 *, 

50 fact_id: str, 

51 embedding_missing: bool, 

52 updated_by: str | None = None, 

53 last_error: str | None = None, 

54) -> None: 

55 """Upsert embedding status in the projection table, not on ``facts``.""" 

56 conn.execute( 

57 "INSERT INTO fact_embedding_status " 

58 "(fact_id, embedding_missing, updated_at, last_error, updated_by) " 

59 "VALUES (?,?,?,?,?) " 

60 "ON CONFLICT(fact_id) DO UPDATE SET " 

61 "embedding_missing = excluded.embedding_missing, " 

62 "updated_at = excluded.updated_at, " 

63 "last_error = excluded.last_error, " 

64 "updated_by = excluded.updated_by", 

65 ( 

66 fact_id, 

67 1 if embedding_missing else 0, 

68 utc_now_iso(), 

69 last_error, 

70 updated_by, 

71 ), 

72 ) 

73 

74 

75def set_fact_validity_override( 

76 conn: Any, 

77 *, 

78 fact_id: str, 

79 valid_until: str | None = None, 

80 confidence: float | None = None, 

81 reason: str | None = None, 

82 updated_by: str | None = None, 

83) -> None: 

84 """Upsert derived validity/confidence state outside the base fact row.""" 

85 conn.execute( 

86 "INSERT INTO fact_validity_overrides " 

87 "(fact_id, valid_until, confidence, reason, updated_at, updated_by) " 

88 "VALUES (?,?,?,?,?,?) " 

89 "ON CONFLICT(fact_id) DO UPDATE SET " 

90 "valid_until = excluded.valid_until, " 

91 "confidence = excluded.confidence, " 

92 "reason = excluded.reason, " 

93 "updated_at = excluded.updated_at, " 

94 "updated_by = excluded.updated_by", 

95 (fact_id, valid_until, confidence, reason, utc_now_iso(), updated_by), 

96 ) 

97 

98 

99def set_fact_quarantine_status( 

100 conn: Any, 

101 *, 

102 fact_id: str, 

103 quarantine_garden_id: str | None, 

104 quarantine_status: str | None, 

105 quarantine_reason: str | None = None, 

106 quarantine_acted_by: str | None = None, 

107 quarantine_acted_at: str | None = None, 

108) -> None: 

109 """Upsert quarantine workflow state outside the base fact row.""" 

110 conn.execute( 

111 "INSERT INTO fact_quarantine_status " 

112 "(fact_id, quarantine_garden_id, quarantine_status, quarantine_reason, " 

113 " quarantine_acted_by, quarantine_acted_at, updated_at) " 

114 "VALUES (?,?,?,?,?,?,?) " 

115 "ON CONFLICT(fact_id) DO UPDATE SET " 

116 "quarantine_garden_id = excluded.quarantine_garden_id, " 

117 "quarantine_status = excluded.quarantine_status, " 

118 "quarantine_reason = excluded.quarantine_reason, " 

119 "quarantine_acted_by = excluded.quarantine_acted_by, " 

120 "quarantine_acted_at = excluded.quarantine_acted_at, " 

121 "updated_at = excluded.updated_at", 

122 ( 

123 fact_id, 

124 quarantine_garden_id, 

125 quarantine_status, 

126 quarantine_reason, 

127 quarantine_acted_by, 

128 quarantine_acted_at, 

129 utc_now_iso(), 

130 ), 

131 ) 

132 

133 

134def set_fact_garden_membership( 

135 conn: Any, 

136 *, 

137 fact_id: str, 

138 garden_id: str | None, 

139 updated_by: str | None = None, 

140) -> None: 

141 """Upsert derived garden membership outside the base fact row.""" 

142 conn.execute( 

143 "INSERT INTO fact_garden_membership " 

144 "(fact_id, garden_id, updated_at, updated_by) " 

145 "VALUES (?,?,?,?) " 

146 "ON CONFLICT(fact_id) DO UPDATE SET " 

147 "garden_id = excluded.garden_id, " 

148 "updated_at = excluded.updated_at, " 

149 "updated_by = excluded.updated_by", 

150 (fact_id, garden_id, utc_now_iso(), updated_by), 

151 ) 

152 

153 

154def set_fact_cid_backfill_status( 

155 conn: Any, 

156 *, 

157 fact_id: str, 

158 status: str, 

159 error: str | None = None, 

160) -> None: 

161 """Record CID backfill progress without mutating ``facts.cid``.""" 

162 conn.execute( 

163 "INSERT INTO fact_cid_backfill " 

164 "(fact_id, status, attempted_at, error, updated_at) " 

165 "VALUES (?,?,?,?,?) " 

166 "ON CONFLICT(fact_id) DO UPDATE SET " 

167 "status = excluded.status, " 

168 "attempted_at = excluded.attempted_at, " 

169 "error = excluded.error, " 

170 "updated_at = excluded.updated_at", 

171 (fact_id, status, utc_now_iso(), error, utc_now_iso()), 

172 )