diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 3e22b96fdb8..8fa0a6052e6 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -15,10 +15,8 @@ use ethrex_blockchain::{ }; use ethrex_common::types::{Block, DEFAULT_BUILDER_GAS_CEIL, Genesis, validate_block_body}; use ethrex_p2p::{ - discv4::{peer_table::TARGET_PEERS, server::INITIAL_LOOKUP_INTERVAL_MS}, - sync::SyncMode, - tx_broadcaster::BROADCAST_INTERVAL_MS, - types::Node, + discv4::server::INITIAL_LOOKUP_INTERVAL_MS, peer_table::TARGET_PEERS, sync::SyncMode, + tx_broadcaster::BROADCAST_INTERVAL_MS, types::Node, }; use ethrex_rlp::encode::RLPEncode; use ethrex_storage::error::StoreError; diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index b7a33b0fe66..a11027822ef 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -4,7 +4,7 @@ use ethrex::{ initializers::{init_l1, init_tracing}, utils::{NodeConfigFile, get_client_version, is_memory_datadir, store_node_config_file}, }; -use ethrex_p2p::{discv4::peer_table::PeerTable, types::NodeRecord}; +use ethrex_p2p::{peer_table::PeerTable, types::NodeRecord}; use serde::Deserialize; use std::{path::Path, time::Duration}; use tokio::signal::unix::{SignalKind, signal}; diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 5afd4d19a9f..7f554fd1c19 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -14,9 +14,9 @@ use ethrex_metrics::profiling::{FunctionProfilingLayer, initialize_block_process use ethrex_metrics::rpc::initialize_rpc_metrics; use ethrex_p2p::rlpx::initiator::RLPxInitiator; use ethrex_p2p::{ - discv4::peer_table::PeerTable, network::P2PContext, peer_handler::PeerHandler, + peer_table::PeerTable, sync::SyncMode, sync_manager::SyncManager, types::{Node, NodeRecord}, diff --git a/cmd/ethrex/utils.rs b/cmd/ethrex/utils.rs index fc02af32099..c86550c0c70 100644 --- a/cmd/ethrex/utils.rs +++ b/cmd/ethrex/utils.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use directories::ProjectDirs; use ethrex_common::types::{Block, Genesis}; use ethrex_p2p::{ - discv4::peer_table::PeerTable, + peer_table::PeerTable, sync::SyncMode, types::{Node, NodeRecord}, }; diff --git a/crates/common/rlp/structs.rs b/crates/common/rlp/structs.rs index 69950b0ee6a..98827a902c7 100644 --- a/crates/common/rlp/structs.rs +++ b/crates/common/rlp/structs.rs @@ -117,6 +117,10 @@ impl<'a> Decoder<'a> { pub const fn finish_unchecked(self) -> &'a [u8] { self.remaining } + + pub const fn get_payload_len(&self) -> usize { + self.payload.len() + } } fn field_decode_error(field_name: &str, err: RLPDecodeError) -> RLPDecodeError { diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index c5dea9a35b0..f2909a3d067 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -5,9 +5,9 @@ use crate::{ ENRRequestMessage, ENRResponseMessage, FindNodeMessage, Message, NeighborsMessage, Packet, PacketDecodeErr, PingMessage, PongMessage, }, - peer_table::{Contact, OutMessage as PeerTableOutMessage, PeerTable, PeerTableError}, }, metrics::METRICS, + peer_table::{Contact, OutMessage as PeerTableOutMessage, PeerTable, PeerTableError}, types::{Endpoint, Node, NodeRecord}, utils::{ get_msg_expiration_from_seconds, is_msg_expired, node_id, public_key_from_signing_key, @@ -98,7 +98,7 @@ impl DiscoveryServer { storage: Store, local_node: Node, signer: SecretKey, - udp_socket: Arc, + udp_socket: UdpSocket, mut peer_table: PeerTable, bootnodes: Vec, initial_lookup_interval: f64, @@ -117,7 +117,7 @@ impl DiscoveryServer { local_node: local_node.clone(), local_node_record, signer, - udp_socket, + udp_socket: Arc::new(udp_socket), store: storage.clone(), peer_table: peer_table.clone(), find_node_message: Self::random_message(&signer), diff --git a/crates/networking/p2p/discv5/codec.rs b/crates/networking/p2p/discv5/codec.rs index 083ff824884..d3680ef8ab2 100644 --- a/crates/networking/p2p/discv5/codec.rs +++ b/crates/networking/p2p/discv5/codec.rs @@ -1,52 +1,17 @@ use crate::discv5::messages::{Packet, PacketCodecError}; -use crate::discv5::session::Session; use bytes::BytesMut; use ethrex_common::H256; -use rand::{RngCore, thread_rng}; use tokio_util::codec::{Decoder, Encoder}; #[derive(Debug)] pub struct Discv5Codec { - dest_id: H256, - /// Outgoing message count, used for nonce generation as per the spec. - counter: u32, - session: Option, + local_node_id: H256, } impl Discv5Codec { - pub fn new(dest_id: H256) -> Self { - Self { - dest_id, - counter: 0, - session: None, - } - } - - pub fn with_session(dest_id: H256, session: Session) -> Self { - Self { - dest_id, - counter: 0, - session: Some(session), - } - } - - pub fn set_session(&mut self, session: Session) { - self.session = Some(session); - } - - /// Generates a 96-bit AES-GCM nonce - /// ## Spec Recommendation - /// Encode the current outgoing message count into the first 32 bits of the nonce and fill the remaining 64 bits with random data generated - /// by a cryptographically secure random number generator. - pub fn next_nonce(&mut self, rng: &mut R) -> [u8; 12] { - let counter = self.counter; - self.counter = self.counter.wrapping_add(1); - - let mut nonce = [0u8; 12]; - nonce[..4].copy_from_slice(&counter.to_be_bytes()); - rng.fill_bytes(&mut nonce[4..]); - nonce + pub fn new(local_node_id: H256) -> Self { + Self { local_node_id } } } @@ -56,13 +21,8 @@ impl Decoder for Discv5Codec { fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { if !buf.is_empty() { - let key: &[u8; 16] = match &self.session { - Some(session) => session.inbound_key(), - None => &[0; 16], - }; Ok(Some(Packet::decode( - &self.dest_id, - key, + &self.local_node_id, &buf.split_to(buf.len()), )?)) } else { @@ -74,17 +34,8 @@ impl Decoder for Discv5Codec { impl Encoder for Discv5Codec { type Error = PacketCodecError; - fn encode(&mut self, packet: Packet, buf: &mut BytesMut) -> Result<(), Self::Error> { - let masking_iv: u128 = rand::random(); - let mut rng = thread_rng(); - let nonce = self.next_nonce(&mut rng); - // key isnt needed in WHOAREYOU packets - let key = match (&packet, &mut self.session) { - (Packet::WhoAreYou(_), _) => &[][..], - (_, Some(session)) => session.outbound_key(), - (_, None) => return Err(PacketCodecError::SessionNotEstablished), - }; - - packet.encode(buf, masking_iv, &nonce, &self.dest_id, key) + fn encode(&mut self, _packet: Packet, _buf: &mut BytesMut) -> Result<(), Self::Error> { + // We are not going to use Discv5Coded to send messages, only to receive them + unimplemented!(); } } diff --git a/crates/networking/p2p/discv5/messages.rs b/crates/networking/p2p/discv5/messages.rs index 507b7b0135d..66b72471e47 100644 --- a/crates/networking/p2p/discv5/messages.rs +++ b/crates/networking/p2p/discv5/messages.rs @@ -1,5 +1,3 @@ -use std::{array::TryFromSliceError, net::IpAddr}; - use aes::cipher::{KeyIvInit, StreamCipher, StreamCipherError}; use aes_gcm::{Aes128Gcm, KeyInit, aead::AeadMutInPlace}; use bytes::{BufMut, Bytes}; @@ -10,6 +8,7 @@ use ethrex_rlp::{ error::RLPDecodeError, structs::{Decoder, Encoder}, }; +use std::{array::TryFromSliceError, fmt::Display, net::IpAddr}; use crate::types::NodeRecord; @@ -30,6 +29,8 @@ const IV_MASKING_SIZE: usize = 16; // static_header size is 23 bytes const STATIC_HEADER_SIZE: usize = 23; const STATIC_HEADER_END: usize = IV_MASKING_SIZE + STATIC_HEADER_SIZE; +// Number of distances to include in a FindNode message +pub const DISTANCES_PER_FIND_NODE_MSG: u8 = 3; #[derive(Debug, thiserror::Error)] pub enum PacketCodecError { @@ -47,6 +48,8 @@ pub enum PacketCodecError { TryFromSliceError(#[from] TryFromSliceError), #[error("Io Error: {0}")] IoError(#[from] std::io::Error), + #[error("Malformed Data")] + MalformedData, } impl From for PacketCodecError { @@ -56,28 +59,14 @@ impl From for PacketCodecError { } #[derive(Debug, Clone, PartialEq, Eq)] -pub enum Packet { - Ordinary(Ordinary), - WhoAreYou(WhoAreYou), - Handshake(Handshake), -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PacketHeader { - pub static_header: [u8; STATIC_HEADER_SIZE], - pub flag: u8, - pub nonce: [u8; 12], - pub authdata: Vec, - /// Offset in the encoded packet where authdata ends, i.e where the header ends. - pub header_end_offset: usize, +pub struct Packet { + pub(crate) masking_iv: [u8; IV_MASKING_SIZE], + pub(crate) header: PacketHeader, + pub(crate) encrypted_message: Vec, } impl Packet { - pub fn decode( - dest_id: &H256, - decrypt_key: &[u8; 16], - encoded_packet: &[u8], - ) -> Result { + pub fn decode(dest_id: &H256, encoded_packet: &[u8]) -> Result { if encoded_packet.len() < MIN_PACKET_SIZE || encoded_packet.len() > MAX_PACKET_SIZE { return Err(PacketCodecError::InvalidSize); } @@ -89,74 +78,41 @@ impl Packet { let mut cipher = ::new(dest_id[..16].into(), masking_iv.into()); - let packet_header = Packet::decode_header(&mut cipher, encoded_packet)?; - let encrypted_message = &encoded_packet[packet_header.header_end_offset..]; - - match packet_header.flag { - 0x00 => Ok(Packet::Ordinary(Ordinary::decode( - masking_iv, - &packet_header.static_header, - packet_header.authdata, - &packet_header.nonce, - decrypt_key, - encrypted_message, - )?)), - 0x01 => Ok(Packet::WhoAreYou(WhoAreYou::decode( - &packet_header.authdata, - )?)), - 0x02 => Ok(Packet::Handshake(Handshake::decode( - masking_iv, - packet_header, - decrypt_key, - encrypted_message, - )?)), - _ => Err(RLPDecodeError::MalformedData)?, - } + let header = PacketHeader::decode(&mut cipher, encoded_packet)?; + let encrypted_message = encoded_packet[header.header_end_offset..].to_vec(); + Ok(Packet { + masking_iv: masking_iv.try_into()?, + header, + encrypted_message, + }) } - pub fn encode( - &self, - buf: &mut dyn BufMut, - masking_iv: u128, - nonce: &[u8; 12], - dest_id: &H256, - encrypt_key: &[u8], - ) -> Result<(), PacketCodecError> { - let masking_as_bytes = masking_iv.to_be_bytes(); - buf.put_slice(&masking_as_bytes); + pub fn encode(&self, buf: &mut dyn BufMut, dest_id: &H256) -> Result<(), PacketCodecError> { + let masking_iv = self.masking_iv; + buf.put_slice(&masking_iv); let mut cipher = - ::new(dest_id[..16].into(), masking_as_bytes[..].into()); + ::new(dest_id[..16].into(), masking_iv[..].into()); + + self.header.encode(buf, &mut cipher)?; + buf.put_slice(&self.encrypted_message); - match self { - Packet::Ordinary(ordinary) => { - let (mut static_header, mut authdata, encrypted_message) = - ordinary.encode(nonce, &masking_as_bytes, encrypt_key)?; - - cipher.try_apply_keystream(&mut static_header)?; - buf.put_slice(&static_header); - cipher.try_apply_keystream(&mut authdata)?; - buf.put_slice(&authdata); - buf.put_slice(&encrypted_message); - } - Packet::WhoAreYou(who_are_you) => { - who_are_you.encode_header(buf, &mut cipher, nonce)?; - } - Packet::Handshake(handshake) => { - let (mut static_header, mut authdata, encrypted_message) = - handshake.encode(nonce, &masking_as_bytes, encrypt_key)?; - - cipher.try_apply_keystream(&mut static_header)?; - buf.put_slice(&static_header); - cipher.try_apply_keystream(&mut authdata)?; - buf.put_slice(&authdata); - buf.put_slice(&encrypted_message); - } - } Ok(()) } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PacketHeader { + pub static_header: [u8; STATIC_HEADER_SIZE], + pub flag: u8, + pub nonce: [u8; 12], + pub authdata: Vec, + /// Offset in the encoded packet where authdata ends, i.e where the header ends. + pub header_end_offset: usize, +} - fn decode_header( +impl PacketHeader { + fn decode( cipher: &mut T, encoded_packet: &[u8], ) -> Result { @@ -195,69 +151,111 @@ impl Packet { header_end_offset: authdata_end, }) } -} -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Ordinary { - pub src_id: H256, - pub message: Message, -} + fn encode( + &self, + buf: &mut dyn BufMut, + cipher: &mut T, + ) -> Result<(), PacketCodecError> { + let mut static_header = Vec::new(); + static_header.put_slice(PROTOCOL_ID); + static_header.put_slice(&PROTOCOL_VERSION.to_be_bytes()); + static_header.put_u8(self.flag); + static_header.put_slice(&self.nonce); + static_header.put_slice(&(self.authdata.len() as u16).to_be_bytes()); + cipher.try_apply_keystream(&mut static_header)?; + buf.put_slice(&static_header); + + let mut authdata = self.authdata.clone(); + cipher.try_apply_keystream(&mut authdata)?; + buf.put_slice(&authdata); -impl Ordinary { - fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> { - buf.put_slice(self.src_id.as_bytes()); Ok(()) } +} - /// Encodes the ordinary packet returning the header, authdata and encrypted_message - #[allow(clippy::type_complexity)] - fn encode( - &self, - nonce: &[u8; 12], - masking_iv: &[u8], - encrypt_key: &[u8], - ) -> Result<(Vec, Vec, Vec), PacketCodecError> { - if encrypt_key.len() < 16 { - return Err(PacketCodecError::InvalidSize); - } +pub trait PacketTrait { + const TYPE_FLAG: u8; + fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError>; + fn get_encoded_message(&self) -> Vec; + fn build_header(&self, nonce: &[u8; 12]) -> Result { let mut authdata = Vec::new(); self.encode_authdata(&mut authdata)?; - let authdata_size: u16 = + let authdata_size = u16::try_from(authdata.len()).map_err(|_| PacketCodecError::InvalidSize)?; - let mut static_header = Vec::new(); - static_header.put_slice(PROTOCOL_ID); - static_header.put_slice(&PROTOCOL_VERSION.to_be_bytes()); - static_header.put_u8(0x0); - static_header.put_slice(nonce); - static_header.put_slice(&authdata_size.to_be_bytes()); + let mut static_header: [u8; 23] = [0; 23]; + static_header[0..6].copy_from_slice(PROTOCOL_ID); + static_header[6..8].copy_from_slice(&PROTOCOL_VERSION.to_be_bytes()); + static_header[8] = Self::TYPE_FLAG; + static_header[9..21].copy_from_slice(nonce); + static_header[21..].copy_from_slice(&authdata_size.to_be_bytes()); + let header_end_offset = 16 + authdata.len() + static_header.len(); + Ok(PacketHeader { + static_header, + flag: Self::TYPE_FLAG, + nonce: *nonce, + authdata, + header_end_offset, + }) + } - let mut message = Vec::new(); - self.message.encode(&mut message); + /// Encodes the packet + fn encode( + &self, + nonce: &[u8; 12], + masking_iv: [u8; 16], + encrypt_key: &[u8], + ) -> Result { + if encrypt_key.len() < 16 { + return Err(PacketCodecError::InvalidSize); + } + let header = self.build_header(nonce)?; + let mut message = self.get_encoded_message(); let mut message_ad = masking_iv.to_vec(); - message_ad.extend_from_slice(&static_header); - message_ad.extend_from_slice(&authdata); + message_ad.extend_from_slice(&header.static_header); + message_ad.extend_from_slice(&header.authdata); let mut cipher = Aes128Gcm::new(encrypt_key[..16].into()); cipher - .encrypt_in_place(nonce.into(), &message_ad, &mut message) + .encrypt_in_place(&header.nonce.into(), &message_ad, &mut message) .map_err(|e| PacketCodecError::CipherError(e.to_string()))?; - Ok((static_header, authdata, message)) + Ok(Packet { + masking_iv, + header, + encrypted_message: message, + }) } +} - pub fn decode( - masking_iv: &[u8], - static_header: &[u8; STATIC_HEADER_SIZE], - authdata: Vec, - nonce: &[u8; 12], - decrypt_key: &[u8], - encrypted_message: &[u8], - ) -> Result { - if authdata.len() != 32 { +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Ordinary { + pub src_id: H256, + pub message: Message, +} + +impl PacketTrait for Ordinary { + const TYPE_FLAG: u8 = 0x00; + + fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> { + buf.put_slice(self.src_id.as_bytes()); + Ok(()) + } + + fn get_encoded_message(&self) -> Vec { + let mut message = Vec::new(); + self.message.encode(&mut message); + message + } +} + +impl Ordinary { + pub fn decode(packet: &Packet, decrypt_key: &[u8]) -> Result { + if packet.header.authdata.len() != 32 { return Err(PacketCodecError::InvalidSize); } if decrypt_key.len() < 16 { @@ -267,14 +265,14 @@ impl Ordinary { // message = aesgcm_encrypt(initiator-key, nonce, message-pt, message-ad) // message-pt = message-type || message-data // message-ad = masking-iv || header - let mut message_ad = masking_iv.to_vec(); - message_ad.extend_from_slice(static_header.as_slice()); - message_ad.extend_from_slice(&authdata); + let mut message_ad = packet.masking_iv.to_vec(); + message_ad.extend_from_slice(packet.header.static_header.as_slice()); + message_ad.extend_from_slice(&packet.header.authdata); - let mut message = encrypted_message.to_vec(); - Self::decrypt(decrypt_key, nonce, &mut message, message_ad)?; + let mut message = packet.encrypted_message.to_vec(); + Self::decrypt(decrypt_key, &packet.header.nonce, &mut message, message_ad)?; - let src_id = H256::from_slice(&authdata); + let src_id = H256::from_slice(&packet.header.authdata); let message = Message::decode(&message)?; Ok(Ordinary { src_id, message }) @@ -300,36 +298,38 @@ pub struct WhoAreYou { pub enr_seq: u64, } -impl WhoAreYou { - fn encode_header( - &self, - buf: &mut dyn BufMut, - cipher: &mut T, - nonce: &[u8], - ) -> Result<(), PacketCodecError> { - let mut static_header = Vec::new(); - static_header.put_slice(PROTOCOL_ID); - static_header.put_slice(&PROTOCOL_VERSION.to_be_bytes()); - static_header.put_u8(0x01); - static_header.put_slice(nonce); - static_header.put_slice(&24u16.to_be_bytes()); - cipher.try_apply_keystream(&mut static_header)?; - buf.put_slice(&static_header); - - let mut authdata = Vec::new(); - self.encode(&mut authdata); - cipher.try_apply_keystream(&mut authdata)?; - buf.put_slice(&authdata); +impl PacketTrait for WhoAreYou { + const TYPE_FLAG: u8 = 0x01; + fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> { + buf.put_slice(&self.id_nonce.to_be_bytes()); + buf.put_slice(&self.enr_seq.to_be_bytes()); Ok(()) } - fn encode(&self, buf: &mut dyn BufMut) { - buf.put_slice(&self.id_nonce.to_be_bytes()); - buf.put_slice(&self.enr_seq.to_be_bytes()); + fn get_encoded_message(&self) -> Vec { + Vec::new() + } + + /// Encodes the WhoAreYou packet. + /// No encryption needed, just an empty message + fn encode( + &self, + nonce: &[u8; 12], + masking_iv: [u8; 16], + _encrypt_key: &[u8], + ) -> Result { + Ok(Packet { + masking_iv, + header: self.build_header(nonce)?, + encrypted_message: Vec::new(), + }) } +} - pub fn decode(authdata: &[u8]) -> Result { +impl WhoAreYou { + pub fn decode(packet: &Packet) -> Result { + let authdata = packet.header.authdata.clone(); let id_nonce = u128::from_be_bytes(authdata[..16].try_into()?); let enr_seq = u64::from_be_bytes(authdata[16..].try_into()?); @@ -348,7 +348,9 @@ pub struct Handshake { pub message: Message, } -impl Handshake { +impl PacketTrait for Handshake { + const TYPE_FLAG: u8 = 0x02; + fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> { let sig_size: u8 = self .id_signature @@ -373,53 +375,15 @@ impl Handshake { Ok(()) } - /// Encodes the handshake returning the header, authdata and encrypted_message - #[allow(clippy::type_complexity)] - fn encode( - &self, - nonce: &[u8; 12], - masking_iv: &[u8], - encrypt_key: &[u8], - ) -> Result<(Vec, Vec, Vec), PacketCodecError> { - let mut authdata = Vec::new(); - self.encode_authdata(&mut authdata)?; - - let authdata_size = - u16::try_from(authdata.len()).map_err(|_| PacketCodecError::InvalidSize)?; - - let mut static_header = Vec::new(); - static_header.put_slice(PROTOCOL_ID); - static_header.put_slice(&PROTOCOL_VERSION.to_be_bytes()); - static_header.put_u8(0x02); - static_header.put_slice(nonce); - static_header.put_slice(&authdata_size.to_be_bytes()); - + fn get_encoded_message(&self) -> Vec { let mut message = Vec::new(); self.message.encode(&mut message); - - if encrypt_key.len() < 16 { - return Err(PacketCodecError::InvalidSize); - } - - let mut message_ad = masking_iv.to_vec(); - message_ad.extend_from_slice(&static_header); - message_ad.extend_from_slice(&authdata); - - let mut cipher = Aes128Gcm::new(encrypt_key[..16].into()); - cipher - .encrypt_in_place(nonce.into(), &message_ad, &mut message) - .map_err(|e| PacketCodecError::CipherError(e.to_string()))?; - - Ok((static_header, authdata, message)) + message } +} - #[allow(clippy::too_many_arguments)] - pub fn decode( - masking_iv: &[u8], - header: PacketHeader, - decrypt_key: &[u8], - encrypted_message: &[u8], - ) -> Result { +impl Handshake { + pub fn decode(packet: &Packet, decrypt_key: &[u8]) -> Result { if decrypt_key.len() < 16 { return Err(PacketCodecError::InvalidSize); } @@ -428,7 +392,7 @@ impl Handshake { nonce, authdata, .. - } = header; + } = &packet.header; if authdata.len() < HANDSHAKE_AUTHDATA_HEAD { return Err(PacketCodecError::InvalidSize); @@ -445,6 +409,17 @@ impl Handshake { let id_signature = authdata[HANDSHAKE_AUTHDATA_HEAD..HANDSHAKE_AUTHDATA_HEAD + sig_size].to_vec(); + + // TODO + // When node B receives the handshake message packet, it first loads the node record and WHOAREYOU challenge which it sent and stored earlier. + // + // If node B did not have the node record of node A, the handshake message packet must contain a node record. + // A record may also be present if node A determined that its record is newer than B's current copy. + // If the packet contains a node record, B must first validate it by checking the record's signature. + // + // Node B then verifies the id-signature against the identity public key of A's record. + // SECP256K1.verify_ecdsa(msg, sig, pk); + let eph_key_start = HANDSHAKE_AUTHDATA_HEAD + sig_size; let eph_pubkey = authdata[eph_key_start..authdata_head].to_vec(); @@ -459,12 +434,12 @@ impl Handshake { None }; - let mut message_ad = masking_iv.to_vec(); - message_ad.extend_from_slice(&static_header); - message_ad.extend_from_slice(&authdata); + let mut message_ad = packet.masking_iv.to_vec(); + message_ad.extend_from_slice(static_header); + message_ad.extend_from_slice(authdata); - let mut message = encrypted_message.to_vec(); - Ordinary::decrypt(decrypt_key, &nonce, &mut message, message_ad)?; + let mut message = packet.encrypted_message.to_vec(); + Ordinary::decrypt(decrypt_key, nonce, &mut message, message_ad)?; let message = Message::decode(&message)?; Ok(Handshake { @@ -551,6 +526,20 @@ impl Message { } } +impl Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Message::Ping(_) => write!(f, "Ping"), + Message::Pong(_) => write!(f, "Pong"), + Message::FindNode(_) => write!(f, "FindNode"), + Message::Nodes(_) => write!(f, "Nodes"), + Message::TalkReq(_) => write!(f, "TalkReq"), + Message::TalkRes(_) => write!(f, "TalkRes"), + Message::Ticket(_) => write!(f, "Ticket"), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct PingMessage { /// The request id of the sender. @@ -622,14 +611,14 @@ impl RLPDecode for PongMessage { #[derive(Debug, Clone, PartialEq, Eq)] pub struct FindNodeMessage { pub req_id: Bytes, - pub distance: Vec, + pub distances: Vec, } impl RLPEncode for FindNodeMessage { fn encode(&self, buf: &mut dyn BufMut) { Encoder::new(buf) .encode_field(&self.req_id) - .encode_field(&self.distance) + .encode_field(&self.distances) .finish(); } } @@ -640,7 +629,13 @@ impl RLPDecode for FindNodeMessage { let (req_id, decoder) = decoder.decode_field("req_id")?; let (distance, decoder) = decoder.decode_field("distance")?; - Ok((Self { req_id, distance }, decoder.finish()?)) + Ok(( + Self { + req_id, + distances: distance, + }, + decoder.finish()?, + )) } } @@ -784,19 +779,19 @@ mod tests { use super::*; use crate::{ discv5::{ - codec::Discv5Codec, - messages::{Message, Ordinary, Packet, PingMessage, WhoAreYou}, + messages::{Message, Ordinary, PingMessage, WhoAreYou}, + session::{build_challenge_data, create_id_signature, derive_session_keys}, }, + rlpx::utils::compress_pubkey, types::NodeRecordPairs, utils::{node_id, public_key_from_signing_key}, }; use aes_gcm::{Aes128Gcm, KeyInit, aead::AeadMutInPlace}; use bytes::BytesMut; - use ethrex_common::H512; + use ethrex_common::{H264, H512}; use hex_literal::hex; use secp256k1::SecretKey; - use std::net::Ipv4Addr; - use tokio_util::codec::Decoder as _; + use std::{net::Ipv4Addr, str::FromStr}; // node-a-key = 0xeef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f // node-b-key = 0x66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628 @@ -809,27 +804,22 @@ mod tests { // )) // .unwrap(); + /// Ping message packet (flag 0) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md #[test] - fn aes_gcm_vector() { - // https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md#encryptiondecryption - let key = hex!("9f2d77db7004bf8a1a85107ac686990b"); - let nonce = hex!("27b5af763c446acd2749fe8e"); - let ad = hex!("93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f42107903"); - let mut pt = hex!("01c20101").to_vec(); - - let mut cipher = Aes128Gcm::new_from_slice(&key).unwrap(); - cipher - .encrypt_in_place(nonce.as_slice().into(), &ad, &mut pt) - .unwrap(); + fn decode_ping_packet() { + /* + # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + # nonce = 0xffffffffffffffffffffffff + # read-key = 0x00000000000000000000000000000000 + # ping.req-id = 0x00000001 + # ping.enr-seq = 2 - assert_eq!( - pt, - hex!("a5d12a2d94b8ccb3ba55558229867dc13bfa3648").to_vec() - ); - } + 00000000000000000000000000000000088b3d4342774649325f313964a39e55 + ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc + */ - #[test] - fn handshake_packet_roundtrip() { let node_a_key = SecretKey::from_byte_array(&hex!( "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" )) @@ -842,121 +832,331 @@ mod tests { let src_id = node_id(&public_key_from_signing_key(&node_a_key)); let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); - let handshake = Handshake { + let encoded = &hex!( + "00000000000000000000000000000000088b3d4342774649325f313964a39e55ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc" + ); + let packet = Packet::decode(&dest_id, encoded).unwrap(); + assert_eq!([0; 16], packet.masking_iv); + assert_eq!(0x00, packet.header.flag); + assert_eq!(hex!("ffffffffffffffffffffffff"), packet.header.nonce); + + // # read-key = 0x00000000000000000000000000000000 + let read_key = [0; 16]; + + let decoded_message = Ordinary::decode(&packet, &read_key).unwrap(); + + let expected_message = Ordinary { src_id, - id_signature: vec![1; 64], - eph_pubkey: vec![2; 33], - record: None, message: Message::Ping(PingMessage { - req_id: Bytes::from_static(&[3]), - enr_seq: 4, + req_id: Bytes::from(hex!("00000001").as_slice()), + enr_seq: 2, }), }; - let key = [0x10; 16]; - let nonce = hex!("000102030405060708090a0b"); - let mut buf = Vec::new(); - let packet = Packet::Handshake(handshake.clone()); - packet.encode(&mut buf, 0, &nonce, &dest_id, &key).unwrap(); - - let decoded = Packet::decode(&dest_id, &key, &buf).unwrap(); - assert_eq!(decoded, Packet::Handshake(handshake)); + assert_eq!(decoded_message, expected_message); } - /// Ping handshake packet (flag 2) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md + /// Ping message packet (flag 0) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md #[test] - fn handshake_packet_vector_roundtrip() { + fn encode_ping_packet() { /* # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 # nonce = 0xffffffffffffffffffffffff - # read-key = 0x4f9fac6de7567d1e3b1241dffe90f662 + # read-key = 0x00000000000000000000000000000000 # ping.req-id = 0x00000001 - # ping.enr-seq = 1 - # - # handshake inputs: - # - # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000001 - # whoareyou.request-nonce = 0x0102030405060708090a0b0c - # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 - # whoareyou.enr-seq = 1 - # ephemeral-key = 0x0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6 - # ephemeral-pubkey = 0x039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5 + # ping.enr-seq = 2 - 00000000000000000000000000000000088b3d4342774649305f313964a39e55 - ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 - 4c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef - 268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfb - a776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1 - f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d83 - 9cf8 - */ + 00000000000000000000000000000000088b3d4342774649325f313964a39e55 + ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc + */ + + let node_a_key = SecretKey::from_byte_array(&hex!( + "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" + )) + .unwrap(); let node_b_key = SecretKey::from_byte_array(&hex!( "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); + + let src_id = node_id(&public_key_from_signing_key(&node_a_key)); let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); - let encoded = &hex!( - "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfba776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d839cf8" + let message = Ordinary { + src_id, + message: Message::Ping(PingMessage { + req_id: Bytes::from(hex!("00000001").as_slice()), + enr_seq: 2, + }), + }; + + let masking_iv = [0; 16]; + let nonce = hex!("ffffffffffffffffffffffff"); + + // # read-key = 0x00000000000000000000000000000000 + let encrypt_key = [0; 16]; + + let packet = message.encode(&nonce, masking_iv, &encrypt_key).unwrap(); + + let expected_encoded = &hex!( + "00000000000000000000000000000000088b3d4342774649325f313964a39e55ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc" ); - let read_key = hex!("4f9fac6de7567d1e3b1241dffe90f662"); - let packet = Packet::decode(&dest_id, &read_key, encoded).unwrap(); - let handshake = match packet { - Packet::Handshake(hs) => hs, - other => panic!("unexpected packet {other:?}"), - }; + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &dest_id).unwrap(); - assert_eq!( - handshake.src_id, - H256::from_slice(&hex!( - "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" - )) + assert_eq!(buf.to_vec(), expected_encoded); + } + + #[test] + fn decode_whoareyou_packet() { + // # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + // # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + // # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 + // # whoareyou.request-nonce = 0x0102030405060708090a0b0c + // # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + // # whoareyou.enr-seq = 0 + // + // 00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad + // 1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d + let node_b_key = SecretKey::from_byte_array(&hex!( + "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" + )) + .unwrap(); + + let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); + + let encoded = &hex!( + "00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d" ); - assert_eq!(handshake.record, None); - assert_eq!( - handshake.eph_pubkey, - hex!("039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5").to_vec() + + let packet = Packet::decode(&dest_id, encoded).unwrap(); + assert_eq!([0; 16], packet.masking_iv); + assert_eq!(0x01, packet.header.flag); + assert_eq!(hex!("0102030405060708090a0b0c"), packet.header.nonce); + + let challenge_data = build_challenge_data( + &packet.masking_iv, + &packet.header.static_header, + &packet.header.authdata, ); - assert_eq!( - handshake.message, - Message::Ping(PingMessage { - req_id: Bytes::from(hex!("00000001").as_slice()), - enr_seq: 1, - }) + + let expected_challenge_data = &hex!( + "000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000" + ); + assert_eq!(challenge_data, expected_challenge_data); + let decoded_message = WhoAreYou::decode(&packet).unwrap(); + + let expected_message = WhoAreYou { + id_nonce: u128::from_be_bytes( + hex!("0102030405060708090a0b0c0d0e0f10") + .to_vec() + .try_into() + .unwrap(), + ), + enr_seq: 0, + }; + + assert_eq!(decoded_message, expected_message); + } + + #[test] + fn encode_whoareyou_packet() { + // # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + // # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + // # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 + // # whoareyou.request-nonce = 0x0102030405060708090a0b0c + // # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + // # whoareyou.enr-seq = 0 + // + // 00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad + // 1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d + let node_b_key = SecretKey::from_byte_array(&hex!( + "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" + )) + .unwrap(); + + let who_are_you = WhoAreYou { + id_nonce: u128::from_be_bytes( + hex!("0102030405060708090a0b0c0d0e0f10") + .to_vec() + .try_into() + .unwrap(), + ), + enr_seq: 0, + }; + + let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); + + let masking_iv = [0; 16]; + let nonce = hex!("0102030405060708090a0b0c"); + + let packet = who_are_you.encode(&nonce, masking_iv, &[]).unwrap(); + + let expected_encoded = &hex!( + "00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d" + ); + + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &dest_id).unwrap(); + + assert_eq!(buf.to_vec(), expected_encoded); + } + + /// Ping handshake packet (flag 2) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md + #[test] + fn encode_ping_handshake_packet() { + /* + # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + # nonce = 0xffffffffffffffffffffffff + # read-key = 0x4f9fac6de7567d1e3b1241dffe90f662 + # ping.req-id = 0x00000001 + # ping.enr-seq = 1 + # + # handshake inputs: + # + # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000001 + # whoareyou.request-nonce = 0x0102030405060708090a0b0c + # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + # whoareyou.enr-seq = 1 + # ephemeral-key = 0x0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6 + # ephemeral-pubkey = 0x039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5 + + 00000000000000000000000000000000088b3d4342774649305f313964a39e55 + ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef + 268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfb + a776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1 + f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d83 + 9cf8 + */ + let node_a_key = SecretKey::from_byte_array(&hex!( + "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" + )) + .unwrap(); + let src_id = node_id(&public_key_from_signing_key(&node_a_key)); + let expected_src_id = H256::from_slice(&hex!( + "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" + )); + assert_eq!(src_id, expected_src_id); + + let node_b_key = SecretKey::from_byte_array(&hex!( + "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" + )) + .unwrap(); + let dest_pub_key = public_key_from_signing_key(&node_b_key); + let dest_pubkey = compress_pubkey(dest_pub_key).unwrap(); + let dest_id = node_id(&dest_pub_key); + + let message = Message::Ping(PingMessage { + req_id: Bytes::from(hex!("00000001").as_slice()), + enr_seq: 1, + }); + + let challenge_data = hex!("000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000001").to_vec(); + + let ephemeral_key = SecretKey::from_byte_array(&hex!( + "0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6" + )) + .unwrap(); + let expected_ephemeral_pubkey = + hex!("039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5"); + + let ephemeral_pubkey = ephemeral_key.public_key(secp256k1::SECP256K1).serialize(); + + assert_eq!(ephemeral_pubkey, expected_ephemeral_pubkey); + + let session = derive_session_keys( + &ephemeral_key, + &dest_pubkey, + &src_id, + &dest_id, + &challenge_data, ); - let masking_iv = u128::from_be_bytes(encoded[..16].try_into().unwrap()); + let expected_read_key = hex!("4f9fac6de7567d1e3b1241dffe90f662"); + assert_eq!(session.outbound_key, expected_read_key); + + let signature = + create_id_signature(&node_a_key, &challenge_data, &ephemeral_pubkey, &dest_id); + + let handshake = Handshake { + src_id, + id_signature: signature.serialize_compact().to_vec(), + eph_pubkey: ephemeral_pubkey.to_vec(), + record: None, + message, + }; + + let masking_iv = [0; 16]; let nonce = hex!("ffffffffffffffffffffffff"); - let mut buf = Vec::new(); - Packet::Handshake(handshake) - .encode(&mut buf, masking_iv, &nonce, &dest_id, &read_key) + + let packet = handshake + .encode(&nonce, masking_iv, &session.outbound_key) .unwrap(); - assert_eq!(buf, encoded.to_vec()); + let expected_encoded = &hex!( + "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfba776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d839cf8" + ); + + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &dest_id).unwrap(); + + assert_eq!(buf.to_vec(), expected_encoded); } /// Ping handshake message packet (flag 2, with ENR) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md #[test] - fn handshake_packet_with_enr_vector_roundtrip() { + fn decode_ping_handshake_packet_with_enr() { + /* + # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + # nonce = 0xffffffffffffffffffffffff + # read-key = 0x53b1c075f41876423154e157470c2f48 + # ping.req-id = 0x00000001 + # ping.enr-seq = 1 + # + # handshake inputs: + # + # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 + # whoareyou.request-nonce = 0x0102030405060708090a0b0c + # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + # whoareyou.enr-seq = 0 + # ephemeral-key = 0x0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6 + # ephemeral-pubkey = 0x039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5 + + 00000000000000000000000000000000088b3d4342774649305f313964a39e55 + ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be9856 + 2fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b2 + 1481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1 + f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6 + cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb1 + 2a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a + 80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e + 4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b1394 + 71 + */ let node_b_key = SecretKey::from_byte_array(&hex!( "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); - let encoded = &hex!( + let encoded_packet = &hex!( "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be98562fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b21481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb12a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b139471" ); - let nonce = hex!("ffffffffffffffffffffffff"); let read_key = hex!("53b1c075f41876423154e157470c2f48"); - let packet = Packet::decode(&dest_id, &read_key, encoded).unwrap(); - let handshake = match packet { - Packet::Handshake(hs) => hs, - other => panic!("unexpected packet {other:?}"), - }; + let packet = Packet::decode(&dest_id, encoded_packet).unwrap(); + assert_eq!([0; 16], packet.masking_iv); + assert_eq!(0x02, packet.header.flag); + assert_eq!(hex!("ffffffffffffffffffffffff"), packet.header.nonce); + + let handshake = Handshake::decode(&packet, &read_key).unwrap(); assert_eq!( handshake.src_id, @@ -980,135 +1180,307 @@ mod tests { let pairs = record.decode_pairs(); assert_eq!(pairs.id.as_deref(), Some("v4")); assert!(pairs.secp256k1.is_some()); - - let masking_iv = u128::from_be_bytes(encoded[..16].try_into().unwrap()); - let mut buf = Vec::new(); - Packet::Handshake(handshake) - .encode(&mut buf, masking_iv, &nonce, &dest_id, &read_key) - .unwrap(); - - assert_eq!(buf, encoded.to_vec()); } + /// Ping handshake packet (flag 2, with ENR) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md #[test] - fn encode_whoareyou_packet() { - // # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb - // # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 - // # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 - // # whoareyou.request-nonce = 0x0102030405060708090a0b0c - // # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 - // # whoareyou.enr-seq = 0 - // - // 00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad - // 1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d + fn encode_ping_handshake_packet_with_enr() { + /* + # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + # nonce = 0xffffffffffffffffffffffff + # read-key = 0x53b1c075f41876423154e157470c2f48 + # ping.req-id = 0x00000001 + # ping.enr-seq = 1 + # + # handshake inputs: + # + # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 + # whoareyou.request-nonce = 0x0102030405060708090a0b0c + # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + # whoareyou.enr-seq = 0 + # ephemeral-key = 0x0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6 + # ephemeral-pubkey = 0x039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5 + + 00000000000000000000000000000000088b3d4342774649305f313964a39e55 + ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be9856 + 2fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b2 + 1481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1 + f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6 + cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb1 + 2a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a + 80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e + 4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b1394 + 71 + */ + let node_a_key = SecretKey::from_byte_array(&hex!( + "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" + )) + .unwrap(); + let src_id = node_id(&public_key_from_signing_key(&node_a_key)); + let expected_src_id = H256::from_slice(&hex!( + "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" + )); + assert_eq!(src_id, expected_src_id); + let node_b_key = SecretKey::from_byte_array(&hex!( "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); + let dest_pub_key = public_key_from_signing_key(&node_b_key); + let dest_pubkey = compress_pubkey(dest_pub_key).unwrap(); + let dest_id: H256 = node_id(&dest_pub_key); - let packet = Packet::WhoAreYou(WhoAreYou { - id_nonce: u128::from_be_bytes( - hex!("0102030405060708090a0b0c0d0e0f10") - .to_vec() - .try_into() - .unwrap(), - ), - enr_seq: 0, + let message = Message::Ping(PingMessage { + req_id: Bytes::from(hex!("00000001").as_slice()), + enr_seq: 1, }); - let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); - let mut buf = Vec::new(); + let challenge_data = hex!("000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000").to_vec(); + + let ephemeral_key = SecretKey::from_byte_array(&hex!( + "0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6" + )) + .unwrap(); + let expected_ephemeral_pubkey = + hex!("039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5"); + + let ephemeral_pubkey = ephemeral_key.public_key(secp256k1::SECP256K1).serialize(); - let _ = packet.encode( - &mut buf, - 0, - &hex!("0102030405060708090a0b0c"), + assert_eq!(ephemeral_pubkey, expected_ephemeral_pubkey); + + let session = derive_session_keys( + &ephemeral_key, + &dest_pubkey, + &src_id, &dest_id, - &[], + &challenge_data, ); - let expected = &hex!( - "00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d" + + let expected_read_key = hex!("53b1c075f41876423154e157470c2f48"); + assert_eq!(session.outbound_key, expected_read_key); + + let signature = + create_id_signature(&node_a_key, &challenge_data, &ephemeral_pubkey, &dest_id); + + let sig = "17e1b073918da32d640642c762c0e2781698e4971f8ab39a77746adad83f01e76ffc874c5924808bbe7c50890882c2b8a01287a0b08312d1d53a17d517f5eb27"; + let key = "0313d14211e0287b2361a1615890a9b5212080546d0a257ae4cff96cf534992cb9"; + + let record = NodeRecord { + signature: H512::from_str(sig).unwrap(), + seq: 1, + pairs: NodeRecordPairs { + id: Some("v4".to_owned()), + ip: Some(Ipv4Addr::new(127, 0, 0, 1)), + ip6: None, + tcp_port: None, + udp_port: None, + secp256k1: Some(H264::from_str(key).unwrap()), + eth: None, + } + .into(), + }; + + let handshake = Handshake { + src_id, + id_signature: signature.serialize_compact().to_vec(), + eph_pubkey: ephemeral_pubkey.to_vec(), + record: Some(record), + message, + }; + + let masking_iv = [0; 16]; + let nonce = hex!("ffffffffffffffffffffffff"); + + let packet = handshake + .encode(&nonce, masking_iv, &session.outbound_key) + .unwrap(); + + let expected_encoded = &hex!( + "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be98562fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b21481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb12a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b139471" ); - assert_eq!(buf, expected); + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &dest_id).unwrap(); + + assert_eq!(buf.to_vec(), expected_encoded); } #[test] - fn decode_whoareyou_packet() { - // # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb - // # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 - // # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000 - // # whoareyou.request-nonce = 0x0102030405060708090a0b0c - // # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 - // # whoareyou.enr-seq = 0 - // - // 00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad - // 1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d + fn aes_gcm_vector() { + // https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md#encryptiondecryption + let key = hex!("9f2d77db7004bf8a1a85107ac686990b"); + let nonce = hex!("27b5af763c446acd2749fe8e"); + let ad = hex!("93a7400fa0d6a694ebc24d5cf570f65d04215b6ac00757875e3f3a5f42107903"); + let mut pt = hex!("01c20101").to_vec(); + + let mut cipher = Aes128Gcm::new_from_slice(&key).unwrap(); + cipher + .encrypt_in_place(nonce.as_slice().into(), &ad, &mut pt) + .unwrap(); + + assert_eq!( + pt, + hex!("a5d12a2d94b8ccb3ba55558229867dc13bfa3648").to_vec() + ); + } + + #[test] + fn handshake_packet_roundtrip() { + let node_a_key = SecretKey::from_byte_array(&hex!( + "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" + )) + .unwrap(); let node_b_key = SecretKey::from_byte_array(&hex!( "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); + let src_id = node_id(&public_key_from_signing_key(&node_a_key)); let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); - let mut codec = Discv5Codec::new(dest_id); - let mut encoded = BytesMut::from(hex!( - "00000000000000000000000000000000088b3d434277464933a1ccc59f5967ad1d6035f15e528627dde75cd68292f9e6c27d6b66c8100a873fcbaed4e16b8d" - ).as_slice()); - let packet = codec.decode(&mut encoded).unwrap(); - let expected = Some(Packet::WhoAreYou(WhoAreYou { - id_nonce: u128::from_be_bytes( - hex!("0102030405060708090a0b0c0d0e0f10") - .to_vec() - .try_into() - .unwrap(), - ), - enr_seq: 0, - })); + let handshake = Handshake { + src_id, + id_signature: vec![1; 64], + eph_pubkey: vec![2; 33], + record: None, + message: Message::Ping(PingMessage { + req_id: Bytes::from_static(&[3]), + enr_seq: 4, + }), + }; - assert_eq!(packet, expected); + let key = [0x10; 16]; + let nonce = hex!("000102030405060708090a0b"); + let mut buf = Vec::new(); + + let masking_iv = [0; 16]; + let packet = handshake.encode(&nonce, masking_iv, &key).unwrap(); + packet.encode(&mut buf, &dest_id).unwrap(); + + let decoded = Packet::decode(&dest_id, &buf).unwrap(); + assert_eq!(decoded, packet); } + /// Ping handshake packet (flag 2) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md #[test] - fn decode_ping_packet() { - // # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb - // # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 - // # nonce = 0xffffffffffffffffffffffff - // # read-key = 0x00000000000000000000000000000000 - // # ping.req-id = 0x00000001 - // # ping.enr-seq = 2 - // - // 00000000000000000000000000000000088b3d4342774649325f313964a39e55 - // ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 - // 4c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc + fn handshake_packet_vector_roundtrip() { + /* + # src-node-id = 0xaaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb + # dest-node-id = 0xbbbb9d047f0488c0b5a93c1c3f2d8bafc7c8ff337024a55434a0d0555de64db9 + # nonce = 0xffffffffffffffffffffffff + # read-key = 0x4f9fac6de7567d1e3b1241dffe90f662 + # ping.req-id = 0x00000001 + # ping.enr-seq = 1 + # + # handshake inputs: + # + # whoareyou.challenge-data = 0x000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000001 + # whoareyou.request-nonce = 0x0102030405060708090a0b0c + # whoareyou.id-nonce = 0x0102030405060708090a0b0c0d0e0f10 + # whoareyou.enr-seq = 1 + # ephemeral-key = 0x0288ef00023598499cb6c940146d050d2b1fb914198c327f76aad590bead68b6 + # ephemeral-pubkey = 0x039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5 - let node_a_key = SecretKey::from_byte_array(&hex!( - "eef77acb6c6a6eebc5b363a475ac583ec7eccdb42b6481424c60f59aa326547f" + 00000000000000000000000000000000088b3d4342774649305f313964a39e55 + ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d3 + 4c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef + 268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfb + a776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1 + f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d83 + 9cf8 + */ + let node_b_key = SecretKey::from_byte_array(&hex!( + "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); + let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); + + let encoded = &hex!( + "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad521d8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb252012b2cba3f4f374a90a75cff91f142fa9be3e0a5f3ef268ccb9065aeecfd67a999e7fdc137e062b2ec4a0eb92947f0d9a74bfbf44dfba776b21301f8b65efd5796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524f1eadf5f0f4126b79336671cbcf7a885b1f8bd2a5d839cf8" + ); + let read_key = hex!("4f9fac6de7567d1e3b1241dffe90f662"); + + let packet = Packet::decode(&dest_id, encoded).unwrap(); + let handshake = Handshake::decode(&packet, &read_key).unwrap(); + + assert_eq!( + handshake.src_id, + H256::from_slice(&hex!( + "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" + )) + ); + assert_eq!(handshake.record, None); + assert_eq!( + handshake.eph_pubkey, + hex!("039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5").to_vec() + ); + assert_eq!( + handshake.message, + Message::Ping(PingMessage { + req_id: Bytes::from(hex!("00000001").as_slice()), + enr_seq: 1, + }) + ); + + let masking_iv = encoded[..16].try_into().unwrap(); + let nonce = hex!("ffffffffffffffffffffffff"); + let mut buf = Vec::new(); + let packet = handshake.encode(&nonce, masking_iv, &read_key).unwrap(); + packet.encode(&mut buf, &dest_id).unwrap(); + + assert_eq!(buf, encoded.to_vec()); + } + + /// Ping handshake message packet (flag 2, with ENR) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md + #[test] + fn handshake_packet_with_enr_vector_roundtrip() { let node_b_key = SecretKey::from_byte_array(&hex!( "66fb62bfbd66b9177a138c1e5cddbe4f7c30c343e94e68df8769459cb1cde628" )) .unwrap(); - - let src_id = node_id(&public_key_from_signing_key(&node_a_key)); let dest_id = node_id(&public_key_from_signing_key(&node_b_key)); let encoded = &hex!( - "00000000000000000000000000000000088b3d4342774649325f313964a39e55ea96c005ad52be8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08dab84102ed931f66d1492acb308fa1c6715b9d139b81acbdcc" + "00000000000000000000000000000000088b3d4342774649305f313964a39e55ea96c005ad539c8c7560413a7008f16c9e6d2f43bbea8814a546b7409ce783d34c4f53245d08da4bb23698868350aaad22e3ab8dd034f548a1c43cd246be98562fafa0a1fa86d8e7a3b95ae78cc2b988ded6a5b59eb83ad58097252188b902b21481e30e5e285f19735796706adff216ab862a9186875f9494150c4ae06fa4d1f0396c93f215fa4ef524e0ed04c3c21e39b1868e1ca8105e585ec17315e755e6cfc4dd6cb7fd8e1a1f55e49b4b5eb024221482105346f3c82b15fdaae36a3bb12a494683b4a3c7f2ae41306252fed84785e2bbff3b022812d0882f06978df84a80d443972213342d04b9048fc3b1d5fcb1df0f822152eced6da4d3f6df27e70e4539717307a0208cd208d65093ccab5aa596a34d7511401987662d8cf62b139471" ); - // # read-key = 0x00000000000000000000000000000000 - let read_key = [0; 16]; - let packet = Packet::decode(&dest_id, &read_key, encoded).unwrap(); - let expected = Packet::Ordinary(Ordinary { - src_id, - message: Message::Ping(PingMessage { + let nonce = hex!("ffffffffffffffffffffffff"); + let read_key = hex!("53b1c075f41876423154e157470c2f48"); + + let packet = Packet::decode(&dest_id, encoded).unwrap(); + let handshake = Handshake::decode(&packet, &read_key).unwrap(); + + assert_eq!( + handshake.src_id, + H256::from_slice(&hex!( + "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" + )) + ); + assert_eq!( + handshake.eph_pubkey, + hex!("039a003ba6517b473fa0cd74aefe99dadfdb34627f90fec6362df85803908f53a5").to_vec() + ); + assert_eq!( + handshake.message, + Message::Ping(PingMessage { req_id: Bytes::from(hex!("00000001").as_slice()), - enr_seq: 2, - }), - }); + enr_seq: 1, + }) + ); - assert_eq!(packet, expected); + let record = handshake.record.clone().expect("expected ENR record"); + let pairs = record.decode_pairs(); + assert_eq!(pairs.id.as_deref(), Some("v4")); + assert!(pairs.secp256k1.is_some()); + + let masking_iv = encoded[..16].try_into().unwrap(); + let mut buf = Vec::new(); + + let packet = handshake.encode(&nonce, masking_iv, &read_key).unwrap(); + packet.encode(&mut buf, &dest_id).unwrap(); + + assert_eq!(buf, encoded.to_vec()); } /// Ping message packet (flag 0) from https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire-test-vectors.md @@ -1126,8 +1498,8 @@ mod tests { let nonce = hex!("ffffffffffffffffffffffff"); let read_key = [0; 16]; - let packet = Packet::decode(&dest_id, &read_key, encoded).unwrap(); - let expected = Packet::Ordinary(Ordinary { + let packet = Packet::decode(&dest_id, encoded).unwrap(); + let message = Ordinary { src_id: H256::from_slice(&hex!( "aaaa8419e9f49d0083561b48287df592939a8d19947d8c0ef88f2a4856a69fbb" )), @@ -1135,14 +1507,14 @@ mod tests { req_id: Bytes::from(hex!("00000001").as_slice()), enr_seq: 2, }), - }); + }; + let masking_iv = [0; 16]; + let expected = message.encode(&nonce, masking_iv, &read_key).unwrap(); + assert_eq!(packet, expected); - let masking_iv = u128::from_be_bytes(encoded[..16].try_into().unwrap()); let mut buf = Vec::new(); - packet - .encode(&mut buf, masking_iv, &nonce, &dest_id, &read_key) - .unwrap(); + packet.encode(&mut buf, &dest_id).unwrap(); assert_eq!(buf, encoded.to_vec()); } @@ -1175,7 +1547,7 @@ mod tests { fn findnode_packet_codec_roundtrip() { let pkt = FindNodeMessage { req_id: Bytes::from_static(&[1, 2, 3, 4]), - distance: vec![1, 2, 3, 4], + distances: vec![0], }; let buf = pkt.encode_to_vec(); diff --git a/crates/networking/p2p/discv5/mod.rs b/crates/networking/p2p/discv5/mod.rs index 719d9e319f2..a166d1d5c45 100644 --- a/crates/networking/p2p/discv5/mod.rs +++ b/crates/networking/p2p/discv5/mod.rs @@ -1,3 +1,5 @@ pub mod codec; pub mod messages; +pub mod peer_table; +pub mod server; pub mod session; diff --git a/crates/networking/p2p/discv5/peer_table.rs b/crates/networking/p2p/discv5/peer_table.rs new file mode 100644 index 00000000000..27d8bf310ce --- /dev/null +++ b/crates/networking/p2p/discv5/peer_table.rs @@ -0,0 +1,1320 @@ +use crate::{ + discv4::server::MAX_NODES_IN_NEIGHBORS_PACKET, + discv5::session::Session, + metrics::METRICS, + rlpx::{connection::server::PeerConnection, p2p::Capability}, + types::{Node, NodeRecord}, + utils::distance, +}; +use ethrex_common::H256; +use indexmap::{IndexMap, map::Entry}; +use rand::seq::SliceRandom; +use rustc_hash::FxHashSet; +use spawned_concurrency::{ + error::GenServerError, + tasks::{CallResponse, CastResponse, GenServer, GenServerHandle, InitResult, send_message_on}, +}; +use std::{ + net::IpAddr, + time::{Duration, Instant}, +}; +use thiserror::Error; + +const MAX_SCORE: i64 = 50; +const MIN_SCORE: i64 = -50; +/// Score assigned to peers who are acting maliciously (e.g., returning a node with wrong hash) +const MIN_SCORE_CRITICAL: i64 = MIN_SCORE * 3; +/// Maximum amount of FindNode messages sent to a single node. +const MAX_FIND_NODE_PER_PEER: u64 = 20; +/// Score weight for the load balancing function. +const SCORE_WEIGHT: i64 = 1; +/// Weight for amount of requests being handled by the peer for the load balancing function. +const REQUESTS_WEIGHT: i64 = 1; +/// Max amount of ongoing requests per peer. +const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100; +/// The target number of RLPx connections to reach. +pub const TARGET_PEERS: usize = 100; +/// The target number of contacts to maintain in peer_table. +const TARGET_CONTACTS: usize = 100_000; + +#[derive(Debug, Clone)] +pub struct Contact { + pub node: Node, + /// The timestamp when the contact was last sent a ping. + /// If None, the contact has never been pinged. + pub validation_timestamp: Option, + /// The hash of the last unacknowledged ping sent to this contact, or + /// None if no ping was sent yet or it was already acknowledged. + pub ping_hash: Option, + + /// The hash of the last unacknowledged ENRRequest sent to this contact, or + /// None if no request was sent yet or it was already acknowledged. + pub enr_request_hash: Option, + + pub n_find_node_sent: u64, + /// ENR associated with this contact, if it was provided by the peer. + pub record: Option, + // This contact failed to respond our Ping. + pub disposable: bool, + // Set to true after we send a successful ENRResponse to it. + pub knows_us: bool, + // This is a known-bad peer (on another network, no matching capabilities, etc) + pub unwanted: bool, + /// Whether the last known fork ID is valid, None if unknown. + pub is_fork_id_valid: Option, + /// Session information for discv5 + session: Option, +} + +impl Contact { + pub fn was_validated(&self) -> bool { + self.validation_timestamp.is_some() && !self.has_pending_ping() + } + + pub fn has_pending_ping(&self) -> bool { + self.ping_hash.is_some() + } + + pub fn record_ping_sent(&mut self, ping_hash: H256) { + self.validation_timestamp = Some(Instant::now()); + self.ping_hash = Some(ping_hash); + } + + pub fn record_enr_request_sent(&mut self, request_hash: H256) { + self.enr_request_hash = Some(request_hash); + } + + // If hash does not match, ignore. Otherwise, reset enr_request_hash + pub fn record_enr_response_received(&mut self, request_hash: H256, record: NodeRecord) { + if self + .enr_request_hash + .take_if(|h| *h == request_hash) + .is_some() + { + self.record = Some(record); + } + } + + pub fn has_pending_enr_request(&self) -> bool { + self.enr_request_hash.is_some() + } +} + +impl From for Contact { + fn from(node: Node) -> Self { + Self { + node, + validation_timestamp: None, + ping_hash: None, + enr_request_hash: None, + n_find_node_sent: 0, + record: None, + disposable: false, + knows_us: true, + unwanted: false, + is_fork_id_valid: None, + session: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct PeerData { + pub node: Node, + pub record: Option, + pub supported_capabilities: Vec, + /// Set to true if the connection is inbound (aka the connection was started by the peer and not by this node) + /// It is only valid as long as is_connected is true + pub is_connection_inbound: bool, + /// communication channels between the peer data and its active connection + pub connection: Option, + /// This tracks the score of a peer + score: i64, + /// Track the amount of concurrent requests this peer is handling + requests: i64, +} + +impl PeerData { + pub fn new( + node: Node, + record: Option, + connection: Option, + capabilities: Vec, + ) -> Self { + Self { + node, + record, + supported_capabilities: capabilities, + is_connection_inbound: false, + connection, + score: Default::default(), + requests: Default::default(), + } + } +} + +#[derive(Clone, Debug)] +pub struct PeerTable { + handle: GenServerHandle, +} + +impl PeerTable { + pub fn spawn(target_peers: usize) -> PeerTable { + PeerTable { + handle: PeerTableServer::new(target_peers).start(), + } + } + + /// We received a list of Nodes to contact. No conection has been established yet. + pub async fn new_contacts( + &mut self, + nodes: Vec, + local_node_id: H256, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::NewContacts { + nodes, + local_node_id, + }) + .await?; + Ok(()) + } + + /// We received a list of NodeRecords to contact. No conection has been established yet. + pub async fn new_contact_records( + &mut self, + node_records: Vec, + local_node_id: H256, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::NewContactRecords { + node_records, + local_node_id, + }) + .await?; + Ok(()) + } + + /// We have established a connection with the remote peer. + pub async fn new_connected_peer( + &mut self, + node: Node, + connection: PeerConnection, + capabilities: Vec, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::NewConnectedPeer { + node, + connection, + capabilities, + }) + .await?; + Ok(()) + } + + /// Set or update discv5 Session info. + pub async fn set_session_info( + &mut self, + node_id: H256, + session: Session, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::SetSessionInfo { node_id, session }) + .await?; + Ok(()) + } + + /// Remove from list of connected peers. + pub async fn remove_peer(&mut self, node_id: H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RemovePeer { node_id }) + .await?; + Ok(()) + } + + /// Increment the number of ongoing requests for this peer + pub async fn inc_requests(&mut self, node_id: H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::IncRequests { node_id }) + .await?; + Ok(()) + } + + /// Decrement the number of ongoing requests for this peer + pub async fn dec_requests(&mut self, node_id: H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::DecRequests { node_id }) + .await?; + Ok(()) + } + + /// Mark node as not wanted + pub async fn set_unwanted(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::SetUnwanted { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Set whether the contact fork id is valid. + pub async fn set_is_fork_id_valid( + &mut self, + node_id: &H256, + valid: bool, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::SetIsForkIdValid { + node_id: *node_id, + valid, + }) + .await?; + Ok(()) + } + + /// Record a successful connection, used to score peers + pub async fn record_success(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordSuccess { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Record a failed connection, used to score peers + pub async fn record_failure(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordFailure { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Record a critical failure for connection, used to score peers + pub async fn record_critical_failure(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordCriticalFailure { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Record ping sent, store the ping hash for later check + pub async fn record_ping_sent( + &mut self, + node_id: &H256, + hash: H256, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordPingSent { + node_id: *node_id, + hash, + }) + .await?; + Ok(()) + } + + /// Record a pong received. Check previously saved hash and reset it if it matches + pub async fn record_pong_received( + &mut self, + node_id: &H256, + ping_hash: H256, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordPongReceived { + node_id: *node_id, + ping_hash, + }) + .await?; + Ok(()) + } + + /// Record request sent, store the request hash for later check + pub async fn record_enr_request_sent( + &mut self, + node_id: &H256, + request_hash: H256, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordEnrRequestSent { + node_id: *node_id, + request_hash, + }) + .await?; + Ok(()) + } + + /// Record a response received. Check previously saved hash and reset it if it matches + pub async fn record_enr_response_received( + &mut self, + node_id: &H256, + request_hash: H256, + record: NodeRecord, + ) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::RecordEnrResponseReceived { + node_id: *node_id, + request_hash, + record, + }) + .await?; + Ok(()) + } + + /// Set peer as disposable + pub async fn set_disposable(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::SetDisposable { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Increment FindNode message counter for peer + pub async fn increment_find_node_sent(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::IncrementFindNodeSent { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Set flag for peer that tells that it knows us + pub async fn knows_us(&mut self, node_id: &H256) -> Result<(), PeerTableError> { + self.handle + .cast(CastMessage::KnowsUs { node_id: *node_id }) + .await?; + Ok(()) + } + + /// Remove from list of contacts the ones marked as disposable + pub async fn prune(&mut self) -> Result<(), PeerTableError> { + self.handle.cast(CastMessage::Prune).await?; + Ok(()) + } + + /// Return the amount of connected peers + pub async fn peer_count(&mut self) -> Result { + match self.handle.call(CallMessage::PeerCount).await? { + OutMessage::PeerCount(peer_count) => Ok(peer_count), + _ => unreachable!(), + } + } + + /// Return the amount of connected peers that matches any of the given capabilities + pub async fn peer_count_by_capabilities( + &mut self, + capabilities: &[Capability], + ) -> Result { + match self + .handle + .call(CallMessage::PeerCountByCapabilities { + capabilities: capabilities.to_vec(), + }) + .await? + { + OutMessage::PeerCount(peer_count) => Ok(peer_count), + _ => unreachable!(), + } + } + + /// Check if target number of contacts and connected peers is reached + pub async fn target_reached(&mut self) -> Result { + match self.handle.call(CallMessage::TargetReached).await? { + OutMessage::TargetReached(result) => Ok(result), + _ => unreachable!(), + } + } + + /// Check if target number of connected peers is reached + pub async fn target_peers_reached(&mut self) -> Result { + match self.handle.call(CallMessage::TargetPeersReached).await? { + OutMessage::TargetReached(result) => Ok(result), + _ => unreachable!(), + } + } + + /// Return rate of target peers completion + pub async fn target_peers_completion(&mut self) -> Result { + match self.handle.call(CallMessage::TargetPeersCompletion).await? { + OutMessage::TargetCompletion(result) => Ok(result), + _ => unreachable!(), + } + } + + /// Provide a contact to initiate a connection + pub async fn get_contact_to_initiate(&mut self) -> Result, PeerTableError> { + match self.handle.call(CallMessage::GetContactToInitiate).await? { + OutMessage::Contact(contact) => Ok(Some(*contact)), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Provide a contact to perform Discovery lookup + pub async fn get_contact_for_lookup(&mut self) -> Result, PeerTableError> { + match self.handle.call(CallMessage::GetContactForLookup).await? { + OutMessage::Contact(contact) => Ok(Some(*contact)), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Provide a contact to perform ENR lookup + pub async fn get_contact_for_enr_lookup(&mut self) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetContactForEnrLookup) + .await? + { + OutMessage::Contact(contact) => Ok(Some(*contact)), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Get a contact using node_id + pub async fn get_contact(&mut self, node_id: H256) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetContact { node_id }) + .await? + { + OutMessage::Contact(contact) => Ok(Some(*contact)), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Get discv5 Session info. + pub async fn get_session_info( + &mut self, + node_id: H256, + ) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetContact { node_id }) + .await? + { + OutMessage::Contact(contact) => Ok(contact.session), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Get all contacts available to revalidate + pub async fn get_contacts_to_revalidate( + &mut self, + revalidation_interval: Duration, + ) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetContactsToRevalidate(revalidation_interval)) + .await? + { + OutMessage::Contacts(contacts) => Ok(contacts), + _ => unreachable!(), + } + } + + /// Returns the peer with the highest score and its peer channel. + pub async fn get_best_peer( + &mut self, + capabilities: &[Capability], + ) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetBestPeer { + capabilities: capabilities.to_vec(), + }) + .await? + { + OutMessage::FoundPeer { + node_id, + connection, + } => Ok(Some((node_id, connection))), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } + + /// Get peer score + pub async fn get_score(&mut self, node_id: &H256) -> Result { + match self + .handle + .call(CallMessage::GetScore { node_id: *node_id }) + .await? + { + OutMessage::PeerScore(score) => Ok(score), + _ => unreachable!(), + } + } + + /// Get list of connected peers + pub async fn get_connected_nodes(&mut self) -> Result, PeerTableError> { + if let OutMessage::Nodes(nodes) = self.handle.call(CallMessage::GetConnectedNodes).await? { + Ok(nodes) + } else { + unreachable!() + } + } + + /// Get list of connected peers with their capabilities + pub async fn get_peers_with_capabilities( + &mut self, + ) -> Result)>, PeerTableError> { + match self + .handle + .call(CallMessage::GetPeersWithCapabilities) + .await? + { + OutMessage::PeersWithCapabilities(peers_with_capabilities) => { + Ok(peers_with_capabilities) + } + _ => unreachable!(), + } + } + + /// Get peer channels for communication. It returns a PeerConnection that implements + /// at least one of the required capabilities. + pub async fn get_peer_connections( + &mut self, + capabilities: &[Capability], + ) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetPeerConnections { + capabilities: capabilities.to_vec(), + }) + .await? + { + OutMessage::PeerConnection(connection) => Ok(connection), + _ => unreachable!(), + } + } + + /// Insert new peer if it is new. Returns a boolean telling if it was new or not. + pub async fn insert_if_new(&mut self, node: &Node) -> Result { + match self + .handle + .call(CallMessage::InsertIfNew { node: node.clone() }) + .await? + { + OutMessage::IsNew(is_new) => Ok(is_new), + _ => unreachable!(), + } + } + + /// Validate a contact + pub async fn validate_contact( + &mut self, + node_id: &H256, + sender_ip: IpAddr, + ) -> Result { + self.handle + .call(CallMessage::ValidateContact { + node_id: *node_id, + sender_ip, + }) + .await + .map_err(PeerTableError::InternalError) + } + + /// Get closest nodes according to kademlia's distance + pub async fn get_closest_nodes(&mut self, node_id: &H256) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetClosestNodes { node_id: *node_id }) + .await? + { + OutMessage::Nodes(nodes) => Ok(nodes), + _ => unreachable!(), + } + } + + /// Get metadata associated to peer + pub async fn get_peers_data(&mut self) -> Result, PeerTableError> { + match self.handle.call(CallMessage::GetPeersData).await? { + OutMessage::PeersData(peers_data) => Ok(peers_data), + _ => unreachable!(), + } + } + + /// Retrieve a random peer. + pub async fn get_random_peer( + &mut self, + capabilities: &[Capability], + ) -> Result, PeerTableError> { + match self + .handle + .call(CallMessage::GetRandomPeer { + capabilities: capabilities.to_vec(), + }) + .await? + { + OutMessage::FoundPeer { + node_id, + connection, + } => Ok(Some((node_id, connection))), + OutMessage::NotFound => Ok(None), + _ => unreachable!(), + } + } +} + +#[derive(Debug)] +struct PeerTableServer { + contacts: IndexMap, + peers: IndexMap, + already_tried_peers: FxHashSet, + discarded_contacts: FxHashSet, + target_peers: usize, +} + +impl PeerTableServer { + pub(crate) fn new(target_peers: usize) -> Self { + Self { + contacts: Default::default(), + peers: Default::default(), + already_tried_peers: Default::default(), + discarded_contacts: Default::default(), + target_peers, + } + } + // Internal functions // + + // Weighting function used to select best peer + // TODO: Review this formula and weight constants. + fn weight_peer(&self, score: &i64, requests: &i64) -> i64 { + score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT + } + + // Returns if the peer has room for more connections given the current score + // and amount of inflight requests + fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool { + let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64; + (*requests as f64) < MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio + } + + fn get_best_peer(&self, capabilities: &[Capability]) -> Option<(H256, PeerConnection)> { + self.peers + .iter() + // We filter only to those peers which are useful to us + .filter_map(|(id, peer_data)| { + // Skip the peer if it has too many ongoing requests or if it doesn't match + // the capabilities + if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) + || !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + None + } else { + // if the peer doesn't have the channel open, we skip it. + let connection = peer_data.connection.clone()?; + + // We return the id, the score and the channel to connect with. + Some((*id, peer_data.score, peer_data.requests, connection)) + } + }) + .max_by_key(|(_, score, reqs, _)| self.weight_peer(score, reqs)) + .map(|(k, _, _, v)| (k, v)) + } + + fn prune(&mut self) { + let disposable_contacts = self + .contacts + .iter() + .filter_map(|(c_id, c)| c.disposable.then_some(*c_id)) + .collect::>(); + + for contact_to_discard_id in disposable_contacts { + self.contacts.swap_remove(&contact_to_discard_id); + self.discarded_contacts.insert(contact_to_discard_id); + } + } + + fn get_contact_to_initiate(&mut self) -> Option { + for contact in self.contacts.values() { + let node_id = contact.node.node_id(); + if !self.peers.contains_key(&node_id) + && !self.already_tried_peers.contains(&node_id) + && contact.knows_us + && !contact.unwanted + { + self.already_tried_peers.insert(node_id); + + return Some(contact.clone()); + } + } + // No untried contact found, resetting tried peers. + tracing::trace!("Resetting list of tried peers."); + self.already_tried_peers.clear(); + None + } + + fn get_contact_for_lookup(&self) -> Option { + self.contacts + .values() + .filter(|c| { + c.n_find_node_sent < MAX_FIND_NODE_PER_PEER + && !c.disposable + && c.is_fork_id_valid != Some(false) + }) + .collect::>() + .choose(&mut rand::rngs::OsRng) + .cloned() + .cloned() + } + + fn get_contact_for_enr_lookup(&mut self) -> Option { + self.contacts + .values() + .filter(|c| { + c.was_validated() + && !c.has_pending_enr_request() + && c.record.is_none() + && !c.disposable + }) + .collect::>() + .choose(&mut rand::rngs::OsRng) + .cloned() + .cloned() + } + + fn get_contacts_to_revalidate(&self, revalidation_interval: Duration) -> Vec { + self.contacts + .values() + .filter(|c| Self::is_validation_needed(c, revalidation_interval)) + .cloned() + .collect() + } + + fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> OutMessage { + let Some(contact) = self.contacts.get(&node_id) else { + return OutMessage::UnknownContact; + }; + if !contact.was_validated() { + return OutMessage::InvalidContact; + } + + // Check that the IP address from which we receive the request matches the one we have stored to prevent amplification attacks + // This prevents an attack vector where the discovery protocol could be used to amplify traffic in a DDOS attack. + // A malicious actor would send a findnode request with the IP address and UDP port of the target as the source address. + // The recipient of the findnode packet would then send a neighbors packet (which is a much bigger packet than findnode) to the victim. + if sender_ip != contact.node.ip { + return OutMessage::IpMismatch; + } + OutMessage::Contact(Box::new(contact.clone())) + } + + fn get_closest_nodes(&self, node_id: H256) -> Vec { + let mut nodes: Vec<(Node, usize)> = vec![]; + + for (contact_id, contact) in &self.contacts { + let distance = distance(&node_id, contact_id); + if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET { + nodes.push((contact.node.clone(), distance)); + } else { + for (i, (_, dis)) in &mut nodes.iter().enumerate() { + if distance < *dis { + nodes[i] = (contact.node.clone(), distance); + break; + } + } + } + } + nodes.into_iter().map(|(node, _distance)| node).collect() + } + + async fn new_contacts(&mut self, nodes: Vec, local_node_id: H256) { + for node in nodes { + let node_id = node.node_id(); + if let Entry::Vacant(vacant_entry) = self.contacts.entry(node_id) + && !self.discarded_contacts.contains(&node_id) + && node_id != local_node_id + { + vacant_entry.insert(Contact::from(node)); + METRICS.record_new_discovery().await; + } + } + } + + async fn new_contact_records(&mut self, node_records: Vec, local_node_id: H256) { + for node_record in node_records { + if let Ok(node) = Node::from_enr(&node_record) { + let node_id = node.node_id(); + if let Entry::Vacant(vacant_entry) = self.contacts.entry(node_id) + && !self.discarded_contacts.contains(&node_id) + && node_id != local_node_id + { + let mut contact = Contact::from(node); + // TODO: validate fork_id from enr + // (https://github.com/lambdaclass/ethrex/issues/5776) + //contact.is_fork_id_valid = backend.is_fork_id_valid(&node_record).await.ok().or(Some(false)); + contact.record = Some(node_record); + vacant_entry.insert(contact); + METRICS.record_new_discovery().await; + } + // TODO Handle the case the contact is already present + } + } + } + + fn peer_count_by_capabilities(&self, capabilities: Vec) -> usize { + self.peers + .iter() + .filter_map(|(node_id, peer_data)| { + // if the peer doesn't have any of the capabilities we need, we skip it + if !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + None + } else { + Some(*node_id) + } + }) + .collect::>() + .len() + } + + fn get_peer_connections(&self, capabilities: Vec) -> Vec<(H256, PeerConnection)> { + self.peers + .iter() + .filter_map(|(peer_id, peer_data)| { + // if the peer doesn't have any of the capabilities we need, we skip it + if !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + return None; + } + peer_data + .connection + .clone() + .map(|connection| (*peer_id, connection)) + }) + .collect() + } + + fn get_random_peer(&self, capabilities: Vec) -> Option<(H256, PeerConnection)> { + let peers: Vec<(H256, PeerConnection)> = self + .peers + .iter() + .filter_map(|(node_id, peer_data)| { + // if the peer doesn't have any of the capabilities we need, we skip it + if !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + return None; + } + peer_data + .connection + .clone() + .map(|connection| (*node_id, connection)) + }) + .collect(); + peers.choose(&mut rand::rngs::OsRng).cloned() + } + + fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool { + let sent_ping_ttl = Duration::from_secs(30); + + let validation_is_stale = !contact.was_validated() + || contact + .validation_timestamp + .map(|ts| Instant::now().saturating_duration_since(ts) > revalidation_interval) + .unwrap_or(false); + + let sent_ping_is_stale = contact + .validation_timestamp + .map(|ts| Instant::now().saturating_duration_since(ts) > sent_ping_ttl) + .unwrap_or(false); + + !contact.disposable || validation_is_stale || sent_ping_is_stale + } +} + +#[derive(Clone, Debug)] +enum CastMessage { + NewContacts { + nodes: Vec, + local_node_id: H256, + }, + NewContactRecords { + node_records: Vec, + local_node_id: H256, + }, + NewConnectedPeer { + node: Node, + connection: PeerConnection, + capabilities: Vec, + }, + SetSessionInfo { + node_id: H256, + session: Session, + }, + RemovePeer { + node_id: H256, + }, + IncRequests { + node_id: H256, + }, + DecRequests { + node_id: H256, + }, + SetUnwanted { + node_id: H256, + }, + SetIsForkIdValid { + node_id: H256, + valid: bool, + }, + RecordSuccess { + node_id: H256, + }, + RecordFailure { + node_id: H256, + }, + RecordCriticalFailure { + node_id: H256, + }, + RecordPingSent { + node_id: H256, + hash: H256, + }, + RecordPongReceived { + node_id: H256, + ping_hash: H256, + }, + RecordEnrRequestSent { + node_id: H256, + request_hash: H256, + }, + RecordEnrResponseReceived { + node_id: H256, + request_hash: H256, + record: NodeRecord, + }, + SetDisposable { + node_id: H256, + }, + IncrementFindNodeSent { + node_id: H256, + }, + KnowsUs { + node_id: H256, + }, + Prune, + Shutdown, +} + +#[derive(Clone, Debug)] +enum CallMessage { + PeerCount, + PeerCountByCapabilities { capabilities: Vec }, + TargetReached, + TargetPeersReached, + TargetPeersCompletion, + GetContactToInitiate, + GetContactForLookup, + GetContactForEnrLookup, + GetContact { node_id: H256 }, + GetContactsToRevalidate(Duration), + GetBestPeer { capabilities: Vec }, + GetScore { node_id: H256 }, + GetConnectedNodes, + GetPeersWithCapabilities, + GetPeerConnections { capabilities: Vec }, + InsertIfNew { node: Node }, + ValidateContact { node_id: H256, sender_ip: IpAddr }, + GetClosestNodes { node_id: H256 }, + GetPeersData, + GetRandomPeer { capabilities: Vec }, +} + +#[derive(Debug)] +pub enum OutMessage { + PeerCount(usize), + FoundPeer { + node_id: H256, + connection: PeerConnection, + }, + NotFound, + PeerScore(i64), + PeersWithCapabilities(Vec<(H256, PeerConnection, Vec)>), + PeerConnection(Vec<(H256, PeerConnection)>), + Contacts(Vec), + TargetReached(bool), + TargetCompletion(f64), + IsNew(bool), + Nodes(Vec), + Contact(Box), + InvalidContact, + UnknownContact, + IpMismatch, + PeersData(Vec), +} + +#[derive(Debug, Error)] +pub enum PeerTableError { + #[error("Internal error: {0}")] + InternalError(#[from] GenServerError), +} + +impl GenServer for PeerTableServer { + type CallMsg = CallMessage; + type CastMsg = CastMessage; + type OutMsg = OutMessage; + type Error = PeerTableError; + + async fn init(self, handle: &GenServerHandle) -> Result, Self::Error> { + send_message_on( + handle.clone(), + tokio::signal::ctrl_c(), + CastMessage::Shutdown, + ); + Ok(InitResult::Success(self)) + } + + async fn handle_call( + &mut self, + message: Self::CallMsg, + _handle: &GenServerHandle, + ) -> CallResponse { + match message { + CallMessage::PeerCount => { + CallResponse::Reply(Self::OutMsg::PeerCount(self.peers.len())) + } + CallMessage::PeerCountByCapabilities { capabilities } => CallResponse::Reply( + OutMessage::PeerCount(self.peer_count_by_capabilities(capabilities)), + ), + CallMessage::TargetReached => CallResponse::Reply(Self::OutMsg::TargetReached( + self.contacts.len() >= TARGET_CONTACTS && self.peers.len() >= self.target_peers, + )), + CallMessage::TargetPeersReached => CallResponse::Reply(Self::OutMsg::TargetReached( + self.peers.len() >= self.target_peers, + )), + CallMessage::TargetPeersCompletion => CallResponse::Reply( + Self::OutMsg::TargetCompletion(self.peers.len() as f64 / self.target_peers as f64), + ), + CallMessage::GetContactToInitiate => CallResponse::Reply( + self.get_contact_to_initiate() + .map(Box::new) + .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), + ), + CallMessage::GetContactForLookup => CallResponse::Reply( + self.get_contact_for_lookup() + .map(Box::new) + .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), + ), + CallMessage::GetContactForEnrLookup => CallResponse::Reply( + self.get_contact_for_enr_lookup() + .map(Box::new) + .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), + ), + CallMessage::GetContact { node_id } => CallResponse::Reply( + self.contacts + .get(&node_id) + .cloned() + .map(Box::new) + .map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact), + ), + CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply( + Self::OutMsg::Contacts(self.get_contacts_to_revalidate(revalidation_interval)), + ), + CallMessage::GetBestPeer { capabilities } => { + let channels = self.get_best_peer(&capabilities); + CallResponse::Reply(channels.map_or( + Self::OutMsg::NotFound, + |(node_id, connection)| Self::OutMsg::FoundPeer { + node_id, + connection, + }, + )) + } + CallMessage::GetScore { node_id } => CallResponse::Reply(Self::OutMsg::PeerScore( + self.peers + .get(&node_id) + .map(|peer_data| peer_data.score) + .unwrap_or_default(), + )), + CallMessage::GetConnectedNodes => CallResponse::Reply(Self::OutMsg::Nodes( + self.peers + .values() + .map(|peer_data| peer_data.node.clone()) + .collect(), + )), + CallMessage::GetPeersWithCapabilities => { + CallResponse::Reply(Self::OutMsg::PeersWithCapabilities( + self.peers + .iter() + .filter_map(|(peer_id, peer_data)| { + peer_data.connection.clone().map(|connection| { + ( + *peer_id, + connection, + peer_data.supported_capabilities.clone(), + ) + }) + }) + .collect(), + )) + } + CallMessage::GetPeerConnections { capabilities } => CallResponse::Reply( + OutMessage::PeerConnection(self.get_peer_connections(capabilities)), + ), + CallMessage::InsertIfNew { node } => CallResponse::Reply(Self::OutMsg::IsNew( + match self.contacts.entry(node.node_id()) { + Entry::Occupied(_) => false, + Entry::Vacant(entry) => { + METRICS.record_new_discovery().await; + entry.insert(Contact::from(node)); + true + } + }, + )), + CallMessage::ValidateContact { node_id, sender_ip } => { + CallResponse::Reply(self.validate_contact(node_id, sender_ip)) + } + CallMessage::GetClosestNodes { node_id } => { + CallResponse::Reply(Self::OutMsg::Nodes(self.get_closest_nodes(node_id))) + } + CallMessage::GetPeersData => CallResponse::Reply(OutMessage::PeersData( + self.peers.values().cloned().collect(), + )), + CallMessage::GetRandomPeer { capabilities } => CallResponse::Reply( + if let Some((node_id, connection)) = self.get_random_peer(capabilities) { + OutMessage::FoundPeer { + node_id, + connection, + } + } else { + OutMessage::NotFound + }, + ), + } + } + + async fn handle_cast( + &mut self, + message: Self::CastMsg, + _handle: &GenServerHandle, + ) -> CastResponse { + match message { + CastMessage::NewContacts { + nodes, + local_node_id, + } => { + self.new_contacts(nodes, local_node_id).await; + } + CastMessage::NewContactRecords { + node_records, + local_node_id, + } => { + self.new_contact_records(node_records, local_node_id).await; + } + CastMessage::NewConnectedPeer { + node, + connection, + capabilities, + } => { + let new_peer_id = node.node_id(); + let new_peer = PeerData::new(node, None, Some(connection), capabilities); + self.peers.insert(new_peer_id, new_peer); + } + CastMessage::SetSessionInfo { node_id, session } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.session = Some(session)); + } + CastMessage::RemovePeer { node_id } => { + self.peers.swap_remove(&node_id); + } + CastMessage::IncRequests { node_id } => { + self.peers + .entry(node_id) + .and_modify(|peer_data| peer_data.requests += 1); + } + CastMessage::DecRequests { node_id } => { + self.peers + .entry(node_id) + .and_modify(|peer_data| peer_data.requests -= 1); + } + CastMessage::SetUnwanted { node_id } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.unwanted = true); + } + CastMessage::SetIsForkIdValid { node_id, valid } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.is_fork_id_valid = Some(valid)); + } + CastMessage::RecordSuccess { node_id } => { + self.peers + .entry(node_id) + .and_modify(|peer_data| peer_data.score = (peer_data.score + 1).min(MAX_SCORE)); + } + CastMessage::RecordFailure { node_id } => { + self.peers + .entry(node_id) + .and_modify(|peer_data| peer_data.score = (peer_data.score - 1).max(MIN_SCORE)); + } + CastMessage::RecordCriticalFailure { node_id } => { + self.peers + .entry(node_id) + .and_modify(|peer_data| peer_data.score = MIN_SCORE_CRITICAL); + } + CastMessage::RecordPingSent { node_id, hash } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.record_ping_sent(hash)); + } + CastMessage::RecordPongReceived { node_id, ping_hash } => { + // If entry does not exist or hash does not match, ignore pong record + // Otherwise, reset ping_hash + self.contacts.entry(node_id).and_modify(|contact| { + if contact + .ping_hash + .map(|value| value == ping_hash) + .unwrap_or(false) + { + contact.ping_hash = None + } + }); + } + CastMessage::RecordEnrRequestSent { + node_id, + request_hash, + } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.record_enr_request_sent(request_hash)); + } + CastMessage::RecordEnrResponseReceived { + node_id, + request_hash, + record, + } => { + self.contacts.entry(node_id).and_modify(|contact| { + contact.record_enr_response_received(request_hash, record); + }); + } + CastMessage::SetDisposable { node_id } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.disposable = true); + } + CastMessage::IncrementFindNodeSent { node_id } => { + self.contacts + .entry(node_id) + .and_modify(|contact| contact.n_find_node_sent += 1); + } + CastMessage::KnowsUs { node_id } => { + self.contacts + .entry(node_id) + .and_modify(|c| c.knows_us = true); + } + CastMessage::Prune => self.prune(), + CastMessage::Shutdown => return CastResponse::Stop, + } + CastResponse::NoReply + } +} diff --git a/crates/networking/p2p/discv5/server.rs b/crates/networking/p2p/discv5/server.rs new file mode 100644 index 00000000000..394260bc2d2 --- /dev/null +++ b/crates/networking/p2p/discv5/server.rs @@ -0,0 +1,607 @@ +use crate::{ + discv5::{ + codec::Discv5Codec, + messages::{ + DISTANCES_PER_FIND_NODE_MSG, FindNodeMessage, Handshake, Message, NodesMessage, + Ordinary, Packet, PacketCodecError, PacketTrait as _, PingMessage, PongMessage, + }, + session::{build_challenge_data, create_id_signature, derive_session_keys}, + }, + metrics::METRICS, + peer_table::{PeerTable, PeerTableError}, + rlpx::utils::compress_pubkey, + types::{Node, NodeRecord}, + utils::distance, +}; +use bytes::{Bytes, BytesMut}; +use ethrex_common::H256; +use ethrex_storage::{Store, error::StoreError}; +use futures::StreamExt; +use indexmap::IndexMap; +use rand::{Rng, RngCore, rngs::OsRng}; +use secp256k1::{SecretKey, ecdsa::Signature}; +use spawned_concurrency::{ + messages::Unused, + tasks::{ + CastResponse, GenServer, GenServerHandle, InitResult::Success, send_after, send_interval, + send_message_on, spawn_listener, + }, +}; +use std::{ + net::SocketAddr, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::net::UdpSocket; +use tokio_util::udp::UdpFramed; +use tracing::{debug, error, info, trace}; + +/// Interval between revalidation checks. +const REVALIDATION_CHECK_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours, +/// Interval between revalidations. +const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12 hours, +/// The initial interval between peer lookups, until the number of peers reaches +/// [target_peers](DiscoverySideCarState::target_peers), or the number of +/// contacts reaches [target_contacts](DiscoverySideCarState::target_contacts). +pub const INITIAL_LOOKUP_INTERVAL_MS: f64 = 100.0; // 10 per second +pub const LOOKUP_INTERVAL_MS: f64 = 600.0; // 100 per minute +const PRUNE_INTERVAL: Duration = Duration::from_secs(5); + +#[derive(Debug, thiserror::Error)] +pub enum DiscoveryServerError { + #[error(transparent)] + IoError(#[from] std::io::Error), + #[error("Failed to decode packet")] + InvalidPacket(#[from] PacketCodecError), + #[error("Failed to send message")] + MessageSendFailure(PacketCodecError), + #[error("Only partial message was sent")] + PartialMessageSent, + #[error("Unknown or invalid contact")] + InvalidContact, + #[error(transparent)] + PeerTable(#[from] PeerTableError), + #[error(transparent)] + Store(#[from] StoreError), + #[error("Internal error {0}")] + InternalError(String), + #[error("Cryptography Error {0}")] + CryptographyError(String), +} + +#[derive(Debug, Clone)] +pub enum InMessage { + Message(Box), + Revalidate, + Lookup, + Prune, + Shutdown, +} + +#[derive(Debug, Clone)] +pub enum OutMessage { + Done, +} + +#[derive(Debug)] +pub struct DiscoveryServer { + local_node: Node, + local_node_record: NodeRecord, + signer: SecretKey, + udp_socket: Arc, + peer_table: PeerTable, + initial_lookup_interval: f64, + /// Outgoing message count, used for nonce generation as per the spec. + counter: u32, + messages_by_nonce: IndexMap<[u8; 12], (Node, Message, Instant)>, +} + +impl DiscoveryServer { + pub async fn spawn( + storage: Store, + local_node: Node, + signer: SecretKey, + udp_socket: UdpSocket, + mut peer_table: PeerTable, + bootnodes: Vec, + // Sending part of the UdpFramed to send messages to remote nodes + initial_lookup_interval: f64, + ) -> Result<(), DiscoveryServerError> { + info!("Starting Discovery Server"); + + let mut local_node_record = NodeRecord::from_node(&local_node, 1, &signer) + .expect("Failed to create local node record"); + if let Ok(fork_id) = storage.get_fork_id().await { + local_node_record + .set_fork_id(fork_id, &signer) + .expect("Failed to set fork_id on local node record"); + } + + let discovery_server = Self { + local_node: local_node.clone(), + local_node_record, + signer, + udp_socket: Arc::new(udp_socket), + peer_table: peer_table.clone(), + initial_lookup_interval, + counter: 0, + messages_by_nonce: Default::default(), + }; + + info!(count = bootnodes.len(), "Adding bootnodes"); + peer_table + .new_contacts(bootnodes, local_node.node_id()) + .await?; + + discovery_server.start(); + Ok(()) + } + + async fn handle_packet( + &mut self, + Discv5Message { packet, from }: Discv5Message, + ) -> Result<(), DiscoveryServerError> { + // TODO retrieve session info + match packet.header.flag { + 0x00 => self.handle_ordinary(packet, from).await, + 0x01 => self.handle_who_are_you(packet, from).await, + 0x02 => { + // Handshake handling not yet implemented + tracing::info!("Received handsake message"); + Ok(()) + } + _ => Err(PacketCodecError::MalformedData)?, + } + } + async fn handle_ordinary( + &mut self, + packet: Packet, + addr: SocketAddr, + ) -> Result<(), DiscoveryServerError> { + let src_id = H256::from_slice(&packet.header.authdata); + let decrypt_key = self + .peer_table + .get_session_info(src_id) + .await? + .map_or([0; 16], |s| s.inbound_key); + + let ordinary = Ordinary::decode(&packet, &decrypt_key)?; + + tracing::trace!(received = %ordinary.message, from = %src_id, %addr); + + self.handle_message(ordinary).await + } + + async fn handle_who_are_you( + &mut self, + packet: Packet, + addr: SocketAddr, + ) -> Result<(), DiscoveryServerError> { + // TODO: check enr-seq to decide if we have to send the ENR in the handshake. + // (https://github.com/lambdaclass/ethrex/issues/5777) + // let whoareyou = WhoAreYou::decode(&packet)?; + let nonce = packet.header.nonce; + let Some((node, message, _)) = self.messages_by_nonce.swap_remove(&nonce) else { + tracing::trace!("Received unexpected WhoAreYou packet. Ignoring it"); + return Ok(()); + }; + tracing::trace!(received = "WhoAreYou", from = %node.node_id(), %addr); + + // challenge-data = masking-iv || static-header || authdata + let challenge_data = build_challenge_data( + &packet.masking_iv, + &packet.header.static_header, + &packet.header.authdata, + ); + + // ephemeral-key = random private key generated by node A + // ephemeral-pubkey = public key corresponding to ephemeral-key + let ephemeral_key = SecretKey::new(&mut rand::thread_rng()); + let ephemeral_pubkey = ephemeral_key.public_key(secp256k1::SECP256K1).serialize(); + + // dest-pubkey = public key corresponding to node B's static private key + let Some(dest_pubkey) = compress_pubkey(node.public_key) else { + return Err(DiscoveryServerError::CryptographyError( + "Invalid public key".to_string(), + )); + }; + + let session = derive_session_keys( + &ephemeral_key, + &dest_pubkey, + &self.local_node.node_id(), + &node.node_id(), + &challenge_data, + ); + + // Create the signature included in the message. + let signature = create_id_signature( + &self.signer, + &challenge_data, + &ephemeral_pubkey, + &node.node_id(), + ); + + self.peer_table + .set_session_info(node.node_id(), session) + .await?; + self.send_handshake(&message, signature, &ephemeral_pubkey, &node) + .await?; + + Ok(()) + } + + async fn revalidate(&mut self) -> Result<(), DiscoveryServerError> { + for _contact in self + .peer_table + .get_contacts_to_revalidate(REVALIDATION_INTERVAL) + .await? + { + // TODO: Implement Ping/Pong workflow + // (https://github.com/lambdaclass/ethrex/issues/5778) + // self.send_ping(&contact.node).await?; + } + Ok(()) + } + + async fn lookup(&mut self) -> Result<(), DiscoveryServerError> { + if let Some(contact) = self.peer_table.get_contact_for_lookup().await? { + if let Err(e) = self + .send_ordinary( + &self.get_random_find_node_message(&contact.node), + &contact.node, + ) + .await + { + error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"); + self.peer_table + .set_disposable(&contact.node.node_id()) + .await?; + METRICS.record_new_discarded_node(); + } + + self.peer_table + .increment_find_node_sent(&contact.node.node_id()) + .await?; + } + Ok(()) + } + + fn get_random_find_node_message(&self, node: &Node) -> Message { + let mut rng = OsRng; + let target = rng.r#gen(); + let distance = distance(&target, &node.node_id()) as u8; + let mut distances = Vec::new(); + distances.push(distance as u32); + for i in 0..DISTANCES_PER_FIND_NODE_MSG / 2 { + if let Some(d) = distance.checked_add(i + 1) { + distances.push(d as u32) + } + if let Some(d) = distance.checked_sub(i + 1) { + distances.push(d as u32) + } + } + Message::FindNode(FindNodeMessage { + req_id: Bytes::from(rng.r#gen::().to_be_bytes().to_vec()), + distances, + }) + } + + async fn prune(&mut self) -> Result<(), DiscoveryServerError> { + self.peer_table.prune().await?; + Ok(()) + } + + async fn get_lookup_interval(&mut self) -> Duration { + let peer_completion = self + .peer_table + .target_peers_completion() + .await + .unwrap_or_default(); + lookup_interval_function( + peer_completion, + self.initial_lookup_interval, + LOOKUP_INTERVAL_MS, + ) + } + + async fn handle_ping( + &mut self, + _ping_message: PingMessage, + ) -> Result<(), DiscoveryServerError> { + // TODO: Implement Ping/Pong workflow + // (https://github.com/lambdaclass/ethrex/issues/5778) + Ok(()) + } + + async fn handle_pong( + &mut self, + _pong_message: PongMessage, + ) -> Result<(), DiscoveryServerError> { + // TODO: Implement Ping/Pong workflow + // (https://github.com/lambdaclass/ethrex/issues/5778) + Ok(()) + } + + async fn handle_find_node( + &mut self, + _find_node_message: FindNodeMessage, + ) -> Result<(), DiscoveryServerError> { + // TODO: Handle FindNode requests + // (https://github.com/lambdaclass/ethrex/issues/5779) + Ok(()) + } + + async fn handle_nodes_message( + &mut self, + nodes_message: NodesMessage, + ) -> Result<(), DiscoveryServerError> { + // TODO(#3746): check that we requested neighbors from the node + self.peer_table + .new_contact_records(nodes_message.nodes, self.local_node.node_id()) + .await?; + Ok(()) + } + + async fn send_ordinary( + &mut self, + message: &Message, + node: &Node, + ) -> Result<(), DiscoveryServerError> { + let ordinary = Ordinary { + src_id: self.local_node.node_id(), + message: message.clone(), + }; + let encrypt_key = self + .peer_table + .get_session_info(node.node_id()) + .await? + .map_or([0; 16], |s| s.outbound_key); + + let mut rng = OsRng; + let masking_iv: u128 = rng.r#gen(); + let nonce = self.next_nonce(&mut rng); + + let packet = ordinary.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?; + + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &node.node_id())?; + + let addr = node.udp_addr(); + let _ = self.udp_socket.send_to(&buf, addr).await.inspect_err( + |e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"), + )?; + trace!(msg = %message, node = %node.public_key, address= %addr, "Discv5 ordinary message sent"); + self.messages_by_nonce + .insert(nonce, (node.clone(), message.clone(), Instant::now())); + Ok(()) + } + + async fn send_handshake( + &mut self, + message: &Message, + signature: Signature, + eph_pubkey: &[u8], + node: &Node, + ) -> Result<(), DiscoveryServerError> { + let handshake = Handshake { + src_id: self.local_node.node_id(), + id_signature: signature.serialize_compact().to_vec(), + eph_pubkey: eph_pubkey.to_vec(), + record: Some(self.local_node_record.clone()), + message: message.clone(), + }; + let encrypt_key = self + .peer_table + .get_session_info(node.node_id()) + .await? + .map_or([0; 16], |s| s.outbound_key); + + let mut rng = OsRng; + let masking_iv: u128 = rng.r#gen(); + let nonce = self.next_nonce(&mut rng); + + let packet = handshake.encode(&nonce, masking_iv.to_be_bytes(), &encrypt_key)?; + + let mut buf = BytesMut::new(); + packet.encode(&mut buf, &node.node_id())?; + + let addr = node.udp_addr(); + let _ = self.udp_socket.send_to(&buf, addr).await.inspect_err( + |e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"), + )?; + trace!(msg = %message, "Discv5 handshake message sent"); + self.messages_by_nonce + .insert(nonce, (node.clone(), message.clone(), Instant::now())); + Ok(()) + } + + /// Generates a 96-bit AES-GCM nonce + /// ## Spec Recommendation + /// Encode the current outgoing message count into the first 32 bits of the nonce and fill the remaining 64 bits with random data generated + /// by a cryptographically secure random number generator. + fn next_nonce(&mut self, rng: &mut R) -> [u8; 12] { + let counter = self.counter; + self.counter = self.counter.wrapping_add(1); + + let mut nonce = [0u8; 12]; + nonce[..4].copy_from_slice(&counter.to_be_bytes()); + rng.fill_bytes(&mut nonce[4..]); + nonce + } + + async fn handle_message(&mut self, ordinary: Ordinary) -> Result<(), DiscoveryServerError> { + // Ignore packets sent by ourselves + if ordinary.src_id == self.local_node.node_id() { + return Ok(()); + } + match ordinary.message { + Message::Ping(ping_message) => self.handle_ping(ping_message).await?, + Message::Pong(pong_message) => { + self.handle_pong(pong_message).await?; + } + Message::FindNode(find_node_message) => { + self.handle_find_node(find_node_message).await?; + } + Message::Nodes(nodes_message) => { + self.handle_nodes_message(nodes_message).await?; + } + // We are ignoring these messages currently + Message::TalkReq(_talk_req_message) => (), + Message::TalkRes(_talk_res_message) => (), + Message::Ticket(_ticket_message) => (), + } + Ok(()) + } +} + +impl GenServer for DiscoveryServer { + type CallMsg = Unused; + type CastMsg = InMessage; + type OutMsg = OutMessage; + type Error = DiscoveryServerError; + + async fn init( + self, + handle: &GenServerHandle, + ) -> Result, Self::Error> { + let stream = UdpFramed::new( + self.udp_socket.clone(), + Discv5Codec::new(self.local_node.node_id()), + ); + + spawn_listener( + handle.clone(), + stream.filter_map(|result| async move { + match result { + Ok((packet, addr)) => Some(InMessage::Message(Box::new(Discv5Message::from( + packet, addr, + )))), + Err(e) => { + debug!(error=?e, "Error receiving Discv5 message"); + // Skipping invalid data + None + } + } + }), + ); + send_interval( + REVALIDATION_CHECK_INTERVAL, + handle.clone(), + InMessage::Revalidate, + ); + send_interval(PRUNE_INTERVAL, handle.clone(), InMessage::Prune); + let _ = handle.clone().cast(InMessage::Lookup).await; + send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown); + + Ok(Success(self)) + } + + async fn handle_cast( + &mut self, + message: Self::CastMsg, + handle: &GenServerHandle, + ) -> CastResponse { + match message { + Self::CastMsg::Message(message) => { + let _ = self + .handle_packet(*message) + .await + // log level trace as we don't want to spam decoding errors from bad peers. + .inspect_err(|e| trace!(err=?e, "Error Handling Discovery message")); + } + Self::CastMsg::Revalidate => { + trace!(received = "Revalidate"); + let _ = self + .revalidate() + .await + .inspect_err(|e| error!(err=?e, "Error revalidating discovered peers")); + } + Self::CastMsg::Lookup => { + trace!(received = "Lookup"); + let _ = self + .lookup() + .await + .inspect_err(|e| error!(err=?e, "Error performing Discovery lookup")); + + let interval = self.get_lookup_interval().await; + send_after(interval, handle.clone(), Self::CastMsg::Lookup); + } + Self::CastMsg::Prune => { + trace!(received = "Prune"); + let _ = self + .prune() + .await + .inspect_err(|e| error!(err=?e, "Error Pruning peer table")); + } + Self::CastMsg::Shutdown => return CastResponse::Stop, + } + CastResponse::NoReply + } +} + +#[derive(Debug, Clone)] +pub struct Discv5Message { + from: SocketAddr, + packet: Packet, +} + +impl Discv5Message { + pub fn from(packet: Packet, from: SocketAddr) -> Self { + Self { from, packet } + } +} + +pub fn lookup_interval_function(progress: f64, lower_limit: f64, upper_limit: f64) -> Duration { + // Smooth progression curve + // See https://easings.net/#easeInOutCubic + let ease_in_out_cubic = if progress < 0.5 { + 4.0 * progress.powf(3.0) + } else { + 1.0 - ((-2.0 * progress + 2.0).powf(3.0)) / 2.0 + }; + Duration::from_micros( + // Use `progress` here instead of `ease_in_out_cubic` for a linear function. + (1000f64 * (ease_in_out_cubic * (upper_limit - lower_limit) + lower_limit)).round() as u64, + ) +} + +#[cfg(test)] +mod tests { + use crate::{ + discv5::server::DiscoveryServer, + peer_table::PeerTable, + types::{Node, NodeRecord}, + }; + use rand::{SeedableRng, rngs::StdRng}; + use secp256k1::SecretKey; + use std::sync::Arc; + use tokio::net::UdpSocket; + + #[tokio::test] + async fn test_next_nonce_counter() { + let mut rng = StdRng::seed_from_u64(7); + let local_node = Node::from_enode_url( + "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", + ).expect("Bad enode url"); + let signer = SecretKey::new(&mut rand::rngs::OsRng); + let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let mut server = DiscoveryServer { + local_node, + local_node_record, + signer, + udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:30303").await.unwrap()), + peer_table: PeerTable::spawn(10), + initial_lookup_interval: 1000.0, + counter: 0, + messages_by_nonce: Default::default(), + }; + + let n1 = server.next_nonce(&mut rng); + let n2 = server.next_nonce(&mut rng); + + assert_eq!(&n1[..4], &[0, 0, 0, 0]); + assert_eq!(&n2[..4], &[0, 0, 0, 1]); + assert_ne!(&n1[4..], &n2[4..]); + } +} diff --git a/crates/networking/p2p/discv5/session.rs b/crates/networking/p2p/discv5/session.rs index 5a8f2c7f3a8..a446b3ba0dc 100644 --- a/crates/networking/p2p/discv5/session.rs +++ b/crates/networking/p2p/discv5/session.rs @@ -6,44 +6,11 @@ use secp256k1::{ }; use sha2::{Digest, Sha256}; -/// Role of the local node in the given session -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum SessionRole { - Initiator, - Recipient, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct SessionKeys { - pub initiator_key: [u8; 16], - pub recipient_key: [u8; 16], -} - /// A discv5 session #[derive(Debug, Clone, PartialEq, Eq)] pub struct Session { - pub keys: SessionKeys, - pub role: SessionRole, -} - -impl Session { - pub fn new(keys: SessionKeys, role: SessionRole) -> Self { - Self { keys, role } - } - - pub fn outbound_key(&self) -> &[u8; 16] { - match self.role { - SessionRole::Initiator => &self.keys.initiator_key, - SessionRole::Recipient => &self.keys.recipient_key, - } - } - - pub fn inbound_key(&self) -> &[u8; 16] { - match self.role { - SessionRole::Initiator => &self.keys.recipient_key, - SessionRole::Recipient => &self.keys.initiator_key, - } - } + pub outbound_key: [u8; 16], + pub inbound_key: [u8; 16], } /// Builds the challenge-data from a WHOAREYOU packet @@ -62,7 +29,7 @@ pub fn derive_session_keys( node_id_a: &H256, node_id_b: &H256, challenge_data: &[u8], -) -> SessionKeys { +) -> Session { let shared_secret = compressed_shared_secret(dest_pubkey, ephemeral_key); let hkdf = Hkdf::::new(Some(challenge_data), &shared_secret); @@ -74,9 +41,9 @@ pub fn derive_session_keys( hkdf.expand(&kdf_info, &mut key_data) .expect("key_data is 32 bytes long, it can never fail"); - SessionKeys { - initiator_key: key_data[..16].try_into().expect("sizes always match"), - recipient_key: key_data[16..].try_into().expect("sizes always match"), + Session { + outbound_key: key_data[..16].try_into().expect("sizes always match"), + inbound_key: key_data[16..].try_into().expect("sizes always match"), } } @@ -118,11 +85,8 @@ fn compressed_shared_secret(dest_pubkey: &PublicKey, ephemeral_key: &SecretKey) #[cfg(test)] mod tests { - use crate::discv5::codec::Discv5Codec; - use super::*; use hex_literal::hex; - use rand::{SeedableRng, rngs::StdRng}; #[test] fn derivation_matches_vector() { @@ -144,15 +108,21 @@ mod tests { "000000000000000000000000000000006469736376350001010102030405060708090a0b0c00180102030405060708090a0b0c0d0e0f100000000000000000" ); - let keys = derive_session_keys( + let session = derive_session_keys( &ephemeral_key, &dest_pubkey, &node_id_a, &node_id_b, &challenge_data, ); - assert_eq!(keys.initiator_key, hex!("dccc82d81bd610f4f76d3ebe97a40571")); - assert_eq!(keys.recipient_key, hex!("ac74bb8773749920b0d3a8881c173ec5")); + assert_eq!( + session.outbound_key, + hex!("dccc82d81bd610f4f76d3ebe97a40571") + ); + assert_eq!( + session.inbound_key, + hex!("ac74bb8773749920b0d3a8881c173ec5") + ); } #[test] @@ -179,18 +149,4 @@ mod tests { ) ); } - - #[test] - fn test_next_nonce_counter() { - let mut codec = Discv5Codec::new(H256::zero()); - - let mut rng = StdRng::seed_from_u64(7); - - let n1 = codec.next_nonce(&mut rng); - let n2 = codec.next_nonce(&mut rng); - - assert_eq!(&n1[..4], &[0, 0, 0, 0]); - assert_eq!(&n2[..4], &[0, 0, 0, 1]); - assert_ne!(&n1[4..], &n2[4..]); - } } diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index fca6c9c3193..696ee5d4560 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -4,11 +4,9 @@ use crate::rlpx::l2::l2_connection::P2PBasedContext; #[derive(Clone, Debug)] pub struct P2PBasedContext; use crate::{ - discv4::{ - peer_table::{PeerData, PeerTable}, - server::{DiscoveryServer, DiscoveryServerError}, - }, + discovery_server::{DiscoveryServer, DiscoveryServerError}, metrics::METRICS, + peer_table::{PeerData, PeerTable}, rlpx::{ connection::server::{PeerConnBroadcastSender, PeerConnection}, message::Message, @@ -106,17 +104,15 @@ pub enum NetworkError { } pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result<(), NetworkError> { - let udp_socket = Arc::new( - UdpSocket::bind(context.local_node.udp_addr()) - .await - .expect("Failed to bind udp socket"), - ); + let udp_socket = UdpSocket::bind(context.local_node.udp_addr()) + .await + .expect("Failed to bind udp socket"); DiscoveryServer::spawn( context.storage.clone(), context.local_node.clone(), context.signer, - udp_socket.clone(), + udp_socket, context.table.clone(), bootnodes, context.initial_lookup_interval, diff --git a/crates/networking/p2p/p2p.rs b/crates/networking/p2p/p2p.rs index 80a64bc485c..ac27dd072f3 100644 --- a/crates/networking/p2p/p2p.rs +++ b/crates/networking/p2p/p2p.rs @@ -82,3 +82,12 @@ pub mod utils; pub use network::periodically_show_peer_stats; pub use network::start_network; + +#[cfg(not(feature = "experimental-discv5"))] +pub use discv4::peer_table; +#[cfg(not(feature = "experimental-discv5"))] +pub use discv4::server as discovery_server; +#[cfg(feature = "experimental-discv5")] +pub use discv5::peer_table; +#[cfg(feature = "experimental-discv5")] +pub use discv5::server as discovery_server; diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 7bdf46bd488..724a4890a6e 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,7 +1,7 @@ use crate::rlpx::initiator::RLPxInitiator; use crate::{ - discv4::peer_table::{PeerData, PeerTable, PeerTableError}, metrics::{CurrentStepValue, METRICS}, + peer_table::{PeerData, PeerTable, PeerTableError}, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 16cd9a3e9ed..39a93f528a4 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -6,9 +6,9 @@ use crate::rlpx::l2::{ }, }; use crate::{ - discv4::peer_table::PeerTable, metrics::METRICS, network::P2PContext, + peer_table::PeerTable, rlpx::{ Message, connection::{codec::RLPxCodec, handshake}, diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index f1c510a1916..f475f317d26 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,5 +1,5 @@ use super::{message::Message, p2p::DisconnectReason}; -use crate::discv4::peer_table::PeerTableError; +use crate::peer_table::PeerTableError; use aes::cipher::InvalidLength; use ethrex_blockchain::error::{ChainError, MempoolError}; use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; diff --git a/crates/networking/p2p/rlpx/initiator.rs b/crates/networking/p2p/rlpx/initiator.rs index 1e379c370fd..4f11357a35f 100644 --- a/crates/networking/p2p/rlpx/initiator.rs +++ b/crates/networking/p2p/rlpx/initiator.rs @@ -1,11 +1,7 @@ -use crate::discv4::server::lookup_interval_function; +use crate::discv4::server::{LOOKUP_INTERVAL_MS, lookup_interval_function}; +use crate::peer_table::PeerTableError; use crate::types::Node; -use crate::{ - discv4::{peer_table::PeerTableError, server::LOOKUP_INTERVAL_MS}, - metrics::METRICS, - network::P2PContext, - rlpx::connection::server::PeerConnection, -}; +use crate::{metrics::METRICS, network::P2PContext, rlpx::connection::server::PeerConnection}; use spawned_concurrency::{ messages::Unused, tasks::{CastResponse, GenServer, GenServerHandle, InitResult, send_after, send_message_on}, diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 2f363c5b8f3..2e9f7612f3d 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -2,8 +2,8 @@ mod code_collector; mod state_healing; mod storage_healing; -use crate::discv4::peer_table::PeerTableError; use crate::peer_handler::{BlockRequestOrder, PeerHandlerError, SNAP_LIMIT}; +use crate::peer_table::PeerTableError; use crate::rlpx::p2p::SUPPORTED_ETH_CAPABILITIES; use crate::sync::code_collector::CodeHashCollector; use crate::sync::state_healing::heal_state_trie_wrap; diff --git a/crates/networking/p2p/tx_broadcaster.rs b/crates/networking/p2p/tx_broadcaster.rs index dd5c4739321..6cf324ef8b3 100644 --- a/crates/networking/p2p/tx_broadcaster.rs +++ b/crates/networking/p2p/tx_broadcaster.rs @@ -16,7 +16,7 @@ use spawned_concurrency::{ use tracing::{debug, error, info, trace}; use crate::{ - discv4::peer_table::{PeerTable, PeerTableError}, + peer_table::{PeerTable, PeerTableError}, rlpx::{ Message, connection::server::PeerConnection, diff --git a/crates/networking/p2p/types.rs b/crates/networking/p2p/types.rs index cb5be55ec57..609a389ecea 100644 --- a/crates/networking/p2p/types.rs +++ b/crates/networking/p2p/types.rs @@ -184,6 +184,10 @@ impl Node { pub fn from_enr_url(enr: &str) -> Result { let base64_decoded = ethrex_common::base64::decode(&enr.as_bytes()[4..]); let record = NodeRecord::decode(&base64_decoded).map_err(NodeError::from)?; + Node::from_enr(&record) + } + + pub fn from_enr(record: &NodeRecord) -> Result { let pairs = record.decode_pairs(); let public_key = pairs.secp256k1.ok_or(NodeError::MissingField( "public key not found in record".into(), @@ -305,7 +309,7 @@ impl NodeRecord { let Ok(bytes) = Bytes::decode(&value) else { continue; }; - if bytes.len() < 33 { + if bytes.len() != 33 { continue; } decoded_pairs.secp256k1 = Some(H264::from_slice(&bytes)) @@ -445,10 +449,10 @@ impl From for Vec<(Bytes, Bytes)> { impl RLPDecode for NodeRecord { fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { - if rlp.len() > MAX_NODE_RECORD_ENCODED_SIZE { + let decoder = Decoder::new(rlp)?; + if decoder.get_payload_len() > MAX_NODE_RECORD_ENCODED_SIZE { return Err(RLPDecodeError::InvalidLength); } - let decoder = Decoder::new(rlp)?; let (signature, decoder) = decoder.decode_field("signature")?; let (seq, decoder) = decoder.decode_field("seq")?; let (pairs, decoder) = decode_node_record_optional_fields(vec![], decoder)?; diff --git a/crates/networking/p2p/utils.rs b/crates/networking/p2p/utils.rs index 3bebd2c3f95..1209dcc0ff8 100644 --- a/crates/networking/p2p/utils.rs +++ b/crates/networking/p2p/utils.rs @@ -221,3 +221,12 @@ pub fn dump_storages_to_file( .encode_to_vec(), ) } + +/// Computes the distance between two nodes according to the discv4/5 protocols +/// +/// +pub fn distance(node_id_1: &H256, node_id_2: &H256) -> usize { + let xor = node_id_1 ^ node_id_2; + let distance = U256::from_big_endian(xor.as_bytes()); + distance.bits().saturating_sub(1) +} diff --git a/crates/networking/rpc/admin/peers.rs b/crates/networking/rpc/admin/peers.rs index 95525cfcf2c..94b488b85ec 100644 --- a/crates/networking/rpc/admin/peers.rs +++ b/crates/networking/rpc/admin/peers.rs @@ -3,8 +3,8 @@ use crate::{rpc::RpcApiContext, utils::RpcErr}; use core::net::SocketAddr; use ethrex_common::H256; use ethrex_p2p::{ - discv4::peer_table::PeerData, peer_handler::PeerHandler, + peer_table::PeerData, rlpx::{initiator::InMessage, p2p::Capability}, types::Node, }; diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs index c111901b210..c53ddb252fa 100644 --- a/crates/networking/rpc/test_utils.rs +++ b/crates/networking/rpc/test_utils.rs @@ -13,9 +13,9 @@ use ethrex_common::{ }, }; use ethrex_p2p::{ - discv4::peer_table::{PeerTable, TARGET_PEERS}, network::P2PContext, peer_handler::PeerHandler, + peer_table::{PeerTable, TARGET_PEERS}, rlpx::initiator::RLPxInitiator, sync::SyncMode, sync_manager::SyncManager,