Skip to content

Commit

Permalink
feat: support querying field column names in Prometheus HTTP API (#3880)
Browse files Browse the repository at this point in the history
* feat: support querying field column names in Prometheus HTTP API

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use tables stream API

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia committed May 8, 2024
1 parent d997463 commit a6a702d
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Cannot find requested database: {}-{}", catalog, schema))]
#[snafu(display("Cannot find requested database: {}.{}", catalog, schema))]
DatabaseNotFound {
catalog: String,
schema: String,
location: Location,
},

#[snafu(display("Cannot find requested table: {}-{}-{}", catalog, schema, table))]
#[snafu(display("Cannot find requested table: {}.{}.{}", catalog, schema, table))]
TableNotFound {
catalog: String,
schema: String,
Expand Down
58 changes: 57 additions & 1 deletion src/servers/src/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use common_version::BuildInfo;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector};
use futures::StreamExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
Expand All @@ -49,7 +50,7 @@ use crate::error::{
UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::METRIC_NAME_LABEL;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL};
use crate::prometheus_handler::PrometheusHandlerRef;

/// For [ValueType::Vector] result type
Expand Down Expand Up @@ -690,6 +691,23 @@ pub async fn label_values_query(
};
table_names.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names));
} else if label_name == FIELD_NAME_LABEL {
let field_columns =
match retrieve_field_names(&query_ctx, handler.catalog_manager(), params.matches.0)
.await
{
Ok(table_names) => table_names,
Err(e) => {
return PrometheusJsonResponse::error(
e.status_code().to_string(),
e.output_msg(),
);
}
};

return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(
field_columns.into_iter().collect(),
));
}

let queries = params.matches.0;
Expand Down Expand Up @@ -742,6 +760,44 @@ pub async fn label_values_query(
resp
}

async fn retrieve_field_names(
query_ctx: &QueryContext,
manager: CatalogManagerRef,
matches: Vec<String>,
) -> Result<HashSet<String>> {
let mut field_columns = HashSet::new();
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();

if matches.is_empty() {
// query all tables if no matcher is provided
while let Some(table) = manager.tables(catalog, schema).await.next().await {
let table = table.context(CatalogSnafu)?;
for column in table.field_columns() {
field_columns.insert(column.name);
}
}
return Ok(field_columns);
}

for table_name in matches {
let table = manager
.table(catalog, schema, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table_name.to_string(),
})?;

for column in table.field_columns() {
field_columns.insert(column.name);
}
}
Ok(field_columns)
}

async fn retrieve_label_values(
result: Result<Output>,
label_name: &str,
Expand Down
3 changes: 3 additions & 0 deletions src/servers/src/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub const METRIC_NAME_LABEL: &str = "__name__";

pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";

/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
pub const FIELD_NAME_LABEL: &str = "__field__";

/// Metrics for push gateway protocol
pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
Expand Down
22 changes: 22 additions & 0 deletions src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use common_query::logical_plan::Expr;
Expand Down Expand Up @@ -90,4 +91,25 @@ impl Table {
.iter()
.map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
}

/// Get field columns in the definition order.
pub fn field_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
// `value_indices` in TableMeta is not reliable. Do a filter here.
let primary_keys = self
.table_info
.meta
.primary_key_indices
.iter()
.copied()
.collect::<HashSet<_>>();

self.table_info
.meta
.schema
.column_schemas()
.iter()
.enumerate()
.filter(move |(i, c)| !primary_keys.contains(i) && !c.is_time_index())
.map(|(_, c)| c.clone())
}
}
13 changes: 13 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);

// search field name
let res = client
.get("/v1/prometheus/api/v1/label/__field__/values?match[]=demo")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["cpu", "memory"])).unwrap()
);

// query an empty database should return nothing
let res = client
.get("/v1/prometheus/api/v1/label/host/values?match[]=demo&start=0&end=600")
Expand Down

0 comments on commit a6a702d

Please sign in to comment.