Coverage for node / src / stigmem_node / instruction_migrate.py: 94%

277 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

"""Instruction file migration — spec §21, Phase 10 (ACM-226).

Converts markdown instruction files into atomic stigmem facts and a manifest.
Used by the `stigmem instruction migrate` CLI command.

Public API:
    parse_instruction_chunks(path)                                   → list[Chunk]
    compute_diff(chunks, scope_prefix, version,
                 existing_content, prev_manifest_names)              → list[DiffEntry]
    load_existing_facts_from_db(diff, db_path)                       → dict[str, str]
    load_existing_facts_from_api(diff, node_url, api_key)            → dict[str, str]
    load_prev_manifest_names_from_db(agent_id, db_path)              → set[str]
    load_prev_manifest_names_from_api(agent_id, node_url, api_key)   → set[str]
    format_preview(diff, scope_label, path, version)                 → str
    write_facts(diff, node_url, api_key)                             → tuple[int, int]
    publish_manifest(agent_id, diff, manifest_version,
                     node_url, api_key)                              → bool

URI helpers: build_fact_uri, scope_prefix_for_role, scope_prefix_for_skill.
"""

from __future__ import annotations

import logging
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Used for best-effort diagnostics (debug level) by the loaders below;
# user-facing warnings go to stderr via print().
logger = logging.getLogger("stigmem.instruction_migrate")

24 

25 

26# --------------------------------------------------------------------------- 

27# Data classes 

28# --------------------------------------------------------------------------- 

29 

30 

@dataclass(init=True, repr=True, eq=True)
class Chunk:
    """One heading-delimited section parsed from a markdown instruction file."""

    filename: str  # stem of the source file (extension dropped)
    heading_text: str  # heading with its leading '#' marker removed
    slug: str  # unique unit name; also embedded in the fact URI
    content: str  # heading line plus body, whitespace-stripped
    keywords: list[str]  # up to eight lower-cased words of 4+ letters
    token_estimate: int  # rough size: len(content) // 4, at least 1

39 

40 

@dataclass(init=True, repr=True, eq=True)
class DiffEntry:
    """One planned migration action for a single instruction unit."""

    action: str  # one of "CREATE", "UPDATE", "NOOP", "TOMBSTONE"
    unit_name: str  # manifest entry name (the chunk slug)
    fact_uri: str  # target fact entity URI; "" for TOMBSTONE entries
    content: str  # new fact text; "" for TOMBSTONE entries
    heading_text: str
    keywords: list[str]
    token_estimate: int
    existing_content: str | None = None  # previously stored text, when one was found

51 

52 

53# --------------------------------------------------------------------------- 

54# Parsing 

55# --------------------------------------------------------------------------- 

56 

57 

# An H1-H3 heading line. `[ \t]+` (not `\s+`) after the hashes keeps the match
# on a single line: with `\s+`, a bare "#" line followed by text would be taken
# as one heading spanning both lines.
_HEADING_RE = re.compile(r"(?m)^(#{1,3}[ \t]+.+)$")
# Leading "#".."###" marker and the whitespace after it; stripped from headings.
_HEADING_PREFIX_RE = re.compile(r"^#{1,3}\s+")
# YAML frontmatter at the very start of the text. The body group is optional
# and lazy, so an empty "---\n---" block still matches. The closing fence may
# also end the text without a trailing newline.
_FRONTMATTER_RE = re.compile(r"^---\r?\n(?:.*?\r?\n)??---(?:\r?\n|\Z)", re.DOTALL)

61 

62 

def _strip_frontmatter(text: str) -> str:
    """Remove a leading YAML frontmatter block, plus the newlines after it, from *text*.

    Text that does not start with frontmatter is returned unchanged.
    """
    match = _FRONTMATTER_RE.match(text)
    if match is None:
        return text
    remainder = text[match.end() :]
    return remainder.lstrip("\n")

66 

67 

def _to_slug(text: str, fallback: str) -> str:
    """Slugify *text*, returning *fallback* when nothing slug-worthy is left.

    The text is lower-cased, and every run of characters outside ``[a-z0-9]``
    becomes a single hyphen. Hyphens at either end are trimmed.
    """
    collapsed = re.sub(r"[^a-z0-9]+", "-", text.lower())
    trimmed = collapsed.strip("-")
    if trimmed:
        return trimmed
    return fallback

71 

72 

def _extract_keywords(heading: str, body_prefix: str) -> list[str]:
    """Pick up to eight distinct, lower-cased keywords for a section.

    Candidates are standalone runs of four or more ASCII letters, taken from
    *heading* and then *body_prefix*. First-occurrence order is kept, and
    duplicates are detected case-insensitively.
    """
    candidates = re.findall(r"\b[a-zA-Z]{4,}\b", heading + " " + body_prefix)
    # dict keys keep insertion order, so they are the unique words in order.
    unique_words = dict.fromkeys(word.lower() for word in candidates)
    return list(unique_words)[:8]

85 

86 

def _collect_md_files(path: Path) -> list[Path]:
    """Return the markdown files to migrate from *path*.

    A file path is returned as-is, whatever its extension. For a directory:
      * top-level ``*.md`` files come first, sorted;
      * ``*.md`` files in subdirectories follow, sorted by full path.
    Earlier files therefore keep the undecorated slug when headings collide.
    Directories whose names end in ``.md`` are skipped. They would otherwise
    be handed to ``read_text`` later and fail.
    """
    if path.is_file():
        return [path]
    files = [p for p in sorted(path.glob("*.md")) if p.is_file()]
    seen = set(files)
    # "**/*.md" yields the top-level files again; `seen` filters them out.
    for p in sorted(path.glob("**/*.md")):
        if p not in seen and p.is_file():
            seen.add(p)
            files.append(p)
    return files

102 

103 

# Opening/closing line of a fenced code block: up to three spaces of indent,
# then a run of three or more backticks or three or more tildes.
_FENCE_RE = re.compile(r"^ {0,3}(`{3,}|~{3,})")


def _split_into_section_pairs(text: str, md_file: Path) -> list[tuple[str, str]]:
    """Split markdown *text* into ``(heading, body)`` pairs.

    Every line matching ``_HEADING_RE`` (an H1-H3 heading) starts a new section.
    Lines inside fenced code blocks are the exception, so a ``# comment`` in a
    shell snippet is not treated as a heading. Headings and bodies are
    whitespace-stripped, and a heading with nothing under it gets ``""``.
    Non-blank text before the first heading (or a whole file without headings)
    becomes one section under a fallback ``# Title`` heading built from the
    file stem.

    Args:
        text: Markdown source, with frontmatter already removed.
        md_file: The source file; only its stem is used (for the fallback heading).

    Returns:
        The sections in document order.
    """
    fallback_heading = "# " + md_file.stem.replace("-", " ").title()
    # (heading line, or None for the preamble; body lines), in document order.
    sections: list[tuple[str | None, list[str]]] = [(None, [])]
    open_fence: str | None = None  # marker of the fence we are inside, if any

    for line in text.split("\n"):
        fence = _FENCE_RE.match(line)
        if open_fence is not None:
            # A closing fence uses the same character, is at least as long as
            # the opening fence, and carries no info string.
            if (
                fence is not None
                and fence.group(1)[0] == open_fence[0]
                and len(fence.group(1)) >= len(open_fence)
                and not line[fence.end() :].strip()
            ):
                open_fence = None
        elif fence is not None:
            open_fence = fence.group(1)
        elif _HEADING_RE.match(line):
            sections.append((line.strip(), []))
            continue
        sections[-1][1].append(line)

    pairs: list[tuple[str, str]] = []
    for heading, body_lines in sections:
        body = "\n".join(body_lines).strip()
        if heading is not None:
            pairs.append((heading, body))
        elif body:
            pairs.append((fallback_heading, body))
    return pairs

125 

126 

def _disambiguate_slug(base_slug: str, md_file: Path, seen_slugs: dict[str, int]) -> str:
    """Return a slug derived from *base_slug* that is not a key of *seen_slugs*.

    Candidates are tried in order:
      1. *base_slug* itself;
      2. ``<file-stem>-<base_slug>``;
      3. ``<file-stem>-<base_slug>-<n>``, for the smallest ``n`` above the count
         recorded for the stem-prefixed slug that is still unused.

    In case 3 the stem-prefixed slug's counter in *seen_slugs* is advanced to
    ``n``. Recording the returned slug itself is left to the caller.
    """
    if base_slug not in seen_slugs:
        return base_slug
    prefixed = f"{md_file.stem}-{base_slug}"
    if prefixed not in seen_slugs:
        return prefixed
    # Keep counting until the numbered slug is free. A numbered candidate can
    # coincide with the plain slug of some other heading (one whose text
    # slugifies to "<stem>-<base>-2"), which would otherwise produce a
    # duplicate unit name and fact URI.
    counter = seen_slugs[prefixed]
    while True:
        counter += 1
        candidate = f"{prefixed}-{counter}"
        if candidate not in seen_slugs:
            break
    seen_slugs[prefixed] = counter
    return candidate

137 

138 

def _build_chunk(heading: str, body: str, slug: str, md_file: Path) -> Chunk:
    """Turn one parsed ``(heading, body)`` section into a ``Chunk``.

    Content is the heading, a blank line and the body; with an empty body it
    is just the stripped heading. Keywords are drawn from the heading text and
    the first 200 characters of the body. Tokens are estimated as chars // 4,
    floored at 1.
    """
    title = _HEADING_PREFIX_RE.sub("", heading).strip()
    if body:
        text = f"{heading}\n\n{body}".strip()
    else:
        text = heading.strip()
    return Chunk(
        filename=md_file.stem,
        heading_text=title,
        slug=slug,
        content=text,
        keywords=_extract_keywords(title, body[:200]),
        token_estimate=max(1, len(text) // 4),
    )

153 

154 

def parse_instruction_chunks(path: Path) -> list[Chunk]:
    """Parse the markdown instruction file(s) at *path* into atomic chunks.

    *path* may be a single file or a directory, which is searched recursively
    for ``*.md`` files. For each file:
      * it is read as UTF-8, dropping a leading byte-order mark;
      * its YAML frontmatter is stripped;
      * it is split into one chunk per H1/H2/H3 section, or one chunk if it
        has no headings.
    Slugs are made unique across all files. Unreadable files are reported on
    stderr and skipped.
    """
    chunks: list[Chunk] = []
    seen_slugs: dict[str, int] = {}  # slug -> usage counter, shared across files

    for md_file in _collect_md_files(path):
        try:
            # "utf-8-sig" drops a BOM. Left in place, the BOM sits in front of
            # the first "#" / "---" and hides the first heading and frontmatter.
            text = md_file.read_text(encoding="utf-8-sig")
        except (OSError, UnicodeDecodeError) as exc:
            print(f"warning: skipping {md_file}: {exc}", file=sys.stderr)
            continue

        for heading, body in _split_into_section_pairs(_strip_frontmatter(text), md_file):
            heading_text = _HEADING_PREFIX_RE.sub("", heading).strip()
            base_slug = _to_slug(heading_text, md_file.stem)
            slug = _disambiguate_slug(base_slug, md_file, seen_slugs)
            seen_slugs[slug] = seen_slugs.get(slug, 0) + 1
            chunks.append(_build_chunk(heading, body, slug, md_file))

    return chunks

185 

186 

187# --------------------------------------------------------------------------- 

188# Fact URI building 

189# --------------------------------------------------------------------------- 

190 

191 

def build_fact_uri(scope_prefix: str, slug: str, version: str) -> str:
    """Return the fact URI ``<scope_prefix>/<slug>/<version>``."""
    return "{}/{}/{}".format(scope_prefix, slug, version)

194 

195 

def scope_prefix_for_role(deployment: str, agent_id: str) -> str:
    """Return the URI prefix for an agent's instructions: ``instruction:<deployment>/agent/<agent_id>``."""
    return "instruction:{}/agent/{}".format(deployment, agent_id)

198 

199 

def scope_prefix_for_skill(deployment: str, skill_name: str) -> str:
    """Return the URI prefix for a skill's instructions: ``instruction:<deployment>/skill/<skill_name>``."""
    return "instruction:{}/skill/{}".format(deployment, skill_name)

202 

203 

204# --------------------------------------------------------------------------- 

205# Diff computation 

206# --------------------------------------------------------------------------- 

207 

208 

def compute_diff(
    chunks: list[Chunk],
    scope_prefix: str,
    version: str,
    existing_content: dict[str, str],
    prev_manifest_names: set[str],
) -> list[DiffEntry]:
    """Plan the migration of *chunks* against what is already stored.

    Each chunk yields one entry for fact URI ``<scope_prefix>/<slug>/<version>``:
      * CREATE when *existing_content* has no text for that URI;
      * NOOP when the stored text equals the chunk content, ignoring
        surrounding whitespace;
      * UPDATE otherwise.
    Names from *prev_manifest_names* that no chunk produces are then appended,
    sorted, as TOMBSTONE entries with an empty URI and content.
    """

    def classify(stored: str | None, fresh: str) -> str:
        if stored is None:
            return "CREATE"
        return "NOOP" if stored.strip() == fresh.strip() else "UPDATE"

    entries: list[DiffEntry] = []
    current_names: set[str] = set()
    for chunk in chunks:
        uri = build_fact_uri(scope_prefix, chunk.slug, version)
        current_names.add(chunk.slug)
        stored = existing_content.get(uri)
        entries.append(
            DiffEntry(
                action=classify(stored, chunk.content),
                unit_name=chunk.slug,
                fact_uri=uri,
                content=chunk.content,
                heading_text=chunk.heading_text,
                keywords=chunk.keywords,
                token_estimate=chunk.token_estimate,
                existing_content=stored,
            )
        )

    # Units that were in the previous manifest but are gone now.
    entries.extend(
        DiffEntry(
            action="TOMBSTONE",
            unit_name=name,
            fact_uri="",
            content="",
            heading_text=name,
            keywords=[],
            token_estimate=0,
        )
        for name in sorted(prev_manifest_names - current_names)
    )
    return entries

261 

262 

263# --------------------------------------------------------------------------- 

264# Existing state loaders 

265# --------------------------------------------------------------------------- 

266 

267 

def load_existing_facts_from_db(
    diff: list[DiffEntry],
    db_path: str,
) -> dict[str, str]:
    """Look up the stored content of each entry's fact in a local SQLite database.

    For every entry with a non-empty ``fact_uri``, the ``facts`` row whose
    ``entity`` equals that URI and whose ``timestamp`` is highest is read, and
    its ``value_v`` is stringified. URIs without a row are absent from the result.

    Best-effort: if the database file does not exist or a query fails, a
    warning is printed to stderr and whatever was collected so far is returned.
    The database file is never created as a side effect, and the connection
    is always closed.
    """
    import contextlib
    import sqlite3

    result: dict[str, str] = {}
    uris = [d.fact_uri for d in diff if d.fact_uri]
    if not uris:
        return result
    if not Path(db_path).is_file():
        # sqlite3.connect() would otherwise silently create an empty database here.
        print(f"warning: db query failed ({db_path}): database file not found", file=sys.stderr)
        return result
    try:
        # A sqlite3 connection's own context manager only commits/rolls back;
        # closing() is what actually closes it, even when a query raises.
        with contextlib.closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            for uri in uris:
                row = conn.execute(
                    "SELECT value_v FROM facts WHERE entity = ? ORDER BY timestamp DESC LIMIT 1",
                    (uri,),
                ).fetchone()
                if row is not None:
                    result[uri] = str(row["value_v"])
    except Exception as exc:  # noqa: BLE001 — best-effort pre-flight read
        print(f"warning: db query failed ({db_path}): {exc}", file=sys.stderr)
    return result

293 

294 

def load_existing_facts_from_api(
    diff: list[DiffEntry],
    node_url: str,
    api_key: str,
) -> dict[str, str]:
    """Fetch the stored content of each entry's fact from the node's facts API.

    Sends ``GET <node_url>/v1/facts`` with ``entity=<uri>&limit=1`` for every
    entry with a non-empty ``fact_uri``. From 200 responses that contain at
    least one fact, ``facts[0]["value"]["v"]`` is recorded as a string.

    Best-effort:
      * if httpx is missing, a warning is printed and ``{}`` is returned;
      * per-URI failures (request errors, unexpected JSON) are logged at debug
        level, and that URI is simply left out.
    """
    try:
        import httpx
    except ImportError:
        print("warning: httpx not installed — skipping existing-fact check", file=sys.stderr)
        return {}

    result: dict[str, str] = {}
    uris = [d.fact_uri for d in diff if d.fact_uri]
    if not uris:
        return result
    url = node_url.rstrip("/") + "/v1/facts"
    # One client for every lookup so the HTTP connection is reused, rather than
    # opening a new connection per fact.
    with httpx.Client(headers={"Authorization": f"Bearer {api_key}"}, timeout=10.0) as client:
        for uri in uris:
            try:
                r = client.get(url, params={"entity": uri, "limit": 1})
                if r.status_code == 200:
                    facts = r.json().get("facts", [])
                    if facts:
                        result[uri] = str(facts[0]["value"]["v"])
            except Exception as exc:  # noqa: BLE001 # nosec B110 — best-effort pre-flight; node may not be reachable
                logger.debug("load_existing_facts_from_api failed for %s: %s", uri, exc)
    return result

327 

328 

def load_prev_manifest_names_from_db(agent_id: str, db_path: str) -> set[str]:
    """Return the unit names in *agent_id*'s current manifest stored in local SQLite.

    The current manifest is the ``instruction_manifests`` row for the agent
    that has ``superseded_at IS NULL`` and the latest ``created_at``. Its JSON
    ``body`` is read as a list of entries, or as an object holding that list
    under ``"entries"``, and each entry's ``"name"`` is collected.

    Best-effort: a missing database file, a missing manifest, or any query or
    decode failure yields an empty set. Failures are logged at debug level.
    The database file is never created as a side effect, and the connection
    is always closed.
    """
    import contextlib
    import json
    import sqlite3

    if not Path(db_path).is_file():
        # sqlite3.connect() would otherwise silently create an empty database here.
        return set()
    try:
        with contextlib.closing(sqlite3.connect(db_path)) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT body FROM instruction_manifests WHERE agent_id = ? AND superseded_at IS NULL"
                " ORDER BY created_at DESC LIMIT 1",
                (agent_id,),
            ).fetchone()
        if row is not None:
            entries = json.loads(row["body"])
            if isinstance(entries, dict):
                # NOTE(review): the API returns manifests as {"entries": [...]};
                # accept that shape too in case the body is stored that way.
                entries = entries.get("entries", [])
            return {e["name"] for e in entries}
    except Exception as exc:  # noqa: BLE001 # nosec B110 — best-effort read; DB may be absent or schema may differ
        logger.debug("load_prev_manifest_names_from_db failed for %s: %s", agent_id, exc)
    return set()

349 

350 

def load_prev_manifest_names_from_api(agent_id: str, node_url: str, api_key: str) -> set[str]:
    """Fetch *agent_id*'s current instruction manifest and return its entry names.

    Returns an empty set in any of these cases:
      * httpx is unavailable;
      * the response status is not 200;
      * the request or JSON handling fails (logged at debug level).
    """
    try:
        import httpx
    except ImportError:
        return set()

    try:
        endpoint = f"{node_url.rstrip('/')}/v1/agents/{agent_id}/instruction-manifest"
        response = httpx.get(
            endpoint,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10.0,
        )
        if response.status_code != 200:
            return set()
        return {entry["name"] for entry in response.json().get("entries", [])}
    except Exception as exc:  # noqa: BLE001 # nosec B110 — best-effort pre-flight; node may not be reachable
        logger.debug("load_prev_manifest_names_from_api failed for %s: %s", agent_id, exc)
    return set()

369 

370 

371# --------------------------------------------------------------------------- 

372# Preview / formatting 

373# --------------------------------------------------------------------------- 

374 

375 

def format_preview(
    diff: list[DiffEntry],
    scope_label: str,
    path: Path,
    version: str,
) -> str:
    """Render *diff* as a human-readable migration preview.

    The preview is made up of:
      * a header with the path, scope and version;
      * a one-line tally per action;
      * one stanza per non-TOMBSTONE entry: symbol, unit name, URI, token
        estimate, and for UPDATEs with known previous content the
        old → new line count;
      * one line per TOMBSTONE entry.
    """
    lines: list[str] = [
        "=== Migration Preview ===",
        f"Path: {path}",
        f"Scope: {scope_label}",
        f"Version: {version}",
        "",
    ]

    creates = [d for d in diff if d.action == "CREATE"]
    updates = [d for d in diff if d.action == "UPDATE"]
    noops = [d for d in diff if d.action == "NOOP"]
    tombstones = [d for d in diff if d.action == "TOMBSTONE"]

    lines.append(
        f"Facts: {len(creates)} create, {len(updates)} update, "
        f"{len(noops)} noop, {len(tombstones)} tombstone"
    )
    lines.append("")

    symbols = {"CREATE": "+", "UPDATE": "~", "NOOP": "="}
    for d in diff:
        if d.action == "TOMBSTONE":
            continue
        # An unexpected action is shown as "?" rather than aborting the preview.
        lines.append(f" [{symbols.get(d.action, '?')}] {d.unit_name}")
        lines.append(f" URI: {d.fact_uri}")
        lines.append(f" tokens: ~{d.token_estimate}")
        if d.action == "UPDATE" and d.existing_content is not None:
            # splitlines() counts lines. count("\n") is one short for text
            # without a trailing newline, e.g. a one-line fact shows as 0.
            old_lines = len(d.existing_content.splitlines())
            new_lines = len(d.content.splitlines())
            lines.append(f" lines: {old_lines} → {new_lines}")
        lines.append("")

    for d in tombstones:
        lines.append(f" [T] {d.unit_name} (removed — existing facts kept for history)")
    if tombstones:
        lines.append("")

    return "\n".join(lines)

420 

421 

422# --------------------------------------------------------------------------- 

423# HTTP writers 

424# --------------------------------------------------------------------------- 

425 

426 

def write_facts(
    diff: list[DiffEntry],
    node_url: str,
    api_key: str,
) -> tuple[int, int]:
    """POST every CREATE/UPDATE entry of *diff* to ``<node_url>/v1/facts``.

    Each fact is sent with:
      * relation ``instruction:content``;
      * a text value holding the entry content;
      * source ``instruction-migration`` and scope ``local``.
    A 201 response counts as written and is echoed to stdout. Any other status,
    or a request error, counts as failed and is reported on stderr. Entries
    with other actions are skipped.

    Returns:
        ``(written, failed)``. If httpx is not installed, nothing is sent and
        every CREATE/UPDATE entry counts as failed.
    """
    pending = [d for d in diff if d.action in ("CREATE", "UPDATE")]
    try:
        import httpx
    except ImportError:
        print("error: httpx not installed", file=sys.stderr)
        return 0, len(pending)

    written = 0
    failed = 0
    if not pending:
        return written, failed

    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    url = node_url.rstrip("/") + "/v1/facts"
    # One client for the whole batch so the HTTP connection is reused, rather
    # than opening a new connection per fact.
    with httpx.Client(headers=headers, timeout=15.0) as client:
        for d in pending:
            payload: dict[str, Any] = {
                "entity": d.fact_uri,
                "relation": "instruction:content",
                "value": {"type": "text", "v": d.content},
                "source": "instruction-migration",
                "scope": "local",
            }
            try:
                r = client.post(url, json=payload)
                if r.status_code == 201:
                    written += 1
                    print(f" [{d.action}] {d.unit_name} ✓")
                else:
                    print(
                        f" [{d.action}] {d.unit_name} FAILED: {r.status_code} {r.text[:120]}",
                        file=sys.stderr,
                    )
                    failed += 1
            except Exception as exc:  # noqa: BLE001 — report and continue with the next fact
                print(f" [{d.action}] {d.unit_name} FAILED: {exc}", file=sys.stderr)
                failed += 1
    return written, failed

468 

469 

def publish_manifest(
    agent_id: str,
    diff: list[DiffEntry],
    manifest_version: str,
    node_url: str,
    api_key: str,
) -> bool:
    """PUT a new instruction manifest for *agent_id*, built from *diff*.

    Every non-TOMBSTONE entry becomes a manifest entry containing:
      * the unit name;
      * a description (heading text cut to 120 characters);
      * the fact URI;
      * load triggers (lower-cased heading as intent, plus keywords);
      * the token estimate.
    Returns True only on HTTP 200. Otherwise, including when httpx is missing,
    an error is printed to stderr and False is returned.
    """
    try:
        import httpx
    except ImportError:
        print("error: httpx not installed", file=sys.stderr)
        return False

    def as_entry(d: DiffEntry) -> dict[str, Any]:
        return {
            "name": d.unit_name,
            "description": d.heading_text[:120],
            "fact_uri": d.fact_uri,
            "load_triggers": {
                "intents": [d.heading_text.lower()],
                "keywords": d.keywords,
                "task_types": [],
            },
            "token_estimate": d.token_estimate,
        }

    payload = {
        "version": manifest_version,
        "entries": [as_entry(d) for d in diff if d.action != "TOMBSTONE"],
        "skip_coverage_gate": False,
    }
    endpoint = node_url.rstrip("/") + f"/v1/agents/{agent_id}/instruction-manifest"
    auth_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    try:
        response = httpx.put(endpoint, json=payload, headers=auth_headers, timeout=30.0)
        if response.status_code != 200:
            print(f"Manifest publish FAILED: {response.status_code} {response.text[:200]}", file=sys.stderr)
            return False
        return True
    except Exception as exc:
        print(f"Manifest publish FAILED: {exc}", file=sys.stderr)
        return False