Coverage for node / src / stigmem_node / federation / federation_pull.py: 30%
153 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Pull replication background task (spec §6.3).
3The pull loop runs as an asyncio task in the app lifespan.
4It is also callable directly for testing.
5"""
7from __future__ import annotations
9import asyncio
10import json
11import logging
12import random
13from datetime import UTC, datetime
14from typing import Any
16import httpx
18from ..db import db
19from ..observability.metrics import FEDERATION_INGRESS, REPLICATION_LAG
20from ..settings import settings
21from .federation_ingest import FederationIntegrityError, ingest_fact, write_audit_log
22from .peer_token import create_peer_token
23from .tls import check_peer_san
# Module logger, named explicitly (not via __name__) so it sits under the
# "stigmem.federation" logger hierarchy.
logger = logging.getLogger("stigmem.federation.pull")

# Backoff bounds, in seconds, for HTTP 429 retries in the pull path.
# _BASE_BACKOFF_S is the seed value that gets doubled on each 429;
# _MAX_BACKOFF_S caps it at five minutes.
_MAX_BACKOFF_S = 5 * 60.0
_BASE_BACKOFF_S = 1.0
def _jitter(base: float) -> float:
    """Return *base* scaled by a uniformly random factor in [0.8, 1.2].

    Used to de-synchronise retry sleeps; randomness here is not
    security-sensitive.
    """
    spread = random.uniform(-0.2, 0.2)  # noqa: S311 # nosec B311 — retry jitter, not crypto
    return base * (1 + spread)
def load_cursor(peer_id: str) -> str | None:
    """Fetch the persisted inbound (fact) replication cursor for *peer_id*.

    Returns ``None`` when no ``replication_cursors`` row exists for this peer
    in the ``'inbound'`` direction.
    """
    query = "SELECT cursor FROM replication_cursors WHERE peer_id = ? AND direction = 'inbound'"
    with db() as conn:
        found = conn.execute(query, (peer_id,)).fetchone()
        if not found:
            return None
        return found["cursor"]
def save_cursor(peer_id: str, cursor: str | None) -> None:
    """Insert or update the inbound (fact) replication cursor for *peer_id*.

    ``updated_at`` is stamped with the current UTC time as an ISO-8601 string.
    A ``None`` cursor is stored as-is.
    """
    upsert_sql = """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at)
        VALUES (?,?,?,?)
        ON CONFLICT(peer_id, direction)
        DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at"""
    with db() as conn:
        stamp = datetime.now(UTC).isoformat()
        conn.execute(upsert_sql, (peer_id, "inbound", cursor, stamp))
async def _fetch_facts_page(
    peer: dict[str, Any],
    client: httpx.AsyncClient,
    params: dict[str, Any],
    allowed_scopes: list[str],
    max_rate_limit_retries: int,
) -> httpx.Response | None:
    """GET one page of facts from *peer*, retrying HTTP 429 with jittered backoff.

    Returns the HTTP 200 response. Returns ``None`` in three cases: a network
    error, a non-200/non-429 status, or the peer still answering 429 after
    *max_rate_limit_retries* retries.
    """
    peer_id = peer["node_id"]
    token = create_peer_token(peer_id, allowed_scopes)
    backoff = _BASE_BACKOFF_S
    retries = 0
    while True:
        try:
            resp = await client.get(
                f"{peer['node_url']}/v1/federation/facts",
                params=params,
                headers={"Authorization": f"Bearer {token}", "Stigmem-Verify": "full"},
                timeout=30.0,
            )
        except httpx.RequestError as exc:
            logger.warning("Pull network error from %s: %s", peer_id, exc)
            return None  # caller retains old cursor; will retry next cycle

        if resp.status_code == 429:
            if retries >= max_rate_limit_retries:
                # Bound the wait. Peers are pulled serially, so an unbounded
                # retry loop would stall every other peer, and this peer's
                # tombstone pull, indefinitely.
                logger.warning(
                    "Pull from %s still rate-limited after %d retries; deferring to next cycle",
                    peer_id,
                    retries,
                )
                return None
            retries += 1
            backoff = min(backoff * 2, _MAX_BACKOFF_S)
            delay = _jitter(backoff)
            logger.info("429 from %s — backing off %.1fs", peer_id, delay)
            await asyncio.sleep(delay)
            token = create_peer_token(peer_id, allowed_scopes)  # refresh token after sleep
            continue

        if resp.status_code != 200:
            logger.warning("Pull from %s returned %s", peer_id, resp.status_code)
            return None
        return resp


def _server_san_mismatch(resp: httpx.Response, peer_id: str) -> bool:
    """Return True when mTLS is on and the server cert's URI SAN does not match *peer_id*.

    This implements the §22.1.2.4 check that runs before any data is consumed.
    On mismatch it logs a warning, writes a ``san_mismatch`` audit entry and
    returns True. If httpx exposes no peer certificate, it logs a warning and
    returns False, leaving identity to the TLS-layer verification done by the
    client's SSL context.
    """
    if not settings.mtls_enabled:
        return False
    # NOTE(review): depending on the httpx/httpcore version, the SSL object may
    # only be reachable through the "network_stream" response extension rather
    # than "ssl_object"; confirm which key is populated in production.
    ssl_obj = resp.extensions.get("ssl_object")
    peer_cert: dict[str, Any] = ssl_obj.getpeercert() if ssl_obj is not None else {}
    if not peer_cert:
        logger.warning(
            "mTLS peer certificate from %s was not exposed by httpx; "
            "falling back to TLS-layer certificate verification",
            peer_id,
        )
        return False
    if check_peer_san(peer_cert, peer_id):
        return False
    logger.warning(
        "Client-side SAN mismatch from peer %s — cert URI SAN does not "
        "match node_id; discarding response",
        peer_id,
    )
    write_audit_log(
        peer_id,
        "san_mismatch",
        {"peer_node_id": peer_id, "direction": "pull"},
    )
    return True


def _ingest_facts_page(
    facts: list[dict[str, Any]],
    peer_id: str,
    allowed_scopes: list[str],
) -> int:
    """Ingest each fact from *peer_id* and return how many were accepted.

    A fact that raises ``FederationIntegrityError`` is logged, audited as
    ``federation_integrity_rejected`` and skipped; the rest of the page is
    still ingested.
    """
    ingested = 0
    for fact in facts:
        try:
            ingest_fact(fact, peer_id, origin_allowed_scopes=allowed_scopes)
        except FederationIntegrityError as exc:
            logger.warning(
                "Rejected federated fact %s from %s: %s",
                exc.fact_id,
                peer_id,
                exc.reason,
            )
            write_audit_log(
                peer_id,
                "federation_integrity_rejected",
                {
                    "fact_id": exc.fact_id,
                    "reason": exc.reason,
                    "stored_cid": exc.stored_cid,
                    "computed_cid": exc.computed_cid,
                },
            )
            continue
        ingested += 1
    return ingested


def _record_replication_lag(peer_id: str, cursor: str) -> None:
    """Best-effort: set the replication-lag gauge from the cursor's HLC timestamp.

    The cursor's prefix before the first ``_`` is parsed as an ISO timestamp;
    a naive timestamp is treated as UTC. Any parse or metric failure is logged
    at debug level and the gauge is left unchanged.
    """
    try:
        cursor_ts = datetime.fromisoformat(cursor.split("_")[0].replace("Z", "+00:00"))
        if cursor_ts.tzinfo is None:
            cursor_ts = cursor_ts.replace(tzinfo=UTC)
        lag_s = max(0.0, (datetime.now(UTC) - cursor_ts).total_seconds())
        REPLICATION_LAG.labels(peer_id=peer_id).set(lag_s)
    except Exception as exc:  # noqa: BLE001 # nosec B110 — best-effort lag metric
        logger.debug("replication lag metric update failed: %s", exc)


async def pull_from_peer_once(
    peer: dict[str, Any],
    client: httpx.AsyncClient,
    cursor: str | None,
    *,
    max_rate_limit_retries: int = 5,
) -> str | None:
    """Pull one page of facts from the peer. Returns the new cursor (or same if no more).

    Args:
        peer: Peer registry row providing ``node_id``, ``node_url`` and
            ``allowed_scopes`` (a JSON-encoded list of scope strings).
        client: HTTP client used for the request (the caller configures mTLS).
        cursor: Cursor from the last persisted page, or ``None`` to start
            from the beginning.
        max_rate_limit_retries: Maximum number of HTTP 429 retries (with
            jittered exponential backoff) before giving up until the next
            cycle.

    Returns:
        The cursor to persist. The input *cursor* is returned unchanged on any
        of: network error, non-200 status, exhausted 429 retries, SAN
        mismatch, or an undecodable/unexpected body. This leaves the page to
        be retried next cycle. It is also kept when the peer's response
        carries no cursor, rather than resetting replication to the start.
    """
    peer_id = peer["node_id"]
    allowed_scopes: list[str] = json.loads(peer["allowed_scopes"])

    params: dict[str, Any] = {"limit": 100}
    if cursor:
        params["cursor"] = cursor

    resp = await _fetch_facts_page(peer, client, params, allowed_scopes, max_rate_limit_retries)
    if resp is None:
        return cursor

    # §22.1.2.4 — validate server cert URI SAN before consuming any data.
    if _server_san_mismatch(resp, peer_id):
        return cursor  # fail-closed: no data ingested from identity-mismatched peer

    try:
        data = resp.json()
    except ValueError as exc:  # includes json.JSONDecodeError / UnicodeDecodeError
        logger.warning("Pull from %s returned an undecodable body: %s", peer_id, exc)
        return cursor
    if not isinstance(data, dict):
        logger.warning(
            "Pull from %s returned unexpected payload type %s", peer_id, type(data).__name__
        )
        return cursor

    # origin_allowed_scopes = peer's registered declaration scope (spec §6.8.1).
    # These fields are internal and MUST NOT be re-replicated (§3.1), so we
    # derive them from the peer registry rather than reading from the fact payload.
    ingested = _ingest_facts_page(data.get("facts") or [], peer_id, allowed_scopes)
    if ingested:
        FEDERATION_INGRESS.labels(peer_id=peer_id, status="ok").inc(ingested)

    server_cursor: str | None = data.get("cursor")
    if server_cursor:
        # Only a server-supplied cursor reflects fresh progress; the gauge is
        # left unchanged otherwise.
        _record_replication_lag(peer_id, server_cursor)
    return server_cursor if server_cursor is not None else cursor
def _make_pull_client() -> httpx.AsyncClient:
    """Build the httpx client used by the pull loop.

    When ``settings.mtls_enabled`` is true, the client verifies peers with an
    SSL context built from the configured TLS cert, key and CA bundle.
    Otherwise it is a default ``httpx.AsyncClient``.
    """
    client_kwargs: dict[str, Any] = {}
    if settings.mtls_enabled:
        from .tls import build_client_ssl_context

        client_kwargs["verify"] = build_client_ssl_context(
            settings.tls_cert_path,
            settings.tls_key_path,
            settings.tls_ca_bundle,
        )
    return httpx.AsyncClient(**client_kwargs)
async def pull_tombstones_from_peer_once(
    peer: dict[str, Any],
    client: httpx.AsyncClient,
    cursor: str | None,
) -> str | None:
    """Pull one page of tombstones from the peer (§23.4.3). Returns the new cursor.

    Args:
        peer: Peer registry row providing ``node_id``, ``node_url`` and
            ``allowed_scopes`` (a JSON-encoded list of scope strings).
        client: HTTP client used for the request (the caller configures mTLS).
        cursor: Cursor sent as ``since``; ``None`` pulls from the beginning.

    Returns:
        The cursor to persist. The input *cursor* is returned unchanged on any
        of: network error, non-200 status, server SAN mismatch, or an
        undecodable/unexpected body. It is also kept when the response carries
        no cursor, so replication does not restart from the beginning.
        Individual tombstone/revocation records that fail to parse or apply
        are logged and skipped.
    """
    peer_id = peer["node_id"]
    allowed_scopes: list[str] = json.loads(peer["allowed_scopes"])
    token = create_peer_token(peer_id, allowed_scopes)

    params: dict[str, Any] = {"limit": 200}
    if cursor:
        params["since"] = cursor

    try:
        resp = await client.get(
            f"{peer['node_url']}/v1/federation/tombstones",
            params=params,
            headers={"Authorization": f"Bearer {token}"},
            timeout=30.0,
        )
    except httpx.RequestError as exc:
        logger.warning("Tombstone pull network error from %s: %s", peer_id, exc)
        return cursor

    if resp.status_code != 200:
        logger.warning("Tombstone pull from %s returned %s", peer_id, resp.status_code)
        return cursor

    # §22.1.2.4 — validate the server cert URI SAN before applying any
    # deletions, matching the fact pull path. Fail closed on an explicit
    # mismatch. When httpx exposes no certificate, rely on TLS-layer
    # verification as the fact pull does.
    if settings.mtls_enabled:
        ssl_obj = resp.extensions.get("ssl_object")
        peer_cert: dict[str, Any] = ssl_obj.getpeercert() if ssl_obj is not None else {}
        if peer_cert and not check_peer_san(peer_cert, peer_id):
            logger.warning(
                "Client-side SAN mismatch from peer %s on tombstone pull — cert URI SAN "
                "does not match node_id; discarding response",
                peer_id,
            )
            write_audit_log(
                peer_id,
                "san_mismatch",
                {"peer_node_id": peer_id, "direction": "tombstone_pull"},
            )
            return cursor

    try:
        data = resp.json()
    except ValueError as exc:  # includes json.JSONDecodeError / UnicodeDecodeError
        logger.warning("Tombstone pull from %s returned an undecodable body: %s", peer_id, exc)
        return cursor
    if not isinstance(data, dict):
        logger.warning(
            "Tombstone pull from %s returned unexpected payload type %s",
            peer_id,
            type(data).__name__,
        )
        return cursor

    tombstones = data.get("tombstones") or []
    new_cursor: str | None = data.get("cursor")

    # F-13 §23.4.3: emit tombstone_sync_gap when result set is non-empty and cursor
    # indicates skipped pages (more results available beyond this batch)
    if tombstones and new_cursor is not None:
        from ..observability.audit_event import emit_nofail

        emit_nofail(
            "tombstone_sync_gap",
            entity_uri=peer_id,
            tenant_id="default",
            source=f"federation_pull:{peer_id}",
            detail={
                "peer_node_id": peer_id,
                "tombstones_in_batch": len(tombstones),
                "cursor": new_cursor,
            },
        )

    # Ingest tombstones and revocations (imported lazily, as in the original).
    from ..lifecycle.tombstones import apply_inbound_revocation, apply_inbound_tombstone
    from ..models.tombstones import TombstoneRecord, TombstoneRevocationRecord

    for t in tombstones:
        try:
            apply_inbound_tombstone(TombstoneRecord(**t))
        except Exception as exc:  # per-record best effort; keep processing the batch
            logger.warning("Tombstone ingest from %s failed: %s", peer_id, exc)

    for r in data.get("revocations") or []:
        try:
            apply_inbound_revocation(TombstoneRevocationRecord(**r))
        except Exception as exc:  # per-record best effort; keep processing the batch
            logger.warning("Tombstone revocation ingest from %s failed: %s", peer_id, exc)

    return new_cursor if new_cursor is not None else cursor
def _load_tombstone_cursor(peer_id: str) -> str | None:
    """Fetch the persisted tombstone replication cursor for *peer_id*.

    Reads the ``'tombstone_inbound'`` row of ``replication_cursors``; returns
    ``None`` when none exists.
    """
    query = (
        "SELECT cursor FROM replication_cursors"
        " WHERE peer_id = ? AND direction = 'tombstone_inbound'"
    )
    with db() as conn:
        found = conn.execute(query, (peer_id,)).fetchone()
        if not found:
            return None
        return found["cursor"]
def _save_tombstone_cursor(peer_id: str, cursor: str | None) -> None:
    """Insert or update the tombstone replication cursor for *peer_id*.

    Writes the ``'tombstone_inbound'`` row, stamping ``updated_at`` with the
    current UTC time as an ISO-8601 string.
    """
    upsert_sql = """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at)
        VALUES (?,?,?,?)
        ON CONFLICT(peer_id, direction)
        DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at"""
    with db() as conn:
        stamp = datetime.now(UTC).isoformat()
        conn.execute(upsert_sql, (peer_id, "tombstone_inbound", cursor, stamp))
async def _sync_peer_facts(peer: dict[str, Any], client: httpx.AsyncClient) -> None:
    """Pull one fact page from *peer* and persist its inbound cursor if it moved."""
    cursor = load_cursor(peer["id"])
    new_cursor = await pull_from_peer_once(peer, client, cursor)
    if new_cursor != cursor:
        save_cursor(peer["id"], new_cursor)


async def _sync_peer_tombstones(peer: dict[str, Any], client: httpx.AsyncClient) -> None:
    """§23.4.3: pull one tombstone page from *peer* and persist its cursor if it moved."""
    tomb_cursor = _load_tombstone_cursor(peer["id"])
    new_tomb_cursor = await pull_tombstones_from_peer_once(peer, client, tomb_cursor)
    if new_tomb_cursor != tomb_cursor:
        _save_tombstone_cursor(peer["id"], new_tomb_cursor)


async def pull_all_peers_once() -> None:
    """Pull one batch from every active peer. Called by the loop and by tests.

    For each row of ``peers`` with ``status = 'active'``, pulls one page of
    facts and then one page of tombstones over a shared client, persisting
    any cursor that changed. Each (peer, phase) pair is isolated: an
    unexpected exception is logged with its traceback and the cycle moves on.
    A bad peer therefore cannot starve the remaining peers, nor block its own
    tombstone pull. ``asyncio.CancelledError`` is a ``BaseException`` and
    still propagates.
    """
    with db() as conn:
        peers = conn.execute(
            "SELECT id, node_id, node_url, allowed_scopes FROM peers WHERE status = 'active'"
        ).fetchall()

    if not peers:
        return

    async with _make_pull_client() as client:
        for peer in peers:
            peer_dict = dict(peer)
            for phase in (_sync_peer_facts, _sync_peer_tombstones):
                try:
                    await phase(peer_dict, client)
                except Exception:
                    logger.exception(
                        "Pull phase %s failed for peer %s",
                        phase.__name__,
                        peer_dict["node_id"],
                    )
async def pull_loop_task() -> None:
    """Run forever: sleep ``settings.federation_pull_interval_s``, then pull from all active peers.

    The interval is re-read before every sleep. Any ``Exception`` raised by a
    cycle is logged with its traceback and the loop continues; cancellation of
    the task ends it.
    """

    async def _guarded_cycle() -> None:
        # One pull cycle; failures are logged, never propagated to the loop.
        try:
            await pull_all_peers_once()
        except Exception:
            logger.exception("Unexpected error in pull loop")

    while True:
        await asyncio.sleep(settings.federation_pull_interval_s)
        await _guarded_cycle()