Skip to content

Commit

Permalink
refactor: remove re-export from logging (#3865)
Browse files Browse the repository at this point in the history
* refactor: remove re-export from logging

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge problem

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* run formatter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored May 6, 2024
1 parent 573c19b commit 5303537
Show file tree
Hide file tree
Showing 38 changed files with 112 additions and 120 deletions.
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging::warn;
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Repl {

let history_file = history_file();
if let Err(e) = rl.load_history(&history_file) {
logging::debug!(
debug!(
"failed to load history file on {}, error: {e}",
history_file.display()
);
Expand Down Expand Up @@ -225,7 +225,7 @@ impl Drop for Repl {
if self.rl.helper().is_some() {
let history_file = history_file();
if let Err(e) = self.rl.save_history(&history_file) {
logging::debug!(
debug!(
"failed to save history file on {}, error: {e}",
history_file.display()
);
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::{info, logging};
use common_telemetry::info;
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
Expand Down Expand Up @@ -210,8 +210,8 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;

logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);

let node_id = opts
.node_id
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
use common_telemetry::info;
use common_time::timezone::set_default_timezone;
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
Expand Down Expand Up @@ -219,8 +219,8 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);

set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;

Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging;
use common_telemetry::info;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
Expand Down Expand Up @@ -198,8 +198,8 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;

logging::info!("Metasrv start command: {:#?}", self);
logging::info!("Metasrv options: {:#?}", opts);
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);

let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/table/migrate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
use common_telemetry::error;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
Expand Down
24 changes: 11 additions & 13 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, logging, tracing};
use common_telemetry::{error, info, tracing};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
Expand Down Expand Up @@ -244,20 +244,18 @@ impl ManagerContext {
) -> Option<LoadedProcedure> {
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
error!(
"Loader not found, procedure_id: {}, type_name: {}",
procedure_id,
message.type_name
procedure_id, message.type_name
);
None
})?;

let procedure = loader(&message.data)
.map_err(|e| {
logging::error!(
error!(
"Failed to load procedure data, key: {}, source: {:?}",
procedure_id,
e
procedure_id, e
);
e
})
Expand Down Expand Up @@ -496,7 +494,7 @@ impl LocalManager {
continue;
};

logging::info!(
info!(
"Recover root procedure {}-{}, step: {}",
loaded_procedure.procedure.type_name(),
procedure_id,
Expand All @@ -521,15 +519,15 @@ impl LocalManager {
loaded_procedure.step,
loaded_procedure.procedure,
) {
logging::error!(e; "Failed to recover procedure {}", procedure_id);
error!(e; "Failed to recover procedure {}", procedure_id);
}
}
}
}

/// Recovers unfinished procedures and reruns them.
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
info!("LocalManager start to recover");
let recover_start = Instant::now();

let (messages, rollback_messages, finished_ids) =
Expand All @@ -539,19 +537,19 @@ impl LocalManager {
self.submit_recovered_messages(messages, InitProcedureState::Running);

if !finished_ids.is_empty() {
logging::info!(
info!(
"LocalManager try to clean finished procedures, num: {}",
finished_ids.len()
);

for procedure_id in finished_ids {
if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
error!(e; "Failed to delete procedure {}", procedure_id);
}
}
}

logging::info!(
info!(
"LocalManager finish recovery, cost: {}ms",
recover_start.elapsed().as_millis()
);
Expand Down
30 changes: 15 additions & 15 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use common_telemetry::{debug, error, info};
use tokio::time;

use super::rwlock::OwnedKeyRwLockGuard;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl ProcedureGuard {
impl Drop for ProcedureGuard {
fn drop(&mut self) {
if !self.finish {
logging::error!("Procedure {} exits unexpectedly", self.meta.id);
error!("Procedure {} exits unexpectedly", self.meta.id);

// Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
// See https://github.com/tokio-rs/tokio/issues/2002 .
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Runner {
// Ensure we can update the procedure state.
let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

logging::info!(
info!(
"Runner {}-{} starts",
self.procedure.type_name(),
self.meta.id
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Runner {

for id in procedure_ids {
if let Err(e) = self.store.delete_procedure(id).await {
logging::error!(
error!(
e;
"Runner {}-{} failed to delete procedure {}",
self.procedure.type_name(),
Expand All @@ -160,7 +160,7 @@ impl Runner {
}
}

logging::info!(
info!(
"Runner {}-{} exits",
self.procedure.type_name(),
self.meta.id
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Runner {
ProcedureState::Running | ProcedureState::Retrying { .. } => {
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl Runner {
}
}
Err(e) => {
logging::error!(
error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
Expand Down Expand Up @@ -394,7 +394,7 @@ impl Runner {

/// Extend the retry time to wait for the next retry.
async fn wait_on_err(&mut self, d: Duration, i: u64) {
logging::info!(
info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -407,7 +407,7 @@ impl Runner {
async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
let has_child = !subprocedures.is_empty();
for subprocedure in subprocedures {
logging::info!(
info!(
"Procedure {}-{} submit subprocedure {}-{}",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -422,7 +422,7 @@ impl Runner {
);
}

logging::info!(
info!(
"Procedure {}-{} is waiting for subprocedures",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -432,7 +432,7 @@ impl Runner {
if has_child {
self.meta.child_notify.notified().await;

logging::info!(
info!(
"Procedure {}-{} is waked up",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -454,7 +454,7 @@ impl Runner {
)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to persist procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -470,7 +470,7 @@ impl Runner {
.commit_procedure(self.meta.id, self.step)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to commit procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -496,7 +496,7 @@ impl Runner {
.rollback_procedure(self.meta.id, message)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to write rollback key for procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -509,7 +509,7 @@ impl Runner {

fn done(&self, output: Option<Output>) {
// TODO(yingwen): Add files to remove list.
logging::info!(
info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id,
Expand Down
16 changes: 7 additions & 9 deletions src/common/procedure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::fmt;

use common_telemetry::logging;
use common_telemetry::{debug, error, info, warn};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl ProcedureStore {
/// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
let proc_path = format!("{}{PROC_PATH}", parent_path);
logging::info!("The procedure state store path is: {}", &proc_path);
info!("The procedure state store path is: {}", &proc_path);
ProcedureStore { proc_path, store }
}

Expand Down Expand Up @@ -154,7 +154,7 @@ impl ProcedureStore {
while let Some((key_set, _)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
Expand All @@ -165,11 +165,9 @@ impl ProcedureStore {
}
}

logging::debug!(
debug!(
"Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
procedure_id,
step_keys,
finish_keys
procedure_id, step_keys, finish_keys
);
// We delete all step keys first.
self.store.batch_delete(step_keys.as_slice()).await?;
Expand Down Expand Up @@ -203,7 +201,7 @@ impl ProcedureStore {
while let Some((key_set, value)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
warn!("Unknown key while loading procedures, key: {}", key);
continue;
};

Expand Down Expand Up @@ -251,7 +249,7 @@ impl ProcedureStore {
serde_json::from_slice(value)
.map_err(|e| {
// `e` doesn't impl ErrorExt so we print it as normal error.
logging::error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
e
})
.ok()
Expand Down
Loading

0 comments on commit 5303537

Please sign in to comment.