Coverage for node / src / stigmem_node / models / subscriptions.py: 100%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Subscription models."""
3from __future__ import annotations
5from typing import Any
7from pydantic import BaseModel, Field, field_validator
# The only values accepted for ``on_change`` by SubscriptionCreateRequest's validator.
VALID_ON_CHANGE: set[str] = {"webhook", "wake"}
class SubscriptionCreateRequest(BaseModel):
    """Request body for creating a subscription.

    ``target``, ``on_change`` and ``delivery_address`` are required;
    ``idempotency_key`` is optional and defaults to ``None``.
    ``on_change`` is checked against ``VALID_ON_CHANGE``.
    """

    target: str = Field(
        ...,
        min_length=1,
        description="Scope name (e.g. 'local') or entity URI to watch",
    )
    on_change: str = Field(..., description="'webhook' or 'wake'")
    delivery_address: str = Field(
        ...,
        min_length=1,
        description="Webhook URL (on_change=webhook) or identity URI (on_change=wake)",
    )
    idempotency_key: str | None = Field(
        default=None,
        description="Optional client idempotency key",
    )

    @field_validator("on_change")
    @classmethod
    def check_on_change(cls, v: str) -> str:
        """Return ``v`` unchanged if it is a member of ``VALID_ON_CHANGE``.

        Raises:
            ValueError: if ``v`` is not one of the allowed values.
        """
        if v in VALID_ON_CHANGE:
            return v
        # Render the allowed values sorted: the repr of a raw set of strings
        # follows hash order, which changes between interpreter runs, so the
        # message would otherwise not be reproducible.
        raise ValueError(f"on_change must be one of {sorted(VALID_ON_CHANGE)}")
class SubscriptionRecord(BaseModel):
    """A subscription together with its delivery bookkeeping fields.

    No field declares a default, so every field must be supplied; the
    ``str | None`` fields accept an explicit ``None``.
    """

    # Identity of the subscription and of the subscriber.
    id: str
    subscriber_identity: str

    # What is being watched.
    # NOTE(review): the allowed values of target_kind are not constrained here — confirm with the producer.
    target: str
    target_kind: str

    # Delivery settings. Unlike SubscriptionCreateRequest, on_change is a
    # plain string here and is not validated against VALID_ON_CHANGE.
    on_change: str
    delivery_address: str
    idempotency_key: str | None

    # Timestamps are carried as plain strings; this model enforces no format.
    created_at: str
    last_delivered_at: str | None

    # Delivery health state.
    # NOTE(review): presumably circuit-breaker bookkeeping — confirm against the delivery code.
    circuit_open: bool
    consecutive_failures: int
class SubscriptionEventRecord(BaseModel):
    """A single event belonging to a subscription, with its delivery state.

    No field declares a default, so every field must be supplied; the
    ``str | None`` fields accept an explicit ``None``.
    """

    # Identity of the event and the subscription it belongs to.
    id: str
    subscription_id: str

    # What happened. entity_uri and fact_id may each be None.
    # NOTE(review): the set of event_type values is not constrained here — confirm with the producer.
    event_type: str
    entity_uri: str | None
    fact_id: str | None

    # Free-form event body; only "dict with string keys" is enforced.
    payload: dict[str, Any]

    # Timestamps are carried as plain strings; this model enforces no format.
    created_at: str
    delivered_at: str | None

    # Delivery progress.
    # NOTE(review): delivery_status is an unconstrained string — confirm the allowed values.
    delivery_status: str
    delivery_attempts: int
class SubscriptionListResponse(BaseModel):
    """Response envelope holding a list of subscriptions and a count."""

    subscriptions: list[SubscriptionRecord]
    # NOTE(review): whether total counts this list or all matching subscriptions is not visible here — confirm.
    total: int
class SubscriptionEventsResponse(BaseModel):
    """Response envelope holding a list of subscription events, a count and a cursor."""

    events: list[SubscriptionEventRecord]
    # NOTE(review): whether total counts this list or all matching events is not visible here — confirm.
    total: int
    # Required but nullable.
    # NOTE(review): presumably a pagination token, with None meaning no further page — confirm.
    cursor: str | None