Coverage for node / src / stigmem_node / main.py: 82%
213 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Stigmem reference node — FastAPI application factory and entrypoint."""
3from __future__ import annotations
5import asyncio
6import logging
7import signal
8import ssl
9from collections.abc import AsyncGenerator, Awaitable, Callable
10from contextlib import asynccontextmanager, suppress
11from pathlib import Path
12from typing import Annotated, Any, cast
13from urllib.parse import urlparse
15import uvicorn
16from fastapi import Depends, FastAPI, Request, Response
17from fastapi.middleware.cors import CORSMiddleware
18from fastapi.responses import FileResponse, JSONResponse
20from .auth import Identity, resolve_identity
21from .db import apply_migrations
22from .rate_limit import RateLimitMiddleware
23from .routes.admin_audit import router as admin_audit_router
24from .routes.agent_keys import router as agent_keys_router
25from .routes.aliases import router as aliases_router
26from .routes.audit import router as audit_router
27from .routes.auth import router as auth_router
28from .routes.cards import router as cards_router
29from .routes.cid_admin import router as cid_admin_router
30from .routes.decay import router as decay_router
31from .routes.facts import router as facts_router
32from .routes.federation import router as federation_router
33from .routes.gardens import router as gardens_router
34from .routes.graph import router as graph_router
35from .routes.identity import router as identity_router
36from .routes.intents import router as intents_router
37from .routes.lint import router as lint_router
38from .routes.mcp import router as mcp_router
39from .routes.quarantine import router as quarantine_router
40from .routes.recall import router as recall_router
41from .routes.resolver import router as resolver_router
42from .routes.subscriptions import router as subscriptions_router
43from .routes.synthesize import router as synthesize_router
44from .routes.wellknown import router as wellknown_router
45from .settings import settings
47_STATIC_DIR = Path(__file__).parent / "static"
49logger = logging.getLogger("stigmem")
51_DEV_LOCALHOST_CORS_REGEX = r"^https?://(localhost|127\.0\.0\.1)(:\d+)?$"
54def _enforce_federation_transport_security() -> None:
55 """Require explicit opt-in for federation without mTLS."""
56 federation_active = settings.federation_enabled or settings.federation_push_enabled
57 if not federation_active or settings.mtls_enabled:
58 return
60 if not settings.federation_insecure:
61 raise RuntimeError(
62 "Federation requires mTLS by default. Configure STIGMEM_TLS_CERT_PATH, "
63 "STIGMEM_TLS_KEY_PATH, and STIGMEM_TLS_CA_BUNDLE, or set "
64 "STIGMEM_FEDERATION_INSECURE=1 only for local/dev/test federation."
65 )
67 if not _node_url_is_loopback(settings.node_url) and not (
68 settings.local_dev_allow_insecure_non_loopback
69 ):
70 raise RuntimeError(
71 "STIGMEM_FEDERATION_INSECURE=1 is only permitted when node_url is "
72 f"bound to 127.0.0.1 or localhost. Got node_url={settings.node_url!r}. "
73 "Configure mTLS for any non-loopback deployment, or set "
74 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 only for local "
75 "Docker/dev networks."
76 )
78 logger.warning(
79 "SECURITY WARNING: federation is running without mTLS because "
80 "STIGMEM_FEDERATION_INSECURE=1 is set. This is only allowed because "
81 "node_url is a loopback address or "
82 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 is set. Use this only "
83 "for local/dev/test."
84 )
87def _enforce_auth_required_in_production() -> None:
88 """Refuse to run unauthenticated outside loopback."""
89 if settings.auth_required:
90 return
91 if not _node_url_is_loopback(settings.node_url) and not (
92 settings.local_dev_allow_insecure_non_loopback
93 ):
94 raise RuntimeError(
95 "STIGMEM_AUTH_REQUIRED=false is only permitted when node_url is "
96 f"bound to 127.0.0.1 or localhost. Got node_url={settings.node_url!r}. "
97 "Anonymous identity has read/write/federate permissions; never expose "
98 "this configuration to a network. Set "
99 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 only for local "
100 "Docker/dev networks."
101 )
102 logger.warning(
103 "SECURITY WARNING: STIGMEM_AUTH_REQUIRED=false. Anonymous identity has "
104 "full read/write/federate permissions. This is only allowed because "
105 "node_url is a loopback address or "
106 "STIGMEM_LOCAL_DEV_ALLOW_INSECURE_NON_LOOPBACK=1 is set."
107 )
110def _enforce_rate_limit_kill_switch_ack() -> None:
111 """Refuse boot when quota is fully disabled without an explicit acknowledgement."""
112 if settings.rate_limit_write_per_hour != 0 or settings.rate_limit_read_per_hour != 0:
113 return
114 if not settings.rate_limit_disabled_ack:
115 raise RuntimeError(
116 "STIGMEM_RATE_LIMIT_WRITE_PER_HOUR=0 and "
117 "STIGMEM_RATE_LIMIT_READ_PER_HOUR=0 fully disable quota enforcement. "
118 "To proceed, set STIGMEM_RATE_LIMIT_DISABLED_ACK=1 to acknowledge "
119 "that this node accepts unbounded read and write traffic."
120 )
122 logger.warning(
123 "SECURITY WARNING: quota enforcement is fully disabled "
124 "(write=0, read=0) with explicit operator acknowledgment via "
125 "STIGMEM_RATE_LIMIT_DISABLED_ACK=1."
126 )
129def _warn_if_cors_dev_localhost_enabled() -> None:
130 """Log the expanded development CORS posture at startup."""
131 if settings.cors_dev_localhost:
132 logger.warning(
133 "SECURITY WARNING: STIGMEM_CORS_DEV_LOCALHOST=1 enables browser "
134 "access from localhost and loopback origins. Use this only for "
135 "local development."
136 )
139def _node_url_is_loopback(node_url: str) -> bool:
140 """Return True iff node_url's host is a loopback address."""
141 try:
142 parsed = urlparse(node_url)
143 host = (parsed.hostname or "").lower()
144 except ValueError:
145 return False
146 return host in {"localhost", "127.0.0.1", "::1"}
149def create_app() -> FastAPI:
150 @asynccontextmanager
151 async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
152 from .plugins import get_registry, register_discovered_plugins
154 if settings.trust_mode == "strict" and not settings.node_private_key: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 raise RuntimeError("STIGMEM_NODE_PRIVATE_KEY must be set when trust_mode=strict")
156 _enforce_federation_transport_security()
157 _enforce_auth_required_in_production()
158 _enforce_rate_limit_kill_switch_ack()
159 _warn_if_cors_dev_localhost_enabled()
161 discovered_plugins = register_discovered_plugins(freeze=False)
162 _include_plugin_routers(app, discovered_plugins)
163 apply_migrations()
164 from .memory_garden_acl_gate import warn_if_memory_garden_acl_filtering_disabled
166 warn_if_memory_garden_acl_filtering_disabled(logger)
167 get_registry().freeze()
169 if settings.otel_enabled: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true
170 from .observability.tracing import init_tracing
172 init_tracing(
173 service_name=settings.otel_service_name,
174 otlp_endpoint=settings.otel_exporter_otlp_endpoint,
175 )
177 pull_task: asyncio.Task[None] | None = None
178 if settings.federation_enabled:
179 from .federation.federation_pull import pull_loop_task
180 from .federation.peer_token import init_federation_keys
182 init_federation_keys()
183 pull_task = asyncio.create_task(pull_loop_task())
184 logger.info("Federation enabled — pull %ds", settings.federation_pull_interval_s)
186 from .subscription_delivery import sweep_loop as _sub_sweep_loop
188 sweep_task: asyncio.Task[None] = asyncio.create_task(_sub_sweep_loop())
189 logger.info(
190 "Stigmem subscription sweep enabled — interval %ds",
191 settings.subscription_delivery_sweep_s,
192 )
194 logger.info(
195 "Stigmem node ready — db=%s auth=%s federation=%s",
196 settings.db_path,
197 settings.auth_required,
198 "enabled" if settings.federation_enabled else "disabled",
199 )
200 yield
202 sweep_task.cancel()
203 with suppress(asyncio.CancelledError):
204 _unused_result = await cast("asyncio.Task[object]", sweep_task)
206 if pull_task is not None:
207 pull_task.cancel()
208 with suppress(asyncio.CancelledError):
209 _unused_result = await cast("asyncio.Task[object]", pull_task)
211 app = FastAPI(
212 title="Stigmem Reference Node",
213 version="0.9.0a9",
214 description=(
215 "Reference node implementing the Stigmem v0.9.0a9 HTTP API — facts, federation, "
216 "gardens, recall, subscriptions, audit, identity, content-addressed fact IDs. "
217 "Cross-cutting features (tombstones, time-travel, multi-tenant) are opt-in plugins."
218 ),
219 license_info={"name": "Apache-2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0"},
220 lifespan=lifespan,
221 )
223 app.add_middleware(RateLimitMiddleware)
224 _cors_regex = settings.cors_allowed_origin_regex
225 if settings.cors_dev_localhost:
226 _cors_regex = _DEV_LOCALHOST_CORS_REGEX
227 if settings.cors_allowed_origins or _cors_regex:
228 app.add_middleware(
229 CORSMiddleware,
230 allow_origins=settings.cors_allowed_origins,
231 allow_origin_regex=_cors_regex,
232 allow_credentials=settings.cors_allow_credentials,
233 allow_methods=["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
234 allow_headers=["*"],
235 expose_headers=["ETag"],
236 max_age=600,
237 )
239 @app.middleware("http")
240 async def unsigned_plugin_override_warning(
241 _request: Request,
242 call_next: Callable[[Request], Awaitable[Response]],
243 ) -> Response:
244 """Warn every request while development unsigned-plugin override is active."""
245 from .plugins import get_registry
247 unsigned_plugins = get_registry().development_unsigned_plugins()
248 if unsigned_plugins: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 logger.warning(
250 "SECURITY WARNING: unsigned plugins active via "
251 "STIGMEM_PLUGIN_SIGNING_REQUIRED=false: %s",
252 ", ".join(unsigned_plugins),
253 )
254 return await call_next(_request)
256 if settings.mtls_enabled:
258 @app.middleware("http")
259 async def mtls_plaintext_guard(
260 request: Request,
261 call_next: Callable[[Request], Awaitable[Response]],
262 ) -> Response:
263 """Reject plaintext federation requests when mTLS is configured (§22.1)."""
264 if request.method == "OPTIONS" and not request.url.path.startswith("/v1/federation"):
265 return await call_next(request)
266 if request.url.path.startswith("/v1/federation") and request.url.scheme != "https":
267 return JSONResponse(
268 {
269 "error": "mTLS required",
270 "detail": "Federation transport requires mutual TLS (spec §22.1). "
271 "Connect via HTTPS with a valid node certificate.",
272 },
273 status_code=421,
274 )
275 return await call_next(request)
277 app.include_router(admin_audit_router)
278 app.include_router(cid_admin_router)
279 app.include_router(auth_router)
280 app.include_router(agent_keys_router)
281 app.include_router(audit_router)
282 app.include_router(facts_router)
283 app.include_router(gardens_router)
284 app.include_router(graph_router)
285 app.include_router(identity_router)
286 app.include_router(intents_router)
287 app.include_router(federation_router)
288 app.include_router(quarantine_router)
289 app.include_router(lint_router)
290 app.include_router(synthesize_router)
291 app.include_router(decay_router)
292 app.include_router(aliases_router)
293 app.include_router(resolver_router)
294 app.include_router(cards_router)
295 app.include_router(recall_router)
296 app.include_router(subscriptions_router)
297 app.include_router(mcp_router)
298 app.include_router(wellknown_router)
300 @app.get("/healthz", tags=["ops"])
301 def health() -> dict[str, str]:
302 return {"status": "ok"}
304 @app.get("/v1/doctor", tags=["ops"])
305 def doctor() -> dict[str, str]:
306 """Return coarse node health and operator posture.
308 This endpoint is unauthenticated in v0.9.0a9. The garden ACL posture
309 field is accepted as ops-endpoint disclosure and intentionally avoids
310 garden names, membership rows, tenant identifiers, or policy subjects.
311 """
312 from .memory_garden_acl_gate import memory_garden_acl_filtering_state
314 return {
315 "status": "ok",
316 "memory_garden_acl_filtering": memory_garden_acl_filtering_state(),
317 }
319 @app.get("/metrics", include_in_schema=False, tags=["ops"])
320 def prometheus_metrics() -> Response:
321 from .observability.metrics import make_metrics_response
323 resp = make_metrics_response()
324 if resp is None: 324 ↛ 328line 324 didn't jump to line 328 because the condition on line 324 was always true
325 from fastapi.responses import PlainTextResponse
327 return PlainTextResponse("# prometheus_client not installed\n", status_code=200)
328 return resp
330 @app.get("/v1/me", tags=["auth"])
331 def whoami(identity: Annotated[Identity, Depends(resolve_identity)]) -> dict[str, Any]:
332 return {
333 "entity_uri": identity.entity_uri,
334 "permissions": sorted(identity.permissions),
335 "oidc_sub": identity.oidc_sub,
336 "tenant_id": identity.tenant_id,
337 }
339 @app.get("/ui", include_in_schema=False)
340 def ui_index() -> FileResponse:
341 return FileResponse(_STATIC_DIR / "index.html", media_type="text/html")
343 return app
346def _include_plugin_routers(app: FastAPI, discovered_plugins: tuple[Any, ...]) -> None:
347 """Include routers declared by installed plugins once per app instance."""
348 if getattr(app.state, "stigmem_plugin_routes_included", False):
349 return
350 for plugin in discovered_plugins:
351 for router in plugin.manifest.routes:
352 app.include_router(router)
353 app.state.stigmem_plugin_routes_included = True
356app = create_app()
359def run() -> None:
360 if not settings.mtls_enabled:
361 uvicorn.run(
362 "stigmem_node.main:app",
363 host=settings.host,
364 port=settings.port,
365 log_level=settings.log_level,
366 reload=False,
367 )
368 return
370 from .federation.tls import cert_watcher_task, reload_tls_cert
372 # Let uvicorn build the SSL context from cert/key files, then enforce TLS 1.3
373 # floor and mTLS client-cert requirement on the resulting context object.
374 config = uvicorn.Config(
375 "stigmem_node.main:app",
376 host=settings.host,
377 port=settings.port,
378 log_level=settings.log_level,
379 reload=False,
380 ssl_certfile=settings.tls_cert_path,
381 ssl_keyfile=settings.tls_key_path,
382 ssl_ca_certs=settings.tls_ca_bundle or None,
383 ssl_cert_reqs=ssl.CERT_REQUIRED,
384 )
385 config.load()
387 if config.ssl:
388 config.ssl.minimum_version = ssl.TLSVersion.TLSv1_3
389 ssl_ctx = config.ssl
391 async def _serve_with_cert_watcher() -> None:
392 loop = asyncio.get_running_loop()
393 if ssl_ctx is not None:
394 loop.add_signal_handler(
395 signal.SIGHUP,
396 lambda: reload_tls_cert(ssl_ctx),
397 )
399 server = uvicorn.Server(config)
400 watcher_task: asyncio.Task[None] | None = None
401 if ssl_ctx is not None:
402 watcher_task = asyncio.create_task(cert_watcher_task(ssl_ctx))
404 try:
405 await server.serve()
406 finally:
407 if watcher_task is not None:
408 watcher_task.cancel()
409 with suppress(asyncio.CancelledError):
410 _unused_result = await cast("asyncio.Task[object]", watcher_task)
412 asyncio.run(_serve_with_cert_watcher())
415if __name__ == "__main__": 415 ↛ 416line 415 didn't jump to line 416 because the condition on line 415 was never true
416 run()