Coverage for node / src / stigmem_node / jobs.py: 85%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Async job store — spec §14.5 / §15.4. 

2 

3Jobs are created by lint/decay routes when the scope exceeds the async threshold 

4(default 100,000 facts). Callers poll GET /v1/{lint,decay}/jobs/:job_id until 

5status is "done" or "failed". 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11import uuid 

12from datetime import UTC, datetime 

13from typing import Any 

14 

15from .db import db 

16 

17 

18def create_job(job_type: str, scope: str | None, estimated_s: int) -> str: 

19 job_id = str(uuid.uuid4()) 

20 now = datetime.now(UTC).isoformat() 

21 with db() as conn: 

22 conn.execute( 

23 "INSERT INTO jobs (id, job_type, status, scope, estimated_s, created_at)" 

24 " VALUES (?, ?, 'pending', ?, ?, ?)", 

25 (job_id, job_type, scope, estimated_s, now), 

26 ) 

27 return job_id 

28 

29 

30def get_job(job_id: str, job_type: str) -> dict[str, Any] | None: 

31 """Return the job record, or None if not found (or wrong type).""" 

32 with db() as conn: 

33 row = conn.execute( 

34 "SELECT * FROM jobs WHERE id = ? AND job_type = ?", (job_id, job_type) 

35 ).fetchone() 

36 if row is None: 

37 return None 

38 out: dict[str, Any] = { 

39 "job_id": row["id"], 

40 "status": row["status"], 

41 "scope": row["scope"], 

42 "estimated_s": row["estimated_s"], 

43 "created_at": row["created_at"], 

44 } 

45 if row["started_at"]: 45 ↛ 47line 45 didn't jump to line 47 because the condition on line 45 was always true

46 out["started_at"] = row["started_at"] 

47 if row["completed_at"]: 47 ↛ 49line 47 didn't jump to line 49 because the condition on line 47 was always true

48 out["completed_at"] = row["completed_at"] 

49 if row["result_json"]: 49 ↛ 51line 49 didn't jump to line 51 because the condition on line 49 was always true

50 out.update(json.loads(row["result_json"])) 

51 if row["error"]: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 out["error"] = row["error"] 

53 return out 

54 

55 

56def mark_running(job_id: str) -> None: 

57 with db() as conn: 

58 conn.execute( 

59 "UPDATE jobs SET status='running', started_at=? WHERE id=?", 

60 (datetime.now(UTC).isoformat(), job_id), 

61 ) 

62 

63 

64def mark_done(job_id: str, result: dict[str, Any]) -> None: 

65 with db() as conn: 

66 conn.execute( 

67 "UPDATE jobs SET status='done', completed_at=?, result_json=? WHERE id=?", 

68 (datetime.now(UTC).isoformat(), json.dumps(result), job_id), 

69 ) 

70 

71 

72def mark_failed(job_id: str, error: str) -> None: 

73 with db() as conn: 

74 conn.execute( 

75 "UPDATE jobs SET status='failed', completed_at=?, error=? WHERE id=?", 

76 (datetime.now(UTC).isoformat(), error, job_id), 

77 )