Coverage for node / src / stigmem_node / garden_acl.py: 94%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""Garden ACL enforcement — spec §17.3, §19.5.3.

Gardens are named, ACL'd partitions above scope (v0.9).
ACL is checked at fact read and write time in addition to scope enforcement.
Quarantine gardens extend this with the quarantine:moderator role (v1.1).
"""
8from __future__ import annotations
10from typing import Any
12from fastapi import HTTPException, status
14from .auth import Identity
15from .db import db
def get_garden_by_slug_or_id(
    slug_or_id: str, tenant_id: str | None = None
) -> dict[str, Any] | None:
    """Return a garden row by slug or by its UUID id.

    When tenant_id is provided, slug lookups are scoped to that tenant so that
    the same slug used by different tenants resolves to the correct garden.
    UUID lookups are globally unique and do not require tenant scoping.

    Args:
        slug_or_id: Value matched against both the ``slug`` and ``id`` columns
            of the ``gardens`` table.
        tenant_id: Tenant whose slugs should be searched. When ``None`` the
            slug match is not restricted to any tenant.

    Returns:
        The first matching ``gardens`` row converted to a plain dict, or
        ``None`` when nothing matches.
    """
    if tenant_id is not None:
        # Slug lookup scoped to tenant; UUID lookup is always unique
        query = "SELECT * FROM gardens WHERE (slug = ? AND tenant_id = ?) OR id = ?"
        params: tuple[str, ...] = (slug_or_id, tenant_id, slug_or_id)
    else:
        # NOTE(review): without a tenant the slug predicate can match rows from
        # several tenants; fetchone() then returns whichever row comes first.
        query = "SELECT * FROM gardens WHERE slug = ? OR id = ?"
        params = (slug_or_id, slug_or_id)

    with db() as conn:
        row = conn.execute(query, params).fetchone()
        return dict(row) if row is not None else None
42def get_garden_by_garden_uri(
43 garden_uri: str, tenant_id: str | None = None
44) -> dict[str, Any] | None:
45 """Return a garden row by its stigmem://authority/garden/{slug} URI."""
46 # Extract slug from URI: stigmem://authority/garden/{slug}
47 parts = garden_uri.split("/garden/", 1)
48 if len(parts) != 2 or not parts[1]: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 return None
50 slug = parts[1].rstrip("/")
51 return get_garden_by_slug_or_id(slug, tenant_id=tenant_id)
def get_member_role(garden_id: str, entity_uri: str) -> str | None:
    """Return the role of entity_uri in the given garden UUID, or None if not a member.

    Args:
        garden_id: Value compared against ``garden_members.garden_id``.
        entity_uri: Value compared against ``garden_members.entity_uri``.

    Returns:
        The ``role`` column of the first matching membership row, or ``None``
        when no row exists for this garden/entity pair.
    """
    with db() as conn:
        membership = conn.execute(
            "SELECT role FROM garden_members WHERE garden_id = ? AND entity_uri = ?",
            (garden_id, entity_uri),
        ).fetchone()
        return membership["role"] if membership is not None else None
def require_garden_write(garden: dict[str, Any], identity: Identity) -> None:
    """Raise 403 if identity cannot write facts into this garden (spec §17.3).

    Only the ``admin`` and ``writer`` roles may write.

    Args:
        garden: Garden row; only its ``"id"`` key is read.
        identity: Caller identity; only ``entity_uri`` is read.

    Raises:
        HTTPException: 403 when the caller has no membership row, or holds a
            role other than ``admin``/``writer``.
    """
    role = get_member_role(garden["id"], identity.entity_uri)
    if role in ("admin", "writer"):
        return

    if role is None:
        detail = "not a member of this garden"
    else:
        # Any other role (reader, quarantine:moderator, ...) is a member that
        # lacks write permission; say so instead of claiming non-membership.
        # For "reader" this yields the same text as before.
        detail = f"write permission required — you are a {role} in this garden"
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=detail)
def require_garden_read(garden: dict[str, Any], identity: Identity) -> None:
    """Raise 403 if identity cannot read facts from this garden (spec §17.3).

    Any membership role is sufficient for reading.

    Args:
        garden: Garden row; only its ``"id"`` key is read.
        identity: Caller identity; only ``entity_uri`` is read.

    Raises:
        HTTPException: 403 when the caller has no membership row in the garden.
    """
    if get_member_role(garden["id"], identity.entity_uri) is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="not a member of this garden",
        )
def require_garden_admin(garden: dict[str, Any], identity: Identity) -> None:
    """Raise 403 if identity is not an admin of this garden.

    Args:
        garden: Garden row; only its ``"id"`` key is read.
        identity: Caller identity; only ``entity_uri`` is read.

    Raises:
        HTTPException: 403 when the caller's role is anything other than
            ``admin`` (including no membership at all).
    """
    if get_member_role(garden["id"], identity.entity_uri) != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="garden admin permission required",
        )
def caller_can_see_garden(garden_id: str, identity: Identity) -> bool:
    """Return True if identity holds any role in the garden (for query-time filtering).

    Unlike the ``require_*`` helpers this takes the garden UUID directly and
    never raises; it only reports whether a membership row exists.
    """
    return get_member_role(garden_id, identity.entity_uri) is not None
def is_node_admin(identity: Identity) -> bool:
    """Node admin: any identity with write permission (spec §5.15).

    Returns whatever ``identity.can_write()`` reports; no garden membership
    is consulted.
    """
    return identity.can_write()
def require_quarantine_moderator_or_admin(garden: dict[str, Any], identity: Identity) -> None:
    """Raise 403 if identity cannot promote/reject quarantined facts (spec §19.5.3).

    This helper checks garden-scoped moderation roles only: 'admin' or
    'quarantine:moderator' in the quarantine garden membership table.
    Route-level callers intentionally allow node admins through before this
    helper runs. Node-admin bypass is intentional because node admins are the
    system's last-resort moderation authority; garden-scoped moderators must
    still be members of the specific quarantine garden.

    Args:
        garden: Garden row; only its ``"id"`` key is read.
        identity: Caller identity; only ``entity_uri`` is read.

    Raises:
        HTTPException: 403 when the caller's role in the garden is neither
            ``admin`` nor ``quarantine:moderator``.
    """
    role = get_member_role(garden["id"], identity.entity_uri)
    if role in ("admin", "quarantine:moderator"):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="quarantine:moderator or admin role required to promote/reject facts",
    )
def has_elevated_quarantine_role(garden: dict[str, Any], identity: Identity) -> bool:
    """True if identity holds admin or quarantine:moderator in a quarantine garden.

    Non-raising check of the same two roles: looks up the caller's role for
    ``garden["id"]`` and reports whether it is ``admin`` or
    ``quarantine:moderator``.
    """
    role = get_member_role(garden["id"], identity.entity_uri)
    return role in ("admin", "quarantine:moderator")
def quarantine_garden_has_pending_facts(garden_uuid: str) -> bool:
    """True if the quarantine garden holds at least one fact with quarantine_status='pending'.

    For each fact, the garden id and status are taken from its
    ``fact_quarantine_status`` row when one exists (LEFT JOIN + COALESCE),
    falling back to the columns on ``facts`` itself otherwise.

    Args:
        garden_uuid: Quarantine garden id to match.

    Returns:
        ``True`` when at least one matching pending fact exists.
    """
    with db() as conn:
        # LIMIT 1: only existence matters, so stop at the first hit.
        pending = conn.execute(
            "SELECT f.id FROM facts f"
            " LEFT JOIN fact_quarantine_status fqs ON fqs.fact_id = f.id"
            " WHERE COALESCE(fqs.quarantine_garden_id, f.quarantine_garden_id) = ?"
            " AND COALESCE(fqs.quarantine_status, f.quarantine_status) = 'pending'"
            " LIMIT 1",
            (garden_uuid,),
        ).fetchone()
        return pending is not None