Coverage for node / src / stigmem_node / models / subscriptions.py: 100%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Subscription models.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from pydantic import BaseModel, Field, field_validator 

8 

9VALID_ON_CHANGE = {"webhook", "wake"} 

10 

11 

12class SubscriptionCreateRequest(BaseModel): 

13 target: str = Field( 

14 ..., 

15 min_length=1, 

16 description="Scope name (e.g. 'local') or entity URI to watch", 

17 ) 

18 on_change: str = Field(..., description="'webhook' or 'wake'") 

19 delivery_address: str = Field( 

20 ..., 

21 min_length=1, 

22 description="Webhook URL (on_change=webhook) or identity URI (on_change=wake)", 

23 ) 

24 idempotency_key: str | None = Field(None, description="Optional client idempotency key") 

25 

26 @field_validator("on_change") 

27 @classmethod 

28 def check_on_change(cls, v: str) -> str: 

29 if v not in VALID_ON_CHANGE: 

30 raise ValueError(f"on_change must be one of {VALID_ON_CHANGE}") 

31 return v 

32 

33 

34class SubscriptionRecord(BaseModel): 

35 id: str 

36 subscriber_identity: str 

37 target: str 

38 target_kind: str 

39 on_change: str 

40 delivery_address: str 

41 idempotency_key: str | None 

42 created_at: str 

43 last_delivered_at: str | None 

44 circuit_open: bool 

45 consecutive_failures: int 

46 

47 

48class SubscriptionEventRecord(BaseModel): 

49 id: str 

50 subscription_id: str 

51 event_type: str 

52 entity_uri: str | None 

53 fact_id: str | None 

54 payload: dict[str, Any] 

55 created_at: str 

56 delivered_at: str | None 

57 delivery_status: str 

58 delivery_attempts: int 

59 

60 

61class SubscriptionListResponse(BaseModel): 

62 subscriptions: list[SubscriptionRecord] 

63 total: int 

64 

65 

66class SubscriptionEventsResponse(BaseModel): 

67 events: list[SubscriptionEventRecord] 

68 total: int 

69 cursor: str | None 

70