Coverage for node / src / stigmem_node / routes / facts / query.py: 92%

203 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""GET /v1/facts query route and query helpers.""" 

2 

3from __future__ import annotations 

4 

5import uuid 

6from datetime import UTC, datetime, timedelta 

7from typing import Annotated, Any 

8 

9from fastapi import Depends, Header, HTTPException, Query, Response, status 

10 

11from ... import settings as _settings_pkg 

12from ...auth import Identity, resolve_identity 

13from ...db import db 

14from ...entity_normalizer import NormalizationError, normalize_entity_uri 

15from ...garden_acl import get_garden_by_garden_uri, require_garden_read 

16from ...memory_garden_acl_gate import recall_filter_enabled 

17from ...metrics import FACT_READ 

18from ...models.constants import VALID_SCOPES 

19from ...models.facts import FactRecord, QueryResponse, row_to_record 

20from ...models.tombstones import TombstoneNotice 

21from ...plugins import Deny, Failure, Success, TenantContext, get_registry 

22from ...recall.recall_pipeline import apply_recall_pipeline 

23from ...session_graph import record_read_scopes 

24from ..cid_integrity import enforce_read_path_cid 

25from ..time_travel_gate import require_time_travel_enabled 

26from .common import _get_tombstone_filter, logger, router 

27 

28 

def _validate_as_of(as_of: str) -> datetime:
    """Parse an ``as_of`` query value into a timezone-aware datetime (§24.2.2).

    A value without an offset is treated as UTC. Raises ``HTTPException`` (400) when the
    value does not parse (``as_of_invalid_timestamp``), lies more than five seconds past
    the server clock (``as_of_future``), or precedes the configured retention floor
    (``as_of_before_retention_floor``). A retention-floor setting that cannot be parsed is
    logged as a warning and otherwise ignored.
    """
    import re

    # Query-string decoding turns "+" into a space, so "...+00:00" arrives as "... 00:00";
    # restore the "+" on a trailing HH:MM offset, then spell "Z" as an explicit offset.
    with_offset = re.sub(r" (\d{2}:\d{2})$", r"+\1", as_of)
    candidate = with_offset.replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(candidate)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail={"code": "as_of_invalid_timestamp", "message": str(exc)},
        ) from exc
    parsed = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=UTC)

    # Tolerate up to five seconds past the server clock (presumably client clock skew).
    if parsed > datetime.now(UTC) + timedelta(seconds=5):
        raise HTTPException(
            status_code=400,
            detail={"code": "as_of_future", "message": "as_of must not be in the future (§24.2.2)"},
        )

    retention_floor = _settings_pkg.settings.as_of_retention_floor
    if not retention_floor:
        return parsed

    try:
        floor_ts = datetime.fromisoformat(retention_floor.replace("Z", "+00:00"))
        if floor_ts.tzinfo is None:
            floor_ts = floor_ts.replace(tzinfo=UTC)
    except Exception as exc:  # nosec B110
        # Best effort: an unreadable floor setting must not fail the read itself.
        logger.warning("could not read retention floor while validating as_of: %s", exc)
        return parsed

    if parsed < floor_ts:
        raise HTTPException(
            status_code=400,
            detail={
                "code": "as_of_before_retention_floor",
                "message": "as_of predates the retention horizon for this deployment (§24.2.2)",  # noqa: E501
            },
        )
    return parsed

68 

69 

70def _legal_hold_blocks_query(conn: Any, entity: str) -> bool: 

71 """F-14 §24.3.2: True if a legal-hold tombstone covers *entity* (no active revocation).""" 

72 legal_hold_row = conn.execute( 

73 """SELECT 1 FROM tombstones t 

74 WHERE t.entity_uri = ? 

75 AND t.legal_hold = 1 

76 AND NOT EXISTS ( 

77 SELECT 1 FROM tombstone_revocations r WHERE r.tombstone_id = t.id 

78 ) 

79 LIMIT 1""", 

80 (entity,), 

81 ).fetchone() 

82 return legal_hold_row is not None 

83 

84 

# Point-in-time (§24.4) fact selection executed by ``_query_facts_as_of_impl``.
# A fact is returned when it was written at or before as_of, had not expired by as_of
# (valid_until NULL or later than as_of), and has no fact_retractions row with
# retracted_at at or before as_of. The override tables joined by the live query
# (validity / garden membership / quarantine) are not consulted here.
# Bind values come from ``_build_as_of_params`` and must stay in this order:
#   tenant_id, as_of x3 (timestamp, valid_until, retracted_at), entity x3,
#   relation x2, scope x2, cursor x2, limit + 1.
# NOTE(review): timestamps are compared as text, so this presumably relies on stored
# values and the bound as_of sharing one ISO 8601 form (e.g. UTC "+00:00") — confirm.
# NOTE(review): the cursor filters ``f.id > ?`` while rows are ordered by
# ``timestamp DESC, id DESC``; unless fact ids are ordered to match, later pages may
# repeat or skip rows — confirm the id scheme.
_AS_OF_SELECT_SQL = (
    "SELECT f.* FROM facts f"
    " WHERE f.tenant_id = ?"
    " AND f.timestamp <= ?"
    " AND (f.valid_until IS NULL OR f.valid_until > ?)"
    " AND NOT EXISTS ("
    " SELECT 1 FROM fact_retractions fr"
    " WHERE fr.fact_id = f.id AND fr.retracted_at <= ?"
    " )"
    # Entity matches directly or via any raw alias registered for the canonical URI.
    " AND (? IS NULL"
    " OR f.entity = ?"
    " OR f.entity IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.relation = ?)"
    " AND (? IS NULL OR f.scope = ?)"
    " AND (? IS NULL OR f.id > ?)"
    " ORDER BY f.timestamp DESC, f.id DESC"
    " LIMIT ?"
)

103 

# Garden-visibility modes bound into the garden clause of ``_FACT_QUERY_SQL`` (where the
# "projected garden" is COALESCE(fact_garden_membership.garden_id, facts.garden_id)) and
# chosen by ``_resolve_garden_visibility``.
_GARDEN_VISIBILITY_NONE = 0  # no garden restriction
_GARDEN_VISIBILITY_EXACT = 1  # projected garden must equal the requested garden id
# ungardened facts plus facts whose garden id is in the temp table _query_visible_gardens
_GARDEN_VISIBILITY_VISIBLE_SET = 2
_GARDEN_VISIBILITY_NULL_ONLY = 3  # ungardened facts only

108 

# Live (non-time-travel) fact query executed by ``query_facts``.
# Each ``COALESCE(<side>.col, f.col)`` prefers a value from fact_validity_overrides,
# fact_garden_membership or fact_quarantine_status over the one stored on ``facts``;
# ``projected_cid`` falls back to the lowest alias in fact_cid_aliases when f.cid is NULL.
# Bind values come from ``_build_query_params`` and must stay in this order:
#   min_confidence, tenant_id,
#   garden mode, garden mode, exact garden id, garden mode, garden mode,
#   attested x2, entity x3, relation x2, source x3, scope x2, after x2, cursor x2,
#   include_expired flag (1/0), now, limit + 1.
# Garden mode 2 reads the temp table filled by ``_prepare_garden_visibility_table``.
# NOTE(review): the cursor filters ``f.id > ?`` while rows are ordered by
# ``timestamp DESC, id DESC``; unless fact ids are ordered to match, later pages may
# repeat or skip rows — confirm the id scheme.
_FACT_QUERY_SQL = (
    "SELECT f.*, "
    "COALESCE(fvo.valid_until, f.valid_until) AS projected_valid_until, "
    "COALESCE(fvo.confidence, f.confidence) AS projected_confidence, "
    "COALESCE(fgm.garden_id, f.garden_id) AS projected_garden_id, "
    "COALESCE(fqs.quarantine_status, f.quarantine_status) AS projected_quarantine_status, "
    "COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) "
    "AS projected_quarantine_garden_id, "
    "COALESCE(f.cid, (SELECT fca.cid FROM fact_cid_aliases fca "
    "WHERE fca.fact_id = f.id ORDER BY fca.cid LIMIT 1)) AS projected_cid"
    " FROM facts f"
    " LEFT JOIN fact_validity_overrides fvo ON fvo.fact_id = f.id"
    " LEFT JOIN fact_garden_membership fgm ON fgm.fact_id = f.id"
    " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id"
    " WHERE COALESCE(fvo.confidence, f.confidence) >= ?"
    " AND f.tenant_id = ?"
    # Garden visibility: exactly one branch is active for the bound mode (0..3).
    " AND ("
    " ? = 0"
    " OR (? = 1 AND COALESCE(fgm.garden_id, f.garden_id) = ?)"
    " OR (? = 2 AND (COALESCE(fgm.garden_id, f.garden_id) IS NULL"
    " OR EXISTS (SELECT 1 FROM _query_visible_gardens qvg"
    " WHERE qvg.id = COALESCE(fgm.garden_id, f.garden_id))))"
    " OR (? = 3 AND COALESCE(fgm.garden_id, f.garden_id) IS NULL)"
    " )"
    " AND (? IS NULL OR f.attested = ?)"
    # Entity / source match directly or via a raw alias of the canonical URI.
    " AND (? IS NULL"
    " OR f.entity = ?"
    " OR f.entity IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.relation = ?)"
    " AND (? IS NULL"
    " OR f.source = ?"
    " OR f.source IN (SELECT raw_uri FROM entity_aliases WHERE canonical_uri = ?))"
    " AND (? IS NULL OR f.scope = ?)"
    " AND (? IS NULL OR f.timestamp > ?)"
    " AND (? IS NULL OR f.id > ?)"
    # include_expired = 1 disables the expiry gate; otherwise valid_until must be
    # NULL or later than the bound "now".
    " AND (? = 1"
    " OR COALESCE(fvo.valid_until, f.valid_until) IS NULL"
    " OR COALESCE(fvo.valid_until, f.valid_until) > ?)"
    " ORDER BY f.timestamp DESC, f.id DESC LIMIT ?"
)

149 

150def _build_as_of_params( 

151 *, 

152 entity: str | None, 

153 scope: str | None, 

154 relation: str | None, 

155 as_of: str, 

156 tenant_id: str, 

157 cursor: str | None, 

158 limit: int, 

159) -> list[Any]: 

160 """Return the bind values for ``_AS_OF_SELECT_SQL``. 

161 

162 The SQL text is the ``_AS_OF_SELECT_SQL`` module-level constant; this 

163 helper only computes bind values. Keeping the SQL string out of any 

164 function that takes user input prevents CodeQL from interprocedurally 

165 tainting it — see issue #121 for why a function that takes user 

166 inputs and returns ``(sql, params)`` still trips ``py/sql-injection`` 

167 even when the returned SQL value is invariant. 

168 """ 

169 if scope is not None and scope not in VALID_SCOPES: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}") 

171 

172 # Normalize empty strings to None so the IS NULL gate matches the 

173 # previous ``if entity:`` truthiness behaviour. 

174 entity_p = entity or None 

175 relation_p = relation or None 

176 scope_p = scope or None 

177 cursor_p = cursor or None 

178 

179 return [ 

180 tenant_id, 

181 as_of, 

182 as_of, 

183 as_of, 

184 entity_p, 

185 entity_p, 

186 entity_p, 

187 relation_p, 

188 relation_p, 

189 scope_p, 

190 scope_p, 

191 cursor_p, 

192 cursor_p, 

193 limit + 1, 

194 ] 

195 

196 

def _query_facts_as_of_impl(
    conn: Any,
    *,
    entity: str | None,
    scope: str | None,
    relation: str | None,
    as_of: str,
    is_admin_caller: bool,
    tenant_id: str,
    limit: int,
    cursor: str | None,
) -> QueryResponse:
    """Return facts visible at as_of per §24.4.

    Retraction gating uses fact_retractions.retracted_at (append-only log), NOT
    facts.confidence. Expiry gating uses facts.valid_until. Tombstone filter per §24.3:
    retroactive RTBF unless legal_hold=true AND is_admin_caller.

    Args:
        conn: open database connection used for every query in this function.
        entity: optional entity filter (None = wildcard).
        scope: optional scope filter (None = wildcard); falls back to "local" for the
            tombstone filter.
        relation: optional relation filter (None = wildcard).
        as_of: timestamp bound into ``_AS_OF_SELECT_SQL``, where it is compared as text.
        is_admin_caller: skips the legal-hold pre-check when true and is forwarded to
            the tombstone filter.
        tenant_id: tenant the query is confined to.
        limit: page size; one extra row is fetched to detect a further page.
        cursor: fact id taken from a previous response's ``cursor``.

    Returns:
        A QueryResponse holding the page of facts, the next cursor, any tombstone
        notices, and ``total`` (None whenever tombstone filtering removed rows).
    """
    # F-14 §24.3.2: pre-check — agent-key callers get empty results if a legal-hold
    # tombstone covers the queried entity (short-circuit before executing the query).
    # NOTE(review): *entity* is used as given here, while the live path normalises
    # entity URIs first — confirm a non-canonical spelling cannot bypass this check.
    if entity and not is_admin_caller and _legal_hold_blocks_query(conn, entity):
        return QueryResponse(facts=[], total=None, cursor=None)

    params = _build_as_of_params(
        entity=entity,
        scope=scope,
        relation=relation,
        as_of=as_of,
        tenant_id=tenant_id,
        cursor=cursor,
        limit=limit,
    )

    rows = conn.execute(_AS_OF_SELECT_SQL, params).fetchall()
    has_more = len(rows) > limit
    rows = rows[:limit]

    # Same conversion as the live path: CID integrity check on each row, then rows
    # sharing an (entity, relation, scope) key within this page are marked contradicted.
    records = _rows_to_records(rows)

    # §24.3: tombstone filter for as_of queries; its notices are returned to the caller.
    tombstone_notices: list[TombstoneNotice] = []
    tombstone_filtered = False
    if records:
        entity_uris = list({r.entity for r in records})
        excluded, tombstone_notices = _get_tombstone_filter(
            conn, entity_uris, scope or "local", is_admin_caller
        )
        if excluded:
            records = [r for r in records if r.entity not in excluded]
            tombstone_filtered = True

    next_cursor = rows[-1]["id"] if has_more and rows else None
    # §23.3.3 r.3: suppress total when tombstone filtering was applied to prevent oracle leakage
    total = None if tombstone_filtered else len(records)
    return QueryResponse(
        facts=records,
        total=total,
        cursor=next_cursor,
        tombstone_notices=tombstone_notices,
    )

267 

268 

@router.get("", response_model=QueryResponse)
def query_facts(
    identity: Annotated[Identity, Depends(resolve_identity)],
    response: Response,
    session_id: Annotated[str | None, Header(alias="Stigmem-Session")] = None,
    entity: str | None = Query(None),
    relation: str | None = Query(None),
    source: str | None = Query(None),
    scope: str | None = Query(None),
    min_confidence: float = Query(0.0, ge=0.0, le=1.0),
    include_contradicted: bool = Query(False),
    include_expired: bool = Query(False),
    after: str | None = Query(
        None, description="Return facts with timestamp > this ISO 8601 value"
    ),  # noqa: E501
    cursor: str | None = Query(None, description="Opaque pagination cursor (fact id)"),
    limit: int = Query(50, ge=1, le=500),
    garden_id: str | None = Query(
        None, description="Filter to facts in this garden (Spec-02-Scopes-and-ACL)"
    ),  # noqa: E501
    attested: bool | None = Query(
        None, description="Filter by source-attestation status (Spec-X6-Source-Attestation)"
    ),  # noqa: E501
    include_low_trust: bool = Query(
        False,
        description="Include facts with effective_confidence < 0.3 (Spec-05-Federation-Trust)",
    ),  # noqa: E501
    as_of: str | None = Query(
        None,
        description="Time-travel query: return facts visible at this ISO 8601 timestamp (Spec-X3-Time-Travel-Queries)",  # noqa: E501
    ),  # noqa: E501
) -> QueryResponse:
    """Query facts by pattern (Spec-03-HTTP-API).

    Omitted fields are wildcards. Entity/source are normalized by Spec-01-Fact-Model.

    Flow:
    1. Read permission check, ``pre_recall_authorize`` vote (Deny -> 403 + audit), then
       ``pre_recall_rewrite`` may rewrite every query field.
    2. With ``as_of`` set, the time-travel path runs. It honours only entity / scope /
       relation / cursor / limit.
    3. Otherwise the live path runs: garden ACL and visibility, the SQL query,
       contradiction filtering, the recall pipeline, the tombstone filter, the
       ``recall_filter`` chain and optional ``recall_rank`` reordering.
    Both paths set ``X-Total-Count`` when a total is reported, record the read scopes
    for the session and fire ``post_recall_audit``.
    """
    if not identity.can_read():
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="read permission required"
        )  # noqa: E501

    request_id = str(uuid.uuid4())
    tenant = TenantContext(
        tenant_id=identity.tenant_id,
        metadata={"tenant_context_source": "hook"},
    )
    registry = get_registry()
    query_payload: dict[str, Any] = {
        "entity": entity,
        "relation": relation,
        "source": source,
        "scope": scope,
        "min_confidence": min_confidence,
        "include_contradicted": include_contradicted,
        "include_expired": include_expired,
        "after": after,
        "cursor": cursor,
        "limit": limit,
        "garden_id": garden_id,
        "attested": attested,
        "include_low_trust": include_low_trust,
        "as_of": as_of,
    }
    decision = registry.fire_voting(
        "pre_recall_authorize",
        identity=identity,
        tenant=tenant,
        request_id=request_id,
        query=query_payload,
    )
    if isinstance(decision, Deny):
        registry.fire_fire_and_forget(
            "post_recall_audit",
            result=None,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
            outcome=Failure(reason=decision.reason),
        )
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=decision.reason)

    rewritten_query = registry.fire_filter_chain(
        "pre_recall_rewrite",
        query_payload,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    entity = rewritten_query["entity"]
    relation = rewritten_query["relation"]
    source = rewritten_query["source"]
    scope = rewritten_query["scope"]
    min_confidence = rewritten_query["min_confidence"]
    include_contradicted = rewritten_query["include_contradicted"]
    include_expired = rewritten_query["include_expired"]
    after = rewritten_query["after"]
    cursor = rewritten_query["cursor"]
    limit = rewritten_query["limit"]
    garden_id = rewritten_query["garden_id"]
    attested = rewritten_query["attested"]
    include_low_trust = rewritten_query["include_low_trust"]
    as_of = rewritten_query["as_of"]

    # §24.4: time-travel query — delegate to as_of implementation
    if as_of is not None:
        require_time_travel_enabled(registry, surface="fact_query")
        # Bind the validated instant, not the raw query string. The as_of SQL compares
        # timestamps as text, so a "+HH:MM" offset that arrived as " HH:MM", a "Z" suffix
        # (which sorts after "." and so admits facts later in the same second), or a
        # non-UTC offset would otherwise compare incorrectly. UTC ``isoformat()`` is the
        # form ``_build_query_params`` binds for "now".
        # NOTE(review): assumes stored fact/retraction timestamps use that same form.
        as_of_bound = _validate_as_of(as_of).astimezone(UTC).isoformat()
        with db() as conn:
            result = _query_facts_as_of_impl(
                conn,
                entity=entity,
                scope=scope,
                relation=relation,
                as_of=as_of_bound,
                is_admin_caller=identity.is_admin(),
                tenant_id=identity.tenant_id,
                limit=limit,
                cursor=cursor,
            )
        if result.total is not None:
            response.headers["X-Total-Count"] = str(result.total)
        with db() as conn:
            record_read_scopes(
                conn,
                identity=identity,
                session_id=session_id,
                scopes={fact.scope for fact in result.facts},
            )
        registry.fire_fire_and_forget(
            "post_recall_audit",
            result=result,
            identity=identity,
            tenant=tenant,
            request_id=request_id,
            outcome=Success(),
        )
        return result

    FACT_READ.labels(principal=identity.entity_uri, tenant=identity.tenant_id).inc()

    # Garden ACL: resolve and enforce membership before querying (spec §5.20, §17.3)
    garden = _resolve_garden_or_404(garden_id, identity)

    with db() as conn:
        garden_visibility_mode, exact_garden_id, visible_garden_ids = _resolve_garden_visibility(
            conn, garden, identity
        )
        _prepare_garden_visibility_table(conn, visible_garden_ids)
        params = _build_query_params(
            identity=identity,
            garden_visibility_mode=garden_visibility_mode,
            exact_garden_id=exact_garden_id,
            entity=entity,
            relation=relation,
            source=source,
            scope=scope,
            min_confidence=min_confidence,
            attested=attested,
            after=after,
            cursor=cursor,
            include_expired=include_expired,
            limit=limit,
        )
        rows = conn.execute(_FACT_QUERY_SQL, params).fetchall()

    # One extra row was requested to detect whether another page exists.
    has_more = len(rows) > limit
    rows = rows[:limit]

    records = _rows_to_records(rows)
    if not include_contradicted:
        records = [r for r in records if not r.contradicted]

    # v1.1: apply recall-time trust multiplier + content sanitizer (§19.4.4, §19.7)
    records = apply_recall_pipeline(records, identity=identity, include_low_trust=include_low_trust)

    # §23.3: tombstone filter — must be applied after scope filtering, before packing
    records, tombstone_filtered = _apply_tombstone_filter(records, scope, identity)
    records = registry.fire_filter_chain(
        "recall_filter",
        records,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    score_deltas = registry.fire_score_delta(
        "recall_rank",
        records,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
    )
    if score_deltas:
        records = sorted(records, key=lambda record: score_deltas.get(record.id, 0.0), reverse=True)

    next_cursor = rows[-1]["id"] if has_more and rows else None
    # §23.3.3 r.3: suppress total when tombstone filtering was applied to prevent oracle leakage
    total = None if tombstone_filtered else len(records)
    result = QueryResponse(facts=records, total=total, cursor=next_cursor)
    if result.total is not None:
        response.headers["X-Total-Count"] = str(result.total)
    with db() as conn:
        record_read_scopes(
            conn,
            identity=identity,
            session_id=session_id,
            scopes={fact.scope for fact in records},
        )
    registry.fire_fire_and_forget(
        "post_recall_audit",
        result=result,
        identity=identity,
        tenant=tenant,
        request_id=request_id,
        outcome=Success(),
    )
    return result

485 

486 

487def _resolve_garden_or_404(garden_id: str | None, identity: Identity) -> Any: 

488 """Return the garden row when ``garden_id`` is set; 404 if missing; enforce read ACL.""" 

489 if garden_id is None: 

490 return None 

491 garden = get_garden_by_garden_uri(garden_id, tenant_id=identity.tenant_id) 

492 if garden is None: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true

493 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="garden not found") 

494 require_garden_read(garden, identity) 

495 return garden 

496 

497 

def _resolve_garden_visibility(
    conn: Any,
    garden: Any,
    identity: Identity,
) -> tuple[int, str | None, list[str]]:
    """Choose the garden restriction that ``_FACT_QUERY_SQL`` should apply.

    Returns ``(mode, exact_garden_id, visible_garden_ids)``:

    - an explicit *garden*: ``_GARDEN_VISIBILITY_EXACT`` with ``garden["id"]``;
    - recall filter disabled: ``_GARDEN_VISIBILITY_NONE``;
    - otherwise the ids of tenant gardens the caller is listed in ``garden_members`` for
      (``_GARDEN_VISIBILITY_VISIBLE_SET``), or ``_GARDEN_VISIBILITY_NULL_ONLY`` when
      there are none.
    """
    if garden is not None:
        return _GARDEN_VISIBILITY_EXACT, garden["id"], []
    if not recall_filter_enabled():
        return _GARDEN_VISIBILITY_NONE, None, []

    membership_rows = conn.execute(
        "SELECT g.id FROM gardens g"
        " WHERE g.tenant_id = ?"
        " AND EXISTS ("
        " SELECT 1 FROM garden_members gm"
        " WHERE gm.garden_id = g.id AND gm.entity_uri = ?"
        " )",
        (identity.tenant_id, identity.entity_uri),
    ).fetchall()
    member_garden_ids = [row["id"] for row in membership_rows]
    if not member_garden_ids:
        # No memberships: only facts outside any garden remain visible.
        return _GARDEN_VISIBILITY_NULL_ONLY, None, []
    return _GARDEN_VISIBILITY_VISIBLE_SET, None, member_garden_ids

524 

525 

526def _prepare_garden_visibility_table(conn: Any, visible_garden_ids: list[str]) -> None: 

527 """Populate the per-connection visibility table referenced by ``_FACT_QUERY_SQL``.""" 

528 conn.execute("CREATE TEMP TABLE IF NOT EXISTS _query_visible_gardens (id TEXT PRIMARY KEY)") 

529 conn.execute("DELETE FROM _query_visible_gardens") 

530 for garden_id in visible_garden_ids: 530 ↛ 531line 530 didn't jump to line 531 because the loop on line 530 never started

531 conn.execute("INSERT OR IGNORE INTO _query_visible_gardens (id) VALUES (?)", (garden_id,)) 

532 

533 

def _normalise_uri_or_raw(raw_value: str) -> str:
    """Return ``normalize_entity_uri(raw_value)``, or *raw_value* itself if it is rejected.

    A value the normaliser refuses with ``NormalizationError`` is passed through unchanged
    so the query still does an exact match on what the caller sent.
    """
    try:
        canonical = normalize_entity_uri(raw_value)
    except NormalizationError:
        # Malformed URI: keep the caller's string for an exact-match lookup.
        return raw_value
    return canonical

540 

541 

542def _build_query_params( # noqa: PLR0913 — narrow internal helper, keeps query_facts signature flat 

543 *, 

544 identity: Identity, 

545 garden_visibility_mode: int, 

546 exact_garden_id: str | None, 

547 entity: str | None, 

548 relation: str | None, 

549 source: str | None, 

550 scope: str | None, 

551 min_confidence: float, 

552 attested: bool | None, 

553 after: str | None, 

554 cursor: str | None, 

555 include_expired: bool, 

556 limit: int, 

557) -> list[Any]: 

558 """Return bind values for the fixed ``_FACT_QUERY_SQL`` template.""" 

559 if scope and scope not in VALID_SCOPES: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true

560 raise HTTPException(status_code=400, detail=f"scope must be one of {VALID_SCOPES}") 

561 

562 normalised_entity = _normalise_uri_or_raw(entity) if entity else None 

563 normalised_source = _normalise_uri_or_raw(source) if source else None 

564 attested_value = None if attested is None else 1 if attested else 0 

565 now = datetime.now(UTC).isoformat() 

566 return [ 

567 min_confidence, 

568 identity.tenant_id, 

569 garden_visibility_mode, 

570 garden_visibility_mode, 

571 exact_garden_id, 

572 garden_visibility_mode, 

573 garden_visibility_mode, 

574 attested_value, 

575 attested_value, 

576 normalised_entity, 

577 normalised_entity, 

578 normalised_entity, 

579 relation or None, 

580 relation or None, 

581 normalised_source, 

582 normalised_source, 

583 normalised_source, 

584 scope or None, 

585 scope or None, 

586 after or None, 

587 after or None, 

588 cursor or None, 

589 cursor or None, 

590 1 if include_expired else 0, 

591 now, 

592 limit + 1, 

593 ] 

594 

595 

596def _rows_to_records(rows: list[Any]) -> list[FactRecord]: 

597 """Convert raw rows into FactRecords; mark within-key duplicates contradicted.""" 

598 seen: dict[tuple[str, str, str], int] = {} 

599 for r in rows: 

600 enforce_read_path_cid(r) 

601 key = (r["entity"], r["relation"], r["scope"]) 

602 seen[key] = seen.get(key, 0) + 1 

603 return [ 

604 row_to_record(r, contradicted=seen[(r["entity"], r["relation"], r["scope"])] > 1) 

605 for r in rows 

606 ] 

607 

608 

609def _apply_tombstone_filter( 

610 records: list[FactRecord], 

611 scope: str | None, 

612 identity: Identity, 

613) -> tuple[list[FactRecord], bool]: 

614 """Return (filtered, tombstone_filtered_flag). Empty input → no work.""" 

615 if not records: 

616 return records, False 

617 entity_uris_in_result = list({r.entity for r in records}) 

618 with db() as _tc_conn: 

619 excluded, _notices = _get_tombstone_filter( 

620 _tc_conn, entity_uris_in_result, scope or "local", identity.is_admin() 

621 ) 

622 if not excluded: 

623 return records, False 

624 return [r for r in records if r.entity not in excluded], True