Coverage for node / src / stigmem_node / routes / agent_keys.py: 95%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Track C / C1 — per-agent Ed25519 keypair registration. 

2 

3Agents register their public key; the node can then verify attestation tokens 

4on fact assertions to prove the caller owns the source identity. 

5 

6POST /v1/auth/agent-keys register a public key for the calling entity 

7GET /v1/auth/agent-keys list my registered agent keys 

8DELETE /v1/auth/agent-keys/{key_id} revoke a key (sets status=revoked) 

9""" 

10 

11from __future__ import annotations 

12 

13import base64 

14import logging 

15import uuid 

16from datetime import UTC, datetime 

17from typing import Annotated 

18 

19from cryptography.exceptions import InvalidKey 

20from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey 

21from fastapi import APIRouter, Depends, HTTPException, status 

22 

23from ..auth import Identity, resolve_identity 

24from ..db import db 

25from ..models.identity import AgentKeyRecord, AgentKeyRegisterRequest 

26 

27logger = logging.getLogger("stigmem.agent_keys") 

28router = APIRouter(prefix="/v1/auth/agent-keys", tags=["auth"]) 

29 

30 

31def _decode_pubkey(b64: str) -> Ed25519PublicKey: 

32 """Decode a base64url Ed25519 public key; raise ValueError on bad input.""" 

33 try: 

34 raw = base64.urlsafe_b64decode(b64 + "=" * (-len(b64) % 4)) 

35 return Ed25519PublicKey.from_public_bytes(raw) 

36 except Exception as exc: 

37 raise ValueError(f"invalid Ed25519 public key: {exc}") from exc 

38 

39 

40@router.post("", response_model=AgentKeyRecord, status_code=status.HTTP_201_CREATED) 

41def register_agent_key( 

42 req: AgentKeyRegisterRequest, 

43 identity: Annotated[Identity, Depends(resolve_identity)], 

44) -> AgentKeyRecord: 

45 """Register an Ed25519 public key for source-attestation on fact assertions.""" 

46 if not identity.can_write(): 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true

47 raise HTTPException( 

48 status_code=status.HTTP_403_FORBIDDEN, detail="write permission required" 

49 ) 

50 

51 try: 

52 _decode_pubkey(req.public_key) 

53 except ValueError as exc: 

54 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc 

55 

56 key_id = str(uuid.uuid4()) 

57 now = datetime.now(UTC).isoformat() 

58 

59 # Normalise key: strip padding to store canonical base64url 

60 canonical_key = req.public_key.rstrip("=") 

61 

62 with db() as conn: 

63 conn.execute( 

64 """INSERT INTO agent_keys 

65 (id, entity_uri, public_key, description, registered_at, status) 

66 VALUES (?,?,?,?,?,?)""", 

67 (key_id, identity.entity_uri, canonical_key, req.description, now, "active"), 

68 ) 

69 

70 logger.info("Agent key registered: id=%s entity=%s", key_id, identity.entity_uri) 

71 return AgentKeyRecord( 

72 id=key_id, 

73 entity_uri=identity.entity_uri, 

74 public_key=canonical_key, 

75 description=req.description, 

76 registered_at=now, 

77 status="active", 

78 ) 

79 

80 

81@router.get("", response_model=list[AgentKeyRecord]) 

82def list_agent_keys( 

83 identity: Annotated[Identity, Depends(resolve_identity)], 

84) -> list[AgentKeyRecord]: 

85 """List all agent keys (active and revoked) for the caller's entity.""" 

86 with db() as conn: 

87 rows = conn.execute( 

88 "SELECT * FROM agent_keys WHERE entity_uri = ? ORDER BY registered_at DESC", 

89 (identity.entity_uri,), 

90 ).fetchall() 

91 return [ 

92 AgentKeyRecord( 

93 id=r["id"], 

94 entity_uri=r["entity_uri"], 

95 public_key=r["public_key"], 

96 description=r["description"], 

97 registered_at=r["registered_at"], 

98 status=r["status"], 

99 ) 

100 for r in rows 

101 ] 

102 

103 

104@router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT) 

105def revoke_agent_key( 

106 key_id: str, 

107 identity: Annotated[Identity, Depends(resolve_identity)], 

108) -> None: 

109 """Revoke an agent key. Callers may only revoke their own keys.""" 

110 with db() as conn: 

111 row = conn.execute( 

112 "SELECT entity_uri, status FROM agent_keys WHERE id = ?", (key_id,) 

113 ).fetchone() 

114 if row is None: 

115 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="agent key not found") 

116 if row["entity_uri"] != identity.entity_uri: 

117 raise HTTPException( 

118 status_code=status.HTTP_403_FORBIDDEN, 

119 detail="cannot revoke another entity's key", 

120 ) 

121 if row["status"] == "revoked": 

122 raise HTTPException( 

123 status_code=status.HTTP_409_CONFLICT, 

124 detail="key is already revoked", 

125 ) 

126 conn.execute("UPDATE agent_keys SET status = 'revoked' WHERE id = ?", (key_id,)) 

127 

128 logger.info("Agent key revoked: id=%s by entity=%s", key_id, identity.entity_uri) 

129 

130 

131# --------------------------------------------------------------------------- 

132# Internal helper — used by routes/facts.py for attestation verification 

133# --------------------------------------------------------------------------- 

134 

135 

136def verify_attestation( 

137 key_id: str, 

138 signature_b64: str, 

139 canonical_message: bytes, 

140 caller_entity_uri: str, 

141) -> str: 

142 """Verify an Ed25519 attestation token. Returns key_id on success. 

143 

144 Raises HTTPException on any verification failure. 

145 """ 

146 with db() as conn: 

147 row = conn.execute( 

148 "SELECT entity_uri, public_key, status FROM agent_keys WHERE id = ?", (key_id,) 

149 ).fetchone() 

150 

151 if row is None: 

152 raise HTTPException( 

153 status_code=status.HTTP_400_BAD_REQUEST, 

154 detail=f"attestation.key_id {key_id!r} not found", 

155 ) 

156 if row["status"] == "revoked": 

157 raise HTTPException( 

158 status_code=status.HTTP_400_BAD_REQUEST, 

159 detail=f"attestation.key_id {key_id!r} has been revoked", 

160 ) 

161 if row["entity_uri"] != caller_entity_uri: 

162 raise HTTPException( 

163 status_code=status.HTTP_403_FORBIDDEN, 

164 detail="attestation key belongs to a different entity", 

165 ) 

166 

167 try: 

168 pubkey = _decode_pubkey(row["public_key"]) 

169 except ValueError as exc: 

170 raise HTTPException( 

171 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

172 detail="stored public key is corrupt", 

173 ) from exc 

174 

175 try: 

176 raw_sig = base64.urlsafe_b64decode(signature_b64 + "=" * (-len(signature_b64) % 4)) 

177 pubkey.verify(raw_sig, canonical_message) 

178 except (InvalidKey, Exception) as exc: 

179 raise HTTPException( 

180 status_code=status.HTTP_400_BAD_REQUEST, 

181 detail="attestation signature verification failed", 

182 ) from exc 

183 

184 return key_id