Coverage for node / src / stigmem_node / hlc.py: 97%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-25 01:49 +0000

1"""Hybrid Logical Clock — spec §2.4. 

2 

3One global HLC instance per node process. Thread-safe. 

4 

5Format: "{wall_ms_utc}.{counter}" e.g. "1746230400000.003" 

6""" 

7 

8from __future__ import annotations 

9 

10import threading 

11import time 

12 

13 

class HLCRemoteSkewError(ValueError):
    """Remote HLC wall time is outside the configured federation skew bound.

    Attributes:
        remote_wall_ms: Wall component of the remote timestamp.
        local_wall_ms: Local wall reading the remote value was compared to.
        max_future_skew_ms: Future-skew bound supplied by the caller.
        max_past_skew_ms: Past-skew bound supplied by the caller.
        skew_ms: ``remote_wall_ms - local_wall_ms`` (positive when the
            remote value is ahead of the local one).
        direction: ``"future"`` when ``skew_ms`` exceeds
            ``max_future_skew_ms``, otherwise ``"past"``.
    """

    def __init__(
        self,
        *,
        remote_wall_ms: int,
        local_wall_ms: int,
        max_future_skew_ms: int,
        max_past_skew_ms: int,
    ) -> None:
        self.remote_wall_ms = remote_wall_ms
        self.local_wall_ms = local_wall_ms
        self.max_future_skew_ms = max_future_skew_ms
        self.max_past_skew_ms = max_past_skew_ms
        self.skew_ms = remote_wall_ms - local_wall_ms
        if self.skew_ms > max_future_skew_ms:
            self.direction = "future"
        else:
            self.direction = "past"
        super().__init__(
            "remote HLC wall time is outside configured skew bound "
            f"(direction={self.direction}, skew_ms={self.skew_ms})"
        )

    @classmethod
    def _rebuild(
        cls,
        remote_wall_ms: int,
        local_wall_ms: int,
        max_future_skew_ms: int,
        max_past_skew_ms: int,
    ) -> "HLCRemoteSkewError":
        """Recreate an instance from positional fields (pickle/copy hook)."""
        return cls(
            remote_wall_ms=remote_wall_ms,
            local_wall_ms=local_wall_ms,
            max_future_skew_ms=max_future_skew_ms,
            max_past_skew_ms=max_past_skew_ms,
        )

    def __reduce__(self):
        """Support ``pickle`` and ``copy`` despite the keyword-only constructor.

        The inherited reduction replays ``self.args`` (just the message)
        as positional arguments, which this constructor rejects. Rebuilding
        from the four stored fields avoids that; the instance ``__dict__`` is
        passed along as state so extra attributes survive the round trip.
        """
        return (
            type(self)._rebuild,
            (
                self.remote_wall_ms,
                self.local_wall_ms,
                self.max_future_skew_ms,
                self.max_past_skew_ms,
            ),
            dict(vars(self)),
        )

38 

39 

40def _parse(s: str) -> tuple[int, int]: 

41 parts = s.split(".", 1) 

42 return int(parts[0]), int(parts[1]) if len(parts) > 1 else 0 

43 

44 

class HLC:
    """Hybrid logical clock issuing ``"{wall_ms}.{counter}"`` timestamps.

    The clock keeps a wall value and a counter; every read-modify-write of
    that pair happens while holding ``self._lock``.

    NOTE(review): the module docstring's example shows a zero-padded counter
    (``.003``) while the timestamps produced here are unpadded — confirm
    which form the spec requires.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._wall_ms: int = 0
        self._counter: int = 0

    def tick(self) -> str:
        """Issue the next timestamp for a local write (§2.4 rule 1)."""
        with self._lock:
            wall_now = int(time.time() * 1000)
            if wall_now <= self._wall_ms:
                # System time has not moved past the stored wall value, so
                # only the counter can distinguish this timestamp.
                self._counter += 1
            else:
                self._wall_ms, self._counter = wall_now, 0
            return f"{self._wall_ms}.{self._counter}"

    def receive(
        self,
        remote: str,
        *,
        max_future_skew_ms: int | None = None,
        max_past_skew_ms: int | None = None,
    ) -> str:
        """Merge a remote timestamp into the clock (§2.4 rule 2).

        Args:
            remote: Timestamp string carried by the federated fact.
            max_future_skew_ms: Largest amount the remote wall value may be
                ahead of local time; ``None`` or a value <= 0 disables the
                check.
            max_past_skew_ms: Largest amount the remote wall value may be
                behind local time; ``None`` or a value <= 0 disables the
                check.

        Returns:
            The clock's new timestamp string.

        Raises:
            HLCRemoteSkewError: If an enabled skew bound is exceeded; the
                clock state is left untouched in that case.
            ValueError: If ``remote`` cannot be parsed.
        """
        remote_wall, remote_counter = _parse(remote)
        future_limit = 0 if max_future_skew_ms is None else max_future_skew_ms
        past_limit = 0 if max_past_skew_ms is None else max_past_skew_ms

        with self._lock:
            wall_now = int(time.time() * 1000)

            ahead_by = remote_wall - wall_now
            too_far_ahead = future_limit > 0 and ahead_by > future_limit
            too_far_behind = past_limit > 0 and -ahead_by > past_limit
            if too_far_ahead or too_far_behind:
                raise HLCRemoteSkewError(
                    remote_wall_ms=remote_wall,
                    local_wall_ms=wall_now,
                    max_future_skew_ms=future_limit,
                    max_past_skew_ms=past_limit,
                )

            latest = max(wall_now, self._wall_ms, remote_wall)
            local_leads = latest == self._wall_ms
            remote_leads = latest == remote_wall
            if local_leads and remote_leads:
                # Same wall value on both sides: step past the larger counter.
                next_counter = max(self._counter, remote_counter) + 1
            elif local_leads:
                next_counter = self._counter + 1
            elif remote_leads:
                next_counter = remote_counter + 1
            else:
                # System time is strictly newest, so the counter restarts.
                next_counter = 0

            self._wall_ms, self._counter = latest, next_counter
            return f"{self._wall_ms}.{self._counter}"

    @staticmethod
    def compare(a: str, b: str) -> int:
        """Order two timestamps: -1 if ``a`` sorts first, 1 if ``b`` does, else 0."""
        left, right = _parse(a), _parse(b)
        # Tuple comparison orders by wall value, then by counter.
        return (left > right) - (left < right)

111 

112 

# Module-level HLC instance, created once at import time so that callers in
# this process can share a single clock.
node_hlc = HLC()