Coverage for node / src / stigmem_node / storage / sqlite_backend.py: 56%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""SQLite implementation of StorageBackend — the default backend. 

2 

3When *encryption_key* is provided (32 bytes), the backend uses SQLCipher via 

4the ``sqlcipher3`` package instead of stdlib ``sqlite3``. Install the extra 

5before enabling encryption:: 

6 

7 pip install 'stigmem-node[sqlcipher]' 

8 

9When *embed_enabled* is True the backend loads the ``sqlite-vec`` extension on 

10every new connection and creates the ``vec_facts`` virtual table if absent:: 

11 

12 pip install 'stigmem-node[sqlite-vec]' 

13""" 

14 

15from __future__ import annotations 

16 

17import logging 

18import sqlite3 

19from collections.abc import Generator 

20from contextlib import contextmanager 

21from datetime import UTC, datetime 

22from pathlib import Path 

23from typing import Any 

24 

25from .base import StorageBackend 

26 

27logger = logging.getLogger("stigmem.storage.sqlite") 

28 

29 

30class SQLiteBackend(StorageBackend): 

31 """Default SQLite backend. 

32 

33 Behaviour is identical to the pre-trait implementation in ``db.py``. 

34 Uses WAL journal mode and enforces foreign-key constraints on every 

35 connection. When *encryption_key* is provided, SQLCipher is used 

36 transparently — the key is set via ``PRAGMA key`` immediately after open. 

37 When *embed_enabled* is True, sqlite-vec is loaded and ``vec_facts`` is 

38 created with the given *embed_dimension*. 

39 """ 

40 

41 def __init__( 

42 self, 

43 db_path: str, 

44 encryption_key: bytes | None = None, 

45 embed_enabled: bool = False, 

46 embed_dimension: int = 768, 

47 ) -> None: 

48 self._db_path = db_path 

49 self._encryption_key = encryption_key 

50 self._embed_enabled = embed_enabled 

51 self._embed_dimension = embed_dimension 

52 

53 @property 

54 def backend_name(self) -> str: 

55 return "sqlite" 

56 

57 def _open_conn(self) -> Any: 

58 """Open and return a raw (un-transacted) connection, WAL + FK enabled.""" 

59 if self._encryption_key is not None: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 try: 

61 import sqlcipher3 as _sc 

62 except ImportError as exc: 

63 raise RuntimeError( 

64 "sqlcipher3 is required for SQLite encryption-at-rest. " 

65 "Install it with: pip install 'stigmem-node[sqlcipher]'" 

66 ) from exc 

67 conn = _sc.connect(self._db_path) 

68 hex_key = self._encryption_key.hex() 

69 conn.execute(f"PRAGMA key = \"x'{hex_key}'\"") # noqa: S608 

70 conn.row_factory = _sc.Row 

71 else: 

72 conn = sqlite3.connect(self._db_path) 

73 conn.row_factory = sqlite3.Row 

74 conn.execute("PRAGMA journal_mode=WAL") 

75 conn.execute("PRAGMA foreign_keys=ON") 

76 if self._embed_enabled: 76 ↛ 77line 76 didn't jump to line 77 because the condition on line 76 was never true

77 self._load_sqlite_vec(conn) 

78 return conn 

79 

80 def _load_sqlite_vec(self, conn: Any) -> None: 

81 """Load the sqlite-vec extension and ensure vec_facts virtual table exists.""" 

82 try: 

83 import sqlite_vec 

84 except ImportError as exc: 

85 raise RuntimeError( 

86 "sqlite-vec is required when embed_enabled=true. " 

87 "Install it with: pip install 'stigmem-node[sqlite-vec]'" 

88 ) from exc 

89 

90 try: 

91 conn.enable_load_extension(True) 

92 sqlite_vec.load(conn) 

93 conn.enable_load_extension(False) 

94 except Exception as exc: 

95 raise RuntimeError(f"Failed to load sqlite-vec extension: {exc}") from exc 

96 

97 from stigmem_node.vector_search import ensure_vec_table 

98 

99 ensure_vec_table(conn, self._embed_dimension) 

100 

101 @contextmanager 

102 def connection(self) -> Generator[Any, None, None]: 

103 conn = self._open_conn() 

104 try: 

105 yield conn 

106 conn.commit() 

107 except Exception: 

108 conn.rollback() 

109 raise 

110 finally: 

111 conn.close() 

112 

113 def apply_migrations(self, migrations_dir: Path) -> None: 

114 conn = self._open_conn() 

115 try: 

116 conn.execute( 

117 """CREATE TABLE IF NOT EXISTS schema_migrations ( 

118 id INTEGER PRIMARY KEY AUTOINCREMENT, 

119 version TEXT NOT NULL UNIQUE, 

120 applied_at TEXT NOT NULL 

121 )""" 

122 ) 

123 conn.commit() 

124 

125 applied = {r["version"] for r in conn.execute("SELECT version FROM schema_migrations")} 

126 

127 for f in sorted(migrations_dir.glob("*.sql")): 

128 version = f.stem 

129 if version in applied: 

130 continue 

131 conn.executescript(f.read_text()) 

132 conn.execute( 

133 "INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)", 

134 (version, datetime.now(UTC).isoformat()), 

135 ) 

136 conn.commit() 

137 finally: 

138 conn.close() 

139 

140 def export_snapshot(self, dest: Path) -> None: 

141 """Online backup via ``sqlite3.Connection.backup()``. 

142 

143 Encrypted databases produce encrypted snapshots (same key). 

144 """ 

145 if self._encryption_key is not None: 

146 try: 

147 import sqlcipher3 as _sc 

148 except ImportError as exc: 

149 raise RuntimeError( 

150 "sqlcipher3 is required to snapshot an encrypted SQLite database." 

151 ) from exc 

152 hex_key = self._encryption_key.hex() 

153 src_conn = _sc.connect(self._db_path) 

154 src_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"") # noqa: S608 

155 dst_conn = _sc.connect(str(dest)) 

156 dst_conn.execute(f"PRAGMA key = \"x'{hex_key}'\"") # noqa: S608 

157 try: 

158 src_conn.backup(dst_conn) 

159 finally: 

160 dst_conn.close() 

161 src_conn.close() 

162 else: 

163 src_conn = sqlite3.connect(self._db_path) 

164 dst_conn = sqlite3.connect(str(dest)) 

165 try: 

166 src_conn.backup(dst_conn) 

167 finally: 

168 dst_conn.close() 

169 src_conn.close() 

170 

171 def import_snapshot(self, src: Path) -> None: 

172 """Restore by replacing the current database file.""" 

173 import shutil 

174 

175 shutil.copy2(str(src), self._db_path)