Coverage for node / src / stigmem_node / routes / federation / tombstones.py: 67%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation tombstone routes.""" 

2 

3from __future__ import annotations 

4 

5from typing import Annotated, Any 

6 

7from fastapi import APIRouter, Header, HTTPException, Request, status 

8 

9from ...identity.capability import CapabilityTokenError, verify_token 

10from ...identity.trust_store import get_peer_manifest 

11from ...lifecycle.tombstones import list_revocations, list_tombstones 

12from ...models.tombstones import FederationTombstonesResponse 

13from .._federation_impl import federation_ingest_tombstone_impl 

14from .common import _get_mtls_peer_cert, _public_module, _try_peer_token_auth 

15 

16router = APIRouter(tags=["federation"]) 

17 

18 

19@router.get("/v1/federation/tombstones", response_model=FederationTombstonesResponse) 

20def federation_list_tombstones( 

21 request: Request, 

22 since: str | None = None, 

23 limit: int = 200, 

24 token_header: Annotated[str | None, Header(alias="Authorization")] = None, 

25) -> FederationTombstonesResponse: 

26 """Tombstone poll route. 

27 

28 Requires tombstone:read capability token. Covered by Spec-X2-RTBF-Tombstones. 

29 """ 

30 raw_token = None 

31 if token_header and token_header.startswith("Bearer "): 

32 raw_token = token_header[7:] 

33 

34 if not raw_token: 

35 raise HTTPException( 

36 status_code=status.HTTP_401_UNAUTHORIZED, 

37 detail="capability token required", 

38 ) 

39 

40 fed_settings = _public_module().settings 

41 if fed_settings.trust_mode != "off": 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true

42 try: 

43 import json as _json 

44 

45 token_data = _json.loads(raw_token) if raw_token.startswith("{") else {} 

46 verbs = token_data.get("verbs", token_data.get("verb", "")) 

47 if isinstance(verbs, str): 

48 verbs = [v.strip() for v in verbs.split(",")] if verbs else [] 

49 if "tombstone:read" not in verbs and "admin" not in verbs: 

50 raise HTTPException( 

51 status_code=status.HTTP_401_UNAUTHORIZED, 

52 detail="tombstone:read capability required", 

53 ) 

54 verify_token( 

55 raw_token, 

56 lambda uri: get_peer_manifest( 

57 uri, refresh_if_expired=True, trust_mode=fed_settings.trust_mode 

58 ), 

59 trust_mode=fed_settings.trust_mode, 

60 ) 

61 except CapabilityTokenError as exc: 

62 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc 

63 else: 

64 import logging as _logging 

65 

66 _logging.getLogger("stigmem.federation").warning( 

67 "tombstone poll: trust_mode=off — token signature verification skipped" 

68 ) 

69 

70 tombstone_list = list_tombstones(since=since)[:limit] 

71 revocation_list = list_revocations(since=since)[:limit] 

72 cursor = tombstone_list[-1].created_at if tombstone_list else None 

73 return FederationTombstonesResponse( 

74 tombstones=tombstone_list, 

75 revocations=revocation_list, 

76 cursor=cursor, 

77 ) 

78 

79 

80@router.post("/v1/federation/tombstones/ingest", status_code=status.HTTP_200_OK) 

81def federation_ingest_tombstone( 

82 request: Request, 

83 payload: dict[str, Any], 

84 authorization: Annotated[str | None, Header(alias="Authorization")] = None, 

85 x_stigmem_capability: Annotated[str | None, Header(alias="x-stigmem-capability")] = None, 

86) -> dict[str, Any]: 

87 """Inbound tombstone push from a federation peer. 

88 

89 Auth: peer JWT or capability token with tombstone:write verb (mirrors push_facts). 

90 Verifies signature against org manifest, writes to local tombstones table. 

91 Covered by Spec-X2-RTBF-Tombstones. 

92 """ 

93 # Implementation lives in _federation_impl.federation_ingest_tombstone_impl. 

94 return federation_ingest_tombstone_impl( 

95 request, 

96 payload, 

97 authorization, 

98 x_stigmem_capability, 

99 _try_peer_token_auth, 

100 _get_mtls_peer_cert, 

101 )