Coverage for node / src / stigmem_node / plugins / handlers.py: 96%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Typed hook payload and return helper types.""" 

2 

3from __future__ import annotations 

4 

5from collections.abc import Callable 

6from dataclasses import dataclass, field 

7from datetime import datetime 

8from enum import StrEnum 

9from typing import Any, Protocol, TypeAlias, TypeVar, cast 

10 

11T = TypeVar("T") 

12 

13 

14@dataclass(frozen=True, slots=True) 

15class Allow: 

16 """Voting hook allow decision.""" 

17 

18 

19@dataclass(frozen=True, slots=True) 

20class Deny: 

21 """Voting hook deny decision.""" 

22 

23 reason: str 

24 

25 

# Union of the two voting decision types; the declared return type of ``VotingHandler``.
VotingDecision: TypeAlias = Allow | Deny
# ``Allow`` instance built once at import time.
ALLOW_SINGLETON = Allow()

28 

29 

30@dataclass(frozen=True, slots=True) 

31class Success: 

32 """Audit outcome for successful operations.""" 

33 

34 

35@dataclass(frozen=True, slots=True) 

36class Failure: 

37 """Audit outcome for failed operations.""" 

38 

39 reason: str 

40 exception_type: str | None = None 

41 

42 

# Union of the two audit outcome types; the declared type of ``AuditEvent.outcome``.
Outcome: TypeAlias = Success | Failure

44 

45 

46@dataclass(frozen=True, slots=True) 

47class TenantContext: 

48 """Resolved tenant context passed through hook payloads. 

49 

50 Standard metadata keys: 

51 - ``tenant_context_source``: one of ``hook``, ``pinned``, ``resolved``, or 

52 ``system``. ``hook`` marks request-context construction, ``pinned`` marks 

53 federation default-tenant dispatch, ``resolved`` marks plugin promotion, 

54 and ``system`` marks internal system contexts. 

55 - ``source_tenant_id``: optional pre-resolution identity tenant. 

56 - ``resolved_by``: optional plugin name that promoted the tenant. 

57 """ 

58 

59 tenant_id: str 

60 metadata: dict[str, Any] = field(default_factory=dict) 

61 

62 

# Shared context for the ``system`` tenant, tagged with the ``system`` context source.
SYSTEM_TENANT = TenantContext(tenant_id="system", metadata={"tenant_context_source": "system"})

67 

68 

69@dataclass(frozen=True, slots=True) 

70class Migration: 

71 """Schema migration declared by core or a plugin.""" 

72 

73 plugin_name: str 

74 migration_id: int 

75 sql: str 

76 description: str 

77 plugin_version: str = "0.0.0" 

78 backend: str = "sqlite" 

79 

80 

81class PluginHealthStatus(StrEnum): 

82 HEALTHY = "healthy" 

83 DEGRADED = "degraded" 

84 UNHEALTHY = "unhealthy" 

85 UNKNOWN = "unknown" 

86 

87 

88@dataclass(frozen=True, slots=True) 

89class PluginHealth: 

90 """Result returned by a plugin lifecycle health check.""" 

91 

92 status: PluginHealthStatus 

93 message: str = "" 

94 

95 

96@dataclass(frozen=True, slots=True) 

97class PluginHealthReport: 

98 """Last observed lifecycle health state for one plugin.""" 

99 

100 plugin_name: str 

101 plugin_version: str 

102 status: PluginHealthStatus 

103 message: str 

104 checked_at: datetime 

105 error_summary: str | None = None 

106 

107 

108@dataclass(frozen=True, slots=True) 

109class PluginInfo: 

110 """Operator-facing registered plugin metadata.""" 

111 

112 name: str 

113 version: str 

114 capabilities: tuple[str, ...] 

115 hooks: tuple[str, ...] 

116 depends_on: tuple[str, ...] 

117 discovery_source: dict[str, Any] 

118 signed_by: str 

119 

120 

121@dataclass(frozen=True, slots=True) 

122class AuditEvent: 

123 """Normalized plugin audit event payload.""" 

124 

125 event_type: str 

126 actor_uri: str 

127 target_uri: str | None 

128 tenant_id: str 

129 timestamp: datetime 

130 outcome: Outcome 

131 metadata: dict[str, Any] = field(default_factory=dict) 

132 

133 

class VotingHandler(Protocol):
    """Structural type for voting hook callables."""

    def __call__(self, ctx: object, **kwargs: Any) -> VotingDecision:
        """Receive the hook context and keyword payload; return the decision."""
        ...

137 

138 

class FilterChainHandler(Protocol[T]):
    """Structural type for filter-chain hook callables over a value of type ``T``."""

    def __call__(self, ctx: object, value: T, **kwargs: Any) -> T:
        """Receive the hook context and a value; return a value of the same type."""
        ...

142 

143 

class ScoreDeltaHandler(Protocol):
    """Structural type for score-delta hook callables."""

    def __call__(self, ctx: object, scored_results: list[Any], **kwargs: Any) -> dict[str, float]:
        """Receive the hook context and scored results; return a ``str`` to ``float`` mapping."""
        ...

147 

148 

class FireAndForgetHandler(Protocol):
    """Structural type for hook callables that return nothing."""

    def __call__(self, ctx: object, **kwargs: Any) -> None:
        """Receive the hook context and keyword payload; return ``None``."""
        ...

152 

153 

def handler_timeout(seconds: float) -> Callable[[T], T]:
    """Annotate a handler with a per-invocation timeout in seconds.

    The returned decorator stores ``float(seconds)`` on the handler as
    ``__plugin_timeout__`` and returns the very same handler object.

    Args:
        seconds: Timeout value; must satisfy ``0 < seconds <= 30``.

    Returns:
        A decorator that tags the handler and hands it back unchanged.

    Raises:
        ValueError: If ``seconds`` is not positive (NaN included) or exceeds 30.
    """
    # ``not seconds > 0`` rather than ``seconds <= 0``: every ordering comparison
    # with NaN is false, so the latter form lets NaN slip past both range checks.
    if not seconds > 0:
        raise ValueError("handler timeout must be positive")
    if seconds > 30:
        raise ValueError("handler timeout cannot exceed 30 seconds")

    # Convert once here instead of on every decoration.
    timeout = float(seconds)

    def decorate(handler: T) -> T:
        # ``cast(Any, ...)`` lets the type checker accept the attribute assignment.
        cast(Any, handler).__plugin_timeout__ = timeout
        return handler

    # Quoted target type: ``cast`` is a runtime no-op, so the type need not be evaluated.
    return cast("Callable[[T], T]", decorate)