Coverage for node / src / stigmem_node / cli_federation_handlers.py: 93%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
"""Federation cursor CLI handlers extracted from cli.py.

These handlers are imported back into ``stigmem_node.cli`` so the public
import surface is preserved. No behavioural changes — code was moved
verbatim from cli.py.
"""
8from __future__ import annotations
10import argparse
11import os
12import sys
15def _write_owner_only_text(path: str, text: str) -> None:
16 fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
17 try:
18 os.fchmod(fd, 0o600)
19 file_obj = os.fdopen(fd, "w")
20 fd = -1
21 with file_obj as f:
22 f.write(text)
23 finally:
24 if fd != -1: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true
25 os.close(fd)
26 os.chmod(path, 0o600)
def _cmd_federation_cursor_export(args: argparse.Namespace) -> int:
    """Export replication cursor state to a portable JSON checkpoint.

    Reads every row from replication_cursors (joined to peers for human-readable
    node_id / node_url context) and writes a checkpoint file. The checkpoint can
    be used with ``cursor-import`` to restore cursor positions after a DB loss so
    that the node re-pulls only the facts it missed rather than re-pulling
    everything from the beginning of time.

    Migrations are applied to the target database before it is read.

    Args:
        args: Parsed CLI arguments. ``args.db`` is the database path (falls
            back to ``settings.db_path`` when empty). ``args.out`` is either
            ``"-"`` to print the JSON to stdout, or a file path that is written
            owner-only (0o600) with a trailing newline.

    Returns:
        Always 0; failures propagate as exceptions.

    See stigmem/docs/cursor-reset-recovery.md for the full recovery procedure.
    """
    import json
    import sqlite3
    from datetime import UTC, datetime

    from .db import apply_migrations
    from .settings import settings

    db_path: str = args.db or settings.db_path
    apply_migrations(db_path=db_path)

    conn = sqlite3.connect(db_path)
    try:
        # Configure inside the try so the connection is still closed if the
        # PRAGMA raises.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute(
            """
            SELECT rc.peer_id, rc.direction, rc.cursor, rc.updated_at,
                   p.node_id, p.node_url, p.status AS peer_status
            FROM replication_cursors rc
            JOIN peers p ON p.id = rc.peer_id
            ORDER BY p.node_id, rc.direction
            """
        ).fetchall()
    finally:
        conn.close()

    checkpoint = {
        "checkpoint_timestamp": datetime.now(UTC).isoformat(),
        "db_path": db_path,
        "cursors": [
            {
                "peer_id": r["peer_id"],
                "peer_node_id": r["node_id"],
                "peer_url": r["node_url"],
                "peer_status": r["peer_status"],
                "direction": r["direction"],
                "cursor": r["cursor"],
                "updated_at": r["updated_at"],
            }
            for r in rows
        ],
    }

    payload = json.dumps(checkpoint, indent=2)

    if args.out == "-":
        print(payload)
    else:
        # The checkpoint exposes peer URLs and cursor positions; keep it private.
        _write_owner_only_text(args.out, f"{payload}\n")
        print(f"checkpoint written: {args.out} ({len(rows)} cursor(s))", file=sys.stderr)

    return 0
def _cmd_federation_cursor_import(args: argparse.Namespace) -> int:
    """Restore replication cursors from a checkpoint file after DB loss.

    The checkpoint is a JSON object whose ``cursors`` array holds entries with
    ``peer_id``, ``direction`` and ``cursor`` (``peer_node_id`` is used only in
    messages), as written by ``cursor-export``.

    For each entry in the checkpoint:
      - Skips malformed entries (not a JSON object, or missing ``peer_id`` /
        ``direction``) with a warning.
      - Skips entries whose peer_id is not present in the peers table (FK would
        fail; the peer may not yet be re-registered after a DB restore).
      - Without --force (default), skips entries whose existing cursor row
        already holds a non-NULL value (conservative: do not overwrite live
        state).
      - With --force, overwrites any existing cursor. No ordering comparison is
        made, so a cursor newer than the checkpoint entry is overwritten too.
      - Otherwise upserts the cursor using ON CONFLICT, stamping ``updated_at``
        with the import time, so the command is safe to re-run.

    All upserts are committed in a single transaction. An unexpected error
    rolls the whole import back and is re-raised.

    After import, the next pull cycle will resume from the restored positions
    rather than re-pulling from the start of time.

    Args:
        args: Parsed CLI arguments. Reads ``args.db`` (falls back to
            ``settings.db_path`` when empty), ``args.checkpoint_file`` and
            ``args.force``.

    Returns:
        0 on success (even if some entries were skipped). 1 if the checkpoint
        cannot be read, is not valid UTF-8 JSON, or lacks a ``cursors`` array.

    See stigmem/docs/cursor-reset-recovery.md for the full recovery procedure.
    """
    import json
    import sqlite3
    from datetime import UTC, datetime

    from .db import apply_migrations
    from .settings import settings

    db_path: str = args.db or settings.db_path
    apply_migrations(db_path=db_path)

    try:
        # JSON text is UTF-8; do not depend on the process locale encoding.
        with open(args.checkpoint_file, encoding="utf-8") as fh:
            checkpoint = json.load(fh)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
        # (binary / non-UTF-8 input), so both get a clean error, not a traceback.
        print(f"error: cannot read checkpoint file: {exc}", file=sys.stderr)
        return 1

    # A valid JSON document that is not an object (e.g. a bare list) has no
    # "cursors" key; report it the same way as a missing array.
    cursors = checkpoint.get("cursors") if isinstance(checkpoint, dict) else None
    if not isinstance(cursors, list):
        print("error: checkpoint missing 'cursors' array", file=sys.stderr)
        return 1

    imported = skipped_no_peer = skipped_exists = 0
    now_iso = datetime.now(UTC).isoformat()

    conn = sqlite3.connect(db_path)
    try:
        # Configure inside the try so the connection is closed even if a
        # PRAGMA raises.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")

        for entry in cursors:
            # Non-object entries (strings, numbers, null) are malformed too.
            if isinstance(entry, dict):
                peer_id = entry.get("peer_id")
                direction = entry.get("direction")
                cursor_val = entry.get("cursor")
            else:
                peer_id = direction = cursor_val = None

            if not peer_id or not direction:
                print(
                    f"warning: skipping malformed entry: {entry}",
                    file=sys.stderr,
                )
                continue

            peer_row = conn.execute("SELECT id FROM peers WHERE id = ?", (peer_id,)).fetchone()
            if peer_row is None:
                print(
                    f"warning: peer {peer_id!r} ({entry.get('peer_node_id', '?')}) "
                    "not found in peers table — skipping (re-register the peer first)",
                    file=sys.stderr,
                )
                skipped_no_peer += 1
                continue

            if not args.force:
                existing = conn.execute(
                    "SELECT cursor FROM replication_cursors WHERE peer_id = ? AND direction = ?",
                    (peer_id, direction),
                ).fetchone()
                if existing is not None and existing["cursor"] is not None:
                    print(
                        f"info: cursor for peer {entry.get('peer_node_id', '?')} "
                        f"({direction}) already set — skipping (use --force to overwrite)",
                        file=sys.stderr,
                    )
                    skipped_exists += 1
                    continue

            conn.execute(
                """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at)
                   VALUES (?, ?, ?, ?)
                   ON CONFLICT(peer_id, direction)
                   DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at""",
                (peer_id, direction, cursor_val, now_iso),
            )
            imported += 1

        conn.commit()
    except Exception:
        # Undo any partial import; the connection is closed by `finally`.
        conn.rollback()
        raise
    finally:
        conn.close()

    print(
        f"cursor import complete: {imported} restored, "
        f"{skipped_no_peer} skipped (peer not found), "
        f"{skipped_exists} skipped (already set)",
        file=sys.stderr,
    )
    return 0