diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs index e241d784de..951eb74137 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs @@ -183,7 +183,7 @@ impl> + TryFrom, Error = ProtobufConversi } // Returns true if the receiver for this stream is dropped. - fn inbound_send(data: &mut StreamData, message: StreamMessage) -> bool { + fn inbound_send(&mut self, data: &mut StreamData, message: StreamMessage) -> bool { // TODO(guyn): reconsider the "expect" here. let sender = &mut data.sender; if let StreamMessageBody::Content(content) = message.message { @@ -255,24 +255,39 @@ impl> + TryFrom, Error = ProtobufConversi } }; - let peer_id = metadata.originator_id; + let peer_id = metadata.originator_id.clone(); let stream_id = message.stream_id; let key = (peer_id, stream_id); - let message_id = message.message_id; let data = match self.inbound_stream_data.entry(key.clone()) { - Occupied(entry) => entry.into_mut(), - Vacant(e) => { + // If data exists, remove it (it will be returned to hash map at end of function). + Occupied(entry) => entry.remove_entry().1, + Vacant(_) => { // If we received a message for a stream that we have not seen before, // we need to create a new receiver for it. let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH); // TODO(guyn): reconsider the "expect" here. self.inbound_channel_sender.try_send(receiver).expect("Send should succeed"); - - let data = StreamData::new(sender); - e.insert(data) + StreamData::new(sender) } }; + if let Some(data) = self.handle_message_inner(message, metadata, data) { + self.inbound_stream_data.insert(key, data); + } + } + + /// Returns the StreamData struct if it should be put back into the hash map. None if the data + /// should be dropped. + fn handle_message_inner( + &mut self, + message: StreamMessage, + metadata: BroadcastedMessageMetadata, + mut data: StreamData, + ) -> Option> { + let peer_id = metadata.originator_id; + let stream_id = message.stream_id; + let key = (peer_id, stream_id); + let message_id = message.message_id; if data.max_message_id_received < message_id { data.max_message_id_received = message_id; @@ -288,9 +303,11 @@ impl> + TryFrom, Error = ProtobufConversi warn!( "Received fin message with id that is smaller than a previous message! \ key: {:?}, fin_message_id: {}, max_message_id_received: {}", - key, message_id, data.max_message_id_received + key.clone(), + message_id, + data.max_message_id_received ); - return; + return None; } } } @@ -300,41 +317,44 @@ impl> + TryFrom, Error = ProtobufConversi warn!( "Received message with id that is bigger than the id of the fin message! key: \ {:?}, message_id: {}, fin_message_id: {}", - key, + key.clone(), message_id, data.fin_message_id.unwrap_or(u64::MAX) ); - return; + return None; } // This means we can just send the message without buffering it. match message_id.cmp(&data.next_message_id) { Ordering::Equal => { - let mut receiver_dropped = Self::inbound_send(data, message); + let mut receiver_dropped = self.inbound_send(&mut data, message); if !receiver_dropped { - receiver_dropped = Self::process_buffer(data); + receiver_dropped = self.process_buffer(&mut data); } if data.message_buffer.is_empty() && data.fin_message_id.is_some() || receiver_dropped { data.sender.close_channel(); - self.inbound_stream_data.remove(&key); + return None; } } Ordering::Greater => { - Self::store(data, key, message); + Self::store(&mut data, key.clone(), message); } Ordering::Less => { // TODO(guyn): replace warnings with more graceful error handling warn!( "Received message with id that is smaller than the next message expected! \ key: {:?}, message_id: {}, next_message_id: {}", - key, message_id, data.next_message_id + key.clone(), + message_id, + data.next_message_id ); - return; + return None; } } + Some(data) } // Store an inbound message in the buffer. @@ -358,9 +378,9 @@ impl> + TryFrom, Error = ProtobufConversi // Tries to drain as many messages as possible from the buffer (in order), // DOES NOT guarantee that the buffer will be empty after calling this function. // Returns true if the receiver for this stream is dropped. - fn process_buffer(data: &mut StreamData) -> bool { + fn process_buffer(&mut self, data: &mut StreamData) -> bool { while let Some(message) = data.message_buffer.remove(&data.next_message_id) { - if Self::inbound_send(data, message) { + if self.inbound_send(data, message) { return true; } }