Coverage for node/src/stigmem_node/recall/graph_index.py: 80%
13 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Graph adjacency index maintenance — spec §20.1.
3Called from:
4 - routes/facts.py on every ref-typed fact insert
5 - decay.py when facts expire (valid_until propagation)
6 - routes/facts.py contradiction path (confidence = 0.0 soft-delete)
7"""
9from __future__ import annotations
11import time
12from typing import Any
15def upsert_edge(
16 conn: Any,
17 fact_id: str,
18 subject: str,
19 relation: str,
20 object_uri: str,
21 scope: str,
22 confidence: float,
23 garden_id: str | None,
24 tenant_id: str,
25 received_from: str | None,
26 source_trust: float | None = None,
27 valid_until: str | None = None,
28) -> None:
29 """Insert or replace the entity_edges row for a ref-typed fact.
31 On conflict (same fact_id), updates mutable fields so re-assertions stay
32 in sync. Called inside the same transaction as the facts INSERT.
33 """
34 now_ms = int(time.time() * 1000)
35 conn.execute(
36 """INSERT INTO entity_edges
37 (id, subject, relation, object, scope, garden_id, tenant_id,
38 received_from, valid_until, confidence, source_trust,
39 decay_epoch, created_at)
40 VALUES (?,?,?,?,?,?,?,?,?,?,?,NULL,?)
41 ON CONFLICT(id) DO UPDATE SET
42 confidence = excluded.confidence,
43 source_trust = excluded.source_trust,
44 valid_until = excluded.valid_until,
45 decay_epoch = excluded.decay_epoch""",
46 (
47 fact_id,
48 subject,
49 relation,
50 object_uri,
51 scope,
52 garden_id,
53 tenant_id,
54 received_from,
55 valid_until,
56 confidence,
57 source_trust,
58 now_ms,
59 ),
60 )
def sync_edge_confidence(
    conn: Any,
    fact_id: str,
    confidence: float,
    decay_epoch_ms: int,
) -> None:
    """Propagate a retraction (confidence=0.0) to entity_edges (§20.1.3).

    Writes ``confidence`` and ``decay_epoch`` on the edge row whose ``id``
    equals ``fact_id``. If no such row exists the UPDATE matches nothing and
    the call has no effect. The function does not commit.

    Args:
        conn: Connection-like object exposing ``execute(sql, params)`` with
            ``?`` placeholders.
        fact_id: Identifier of the fact whose edge should be updated.
        confidence: New confidence value to store; the value is written as
            given, not forced to 0.0.
        decay_epoch_ms: Value stored in ``decay_epoch`` (milliseconds, per
            the parameter name).
    """
    conn.execute(
        "UPDATE entity_edges SET confidence = ?, decay_epoch = ? WHERE id = ?",
        (confidence, decay_epoch_ms, fact_id),
    )
def sync_edge_expiry(
    conn: Any,
    fact_ids: list[str],
    valid_until: str,
) -> None:
    """Propagate TTL/confidence-decay expiry to entity_edges (§20.1.2).

    For every id in ``fact_ids`` the matching edge row gets ``valid_until``
    set to the given value and ``decay_epoch`` set to the current wall-clock
    time in milliseconds (one timestamp shared by the whole batch). Ids with
    no matching row are ignored by the UPDATE. The function does not commit.

    Args:
        conn: Connection-like object exposing ``executemany(sql, seq)`` with
            ``?`` placeholders.
        fact_ids: Identifiers of the expired facts. An empty list is a no-op
            and ``conn`` is not touched.
        valid_until: Expiry value to store on each edge, written as given.
    """
    # Nothing to propagate: return before reading the clock or using conn.
    if not fact_ids:
        return
    now_ms = int(time.time() * 1000)
    conn.executemany(
        "UPDATE entity_edges SET valid_until = ?, decay_epoch = ? WHERE id = ?",
        [(valid_until, now_ms, fact_id) for fact_id in fact_ids],
    )