Coverage for node / src / stigmem_node / garden_acl.py: 94%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Garden ACL enforcement — spec §17.3, §19.5.3. 

2 

3Gardens are named, ACL'd partitions above scope (v0.9). 

4ACL is checked at fact read and write time in addition to scope enforcement. 

5Quarantine gardens extend this with the quarantine:moderator role (v1.1). 

6""" 

7 

8from __future__ import annotations 

9 

10from typing import Any 

11 

12from fastapi import HTTPException, status 

13 

14from .auth import Identity 

15from .db import db 

16 

17 

18def get_garden_by_slug_or_id( 

19 slug_or_id: str, tenant_id: str | None = None 

20) -> dict[str, Any] | None: 

21 """Return a garden row by slug or by its UUID id. 

22 

23 When tenant_id is provided, slug lookups are scoped to that tenant so that 

24 the same slug used by different tenants resolves to the correct garden. 

25 UUID lookups are globally unique and do not require tenant scoping. 

26 """ 

27 with db() as conn: 

28 if tenant_id is not None: 

29 # Slug lookup scoped to tenant; UUID lookup is always unique 

30 row = conn.execute( 

31 "SELECT * FROM gardens WHERE (slug = ? AND tenant_id = ?) OR id = ?", 

32 (slug_or_id, tenant_id, slug_or_id), 

33 ).fetchone() 

34 else: 

35 row = conn.execute( 

36 "SELECT * FROM gardens WHERE slug = ? OR id = ?", 

37 (slug_or_id, slug_or_id), 

38 ).fetchone() 

39 return dict(row) if row is not None else None 

40 

41 

42def get_garden_by_garden_uri( 

43 garden_uri: str, tenant_id: str | None = None 

44) -> dict[str, Any] | None: 

45 """Return a garden row by its stigmem://authority/garden/{slug} URI.""" 

46 # Extract slug from URI: stigmem://authority/garden/{slug} 

47 parts = garden_uri.split("/garden/", 1) 

48 if len(parts) != 2 or not parts[1]: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 return None 

50 slug = parts[1].rstrip("/") 

51 return get_garden_by_slug_or_id(slug, tenant_id=tenant_id) 

52 

53 

54def get_member_role(garden_id: str, entity_uri: str) -> str | None: 

55 """Return the role of entity_uri in the given garden UUID, or None if not a member.""" 

56 with db() as conn: 

57 row = conn.execute( 

58 "SELECT role FROM garden_members WHERE garden_id = ? AND entity_uri = ?", 

59 (garden_id, entity_uri), 

60 ).fetchone() 

61 return row["role"] if row is not None else None 

62 

63 

64def require_garden_write(garden: dict[str, Any], identity: Identity) -> None: 

65 """Raise 403 if identity cannot write facts into this garden (spec §17.3).""" 

66 role = get_member_role(garden["id"], identity.entity_uri) 

67 if role not in ("admin", "writer"): 

68 if role == "reader": 

69 raise HTTPException( 

70 status_code=status.HTTP_403_FORBIDDEN, 

71 detail="write permission required — you are a reader in this garden", 

72 ) 

73 raise HTTPException( 

74 status_code=status.HTTP_403_FORBIDDEN, 

75 detail="not a member of this garden", 

76 ) 

77 

78 

79def require_garden_read(garden: dict[str, Any], identity: Identity) -> None: 

80 """Raise 403 if identity cannot read facts from this garden (spec §17.3).""" 

81 role = get_member_role(garden["id"], identity.entity_uri) 

82 if role is None: 

83 raise HTTPException( 

84 status_code=status.HTTP_403_FORBIDDEN, 

85 detail="not a member of this garden", 

86 ) 

87 

88 

89def require_garden_admin(garden: dict[str, Any], identity: Identity) -> None: 

90 """Raise 403 if identity is not an admin of this garden.""" 

91 role = get_member_role(garden["id"], identity.entity_uri) 

92 if role != "admin": 

93 raise HTTPException( 

94 status_code=status.HTTP_403_FORBIDDEN, 

95 detail="garden admin permission required", 

96 ) 

97 

98 

99def caller_can_see_garden(garden_id: str, identity: Identity) -> bool: 

100 """Return True if identity holds any role in the garden (for query-time filtering).""" 

101 role = get_member_role(garden_id, identity.entity_uri) 

102 return role is not None 

103 

104 

105def is_node_admin(identity: Identity) -> bool: 

106 """Node admin: any identity with write permission (spec §5.15).""" 

107 return identity.can_write() 

108 

109 

110def require_quarantine_moderator_or_admin(garden: dict[str, Any], identity: Identity) -> None: 

111 """Raise 403 if identity cannot promote/reject quarantined facts (spec §19.5.3). 

112 

113 This helper checks garden-scoped moderation roles only: 'admin' or 

114 'quarantine:moderator' in the quarantine garden membership table. 

115 Route-level callers intentionally allow node admins through before this 

116 helper runs. Node-admin bypass is intentional because node admins are the 

117 system's last-resort moderation authority; garden-scoped moderators must 

118 still be members of the specific quarantine garden. 

119 """ 

120 role = get_member_role(garden["id"], identity.entity_uri) 

121 if role not in ("admin", "quarantine:moderator"): 

122 raise HTTPException( 

123 status_code=status.HTTP_403_FORBIDDEN, 

124 detail="quarantine:moderator or admin role required to promote/reject facts", 

125 ) 

126 

127 

128def has_elevated_quarantine_role(garden: dict[str, Any], identity: Identity) -> bool: 

129 """True if identity holds admin or quarantine:moderator in a quarantine garden.""" 

130 role = get_member_role(garden["id"], identity.entity_uri) 

131 return role in ("admin", "quarantine:moderator") 

132 

133 

134def quarantine_garden_has_pending_facts(garden_uuid: str) -> bool: 

135 """True if the quarantine garden holds at least one fact with quarantine_status='pending'.""" 

136 with db() as conn: 

137 row = conn.execute( 

138 "SELECT f.id FROM facts f" 

139 " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id" 

140 " WHERE COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) = ?" 

141 " AND COALESCE(fqs.quarantine_status, f.quarantine_status) = 'pending'" 

142 " LIMIT 1", 

143 (garden_uuid,), 

144 ).fetchone() 

145 return row is not None