Coverage for node / src / stigmem_node / trust_rules.py: 95%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Operator-configured auto-trust rules — spec §19 (ACM-186 scope).
3Rules are loaded from a YAML file at path STIGMEM_TRUST_RULES_FILE, or
4from the database table `quarantine_rules` (loaded at runtime via the admin API).
6YAML format (documented in docs/trust-rules.md):
8 # Always trust facts from org-a about any entity
9 always_trust:
10 - org_uri: "stigmem://org-a.example.com"
11 scope: null # null = all scopes
12 entity_prefix: null # null = all entities
13 reason: "Tier-1 partner"
15 # Never trust org-b when asserting about scope 'public'
16 never_trust:
17 - org_uri: "stigmem://org-b.example.com"
18 scope: "public"
19 entity_prefix: null
20 reason: "Untrusted external feed"
22evaluate_auto_rules(source_uri, scope) returns:
23 1.0 — if an always_trust rule matches
24 0.0 — if a never_trust rule matches
25 None — no matching rule; caller falls through to computed score
26"""
28from __future__ import annotations
30import logging
31from typing import Any
33logger = logging.getLogger("stigmem.trust_rules")
36def _load_yaml_rules(path: str) -> dict[str, list[dict[str, Any]]]:
37 try:
38 import yaml
39 except ImportError:
40 logger.warning("PyYAML not installed — trust_rules_file ignored")
41 return {}
43 try:
44 with open(path) as f:
45 data = yaml.safe_load(f)
46 except FileNotFoundError:
47 logger.warning("trust_rules_file not found: %s", path)
48 return {}
49 except Exception as exc:
50 logger.error("Failed to load trust_rules_file %s: %s", path, exc)
51 return {}
53 if not isinstance(data, dict):
54 return {}
55 return data
58def _rule_matches(rule: dict[str, Any], source_uri: str, scope: str) -> bool:
59 if rule.get("org_uri") != source_uri:
60 return False
61 rule_scope = rule.get("scope")
62 return not (rule_scope is not None and rule_scope != scope)
65def evaluate_auto_rules(source_uri: str, scope: str) -> float | None:
66 """Return 1.0, 0.0, or None per the operator-configured rules."""
67 from .settings import settings
69 # First check DB-stored rules (runtime admin API)
70 db_result = _evaluate_db_rules(source_uri, scope)
71 if db_result is not None:
72 return db_result
74 # Then check file-based rules
75 rules_file = settings.trust_rules_file
76 if not rules_file:
77 return None
79 rules = _load_yaml_rules(rules_file)
81 for rule in rules.get("always_trust", []):
82 if _rule_matches(rule, source_uri, scope):
83 return 1.0
85 for rule in rules.get("never_trust", []):
86 if _rule_matches(rule, source_uri, scope):
87 return 0.0
89 return None
92def _evaluate_db_rules(source_uri: str, scope: str) -> float | None:
93 """Check quarantine_rules table for matching always/never rules."""
94 try:
95 from .db import db
97 with db() as conn:
98 rows = conn.execute(
99 "SELECT rule_type, scope FROM quarantine_rules WHERE org_uri = ?",
100 (source_uri,),
101 ).fetchall()
102 except Exception:
103 return None
105 for row in rows:
106 rule_scope = row["scope"]
107 if rule_scope is not None and rule_scope != scope:
108 continue
109 if row["rule_type"] == "always_trust":
110 return 1.0
111 if row["rule_type"] == "never_trust": 111 ↛ 105line 111 didn't jump to line 105 because the condition on line 111 was always true
112 return 0.0
114 return None