Coverage for node / src / stigmem_node / routes / facts / query.py: 92%
203 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""GET /v1/facts query route and query helpers."""
3from __future__ import annotations
5import uuid
6from datetime import UTC, datetime, timedelta
7from typing import Annotated, Any
9from fastapi import Depends, Header, HTTPException, Query, Response, status
11from ... import settings as _settings_pkg
12from ...auth import Identity, resolve_identity
13from ...db import db
14from ...entity_normalizer import NormalizationError, normalize_entity_uri
15from ...garden_acl import get_garden_by_garden_uri, require_garden_read
16from ...memory_garden_acl_gate import recall_filter_enabled
17from ...metrics import FACT_READ
18from ...models.constants import VALID_SCOPES
19from ...models.facts import FactRecord, QueryResponse, row_to_record
20from ...models.tombstones import TombstoneNotice
21from ...plugins import Deny, Failure, Success, TenantContext, get_registry
22from ...recall.recall_pipeline import apply_recall_pipeline
23from ...session_graph import record_read_scopes
24from ..cid_integrity import enforce_read_path_cid
25from ..time_travel_gate import require_time_travel_enabled
26from .common import _get_tombstone_filter, logger, router
def _validate_as_of(as_of: str) -> datetime:
    """Parse an ``as_of`` query value and enforce the §24.2.2 bounds.

    Returns the parsed, timezone-aware timestamp.

    Raises ``HTTPException`` (400) with one of these ``code`` values:

    * ``as_of_invalid_timestamp`` — the value is not parseable as ISO 8601.
    * ``as_of_future`` — the value is more than five seconds ahead of now.
    * ``as_of_before_retention_floor`` — the value predates the configured
      ``as_of_retention_floor`` setting.

    A retention floor that cannot be parsed is logged and otherwise ignored.
    """
    import re

    # A literal "+" in a URL query string is decoded as a space, so a trailing
    # " HH:MM" is put back as "+HH:MM"; "Z" is spelled out as "+00:00".
    repaired = re.sub(r" (\d{2}:\d{2})$", r"+\1", as_of)
    candidate = repaired.replace("Z", "+00:00")
    try:
        ts = datetime.fromisoformat(candidate)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail={"code": "as_of_invalid_timestamp", "message": str(exc)},
        ) from exc

    # Offset-less input is interpreted as UTC.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=UTC)

    # Five seconds of slack before a timestamp counts as "future".
    if ts > datetime.now(UTC) + timedelta(seconds=5):
        raise HTTPException(
            status_code=400,
            detail={"code": "as_of_future", "message": "as_of must not be in the future (§24.2.2)"},
        )

    floor = _settings_pkg.settings.as_of_retention_floor
    if not floor:
        return ts

    try:
        floor_ts = datetime.fromisoformat(floor.replace("Z", "+00:00"))
        if floor_ts.tzinfo is None:
            floor_ts = floor_ts.replace(tzinfo=UTC)
    except Exception as exc:  # nosec B110
        # Best effort: a malformed floor must not break time-travel reads.
        logger.warning("could not read retention floor while validating as_of: %s", exc)
        return ts

    if ts < floor_ts:
        raise HTTPException(
            status_code=400,
            detail={
                "code": "as_of_before_retention_floor",
                "message": "as_of predates the retention horizon for this deployment (§24.2.2)",  # noqa: E501
            },
        )
    return ts
def _legal_hold_blocks_query(conn: Any, entity: str) -> bool:
    """F-14 §24.3.2: report whether an unrevoked legal-hold tombstone covers *entity*.

    A tombstone counts when its ``entity_uri`` equals *entity*, its
    ``legal_hold`` flag is 1, and no row in ``tombstone_revocations``
    references it.
    """
    cursor = conn.execute(
        """SELECT 1 FROM tombstones t
            WHERE t.entity_uri = ?
              AND t.legal_hold = 1
              AND NOT EXISTS (
                  SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id
              )
            LIMIT 1""",
        (entity,),
    )
    # One matching row is enough; LIMIT 1 keeps the probe cheap.
    return cursor.fetchone() is not None
# Fixed SQL template for time-travel (``as_of``) reads. It takes 14 positional
# binds, supplied by ``_build_as_of_params`` in this order:
#   tenant_id,
#   as_of (upper bound on f.timestamp),
#   as_of (f.valid_until must be NULL or later),
#   as_of (no retraction recorded at or before it),
#   entity x3, relation x2, scope x2, cursor x2,
#   limit.
# Every optional filter has the shape ``(? IS NULL OR ...)`` so that binding
# NULL disables it without changing the SQL text.
_AS_OF_SELECT_SQL = (
    "SELECT f.* FROM facts f"
    " WHERE f.tenant_id = ?"
    " AND f.timestamp <= ?"
    " AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND NOT EXISTS ("
    " SELECT 1 FROM fact_retractions fr"
    " WHERE fr.fact_id = f.id AND fr.retracted_at <= ?"
    " )"
    " AND (? IS NULL"
    " OR f.entity = ?"
    " OR f.entity IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.relation = ?)"
    " AND (? IS NULL OR f.scope = ?)"
    " AND (? IS NULL OR f.id > ?)"
    " ORDER BY f.timestamp DESC, f.id DESC"
    " LIMIT ?"
)
# Garden-visibility modes bound into ``_FACT_QUERY_SQL``; the SQL compares the
# bound mode against the literals 0-3, so these values must not change
# independently of that template.
# 0: no garden predicate is applied.
_GARDEN_VISIBILITY_NONE = 0
# 1: the fact's projected garden id must equal one bound garden id.
_GARDEN_VISIBILITY_EXACT = 1
# 2: the projected garden id is NULL or listed in ``_query_visible_gardens``.
_GARDEN_VISIBILITY_VISIBLE_SET = 2
# 3: only facts whose projected garden id is NULL.
_GARDEN_VISIBILITY_NULL_ONLY = 3
# Fixed SQL template for the regular (non time-travel) fact query.
#
# Besides ``f.*`` it selects ``projected_*`` columns that prefer the value from
# a side table (validity overrides, garden membership, quarantine status, CID
# aliases) and fall back to the column on ``facts``.
#
# It takes 26 positional binds, supplied by ``_build_query_params`` in this order:
#   min_confidence, tenant_id,
#   visibility mode, visibility mode, exact garden id,
#   visibility mode, visibility mode,
#   attested x2, entity x3, relation x2, source x3, scope x2,
#   after x2, cursor x2,
#   include_expired flag (1 disables the expiry check), "now" for the expiry
#   check, limit.
# The ``_query_visible_gardens`` temp table referenced for mode 2 is created
# and filled by ``_prepare_garden_visibility_table`` before the query runs.
_FACT_QUERY_SQL = (
    "SELECT f.*, "
    "COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until, "
    "COALESCE(fvo.confidence, f.confidence) AS projected_confidence, "
    "COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id, "
    "COALESCE(fqs.quarantine_status, f.quarantine_status) AS projected_quarantine_status, "
    "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) "
    "AS projected_quarantine_garden_id, "
    "COALESCE(f.cid, (SELECT fca.cid FROM fact_cid_aliases fca "
    "WHERE fca.fact_id = f.id ORDER BY fca.cid LIMIT 1)) AS projected_cid"
    " FROM facts f"
    " LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
    " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id"
    " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id"
    " WHERE COALESCE(fvo.confidence, f.confidence) >= ?"
    " AND f.tenant_id = ?"
    " AND ("
    " ? = 0"
    " OR (? = 1 AND COALESCE(fgm.garden_id, f.garden_id) = ?)"
    " OR (? = 2 AND (COALESCE(fgm.garden_id, f.garden_id) IS NULL"
    " OR EXISTS (SELECT 1 FROM _query_visible_gardens qvg"
    " WHERE qvg.id = COALESCE(fgm.garden_id, f.garden_id))))"
    " OR (? = 3 AND COALESCE(fgm.garden_id, f.garden_id) IS NULL)"
    " )"
    " AND (? IS NULL OR f.attested = ?)"
    " AND (? IS NULL"
    " OR f.entity = ?"
    " OR f.entity IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.relation = ?)"
    " AND (? IS NULL"
    " OR f.source = ?"
    " OR f.source IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.scope = ?)"
    " AND (? IS NULL OR f.timestamp > ?)"
    " AND (? IS NULL OR f.id > ?)"
    " AND (? = 1"
    " OR COALESCE(fvo.valid_until, f.valid_until) IS NULL"
    " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
    " ORDER BY f.timestamp DESC, f.id DESC LIMIT ?"
)
150def _build_as_of_params(
151 *,
152 entity: str | None,
153 scope: str | None,
154 relation: str | None,
155 as_of: str,
156 tenant_id: str,
157 cursor: str | None,
158 limit: int,
159) -> list[Any]:
160 """Return the bind values for ``_AS_OF_SELECT_SQL``.
162 The SQL text is the ``_AS_OF_SELECT_SQL`` module-level constant; this
163 helper only computes bind values. Keeping the SQL string out of any
164 function that takes user input prevents CodeQL from interprocedurally
165 tainting it — see issue #121 for why a function that takes user
166 inputs and returns ``(sql, params)`` still trips ``py/sql-injection``
167 even when the returned SQL value is invariant.
168 """
169 if scope is not None and scope not in VALID_SCOPES: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true
170 raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")
172 # Normalize empty strings to None so the IS NULL gate matches the
173 # previous ``if entity:`` truthiness behaviour.
174 entity_p = entity or None
175 relation_p = relation or None
176 scope_p = scope or None
177 cursor_p = cursor or None
179 return [
180 tenant_id,
181 as_of,
182 as_of,
183 as_of,
184 entity_p,
185 entity_p,
186 entity_p,
187 relation_p,
188 relation_p,
189 scope_p,
190 scope_p,
191 cursor_p,
192 cursor_p,
193 limit + 1,
194 ]
def _query_facts_as_of_impl(
    conn: Any,
    *,
    entity: str | None,
    scope: str | None,
    relation: str | None,
    as_of: str,
    is_admin_caller: bool,
    tenant_id: str,
    limit: int,
    cursor: str | None,
) -> QueryResponse:
    """Return facts visible at as_of per §24.4.

    Retraction gating uses fact_retractions.retracted_at (append-only log), NOT facts.confidence.
    Expiry gating uses facts.valid_until.
    Tombstone filter per §24.3: retroactive RTBF unless legal_hold=true AND is_admin_caller.

    ``as_of`` is bound into ``_AS_OF_SELECT_SQL`` as-is and compared against
    the stored timestamp columns. One row more than ``limit`` is fetched to
    detect a further page; when there is one, the returned cursor is the id of
    the last row on this page. ``total`` is ``None`` whenever tombstone
    filtering removed records.
    """
    # F-14 §24.3.2: pre-check — agent-key callers get empty results if a legal-hold
    # tombstone covers the queried entity (short-circuit before executing the query)
    if entity and not is_admin_caller and _legal_hold_blocks_query(conn, entity):
        return QueryResponse(facts=[], total=None, cursor=None)

    params = _build_as_of_params(
        entity=entity,
        scope=scope,
        relation=relation,
        as_of=as_of,
        tenant_id=tenant_id,
        cursor=cursor,
        limit=limit,
    )

    rows = conn.execute(_AS_OF_SELECT_SQL, params).fetchall()
    has_more = len(rows) > limit
    rows = rows[:limit]

    # Shared with the regular query path: CID integrity check per row, and
    # rows sharing (entity, relation, scope) are marked contradicted.
    records = _rows_to_records(rows)

    # §24.3: tombstone filter for as_of queries
    tombstone_notices: list[TombstoneNotice] = []
    tombstone_filtered = False
    if records:
        entity_uris = list({r.entity for r in records})
        excluded, tombstone_notices = _get_tombstone_filter(
            conn, entity_uris, scope or "local", is_admin_caller
        )
        if excluded:
            records = [r for r in records if r.entity not in excluded]
            tombstone_filtered = True

    next_cursor = rows[-1]["id"] if has_more and rows else None
    # §23.3.3 r.3: suppress total when tombstone filtering was applied to prevent oracle leakage
    total = None if tombstone_filtered else len(records)
    return QueryResponse(
        facts=records,
        total=total,
        cursor=next_cursor,
        tombstone_notices=tombstone_notices,
    )
def _finalize_recall(
    result: QueryResponse,
    *,
    response: Response,
    registry: Any,
    identity: Identity,
    tenant: TenantContext,
    request_id: str,
    session_id: str | None,
) -> QueryResponse:
    """Publish a successful recall: total header, read-scope record, audit hook."""
    if result.total is not None:
        response.headers["X-Total-Count"] = str(result.total)
    with db() as conn:
        record_read_scopes(
            conn,
            identity=identity,
            session_id=session_id,
            scopes={fact.scope for fact in result.facts},
        )
    registry.fire_fire_and_forget(
        "post_recall_audit",
        result=result,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
        outcome=Success(),
    )
    return result


@router.get("", response_model=QueryResponse)
def query_facts(
    identity: Annotated[Identity, Depends(resolve_identity)],
    response: Response,
    session_id: Annotated[str | None, Header(alias="Stigmem-Session")] = None,
    entity: str | None = Query(None),
    relation: str | None = Query(None),
    source: str | None = Query(None),
    scope: str | None = Query(None),
    min_confidence: float = Query(0.0, ge=0.0, le=1.0),
    include_contradicted: bool = Query(False),
    include_expired: bool = Query(False),
    after: str | None = Query(
        None, description="Return facts with timestamp > this ISO 8601 value"
    ),  # noqa: E501
    cursor: str | None = Query(None, description="Opaque pagination cursor (fact id)"),
    limit: int = Query(50, ge=1, le=500),
    garden_id: str | None = Query(
        None, description="Filter to facts in this garden (Spec-02-Scopes-and-ACL)"
    ),  # noqa: E501
    attested: bool | None = Query(
        None, description="Filter by source-attestation status (Spec-X6-Source-Attestation)"
    ),  # noqa: E501
    include_low_trust: bool = Query(
        False,
        description="Include facts with effective_confidence < 0.3 (Spec-05-Federation-Trust)",
    ),  # noqa: E501
    as_of: str | None = Query(
        None,
        description="Time-travel query: return facts visible at this ISO 8601 timestamp (Spec-X3-Time-Travel-Queries)",  # noqa: E501
    ),  # noqa: E501
) -> QueryResponse:
    """Query facts by pattern (Spec-03-HTTP-API).

    Omitted fields are wildcards. Entity/source are normalized by Spec-01-Fact-Model.

    Flow:
      1. Require read permission (403 otherwise).
      2. Fire ``pre_recall_authorize``; a ``Deny`` is audited and returned as 403.
      3. Fire ``pre_recall_rewrite`` and continue with the rewritten parameters.
      4. If ``as_of`` is set, validate it and run the time-travel query.
      5. Otherwise run the regular query, then the contradicted filter, recall
         pipeline, tombstone filter and the ``recall_filter`` / ``recall_rank``
         hooks.
      6. On success set ``X-Total-Count`` (when a total is reported), record
         the scopes read, and fire ``post_recall_audit``.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )  # noqa: E501

    request_id = str(uuid.uuid4())
    tenant = TenantContext(
        tenant_id=identity.tenant_id,
        metadata={"tenant_context_source": "hook"},
    )
    registry = get_registry()
    query_payload: dict[str, Any] = {
        "entity": entity,
        "relation": relation,
        "source": source,
        "scope": scope,
        "min_confidence": min_confidence,
        "include_contradicted": include_contradicted,
        "include_expired": include_expired,
        "after": after,
        "cursor": cursor,
        "limit": limit,
        "garden_id": garden_id,
        "attested": attested,
        "include_low_trust": include_low_trust,
        "as_of": as_of,
    }
    decision = registry.fire_voting(
        "pre_recall_authorize",
        identity=identity,
        tenant=tenant,
        request_id=request_id,
        query=query_payload,
    )
    if isinstance(decision, Deny):
        # Denials are audited too, with a Failure outcome and no result.
        registry.fire_fire_and_forget(
            "post_recall_audit",
            result=None,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
            outcome=Failure(reason=decision.reason),
        )
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=decision.reason)

    # Plugins may rewrite any query parameter; everything below uses the
    # rewritten values, not the raw request values.
    rewritten_query = registry.fire_filter_chain(
        "pre_recall_rewrite",
        query_payload,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    entity = rewritten_query["entity"]
    relation = rewritten_query["relation"]
    source = rewritten_query["source"]
    scope = rewritten_query["scope"]
    min_confidence = rewritten_query["min_confidence"]
    include_contradicted = rewritten_query["include_contradicted"]
    include_expired = rewritten_query["include_expired"]
    after = rewritten_query["after"]
    cursor = rewritten_query["cursor"]
    limit = rewritten_query["limit"]
    garden_id = rewritten_query["garden_id"]
    attested = rewritten_query["attested"]
    include_low_trust = rewritten_query["include_low_trust"]
    as_of = rewritten_query["as_of"]

    # §24.4: time-travel query — delegate to as_of implementation
    if as_of is not None:
        require_time_travel_enabled(registry, surface="fact_query")
        # Bind the validated timestamp, not the raw query string: the raw value
        # may have its "+" decoded to a space, end in "Z", or carry a non-UTC
        # offset, any of which skews the textual comparison done in SQL.
        # NOTE(review): assumes stored timestamps are UTC ISO 8601 with a
        # "+00:00" offset, as produced by datetime.now(UTC).isoformat() — confirm.
        as_of_ts = _validate_as_of(as_of)
        as_of_bind = as_of_ts.astimezone(UTC).isoformat()
        with db() as conn:
            result = _query_facts_as_of_impl(
                conn,
                entity=entity,
                scope=scope,
                relation=relation,
                as_of=as_of_bind,
                is_admin_caller=identity.is_admin(),
                tenant_id=identity.tenant_id,
                limit=limit,
                cursor=cursor,
            )
        return _finalize_recall(
            result,
            response=response,
            registry=registry,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
            session_id=session_id,
        )

    FACT_READ.labels(principal=identity.entity_uri, tenant=identity.tenant_id).inc()

    # Garden ACL: resolve and enforce membership before querying (spec §5.20, §17.3)
    garden = _resolve_garden_or_404(garden_id, identity)

    with db() as conn:
        garden_visibility_mode, exact_garden_id, visible_garden_ids = _resolve_garden_visibility(
            conn, garden, identity
        )
        _prepare_garden_visibility_table(conn, visible_garden_ids)
        params = _build_query_params(
            identity=identity,
            garden_visibility_mode=garden_visibility_mode,
            exact_garden_id=exact_garden_id,
            entity=entity,
            relation=relation,
            source=source,
            scope=scope,
            min_confidence=min_confidence,
            attested=attested,
            after=after,
            cursor=cursor,
            include_expired=include_expired,
            limit=limit,
        )
        rows = conn.execute(_FACT_QUERY_SQL, params).fetchall()

    # One extra row was requested; its presence signals another page.
    has_more = len(rows) > limit
    rows = rows[:limit]

    records = _rows_to_records(rows)
    if not include_contradicted:
        records = [r for r in records if not r.contradicted]

    # v1.1: apply recall-time trust multiplier + content sanitizer (§19.4.4, §19.7)
    records = apply_recall_pipeline(records, identity=identity, include_low_trust=include_low_trust)

    # §23.3: tombstone filter — must be applied after scope filtering, before packing
    records, tombstone_filtered = _apply_tombstone_filter(records, scope, identity)
    records = registry.fire_filter_chain(
        "recall_filter",
        records,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    score_deltas = registry.fire_score_delta(
        "recall_rank",
        records,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    if score_deltas:
        # Highest delta first; records without a delta count as 0.0.
        records = sorted(records, key=lambda record: score_deltas.get(record.id, 0.0), reverse=True)

    next_cursor = rows[-1]["id"] if has_more and rows else None
    # §23.3.3 r.3: suppress total when tombstone filtering was applied to prevent oracle leakage
    total = None if tombstone_filtered else len(records)
    result = QueryResponse(facts=records, total=total, cursor=next_cursor)
    return _finalize_recall(
        result,
        response=response,
        registry=registry,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
        session_id=session_id,
    )
def _resolve_garden_or_404(garden_id: str | None, identity: Identity) -> Any:
    """Look up the garden named by ``garden_id`` within the caller's tenant.

    Returns ``None`` when no ``garden_id`` was given. Otherwise the garden is
    fetched, a missing garden becomes a 404 ``HTTPException``, and
    ``require_garden_read`` is applied before the garden is returned.
    """
    if garden_id is not None:
        found = get_garden_by_garden_uri(garden_id, tenant_id=identity.tenant_id)
        if found is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found")
        require_garden_read(found, identity)
        return found
    return None
def _resolve_garden_visibility(
    conn: Any,
    garden: Any,
    identity: Identity,
) -> tuple[int, str | None, list[str]]:
    """Work out which garden predicate the fact query should apply.

    Returns ``(mode, exact_garden_id, visible_garden_ids)``:

    * an explicit ``garden`` selects exact-match mode on ``garden["id"]``;
    * with the recall filter disabled, no garden predicate is applied;
    * otherwise the caller's member gardens in their tenant are listed —
      visible-set mode when there are any, NULL-only mode when there are none.
    """
    if garden is not None:
        return _GARDEN_VISIBILITY_EXACT, garden["id"], []
    if not recall_filter_enabled():
        return _GARDEN_VISIBILITY_NONE, None, []

    membership_rows = conn.execute(
        "SELECT g.id FROM gardens g"
        " WHERE g.tenant_id = ?"
        " AND EXISTS ("
        " SELECT 1 FROM garden_members gm"
        " WHERE gm.garden_id = g.id AND gm.entity_uri = ?"
        " )",
        (identity.tenant_id, identity.entity_uri),
    ).fetchall()
    member_garden_ids = [row["id"] for row in membership_rows]

    if not member_garden_ids:
        return _GARDEN_VISIBILITY_NULL_ONLY, None, []
    return _GARDEN_VISIBILITY_VISIBLE_SET, None, member_garden_ids
def _prepare_garden_visibility_table(conn: Any, visible_garden_ids: list[str]) -> None:
    """Reset the ``_query_visible_gardens`` temp table to exactly ``visible_garden_ids``.

    ``_FACT_QUERY_SQL`` references this table, so it is created on demand and
    emptied even when there are no ids to insert.
    """
    create_sql = "CREATE TEMP TABLE IF NOT EXISTS _query_visible_gardens (id TEXT PRIMARY KEY)"
    insert_sql = "INSERT OR IGNORE INTO _query_visible_gardens (id) VALUES (?)"

    conn.execute(create_sql)
    # Drop whatever an earlier call on this connection left behind.
    conn.execute("DELETE FROM _query_visible_gardens")
    for visible_id in visible_garden_ids:
        conn.execute(insert_sql, (visible_id,))
def _normalise_uri_or_raw(raw_value: str) -> str:
    """Return the normalised form of ``raw_value``, or ``raw_value`` itself on failure.

    A ``NormalizationError`` is swallowed on purpose: a malformed URI is
    passed through unchanged so the query falls back to exact matching.
    """
    try:
        normalised = normalize_entity_uri(raw_value)
    except NormalizationError:
        return raw_value
    return normalised
542def _build_query_params( # noqa: PLR0913 — narrow internal helper, keeps query_facts signature flat
543 *,
544 identity: Identity,
545 garden_visibility_mode: int,
546 exact_garden_id: str | None,
547 entity: str | None,
548 relation: str | None,
549 source: str | None,
550 scope: str | None,
551 min_confidence: float,
552 attested: bool | None,
553 after: str | None,
554 cursor: str | None,
555 include_expired: bool,
556 limit: int,
557) -> list[Any]:
558 """Return bind values for the fixed ``_FACT_QUERY_SQL`` template."""
559 if scope and scope not in VALID_SCOPES: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true
560 raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}")
562 normalised_entity = _normalise_uri_or_raw(entity) if entity else None
563 normalised_source = _normalise_uri_or_raw(source) if source else None
564 attested_value = None if attested is None else 1 if attested else 0
565 now = datetime.now(UTC).isoformat()
566 return [
567 min_confidence,
568 identity.tenant_id,
569 garden_visibility_mode,
570 garden_visibility_mode,
571 exact_garden_id,
572 garden_visibility_mode,
573 garden_visibility_mode,
574 attested_value,
575 attested_value,
576 normalised_entity,
577 normalised_entity,
578 normalised_entity,
579 relation or None,
580 relation or None,
581 normalised_source,
582 normalised_source,
583 normalised_source,
584 scope or None,
585 scope or None,
586 after or None,
587 after or None,
588 cursor or None,
589 cursor or None,
590 1 if include_expired else 0,
591 now,
592 limit + 1,
593 ]
def _rows_to_records(rows: list[Any]) -> list[FactRecord]:
    """Turn query rows into ``FactRecord`` objects, flagging contradictions.

    Every row is first passed through ``enforce_read_path_cid``. A record is
    marked ``contradicted`` when more than one row in ``rows`` shares its
    ``(entity, relation, scope)`` triple.
    """

    def _triple(row: Any) -> tuple[str, str, str]:
        return (row["entity"], row["relation"], row["scope"])

    occurrences: dict[tuple[str, str, str], int] = {}
    for row in rows:
        enforce_read_path_cid(row)
        triple = _triple(row)
        occurrences[triple] = occurrences.get(triple, 0) + 1

    records = []
    for row in rows:
        is_contradicted = occurrences[_triple(row)] > 1
        records.append(row_to_record(row, contradicted=is_contradicted))
    return records
def _apply_tombstone_filter(
    records: list[FactRecord],
    scope: str | None,
    identity: Identity,
) -> tuple[list[FactRecord], bool]:
    """Drop records whose entity is excluded by the tombstone filter.

    Returns ``(records, flag)`` where ``flag`` is True only when at least one
    entity was excluded. An empty input is returned untouched without opening
    a database connection. A missing ``scope`` is looked up as ``"local"``.
    """
    if not records:
        return records, False

    distinct_entities = list({record.entity for record in records})
    with db() as tombstone_conn:
        # Only the exclusion set is needed here; the notices are discarded.
        excluded, _unused_notices = _get_tombstone_filter(
            tombstone_conn, distinct_entities, scope or "local", identity.is_admin()
        )

    if excluded:
        surviving = [record for record in records if record.entity not in excluded]
        return surviving, True
    return records, False