Coverage for node / src / stigmem_node / storage / libsql_backend.py: 45%
138 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""libSQL / Turso adapter for StorageBackend.
3Supports two modes:
4 * **Local** — ``db_path`` only; behaves like SQLite but via the libsql client.
5 * **Embedded-replica** — ``db_path`` + ``sync_url`` + ``auth_token``; local
6 SQLite file kept in sync with a Turso database. Drop-in for Fly.io
7 persistent volumes.
9Encryption-at-rest is enabled by passing *encryption_key* (32 bytes). The key
10is forwarded to ``libsql.connect()`` as a hex string. Both local and
11embedded-replica modes support encryption; the Turso cloud primary uses its own
12server-side encryption independently of the local replica key.
14Install the optional dependency before use::
16 pip install "stigmem-node[libsql]"
17"""
19from __future__ import annotations
21import re
22import sqlite3
23from collections.abc import Generator
24from contextlib import contextmanager
25from datetime import UTC, datetime
26from pathlib import Path
27from typing import Any
29from .base import StorageBackend
# Matches, leftmost-first, either a quoted SQL token (which must be preserved
# verbatim) or a comment (which is removed). Handling both in a single pass is
# what keeps ``--`` and ``/* */`` sequences inside string literals intact.
_SQL_QUOTED_OR_COMMENT_RE = re.compile(
    r"(?P<quoted>'(?:[^']|'')*'"  # string literal; '' is an escaped quote
    r'|"(?:[^"]|"")*"'  # double-quoted identifier
    r"|`(?:[^`]|``)*`"  # backtick-quoted identifier
    r"|\[[^\]]*\])"  # bracket-quoted identifier
    r"|(?P<line>--[^\n]*)"  # line comment, up to (not including) the newline
    r"|(?P<block>/\*.*?\*/)",  # block comment, possibly spanning lines
    re.DOTALL,
)
_CREATE_VIRTUAL_TABLE_RE = re.compile(r"CREATE\s+VIRTUAL\s+TABLE", re.IGNORECASE)
_FACTS_FTS_RE = re.compile(r"\bfacts_fts\b", re.IGNORECASE)
_BARE_TRANSACTION_KEYWORDS = frozenset({"BEGIN", "END", "COMMIT", "ROLLBACK"})


def _strip_sql_comments(sql: str) -> str:
    """Remove SQL comments from *sql* without touching quoted tokens."""

    def _replace(match: re.Match[str]) -> str:
        if match.group("quoted") is not None:
            # Quoted text is data or an identifier, never a comment.
            return match.group(0)
        # A block comment can sit between two tokens (``SELECT/**/1``), so it
        # is replaced by a space. A line comment is always followed by a
        # newline or end of input, so it can simply be dropped.
        return " " if match.group("block") is not None else ""

    return _SQL_QUOTED_OR_COMMENT_RE.sub(_replace, sql)


def _split_sql(sql: str) -> list[str]:
    """Split a SQL script into individual statements, stripping comments.

    libsql-experimental does not expose ``executescript()``, so we split
    scripts into complete SQLite statements before executing each one. Trigger
    bodies contain ``BEGIN...END`` blocks with inner semicolons, so a plain
    string split would emit incomplete fragments; instead lines are accumulated
    until ``sqlite3.complete_statement()`` accepts them. FTS5 virtual tables
    and their sync triggers are SQLite-only and are silently dropped so
    libsql-experimental (which lacks FTS5) can still run every migration.

    Comments are removed with a quote-aware pass, so ``--`` or ``/*`` inside a
    string literal or quoted identifier is left untouched.

    Args:
        sql: The full text of a migration script.

    Returns:
        The statements to execute, in script order, each without its trailing
        semicolon. Empty statements, bare transaction keywords, ``CREATE
        VIRTUAL TABLE`` statements and statements mentioning ``facts_fts`` are
        omitted.

    Raises:
        ValueError: If the script ends in the middle of a statement.
    """
    sql = _strip_sql_comments(sql)
    stmts: list[str] = []
    pending: list[str] = []

    # Splitting is line-based: a statement boundary is only recognised at the
    # end of a line, and blank lines are discarded.
    for line in sql.splitlines():
        if not line.strip():
            continue
        pending.append(line)
        candidate = "\n".join(pending).strip()
        if not sqlite3.complete_statement(candidate):
            continue

        pending.clear()
        stmt = candidate.removesuffix(";").strip()
        # A stray ";" is a complete (empty) statement; there is nothing to run.
        if not stmt:
            continue
        # Bare keywords left after splitting trigger/transaction bodies
        if stmt.upper() in _BARE_TRANSACTION_KEYWORDS:
            continue
        # FTS5 virtual table — not supported by libsql-experimental
        if _CREATE_VIRTUAL_TABLE_RE.search(stmt):
            continue
        # Trigger bodies and backfill inserts that reference facts_fts
        if _FACTS_FTS_RE.search(stmt):
            continue
        stmts.append(stmt)

    if pending:
        raise ValueError("incomplete SQL statement in migration")

    return stmts
75class _LibSQLRow:
76 """Dict-like row wrapper that supports ``row["col"]`` and ``row[i]`` access.
78 libsql-experimental's ``row_factory`` protocol mirrors sqlite3: the factory
79 is called as ``factory(cursor, row_tuple)`` for each fetched row.
80 """
82 __slots__ = ("_data", "_values")
84 def __init__(self, cursor: Any, row: tuple[Any, ...]) -> None:
85 cols = [d[0] for d in cursor.description]
86 self._values: tuple[Any, ...] = row
87 self._data: dict[str, Any] = dict(zip(cols, row, strict=True))
89 def __getitem__(self, key: str | int) -> Any:
90 if isinstance(key, int):
91 return self._values[key]
92 return self._data[key]
94 def __iter__(self) -> Any:
95 return iter(self._values)
97 def keys(self) -> list[str]:
98 return list(self._data.keys())
100 def get(self, key: str, default: Any = None) -> Any:
101 return self._data.get(key, default)
104class _LibSQLCursor:
105 """Wraps a raw libsql-experimental cursor and returns _LibSQLRow objects.
107 libsql-experimental has no row_factory on the connection; we wrap at the
108 cursor level instead so all fetch methods return dict-accessible rows.
109 """
111 __slots__ = ("_cur",)
113 def __init__(self, cur: Any) -> None:
114 self._cur = cur
116 @property
117 def description(self) -> Any:
118 return self._cur.description
120 @property
121 def lastrowid(self) -> Any:
122 return self._cur.lastrowid
124 @property
125 def rowcount(self) -> int:
126 count: int = self._cur.rowcount
127 return count
129 def fetchone(self) -> Any:
130 row = self._cur.fetchone()
131 return None if row is None else _LibSQLRow(self._cur, row)
133 def fetchall(self) -> list[_LibSQLRow]:
134 return [_LibSQLRow(self._cur, r) for r in self._cur.fetchall()]
136 def fetchmany(self, size: int | None = None) -> list[_LibSQLRow]:
137 rows = self._cur.fetchmany(size) if size is not None else self._cur.fetchmany()
138 return [_LibSQLRow(self._cur, r) for r in rows]
141class _LibSQLConnection:
142 """Wraps a raw libsql-experimental connection so execute() returns _LibSQLCursor."""
144 __slots__ = ("_conn",)
146 def __init__(self, conn: Any) -> None:
147 self._conn = conn
149 def execute(self, sql: str, params: Any = ()) -> _LibSQLCursor:
150 # libsql-experimental only accepts tuples, not lists, for parameters.
151 if isinstance(params, list):
152 params = tuple(params)
153 return _LibSQLCursor(self._conn.execute(sql, params))
155 def executemany(self, sql: str, params: Any = ()) -> _LibSQLCursor:
156 if not isinstance(params, list):
157 params = list(params)
158 return _LibSQLCursor(self._conn.executemany(sql, params))
160 def commit(self) -> None:
161 self._conn.commit()
163 def rollback(self) -> None:
164 self._conn.rollback()
166 def close(self) -> None:
167 self._conn.close()
class LibSQLBackend(StorageBackend):
    """libSQL backend (Turso-compatible).

    In embedded-replica mode (``sync_url`` set) the local file acts as the
    read/write store; the client syncs with Turso on every new connection.
    In local mode (no ``sync_url``) the behaviour is equivalent to SQLite.

    Pass *encryption_key* (32 bytes) to enable at-rest encryption for the local
    replica file. Requires ``libsql-experimental >= 0.0.4`` or a build that
    exposes the ``encryption_key`` parameter in ``libsql.connect()``.
    """

    def __init__(
        self,
        db_path: str,
        sync_url: str = "",
        auth_token: str = "",  # nosec B107 — empty string is the correct default for optional Turso auth
        encryption_key: bytes | None = None,
    ) -> None:
        """Store the connection settings; no connection is opened here.

        Args:
            db_path: Path of the local database file.
            sync_url: Remote URL to sync with; empty selects local mode.
            auth_token: Token passed to ``libsql.connect()`` in
                embedded-replica mode.
            encryption_key: Optional key; forwarded hex-encoded to
                ``libsql.connect()`` when given.
        """
        self._db_path = db_path
        self._sync_url = sync_url
        self._auth_token = auth_token
        self._encryption_key = encryption_key

    @property
    def backend_name(self) -> str:
        """Identifier of this backend: ``"libsql"``."""
        return "libsql"

    def _connect(self) -> Any:
        """Open a new connection and return it wrapped in ``_LibSQLConnection``.

        In embedded-replica mode the connection is synced once right after it
        is opened. Foreign-key enforcement is switched on for every connection.
        If syncing or the PRAGMA fails, the freshly opened connection is closed
        before the error propagates so it is not leaked.

        Raises:
            RuntimeError: If ``libsql_experimental`` is not installed.
        """
        try:
            import libsql_experimental as libsql
        except ImportError as exc:
            raise RuntimeError(
                "libsql-experimental is required for the libSQL backend. "
                "Install it with: pip install 'stigmem-node[libsql]'"
            ) from exc

        enc_kwargs: dict[str, str] = {}
        if self._encryption_key is not None:
            # The driver takes the key as a hex string, not raw bytes.
            enc_kwargs["encryption_key"] = self._encryption_key.hex()

        if self._sync_url:
            conn = libsql.connect(
                database=self._db_path,
                sync_url=self._sync_url,
                auth_token=self._auth_token,
                **enc_kwargs,
            )
        else:
            conn = libsql.connect(database=self._db_path, **enc_kwargs)

        try:
            if self._sync_url:
                conn.sync()
            conn.execute("PRAGMA foreign_keys=ON")
        except BaseException:
            # Nobody else holds a reference yet; release the handle ourselves.
            conn.close()
            raise
        return _LibSQLConnection(conn)

    @contextmanager
    def connection(self) -> Generator[Any, None, None]:
        """Yield a new connection; commit on success, roll back on error.

        The connection is always closed when the ``with`` block exits.
        """
        conn = self._connect()
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def apply_migrations(self, migrations_dir: Path) -> None:
        """Apply every not-yet-applied ``*.sql`` file in *migrations_dir*.

        Files are processed in sorted filename order. A file's stem is its
        version; versions already recorded in ``schema_migrations`` are
        skipped. Each file is split into statements with ``_split_sql``, its
        statements are executed, the version is recorded, and the connection
        is committed before moving to the next file. If anything fails, the
        connection is rolled back and the error is re-raised.

        Args:
            migrations_dir: Directory containing the migration scripts, which
                are read as UTF-8.
        """
        conn = self._connect()
        try:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS schema_migrations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    version TEXT NOT NULL UNIQUE,
                    applied_at TEXT NOT NULL
                )"""
            )
            conn.commit()

            applied = {
                row["version"]
                for row in conn.execute("SELECT version FROM schema_migrations").fetchall()
            }

            for f in sorted(migrations_dir.glob("*.sql")):
                version = f.stem
                if version in applied:
                    continue
                # Explicit encoding: the platform default is not always UTF-8.
                for stmt in _split_sql(f.read_text(encoding="utf-8")):
                    conn.execute(stmt)
                conn.execute(
                    "INSERT INTO schema_migrations (version, applied_at) VALUES (?, ?)",
                    (version, datetime.now(UTC).isoformat()),
                )
                conn.commit()
        except Exception:
            # Same policy as connection(): discard uncommitted work on failure.
            conn.rollback()
            raise
        finally:
            conn.close()