Coverage for node / src / stigmem_node / main.py: 82%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Stigmem reference node — FastAPI application factory and entrypoint.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import logging 

7import signal 

8import ssl 

9from collections.abc import AsyncGenerator, Awaitable, Callable 

10from contextlib import asynccontextmanager, suppress 

11from pathlib import Path 

12from typing import Annotated, Any, cast 

13from urllib.parse import urlparse 

14 

15import uvicorn 

16from fastapi import Depends, FastAPI, Request, Response 

17from fastapi.middleware.cors import CORSMiddleware 

18from fastapi.responses import FileResponse, JSONResponse 

19 

20from .auth import Identity, resolve_identity 

21from .db import apply_migrations 

22from .rate_limit import RateLimitMiddleware 

23from .routes.admin_audit import router as admin_audit_router 

24from .routes.agent_keys import router as agent_keys_router 

25from .routes.aliases import router as aliases_router 

26from .routes.audit import router as audit_router 

27from .routes.auth import router as auth_router 

28from .routes.cards import router as cards_router 

29from .routes.cid_admin import router as cid_admin_router 

30from .routes.decay import router as decay_router 

31from .routes.facts import router as facts_router 

32from .routes.federation import router as federation_router 

33from .routes.gardens import router as gardens_router 

34from .routes.graph import router as graph_router 

35from .routes.identity import router as identity_router 

36from .routes.intents import router as intents_router 

37from .routes.lint import router as lint_router 

38from .routes.mcp import router as mcp_router 

39from .routes.quarantine import router as quarantine_router 

40from .routes.recall import router as recall_router 

41from .routes.resolver import router as resolver_router 

42from .routes.subscriptions import router as subscriptions_router 

43from .routes.synthesize import router as synthesize_router 

44from .routes.wellknown import router as wellknown_router 

45from .settings import settings 

46 

47_STATIC_DIR = Path(__file__).parent / "static" 

48 

49logger = logging.getLogger("stigmem") 

50 

51_DEV_LOCALHOST_CORS_REGEX = r"^https?://(localhost|127\.0\.0\.1)(:\d+)?$" 

52 

53 

54def _enforce_federation_transport_security() -> None: 

55 """Require explicit opt-in for federation without mTLS.""" 

56 federation_active = settings.federation_enabled or settings.federation_push_enabled 

57 if not federation_active or settings.mtls_enabled: 

58 return 

59 

60 if not settings.federation_insecure: 

61 raise RuntimeError( 

62 "Federation requires mTLS by default. Configure STIGMEM_TLS_CERT_PATH, " 

63 "STIGMEM_TLS_KEY_PATH, and STIGMEM_TLS_CA_BUNDLE, or set " 

64 "STIGMEM_FEDERATION_INSECURE=1 only for local/dev/test federation." 

65 ) 

66 

67 if not _node_url_is_loopback(settings.node_url) and not ( 

68 settings.local_dev_allow_insecure_non_loopback 

69 ): 

70 raise RuntimeError( 

71 "STIGMEM_FEDERATION_INSECURE=1 is only permitted when node_url is " 

72 f"bound to 127.0.0.1 or localhost. Got node_url={settings.node_url!r}. " 

73 "Configure mTLS for any non-loopback deployment, or set " 

74 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 only for local " 

75 "Docker/dev networks." 

76 ) 

77 

78 logger.warning( 

79 "SECURITY WARNING: federation is running without mTLS because " 

80 "STIGMEM_FEDERATION_INSECURE=1 is set. This is only allowed because " 

81 "node_url is a loopback address or " 

82 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 is set. Use this only " 

83 "for local/dev/test." 

84 ) 

85 

86 

87def _enforce_auth_required_in_production() -> None: 

88 """Refuse to run unauthenticated outside loopback.""" 

89 if settings.auth_required: 

90 return 

91 if not _node_url_is_loopback(settings.node_url) and not ( 

92 settings.local_dev_allow_insecure_non_loopback 

93 ): 

94 raise RuntimeError( 

95 "STIGMEM_AUTH_REQUIRED=false is only permitted when node_url is " 

96 f"bound to 127.0.0.1 or localhost. Got node_url={settings.node_url!r}. " 

97 "Anonymous identity has read/write/federate permissions; never expose " 

98 "this configuration to a network. Set " 

99 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 only for local " 

100 "Docker/dev networks." 

101 ) 

102 logger.warning( 

103 "SECURITY WARNING: STIGMEM_AUTH_REQUIRED=false. Anonymous identity has " 

104 "full read/write/federate permissions. This is only allowed because " 

105 "node_url is a loopback address or " 

106 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 is set." 

107 ) 

108 

109 

110def _enforce_rate_limit_kill_switch_ack() -> None: 

111 """Refuse boot when quota is fully disabled without an explicit acknowledgement.""" 

112 if settings.rate_limit_write_per_hour != 0 or settings.rate_limit_read_per_hour != 0: 

113 return 

114 if not settings.rate_limit_disabled_ack: 

115 raise RuntimeError( 

116 "STIGMEM_RATE_LIMIT_WRITE_PER_HOUR=0 and " 

117 "STIGMEM_RATE_LIMIT_READ_PER_HOUR=0 fully disable quota enforcement. " 

118 "To proceed, set STIGMEM_RATE_LIMIT_DISABLED_ACK=1 to acknowledge " 

119 "that this node accepts unbounded read and write traffic." 

120 ) 

121 

122 logger.warning( 

123 "SECURITY WARNING: quota enforcement is fully disabled " 

124 "(write=0, read=0) with explicit operator acknowledgment via " 

125 "STIGMEM_RATE_LIMIT_DISABLED_ACK=1." 

126 ) 

127 

128 

129def _warn_if_cors_dev_localhost_enabled() -> None: 

130 """Log the expanded development CORS posture at startup.""" 

131 if settings.cors_dev_localhost: 

132 logger.warning( 

133 "SECURITY WARNING: STIGMEM_CORS_DEV_LOCALHOST=1 enables browser " 

134 "access from localhost and loopback origins. Use this only for " 

135 "local development." 

136 ) 

137 

138 

139def _node_url_is_loopback(node_url: str) -> bool: 

140 """Return True iff node_url's host is a loopback address.""" 

141 try: 

142 parsed = urlparse(node_url) 

143 host = (parsed.hostname or "").lower() 

144 except ValueError: 

145 return False 

146 return host in {"localhost", "127.0.0.1", "::1"} 

147 

148 

149def create_app() -> FastAPI: 

150 @asynccontextmanager 

151 async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: 

152 from .plugins import get_registry, register_discovered_plugins 

153 

154 if settings.trust_mode == "strict" and not settings.node_private_key: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY must be set when trust_mode=strict") 

156 _enforce_federation_transport_security() 

157 _enforce_auth_required_in_production() 

158 _enforce_rate_limit_kill_switch_ack() 

159 _warn_if_cors_dev_localhost_enabled() 

160 

161 discovered_plugins = register_discovered_plugins(freeze=False) 

162 _include_plugin_routers(app, discovered_plugins) 

163 apply_migrations() 

164 from .memory_garden_acl_gate import warn_if_memory_garden_acl_filtering_disabled 

165 

166 warn_if_memory_garden_acl_filtering_disabled(logger) 

167 get_registry().freeze() 

168 

169 if settings.otel_enabled: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 from .observability.tracing import init_tracing 

171 

172 init_tracing( 

173 service_name=settings.otel_service_name, 

174 otlp_endpoint=settings.otel_exporter_otlp_endpoint, 

175 ) 

176 

177 pull_task: asyncio.Task[None] | None = None 

178 if settings.federation_enabled: 

179 from .federation.federation_pull import pull_loop_task 

180 from .federation.peer_token import init_federation_keys 

181 

182 init_federation_keys() 

183 pull_task = asyncio.create_task(pull_loop_task()) 

184 logger.info("Federation enabled — pull %ds", settings.federation_pull_interval_s) 

185 

186 from .subscription_delivery import sweep_loop as _sub_sweep_loop 

187 

188 sweep_task: asyncio.Task[None] = asyncio.create_task(_sub_sweep_loop()) 

189 logger.info( 

190 "Stigmem subscription sweep enabled — interval %ds", 

191 settings.subscription_delivery_sweep_s, 

192 ) 

193 

194 logger.info( 

195 "Stigmem node ready — db=%s auth=%s federation=%s", 

196 settings.db_path, 

197 settings.auth_required, 

198 "enabled" if settings.federation_enabled else "disabled", 

199 ) 

200 yield 

201 

202 sweep_task.cancel() 

203 with suppress(asyncio.CancelledError): 

204 _unused_result = await cast("asyncio.Task[object]", sweep_task) 

205 

206 if pull_task is not None: 

207 pull_task.cancel() 

208 with suppress(asyncio.CancelledError): 

209 _unused_result = await cast("asyncio.Task[object]", pull_task) 

210 

211 app = FastAPI( 

212 title="Stigmem Reference Node", 

213 version="0.9.0a9", 

214 description=( 

215 "Reference node implementing the Stigmem v0.9.0a9 HTTP API — facts, federation, " 

216 "gardens, recall, subscriptions, audit, identity, content-addressed fact IDs. " 

217 "Cross-cutting features (tombstones, time-travel, multi-tenant) are opt-in plugins." 

218 ), 

219 license_info={"name": "Apache-2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0"}, 

220 lifespan=lifespan, 

221 ) 

222 

223 app.add_middleware(RateLimitMiddleware) 

224 _cors_regex = settings.cors_allowed_origin_regex 

225 if settings.cors_dev_localhost: 

226 _cors_regex = _DEV_LOCALHOST_CORS_REGEX 

227 if settings.cors_allowed_origins or _cors_regex: 

228 app.add_middleware( 

229 CORSMiddleware, 

230 allow_origins=settings.cors_allowed_origins, 

231 allow_origin_regex=_cors_regex, 

232 allow_credentials=settings.cors_allow_credentials, 

233 allow_methods=["GET", "POST", "PATCH", "DELETE", "OPTIONS"], 

234 allow_headers=["*"], 

235 expose_headers=["ETag"], 

236 max_age=600, 

237 ) 

238 

239 @app.middleware("http") 

240 async def unsigned_plugin_override_warning( 

241 _request: Request, 

242 call_next: Callable[[Request], Awaitable[Response]], 

243 ) -> Response: 

244 """Warn every request while development unsigned-plugin override is active.""" 

245 from .plugins import get_registry 

246 

247 unsigned_plugins = get_registry().development_unsigned_plugins() 

248 if unsigned_plugins: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 logger.warning( 

250 "SECURITY WARNING: unsigned plugins active via " 

251 "STIGMEM_PLUGIN_SIGNING_REQUIRED=false: %s", 

252 ", ".join(unsigned_plugins), 

253 ) 

254 return await call_next(_request) 

255 

256 if settings.mtls_enabled: 

257 

258 @app.middleware("http") 

259 async def mtls_plaintext_guard( 

260 request: Request, 

261 call_next: Callable[[Request], Awaitable[Response]], 

262 ) -> Response: 

263 """Reject plaintext federation requests when mTLS is configured (§22.1).""" 

264 if request.method == "OPTIONS" and not request.url.path.startswith("/v1/federation"): 

265 return await call_next(request) 

266 if request.url.path.startswith("/v1/federation") and request.url.scheme != "https": 

267 return JSONResponse( 

268 { 

269 "error": "mTLS required", 

270 "detail": "Federation transport requires mutual TLS (spec §22.1). " 

271 "Connect via HTTPS with a valid node certificate.", 

272 }, 

273 status_code=421, 

274 ) 

275 return await call_next(request) 

276 

277 app.include_router(admin_audit_router) 

278 app.include_router(cid_admin_router) 

279 app.include_router(auth_router) 

280 app.include_router(agent_keys_router) 

281 app.include_router(audit_router) 

282 app.include_router(facts_router) 

283 app.include_router(gardens_router) 

284 app.include_router(graph_router) 

285 app.include_router(identity_router) 

286 app.include_router(intents_router) 

287 app.include_router(federation_router) 

288 app.include_router(quarantine_router) 

289 app.include_router(lint_router) 

290 app.include_router(synthesize_router) 

291 app.include_router(decay_router) 

292 app.include_router(aliases_router) 

293 app.include_router(resolver_router) 

294 app.include_router(cards_router) 

295 app.include_router(recall_router) 

296 app.include_router(subscriptions_router) 

297 app.include_router(mcp_router) 

298 app.include_router(wellknown_router) 

299 

300 @app.get("/healthz", tags=["ops"]) 

301 def health() -> dict[str, str]: 

302 return {"status": "ok"} 

303 

304 @app.get("/v1/doctor", tags=["ops"]) 

305 def doctor() -> dict[str, str]: 

306 """Return coarse node health and operator posture. 

307 

308 This endpoint is unauthenticated in v0.9.0a9. The garden ACL posture 

309 field is accepted as ops-endpoint disclosure and intentionally avoids 

310 garden names, membership rows, tenant identifiers, or policy subjects. 

311 """ 

312 from .memory_garden_acl_gate import memory_garden_acl_filtering_state 

313 

314 return { 

315 "status": "ok", 

316 "memory_garden_acl_filtering": memory_garden_acl_filtering_state(), 

317 } 

318 

319 @app.get("/metrics", include_in_schema=False, tags=["ops"]) 

320 def prometheus_metrics() -> Response: 

321 from .observability.metrics import make_metrics_response 

322 

323 resp = make_metrics_response() 

324 if resp is None: 324 ↛ 328line 324 didn't jump to line 328 because the condition on line 324 was always true

325 from fastapi.responses import PlainTextResponse 

326 

327 return PlainTextResponse("# prometheus_client not installed\n", status_code=200) 

328 return resp 

329 

330 @app.get("/v1/me", tags=["auth"]) 

331 def whoami(identity: Annotated[Identity, Depends(resolve_identity)]) -> dict[str, Any]: 

332 return { 

333 "entity_uri": identity.entity_uri, 

334 "permissions": sorted(identity.permissions), 

335 "oidc_sub": identity.oidc_sub, 

336 "tenant_id": identity.tenant_id, 

337 } 

338 

339 @app.get("/ui", include_in_schema=False) 

340 def ui_index() -> FileResponse: 

341 return FileResponse(_STATIC_DIR / "index.html", media_type="text/html") 

342 

343 return app 

344 

345 

346def _include_plugin_routers(app: FastAPI, discovered_plugins: tuple[Any, ...]) -> None: 

347 """Include routers declared by installed plugins once per app instance.""" 

348 if getattr(app.state, "stigmem_plugin_routes_included", False): 

349 return 

350 for plugin in discovered_plugins: 

351 for router in plugin.manifest.routes: 

352 app.include_router(router) 

353 app.state.stigmem_plugin_routes_included = True 

354 

355 

356app = create_app() 

357 

358 

359def run() -> None: 

360 if not settings.mtls_enabled: 

361 uvicorn.run( 

362 "stigmem_node.main:app", 

363 host=settings.host, 

364 port=settings.port, 

365 log_level=settings.log_level, 

366 reload=False, 

367 ) 

368 return 

369 

370 from .federation.tls import cert_watcher_task, reload_tls_cert 

371 

372 # Let uvicorn build the SSL context from cert/key files, then enforce TLS 1.3 

373 # floor and mTLS client-cert requirement on the resulting context object. 

374 config = uvicorn.Config( 

375 "stigmem_node.main:app", 

376 host=settings.host, 

377 port=settings.port, 

378 log_level=settings.log_level, 

379 reload=False, 

380 ssl_certfile=settings.tls_cert_path, 

381 ssl_keyfile=settings.tls_key_path, 

382 ssl_ca_certs=settings.tls_ca_bundle or None, 

383 ssl_cert_reqs=ssl.CERT_REQUIRED, 

384 ) 

385 config.load() 

386 

387 if config.ssl: 

388 config.ssl.minimum_version = ssl.TLSVersion.TLSv1_3 

389 ssl_ctx = config.ssl 

390 

391 async def _serve_with_cert_watcher() -> None: 

392 loop = asyncio.get_running_loop() 

393 if ssl_ctx is not None: 

394 loop.add_signal_handler( 

395 signal.SIGHUP, 

396 lambda: reload_tls_cert(ssl_ctx), 

397 ) 

398 

399 server = uvicorn.Server(config) 

400 watcher_task: asyncio.Task[None] | None = None 

401 if ssl_ctx is not None: 

402 watcher_task = asyncio.create_task(cert_watcher_task(ssl_ctx)) 

403 

404 try: 

405 await server.serve() 

406 finally: 

407 if watcher_task is not None: 

408 watcher_task.cancel() 

409 with suppress(asyncio.CancelledError): 

410 _unused_result = await cast("asyncio.Task[object]", watcher_task) 

411 

412 asyncio.run(_serve_with_cert_watcher()) 

413 

414 

415if __name__ == "__main__": 415 ↛ 416line 415 didn't jump to line 416 because the condition on line 415 was never true

416 run()