Coverage for node / src / stigmem_node / plugins / discovery.py: 96%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Plugin package discovery via Python entry points.""" 

2 

3from __future__ import annotations 

4 

5from collections.abc import Iterable 

6from dataclasses import dataclass 

7from importlib.metadata import EntryPoint, entry_points 

8from typing import Any 

9 

10from .errors import PluginDependencyError, PluginDiscoveryError 

11from .manifest import PluginManifest 

12 

13ENTRY_POINT_GROUP = "stigmem.plugins" 

14_State = str 

15_VISITING: _State = "visiting" 

16_VISITED: _State = "visited" 

17 

18 

19@dataclass(frozen=True, slots=True) 

20class DiscoveredPlugin: 

21 """Plugin manifest loaded from a package entry point.""" 

22 

23 manifest: PluginManifest 

24 entry_point_name: str 

25 entry_point_value: str 

26 distribution: str | None = None 

27 signing_identity: str = "unsigned" 

28 signature_verified: bool = False 

29 

30 

31def discover_plugin_manifests( 

32 *, group: str = ENTRY_POINT_GROUP 

33) -> tuple[DiscoveredPlugin, ...]: 

34 """Load plugin manifests declared through Python package entry points. 

35 

36 Entry points in ``stigmem.plugins`` must resolve to a zero-argument callable 

37 returning a :class:`PluginManifest`. PR 4-INF.2 keeps discovery separate from 

38 registration so dependency ordering and startup lifecycle can build on this 

39 deterministic manifest list. 

40 """ 

41 

42 discovered: list[DiscoveredPlugin] = [] 

43 seen_names: dict[str, str] = {} 

44 for entry_point in sorted(_entry_points_for_group(group), key=lambda ep: ep.name): 

45 factory = _load_entry_point(entry_point) 

46 if not callable(factory): 

47 raise PluginDiscoveryError( 

48 f"plugin entry point {entry_point.name!r} loaded " 

49 f"{type(factory).__name__}; expected a callable returning PluginManifest" 

50 ) 

51 manifest = _call_manifest_factory(entry_point, factory) 

52 previous_entry_point = seen_names.get(manifest.name) 

53 if previous_entry_point is not None: 

54 raise PluginDiscoveryError( 

55 f"duplicate plugin name {manifest.name!r} discovered from entry points " 

56 f"{previous_entry_point!r} and {entry_point.name!r}" 

57 ) 

58 seen_names[manifest.name] = entry_point.name 

59 discovered.append( 

60 DiscoveredPlugin( 

61 manifest=manifest, 

62 entry_point_name=entry_point.name, 

63 entry_point_value=entry_point.value, 

64 distribution=_distribution_name(entry_point), 

65 ) 

66 ) 

67 return tuple(discovered) 

68 

69 

70def resolve_plugin_dependencies( 

71 discovered: Iterable[DiscoveredPlugin], 

72 *, 

73 registered_plugins: Iterable[str] = (), 

74) -> tuple[DiscoveredPlugin, ...]: 

75 """Return discovered plugins in deterministic dependency-first order. 

76 

77 Dependencies may be satisfied by other discovered plugins or by names already 

78 present in the registry. Already-registered dependencies are not returned; 

79 they simply unlock discovered dependents for follow-on registration. 

80 """ 

81 

82 by_name: dict[str, DiscoveredPlugin] = {} 

83 for plugin in discovered: 

84 name = plugin.manifest.name 

85 if name in by_name: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 raise PluginDependencyError(f"duplicate discovered plugin name {name!r}") 

87 by_name[name] = plugin 

88 

89 registered = frozenset(registered_plugins) 

90 missing: dict[str, list[str]] = {} 

91 for name, plugin in sorted(by_name.items()): 

92 missing_deps = sorted( 

93 dependency 

94 for dependency in plugin.manifest.depends_on 

95 if dependency not in by_name and dependency not in registered 

96 ) 

97 if missing_deps: 

98 missing[name] = missing_deps 

99 if missing: 

100 details = "; ".join( 

101 f"{name} missing {', '.join(dependencies)}" 

102 for name, dependencies in missing.items() 

103 ) 

104 raise PluginDependencyError(f"missing plugin dependencies: {details}") 

105 

106 ordered: list[DiscoveredPlugin] = [] 

107 states: dict[str, _State] = {} 

108 stack: list[str] = [] 

109 

110 def visit(name: str) -> None: 

111 state = states.get(name) 

112 if state == _VISITED: 

113 return 

114 if state == _VISITING: 

115 cycle_start = stack.index(name) 

116 cycle_path = [*stack[cycle_start:], name] 

117 raise PluginDependencyError( 

118 f"plugin dependency cycle detected: {' -> '.join(cycle_path)}" 

119 ) 

120 

121 states[name] = _VISITING 

122 stack.append(name) 

123 plugin = by_name[name] 

124 for dependency in sorted(plugin.manifest.depends_on): 

125 if dependency in registered: 

126 continue 

127 visit(dependency) 

128 stack.pop() 

129 states[name] = _VISITED 

130 ordered.append(plugin) 

131 

132 for name in sorted(by_name): 

133 visit(name) 

134 

135 return tuple(ordered) 

136 

137 

138def _entry_points_for_group(group: str) -> Iterable[EntryPoint]: 

139 all_entry_points = entry_points() 

140 if hasattr(all_entry_points, "select"): 140 ↛ 142line 140 didn't jump to line 142 because the condition on line 140 was always true

141 return all_entry_points.select(group=group) 

142 return all_entry_points.get(group, ()) 

143 

144 

145def _load_entry_point(entry_point: EntryPoint) -> Any: 

146 try: 

147 return entry_point.load() 

148 except Exception as exc: 

149 raise PluginDiscoveryError( 

150 f"failed to load plugin entry point {entry_point.name!r}: {exc}" 

151 ) from exc 

152 

153 

154def _call_manifest_factory(entry_point: EntryPoint, factory: Any) -> PluginManifest: 

155 try: 

156 manifest = factory() 

157 except Exception as exc: 

158 raise PluginDiscoveryError( 

159 f"plugin entry point {entry_point.name!r} failed while creating " 

160 f"PluginManifest: {exc}" 

161 ) from exc 

162 if not isinstance(manifest, PluginManifest): 

163 raise PluginDiscoveryError( 

164 f"plugin entry point {entry_point.name!r} returned " 

165 f"{type(manifest).__name__}; expected PluginManifest" 

166 ) 

167 return manifest 

168 

169 

170def _distribution_name(entry_point: EntryPoint) -> str | None: 

171 dist = getattr(entry_point, "dist", None) 

172 if dist is None: 

173 return None 

174 metadata = getattr(dist, "metadata", None) 

175 if metadata is None: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 return None 

177 name = metadata.get("Name") 

178 return str(name) if name else None