Skip to content

Commit

Permalink
simplify rpc codec logic (#6304)
Browse files Browse the repository at this point in the history
* simplify rpc codec logic

* Merge branch 'unstable' of github.com:sigp/lighthouse into simplify-rpc-codec

* Merge branch 'unstable' of github.com:sigp/lighthouse into simplify-rpc-codec

* Merge branch 'unstable' of github.com:sigp/lighthouse into simply-rpc-codec

* Merge branch 'unstable' into simplify-rpc-codec

* Merge branch 'unstable' into simplify-rpc-codec
  • Loading branch information
jxs authored Sep 6, 2024
1 parent d686138 commit 873748d
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 491 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
use crate::rpc::protocol::{
Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN,
};
use crate::rpc::{InboundRequest, OutboundRequest};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
Expand Down Expand Up @@ -57,13 +57,13 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
max_packet_size,
}
}
}

// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
/// Encodes RPC Responses sent to peers.
fn encode_response(
&mut self,
item: RPCCodedResponse<E>,
dst: &mut BytesMut,
) -> Result<(), RPCError> {
let bytes = match &item {
RPCCodedResponse::Success(resp) => match &resp {
RPCResponse::Status(res) => res.as_ssz_bytes(),
Expand Down Expand Up @@ -125,6 +125,21 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
}
}

// Encoder for inbound streams: Encodes RPC Responses sent to peers.
impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RPCCodedResponse<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.clear();
dst.reserve(1);
dst.put_u8(
item.as_u8()
.expect("Should never encode a stream termination"),
);
self.encode_response(item, dst)
}
}

// Decoder for inbound streams: Decodes RPC requests from peers
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
type Item = InboundRequest<E>;
Expand Down Expand Up @@ -188,6 +203,8 @@ pub struct SSZSnappyOutboundCodec<E: EthSpec> {
/// The fork name corresponding to the received context bytes.
fork_name: Option<ForkName>,
fork_context: Arc<ForkContext>,
/// Keeps track of the current response code for a chunk.
current_response_code: Option<u8>,
phantom: PhantomData<E>,
}

Expand All @@ -209,66 +226,12 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
fork_name: None,
fork_context,
phantom: PhantomData,
current_response_code: None,
}
}
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(r) => match r {
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
},
OutboundRequest::BlocksByRoot(r) => match r {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}

// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;

let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;

// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}

// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
type Item = RPCResponse<E>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// Decode an Rpc response.
fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<RPCResponse<E>>, RPCError> {
// Read the context bytes if required
if self.protocol.has_context_bytes() && self.fork_name.is_none() {
if src.len() >= CONTEXT_BYTES_LEN {
Expand Down Expand Up @@ -318,15 +281,8 @@ impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len),
}
}
}

impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type CodecErrorType = ErrorType;

fn decode_error(
&mut self,
src: &mut BytesMut,
) -> Result<Option<Self::CodecErrorType>, RPCError> {
fn decode_error(&mut self, src: &mut BytesMut) -> Result<Option<ErrorType>, RPCError> {
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
return Ok(None);
};
Expand Down Expand Up @@ -361,6 +317,95 @@ impl<E: EthSpec> OutboundCodec<OutboundRequest<E>> for SSZSnappyOutboundCodec<E>
}
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<OutboundRequest<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: OutboundRequest<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(r) => match r {
OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(),
OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(),
},
OutboundRequest::BlocksByRoot(r) => match r {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
return Err(RPCError::InternalError(
"attempting to encode data > max_packet_size",
));
}

// Inserts the length prefix of the uncompressed bytes into dst
// encoded as a unsigned varint
self.inner
.encode(bytes.len(), dst)
.map_err(RPCError::from)?;

let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&bytes).map_err(RPCError::from)?;
writer.flush().map_err(RPCError::from)?;

// Write compressed bytes to `dst`
dst.extend_from_slice(writer.get_ref());
Ok(())
}
}

// Decoder for outbound streams: Decodes RPC responses from peers.
//
// The majority of the decoding has now been pushed upstream due to the changing specification.
// We prefer to decode blocks and attestations with extra knowledge about the chain to perform
// faster verification checks before decoding entire blocks/attestations.
impl<E: EthSpec> Decoder for SSZSnappyOutboundCodec<E> {
type Item = RPCCodedResponse<E>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// if we have only received the response code, wait for more bytes
if src.len() <= 1 {
return Ok(None);
}
// using the response code determine which kind of payload needs to be decoded.
let response_code = self.current_response_code.unwrap_or_else(|| {
let resp_code = src.split_to(1)[0];
self.current_response_code = Some(resp_code);
resp_code
});

let inner_result = {
if RPCCodedResponse::<E>::is_response(response_code) {
// decode an actual response and mutates the buffer if enough bytes have been read
// returning the result.
self.decode_response(src)
.map(|r| r.map(RPCCodedResponse::Success))
} else {
// decode an error
self.decode_error(src)
.map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp)))
}
};
// if the inner decoder was capable of decoding a chunk, we need to reset the current
// response code for the next chunk
if let Ok(Some(_)) = inner_result {
self.current_response_code = None;
}
// return the result
inner_result
}
}

/// Handle errors that we get from decoding an RPC message from the stream.
/// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream.
/// `max_compressed_len` is the maximum compressed size for a given uncompressed size.
Expand Down Expand Up @@ -1030,7 +1075,7 @@ mod tests {
let mut snappy_inbound_codec =
SSZSnappyInboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);

snappy_inbound_codec.encode(message, &mut buf)?;
snappy_inbound_codec.encode_response(message, &mut buf)?;
Ok(buf)
}

Expand Down Expand Up @@ -1075,7 +1120,7 @@ mod tests {
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
// decode message just as snappy message
snappy_outbound_codec.decode(message)
snappy_outbound_codec.decode_response(message)
}

/// Encodes the provided protocol message as bytes and tries to decode the encoding bytes.
Expand Down Expand Up @@ -1847,4 +1892,129 @@ mod tests {
RPCError::InvalidData(_)
));
}

#[test]
fn test_decode_status_message() {
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);

let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
fork_context,
);

// remove response code
let mut snappy_buf = buf.clone();
let _ = snappy_buf.split_to(1);

// decode message just as snappy message
let _snappy_decoded_message = snappy_outbound_codec
.decode_response(&mut snappy_buf)
.unwrap();

// decode message as ssz snappy chunk
let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap();
}

#[test]
fn test_invalid_length_prefix() {
let mut uvi_codec: Uvi<u128> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);

// Smallest > 10 byte varint
let len: u128 = 2u128.pow(70);

// Insert length-prefix
uvi_codec.encode(len, &mut dst).unwrap();

let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
fork_context,
);

let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err();

assert_eq!(
snappy_decoded_message,
RPCError::IoError("input bytes exceed maximum".to_string()),
"length-prefix of > 10 bytes is invalid"
);
}

#[test]
fn test_length_limits() {
fn encode_len(len: usize) -> BytesMut {
let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);
uvi_codec.encode(len, &mut dst).unwrap();
dst
}

let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);

// Response limits
let fork_context = Arc::new(fork_context(ForkName::Base));

let chain_spec = Spec::default_spec();

let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));

let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));

// Request limits
let limit = protocol_id.rpc_request_limits(&fork_context.spec);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
max_rpc_size,
fork_context.clone(),
);
assert!(matches!(
codec.decode_response(&mut max).unwrap_err(),
RPCError::InvalidData(_)
));

let mut min = encode_len(limit.min - 1);
let mut codec =
SSZSnappyOutboundCodec::<Spec>::new(protocol_id, max_rpc_size, fork_context);
assert!(matches!(
codec.decode_response(&mut min).unwrap_err(),
RPCError::InvalidData(_)
));
}
}
Loading

0 comments on commit 873748d

Please sign in to comment.