Coverage for node / src / stigmem_node / cli_federation_handlers.py: 93%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Federation cursor CLI handlers extracted from cli.py. 

2 

3These handlers are imported back into ``stigmem_node.cli`` so the public 

4import surface is preserved. No behavioural changes — code was moved 

5verbatim from cli.py. 

6""" 

7 

8from __future__ import annotations 

9 

10import argparse 

11import os 

12import sys 

13 

14 

def _write_owner_only_text(path: str, text: str) -> None:
    """Write *text* to *path*, leaving the file accessible to its owner only.

    The file is created if it does not exist and truncated if it does.
    Permissions are forced to ``0o600`` on the open descriptor before any
    data is written, and applied once more by path after the file has been
    closed.

    The text is always encoded as UTF-8 so the bytes on disk do not depend
    on the locale of the process running the command.

    Args:
        path: Destination file path.
        text: Content to write.

    Raises:
        OSError: If the file cannot be opened, re-moded, or written.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        # Tighten the mode on the descriptor itself before writing anything,
        # which also covers a pre-existing file with looser permissions.
        os.fchmod(fd, 0o600)
        file_obj = os.fdopen(fd, "w", encoding="utf-8")
        # The file object now owns the descriptor; clear our copy so the
        # ``finally`` clause below does not close it a second time.
        fd = -1
        with file_obj as f:
            f.write(text)
    finally:
        if fd != -1:
            os.close(fd)
    os.chmod(path, 0o600)

27 

28 

def _cmd_federation_cursor_export(args: argparse.Namespace) -> int:
    """Export replication cursor state to a portable JSON checkpoint.

    Reads every row from replication_cursors (joined to peers for human-readable
    node_id / node_url context) and writes a checkpoint file. The checkpoint can
    be used with ``cursor-import`` to restore cursor positions after a DB loss so
    that the node re-pulls only the facts it missed rather than re-pulling
    everything from the beginning of time.

    See stigmem/docs/cursor-reset-recovery.md for the full recovery procedure.

    Args:
        args: Parsed CLI arguments. ``args.db`` optionally overrides the
            database path from settings; ``args.out`` is the destination
            file, or ``"-"`` to print the checkpoint to stdout.

    Returns:
        ``0`` once the checkpoint has been written.
    """
    import json
    import sqlite3
    from datetime import UTC, datetime

    from .db import apply_migrations
    from .settings import settings

    db_path: str = args.db or settings.db_path
    apply_migrations(db_path=db_path)

    conn = sqlite3.connect(db_path)
    try:
        # Connection setup lives inside the ``try`` so the handle is closed
        # even when the PRAGMA itself fails (e.g. a locked or read-only DB).
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        rows = conn.execute(
            """
            SELECT rc.peer_id, rc.direction, rc.cursor, rc.updated_at,
                   p.node_id, p.node_url, p.status AS peer_status
            FROM replication_cursors rc
            JOIN peers p ON p.id = rc.peer_id
            ORDER BY p.node_id, rc.direction
            """
        ).fetchall()
    finally:
        conn.close()

    checkpoint = {
        "checkpoint_timestamp": datetime.now(UTC).isoformat(),
        "db_path": db_path,
        "cursors": [
            {
                "peer_id": r["peer_id"],
                "peer_node_id": r["node_id"],
                "peer_url": r["node_url"],
                "peer_status": r["peer_status"],
                "direction": r["direction"],
                "cursor": r["cursor"],
                "updated_at": r["updated_at"],
            }
            for r in rows
        ],
    }

    payload = json.dumps(checkpoint, indent=2)

    if args.out == "-":
        print(payload)
    else:
        # Checkpoint files are written with owner-only permissions.
        _write_owner_only_text(args.out, f"{payload}\n")
        print(f"checkpoint written: {args.out} ({len(rows)} cursor(s))", file=sys.stderr)

    return 0

92 

93 

def _cmd_federation_cursor_import(args: argparse.Namespace) -> int:
    """Restore replication cursors from a checkpoint file after DB loss.

    For each entry in the checkpoint:
    - Skips (with a warning) entries that are not JSON objects or that lack a
      ``peer_id`` or ``direction``.
    - Skips entries whose peer_id is not present in the peers table (FK would
      fail; the peer may not yet be re-registered after a DB restore).
    - Upserts the cursor using ON CONFLICT so the command is idempotent and safe
      to re-run. ``updated_at`` is set to the time of the import, not the
      value stored in the checkpoint.
    - With --force, overwrites any existing cursor unconditionally.
      Without --force (default), skips entries where the existing cursor is already
      set to a value (conservative: do not overwrite live state). An existing
      row whose cursor is NULL is still filled in.

    All writes happen in a single transaction: either every accepted entry is
    committed, or the database is rolled back and the error is re-raised.

    After import, the next pull cycle will resume from the restored positions
    rather than re-pulling from the start of time.

    See stigmem/docs/cursor-reset-recovery.md for the full recovery procedure.

    Args:
        args: Parsed CLI arguments. ``args.db`` optionally overrides the
            database path from settings; ``args.checkpoint_file`` is the
            checkpoint to read; ``args.force`` enables overwriting.

    Returns:
        ``0`` on success, ``1`` if the checkpoint cannot be read or parsed or
        does not contain a ``cursors`` array.
    """
    import json
    import sqlite3
    from datetime import UTC, datetime

    from .db import apply_migrations
    from .settings import settings

    db_path: str = args.db or settings.db_path
    apply_migrations(db_path=db_path)

    try:
        with open(args.checkpoint_file, encoding="utf-8") as fh:
            checkpoint = json.load(fh)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"error: cannot read checkpoint file: {exc}", file=sys.stderr)
        return 1

    # A checkpoint whose top level is not an object (e.g. a bare list) has no
    # 'cursors' key; report it the same way instead of crashing on ``.get``.
    cursors = checkpoint.get("cursors") if isinstance(checkpoint, dict) else None
    if not isinstance(cursors, list):
        print("error: checkpoint missing 'cursors' array", file=sys.stderr)
        return 1

    imported = skipped_no_peer = skipped_exists = 0
    now_iso = datetime.now(UTC).isoformat()

    conn = sqlite3.connect(db_path)
    try:
        # Setup runs inside the ``try`` so the connection is closed even if a
        # PRAGMA fails.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")

        for entry in cursors:
            if (
                not isinstance(entry, dict)
                or not entry.get("peer_id")
                or not entry.get("direction")
            ):
                print(
                    f"warning: skipping malformed entry: {entry}",
                    file=sys.stderr,
                )
                continue

            peer_id = entry["peer_id"]
            direction = entry["direction"]
            cursor_val = entry.get("cursor")

            peer_row = conn.execute("SELECT id FROM peers WHERE id = ?", (peer_id,)).fetchone()
            if peer_row is None:
                print(
                    f"warning: peer {peer_id!r} ({entry.get('peer_node_id', '?')}) "
                    "not found in peers table — skipping (re-register the peer first)",
                    file=sys.stderr,
                )
                skipped_no_peer += 1
                continue

            if not args.force:
                existing = conn.execute(
                    "SELECT cursor FROM replication_cursors WHERE peer_id = ? AND direction = ?",
                    (peer_id, direction),
                ).fetchone()
                if existing is not None and existing["cursor"] is not None:
                    print(
                        f"info: cursor for peer {entry.get('peer_node_id', '?')} "
                        f"({direction}) already set — skipping (use --force to overwrite)",
                        file=sys.stderr,
                    )
                    skipped_exists += 1
                    continue

            conn.execute(
                """INSERT INTO replication_cursors (peer_id, direction, cursor, updated_at)
                   VALUES (?, ?, ?, ?)
                   ON CONFLICT(peer_id, direction)
                   DO UPDATE SET cursor = excluded.cursor, updated_at = excluded.updated_at""",
                (peer_id, direction, cursor_val, now_iso),
            )
            imported += 1

        conn.commit()
    except Exception:
        # Undo any partial import before propagating; the ``finally`` clause
        # is the single place that closes the connection.
        conn.rollback()
        raise
    finally:
        conn.close()

    print(
        f"cursor import complete: {imported} restored, "
        f"{skipped_no_peer} skipped (peer not found), "
        f"{skipped_exists} skipped (already set)",
        file=sys.stderr,
    )
    return 0