Coverage for node / src / stigmem_node / storage / libsql_backend.py: 45%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""libSQL / Turso adapter for StorageBackend. 

2 

3Supports two modes: 

4 * **Local** — ``db_path`` only; behaves like SQLite but via the libsql client. 

5 * **Embedded-replica** — ``db_path`` + ``sync_url`` + ``auth_token``; local 

6 SQLite file kept in sync with a Turso database. Drop-in for Fly.io 

7 persistent volumes. 

8 

9Encryption-at-rest is enabled by passing *encryption_key* (32 bytes). The key 

10is forwarded to ``libsql.connect()`` as a hex string. Both local and 

11embedded-replica modes support encryption; the Turso cloud primary uses its own 

12server-side encryption independently of the local replica key. 

13 

14Install the optional dependency before use:: 

15 

16 pip install "stigmem-node[libsql]" 

17""" 

18 

19from __future__ import annotations 

20 

21import re 

22import sqlite3 

23from collections.abc import Generator 

24from contextlib import contextmanager 

25from datetime import UTC, datetime 

26from pathlib import Path 

27from typing import Any 

28 

29from .base import StorageBackend 

30 

31 

_VIRTUAL_TABLE_RE = re.compile(r"CREATE\s+VIRTUAL\s+TABLE", re.IGNORECASE)
_FACTS_FTS_RE = re.compile(r"\bfacts_fts\b", re.IGNORECASE)
# Bare keywords that are dropped instead of being executed on their own.
_SKIPPED_KEYWORDS = frozenset({"BEGIN", "END", "COMMIT", "ROLLBACK"})


def _strip_sql_comments(sql: str) -> str:
    """Remove ``--`` line comments and ``/* */`` block comments from *sql*.

    Text inside single- or double-quoted literals is copied verbatim, so a
    ``--`` or ``/*`` that is part of a string value is not treated as a
    comment. A block comment is replaced by one space, because SQL treats a
    comment as whitespace. An unterminated block comment runs to the end of
    the input.
    """
    out: list[str] = []
    quote: str | None = None
    i = 0
    n = len(sql)
    while i < n:
        ch = sql[i]
        if quote is not None:
            out.append(ch)
            # A doubled quote ('it''s') closes and immediately reopens, which
            # leaves the literal's text intact.
            if ch == quote:
                quote = None
            i += 1
        elif ch in ("'", '"'):
            quote = ch
            out.append(ch)
            i += 1
        elif sql.startswith("--", i):
            end = sql.find("\n", i)
            i = n if end == -1 else end  # keep the newline itself
        elif sql.startswith("/*", i):
            end = sql.find("*/", i + 2)
            i = n if end == -1 else end + 2
            out.append(" ")
        else:
            out.append(ch)
            i += 1
    return "".join(out)


def _keep_statement(stmt: str) -> bool:
    """Return ``False`` for statements that must not be sent to libsql.

    This drops empty statements, bare BEGIN/END/COMMIT/ROLLBACK keywords, and
    FTS5 artefacts: virtual tables and anything referencing ``facts_fts``.
    """
    if not stmt or stmt.upper() in _SKIPPED_KEYWORDS:
        return False
    if _VIRTUAL_TABLE_RE.search(stmt):
        return False
    return _FACTS_FTS_RE.search(stmt) is None


def _split_sql(sql: str) -> list[str]:
    """Split a SQL script into individual statements, stripping comments.

    libsql-experimental does not expose ``executescript()``, so scripts are
    split into complete SQLite statements and each one is executed separately.

    A split happens only at a ``;`` where ``sqlite3.complete_statement``
    reports a complete statement. As a result:

    - semicolons inside string literals never split a statement;
    - ``CREATE TRIGGER ... BEGIN ... END`` bodies stay whole;
    - several statements written on one line are still separated.

    FTS5 virtual tables and their sync triggers are SQLite-only and are
    silently dropped, so libsql-experimental (which lacks FTS5) can still run
    every migration.

    Returns:
        The statements, without their trailing ``;``, in script order.

    Raises:
        ValueError: if non-whitespace text remains after the last complete
            statement.
    """
    text = _strip_sql_comments(sql)
    stmts: list[str] = []
    start = 0
    for pos, ch in enumerate(text):
        if ch != ";":
            continue
        candidate = text[start : pos + 1].strip()
        if not sqlite3.complete_statement(candidate):
            # This ';' is inside a literal or a trigger body; keep scanning.
            continue
        start = pos + 1
        stmt = candidate.removesuffix(";").strip()
        if _keep_statement(stmt):
            stmts.append(stmt)

    if text[start:].strip():
        raise ValueError("incomplete SQL statement in migration")

    return stmts

73 

74 

75class _LibSQLRow: 

76 """Dict-like row wrapper that supports ``row["col"]`` and ``row[i]`` access. 

77 

78 libsql-experimental's ``row_factory`` protocol mirrors sqlite3: the factory 

79 is called as ``factory(cursor, row_tuple)`` for each fetched row. 

80 """ 

81 

82 __slots__ = ("_data", "_values") 

83 

84 def __init__(self, cursor: Any, row: tuple[Any, ...]) -> None: 

85 cols = [d[0] for d in cursor.description] 

86 self._values: tuple[Any, ...] = row 

87 self._data: dict[str, Any] = dict(zip(cols, row, strict=True)) 

88 

89 def __getitem__(self, key: str | int) -> Any: 

90 if isinstance(key, int): 

91 return self._values[key] 

92 return self._data[key] 

93 

94 def __iter__(self) -> Any: 

95 return iter(self._values) 

96 

97 def keys(self) -> list[str]: 

98 return list(self._data.keys()) 

99 

100 def get(self, key: str, default: Any = None) -> Any: 

101 return self._data.get(key, default) 

102 

103 

class _LibSQLCursor:
    """Wrap a raw libsql-experimental cursor so fetches return _LibSQLRow objects.

    libsql-experimental has no row_factory on the connection, so rows are
    wrapped at the cursor level instead. Every fetch method, and plain
    iteration (``for row in cursor``), yields name-addressable rows.
    """

    __slots__ = ("_cur",)

    def __init__(self, cur: Any) -> None:
        self._cur = cur

    @property
    def description(self) -> Any:
        """Column metadata from the underlying cursor."""
        return self._cur.description

    @property
    def lastrowid(self) -> Any:
        """Row id of the last inserted row, as reported by the underlying cursor."""
        return self._cur.lastrowid

    @property
    def rowcount(self) -> int:
        """Affected-row count reported by the underlying cursor."""
        count: int = self._cur.rowcount
        return count

    def __iter__(self) -> Generator[_LibSQLRow, None, None]:
        """Yield the remaining rows one at a time, like a sqlite3 cursor."""
        while (row := self._cur.fetchone()) is not None:
            yield _LibSQLRow(self._cur, row)

    def fetchone(self) -> Any:
        """Return the next row wrapped, or ``None`` when exhausted."""
        row = self._cur.fetchone()
        return None if row is None else _LibSQLRow(self._cur, row)

    def fetchall(self) -> list[_LibSQLRow]:
        """Return all remaining rows, wrapped."""
        return [_LibSQLRow(self._cur, r) for r in self._cur.fetchall()]

    def fetchmany(self, size: int | None = None) -> list[_LibSQLRow]:
        """Return up to *size* rows, or the cursor's default batch when ``None``."""
        rows = self._cur.fetchmany(size) if size is not None else self._cur.fetchmany()
        return [_LibSQLRow(self._cur, r) for r in rows]

139 

140 

class _LibSQLConnection:
    """Wrap a raw libsql-experimental connection behind a sqlite3-like API.

    ``execute()`` and ``executemany()`` return :class:`_LibSQLCursor`, so
    callers get name-addressable rows. Parameter lists are converted to
    tuples, because libsql-experimental only accepts tuples, not lists, for
    parameters.
    """

    __slots__ = ("_conn",)

    def __init__(self, conn: Any) -> None:
        self._conn = conn

    @staticmethod
    def _as_params(params: Any) -> Any:
        """Return *params* with a list converted to a tuple; other types pass through."""
        return tuple(params) if isinstance(params, list) else params

    def execute(self, sql: str, params: Any = ()) -> _LibSQLCursor:
        """Execute one statement and return a wrapped cursor."""
        return _LibSQLCursor(self._conn.execute(sql, self._as_params(params)))

    def executemany(self, sql: str, params: Any = ()) -> _LibSQLCursor:
        """Execute *sql* once per parameter row and return a wrapped cursor.

        The outer container is materialised as a list. Each row's parameters
        are normalised the same way ``execute()`` normalises its own, so
        list-of-lists input is accepted.
        """
        rows = [self._as_params(p) for p in params]
        return _LibSQLCursor(self._conn.executemany(sql, rows))

    def commit(self) -> None:
        """Commit the current transaction."""
        self._conn.commit()

    def rollback(self) -> None:
        """Roll back the current transaction."""
        self._conn.rollback()

    def close(self) -> None:
        """Close the underlying connection."""
        self._conn.close()

168 

169 

class LibSQLBackend(StorageBackend):
    """libSQL backend (Turso-compatible).

    In embedded-replica mode (``sync_url`` set), the local file acts as the
    read/write store, and the client syncs with Turso on every new connection.
    In local mode (no ``sync_url``), the behaviour is equivalent to SQLite.

    Pass *encryption_key* (32 bytes) to enable at-rest encryption for the local
    replica file. This requires ``libsql-experimental >= 0.0.4``, or a build
    that exposes the ``encryption_key`` parameter in ``libsql.connect()``.
    """

    def __init__(
        self,
        db_path: str,
        sync_url: str = "",
        auth_token: str = "",  # nosec B107 — empty string is the correct default for optional Turso auth
        encryption_key: bytes | None = None,
    ) -> None:
        self._db_path = db_path
        self._sync_url = sync_url
        self._auth_token = auth_token
        self._encryption_key = encryption_key

    @property
    def backend_name(self) -> str:
        return "libsql"

    def _connect(self) -> Any:
        """Open a new connection, sync it in replica mode, and enable foreign keys.

        Returns:
            A :class:`_LibSQLConnection` wrapping the raw libsql connection.

        Raises:
            RuntimeError: if ``libsql-experimental`` is not installed.
        """
        try:
            import libsql_experimental as libsql
        except ImportError as exc:
            raise RuntimeError(
                "libsql-experimental is required for the libSQL backend. "
                "Install it with: pip install 'stigmem-node[libsql]'"
            ) from exc

        enc_kwargs: dict[str, str] = {}
        if self._encryption_key is not None:
            # libsql.connect() takes the key as a hex string.
            enc_kwargs["encryption_key"] = self._encryption_key.hex()

        if self._sync_url:
            conn = libsql.connect(
                database=self._db_path,
                sync_url=self._sync_url,
                auth_token=self._auth_token,
                **enc_kwargs,
            )
        else:
            conn = libsql.connect(database=self._db_path, **enc_kwargs)

        # Nothing else holds the raw handle yet, so close it explicitly if the
        # post-connect setup fails; otherwise the connection leaks.
        try:
            if self._sync_url:
                conn.sync()
            conn.execute("PRAGMA foreign_keys=ON")
        except BaseException:
            conn.close()
            raise
        return _LibSQLConnection(conn)

    @contextmanager
    def connection(self) -> Generator[Any, None, None]:
        """Yield a connection: commit on success, roll back on error, always close."""
        conn = self._connect()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def apply_migrations(self, migrations_dir: Path) -> None:
        """Apply pending ``*.sql`` migrations from *migrations_dir* in filename order.

        Versions are file stems and are recorded in ``schema_migrations``.
        Files whose version is already recorded are skipped. A file's
        statements and its bookkeeping row are committed with a single
        ``commit()``. If any of them fails, ``rollback()`` is issued before the
        exception propagates. Migrations committed earlier stay applied.
        """
        conn = self._connect()
        try:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS schema_migrations (
                    id         INTEGER PRIMARY KEY AUTOINCREMENT,
                    version    TEXT NOT NULL UNIQUE,
                    applied_at TEXT NOT NULL
                )"""
            )
            conn.commit()

            applied = {
                row["version"]
                for row in conn.execute("SELECT version FROM schema_migrations").fetchall()
            }

            for f in sorted(migrations_dir.glob("*.sql")):
                version = f.stem
                if version in applied:
                    continue
                try:
                    # Explicit UTF-8: the platform default encoding varies.
                    for stmt in _split_sql(f.read_text(encoding="utf-8")):
                        conn.execute(stmt)
                    conn.execute(
                        "INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)",
                        (version, datetime.now(UTC).isoformat()),
                    )
                    conn.commit()
                except Exception:
                    # Discard the partially executed migration before re-raising.
                    conn.rollback()
                    raise
        finally:
            conn.close()