fix: handle the case where the node is a validator at first and then is not a validator
rink1969 committed Mar 14, 2024
1 parent f099d80 commit 3f72e33
Showing 4 changed files with 341 additions and 254 deletions.
9 changes: 4 additions & 5 deletions src/client.rs
@@ -19,7 +19,6 @@ use slog::Logger;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time;

@@ -102,7 +101,7 @@ impl Network {
local_id: u64,
local_port: u16,
network_port: u16,
msg_tx: mpsc::Sender<RaftMsg>,
msg_tx: flume::Sender<RaftMsg>,
logger: Logger,
) -> Self {
let mut inner = Inner::new(local_id, network_port, msg_tx, logger);
@@ -117,7 +116,7 @@ pub struct Inner {
// local peer id
local_id: u64,
// Send msg to our local peer
msg_tx: mpsc::Sender<RaftMsg>,
msg_tx: flume::Sender<RaftMsg>,

// raft_id -> origin
// Cached origin to avoid broadcasting. These records may be outdated,
@@ -137,7 +136,7 @@ impl Inner {
fn new(
local_id: u64,
network_port: u16,
msg_tx: mpsc::Sender<RaftMsg>,
msg_tx: flume::Sender<RaftMsg>,
logger: Logger,
) -> Self {
let client_options = ClientOptions::new(
@@ -275,7 +274,7 @@ impl NetworkMsgHandlerService for Network {
let _ = self.client.clone().send_msg(rest_msg).await;
} else {
self.msg_tx
.send(raft_msg)
.send_async(raft_msg)
.await
.map_err(|_| tonic::Status::internal("consensus service is closed"))?;
}
133 changes: 133 additions & 0 deletions src/grpc_server.rs
@@ -0,0 +1,133 @@
// Copyright Rivtower Technologies LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::client::Network;
use crate::config::ConsensusServiceConfig;
use crate::health_check::HealthCheckServer;
use cita_cloud_proto::common::ConsensusConfiguration;
use cita_cloud_proto::common::ProposalWithProof;
use cita_cloud_proto::common::StatusCode;
use cita_cloud_proto::consensus::consensus_service_server::ConsensusService;
use cita_cloud_proto::consensus::consensus_service_server::ConsensusServiceServer;
use cita_cloud_proto::health_check::health_server::HealthServer;
use cita_cloud_proto::network::network_msg_handler_service_server::NetworkMsgHandlerServiceServer;
use cloud_util::metrics::{run_metrics_exporter, MiddlewareLayer};
use slog::info;
use slog::Logger;
use tokio::sync::mpsc;
use tonic::transport::Server;

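// Receives `reconfigure` requests from the controller and forwards the configuration to the main loop over an mpsc channel.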
#[derive(Debug)]
pub struct RaftConsensusService(mpsc::Sender<ConsensusConfiguration>);

#[tonic::async_trait]
impl ConsensusService for RaftConsensusService {
async fn reconfigure(
&self,
request: tonic::Request<ConsensusConfiguration>,
) -> std::result::Result<tonic::Response<StatusCode>, tonic::Status> {
let config = request.into_inner();
let tx = self.0.clone();
// FIXME: this is not safe, but awaiting the send here may cause a deadlock
tokio::spawn(async move {
let _ = tx.send(config).await;
});
// horrible
Ok(tonic::Response::new(StatusCode { code: 0 }))
}

async fn check_block(
&self,
_request: tonic::Request<ProposalWithProof>,
) -> Result<tonic::Response<StatusCode>, tonic::Status> {
// Reply OK since we assume no Byzantine faults.
// horrible
Ok(tonic::Response::new(StatusCode { code: 0 }))
}
}

pub async fn start_grpc_server(
config: ConsensusServiceConfig,
logger: Logger,
rx_signal: flume::Receiver<()>,
controller_tx: mpsc::Sender<ConsensusConfiguration>,
network: Network,
) {
let raft_svc = RaftConsensusService(controller_tx);

let logger_cloned = logger.clone();
let grpc_listen_port = config.grpc_listen_port;
info!(
logger_cloned,
"grpc port of consensus_raft: {}", grpc_listen_port
);

let layer = if config.enable_metrics {
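// metrics enabled: spawn the metrics exporter and build the metrics middleware layer for the grpc server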
tokio::spawn(async move {
run_metrics_exporter(config.metrics_port).await.unwrap();
});

Some(
tower::ServiceBuilder::new()
.layer(MiddlewareLayer::new(config.metrics_buckets))
.into_inner(),
)
} else {
None
};

info!(logger_cloned, "start consensus_raft grpc server");
if let Some(layer) = layer {
tokio::spawn(async move {
info!(logger_cloned, "metrics on");
let addr = format!("127.0.0.1:{grpc_listen_port}").parse().unwrap();
let res = Server::builder()
.layer(layer)
.add_service(ConsensusServiceServer::new(raft_svc))
.add_service(NetworkMsgHandlerServiceServer::new(network))
.add_service(HealthServer::new(HealthCheckServer {}))
.serve_with_shutdown(
addr,
cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal),
)
.await;

if let Err(e) = res {
info!(logger_cloned, "grpc service exit with error: `{:?}`", e);
} else {
info!(logger_cloned, "grpc service exit");
}
});
} else {
tokio::spawn(async move {
info!(logger_cloned, "metrics off");
let addr = format!("127.0.0.1:{grpc_listen_port}").parse().unwrap();
let res = Server::builder()
.add_service(ConsensusServiceServer::new(raft_svc))
.add_service(NetworkMsgHandlerServiceServer::new(network))
.add_service(HealthServer::new(HealthCheckServer {}))
.serve_with_shutdown(
addr,
cloud_util::graceful_shutdown::grpc_serve_listen_term(rx_signal),
)
.await;

if let Err(e) = res {
info!(logger_cloned, "grpc service exit with error: `{:?}`", e);
} else {
info!(logger_cloned, "grpc service exit");
}
});
}
}
136 changes: 131 additions & 5 deletions src/main.rs
@@ -14,21 +14,28 @@

mod client;
mod config;
mod grpc_server;
mod health_check;
mod peer;
mod storage;
mod utils;

use crate::client::{Controller, Network};
use crate::grpc_server::start_grpc_server;
use cita_cloud_proto::common::ProposalWithProof;
use cita_cloud_proto::common::{ConsensusConfiguration, Proposal};
use clap::{crate_authors, crate_version, value_parser, Arg, ArgAction, Command};
use config::ConsensusServiceConfig;
use peer::Peer;
use slog::info;
use slog::{info, o, warn};
use sloggers::file::FileLoggerBuilder;
use sloggers::terminal::TerminalLoggerBuilder;
use sloggers::Build as _;
use std::path::Path;
use std::path::PathBuf;
use utils::clap_about;
use tokio::sync::mpsc;
use tokio::time::Duration;
use utils::{addr_to_peer_id, clap_about};

fn main() {
let run_cmd = Command::new("run")
@@ -112,10 +119,129 @@ fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(async move {
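// decode this node's address from the config (hex string, optional 0x prefix) and derive its raft peer id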
let node_addr = {
let s = &config.node_addr;
hex::decode(s.strip_prefix("0x").unwrap_or(s)).expect("decode node_addr failed")
};

let local_id = addr_to_peer_id(&node_addr);

// register the graceful shutdown handler and get the shutdown signal receiver
let rx_signal = cloud_util::graceful_shutdown::graceful_shutdown();
let mut peer = Peer::setup(config, logger.clone(), rx_signal.clone()).await;
peer.run(rx_signal).await;
info!(logger, "raft service exit");
// channel for raft to communicate with the controller
let (controller_tx, mut controller_rx) =
mpsc::channel::<ConsensusConfiguration>(1000);

// channel for raft to communicate with the network
let (peer_tx, peer_rx) = flume::bounded(64);

// Controller grpc client
let controller = {
let logger = logger.new(o!("tag" => "controller"));
Controller::new(config.controller_port, logger)
};

// Network grpc client
let network = {
let logger = logger.new(o!("tag" => "network"));
Network::setup(
local_id,
config.grpc_listen_port,
config.network_port,
peer_tx,
logger,
)
.await
};

// stop raft when this node is not in the validators list
let (stop_tx, stop_rx) = flume::bounded(1);

// start the consensus grpc server
start_grpc_server(
config.clone(),
logger.clone(),
rx_signal.clone(),
controller_tx,
network.clone(),
)
.await;

// wait for exit signal
let stop_tx_cloned = stop_tx.clone();
tokio::spawn(async move {
let _ = rx_signal.recv_async().await;
// stop task before exit
stop_tx_cloned.send(()).unwrap();
});

loop {
tokio::time::sleep(Duration::from_secs(1)).await;

// check exit signal
if stop_rx.try_recv().is_ok() {
break;
}

// Wait for controller's reconfigure.
info!(logger, "waiting for `reconfigure` from controller..");
let mut opt_trigger_config = None;
// drain all reconfigure messages from the channel:
// when a new node starts and syncs blocks quickly, many reconfigure messages pile up in the channel,
// and we only need to process the latest one
while let Ok(config) = controller_rx.try_recv() {
opt_trigger_config = Some(config);
}
if opt_trigger_config.is_none() {
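// no pending reconfigure: ping the controller with an empty proposal to fetch the latest configuration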
let pwp = ProposalWithProof {
proposal: Some(Proposal {
height: u64::MAX,
data: vec![],
}),
proof: vec![],
};
info!(logger, "ping_controller..");
match controller.commit_block(pwp).await {
Ok(config) => {
opt_trigger_config = Some(config);
}
Err(e) => {
warn!(logger, "commit block failed: {}", e);
}
}
}
if let Some(trigger_config) = opt_trigger_config {
info!(
logger,
"get reconfigure from controller, height is {}", trigger_config.height
);
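// if this node is in the new validator set, (re)start the raft peer in a background task; otherwise stop raft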
if trigger_config.validators.contains(&node_addr) {
let controller_cloned = controller.clone();
let network_cloned = network.clone();
let logger_cloned = logger.clone();
let config_cloned = config.clone();
let stop_rx_cloned = stop_rx.clone();
let peer_rx_cloned = peer_rx.clone();
tokio::spawn(async move {
info!(logger_cloned, "raft start");
let mut peer = Peer::setup(
config_cloned,
logger_cloned.clone(),
trigger_config,
network_cloned,
controller_cloned,
peer_rx_cloned,
)
.await;
peer.run(stop_rx_cloned).await;
info!(logger_cloned, "raft exit");
});
} else {
info!(logger, "I'm not in the validators list, stop raft");
stop_tx.send(()).unwrap();
}
}
}
});
}
None => {
