Merge pull request #309 from doyoubi/MigrationStats
Add migration stats
doyoubi authored Jul 26, 2021
2 parents 6eb74a5 + 86f77c0 commit 5ddc77d
Showing 9 changed files with 238 additions and 7 deletions.
12 changes: 12 additions & 0 deletions src/migration/manager.rs
@@ -1,4 +1,5 @@
use super::scan_task::{RedisScanImportingTask, RedisScanMigratingTask};
use super::stats::MigrationStats;
use super::task::{ImportingTask, MigratingTask, MigrationError, MigrationState, SwitchArg};
use crate::common::cluster::{
ClusterName, MigrationTaskMeta, RangeList, SlotRange, SlotRangeTag, EMPTY_CLUSTER_NAME,
@@ -56,6 +57,7 @@ where
proxy_sender_factory: Arc<PTSF>,
cmd_task_factory: Arc<CTF>,
future_registry: Arc<TrackedFutureRegistry>,
stats: Arc<MigrationStats>,
}

impl<RCF, TSF, DTSF, PTSF, CTF> MigrationManager<RCF, TSF, DTSF, PTSF, CTF>
@@ -81,6 +83,7 @@ where
cmd_task_factory: Arc<CTF>,
future_registry: Arc<TrackedFutureRegistry>,
) -> Self {
let stats = Arc::new(MigrationStats::default());
Self {
config,
client_factory,
@@ -89,6 +92,7 @@ where
proxy_sender_factory,
cmd_task_factory,
future_registry,
stats,
}
}

@@ -117,6 +121,7 @@ where
self.proxy_sender_factory.clone(),
self.cmd_task_factory.clone(),
blocking_ctrl_factory,
self.stats.clone(),
)
}

@@ -195,6 +200,10 @@ where
}
info!("spawn finished");
}

pub fn get_stats(&self) -> Vec<(String, usize)> {
self.stats.to_lines_str()
}
}

pub struct MigrationMap<T>
@@ -366,6 +375,7 @@ where
proxy_sender_factory: Arc<PTSF>,
cmd_task_factory: Arc<CTF>,
blocking_ctrl_factory: Arc<BCF>,
stats: Arc<MigrationStats>,
) -> (Self, Vec<NewTask<T>>)
where
RCF: RedisClientFactory,
@@ -439,6 +449,7 @@ where
meta.clone(),
client_factory.clone(),
ctrl,
stats.clone(),
));
new_tasks.push(NewTask {
cluster_name: cluster_name.clone(),
@@ -474,6 +485,7 @@ where
dst_sender_factory.clone(),
proxy_sender_factory.clone(),
cmd_task_factory.clone(),
stats.clone(),
));
new_tasks.push(NewTask {
cluster_name: cluster_name.clone(),
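
The wiring above is plain shared ownership: the constructor creates a single Arc<MigrationStats>, and the manager clones the handle into every migrating and importing task, so all tasks bump the same counters and get_stats() reads them from one place. A minimal, self-contained sketch of that shape, using simplified stand-in types rather than the project's actual ones:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Stand-in for the real MigrationStats, reduced to a single counter.
#[derive(Default)]
struct MigrationStats {
    migrating_active_sync_lock_success: AtomicUsize,
}

// Stand-in for a migration task holding a cloned handle.
struct Task {
    stats: Arc<MigrationStats>,
}

impl Task {
    fn on_lock_success(&self) {
        self.stats
            .migrating_active_sync_lock_success
            .fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let stats = Arc::new(MigrationStats::default());
    let tasks: Vec<Task> = (0..4)
        .map(|_| Task { stats: stats.clone() })
        .collect();
    for task in &tasks {
        task.on_lock_success();
    }
    // All four increments landed on the same shared instance.
    assert_eq!(
        stats
            .migrating_active_sync_lock_success
            .load(Ordering::Relaxed),
        4
    );
}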
1 change: 1 addition & 0 deletions src/migration/mod.rs
@@ -1,6 +1,7 @@
pub mod manager;
pub mod scan_migration;
mod scan_task;
pub mod stats;
pub mod task;

pub use self::scan_task::MAX_REDIRECTIONS;
53 changes: 51 additions & 2 deletions src/migration/scan_migration.rs
@@ -1,3 +1,4 @@
use super::stats::MigrationStats;
use super::task::{ScanResponse, SlotRangeArray};
use crate::common::cluster::SlotRange;
use crate::common::config::AtomicMigrationConfig;
@@ -21,9 +22,9 @@ use std::cmp::min;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub const PTTL_NO_EXPIRE: &[u8] = b"-1";
pub const PTTL_KEY_NOT_FOUND: &[u8] = b"-2";
@@ -73,6 +74,8 @@ pub struct ScanMigrationTask<T: CmdTask, F: RedisClientFactory> {
slot_mutex: Arc<SlotMutex>,
src_client_pool: Pool<F::Client>,
dst_client_pool: Pool<F::Client>,
stats: Arc<MigrationStats>,
stats_conn_last_update_time: AtomicU64,
}

impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
@@ -82,6 +85,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
slot_range: SlotRange,
client_factory: Arc<F>,
config: Arc<AtomicMigrationConfig>,
stats: Arc<MigrationStats>,
) -> Self {
let ranges = slot_range.to_range_list();
let slot_ranges = SlotRangeArray::new(ranges);
@@ -96,6 +100,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
receiver,
config,
slot_mutex.clone(),
stats.clone(),
);

const POOL_SIZE: usize = 1024;
@@ -110,6 +115,8 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
slot_mutex,
src_client_pool: Pool::new(POOL_SIZE),
dst_client_pool: Pool::new(POOL_SIZE),
stats,
stats_conn_last_update_time: AtomicU64::new(0),
}
}

@@ -125,6 +132,10 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
let guard = match self.slot_mutex.lock(lock_slot) {
Some(guard) => guard,
None => {
self.stats
.migrating_active_sync_lock_failed
.fetch_add(1, Ordering::Relaxed);

// The slot is locked. We put it into a slow path.
if let Err(err) = self.sync_tasks_sender.unbounded_send(task) {
let task = err.into_inner();
@@ -135,6 +146,9 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
return;
}
};
self.stats
.migrating_active_sync_lock_success
.fetch_add(1, Ordering::Relaxed);

let mut src_client = {
match self.src_client_pool.get_raw() {
@@ -223,6 +237,30 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
response::OK_REPLY.to_string().into_bytes(),
)));
drop(guard);

// `pool.len()` may be expensive. Just update it every second.
let last = self.stats_conn_last_update_time.load(Ordering::Relaxed);
let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(t) => t.as_secs(),
Err(err) => {
error!(
"failed to get system time for updating for migration stats: {}",
err
);
return;
}
};
const UPDATE_INTERVAL_SECS: u64 = 1;
if now > last + UPDATE_INTERVAL_SECS {
self.stats
.migrating_src_redis_conn
.store(self.src_client_pool.len(), Ordering::Relaxed);
self.stats
.migrating_dst_redis_conn
.store(self.dst_client_pool.len(), Ordering::Relaxed);
self.stats_conn_last_update_time
.store(now, Ordering::Relaxed);
}
}

pub fn start(&self) -> Option<MgrFut> {
@@ -262,6 +300,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
sync_tasks_receiver: UnboundedReceiver<T>,
config: Arc<AtomicMigrationConfig>,
slot_mutex: Arc<SlotMutex>,
stats: Arc<MigrationStats>,
) -> (MgrFut, FutureAutoStopHandle) {
let interval = min(
Duration::from_micros(config.get_scan_interval()),
@@ -284,6 +323,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
sync_tasks_receiver,
config,
slot_mutex,
stats,
);

let (send, handle) = new_auto_drop_future(send);
@@ -302,6 +342,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
mut sync_tasks_receiver: UnboundedReceiver<T>,
config: Arc<AtomicMigrationConfig>,
slot_mutex: Arc<SlotMutex>,
stats: Arc<MigrationStats>,
) -> Result<(), MigrationError> {
const SLEEP_BATCH_TIMES: u64 = 10;

@@ -398,6 +439,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
client_factory.clone(),
scan_count,
&slot_mutex,
&stats,
)
.await
}
@@ -439,6 +481,7 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
client_factory: Arc<F>,
scan_count: u64,
slot_mutex: &SlotMutex,
stats: &MigrationStats,
) -> Result<(u64, Option<F::Client>), RedisClientError> {
let ScanResponse { next_index, keys } =
Self::scan_keys(src_client, index, scan_count).await?;
@@ -463,6 +506,12 @@ impl<T: CmdTask, F: RedisClientFactory> ScanMigrationTask<T, F> {
}
}
let need_retry = locked_keys.len() < keys.len();
stats
.migrating_scan_lock_success
.fetch_add(locked_keys.len(), Ordering::Relaxed);
stats
.migrating_scan_lock_failed
.fetch_add(keys.len() - locked_keys.len(), Ordering::Relaxed);

let entries = Self::produce_entries(locked_keys, src_client).await?;
if !entries.is_empty() {
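
One detail of this file deserves a note: the connection gauges are refreshed at most once per second, throttled by a bare AtomicU64 timestamp instead of a lock, since pool.len() may be costly. A minimal sketch of that throttle in isolation (field names mirror the diff; the pool length is passed in as a plain argument). The check is racy, so two threads can both refresh within the same second, which is harmless for best-effort stats:

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

struct ConnGauges {
    migrating_src_redis_conn: AtomicUsize,
    stats_conn_last_update_time: AtomicU64,
}

impl ConnGauges {
    fn maybe_update(&self, current_pool_len: usize) {
        const UPDATE_INTERVAL_SECS: u64 = 1;
        let last = self.stats_conn_last_update_time.load(Ordering::Relaxed);
        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(t) => t.as_secs(),
            // The system clock went backwards; skip this round.
            Err(_) => return,
        };
        if now > last + UPDATE_INTERVAL_SECS {
            self.migrating_src_redis_conn
                .store(current_pool_len, Ordering::Relaxed);
            self.stats_conn_last_update_time
                .store(now, Ordering::Relaxed);
        }
    }
}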
5 changes: 5 additions & 0 deletions src/migration/scan_task.rs
@@ -1,4 +1,5 @@
use super::scan_migration::ScanMigrationTask;
use super::stats::MigrationStats;
use super::task::{
AtomicMigrationState, ImportingTask, MgrSubCmd, MigratingTask, MigrationError, MigrationState,
SwitchArg,
@@ -76,6 +77,7 @@ where
meta: MigrationMeta,
client_factory: Arc<RCF>,
blocking_ctrl: Arc<BC>,
stats: Arc<MigrationStats>,
) -> Self {
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
let task = ScanMigrationTask::new(
@@ -84,6 +86,7 @@
slot_range.clone(),
client_factory.clone(),
mgr_config.clone(),
stats,
);
let range_map = RangeMap::from(slot_range.get_range_list());
let active_redirection = config.active_redirection;
@@ -535,6 +538,7 @@ where
dst_sender_factory: Arc<DTSF>,
proxy_sender_factory: Arc<PTSF>,
cmd_task_factory: Arc<CTF>,
stats: Arc<MigrationStats>,
) -> Self {
let src_sender = sender_factory.create(meta.src_node_address.clone());
let dst_sender = dst_sender_factory.create(meta.dst_node_address.clone());
@@ -544,6 +548,7 @@
dst_sender,
src_proxy_sender,
cmd_task_factory.clone(),
stats,
);
let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
let range_map = RangeMap::from(slot_range.get_range_list());
54 changes: 54 additions & 0 deletions src/migration/stats.rs
@@ -0,0 +1,54 @@
use std::sync::atomic::{AtomicUsize, Ordering};

macro_rules! atomic_usize_stats {
(pub struct $struct_name:ident {
$(pub $field_name:ident: AtomicUsize,)*
}) => {
pub struct $struct_name {
$(pub $field_name: AtomicUsize,)*
}

impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: AtomicUsize::new(0),)*
}
}
}

impl $struct_name {
pub fn to_lines_str(&self) -> Vec<(String, usize)> {
vec![
$((stringify!($field_name).to_string(), self.$field_name.load(Ordering::Relaxed))),*
]
}
}
}
}

atomic_usize_stats! {
pub struct MigrationStats {
pub migrating_src_redis_conn: AtomicUsize,
pub migrating_dst_redis_conn: AtomicUsize,
pub migrating_active_sync_lock_success: AtomicUsize,
pub migrating_active_sync_lock_failed: AtomicUsize,
pub migrating_scan_lock_success: AtomicUsize,
pub migrating_scan_lock_failed: AtomicUsize,
pub importing_blocking_migration_commands: AtomicUsize,
pub importing_non_blocking_migration_commands: AtomicUsize,
pub importing_umsync_lock_success: AtomicUsize,
pub importing_umsync_lock_failed: AtomicUsize,
pub importing_umsync_lock_failed_again: AtomicUsize,
pub importing_umsync_failed: AtomicUsize,
pub importing_dst_key_existed: AtomicUsize,
pub importing_dst_key_not_existed: AtomicUsize,
pub importing_lock_success: AtomicUsize,
pub importing_lock_failed: AtomicUsize,
pub importing_resend_exists: AtomicUsize,
pub importing_resend_exists_failed: AtomicUsize,
pub importing_lock_loop_retry: AtomicUsize,
pub importing_src_key_existed: AtomicUsize,
pub importing_src_key_not_existed: AtomicUsize,
pub importing_src_failed: AtomicUsize,
}
}
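
For reference, this is roughly what atomic_usize_stats! expands to for a two-field input. The macro's only job is to keep the field list, the zero-initialized Default impl, and the name/value dump in to_lines_str() from drifting apart as counters are added:

use std::sync::atomic::{AtomicUsize, Ordering};

pub struct DemoStats {
    pub lock_success: AtomicUsize,
    pub lock_failed: AtomicUsize,
}

impl Default for DemoStats {
    fn default() -> Self {
        Self {
            lock_success: AtomicUsize::new(0),
            lock_failed: AtomicUsize::new(0),
        }
    }
}

impl DemoStats {
    // stringify!($field_name) becomes the literal field name,
    // so every counter is reported under its Rust identifier.
    pub fn to_lines_str(&self) -> Vec<(String, usize)> {
        vec![
            ("lock_success".to_string(), self.lock_success.load(Ordering::Relaxed)),
            ("lock_failed".to_string(), self.lock_failed.load(Ordering::Relaxed)),
        ]
    }
}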
8 changes: 8 additions & 0 deletions src/protocol/client.rs
@@ -228,6 +228,14 @@ impl<T> Pool<T> {
pub fn get_reclaim_sender(&self) -> &Arc<crossbeam_channel::Sender<T>> {
&self.sender
}

pub fn len(&self) -> usize {
self.receiver.len()
}

pub fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
}

#[derive(Debug)]
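
The new len()/is_empty() accessors simply delegate to the pool's crossbeam_channel::Receiver, which already tracks how many reclaimed items are queued. A minimal sketch of the pool shape this file implies; the bounded-channel choice and the method bodies other than len()/is_empty() are assumptions for illustration:

use std::sync::Arc;

struct Pool<T> {
    sender: Arc<crossbeam_channel::Sender<T>>,
    receiver: crossbeam_channel::Receiver<T>,
}

impl<T> Pool<T> {
    fn new(capacity: usize) -> Self {
        let (sender, receiver) = crossbeam_channel::bounded(capacity);
        Self {
            sender: Arc::new(sender),
            receiver,
        }
    }

    // Take an idle item out of the pool, if any.
    fn get(&self) -> Option<T> {
        self.receiver.try_recv().ok()
    }

    // Return an item to the pool; drop it if the pool is full.
    fn reclaim(&self, item: T) {
        let _ = self.sender.try_send(item);
    }

    // Number of idle items currently pooled.
    fn len(&self) -> usize {
        self.receiver.len()
    }

    fn is_empty(&self) -> bool {
        self.receiver.is_empty()
    }
}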
12 changes: 12 additions & 0 deletions src/proxy/executor.rs
@@ -275,6 +275,8 @@ where
self.handle_umctl_slowlog(cmd_ctx);
} else if sub_cmd.eq("DEBUG") {
self.handle_umctl_debug(cmd_ctx);
} else if sub_cmd.eq("STATS") {
self.handle_umctl_stats(cmd_ctx);
} else if sub_cmd.eq("GETEPOCH") {
self.handle_umctl_get_epoch(cmd_ctx);
} else if sub_cmd.eq("READY") {
@@ -454,6 +456,16 @@ where
}
}

fn handle_umctl_stats(&self, cmd_ctx: CmdCtx) {
let stats = self.manager.get_stats();
cmd_ctx.set_resp_result(Ok(Resp::Arr(Array::Arr(
stats
.into_iter()
.map(|s| Resp::Bulk(BulkStr::Str(s.into_bytes())))
.collect(),
))));
}

fn handle_umctl_get_epoch(&self, cmd_ctx: CmdCtx) {
let epoch = self.manager.get_epoch();
cmd_ctx.set_resp_result(Ok(Resp::Integer(epoch.to_string().into_bytes())))
8 changes: 8 additions & 0 deletions src/proxy/manager.rs
@@ -284,6 +284,14 @@ impl<F: RedisClientFactory, C: ConnFactory<Pkt = RespPacket>> MetaManager<F, C>
]))
}

pub fn get_stats(&self) -> Vec<String> {
let mut lines = vec!["# Migration".to_string()];
for (k, v) in self.migration_manager.get_stats().into_iter() {
lines.push(format!("{}: {}", k, v));
}
lines
}

pub fn handle_switch(
&self,
switch_arg: SwitchArg,
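
Putting the two handlers together: UMCTL STATS replies with an array of bulk strings, one per line from get_stats(), in a "key: value" layout similar to a section of redis's INFO output. A hypothetical session (counter values are illustrative, not from a real run):

> UMCTL STATS
 1) "# Migration"
 2) "migrating_src_redis_conn: 1"
 3) "migrating_dst_redis_conn: 1"
 4) "migrating_active_sync_lock_success: 4213"
 5) "migrating_active_sync_lock_failed: 2"
 6) "migrating_scan_lock_success: 58210"
 7) "migrating_scan_lock_failed: 35"
 ...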
(Diff for the remaining changed file was not loaded in this view.)
