Coverage for node / src / stigmem_node / instruction_migrate.py: 94%
277 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""Instruction file migration — spec §21, Phase 10 (ACM-226).

Converts markdown instruction files into atomic stigmem facts and a manifest.
Used by the `stigmem instruction migrate` CLI command.

Public API:
    parse_instruction_chunks(path) → list[Chunk]
    compute_diff(chunks, ...) → list[DiffEntry]
    load_existing_facts_from_db(diff, ...) / load_existing_facts_from_api(diff, ...) → dict[str, str]
    load_prev_manifest_names_from_db(...) / load_prev_manifest_names_from_api(...) → set[str]
    format_preview(diff, ...) → str
    write_facts(diff, ...) → tuple[int, int]
    publish_manifest(agent_id, diff, ...) → bool
"""
14from __future__ import annotations
16import logging
17import re
18import sys
19from dataclasses import dataclass
20from pathlib import Path
21from typing import Any
# Module-level logger; the best-effort loaders below report swallowed errors on it at DEBUG level.
logger = logging.getLogger("stigmem.instruction_migrate")
26# ---------------------------------------------------------------------------
27# Data classes
28# ---------------------------------------------------------------------------
@dataclass
class Chunk:
    """One heading-delimited section parsed out of a markdown instruction file."""

    filename: str  # stem of the source markdown file (no directory, no suffix)
    heading_text: str  # heading with its leading '#' markers removed
    slug: str  # slug assigned to this section by the parser
    content: str  # heading line followed by the section body
    keywords: list[str]  # up to 8 lowercase words from the heading and the start of the body
    token_estimate: int  # rough size estimate: len(content) // 4, never below 1
@dataclass
class DiffEntry:
    """One planned migration action for a single instruction unit."""

    action: str  # CREATE | UPDATE | NOOP | TOMBSTONE
    unit_name: str  # the chunk slug; for a TOMBSTONE, the name from the previous manifest
    fact_uri: str  # "<scope_prefix>/<slug>/<version>"; empty string for a TOMBSTONE
    content: str  # new content for the fact; empty string for a TOMBSTONE
    heading_text: str  # heading without '#' markers; for a TOMBSTONE, same as unit_name
    keywords: list[str]  # chunk keywords; empty list for a TOMBSTONE
    token_estimate: int  # chunk token estimate; 0 for a TOMBSTONE
    existing_content: str | None = None  # content currently stored for fact_uri, if any was found
53# ---------------------------------------------------------------------------
54# Parsing
55# ---------------------------------------------------------------------------
# H1–H3 heading line. The capture group makes re.split() keep each heading in its output.
_HEADING_RE = re.compile(r"(?m)^(#{1,3}\s+.+)$")
# Leading '#' markers (1–3) plus the whitespace that follows them.
_HEADING_PREFIX_RE = re.compile(r"^#{1,3}\s+")
# A '---' ... '---' block at the very start of the text; DOTALL lets '.' span lines.
_FRONTMATTER_RE = re.compile(r"^---\r?\n.*?\r?\n---\r?\n", re.DOTALL)
def _strip_frontmatter(text: str) -> str:
    """Return *text* without its leading frontmatter block.

    When the text starts with a block matched by ``_FRONTMATTER_RE``, that
    block and any newlines directly after it are removed. Otherwise the text
    is returned unchanged.
    """
    found = _FRONTMATTER_RE.match(text)
    if found is None:
        return text
    remainder = text[found.end():]
    return remainder.lstrip("\n")
def _to_slug(text: str, fallback: str) -> str:
    """Turn *text* into a lowercase, dash-separated slug.

    Every run of characters outside ``a-z0-9`` (after lowercasing) becomes a
    single dash, and dashes at either end are dropped. If nothing is left,
    *fallback* is returned instead.
    """
    lowered = text.lower()
    dashed = re.sub(r"[^a-z0-9]+", "-", lowered)
    trimmed = dashed.strip("-")
    if trimmed:
        return trimmed
    return fallback
def _extract_keywords(heading: str, body_prefix: str) -> list[str]:
    """Pick up to eight distinct keywords from a heading and the start of its body.

    A keyword is a run of four or more ASCII letters. Words are lowercased,
    duplicates are dropped, and first-appearance order is kept (heading words
    come before body words).
    """
    candidates = re.findall(r"\b[a-zA-Z]{4,}\b", heading + " " + body_prefix)
    # dict keys keep insertion order, which gives an ordered de-duplication.
    unique_in_order = dict.fromkeys(word.lower() for word in candidates)
    return list(unique_in_order)[:8]
def _collect_md_files(path: Path) -> list[Path]:
    """List the markdown files to process for *path*.

    A file path is returned as a one-element list. For anything else, the
    result holds the ``*.md`` files directly under *path* in sorted order,
    followed by the remaining ``**/*.md`` matches in sorted order, with no
    file listed twice.
    """
    if path.is_file():
        return [path]

    # Insertion-ordered dict acts as an ordered set of paths.
    ordered: dict[Path, None] = {}
    for pattern in ("*.md", "**/*.md"):
        for candidate in sorted(path.glob(pattern)):
            ordered.setdefault(candidate, None)
    return list(ordered)
def _split_into_section_pairs(text: str, md_file: Path) -> list[tuple[str, str]]:
    """Break markdown *text* into ``(heading, body)`` pairs.

    The text is split on ``_HEADING_RE``. Each heading is paired with the
    stripped text that follows it (an empty string when nothing follows).
    Non-empty text that is not under any heading is paired with a heading
    synthesised from the file stem ("my-file" becomes "# My File").
    """
    default_heading = "# " + md_file.stem.replace("-", " ").title()
    pieces = _HEADING_RE.split(text)
    total = len(pieces)

    pairs: list[tuple[str, str]] = []
    idx = 0
    while idx < total:
        stripped = pieces[idx].strip()
        if _HEADING_RE.match(stripped):
            # A heading consumes the piece after it as its body.
            following = pieces[idx + 1].strip() if idx + 1 < total else ""
            pairs.append((stripped, following))
            idx += 2
            continue
        if stripped:
            pairs.append((default_heading, stripped))
        idx += 1

    # Defensive fallback: whole text as a single chunk if nothing was produced.
    if not pairs and text.strip():
        pairs = [(default_heading, text.strip())]
    return pairs
def _disambiguate_slug(base_slug: str, md_file: Path, seen_slugs: dict[str, int]) -> str:
    """Return a slug that is not yet a key of *seen_slugs*.

    Resolution order:
      1. *base_slug* itself, if unused.
      2. ``"<file stem>-<base_slug>"``, if unused.
      3. ``"<file stem>-<base_slug>-<n>"`` for increasing ``n`` until an unused
         value is found.

    Args:
        base_slug: Slug derived from the heading.
        md_file: File the heading came from; its stem is used as the prefix.
        seen_slugs: Slugs already handed out, mapped to a counter. The counter
            stored under the prefixed slug is incremented for every suffix
            tried. The returned slug is NOT recorded here; the caller does that.

    Returns:
        A slug that is not present in *seen_slugs* at the time of return.
    """
    if base_slug not in seen_slugs:
        return base_slug

    prefixed = f"{md_file.stem}-{base_slug}"
    if prefixed not in seen_slugs:
        return prefixed

    # Keep counting until the suffixed form is actually free. A single attempt
    # is not enough: "<prefixed>-<n>" may already be taken by an unrelated
    # heading that happens to slugify to the same text.
    while True:
        seen_slugs[prefixed] += 1
        candidate = f"{prefixed}-{seen_slugs[prefixed]}"
        if candidate not in seen_slugs:
            return candidate
def _build_chunk(heading: str, body: str, slug: str, md_file: Path) -> Chunk:
    """Create the ``Chunk`` for one ``(heading, body)`` pair.

    The chunk content is the heading followed by a blank line and the body,
    or just the heading when the body is empty. Keywords come from the heading
    text and the first 200 characters of the body. The token estimate is
    ``len(content) // 4`` with a floor of 1.
    """
    title = _HEADING_PREFIX_RE.sub("", heading).strip()
    if body:
        text = f"{heading}\n\n{body}".strip()
    else:
        text = heading.strip()
    return Chunk(
        filename=md_file.stem,
        heading_text=title,
        slug=slug,
        content=text,
        keywords=_extract_keywords(title, body[:200]),
        token_estimate=max(1, len(text) // 4),
    )
def parse_instruction_chunks(path: Path) -> list[Chunk]:
    """Read the markdown under *path* and return its instruction chunks.

    *path* may be a single file or a directory. Each H1/H2/H3 heading yields
    one chunk, and text outside any heading is chunked under a heading derived
    from the file name. Frontmatter is removed before splitting. Slugs are kept
    distinct across all files processed in the call. A file that cannot be read
    is skipped with a warning on stderr.
    """
    parsed: list[Chunk] = []
    slug_counts: dict[str, int] = {}

    for source in _collect_md_files(path):
        try:
            raw = source.read_text(encoding="utf-8")
        except Exception as exc:  # noqa: BLE001
            print(f"warning: skipping {source}: {exc}", file=sys.stderr)
            continue

        pairs = _split_into_section_pairs(_strip_frontmatter(raw), source)
        for heading, body in pairs:
            title = _HEADING_PREFIX_RE.sub("", heading).strip()
            candidate = _to_slug(title, source.stem)
            slug = _disambiguate_slug(candidate, source, slug_counts)
            # Record the slug so later headings cannot reuse it.
            slug_counts[slug] = slug_counts.get(slug, 0) + 1
            parsed.append(_build_chunk(heading, body, slug, source))

    return parsed
187# ---------------------------------------------------------------------------
188# Fact URI building
189# ---------------------------------------------------------------------------
def build_fact_uri(scope_prefix: str, slug: str, version: str) -> str:
    """Compose a fact URI of the form ``<scope_prefix>/<slug>/<version>``."""
    uri = f"{scope_prefix}/{slug}/{version}"
    return uri
def scope_prefix_for_role(deployment: str, agent_id: str) -> str:
    """Return the scope prefix for an agent: ``instruction:<deployment>/agent/<agent_id>``."""
    prefix = f"instruction:{deployment}/agent/{agent_id}"
    return prefix
def scope_prefix_for_skill(deployment: str, skill_name: str) -> str:
    """Return the scope prefix for a skill: ``instruction:<deployment>/skill/<skill_name>``."""
    prefix = f"instruction:{deployment}/skill/{skill_name}"
    return prefix
204# ---------------------------------------------------------------------------
205# Diff computation
206# ---------------------------------------------------------------------------
def compute_diff(
    chunks: list[Chunk],
    scope_prefix: str,
    version: str,
    existing_content: dict[str, str],
    prev_manifest_names: set[str],
) -> list[DiffEntry]:
    """Work out what a migration would change.

    Each chunk yields one entry whose action depends on what is already stored
    under its fact URI:

    * ``CREATE``: nothing is stored for the URI.
    * ``NOOP``: stored content equals the chunk content, ignoring surrounding
      whitespace.
    * ``UPDATE``: stored content differs.

    After the chunk entries come ``TOMBSTONE`` entries, sorted by name, for
    every name in *prev_manifest_names* that no chunk slug matches.
    """
    entries: list[DiffEntry] = []
    current_names: set[str] = set()

    for chunk in chunks:
        uri = build_fact_uri(scope_prefix, chunk.slug, version)
        current_names.add(chunk.slug)
        stored = existing_content.get(uri)

        if stored is None:
            verdict = "CREATE"
        elif stored.strip() != chunk.content.strip():
            verdict = "UPDATE"
        else:
            verdict = "NOOP"

        entries.append(
            DiffEntry(
                action=verdict,
                unit_name=chunk.slug,
                fact_uri=uri,
                content=chunk.content,
                heading_text=chunk.heading_text,
                keywords=chunk.keywords,
                token_estimate=chunk.token_estimate,
                existing_content=stored,
            )
        )

    # Names the previous manifest had that the new chunk set no longer provides.
    removed = sorted(prev_manifest_names - current_names)
    entries.extend(
        DiffEntry(
            action="TOMBSTONE",
            unit_name=name,
            fact_uri="",
            content="",
            heading_text=name,
            keywords=[],
            token_estimate=0,
        )
        for name in removed
    )
    return entries
263# ---------------------------------------------------------------------------
264# Existing state loaders
265# ---------------------------------------------------------------------------
def load_existing_facts_from_db(
    diff: list[DiffEntry],
    db_path: str,
) -> dict[str, str]:
    """Look up current fact content in the local SQLite database.

    For every entry in *diff* with a non-empty ``fact_uri``, the most recent
    row (by ``timestamp``) of the ``facts`` table whose ``entity`` equals that
    URI is read, and its ``value_v`` column is stored as a string.

    Args:
        diff: Entries whose ``fact_uri`` values should be looked up.
        db_path: Path of the SQLite database file.

    Returns:
        Mapping of fact URI to stored content. URIs with no row are absent.
        This is best-effort: on any error a warning is printed to stderr and
        whatever was collected so far is returned.
    """
    import sqlite3

    result: dict[str, str] = {}
    uris = [d.fact_uri for d in diff if d.fact_uri]
    if not uris:
        return result

    query = "SELECT value_v FROM facts WHERE entity = ? ORDER BY timestamp DESC LIMIT 1"
    try:
        conn = sqlite3.connect(db_path)
        try:
            conn.row_factory = sqlite3.Row
            for uri in uris:
                row = conn.execute(query, (uri,)).fetchone()
                if row is not None:
                    result[uri] = str(row["value_v"])
        finally:
            # Close explicitly, even when a query fails. Using the connection
            # as a context manager would only commit or roll back, not close it.
            conn.close()
    except Exception as exc:  # noqa: BLE001 — best-effort read; DB may be absent or differ in schema
        print(f"warning: db query failed ({db_path}): {exc}", file=sys.stderr)
    return result
def load_existing_facts_from_api(
    diff: list[DiffEntry],
    node_url: str,
    api_key: str,
) -> dict[str, str]:
    """Fetch current fact content from the node's ``/v1/facts`` endpoint.

    One GET request is made per diff entry that has a non-empty ``fact_uri``.
    A 200 response with at least one fact contributes that fact's
    ``value["v"]``, converted to a string, to the result. Other responses and
    request errors are skipped (errors are logged at DEBUG level).

    Returns an empty mapping, after printing a warning to stderr, when
    ``httpx`` cannot be imported.
    """
    try:
        import httpx
    except ImportError:
        print("warning: httpx not installed — skipping existing-fact check", file=sys.stderr)
        return {}

    found: dict[str, str] = {}
    auth = {"Authorization": f"Bearer {api_key}"}
    endpoint = node_url.rstrip("/") + "/v1/facts"

    for entry in (d for d in diff if d.fact_uri):
        try:
            response = httpx.get(
                endpoint,
                params={"entity": entry.fact_uri, "limit": 1},
                headers=auth,
                timeout=10.0,
            )
            if response.status_code != 200:
                continue
            facts = response.json().get("facts", [])
            if facts:
                found[entry.fact_uri] = str(facts[0]["value"]["v"])
        except Exception as exc:  # noqa: BLE001  # nosec B110 — best-effort pre-flight; node may not be reachable
            logger.debug("load_existing_facts_from_api failed for %s: %s", entry.fact_uri, exc)
    return found
def load_prev_manifest_names_from_db(agent_id: str, db_path: str) -> set[str]:
    """Return the unit names of the agent's current manifest in local SQLite.

    Reads the newest row (by ``created_at``) of ``instruction_manifests`` for
    *agent_id* whose ``superseded_at`` is NULL, parses its ``body`` column as
    JSON and collects the ``"name"`` of each element.

    Args:
        agent_id: Agent whose manifest should be read.
        db_path: Path of the SQLite database file.

    Returns:
        The set of names, or an empty set when there is no such row. This is
        best-effort: any error is logged at DEBUG level and an empty set is
        returned.
    """
    import json
    import sqlite3

    try:
        conn = sqlite3.connect(db_path)
        try:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT body FROM instruction_manifests WHERE agent_id = ? AND superseded_at IS NULL"
                " ORDER BY created_at DESC LIMIT 1",
                (agent_id,),
            ).fetchone()
        finally:
            # Release the connection even when the query raises.
            conn.close()
        if row:
            # assumes body is a JSON array of objects with a "name" key — other shapes
            # end up in the except branch below
            entries = json.loads(row["body"])
            return {e["name"] for e in entries}
    except Exception as exc:  # noqa: BLE001  # nosec B110 — best-effort read; DB may be absent or schema may differ
        logger.debug("load_prev_manifest_names_from_db failed for %s: %s", agent_id, exc)
    return set()
def load_prev_manifest_names_from_api(agent_id: str, node_url: str, api_key: str) -> set[str]:
    """Fetch the unit names of the agent's current manifest over HTTP.

    Sends a GET to ``/v1/agents/<agent_id>/instruction-manifest`` and, on a 200
    response, returns the ``"name"`` of every item under ``"entries"``. Any
    other status, a missing ``httpx`` install, or a request error gives an
    empty set (errors are logged at DEBUG level).
    """
    try:
        import httpx
    except ImportError:
        return set()

    endpoint = f"{node_url.rstrip('/')}/v1/agents/{agent_id}/instruction-manifest"
    try:
        response = httpx.get(
            endpoint,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10.0,
        )
        if response.status_code == 200:
            listed = response.json().get("entries", [])
            return {item["name"] for item in listed}
    except Exception as exc:  # noqa: BLE001  # nosec B110 — best-effort pre-flight; node may not be reachable
        logger.debug("load_prev_manifest_names_from_api failed for %s: %s", agent_id, exc)
    return set()
371# ---------------------------------------------------------------------------
372# Preview / formatting
373# ---------------------------------------------------------------------------
def format_preview(
    diff: list[DiffEntry],
    scope_label: str,
    path: Path,
    version: str,
) -> str:
    """Render a human-readable preview of a migration diff.

    The output has a header (path, scope, version), a one-line count of each
    action, one stanza per CREATE/UPDATE/NOOP entry in diff order, and finally
    one line per TOMBSTONE entry.

    Args:
        diff: Entries to describe.
        scope_label: Text shown on the "Scope:" line.
        path: Source path shown on the "Path:" line.
        version: Version shown on the "Version:" line.

    Returns:
        The preview as a single newline-joined string.

    Raises:
        KeyError: If an entry has an action other than CREATE, UPDATE, NOOP
            or TOMBSTONE.
    """
    # Built once rather than on every loop iteration.
    symbols = {"CREATE": "+", "UPDATE": "~", "NOOP": "="}

    tombstones = [d for d in diff if d.action == "TOMBSTONE"]
    tally = {name: sum(1 for d in diff if d.action == name) for name in symbols}

    lines: list[str] = [
        "=== Migration Preview ===",
        f"Path: {path}",
        f"Scope: {scope_label}",
        f"Version: {version}",
        "",
        (
            f"Facts: {tally['CREATE']} create, {tally['UPDATE']} update, "
            f"{tally['NOOP']} noop, {len(tombstones)} tombstone"
        ),
        "",
    ]

    for d in diff:
        if d.action == "TOMBSTONE":
            continue
        symbol = symbols[d.action]
        lines.append(f" [{symbol}] {d.unit_name}")
        lines.append(f" URI: {d.fact_uri}")
        lines.append(f" tokens: ~{d.token_estimate}")
        if d.action == "UPDATE" and d.existing_content is not None:
            # Count lines, not newline characters: a one-line fact has zero
            # "\n" but is still one line.
            old_lines = len(d.existing_content.splitlines())
            new_lines = len(d.content.splitlines())
            lines.append(f" lines: {old_lines} → {new_lines}")
        lines.append("")

    for d in tombstones:
        lines.append(f" [T] {d.unit_name} (removed — existing facts kept for history)")
    if tombstones:
        lines.append("")

    return "\n".join(lines)
422# ---------------------------------------------------------------------------
423# HTTP writers
424# ---------------------------------------------------------------------------
def write_facts(
    diff: list[DiffEntry],
    node_url: str,
    api_key: str,
) -> tuple[int, int]:
    """POST every CREATE/UPDATE entry to the node's ``/v1/facts`` endpoint.

    A 201 response counts as written and is reported on stdout. Any other
    status, or an exception during the request, counts as failed and is
    reported on stderr. When ``httpx`` cannot be imported nothing is sent and
    every CREATE/UPDATE entry is counted as failed.

    Returns:
        ``(written, failed)``.
    """
    pending = [d for d in diff if d.action in ("CREATE", "UPDATE")]

    try:
        import httpx
    except ImportError:
        print("error: httpx not installed", file=sys.stderr)
        return 0, len(pending)

    request_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    endpoint = node_url.rstrip("/") + "/v1/facts"

    ok_count = 0
    fail_count = 0
    for entry in pending:
        body: dict[str, Any] = {
            "entity": entry.fact_uri,
            "relation": "instruction:content",
            "value": {"type": "text", "v": entry.content},
            "source": "instruction-migration",
            "scope": "local",
        }
        try:
            resp = httpx.post(endpoint, json=body, headers=request_headers, timeout=15.0)
            if resp.status_code == 201:
                ok_count += 1
                print(f" [{entry.action}] {entry.unit_name} ✓")
            else:
                print(
                    f" [{entry.action}] {entry.unit_name} FAILED: {resp.status_code} {resp.text[:120]}",
                    file=sys.stderr,
                )
                fail_count += 1
        except Exception as exc:
            print(f" [{entry.action}] {entry.unit_name} FAILED: {exc}", file=sys.stderr)
            fail_count += 1
    return ok_count, fail_count
def publish_manifest(
    agent_id: str,
    diff: list[DiffEntry],
    manifest_version: str,
    node_url: str,
    api_key: str,
) -> bool:
    """PUT a new instruction manifest for *agent_id*.

    The manifest lists every non-TOMBSTONE diff entry with its name, a
    description (heading text cut to 120 characters), fact URI, load triggers
    and token estimate.

    Returns:
        True on a 200 response. False, with a message on stderr, for any
        other status, a request error, or a missing ``httpx`` install.
    """
    try:
        import httpx
    except ImportError:
        print("error: httpx not installed", file=sys.stderr)
        return False

    manifest_entries = [
        {
            "name": item.unit_name,
            "description": item.heading_text[:120],
            "fact_uri": item.fact_uri,
            "load_triggers": {
                "intents": [item.heading_text.lower()],
                "keywords": item.keywords,
                "task_types": [],
            },
            "token_estimate": item.token_estimate,
        }
        for item in diff
        if item.action != "TOMBSTONE"
    ]
    body = {
        "version": manifest_version,
        "entries": manifest_entries,
        "skip_coverage_gate": False,
    }
    request_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    endpoint = node_url.rstrip("/") + f"/v1/agents/{agent_id}/instruction-manifest"

    try:
        resp = httpx.put(endpoint, json=body, headers=request_headers, timeout=30.0)
    except Exception as exc:
        print(f"Manifest publish FAILED: {exc}", file=sys.stderr)
        return False

    if resp.status_code == 200:
        return True
    print(f"Manifest publish FAILED: {resp.status_code} {resp.text[:200]}", file=sys.stderr)
    return False