Skip to content

Commit

Permalink
feat: implement otel log appender
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Aug 1, 2024
1 parent 4fd68a2 commit 098e6f5
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/tisonkun/logforth"
rust-version = "1.71.0"
version = "0.2.0"
version = "0.3.0"

[features]
fastrace = ["dep:fastrace"]
file = ["dep:crossbeam-channel", "dep:parking_lot", "dep:time"]
json = ["dep:serde_json", "dep:serde"]
no-color = ["colored/no-color"]
opentelemetry = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"]

[dependencies]
anyhow = { version = "1.0" }
Expand All @@ -38,6 +39,9 @@ crossbeam-channel = { version = "0.5", optional = true }
fastrace = { version = "0.6", optional = true }
humantime = { version = "2.1" }
log = { version = "0.4", features = ["std", "kv_unstable"] }
opentelemetry = { version = "0.24", features = ["logs"], optional = true }
opentelemetry-otlp = { version = "0.17", features = ["logs", "grpc-tonic"], optional = true }
opentelemetry_sdk = { version = "0.24", features = ["logs", "rt-tokio"], optional = true }
parking_lot = { version = "0.12", optional = true }
paste = { version = "1.0" }
serde = { version = "1.0", features = ["derive"], optional = true }
Expand Down
23 changes: 19 additions & 4 deletions src/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::dynlog::DynLog;
use crate::filter::FilterImpl;
use crate::layout;
use crate::layout::Layout;
pub use boxdyn::*;
#[cfg(feature = "fastrace")]
pub use fastrace::*;
#[cfg(feature = "file")]
pub use file::*;
#[cfg(feature = "opentelemetry")]
pub use opentelemetry::*;
pub use stdio::*;

use crate::dynlog::DynLog;
use crate::filter::FilterImpl;
use crate::layout;
use crate::layout::Layout;

mod boxdyn;
#[cfg(feature = "fastrace")]
mod fastrace;
#[cfg(feature = "file")]
mod file;
#[cfg(feature = "opentelemetry")]
mod opentelemetry;
mod stdio;

pub trait Append {
Expand Down Expand Up @@ -56,6 +61,8 @@ pub enum AppendImpl {
DynLog(DynLog),
#[cfg(feature = "fastrace")]
Fastrace(Fastrace),
#[cfg(feature = "opentelemetry")]
OpenTelemetryLog(OpenTelemetryLog),
#[cfg(feature = "file")]
RollingFile(RollingFile),
Stdout(Stdout),
Expand All @@ -69,6 +76,8 @@ impl Append for AppendImpl {
AppendImpl::DynLog(append) => append.try_append(record),
#[cfg(feature = "fastrace")]
AppendImpl::Fastrace(append) => append.try_append(record),
#[cfg(feature = "opentelemetry")]
AppendImpl::OpenTelemetryLog(append) => append.try_append(record),
#[cfg(feature = "file")]
AppendImpl::RollingFile(append) => append.try_append(record),
AppendImpl::Stdout(append) => append.try_append(record),
Expand All @@ -82,6 +91,8 @@ impl Append for AppendImpl {
AppendImpl::DynLog(append) => append.flush(),
#[cfg(feature = "fastrace")]
AppendImpl::Fastrace(append) => append.flush(),
#[cfg(feature = "opentelemetry")]
AppendImpl::OpenTelemetryLog(append) => append.flush(),
#[cfg(feature = "file")]
AppendImpl::RollingFile(append) => append.flush(),
AppendImpl::Stdout(append) => append.flush(),
Expand All @@ -95,6 +106,8 @@ impl Append for AppendImpl {
AppendImpl::DynLog(append) => append.default_layout(),
#[cfg(feature = "fastrace")]
AppendImpl::Fastrace(append) => append.default_layout(),
#[cfg(feature = "opentelemetry")]
AppendImpl::OpenTelemetryLog(append) => append.default_layout(),
#[cfg(feature = "file")]
AppendImpl::RollingFile(append) => append.default_layout(),
AppendImpl::Stdout(append) => append.default_layout(),
Expand All @@ -108,6 +121,8 @@ impl Append for AppendImpl {
AppendImpl::DynLog(append) => append.default_filters(),
#[cfg(feature = "fastrace")]
AppendImpl::Fastrace(append) => append.default_filters(),
#[cfg(feature = "opentelemetry")]
AppendImpl::OpenTelemetryLog(append) => append.default_filters(),
#[cfg(feature = "file")]
AppendImpl::RollingFile(append) => append.default_filters(),
AppendImpl::Stdout(append) => append.default_filters(),
Expand Down
95 changes: 95 additions & 0 deletions src/append/opentelemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use log::Record;
use opentelemetry::logs::{AnyValue, LoggerProvider as ILoggerProvider};
use opentelemetry::logs::{Logger, Severity};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::LoggerProvider;

use crate::append::Append;

#[derive(Debug)]
pub struct OpenTelemetryLog {
name: String,
category: String,
library: Arc<InstrumentationLibrary>,
provider: LoggerProvider,
}

impl OpenTelemetryLog {
pub fn new(
name: impl Into<String>,
category: impl Into<String>,
otlp_endpoint: impl Into<String>,
) -> Self {
let name = name.into();
let category = category.into();
let otlp_endpoint = otlp_endpoint.into();

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(Duration::from_secs(
opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT,
))
.build_log_exporter()
.expect("failed to initialize oltp exporter");

let provider = LoggerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.build();

let library = Arc::new(InstrumentationLibrary::builder(name.clone()).build());

Self {
name,
category,
library,
provider,
}
}
}

impl Append for OpenTelemetryLog {
fn try_append(&self, log_record: &Record) -> anyhow::Result<()> {
let provider = self.provider.clone();
let logger = provider.library_logger(self.library.clone());

let mut record = opentelemetry_sdk::logs::LogRecord::default();
record.observed_timestamp = Some(SystemTime::now());
record.severity_number = Some(log_level_to_otel_severity(log_record.level()));
record.severity_text = Some(log_record.level().as_str().into());
record.body = Some(AnyValue::from(log_record.args().to_string()));

logger.emit(record);
Ok(())
}

fn flush(&self) {
for err in self
.provider
.force_flush()
.into_iter()
.filter_map(|r| r.err())
{
eprintln!(
"failed to flush logger ({}@{}): {}",
self.name, self.category, err
);
}
}
}

fn log_level_to_otel_severity(level: log::Level) -> Severity {
match level {
log::Level::Error => Severity::Error,
log::Level::Warn => Severity::Warn,
log::Level::Info => Severity::Info,
log::Level::Debug => Severity::Debug,
log::Level::Trace => Severity::Trace,
}
}

0 comments on commit 098e6f5

Please sign in to comment.