Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
from lean_spec.subspecs.networking.config import GOSSIPSUB_DEFAULT_PROTOCOL_ID
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic, TopicKind
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.networking.service.events import (
Expand All @@ -36,6 +37,7 @@
ConnectionManager,
YamuxConnection,
)
from lean_spec.subspecs.networking.varint import encode as encode_varint

from .reqresp_client import ReqRespClient

Expand All @@ -56,20 +58,7 @@ class LiveNetworkEventSource:
- Accept incoming connections and emit PeerConnectedEvent
- Dial outbound connections and emit PeerConnectedEvent
- Exchange Status messages and emit PeerStatusEvent
- (Future) Handle gossip messages and emit GossipBlockEvent/GossipAttestationEvent

Usage
-----
::

event_source = LiveNetworkEventSource.create(connection_manager)

# Dial bootnodes
await event_source.dial("/ip4/127.0.0.1/tcp/9000")

# Consume events
async for event in event_source:
await handle_event(event)
- Publish locally-produced blocks and attestations to the gossip network
"""

connection_manager: ConnectionManager
Expand Down Expand Up @@ -274,10 +263,6 @@ def stop(self) -> None:
"""Stop the event source."""
self._running = False

# =========================================================================
# Gossip Message Handling (placeholder for future implementation)
# =========================================================================

async def _emit_gossip_block(
self,
block: SignedBlockWithAttestation,
Expand Down Expand Up @@ -309,3 +294,58 @@ async def _emit_gossip_attestation(
await self._events.put(
GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic)
)

async def publish(self, topic: str, data: bytes) -> None:
"""
Broadcast a message to all connected peers on a topic.

Used by NetworkService to publish locally-produced blocks and
attestations to the gossip network.

Args:
topic: Gossip topic string.
data: Compressed message bytes (SSZ + Snappy).
"""
if not self._connections:
logger.debug("No peers connected, cannot publish to %s", topic)
return

for peer_id, conn in list(self._connections.items()):
try:
await self._send_gossip_message(conn, topic, data)
except Exception as e:
logger.warning("Failed to publish to peer %s: %s", peer_id, e)

async def _send_gossip_message(
self,
conn: YamuxConnection,
topic: str,
data: bytes,
) -> None:
"""
Send a gossip message to a peer.

Opens a new stream for the gossip message and sends the data.

Args:
conn: Connection to the peer.
topic: Topic string for the message.
data: Message bytes to send.
"""
# Open a new outbound stream for gossip protocol.
stream = await conn.open_stream(GOSSIPSUB_DEFAULT_PROTOCOL_ID)

try:
# Format: topic length (varint) + topic + data length (varint) + data
topic_bytes = topic.encode("utf-8")

# Write topic length and topic.
await stream.write(encode_varint(len(topic_bytes)))
await stream.write(topic_bytes)

# Write data length and data.
await stream.write(encode_varint(len(data)))
await stream.write(data)

finally:
await stream.close()
38 changes: 33 additions & 5 deletions src/lean_spec/subspecs/networking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing_extensions import Final

from lean_spec.types.byte_arrays import Bytes4
from lean_spec.types.byte_arrays import Bytes1

from .types import DomainType

Expand Down Expand Up @@ -32,8 +32,36 @@

# --- Gossip Message Domains ---

MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = Bytes4(b"\x00\x00\x00\x00")
"""4-byte domain for gossip message-id isolation of invalid snappy messages."""
MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = Bytes1(b"\x00")
"""1-byte domain for gossip message-id isolation of invalid snappy messages.

MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = Bytes4(b"\x01\x00\x00\x00")
"""4-byte domain for gossip message-id isolation of valid snappy messages."""
Per Ethereum spec, prepended to the message hash when decompression fails.
"""

MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = Bytes1(b"\x01")
"""1-byte domain for gossip message-id isolation of valid snappy messages.

Per Ethereum spec, prepended to the message hash when decompression succeeds.
"""

# --- Gossipsub Protocol IDs ---

GOSSIPSUB_PROTOCOL_ID_V10: Final[str] = "/meshsub/1.0.0"
"""Gossipsub v1.0 protocol ID - basic mesh pubsub."""

GOSSIPSUB_PROTOCOL_ID_V11: Final[str] = "/meshsub/1.1.0"
"""Gossipsub v1.1 protocol ID - peer scoring, extended validators.

This is the minimum version required by the Ethereum consensus spec.
"""

GOSSIPSUB_PROTOCOL_ID_V12: Final[str] = "/meshsub/1.2.0"
"""Gossipsub v1.2 protocol ID - IDONTWANT bandwidth optimization."""

GOSSIPSUB_DEFAULT_PROTOCOL_ID: Final[str] = GOSSIPSUB_PROTOCOL_ID_V11
"""
Default protocol ID per Ethereum consensus spec requirements.

The Ethereum consensus P2P spec states:
"Clients MUST support the gossipsub v1 libp2p Protocol including the gossipsub v1.1 extension."
"""
76 changes: 75 additions & 1 deletion src/lean_spec/subspecs/networking/enr/enr.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,24 @@
- EIP-778: https://eips.ethereum.org/EIPS/eip-778
"""

from __future__ import annotations

import base64
from typing import ClassVar

from typing_extensions import Self

from lean_spec.subspecs.networking.types import Multiaddr, NodeId, SeqNumber
from lean_spec.types import StrictBaseModel
from lean_spec.types import RLPDecodingError, StrictBaseModel, Uint64
from lean_spec.types.rlp import decode_list as rlp_decode_list

from . import keys
from .eth2 import AttestationSubnets, Eth2Data
from .keys import EnrKey

ENR_PREFIX = "enr:"
"""Text prefix for ENR strings."""


class ENR(StrictBaseModel):
r"""
Expand Down Expand Up @@ -218,3 +227,68 @@ def __str__(self) -> str:
if eth2 := self.eth2_data:
parts.append(f"fork={eth2.fork_digest.hex()}")
return ", ".join(parts) + ")"

@classmethod
def from_string(cls, enr_text: str) -> Self:
"""
Parse an ENR from its text representation.

Text format is URL-safe base64 with `enr:` prefix.

Args:
enr_text: ENR string (e.g., "enr:-IS4Q...")

Returns:
Parsed ENR instance.

Raises:
ValueError: If the string is malformed or RLP decoding fails.
"""
if not enr_text.startswith(ENR_PREFIX):
raise ValueError(f"ENR must start with '{ENR_PREFIX}'")

# Extract base64url content after prefix.
b64_content = enr_text[len(ENR_PREFIX) :]

# Base64url decode (add padding if needed).
#
# Python's base64.urlsafe_b64decode requires proper padding.
padding = 4 - (len(b64_content) % 4)
if padding != 4:
b64_content += "=" * padding

try:
rlp_data = base64.urlsafe_b64decode(b64_content)
except Exception as e:
raise ValueError(f"Invalid base64 encoding: {e}") from e

# RLP decode: [signature, seq, k1, v1, k2, v2, ...]
try:
items = rlp_decode_list(rlp_data)
except RLPDecodingError as e:
raise ValueError(f"Invalid RLP encoding: {e}") from e

if len(items) < 2:
raise ValueError("ENR must have at least signature and seq")

if len(items) % 2 != 0:
raise ValueError("ENR key/value pairs must be even")

signature = items[0]
seq_bytes = items[1]
seq = int.from_bytes(seq_bytes, "big") if seq_bytes else 0

# Parse key/value pairs.
#
# Keys are strings, values are arbitrary bytes.
pairs: dict[str, bytes] = {}
for i in range(2, len(items), 2):
key = items[i].decode("utf-8")
value = items[i + 1]
pairs[key] = value

return cls(
signature=signature,
seq=Uint64(seq),
pairs=pairs,
)
3 changes: 2 additions & 1 deletion src/lean_spec/subspecs/networking/gossipsub/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from __future__ import annotations

from lean_spec.subspecs.chain.config import DEVNET_CONFIG
from lean_spec.subspecs.networking.config import GOSSIPSUB_DEFAULT_PROTOCOL_ID
from lean_spec.types import StrictBaseModel


Expand All @@ -71,7 +72,7 @@ class GossipsubParameters(StrictBaseModel):
Default values follow the Ethereum consensus P2P specification.
"""

protocol_id: str = "/meshsub/1.3.0"
protocol_id: str = GOSSIPSUB_DEFAULT_PROTOCOL_ID
"""The protocol ID for gossip messages."""

# -------------------------------------------------------------------------
Expand Down
15 changes: 14 additions & 1 deletion src/lean_spec/subspecs/networking/service/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ class NetworkEventSource(Protocol):
Abstract source of network events.

This protocol defines the interface that network implementations must
provide. It is an async iterator that yields NetworkEvent objects.
provide. It is an async iterator that yields NetworkEvent objects and
supports publishing outbound messages.

Any class that implements async iteration over NetworkEvent can serve
as a source.
Expand Down Expand Up @@ -161,3 +162,15 @@ async def __anext__(self) -> NetworkEvent:
StopAsyncIteration: When no more events will arrive.
"""
...

async def publish(self, topic: str, data: bytes) -> None:
"""
Publish a message to all connected peers on a topic.

Used to broadcast locally-produced blocks and attestations.

Args:
topic: Gossip topic string.
data: Message bytes to publish.
"""
...
60 changes: 54 additions & 6 deletions src/lean_spec/subspecs/networking/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

1. Consumes events from an abstract source (async iterator)
2. Routes each event to the appropriate sync handler
3. Runs until stopped or the source exhausts
3. Publishes locally-produced blocks and attestations to peers
4. Runs until stopped or the source exhausts

This means:
- The network layer produces events,
Expand All @@ -21,9 +22,15 @@

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from lean_spec.snappy import frame_compress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic

from .events import (
GossipAttestationEvent,
GossipBlockEvent,
Expand All @@ -37,20 +44,24 @@
if TYPE_CHECKING:
from lean_spec.subspecs.sync import SyncService

logger = logging.getLogger(__name__)


@dataclass(slots=True)
class NetworkService:
"""
Routes network events to the sync service.
Routes network events to the sync service and publishes outbound messages.

This service:

This service is intentionally minimal. It does not:
- Routes inbound events to sync handlers
- Publishes locally-produced blocks and attestations to the network

It does not:

- Manage connections (libp2p handles this)
- Score peers (libp2p gossipsub handles this)
- Buffer events (async iteration provides backpressure)
- Produce outbound messages (validators need this, not sync)

It only routes inbound events to the appropriate handlers.
"""

sync_service: SyncService
Expand All @@ -59,6 +70,9 @@ class NetworkService:
event_source: NetworkEventSource
"""Source of network events (libp2p wrapper or test mock)."""

fork_digest: str = field(default="0x00000000")
"""Fork digest for gossip topics (4-byte hex string)."""

_running: bool = field(default=False, repr=False)
"""Whether the event loop is running."""

Expand Down Expand Up @@ -162,3 +176,37 @@ def is_running(self) -> bool:
def events_processed(self) -> int:
"""Total events processed since creation."""
return self._events_processed

async def publish_block(self, block: SignedBlockWithAttestation) -> None:
"""
Publish a block to the gossip network.

Encodes the block as SSZ, compresses with Snappy, and broadcasts
to all connected peers on the block topic.

Args:
block: Signed block to publish.
"""
topic = GossipTopic.block(self.fork_digest)
ssz_bytes = block.encode_bytes()
compressed = frame_compress(ssz_bytes)

await self.event_source.publish(str(topic), compressed)
logger.debug("Published block at slot %s", block.message.block.slot)

async def publish_attestation(self, attestation: SignedAttestation) -> None:
"""
Publish an attestation to the gossip network.

Encodes the attestation as SSZ, compresses with Snappy, and broadcasts
to all connected peers on the attestation topic.

Args:
attestation: Signed attestation to publish.
"""
topic = GossipTopic.attestation(self.fork_digest)
ssz_bytes = attestation.encode_bytes()
compressed = frame_compress(ssz_bytes)

await self.event_source.publish(str(topic), compressed)
logger.debug("Published attestation for slot %s", attestation.message.slot)
Loading
Loading