Coverage for node / src / stigmem_node / federation / federation_pull.py: 30%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Pull replication background task (spec §6.3). 

2 

3The pull loop runs as an asyncio task in the app lifespan. 

4It is also callable directly for testing. 

5""" 

6 

7from __future__ import annotations 

8 

9import asyncio 

10import json 

11import logging 

12import random 

13from datetime import UTC, datetime 

14from typing import Any 

15 

16import httpx 

17 

18from ..db import db 

19from ..observability.metrics import FEDERATION_INGRESS, REPLICATION_LAG 

20from ..settings import settings 

21from .federation_ingest import FederationIntegrityError, ingest_fact, write_audit_log 

22from .peer_token import create_peer_token 

23from .tls import check_peer_san 

24 

25logger = logging.getLogger("stigmem.federation.pull") 

26 

27_MAX_BACKOFF_S = 300.0 # 5 minutes 

28_BASE_BACKOFF_S = 1.0 

29 

30 

31def _jitter(base: float) -> float: 

32 return base * (1 + random.uniform(-0.2, 0.2)) # noqa: S311 # nosec B311 — retry jitter, not crypto 

33 

34 

35def load_cursor(peer_id: str) -> str | None: 

36 with db() as conn: 

37 row = conn.execute( 

38 "SELECT cursor FROM replication_cursors WHERE peer_id = ? AND direction = 'inbound'", 

39 (peer_id,), 

40 ).fetchone() 

41 return row["cursor"] if row else None 

42 

43 

44def save_cursor(peer_id: str, cursor: str | None) -> None: 

45 with db() as conn: 

46 conn.execute( 

47 """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at) 

48 VALUES (?,?,?,?) 

49 ON CONFLICT(peer_id, direction) 

50 DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at""", 

51 (peer_id, "inbound", cursor, datetime.now(UTC).isoformat()), 

52 ) 

53 

54 

55async def pull_from_peer_once( 

56 peer: dict[str, Any], 

57 client: httpx.AsyncClient, 

58 cursor: str | None, 

59) -> str | None: 

60 """Pull one page of facts from the peer. Returns the new cursor (or same if no more).""" 

61 allowed_scopes: list[str] = json.loads(peer["allowed_scopes"]) 

62 token = create_peer_token(peer["node_id"], allowed_scopes) 

63 

64 params: dict[str, Any] = {"limit": 100} 

65 if cursor: 65 ↛ 68line 65 didn't jump to line 68 because the condition on line 65 was always true

66 params["cursor"] = cursor 

67 

68 backoff = _BASE_BACKOFF_S 

69 while True: 

70 try: 

71 resp = await client.get( 

72 f"{peer['node_url']}/v1/federation/facts", 

73 params=params, 

74 headers={"Authorization": f"Bearer {token}", "Stigmem-Verify": "full"}, 

75 timeout=30.0, 

76 ) 

77 except httpx.RequestError as exc: 

78 logger.warning("Pull network error from %s: %s", peer["node_id"], exc) 

79 return cursor # retain old cursor; will retry next cycle 

80 

81 if resp.status_code == 429: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true

82 backoff = min(backoff * 2, _MAX_BACKOFF_S) 

83 delay = _jitter(backoff) 

84 logger.info("429 from %s — backing off %.1fs", peer["node_id"], delay) 

85 await asyncio.sleep(delay) 

86 token = create_peer_token(peer["node_id"], allowed_scopes) # refresh token after sleep 

87 continue 

88 

89 if resp.status_code != 200: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 logger.warning("Pull from %s returned %s", peer["node_id"], resp.status_code) 

91 return cursor 

92 

93 # §22.1.2.4 — validate server cert URI SAN before consuming any data. 

94 if settings.mtls_enabled: 94 ↛ 116line 94 didn't jump to line 116 because the condition on line 94 was always true

95 ssl_obj = resp.extensions.get("ssl_object") 

96 peer_cert: dict[str, Any] = ssl_obj.getpeercert() if ssl_obj is not None else {} 

97 if peer_cert and not check_peer_san(peer_cert, peer["node_id"]): 97 ↛ 109line 97 didn't jump to line 109 because the condition on line 97 was always true

98 logger.warning( 

99 "Client-side SAN mismatch from peer %s — cert URI SAN does not " 

100 "match node_id; discarding response", 

101 peer["node_id"], 

102 ) 

103 write_audit_log( 

104 peer["node_id"], 

105 "san_mismatch", 

106 {"peer_node_id": peer["node_id"], "direction": "pull"}, 

107 ) 

108 return cursor # fail-closed: no data ingested from identity-mismatched peer 

109 if not peer_cert: 

110 logger.warning( 

111 "mTLS peer certificate from %s was not exposed by httpx; " 

112 "falling back to TLS-layer certificate verification", 

113 peer["node_id"], 

114 ) 

115 

116 data = resp.json() 

117 # origin_allowed_scopes = peer's registered declaration scope (spec §6.8.1). 

118 # These fields are internal and MUST NOT be re-replicated (§3.1), so we 

119 # derive them from the peer registry rather than reading from the fact payload. 

120 ingested = 0 

121 for fact in data.get("facts", []): 

122 try: 

123 ingest_fact( 

124 fact, 

125 peer["node_id"], 

126 origin_allowed_scopes=allowed_scopes, 

127 ) 

128 except FederationIntegrityError as exc: 

129 logger.warning( 

130 "Rejected federated fact %s from %s: %s", 

131 exc.fact_id, 

132 peer["node_id"], 

133 exc.reason, 

134 ) 

135 write_audit_log( 

136 peer["node_id"], 

137 "federation_integrity_rejected", 

138 { 

139 "fact_id": exc.fact_id, 

140 "reason": exc.reason, 

141 "stored_cid": exc.stored_cid, 

142 "computed_cid": exc.computed_cid, 

143 }, 

144 ) 

145 continue 

146 ingested += 1 

147 

148 if ingested: 

149 FEDERATION_INGRESS.labels(peer_id=peer["node_id"], status="ok").inc(ingested) 

150 

151 new_cursor: str | None = data.get("cursor") 

152 

153 # Replication-lag gauge: difference between now and the cursor HLC timestamp. 

154 # The HLC is an ISO timestamp string; if parsing fails we leave the gauge unchanged. 

155 try: 

156 if new_cursor: 

157 from datetime import UTC, datetime 

158 

159 cursor_ts = datetime.fromisoformat(new_cursor.split("_")[0].replace("Z", "+00:00")) 

160 if cursor_ts.tzinfo is None: 

161 cursor_ts = cursor_ts.replace(tzinfo=UTC) 

162 lag_s = max(0.0, (datetime.now(UTC) - cursor_ts).total_seconds()) 

163 REPLICATION_LAG.labels(peer_id=peer["node_id"]).set(lag_s) 

164 except Exception as exc: # noqa: BLE001 # nosec B110 — best-effort lag metric 

165 logger.debug("replication lag metric update failed: %s", exc) 

166 

167 return new_cursor 

168 

169 

170def _make_pull_client() -> httpx.AsyncClient: 

171 """Return an httpx client configured for mTLS when STIGMEM_TLS_* are set.""" 

172 if settings.mtls_enabled: 

173 from .tls import build_client_ssl_context 

174 

175 ssl_ctx = build_client_ssl_context( 

176 settings.tls_cert_path, 

177 settings.tls_key_path, 

178 settings.tls_ca_bundle, 

179 ) 

180 return httpx.AsyncClient(verify=ssl_ctx) 

181 return httpx.AsyncClient() 

182 

183 

184async def pull_tombstones_from_peer_once( 

185 peer: dict[str, Any], 

186 client: httpx.AsyncClient, 

187 cursor: str | None, 

188) -> str | None: 

189 """Pull one page of tombstones from the peer (§23.4.3). Returns the new cursor.""" 

190 allowed_scopes: list[str] = json.loads(peer["allowed_scopes"]) 

191 token = create_peer_token(peer["node_id"], allowed_scopes) 

192 

193 params: dict[str, Any] = {"limit": 200} 

194 if cursor: 

195 params["since"] = cursor 

196 

197 try: 

198 resp = await client.get( 

199 f"{peer['node_url']}/v1/federation/tombstones", 

200 params=params, 

201 headers={"Authorization": f"Bearer {token}"}, 

202 timeout=30.0, 

203 ) 

204 except httpx.RequestError as exc: 

205 logger.warning("Tombstone pull network error from %s: %s", peer["node_id"], exc) 

206 return cursor 

207 

208 if resp.status_code != 200: 

209 logger.warning("Tombstone pull from %s returned %s", peer["node_id"], resp.status_code) 

210 return cursor 

211 

212 data = resp.json() 

213 tombstones = data.get("tombstones", []) 

214 new_cursor: str | None = data.get("cursor") 

215 

216 # F-13 §23.4.3: emit tombstone_sync_gap when result set is non-empty and cursor 

217 # indicates skipped pages (more results available beyond this batch) 

218 if tombstones and new_cursor is not None: 

219 from ..observability.audit_event import emit_nofail 

220 

221 emit_nofail( 

222 "tombstone_sync_gap", 

223 entity_uri=peer["node_id"], 

224 tenant_id="default", 

225 source=f"federation_pull:{peer['node_id']}", 

226 detail={ 

227 "peer_node_id": peer["node_id"], 

228 "tombstones_in_batch": len(tombstones), 

229 "cursor": new_cursor, 

230 }, 

231 ) 

232 

233 # Ingest tombstones and revocations 

234 from ..lifecycle.tombstones import apply_inbound_revocation, apply_inbound_tombstone 

235 from ..models.tombstones import TombstoneRecord, TombstoneRevocationRecord 

236 

237 for t in tombstones: 

238 try: 

239 record = TombstoneRecord(**t) 

240 apply_inbound_tombstone(record) 

241 except Exception as exc: 

242 logger.warning("Tombstone ingest from %s failed: %s", peer["node_id"], exc) 

243 

244 for r in data.get("revocations", []): 

245 try: 

246 rev = TombstoneRevocationRecord(**r) 

247 apply_inbound_revocation(rev) 

248 except Exception as exc: 

249 logger.warning("Tombstone revocation ingest from %s failed: %s", peer["node_id"], exc) 

250 

251 return new_cursor 

252 

253 

254def _load_tombstone_cursor(peer_id: str) -> str | None: 

255 with db() as conn: 

256 row = conn.execute( 

257 "SELECT cursor FROM replication_cursors" 

258 " WHERE peer_id = ? AND direction = 'tombstone_inbound'", 

259 (peer_id,), 

260 ).fetchone() 

261 return row["cursor"] if row else None 

262 

263 

264def _save_tombstone_cursor(peer_id: str, cursor: str | None) -> None: 

265 with db() as conn: 

266 conn.execute( 

267 """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at) 

268 VALUES (?,?,?,?) 

269 ON CONFLICT(peer_id, direction) 

270 DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at""", 

271 (peer_id, "tombstone_inbound", cursor, datetime.now(UTC).isoformat()), 

272 ) 

273 

274 

275async def pull_all_peers_once() -> None: 

276 """Pull one batch from every active peer. Called by the loop and by tests.""" 

277 with db() as conn: 

278 peers = conn.execute( 

279 "SELECT id, node_id, node_url, allowed_scopes FROM peers WHERE status = 'active'" 

280 ).fetchall() 

281 

282 if not peers: 

283 return 

284 

285 async with _make_pull_client() as client: 

286 for peer in peers: 

287 peer_dict = dict(peer) 

288 cursor = load_cursor(peer_dict["id"]) 

289 new_cursor = await pull_from_peer_once(peer_dict, client, cursor) 

290 if new_cursor != cursor: 

291 save_cursor(peer_dict["id"], new_cursor) 

292 

293 # §23.4.3: pull tombstones from peers 

294 tomb_cursor = _load_tombstone_cursor(peer_dict["id"]) 

295 new_tomb_cursor = await pull_tombstones_from_peer_once(peer_dict, client, tomb_cursor) 

296 if new_tomb_cursor != tomb_cursor: 

297 _save_tombstone_cursor(peer_dict["id"], new_tomb_cursor) 

298 

299 

300async def pull_loop_task() -> None: 

301 """Background asyncio task: pull from all active peers every pull_interval_s.""" 

302 while True: 

303 await asyncio.sleep(settings.federation_pull_interval_s) 

304 try: 

305 await pull_all_peers_once() 

306 except Exception: 

307 logger.exception("Unexpected error in pull loop")