Coverage for node / src / stigmem_node / trust_rules.py: 95%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Operator-configured auto-trust rules — spec §19 (ACM-186 scope). 

2 

3Rules come from two sources: the database table `quarantine_rules` (managed at 

4runtime via the admin API), which is consulted first, and a YAML file at path STIGMEM_TRUST_RULES_FILE. 

5 

6YAML format (documented in docs/trust-rules.md): 

7 

8 # Always trust facts from org-a about any entity 

9 always_trust: 

10 - org_uri: "stigmem://org-a.example.com" 

11 scope: null # null = all scopes 

12 entity_prefix: null # null = all entities 

13 reason: "Tier-1 partner" 

14 

15 # Never trust org-b when asserting about scope 'public' 

16 never_trust: 

17 - org_uri: "stigmem://org-b.example.com" 

18 scope: "public" 

19 entity_prefix: null 

20 reason: "Untrusted external feed" 

21 

22evaluate_auto_rules(source_uri, scope) returns: 

23 1.0 — if an always_trust rule matches 

24 0.0 — if a never_trust rule matches 

25 None — no matching rule; caller falls through to computed score 

26""" 

27 

28from __future__ import annotations 

29 

30import logging 

31from typing import Any 

32 

33logger = logging.getLogger("stigmem.trust_rules") 

34 

35 

def _load_yaml_rules(path: str) -> dict[str, list[dict[str, Any]]]:
    """Load the operator's trust rules from the YAML file at *path*.

    Loading is best-effort: every failure is logged and reported as "no
    rules", so a missing or broken rules file never raises into the caller.

    Args:
        path: Filesystem path of the YAML rules file.

    Returns:
        The parsed top-level mapping, or an empty dict when PyYAML is not
        installed, the file is missing or unreadable, the YAML is invalid,
        or the document's top level is not a mapping. Section contents are
        returned as parsed, without further validation.
    """
    try:
        # Imported lazily so a missing PyYAML only disables file-based rules.
        import yaml
    except ImportError:
        logger.warning("PyYAML not installed — trust_rules_file ignored")
        return {}

    try:
        # Decode explicitly as UTF-8: the default for open() follows the
        # platform locale, which would misread non-ASCII text (for example
        # in a rule's "reason") on hosts whose locale is not UTF-8.
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        logger.warning("trust_rules_file not found: %s", path)
        return {}
    except Exception as exc:
        # Covers permission errors, decode errors and YAML syntax errors.
        logger.error("Failed to load trust_rules_file %s: %s", path, exc)
        return {}

    # An empty file parses as None and a bare scalar/list as itself; only a
    # mapping of sections is usable.
    if not isinstance(data, dict):
        return {}
    return data

56 

57 

58def _rule_matches(rule: dict[str, Any], source_uri: str, scope: str) -> bool: 

59 if rule.get("org_uri") != source_uri: 

60 return False 

61 rule_scope = rule.get("scope") 

62 return not (rule_scope is not None and rule_scope != scope) 

63 

64 

def evaluate_auto_rules(source_uri: str, scope: str) -> float | None:
    """Return the operator-configured trust override for a source, if any.

    Database rules are consulted first. When they do not decide, the YAML
    file named by ``settings.trust_rules_file`` is loaded and evaluated,
    with ``always_trust`` entries checked before ``never_trust`` entries.

    Args:
        source_uri: Org URI of the source, compared with each rule's
            ``org_uri``.
        scope: Scope compared with each rule's ``scope``.

    Returns:
        ``1.0`` if an always-trust rule matches, ``0.0`` if a never-trust
        rule matches, or ``None`` when no rule matches (or no rules file is
        configured), in which case the caller falls through to its own
        computed score.
    """
    from .settings import settings

    # First check DB-stored rules (runtime admin API).
    db_result = _evaluate_db_rules(source_uri, scope)
    if db_result is not None:
        return db_result

    # Then check file-based rules.
    rules_file = settings.trust_rules_file
    if not rules_file:
        return None

    rules = _load_yaml_rules(rules_file)

    # Order matters: always_trust is evaluated first, so it wins when both
    # sections contain a matching rule.
    for section, verdict in (("always_trust", 1.0), ("never_trust", 0.0)):
        entries = rules.get(section)
        if entries is None:
            # Section absent, or present but empty ("always_trust:" parses
            # as None, which the previous `.get(section, [])` did not cover).
            continue
        if not isinstance(entries, list):
            logger.warning(
                "trust_rules_file %s: section %r is not a list — ignored",
                rules_file,
                section,
            )
            continue
        for rule in entries:
            # Skip malformed (non-mapping) entries rather than crashing.
            if isinstance(rule, dict) and _rule_matches(rule, source_uri, scope):
                return verdict

    return None

90 

91 

def _evaluate_db_rules(source_uri: str, scope: str) -> float | None:
    """Check the ``quarantine_rules`` table for matching always/never rules.

    Rows are selected by ``org_uri``; a row applies when its ``scope`` is
    NULL (matches every scope) or equal to *scope*.

    Args:
        source_uri: Org URI used to select rows from ``quarantine_rules``.
        scope: Scope compared with each row's ``scope`` column.

    Returns:
        ``1.0`` if any applicable row is an ``always_trust`` rule, ``0.0``
        if any applicable row is a ``never_trust`` rule, otherwise ``None``.
        ``always_trust`` takes precedence when both kinds apply, mirroring
        the order used for file-based rules. ``None`` is also returned when
        the database cannot be imported or queried.
    """
    try:
        from .db import db

        with db() as conn:
            rows = conn.execute(
                "SELECT rule_type, scope FROM quarantine_rules WHERE org_uri = ?",
                (source_uri,),
            ).fetchall()
    except Exception as exc:
        # Best-effort lookup: a database problem must not break trust
        # evaluation, but it does mean DB rules are being skipped, so leave
        # a trace instead of failing silently.
        logger.warning(
            "Failed to query quarantine_rules for %s: %s", source_uri, exc
        )
        return None

    # Collect the rule types of every applicable row first. The query has no
    # ORDER BY, so returning on the first applicable row would make the
    # verdict depend on an unspecified row order when rules conflict.
    matched_types = {
        row["rule_type"]
        for row in rows
        if row["scope"] is None or row["scope"] == scope
    }
    if "always_trust" in matched_types:
        return 1.0
    if "never_trust" in matched_types:
        return 0.0
    return None