Skip to content

Commit

Permalink
feat: adds start_time and uptime to cluster_info
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Apr 29, 2024
1 parent 355b11d commit 2c909a5
Show file tree
Hide file tree
Showing 20 changed files with 313 additions and 67 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" }
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "83991744468a010512758c2137ff3a4c9fd5eb91" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ datafusion.workspace = true
datatypes.workspace = true
futures = "0.3"
futures-util.workspace = true
humantime.workspace = true
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
Expand Down
58 changes: 53 additions & 5 deletions src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::{Arc, Weak};
use std::time::Duration;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
Expand All @@ -24,13 +25,17 @@ use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging::warn;
use common_time::timestamp::Timestamp;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt64VectorBuilder};
use datatypes::vectors::{
StringVectorBuilder, TimestampMillisecondVectorBuilder, UInt64VectorBuilder,
};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

Expand All @@ -44,7 +49,8 @@ const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
// TODO(dennis): adds `uptime`, `start_time` columns etc.
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";

const INIT_CAPACITY: usize = 42;

Expand All @@ -55,17 +61,21 @@ const INIT_CAPACITY: usize = 42;
/// - `peer_addr`: the peer gRPC address.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
/// - `uptime`: the uptime of the peer.
///
pub(super) struct InformationSchemaClusterInfo {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
start_time_ms: u64,
}

impl InformationSchemaClusterInfo {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
}
}

Expand All @@ -76,11 +86,21 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
START_TIME,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
]))
}

fn builder(&self) -> InformationSchemaClusterInfoBuilder {
InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
InformationSchemaClusterInfoBuilder::new(
self.schema.clone(),
self.catalog_manager.clone(),
self.start_time_ms,
)
}
}

Expand Down Expand Up @@ -120,17 +140,24 @@ impl InformationTable for InformationSchemaClusterInfo {

struct InformationSchemaClusterInfoBuilder {
schema: SchemaRef,
start_time_ms: u64,
catalog_manager: Weak<dyn CatalogManager>,

peer_ids: UInt64VectorBuilder,
peer_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
versions: StringVectorBuilder,
git_commits: StringVectorBuilder,
start_times: TimestampMillisecondVectorBuilder,
uptimes: StringVectorBuilder,
}

impl InformationSchemaClusterInfoBuilder {
fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
fn new(
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
start_time_ms: u64,
) -> Self {
Self {
schema,
catalog_manager,
Expand All @@ -139,6 +166,9 @@ impl InformationSchemaClusterInfoBuilder {
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_time_ms,
}
}

Expand All @@ -164,7 +194,10 @@ impl InformationSchemaClusterInfoBuilder {
last_activity_ts: -1,
status: NodeStatus::Standalone,
version: build_info.version.to_string(),
git_commit: build_info.commit.to_string(),
git_commit: build_info.commit_short.to_string(),
// Use `self.start_time_ms` instead.
// It's not precise but enough.
start_time_ms: self.start_time_ms,
},
);
}
Expand Down Expand Up @@ -208,6 +241,19 @@ impl InformationSchemaClusterInfoBuilder {
self.peer_addrs.push(Some(&node_info.peer.addr));
self.versions.push(Some(&node_info.version));
self.git_commits.push(Some(&node_info.git_commit));
if node_info.start_time_ms > 0 {
self.start_times
.push(Some(TimestampMillisecond(Timestamp::new_millisecond(
node_info.start_time_ms as i64,
))));
let now = common_time::util::current_time_millis() as u64;
let duration_since_start = now - node_info.start_time_ms;
let format = humantime::format_duration(Duration::from_millis(duration_since_start));
self.uptimes.push(Some(format.to_string().as_str()));
} else {
self.start_times.push(None);
self.uptimes.push(None);
}
}

fn finish(&mut self) -> Result<RecordBatch> {
Expand All @@ -217,6 +263,8 @@ impl InformationSchemaClusterInfoBuilder {
Arc::new(self.peer_addrs.finish()),
Arc::new(self.versions.finish()),
Arc::new(self.git_commits.finish()),
Arc::new(self.start_times.finish()),
Arc::new(self.uptimes.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/region_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const INIT_CAPACITY: usize = 42;
///
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode peer address
/// - `peer_addr`: the region storage datanode gRPC peer address
/// - `is_leader`: whether the peer is the leader
/// - `status`: the region status, `ALIVE` or `DOWNGRADED`.
/// - `down_seconds`: the duration of being offline, in seconds.
Expand Down
3 changes: 1 addition & 2 deletions src/catalog/src/information_schema/runtime_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,13 @@ impl InformationSchemaMetricsBuilder {
.join(", "),
// Safety: always has a sample
ts.samples[0].value,
// TODO(dennis): fetching other peers metrics

// The peer column is always `None` for standalone
None,
"standalone",
);
}

// FIXME(dennis): fetching other peers metrics
self.finish()
}

Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ impl StartCommand {
]);

let heartbeat_task = HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub struct NodeInfo {
pub version: String,
// The node build git commit hash
pub git_commit: String,
// The node star timestamp
pub start_time_ms: u64,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -290,6 +292,7 @@ mod tests {
}),
version: "".to_string(),
git_commit: "".to_string(),
start_time_ms: 1,
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
Expand All @@ -306,6 +309,7 @@ mod tests {
leader_regions: 3,
follower_regions: 4,
}),
start_time_ms: 1,
..
}
);
Expand Down
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datafusion.workspace = true
Expand Down
10 changes: 9 additions & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, Peer, RegionRole, RegionStat, Role};
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand All @@ -43,6 +43,7 @@ use crate::region_server::RegionServer;
pub(crate) mod handler;
pub(crate) mod task_tracker;

/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
Expand Down Expand Up @@ -246,6 +247,7 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let build_info = common_version::build_info();
let region_stats = Self::load_region_stats(&region_server_clone).await;
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
Expand All @@ -254,6 +256,12 @@ impl HeartbeatTask {
region_stats,
duration_since_epoch,
node_epoch,
info: Some(NodeInfo {
version: build_info.version.to_string(),
git_commit: build_info.commit_short.to_string(),
// The start timestamp is the same as node_epoch currently.
start_time_ms: node_epoch,
}),
..Default::default()
};
sleep.as_mut().reset(now + Duration::from_millis(interval));
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
Expand Down
Loading

0 comments on commit 2c909a5

Please sign in to comment.