From 470987f3097d100aadb9d48a657030ae9809200f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 15 May 2024 13:06:57 +0000 Subject: [PATCH 1/2] feat: export metric physical tables first --- src/cmd/src/cli.rs | 4 ++++ src/cmd/src/cli/export.rs | 36 +++++++++++++++++++++++++++++++++--- src/cmd/src/lib.rs | 17 ++++++++++++----- 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index a5f089bfe70..b47293240c9 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -64,6 +64,10 @@ impl App for Instance { self.tool.do_work().await } + fn wait_signal(&self) -> bool { + false + } + async fn stop(&self) -> Result<()> { Ok(()) } diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 70ca80d11db..9603e3a39ce 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::path::Path; use std::sync::Arc; @@ -176,6 +177,28 @@ impl Export { /// Return a list of [`TableReference`] to be exported. /// Includes all tables under the given `catalog` and `schema` async fn get_table_list(&self, catalog: &str, schema: &str) -> Result> { + // Puts all metric table first + let sql = format!( + "select table_catalog, table_schema, table_name from \ + information_schema.columns where column_name = '__tsid' \ + and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'" + ); + let result = self.sql(&sql).await?; + let Some(records) = result else { + EmptyResultSnafu.fail()? + }; + let mut metric_physical_tables = HashSet::with_capacity(records.len()); + for value in records { + let mut t = Vec::with_capacity(3); + for v in &value { + let serde_json::Value::String(value) = v else { + unreachable!() + }; + t.push(value); + } + metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone())); + } + // TODO: SQL injection hurts let sql = format!( "select table_catalog, table_schema, table_name from \ @@ -202,10 +225,17 @@ impl Export { }; t.push(value); } - result.push((t[0].clone(), t[1].clone(), t[2].clone())); + let table = (t[0].clone(), t[1].clone(), t[2].clone()); + // Ignores the physical table + if !metric_physical_tables.contains(&table) { + result.push(table); + } } + let mut tables = Vec::with_capacity(metric_physical_tables.len() + result.len()); + tables.extend(metric_physical_tables.into_iter()); + tables.extend(result); - Ok(result) + Ok(tables) } async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result { @@ -270,7 +300,7 @@ impl Export { }) .count(); - info!("success {success}/{db_count} jobs"); + info!("Success {success}/{db_count} jobs"); Ok(()) } diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 7a5aa44ff48..715bd9fe3fb 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -41,6 +41,11 @@ pub trait App: Send { async fn start(&mut self) -> error::Result<()>; + /// Waits the quit signal by default. + fn wait_signal(&self) -> bool { + true + } + async fn stop(&self) -> error::Result<()>; } @@ -51,11 +56,13 @@ pub async fn start_app(mut app: Box) -> error::Result<()> { app.start().await?; - if let Err(e) = tokio::signal::ctrl_c().await { - error!("Failed to listen for ctrl-c signal: {}", e); - // It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in - // the underlying system. So we stop the app instead of running nonetheless to let people - // investigate the issue. + if app.wait_signal() { + if let Err(e) = tokio::signal::ctrl_c().await { + error!("Failed to listen for ctrl-c signal: {}", e); + // It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in + // the underlying system. So we stop the app instead of running nonetheless to let people + // investigate the issue. + } } app.stop().await?; From 1dad8dac52e0dfc8f9dfb2902d22077e4fafc1a1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 May 2024 03:48:16 +0000 Subject: [PATCH 2/2] chore: apply suggestions from CR --- src/cmd/src/cli/export.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 9603e3a39ce..d653889dae6 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt}; use tokio::fs::File; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::Semaphore; +use tokio::time::Instant; use crate::cli::{Instance, Tool}; use crate::error::{ @@ -216,7 +217,7 @@ impl Export { return Ok(vec![]); } - let mut result = Vec::with_capacity(records.len()); + let mut remaining_tables = Vec::with_capacity(records.len()); for value in records { let mut t = Vec::with_capacity(3); for v in &value { @@ -228,12 +229,12 @@ impl Export { let table = (t[0].clone(), t[1].clone(), t[2].clone()); // Ignores the physical table if !metric_physical_tables.contains(&table) { - result.push(table); + remaining_tables.push(table); } } - let mut tables = Vec::with_capacity(metric_physical_tables.len() + result.len()); + let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len()); tables.extend(metric_physical_tables.into_iter()); - tables.extend(result); + tables.extend(remaining_tables); Ok(tables) } @@ -255,6 +256,7 @@ impl Export { } async fn export_create_table(&self) -> Result<()> { + let timer = Instant::now(); let semaphore = Arc::new(Semaphore::new(self.parallelism)); let db_names = self.iter_db_names().await?; let db_count = db_names.len(); @@ -300,12 +302,14 @@ impl Export { }) .count(); - info!("Success {success}/{db_count} jobs"); + let elapsed = timer.elapsed(); + info!("Success {success}/{db_count} jobs, cost: {:?}", elapsed); Ok(()) } async fn export_table_data(&self) -> Result<()> { + let timer = Instant::now(); let semaphore = Arc::new(Semaphore::new(self.parallelism)); let db_names = self.iter_db_names().await?; let db_count = db_names.len(); @@ -381,8 +385,8 @@ impl Export { } }) .count(); - - info!("success {success}/{db_count} jobs"); + let elapsed = timer.elapsed(); + info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed); Ok(()) }