Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/lean_spec/subspecs/validator/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Validator duty-gate thresholds.
Informative, not normative:
- Shape when this node signs.
- Do not change what consensus accepts.
- Clients may diverge without breaking interop.
"""

from typing import Final

SYNC_LAG_THRESHOLD: Final[int] = 4
"""Slot lag past which the local view is too stale to sign.
Why:
We justify and finalize within a handful of slots.
A 4-slot lag is one full justification window behind real time.
A vote from that view lands on a subtree the network has left.
"""

NETWORK_STALL_THRESHOLD: Final[int] = 8
"""Slot lag past which the whole network is treated as stalled.
Why:
Set to twice the local threshold (8 = 2 * 4).
Ordinary jitter at the local boundary must not trip this branch.
Effect:
Even the freshest locally validated block is 8 slots behind.
The cause is a streak of skipped proposals, not this node lagging.
Duties stay live so the chain can advance through the gap.
"""

HYSTERESIS_BAND: Final[int] = 2
"""Slot band that holds the gate closed near the threshold.
Why:
Without a band a single late gossip block flips the decision.
Slot-over-slot flips would stutter the attestation stream.
Effect:
Once closed, the gate reopens only when lag drops to 4 - 2 = 2.
"""
167 changes: 156 additions & 11 deletions src/lean_spec/subspecs/validator/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from lean_spec.subspecs.xmss.containers import Signature
from lean_spec.types import Bytes32, Slot, Uint64, ValidatorIndex

from .constants import HYSTERESIS_BAND, NETWORK_STALL_THRESHOLD, SYNC_LAG_THRESHOLD
from .registry import ValidatorEntry, ValidatorRegistry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -111,6 +112,15 @@ class ValidatorService:
_attested_slots: set[Slot] = field(default_factory=set, repr=False)
"""Slots for which we've already produced attestations (prevents duplicates)."""

_blocks_skipped_lag: int = field(default=0, repr=False)
"""Block proposals skipped because the local view was too stale."""

_attestations_skipped_lag: int = field(default=0, repr=False)
"""Attestations skipped because the local view was too stale."""

_duty_gate_closed: bool = field(default=False, repr=False)
"""Hysteresis flag. True while signing is silenced."""

async def run(self) -> None:
"""
Main loop - check duties every interval.
Expand Down Expand Up @@ -168,7 +178,10 @@ async def run(self) -> None:
#
# Check if any of our validators is the proposer.
logger.debug("ValidatorService: checking block production for slot %d", slot)
await self._maybe_produce_block(slot)
if self._is_synced_for_duties(slot, "block"):
await self._maybe_produce_block(slot)
else:
self._blocks_skipped_lag += 1
logger.debug("ValidatorService: done block production check for slot %d", slot)

# Re-fetch interval after block production.
Expand All @@ -191,22 +204,41 @@ async def run(self) -> None:
slot,
slot in self._attested_slots,
)
if interval >= Uint64(1) and slot not in self._attested_slots:
# Decide whether this iteration owes an attestation.
#
# Two conditions:
#
# - Interval has reached the attestation slot (>= 1).
# - This slot has not already been attested.
#
# Why split eligibility from the sync gate: the skip counter
# must only tick on real misses, never on wrong-interval
# iterations.
needs_attestation = interval >= Uint64(1) and slot not in self._attested_slots
if needs_attestation:
logger.debug(
"ValidatorService: producing attestations for slot %d (interval %d)",
slot,
interval,
)
await self._produce_attestations(slot)
logger.debug("ValidatorService: done producing attestations for slot %d", slot)
self._attested_slots.add(slot)

# Prune old entries to prevent unbounded growth.
# Apply the sync gate.
#
# Keep only recent slots (current slot - 4) to bound memory usage.
# We never need to attest for slots that far in the past.
prune_threshold = Slot(max(0, int(slot) - 4))
self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
# Invariant: a gated slot stays out of the attested set.
# If the node catches up before the slot ends, the next
# iteration retries the duty.
if self._is_synced_for_duties(slot, "attestation"):
await self._produce_attestations(slot)
logger.debug("ValidatorService: done producing attestations for slot %d", slot)
self._attested_slots.add(slot)

# Prune old entries to bound memory.
#
# Keep only slots at or after (current slot - 4).
# Older slots are no longer attestable.
prune_threshold = Slot(max(0, int(slot) - 4))
self._attested_slots = {s for s in self._attested_slots if s >= prune_threshold}
else:
self._attestations_skipped_lag += 1

# Intervals 2-4 have no additional validator duties.

Expand Down Expand Up @@ -498,6 +530,104 @@ def _sign_with_key(
self.registry.add(updated_entry)
return updated_entry, signature

def _is_synced_for_duties(
self,
slot: Slot,
duty: Literal["block", "attestation"],
) -> bool:
"""Decide whether duties may run for the given slot.

Combines local lag and local-store stall evidence with
hysteresis. Returns False only when the local view is stale
relative to a network that is otherwise making progress.

Args:
slot: Wall-clock slot for which a duty would run.
duty: Tag for the transition log.

Returns:
True when duties should run, False to silence them.
"""
store = self.sync_service.store
head_block = store.blocks.get(store.head)

# No head: nothing to compare against, let downstream code no-op.
if head_block is None:
return True

head_slot = head_block.slot

# Saturate at zero lag when the head is ahead of wall clock.
#
# Why:
# Local clock drift is normal. Unconditional trust would let
# a chain 100 slots in the future bypass every check.
lag = 0 if head_slot >= slot else int(slot - head_slot)

# Local stall evidence from the block map.
#
# Why:
# Only blocks with valid signatures enter the map, so the
# freshest entry is an authenticated lower bound on the
# network tip. A stale max here means the network is not
# producing.
max_seen_slot = max(
(b.slot for b in store.blocks.values()),
default=head_slot,
)
network_lag = 0 if max_seen_slot >= slot else int(slot - max_seen_slot)
network_stalling = network_lag > NETWORK_STALL_THRESHOLD

# Decision matrix:
#
# - Network stalling: keep signing, reopen if currently closed.
# - Gate closed: reopen only when lag drops to 4 - 2 = 2.
# - Gate open: close as soon as lag crosses 4.
if network_stalling:
allow = True
if self._duty_gate_closed:
self._duty_gate_closed = False
logger.info(
"Validator duty gate reopened: network stall detected. "
"duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d",
duty,
int(slot),
int(head_slot),
lag,
int(max_seen_slot),
network_lag,
)
elif self._duty_gate_closed:
# Hysteresis: reopen only well below the threshold.
allow = lag <= SYNC_LAG_THRESHOLD - HYSTERESIS_BAND
if allow:
self._duty_gate_closed = False
logger.info(
"Validator duty gate reopened: local view caught up. "
"duty=%s slot=%d head_slot=%d lag=%d",
duty,
int(slot),
int(head_slot),
lag,
)
else:
# Open gate: close once the local threshold is crossed.
allow = lag <= SYNC_LAG_THRESHOLD
if not allow:
self._duty_gate_closed = True
logger.info(
"Validator duty gate closed: local view is stale. "
"duty=%s slot=%d head_slot=%d lag=%d max_seen_slot=%d network_lag=%d",
duty,
int(slot),
int(head_slot),
lag,
int(max_seen_slot),
network_lag,
)

return allow

def stop(self) -> None:
"""
Stop the service.
Expand All @@ -521,3 +651,18 @@ def blocks_produced(self) -> int:
def attestations_produced(self) -> int:
"""Total attestations produced since creation."""
return self._attestations_produced

@property
def blocks_skipped_lag(self) -> int:
"""Block proposals skipped because the local view was too stale."""
return self._blocks_skipped_lag

@property
def attestations_skipped_lag(self) -> int:
"""Attestations skipped because the local view was too stale."""
return self._attestations_skipped_lag

@property
def duty_gate_closed(self) -> bool:
"""True while the sync-lag gate is silencing duties."""
return self._duty_gate_closed
Loading
Loading