Skip to content

Commit

Permalink
refactor after review. Instead of using two separate signals, use a c…
Browse files Browse the repository at this point in the history
…ommon struct
  • Loading branch information
akichidis committed Jan 4, 2024
1 parent 76bdfde commit 720e394
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 101 deletions.
23 changes: 7 additions & 16 deletions mysticeti-core/src/core_thread/simulated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,25 @@ use std::collections::HashSet;
use crate::block_handler::BlockHandler;
use crate::commit_observer::CommitObserver;
use crate::data::Data;
use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals};
use crate::syncer::{Syncer, SyncerSignals};
use crate::types::{AuthoritySet, BlockReference};
use crate::types::{RoundNumber, StatementBlock};
use parking_lot::Mutex;

pub struct CoreThreadDispatcher<
H: BlockHandler,
S: SyncerSignals,
R: RoundAdvancedSignal,
C: CommitObserver,
> {
syncer: Mutex<Syncer<H, S, R, C>>,
pub struct CoreThreadDispatcher<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
syncer: Mutex<Syncer<H, S, C>>,
}

impl<
H: BlockHandler + 'static,
S: SyncerSignals + 'static,
R: RoundAdvancedSignal + 'static,
C: CommitObserver + 'static,
> CoreThreadDispatcher<H, S, R, C>
impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver + 'static>
CoreThreadDispatcher<H, S, C>
{
pub fn start(syncer: Syncer<H, S, R, C>) -> Self {
pub fn start(syncer: Syncer<H, S, C>) -> Self {
Self {
syncer: Mutex::new(syncer),
}
}

pub fn stop(self) -> Syncer<H, S, R, C> {
pub fn stop(self) -> Syncer<H, S, C> {
self.syncer.into_inner()
}

Expand Down
34 changes: 11 additions & 23 deletions mysticeti-core/src/core_thread/spawned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,22 @@
use crate::block_handler::BlockHandler;
use crate::commit_observer::CommitObserver;
use crate::metrics::{Metrics, UtilizationTimerExt};
use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals};
use crate::syncer::{Syncer, SyncerSignals};
use crate::types::AuthoritySet;
use crate::types::{RoundNumber, StatementBlock};
use crate::{data::Data, types::BlockReference};
use std::sync::Arc;
use std::{collections::HashSet, thread};
use tokio::sync::{mpsc, oneshot};

pub struct CoreThreadDispatcher<
H: BlockHandler,
S: SyncerSignals,
R: RoundAdvancedSignal,
C: CommitObserver,
> {
pub struct CoreThreadDispatcher<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
sender: mpsc::Sender<CoreThreadCommand>,
join_handle: thread::JoinHandle<Syncer<H, S, R, C>>,
join_handle: thread::JoinHandle<Syncer<H, S, C>>,
metrics: Arc<Metrics>,
}

pub struct CoreThread<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver>
{
syncer: Syncer<H, S, R, C>,
pub struct CoreThread<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
syncer: Syncer<H, S, C>,
receiver: mpsc::Receiver<CoreThreadCommand>,
}

Expand All @@ -41,14 +35,10 @@ enum CoreThreadCommand {
),
}

impl<
H: BlockHandler + 'static,
S: SyncerSignals + 'static,
R: RoundAdvancedSignal + 'static,
C: CommitObserver + 'static,
> CoreThreadDispatcher<H, S, R, C>
impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver + 'static>
CoreThreadDispatcher<H, S, C>
{
pub fn start(syncer: Syncer<H, S, R, C>) -> Self {
pub fn start(syncer: Syncer<H, S, C>) -> Self {
let (sender, receiver) = mpsc::channel(32);
let metrics = syncer.core().metrics.clone();
let core_thread = CoreThread { syncer, receiver };
Expand All @@ -63,7 +53,7 @@ impl<
}
}

pub fn stop(self) -> Syncer<H, S, R, C> {
pub fn stop(self) -> Syncer<H, S, C> {
drop(self.sender);
self.join_handle.join().unwrap()
}
Expand Down Expand Up @@ -119,10 +109,8 @@ impl<
}
}

impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver>
CoreThread<H, S, R, C>
{
pub fn run(mut self) -> Syncer<H, S, R, C> {
impl<H: BlockHandler, S: SyncerSignals, C: CommitObserver> CoreThread<H, S, C> {
pub fn run(mut self) -> Syncer<H, S, C> {
tracing::info!("Started core thread with tid {}", gettid::gettid());
let metrics = self.syncer.core().metrics.clone();
while let Some(command) = self.receiver.blocking_recv() {
Expand Down
30 changes: 10 additions & 20 deletions mysticeti-core/src/net_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::network::{Connection, Network, NetworkMessage};
use crate::runtime::Handle;
use crate::runtime::{self, timestamp_utc};
use crate::runtime::{JoinError, JoinHandle};
use crate::syncer::{Syncer, SyncerSignals};
use crate::syncer::{Signals, Syncer};
use crate::types::{AuthorityIndex, StatementBlock};
use crate::types::{AuthoritySet, RoundNumber};
use crate::wal::WalSyncer;
Expand All @@ -23,7 +23,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::watch::{Receiver, Sender};
use tokio::sync::watch::Receiver;
use tokio::sync::{mpsc, oneshot, Notify};

/// The maximum number of blocks that can be requested in a single message.
Expand Down Expand Up @@ -67,7 +67,7 @@ pub struct NetworkSyncer<H: BlockHandler, C: CommitObserver> {
}

pub struct NetworkSyncerInner<H: BlockHandler, C: CommitObserver> {
pub syncer: CoreThreadDispatcher<H, Arc<Notify>, Sender<RoundNumber>, C>,
pub syncer: CoreThreadDispatcher<H, Signals, C>,
pub block_store: BlockStore,
pub notify: Arc<Notify>,
committee: Arc<Committee>,
Expand All @@ -93,20 +93,20 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
) -> Self {
let authority_index = core.authority();
let handle = Handle::current();
let notify = Arc::new(Notify::new());
let committee = core.committee().clone();
let wal_syncer = core.wal_syncer();
let block_store = core.block_store().clone();
let epoch_closing_time = core.epoch_closing_time();
let notify = Arc::new(Notify::new());
let (round_advanced_sender, round_advanced_receiver) =
tokio::sync::watch::channel(0 as RoundNumber);
let signals = Signals::new(notify.clone(), round_advanced_sender);
let mut syncer = Syncer::new(
core,
commit_period,
notify.clone(),
signals,
commit_observer,
metrics.clone(),
round_advanced_sender,
);
syncer.force_new_block(1, Default::default());
let syncer = CoreThreadDispatcher::start(syncer);
Expand All @@ -117,9 +117,9 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
let connected_authorities =
Arc::new(Mutex::new(ConnectedAuthorities::new(metrics.clone())));
let inner = Arc::new(NetworkSyncerInner {
notify,
syncer,
block_store,
notify,
committee: committee.clone(),
stop: stop_sender.clone(),
epoch_close_signal: epoch_sender.clone(),
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
}
}

pub async fn shutdown(self) -> Syncer<H, Arc<Notify>, Sender<RoundNumber>, C> {
pub async fn shutdown(self) -> Syncer<H, Signals, C> {
drop(self.stop);
// todo - wait for network shutdown as well
self.main_task.await.ok();
Expand Down Expand Up @@ -477,12 +477,6 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncerInner<
}
}

impl SyncerSignals for Arc<Notify> {
fn new_block_ready(&mut self) {
self.notify_waiters();
}
}

pub struct AsyncWalSyncer {
wal_syncer: WalSyncer,
stop: mpsc::Sender<()>,
Expand Down Expand Up @@ -581,17 +575,13 @@ mod sim_tests {
use crate::future_simulator::SimulatedExecutorState;
use crate::runtime;
use crate::simulator_tracing::setup_simulator_tracing;
use crate::syncer::Syncer;
use crate::syncer::{Signals, Syncer};
use crate::test_util::{
check_commits, print_stats, rng_at_seed, simulated_network_syncers,
simulated_network_syncers_with_epoch_duration,
};
use crate::types::RoundNumber;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch::Sender;
use tokio::sync::Notify;

#[test]
fn test_epoch_close() {
Expand All @@ -616,7 +606,7 @@ mod sim_tests {

async fn wait_for_epoch_to_close(
network_syncers: Vec<NetworkSyncer<TestBlockHandler, TestCommitObserver>>,
) -> Vec<Syncer<TestBlockHandler, Arc<Notify>, Sender<RoundNumber>, TestCommitObserver>> {
) -> Vec<Syncer<TestBlockHandler, Signals, TestCommitObserver>> {
let mut any_closed = false;
while !any_closed {
for net_sync in network_syncers.iter() {
Expand Down
56 changes: 28 additions & 28 deletions mysticeti-core/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,54 @@ use crate::types::{AuthoritySet, RoundNumber, StatementBlock};
use crate::{block_handler::BlockHandler, metrics::Metrics};
use std::sync::Arc;
use tokio::sync::watch::Sender;
use tokio::sync::Notify;

pub struct Syncer<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver> {
pub struct Syncer<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
core: Core<H>,
force_new_block: bool,
commit_period: u64,
signals: S,
commit_observer: C,
metrics: Arc<Metrics>,
round_advanced_signal: R,
}

pub struct Signals {
pub round_advanced_sender: Sender<RoundNumber>,
pub block_ready: Arc<Notify>,
}

impl Signals {
pub fn new(block_ready: Arc<Notify>, round_advanced_sender: Sender<RoundNumber>) -> Self {
Self {
round_advanced_sender,
block_ready,
}
}
}

pub trait SyncerSignals: Send + Sync {
fn new_block_ready(&mut self);
}

pub trait RoundAdvancedSignal: Send + Sync {
fn new_round(&mut self, round_number: RoundNumber);
}

impl RoundAdvancedSignal for Sender<RoundNumber> {
fn new_round(&mut self, round_number: RoundNumber) {
self.send(round_number).ok();
impl SyncerSignals for Signals {
fn new_block_ready(&mut self) {
self.block_ready.notify_waiters();
}
}

impl RoundAdvancedSignal for Option<RoundNumber> {
fn new_round(&mut self, round_number: RoundNumber) {
*self = Some(round_number)
self.round_advanced_sender.send(round_number).ok();
}
}

impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver>
Syncer<H, S, R, C>
{
impl<H: BlockHandler, S: SyncerSignals, C: CommitObserver> Syncer<H, S, C> {
pub fn new(
core: Core<H>,
commit_period: u64,
signals: S,
commit_observer: C,
metrics: Arc<Metrics>,
round_advanced_signal: R,
) -> Self {
Self {
core,
Expand All @@ -59,7 +66,6 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve
signals,
commit_observer,
metrics,
round_advanced_signal,
}
}

Expand All @@ -78,7 +84,7 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve

// we got a new quorum of blocks. Let leader timeout task know about it.
if new_round > previous_round {
self.round_advanced_signal.new_round(new_round);
self.signals.new_round(new_round);
}

self.try_new_block(connected_authorities);
Expand Down Expand Up @@ -172,20 +178,14 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve
}
}

impl SyncerSignals for bool {
fn new_block_ready(&mut self) {
*self = true;
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::block_handler::TestBlockHandler;
use crate::commit_observer::TestCommitObserver;
use crate::data::Data;
use crate::simulator::{Scheduler, Simulator, SimulatorState};
use crate::test_util::{check_commits, committee_and_syncers, rng_at_seed};
use crate::test_util::{check_commits, committee_and_syncers, rng_at_seed, SyncerSignalsMock};
use rand::Rng;
use std::ops::Range;
use std::time::Duration;
Expand All @@ -198,7 +198,7 @@ mod tests {
DeliverBlock(Data<StatementBlock>),
}

impl SimulatorState for Syncer<TestBlockHandler, bool, Option<RoundNumber>, TestCommitObserver> {
impl SimulatorState for Syncer<TestBlockHandler, SyncerSignalsMock, TestCommitObserver> {
type Event = SyncerEvent;

fn handle_event(&mut self, event: Self::Event) {
Expand All @@ -215,8 +215,8 @@ mod tests {
}

// New quorum has been received and round has advanced
if let Some(new_round) = self.round_advanced_signal {
self.round_advanced_signal = None;
if let Some(new_round) = self.signals.new_round {
self.signals.new_round = None;
Scheduler::schedule_event(
ROUND_TIMEOUT,
self.scheduler_state_id(),
Expand All @@ -225,8 +225,8 @@ mod tests {
}

// New block was created
if self.signals {
self.signals = false;
if self.signals.new_block_ready {
self.signals.new_block_ready = false;
let last_block = self.core.last_own_block().clone();
for authority in self.core.committee().authorities() {
if authority == self.core.authority() {
Expand Down
Loading

0 comments on commit 720e394

Please sign in to comment.