Coverage for node / src / stigmem_node / hlc.py: 97%
60 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Hybrid Logical Clock — spec §2.4.
3One global HLC instance per node process. Thread-safe.
5Format: "{wall_ms_utc}.{counter}" e.g. "1746230400000.003"
6"""
8from __future__ import annotations
10import threading
11import time
14class HLCRemoteSkewError(ValueError):
15 """Remote HLC wall time is outside the configured federation skew bound."""
17 def __init__(
18 self,
19 *,
20 remote_wall_ms: int,
21 local_wall_ms: int,
22 max_future_skew_ms: int,
23 max_past_skew_ms: int,
24 ) -> None:
25 self.remote_wall_ms = remote_wall_ms
26 self.local_wall_ms = local_wall_ms
27 self.max_future_skew_ms = max_future_skew_ms
28 self.max_past_skew_ms = max_past_skew_ms
29 self.skew_ms = remote_wall_ms - local_wall_ms
30 if self.skew_ms > max_future_skew_ms:
31 self.direction = "future"
32 else:
33 self.direction = "past"
34 super().__init__(
35 "remote HLC wall time is outside configured skew bound "
36 f"(direction={self.direction}, skew_ms={self.skew_ms})"
37 )
40def _parse(s: str) -> tuple[int, int]:
41 parts = s.split(".", 1)
42 return int(parts[0]), int(parts[1]) if len(parts) > 1 else 0
45class HLC:
46 def __init__(self) -> None:
47 self._lock = threading.Lock()
48 self._wall_ms: int = 0
49 self._counter: int = 0
51 def tick(self) -> str:
52 """Advance on local write (§2.4 rule 1)."""
53 with self._lock:
54 now = int(time.time() * 1000)
55 if now > self._wall_ms:
56 self._wall_ms = now
57 self._counter = 0
58 else:
59 self._counter += 1
60 return f"{self._wall_ms}.{self._counter}"
62 def receive(
63 self,
64 remote: str,
65 *,
66 max_future_skew_ms: int | None = None,
67 max_past_skew_ms: int | None = None,
68 ) -> str:
69 """Advance on receiving a federated fact (§2.4 rule 2)."""
70 r_wall, r_ctr = _parse(remote)
71 with self._lock:
72 now = int(time.time() * 1000)
73 future_limit = max_future_skew_ms if max_future_skew_ms is not None else 0
74 past_limit = max_past_skew_ms if max_past_skew_ms is not None else 0
75 if future_limit > 0 and r_wall - now > future_limit:
76 raise HLCRemoteSkewError(
77 remote_wall_ms=r_wall,
78 local_wall_ms=now,
79 max_future_skew_ms=future_limit,
80 max_past_skew_ms=past_limit,
81 )
82 if past_limit > 0 and now - r_wall > past_limit:
83 raise HLCRemoteSkewError(
84 remote_wall_ms=r_wall,
85 local_wall_ms=now,
86 max_future_skew_ms=future_limit,
87 max_past_skew_ms=past_limit,
88 )
89 new_wall = max(now, self._wall_ms, r_wall)
90 if new_wall == self._wall_ms == r_wall: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true
91 self._counter = max(self._counter, r_ctr) + 1
92 elif new_wall == self._wall_ms:
93 self._counter += 1
94 elif new_wall == r_wall:
95 self._counter = r_ctr + 1
96 else:
97 self._counter = 0
98 self._wall_ms = new_wall
99 return f"{self._wall_ms}.{self._counter}"
101 @staticmethod
102 def compare(a: str, b: str) -> int:
103 """Causal ordering: returns -1, 0, or 1."""
104 at = _parse(a)
105 bt = _parse(b)
106 if at < bt:
107 return -1
108 if at > bt:
109 return 1
110 return 0
113node_hlc = HLC()