From 494ce6572991d9efb2b4644ee8e0967e32fe339f Mon Sep 17 00:00:00 2001 From: maco Date: Tue, 14 May 2024 09:57:30 +0800 Subject: [PATCH] feat: limiting the size of query results to Dashboard (#3901) * feat: limiting the size of query results to Dashboard * optimize code * fix by cr * fix integration tests error * remove RequestSource::parse * refactor: sql query params * fix: unit test --------- Co-authored-by: tison --- src/servers/src/http.rs | 34 ++++++++++ src/servers/src/http/csv_result.rs | 6 ++ src/servers/src/http/greptime_result_v1.rs | 6 ++ src/servers/src/http/handler.rs | 8 ++- src/servers/src/http/table_result.rs | 6 ++ src/servers/tests/http/http_handler_test.rs | 74 +++++++++++++++++---- src/table/src/test_util/memtable.rs | 6 +- tests-integration/tests/http.rs | 18 ++--- 8 files changed, 132 insertions(+), 26 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4ba4e56b088c..a06b8c6f4b20 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -190,6 +190,10 @@ impl From for OutputSchema { pub struct HttpRecordsOutput { schema: OutputSchema, rows: Vec>, + // total_rows is equal to rows.len() in most cases, + // the Dashboard query result may be truncated, so we need to return the total_rows. + #[serde(default)] + total_rows: usize, // plan level execution metrics #[serde(skip_serializing_if = "HashMap::is_empty")] @@ -224,6 +228,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), rows: vec![], + total_rows: 0, metrics: Default::default(), }) } else { @@ -244,6 +249,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), + total_rows: rows.len(), rows, metrics: Default::default(), }) @@ -357,6 +363,34 @@ impl HttpResponse { HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), } } + + pub fn with_limit(self, limit: usize) -> Self { + match self { + HttpResponse::Csv(resp) => resp.with_limit(limit).into(), + HttpResponse::Table(resp) => resp.with_limit(limit).into(), + HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(), + _ => self, + } + } +} + +pub fn process_with_limit( + mut outputs: Vec, + limit: usize, +) -> Vec { + outputs + .drain(..) + .map(|data| match data { + GreptimeQueryOutput::Records(mut records) => { + if records.rows.len() > limit { + records.rows.truncate(limit); + records.total_rows = limit; + } + GreptimeQueryOutput::Records(records) + } + _ => data, + }) + .collect() } impl IntoResponse for HttpResponse { diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/csv_result.rs index ad89ac21b7e4..d6b512653bde 100644 --- a/src/servers/src/http/csv_result.rs +++ b/src/servers/src/http/csv_result.rs @@ -23,6 +23,7 @@ use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use super::process_with_limit; use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -65,6 +66,11 @@ impl CsvResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl IntoResponse for CsvResponse { diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs index ee87a5dca639..9cb2924ba689 100644 --- a/src/servers/src/http/greptime_result_v1.rs +++ b/src/servers/src/http/greptime_result_v1.rs @@ -23,6 +23,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use super::header::GREPTIME_DB_HEADER_METRICS; +use super::process_with_limit; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -62,6 +63,11 @@ impl GreptimedbV1Response { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl IntoResponse for GreptimedbV1Response { diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 8a16065df2c4..fa8fe98e4cf1 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -62,6 +62,7 @@ pub struct SqlQuery { // specified time precision. Maybe greptimedb format can support this // param too. pub epoch: Option, + pub limit: Option, } /// Handler to execute sql @@ -98,7 +99,7 @@ pub async fn sql( if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { Err((status, msg)) } else { - Ok(sql_handler.do_query(sql, query_ctx).await) + Ok(sql_handler.do_query(sql, query_ctx.clone()).await) } } else { Err(( @@ -117,7 +118,7 @@ pub async fn sql( Ok(outputs) => outputs, }; - let resp = match format { + let mut resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs).await, ResponseFormat::Csv => CsvResponse::from_output(outputs).await, ResponseFormat::Table => TableResponse::from_output(outputs).await, @@ -125,6 +126,9 @@ pub async fn sql( ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, }; + if let Some(limit) = query_params.limit { + resp = resp.with_limit(limit); + } resp.with_execution_time(start.elapsed().as_millis() as u64) } diff --git a/src/servers/src/http/table_result.rs b/src/servers/src/http/table_result.rs index a7fac46e89a7..dacef51beace 100644 --- a/src/servers/src/http/table_result.rs +++ b/src/servers/src/http/table_result.rs @@ -24,6 +24,7 @@ use mime_guess::mime; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use super::process_with_limit; use crate::http::error_result::ErrorResponse; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -66,6 +67,11 @@ impl TableResponse { pub fn execution_time_ms(&self) -> u64 { self.execution_time_ms } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } } impl Display for TableResponse { diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 0f0c8966ca5d..cb4a5e8ada9f 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -23,6 +23,7 @@ use headers::HeaderValue; use http_body::combinators::UnsyncBoxBody; use hyper::Response; use mime_guess::mime; +use servers::http::GreptimeQueryOutput::Records; use servers::http::{ handler as http_handler, script as script_handler, ApiState, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpResponse, @@ -48,10 +49,8 @@ async fn test_sql_not_provided() { for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] { let query = http_handler::SqlQuery { - db: None, - sql: None, format: Some(format.to_string()), - epoch: None, + ..Default::default() }; let HttpResponse::Error(resp) = http_handler::sql( @@ -82,8 +81,9 @@ async fn test_sql_output_rows() { script_handler: None, }; + let query_sql = "select sum(uint32s) from numbers limit 20"; for format in ["greptimedb_v1", "influxdb_v1", "csv", "table"] { - let query = create_query(format); + let query = create_query(format, query_sql, None); let json = http_handler::sql( State(api_state.clone()), query, @@ -112,7 +112,8 @@ async fn test_sql_output_rows() { [ 4950 ] - ] + ], + "total_rows": 1 }"# ); } @@ -176,6 +177,49 @@ async fn test_sql_output_rows() { } } +#[tokio::test] +async fn test_dashboard_sql_limit() { + let sql_handler = create_testing_sql_query_handler(MemTable::specified_numbers_table(2000)); + let ctx = QueryContext::arc(); + ctx.set_current_user(Some(auth::userinfo_by_name(None))); + let api_state = ApiState { + sql_handler, + script_handler: None, + }; + for format in ["greptimedb_v1", "csv", "table"] { + let query = create_query(format, "select * from numbers", Some(1000)); + let sql_response = http_handler::sql( + State(api_state.clone()), + query, + axum::Extension(ctx.clone()), + Form(http_handler::SqlQuery::default()), + ) + .await; + + match sql_response { + HttpResponse::GreptimedbV1(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, + HttpResponse::Csv(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, + HttpResponse::Table(resp) => match resp.output().first().unwrap() { + Records(records) => { + assert_eq!(records.num_rows(), 1000); + } + _ => unreachable!(), + }, + _ => unreachable!(), + } + } +} + #[tokio::test] async fn test_sql_form() { common_telemetry::init_default_ut_logging(); @@ -219,7 +263,8 @@ async fn test_sql_form() { [ 4950 ] - ] + ], + "total_rows": 1 }"# ); } @@ -393,7 +438,8 @@ def test(n) -> vector[i64]: [ 4 ] - ] + ], + "total_rows": 5 }"# ); } @@ -460,7 +506,8 @@ def test(n, **params) -> vector[i64]: [ 46 ] - ] + ], + "total_rows": 5 }"# ); } @@ -484,21 +531,20 @@ fn create_invalid_script_query() -> Query { }) } -fn create_query(format: &str) -> Query { +fn create_query(format: &str, sql: &str, limit: Option) -> Query { Query(http_handler::SqlQuery { - sql: Some("select sum(uint32s) from numbers limit 20".to_string()), - db: None, + sql: Some(sql.to_string()), format: Some(format.to_string()), - epoch: None, + limit, + ..Default::default() }) } fn create_form(format: &str) -> Form { Form(http_handler::SqlQuery { sql: Some("select sum(uint32s) from numbers limit 20".to_string()), - db: None, format: Some(format.to_string()), - epoch: None, + ..Default::default() }) } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 22562fa1a719..737ef644f637 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -101,6 +101,10 @@ impl MemTable { /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and /// column type "uint32". Column data increased from 0 to 100. pub fn default_numbers_table() -> TableRef { + Self::specified_numbers_table(100) + } + + pub fn specified_numbers_table(rows: u32) -> TableRef { let column_schemas = vec![ColumnSchema::new( "uint32s", ConcreteDataType::uint32_datatype(), @@ -108,7 +112,7 @@ impl MemTable { )]; let schema = Arc::new(Schema::new(column_schemas)); let columns: Vec = vec![Arc::new(UInt32Vector::from_slice( - (0..100).collect::>(), + (0..rows).collect::>(), ))]; let recordbatch = RecordBatch::new(schema, columns).unwrap(); MemTable::table("numbers", recordbatch) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 74ce4f6add1d..14c3e8cac265 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -147,7 +147,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]} + "records" :{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]],"total_rows":10} })).unwrap() ); @@ -189,7 +189,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]]} + "records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[["host",66.6,1024.0,0]],"total_rows":1} })).unwrap() ); @@ -207,7 +207,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -224,7 +224,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -241,13 +241,13 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); assert_eq!( outputs[1], serde_json::from_value::(json!({ - "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}} + "records":{"rows":[], "schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]}, "total_rows":0} })) .unwrap() ); @@ -276,7 +276,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -302,7 +302,7 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!( outputs[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]} + "records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]],"total_rows":1} })).unwrap() ); @@ -673,7 +673,7 @@ def test(n) -> vector[f64]: assert_eq!( output[0], serde_json::from_value::(json!({ - "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]} + "records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]],"total_rows": 10} })).unwrap() );