Coverage for node / src / stigmem_node / billing.py: 97%
31 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Billing hook bus — emit usage events on write operations.
3Event types:
4 fact_written — emitted after every successful POST /v1/facts
5 garden_created — emitted after every successful POST /v1/gardens
7The default bus logs structured JSON to stderr. Replace with a queue-backed
8implementation by calling set_hook_bus() at startup (or in tests via a
9CaptureBus). The interface is intentionally narrow so it can be wired to any
10message queue without changing call sites.
11"""
13from __future__ import annotations
15import json
16import sys
17from dataclasses import asdict, dataclass, field
18from datetime import UTC, datetime
19from typing import Protocol, runtime_checkable
22@dataclass
23class BillingEvent:
24 event_type: str # "fact_written" | "garden_created"
25 tenant_id: str
26 entity_uri: str # caller's entity_uri
27 fact_id: str | None = None
28 garden_id: str | None = None
29 ts: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@runtime_checkable
class HookBus(Protocol):
    """Structural interface for objects that can receive billing events.

    Because the protocol is runtime-checkable, ``isinstance(obj, HookBus)``
    succeeds for any object that exposes an ``emit`` attribute; explicit
    subclassing is not required.
    """

    def emit(self, event: BillingEvent) -> None:
        """Accept ``event``; the protocol's own body always raises NotImplementedError."""
        raise NotImplementedError()
class LogHookBus:
    """Default bus: serializes each billing event to one JSON line on stderr."""

    def emit(self, event: BillingEvent) -> None:
        """Print ``event`` to stderr as a JSON object nested under the "billing" key."""
        payload = {"billing": asdict(event)}
        line = json.dumps(payload)
        print(line, file=sys.stderr)
class CaptureBus:
    """Test double that keeps every emitted event in memory for later assertions."""

    # Events in the order they were emitted; a fresh list is created per instance.
    events: list[BillingEvent]

    def __init__(self) -> None:
        """Start with an empty event list."""
        self.events = []

    def emit(self, event: BillingEvent) -> None:
        """Record ``event`` at the end of ``self.events``."""
        self.events.append(event)
# Module-wide bus instance returned by get_hook_bus(); starts as a LogHookBus
# and is rebound by set_hook_bus().
_bus: HookBus = LogHookBus()
def get_hook_bus() -> HookBus:
    """Return the bus currently bound to the module-level ``_bus``."""
    active_bus = _bus
    return active_bus
def set_hook_bus(bus: HookBus) -> None:
    """Rebind the module-level ``_bus`` to ``bus``.

    No validation is performed on ``bus``; it is stored as given.
    """
    # Rebinds the module global rather than creating a function-local name.
    global _bus
    _bus = bus