Coverage for node / src / stigmem_node / jobs.py: 85%
36 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Async job store — spec §14.5 / §15.4.
3Jobs are created by lint/decay routes when the scope exceeds the async threshold
4(default 100,000 facts). Callers poll GET /v1/{lint,decay}/jobs/:job_id until
5status is "done" or "failed".
6"""
8from __future__ import annotations
10import json
11import uuid
12from datetime import UTC, datetime
13from typing import Any
15from .db import db
18def create_job(job_type: str, scope: str | None, estimated_s: int) -> str:
19 job_id = str(uuid.uuid4())
20 now = datetime.now(UTC).isoformat()
21 with db() as conn:
22 conn.execute(
23 "INSERT INTO jobs (id, job_type, status, scope, estimated_s, created_at)"
24 " VALUES (?, ?, 'pending', ?, ?, ?)",
25 (job_id, job_type, scope, estimated_s, now),
26 )
27 return job_id
30def get_job(job_id: str, job_type: str) -> dict[str, Any] | None:
31 """Return the job record, or None if not found (or wrong type)."""
32 with db() as conn:
33 row = conn.execute(
34 "SELECT * FROM jobs WHERE id = ? AND job_type = ?", (job_id, job_type)
35 ).fetchone()
36 if row is None:
37 return None
38 out: dict[str, Any] = {
39 "job_id": row["id"],
40 "status": row["status"],
41 "scope": row["scope"],
42 "estimated_s": row["estimated_s"],
43 "created_at": row["created_at"],
44 }
45 if row["started_at"]: 45 ↛ 47line 45 didn't jump to line 47 because the condition on line 45 was always true
46 out["started_at"] = row["started_at"]
47 if row["completed_at"]: 47 ↛ 49line 47 didn't jump to line 49 because the condition on line 47 was always true
48 out["completed_at"] = row["completed_at"]
49 if row["result_json"]: 49 ↛ 51line 49 didn't jump to line 51 because the condition on line 49 was always true
50 out.update(json.loads(row["result_json"]))
51 if row["error"]: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 out["error"] = row["error"]
53 return out
56def mark_running(job_id: str) -> None:
57 with db() as conn:
58 conn.execute(
59 "UPDATE jobs SET status='running', started_at=? WHERE id=?",
60 (datetime.now(UTC).isoformat(), job_id),
61 )
64def mark_done(job_id: str, result: dict[str, Any]) -> None:
65 with db() as conn:
66 conn.execute(
67 "UPDATE jobs SET status='done', completed_at=?, result_json=? WHERE id=?",
68 (datetime.now(UTC).isoformat(), json.dumps(result), job_id),
69 )
72def mark_failed(job_id: str, error: str) -> None:
73 with db() as conn:
74 conn.execute(
75 "UPDATE jobs SET status='failed', completed_at=?, error=? WHERE id=?",
76 (datetime.now(UTC).isoformat(), error, job_id),
77 )