diff --git a/src/lean_spec/subspecs/validator/constants.py b/src/lean_spec/subspecs/validator/constants.py
new file mode 100644
index 00000000..9e9a7345
--- /dev/null
+++ b/src/lean_spec/subspecs/validator/constants.py
@@ -0,0 +1,43 @@
+"""Validator duty-gate thresholds.
+
+Informative, not normative:
+
+- These thresholds shape when this node signs.
+- They do not change what consensus accepts.
+- Clients may diverge without breaking interop.
+"""
+
+from typing import Final
+
+SYNC_LAG_THRESHOLD: Final[int] = 4
+"""Slot lag past which the local view is too stale to sign.
+
+Why:
+    We justify and finalize within a handful of slots.
+    A 4-slot lag is one full justification window behind real time.
+    A vote from that view lands on a subtree the network has left.
+"""
+
+NETWORK_STALL_THRESHOLD: Final[int] = 8
+"""Slot lag past which the whole network is treated as stalled.
+
+Why:
+    Set to twice the local threshold (8 = 2 * 4).
+    Ordinary jitter at the local boundary must not trip this branch.
+
+Effect:
+    Even the freshest locally validated block is over 8 slots behind.
+    The cause is a streak of skipped proposals, not this node lagging.
+    Duties stay live so the chain can advance through the gap.
+"""
+
+HYSTERESIS_BAND: Final[int] = 2
+"""Slot band that holds the gate closed near the threshold.
+
+Why:
+    Without a band a single late gossip block flips the decision.
+    Slot-over-slot flips would stutter the attestation stream.
+
+Effect:
+    Once closed, the gate reopens only when lag drops to 4 - 2 = 2 or below.
+"""
diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py
index ab60e12d..b129b87c 100644
--- a/src/lean_spec/subspecs/validator/service.py
+++ b/src/lean_spec/subspecs/validator/service.py
@@ -54,6 +54,7 @@
 from lean_spec.subspecs.xmss.containers import Signature
 from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex
 
+from .constants import HYSTERESIS_BAND, NETWORK_STALL_THRESHOLD, SYNC_LAG_THRESHOLD
 from .registry import ValidatorEntry, ValidatorRegistry
 
 logger = logging.getLogger(__name__)
@@ -111,6 +112,15 @@ class ValidatorService:
     _attested_slots: set[Slot] = field(default_factory=set, repr=False)
     """Slots for which we've already produced attestations (prevents duplicates)."""
 
+    _blocks_skipped_lag: int = field(default=0, repr=False)
+    """Block proposals skipped because the local view was too stale."""
+
+    _attestations_skipped_lag: int = field(default=0, repr=False)
+    """Attestations skipped because the local view was too stale."""
+
+    _duty_gate_closed: bool = field(default=False, repr=False)
+    """Hysteresis flag. True while signing is silenced."""
+
     async def run(self) -> None:
         """
         Main loop - check duties every interval.
@@ -168,7 +178,10 @@ async def run(self) -> None:
             #
            # Check if any of our validators is the proposer.
             logger.debug("ValidatorService: checking block production for slot %d", slot)
-            await self._maybe_produce_block(slot)
+            if self._is_synced_for_duties(slot, "block"):
+                await self._maybe_produce_block(slot)
+            else:
+                self._blocks_skipped_lag += 1
             logger.debug("ValidatorService: done block production check for slot %d", slot)
 
             # Re-fetch interval after block production.
@@ -191,22 +204,41 @@
                 slot,
                 slot in self._attested_slots,
             )
-            if interval >= Uint64(1) and slot not in self._attested_slots:
+            # Decide whether this iteration owes an attestation.
+            #
+            # Two conditions:
+            #
+            # - The slot has advanced to the attestation interval (>= 1).
+            # - This slot has not already been attested.
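+            #
+            # Illustrative walk (assumed values, not from a live run):
+            # at interval 0 the flag is False regardless of the set; at
+            # interval 1 on an unattested slot it is True; once the slot
+            # enters _attested_slots it stays False for the rest of the slot.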
+            #
+            # Why split eligibility from the sync gate: the skip counter
+            # must only tick on real misses, never on wrong-interval
+            # iterations.
+            needs_attestation = interval >= Uint64(1) and slot not in self._attested_slots
+            if needs_attestation:
                 logger.debug(
                     "ValidatorService: producing attestations for slot %d (interval %d)",
                     slot,
                     interval,
                 )
-                await self._produce_attestations(slot)
-                logger.debug("ValidatorService: done producing attestations for slot %d", slot)
-                self._attested_slots.add(slot)
-
-                # Prune old entries to prevent unbounded growth.
+                # Apply the sync gate.
                 #
-                # Keep only recent slots (current slot - 4) to bound memory usage.
-                # We never need to attest for slots that far in the past.
-                prune_threshold = Slot(max(0, int(slot) - 4))
-                self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
+                # Invariant: a gated slot stays out of the attested set.
+                # If the node catches up before the slot ends, the next
+                # iteration retries the duty.
+                if self._is_synced_for_duties(slot, "attestation"):
+                    await self._produce_attestations(slot)
+                    logger.debug("ValidatorService: done producing attestations for slot %d", slot)
+                    self._attested_slots.add(slot)
+
+                    # Prune old entries to bound memory.
+                    #
+                    # Keep only slots at or after (current slot - 4).
+                    # Older slots are no longer attestable.
+                    prune_threshold = Slot(max(0, int(slot) - 4))
+                    self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
+                else:
+                    self._attestations_skipped_lag += 1
 
             # Intervals 2-4 have no additional validator duties.
@@ -498,6 +530,104 @@ def _sign_with_key(
         self.registry.add(updated_entry)
         return updated_entry, signature
 
+    def _is_synced_for_duties(
+        self,
+        slot: Slot,
+        duty: Literal["block", "attestation"],
+    ) -> bool:
+        """Decide whether duties may run for the given slot.
+
+        Combines local lag and local-store stall evidence with
+        hysteresis. Returns False only when the local view is stale
+        relative to a network that is otherwise making progress.
+
+        Args:
+            slot: Wall-clock slot for which a duty would run.
+            duty: Tag for the transition log.
+
+        Returns:
+            True when duties should run, False to silence them.
+        """
+        store = self.sync_service.store
+        head_block = store.blocks.get(store.head)
+
+        # No head: nothing to compare against, let downstream code no-op.
+        if head_block is None:
+            return True
+
+        head_slot = head_block.slot
+
+        # Saturate at zero lag when the head is ahead of the wall clock.
+        #
+        # Why:
+        #     Local clock drift is normal. Unconditional trust would let
+        #     a chain 100 slots in the future bypass every check.
+        lag = 0 if head_slot >= slot else int(slot - head_slot)
+
+        # Local stall evidence from the block map.
+        #
+        # Why:
+        #     Only blocks with valid signatures enter the map, so the
+        #     freshest entry is an authenticated lower bound on the
+        #     network tip. A stale max here means the network is not
+        #     producing.
+        max_seen_slot = max(
+            (b.slot for b in store.blocks.values()),
+            default=head_slot,
+        )
+        network_lag = 0 if max_seen_slot >= slot else int(slot - max_seen_slot)
+        network_stalling = network_lag > NETWORK_STALL_THRESHOLD
+
+        # Decision matrix:
+        #
+        # - Network stalling: keep signing, reopen if currently closed.
+        # - Gate closed: reopen only when lag drops to 4 - 2 = 2 or below.
+        # - Gate open: close as soon as lag exceeds 4.
+        if network_stalling:
+            allow = True
+            if self._duty_gate_closed:
+                self._duty_gate_closed = False
+                logger.info(
+                    "Validator duty gate reopened: network stall detected. "
" + "duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d", + duty, + int(slot), + int(head_slot), + lag, + int(max_seen_slot), + network_lag, + ) + elif self._duty_gate_closed: + # Hysteresis: reopen only well below the threshold. + allow = lag <= SYNC_LAG_THRESHOLD - HYSTERESIS_BAND + if allow: + self._duty_gate_closed = False + logger.info( + "Validator duty gate reopened: local view caught up. " + "duty=%s slot=%d head_slot=%d lag=%d", + duty, + int(slot), + int(head_slot), + lag, + ) + else: + # Open gate: close once the local threshold is crossed. + allow = lag <= SYNC_LAG_THRESHOLD + if not allow: + self._duty_gate_closed = True + logger.info( + "Validator duty gate closed: local view is stale. " + "duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d", + duty, + int(slot), + int(head_slot), + lag, + int(max_seen_slot), + network_lag, + ) + + return allow + def stop(self) -> None: """ Stop the service. @@ -521,3 +651,18 @@ def blocks_produced(self) -> int: def attestations_produced(self) -> int: """Total attestations produced since creation.""" return self._attestations_produced + + @property + def blocks_skipped_lag(self) -> int: + """Block proposals skipped because the local view was too stale.""" + return self._blocks_skipped_lag + + @property + def attestations_skipped_lag(self) -> int: + """Attestations skipped because the local view was too stale.""" + return self._attestations_skipped_lag + + @property + def duty_gate_closed(self) -> bool: + """True while the sync-lag gate is silencing duties.""" + return self._duty_gate_closed diff --git a/tests/lean_spec/subspecs/validator/test_service.py b/tests/lean_spec/subspecs/validator/test_service.py index 0d0a4ebf..5a0c067f 100644 --- a/tests/lean_spec/subspecs/validator/test_service.py +++ b/tests/lean_spec/subspecs/validator/test_service.py @@ -22,6 +22,7 @@ from lean_spec.subspecs.sync.peer_manager import PeerManager from lean_spec.subspecs.sync.service import SyncService from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService +from lean_spec.subspecs.validator.constants import SYNC_LAG_THRESHOLD from lean_spec.subspecs.validator.registry import ValidatorEntry from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof @@ -1315,3 +1316,313 @@ async def capture_attestation(attestation: SignedAttestation) -> None: sig=signed_att.signature, ) assert not is_invalid, "Signature should not verify with the wrong slot" + + +def _replace_head_at_slot(sync_service: SyncService, head_slot: Slot) -> None: + """Rewrite the head block at the given slot, preserving the map invariant. + + Preserves + --------- + - All other blocks already in the store stay in place. + - The new head is keyed by the cryptographic root of its content. + + Why + --- + The duty gate reads both the head block and the freshest block in + the map. A helper that broke the key-equals-root invariant would + mask real bugs. + """ + blocks = dict(sync_service.store.blocks) + old_head_block = blocks.pop(sync_service.store.head) + new_head_block = old_head_block.model_copy(update={"slot": head_slot}) + new_root = hash_tree_root(new_head_block) + blocks[new_root] = new_head_block + sync_service.store = sync_service.store.model_copy(update={"blocks": blocks, "head": new_root}) + + +def _add_block_at_slot(sync_service: SyncService, slot: Slot) -> Bytes32: + """Add a non-head block at the given slot, returning its root. 
+
+    Why
+    ---
+    Injects freshness evidence without touching the head. The gate's
+    stall signal scans the highest slot across every block in the map.
+    """
+    template = next(iter(sync_service.store.blocks.values()))
+    new_block = template.model_copy(update={"slot": slot})
+    new_root = hash_tree_root(new_block)
+    new_blocks = {**sync_service.store.blocks, new_root: new_block}
+    sync_service.store = sync_service.store.model_copy(update={"blocks": new_blocks})
+    return new_root
+
+
+def _build_gate_service(sync_service: SyncService) -> ValidatorService:
+    """Build a service for gate-only tests with an empty registry.
+
+    The gate logic never consults the registry, so emptying it keeps
+    the focus on the predicate under test.
+    """
+    return ValidatorService(
+        sync_service=sync_service,
+        clock=SlotClock(genesis_time=Uint64(0)),
+        registry=ValidatorRegistry(),
+    )
+
+
+class TestSyncLagGate:
+    """Sync-lag duty gate.
+
+    Decision matrix
+    ---------------
+    - Lag at or under threshold: duties run.
+    - Lag over threshold, fresh blocks locally: duties skip.
+    - Lag over threshold, no fresh blocks: duties run (network stall).
+    - Once closed, the gate reopens only after lag drops to threshold - band or below.
+    """
+
+    def test_lag_within_threshold_allows_duties(self, sync_service: SyncService) -> None:
+        """Lag 0..threshold leaves the gate open."""
+
+        # Head at slot 10, wall clock sweeps 10..14 (lag 0..4).
+        _replace_head_at_slot(sync_service, Slot(10))
+        service = _build_gate_service(sync_service)
+
+        # Every lag in the inclusive range must pass.
+        for lag in range(SYNC_LAG_THRESHOLD + 1):
+            assert service._is_synced_for_duties(Slot(10 + lag), "block")
+
+    def test_lag_over_threshold_with_fresh_local_block_gates(
+        self, sync_service: SyncService
+    ) -> None:
+        """Stale head plus a fresh local block: gate closes."""
+
+        # Head at slot 10, wall clock at 20: local lag 10 (> 4).
+        _replace_head_at_slot(sync_service, Slot(10))
+
+        # Fresh local block at slot 20 makes the freshest seen slot 20.
+        # The network is not stalling, so only local lag drives the decision.
+        _add_block_at_slot(sync_service, Slot(20))
+        service = _build_gate_service(sync_service)
+
+        assert not service._is_synced_for_duties(Slot(20), "block")
+
+    def test_clock_skew_saturates_to_zero_lag(self, sync_service: SyncService) -> None:
+        """Head ahead of the wall clock saturates to zero lag, not unlimited trust."""
+
+        # Head at slot 20, wall clock at slot 15: head leads by 5 slots.
+        # Saturation pins lag at 0, which trivially passes the threshold.
+        _replace_head_at_slot(sync_service, Slot(20))
+        service = _build_gate_service(sync_service)
+
+        assert service._is_synced_for_duties(Slot(15), "block")
+
+    def test_no_extra_blocks_treats_isolation_as_network_stall(
+        self, sync_service: SyncService
+    ) -> None:
+        """Isolated node with only a stale head: gate stays open."""
+
+        # Head at slot 0, wall clock at slot 100, nothing else in the map.
+        # Freshest seen slot is 0, network lag is 100 (> 8): stall fires.
+        _replace_head_at_slot(sync_service, Slot(0))
+        service = _build_gate_service(sync_service)
+
+        assert service._is_synced_for_duties(Slot(100), "block")
+
+    def test_network_wide_stall_keeps_duties_live(self, sync_service: SyncService) -> None:
+        """All locally-known blocks stale: gate stays open."""
+
+        # Head at slot 0, wall clock at slot 50, no fresh blocks.
+        # Network lag 50 (> 8). Without this branch every validator
+        # would go silent at once and recovery would be impossible.
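+        #
+        # Expected arithmetic (a sketch; constants assumed from
+        # constants.py): max_seen_slot stays 0, so network_lag = 50 - 0 = 50
+        # exceeds NETWORK_STALL_THRESHOLD (8), and the stall branch allows
+        # duties regardless of the local lag of 50.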
+        _replace_head_at_slot(sync_service, Slot(0))
+        service = _build_gate_service(sync_service)
+
+        assert service._is_synced_for_duties(Slot(50), "block")
+
+    def test_boundary_lag_equal_threshold_allowed(self, sync_service: SyncService) -> None:
+        """Lag exactly at the threshold (4) leaves the gate open."""
+
+        # Head at slot 10, wall clock at slot 14: lag equals threshold.
+        # Fresh block at slot 14 keeps the stall branch from masking this.
+        _replace_head_at_slot(sync_service, Slot(10))
+        _add_block_at_slot(sync_service, Slot(14))
+        service = _build_gate_service(sync_service)
+
+        assert service._is_synced_for_duties(Slot(10 + SYNC_LAG_THRESHOLD), "block")
+
+    def test_boundary_lag_one_over_threshold_gated(self, sync_service: SyncService) -> None:
+        """Lag of threshold + 1 closes the gate."""
+
+        # Head at slot 10, wall clock at slot 15: lag is 5.
+        _replace_head_at_slot(sync_service, Slot(10))
+        _add_block_at_slot(sync_service, Slot(15))
+        service = _build_gate_service(sync_service)
+
+        assert not service._is_synced_for_duties(Slot(10 + SYNC_LAG_THRESHOLD + 1), "block")
+
+    def test_hysteresis_prevents_flap(self, sync_service: SyncService) -> None:
+        """Closed gate stays closed near the threshold.
+
+        Lag sequence
+        ------------
+        - 5 -> gate closes (lag past threshold of 4).
+        - 4 -> stays closed (still inside the band).
+        - 5 -> stays closed (no flap).
+        - 2 -> reopens (lag at or below 4 - 2).
+        """
+
+        # Initial head at slot 10, fresh local block at slot 20.
+        # The fresh block keeps the stall escape from masking the band test.
+        _replace_head_at_slot(sync_service, Slot(10))
+        _add_block_at_slot(sync_service, Slot(20))
+        service = _build_gate_service(sync_service)
+
+        # Lag = 5: gate closes.
+        assert not service._is_synced_for_duties(Slot(15), "block")
+
+        # Lag = 4: stays closed because the band requires lag <= 2.
+        _replace_head_at_slot(sync_service, Slot(11))
+        _add_block_at_slot(sync_service, Slot(20))
+        assert not service._is_synced_for_duties(Slot(15), "block")
+
+        # Lag back to 5: still closed, no flap event.
+        _replace_head_at_slot(sync_service, Slot(10))
+        _add_block_at_slot(sync_service, Slot(20))
+        assert not service._is_synced_for_duties(Slot(15), "block")
+
+        # Lag = 2: at or below the 4 - 2 band, gate reopens.
+        _replace_head_at_slot(sync_service, Slot(13))
+        _add_block_at_slot(sync_service, Slot(20))
+        assert service._is_synced_for_duties(Slot(15), "block")
+
+    def test_counters_split_block_and_attestation(self, sync_service: SyncService) -> None:
+        """Counters live on the loop, not on the gate."""
+
+        # Head 0, wall clock 20, fresh block at 20: gate closes.
+        _replace_head_at_slot(sync_service, Slot(0))
+        _add_block_at_slot(sync_service, Slot(20))
+        service = _build_gate_service(sync_service)
+
+        # Invariant: the gate never increments the counters. Attribution
+        # belongs to the run loop. Querying the gate must leave them at zero.
+        assert not service._is_synced_for_duties(Slot(20), "block")
+        assert service.blocks_skipped_lag == 0
+        assert service.attestations_skipped_lag == 0
+        assert service.duty_gate_closed is True
+
+    async def test_run_loop_skips_block_production_when_gated(
+        self, sync_service: SyncService, key_manager: XmssKeyManager
+    ) -> None:
+        """Closed gate at interval 0 skips block production and ticks only the block counter."""
+
+        # Wall clock at slot 10 interval 0, head stuck at slot 0.
+        # Fresh local block at slot 10 makes the lag local, not network-wide.
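+        # Gate arithmetic this setup should yield (a sketch, constants
+        # assumed from constants.py): lag = 10 - 0 = 10 > 4 closes the
+        # gate, while network_lag = 10 - 10 = 0 keeps the stall escape off.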
+        _replace_head_at_slot(sync_service, Slot(0))
+        _add_block_at_slot(sync_service, Slot(10))
+        clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 0))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=clock,
+            registry=_make_registry(key_manager, 0),
+        )
+
+        block_calls: list[Slot] = []
+
+        async def mock_block(_self, slot: Slot) -> None:
+            block_calls.append(slot)
+
+        async def stop_on_sleep(_d: float) -> None:
+            service.stop()
+
+        with (
+            patch.object(ValidatorService, "_maybe_produce_block", mock_block),
+            patch("asyncio.sleep", new=stop_on_sleep),
+        ):
+            await service.run()
+
+        # Block path bypassed, attestation counter untouched.
+        assert block_calls == []
+        assert service.blocks_skipped_lag >= 1
+        assert service.attestations_skipped_lag == 0
+
+    async def test_run_loop_skips_attestation_when_gated(
+        self, sync_service: SyncService, key_manager: XmssKeyManager
+    ) -> None:
+        """Closed gate at interval 1 skips attestation and leaves the slot retryable.
+
+        Why
+        ---
+        Keeping the slot out of the attested set lets the next loop
+        iteration retry within the same slot if the node catches up
+        before slot end.
+        """
+
+        # Same setup as the block path but advanced to interval 1.
+        _replace_head_at_slot(sync_service, Slot(0))
+        _add_block_at_slot(sync_service, Slot(10))
+        clock = SlotClock(genesis_time=Uint64(0), time_fn=lambda: _interval_time(10, 1))
+        service = ValidatorService(
+            sync_service=sync_service,
+            clock=clock,
+            registry=_make_registry(key_manager, 0),
+        )
+
+        attest_calls: list[Slot] = []
+
+        async def mock_attest(_self, slot: Slot) -> None:
+            attest_calls.append(slot)
+
+        async def stop_on_sleep(_d: float) -> None:
+            service.stop()
+
+        with (
+            patch.object(ValidatorService, "_produce_attestations", mock_attest),
+            patch("asyncio.sleep", new=stop_on_sleep),
+        ):
+            await service.run()
+
+        # Attestation skipped, slot retryable, block counter untouched.
+        assert attest_calls == []
+        assert Slot(10) not in service._attested_slots
+        assert service.attestations_skipped_lag >= 1
+        assert service.blocks_skipped_lag == 0
+
+    def test_gate_logs_only_on_transition(
+        self, sync_service: SyncService, caplog: pytest.LogCaptureFixture
+    ) -> None:
+        """One INFO record per state change, not one per query.
+
+        Fields recorded
+        ---------------
+        - duty
+        - slot
+        - head_slot
+        - lag
+        - max_seen_slot
+        """
+
+        # Head at slot 3, fresh block at slot 20.
+        # Wall clock 20 puts lag at 17 with no stall escape.
+        _replace_head_at_slot(sync_service, Slot(3))
+        _add_block_at_slot(sync_service, Slot(20))
+        service = _build_gate_service(sync_service)
+
+        with caplog.at_level("INFO"):
+            # Two consecutive queries: only the first is a transition.
+            first = service._is_synced_for_duties(Slot(20), "block")
+            second = service._is_synced_for_duties(Slot(20), "block")
+
+        assert first is False
+        assert second is False
+
+        # Exactly one closure record, with the expected fields.
+        transition_records = [
+            r.getMessage() for r in caplog.records if "duty gate closed" in r.getMessage()
+        ]
+        assert len(transition_records) == 1
+        message = transition_records[0]
+        assert "duty=block" in message
+        assert "slot=20" in message
+        assert "head_slot=3" in message
+        assert "lag=17" in message
+        assert "max_seen_slot=20" in message
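
A condensed restatement of the gate's decision matrix, for readers skimming the diff. This is an illustrative sketch, not part of the change: it mirrors _is_synced_for_duties with the store access stripped out, and the inlined numbers are the constants from constants.py.

    def gate_decision(lag: int, network_lag: int, closed: bool) -> tuple[bool, bool]:
        """Return (allow, closed_after) for one gate evaluation."""
        # Network stall: keep signing and reopen a closed gate.
        if network_lag > 8:  # NETWORK_STALL_THRESHOLD
            return True, False
        # Closed gate: hysteresis demands lag <= 4 - 2 before reopening.
        if closed:
            reopen = lag <= 4 - 2  # SYNC_LAG_THRESHOLD - HYSTERESIS_BAND
            return reopen, not reopen
        # Open gate: close once lag exceeds the local threshold.
        stale = lag > 4  # SYNC_LAG_THRESHOLD
        return not stale, stale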