From b6604693edfd23d59e8e00cc0979f658ca760853 Mon Sep 17 00:00:00 2001 From: David Braden Date: Fri, 4 Aug 2023 13:11:17 -0600 Subject: [PATCH] Influxive (#45) * swap out prometheus metrics * integrate influxive fixes and dashboard * test update * include some backend library metrics * don't take metrics, incase poll freq is faster * metrics test * windows test fix * bump versions --- .github/workflows/test.yml | 2 +- Cargo.lock | 488 +++++++++++++++++- Cargo.toml | 6 +- Makefile | 2 +- crates/tx5-demo/Cargo.toml | 5 +- .../src/influxive-dashboards/tx5.json | 1 + crates/tx5-demo/src/main.rs | 21 +- crates/tx5/Cargo.toml | 9 +- crates/tx5/src/config.rs | 25 - crates/tx5/src/endpoint.rs | 157 ++++++ crates/tx5/src/state.rs | 113 +--- crates/tx5/src/state/conn.rs | 151 +++--- crates/tx5/src/state/test.rs | 38 +- crates/tx5/src/test.rs | 36 ++ 14 files changed, 841 insertions(+), 213 deletions(-) create mode 100644 crates/tx5-demo/src/influxive-dashboards/tx5.json diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 12129d47..db21e8f4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -43,4 +43,4 @@ jobs: - name: Cargo Test env: RUST_BACKTRACE: 1 - run: cargo test -- --nocapture + run: cargo test -- --test-threads 1 --nocapture diff --git a/Cargo.lock b/Cargo.lock index 2dc8273a..ed7e8a2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "anes" version = "0.1.6" @@ -403,6 +409,27 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -414,6 +441,9 @@ name = "cc" version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +dependencies = [ + "jobserver", +] [[package]] name = "ccm" @@ -432,6 +462,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +dependencies = [ + "android-tzdata", + "num-traits", + "serde", +] + [[package]] name = "ciborium" version = "0.2.1" @@ -542,6 +583,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "520fbf3c07483f94e3e3ca9d0cfd913d7718ef2483d2cfd91c0d9e91474ab913" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + [[package]] name = "core-foundation" version = "0.9.3" @@ -1054,6 +1101,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -1322,6 +1384,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-literal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" + [[package]] name = "hkdf" version = "0.12.3" @@ -1419,6 +1487,33 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.5", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1466,6 +1561,109 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "influxdb" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f79168e8b5047761c3e027438d6a46150380b2d78b15b723da04beefde29832" +dependencies = [ + "chrono", + "futures-util", + "http", + "lazy_static", + "regex", + "reqwest", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "influxive" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e751ac3b2c3a7943237d176f43d051e81611ab60927ee0de778eef8eb8a9946" +dependencies = [ + "influxive-child-svc", + "influxive-otel", + "influxive-writer", +] + +[[package]] +name = "influxive-child-svc" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3304096e465620b1e8b8f126e385050d8a0071f20ad09d1dae240474fde52bb9" +dependencies = [ + "hex-literal", + "influxive-core", + "influxive-downloader", + "influxive-writer", + "tempfile", + "tokio", + "tracing", +] + +[[package]] +name = "influxive-core" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "361ab244896d34c02cdfb4fb90a0cf11d1db7e140f6d0f9eb872c335e3f7f602" + +[[package]] +name = "influxive-downloader" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc2575384f9b4e5f94f316c69844f0a31e139694a530ffcc9cea6c98ce81627f" +dependencies = [ + "base64 0.21.2", + "digest 0.10.7", + "dirs", + "flate2", + "futures", + "hex", + "hex-literal", + "influxive-core", + "reqwest", + "sha2", + "tar", + "tempfile", + "tokio", + "zip", +] + +[[package]] +name = "influxive-otel" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a29ee9911b66e6f433734c900ed1d95b3cffeab1ea7dace3ecd3a3dd224e8be" +dependencies = [ + "influxive-core", + "tokio", + "ts_opentelemetry_api", +] + +[[package]] +name = "influxive-otel-atomic-obs" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07bcce79167d27b8b2d639cf026029506ed3dfa7bf7ee402c29cab03a7afd16" +dependencies = [ + "ts_opentelemetry_api", +] + +[[package]] +name = "influxive-writer" +version = "0.0.1-alpha.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee73e5486492eafd6c50790545df7b73c81edf69b299f96183bdcb9371619951" +dependencies = [ + "influxdb", + "influxive-core", + "tokio", + "tracing", +] + [[package]] name = "inout" version = "0.1.3" @@ -1523,7 +1721,7 @@ dependencies = [ "socket2 0.5.3", "widestring", "windows-sys 0.48.0", - "winreg", + "winreg 0.50.0", ] [[package]] @@ -1559,6 +1757,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +[[package]] +name = "jobserver" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.64" @@ -1834,6 +2041,24 @@ dependencies = [ "rand", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.24.3" @@ -1954,12 +2179,50 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -2050,12 +2313,35 @@ dependencies = [ "windows-targets 0.48.0", ] +[[package]] +name = "password-hash" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" +dependencies = [ + "base64ct", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "paste" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +[[package]] +name = "pbkdf2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917" +dependencies = [ + "digest 0.10.7", + "hmac 0.12.1", + "password-hash", + "sha2", +] + [[package]] name = "pem" version = "1.1.1" @@ -2406,6 +2692,50 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +[[package]] +name = "reqwest" +version = "0.11.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" +dependencies = [ + "base64 0.21.2", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.21.5", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tokio-rustls 0.24.1", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots 0.22.6", + "winreg 0.10.1", +] + [[package]] name = "resolv-conf" version = "0.7.0" @@ -2555,6 +2885,18 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "rustls" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.2", + "sct 0.7.0", +] + [[package]] name = "rustls-native-certs" version = "0.6.2" @@ -2586,6 +2928,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.101.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "ryu" version = "1.0.13" @@ -3122,6 +3474,16 @@ dependencies = [ "syn 2.0.18", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -3133,6 +3495,16 @@ dependencies = [ "webpki 0.22.0", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.5", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -3155,7 +3527,7 @@ dependencies = [ "rustls 0.20.8", "rustls-native-certs", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "tungstenite", "webpki 0.22.0", ] @@ -3348,6 +3720,22 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "ts_opentelemetry_api" +version = "0.20.0-beta.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d186495330f646b5881aeb3b83bac75b8a462d7ef32fda06a2a68f3869d5ba82" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + [[package]] name = "tungstenite" version = "0.18.0" @@ -3390,20 +3778,25 @@ dependencies = [ [[package]] name = "tx5" -version = "0.0.1-alpha.18" +version = "0.0.1-alpha.19" dependencies = [ "bytes", "criterion", "futures", + "influxive", + "influxive-child-svc", + "influxive-otel-atomic-obs", "once_cell", "parking_lot", - "prometheus", "rand", "rand-utf8", + "serde", "serde_json", + "tempfile", "tokio", "tracing", "tracing-subscriber", + "ts_opentelemetry_api", "tx5-core", "tx5-go-pion", "tx5-signal", @@ -3431,19 +3824,22 @@ dependencies = [ [[package]] name = "tx5-demo" -version = "0.0.1-alpha.18" +version = "0.0.1-alpha.19" dependencies = [ "base64 0.13.1", "bytes", "clap", "futures", + "influxive", "rand", "serde", "serde_json", + "tempfile", "tokio", "tracing", "tracing-appender", "tracing-subscriber", + "ts_opentelemetry_api", "tx5", ] @@ -3526,7 +3922,7 @@ dependencies = [ "socket2 0.5.3", "sodoken", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "tokio-tungstenite", "tracing", "tracing-subscriber", @@ -3656,6 +4052,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf-8" version = "0.7.6" @@ -3792,6 +4194,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -3821,6 +4235,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -3866,7 +4293,7 @@ version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" dependencies = [ - "rustls-webpki", + "rustls-webpki 0.100.1", ] [[package]] @@ -4270,6 +4697,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "winreg" version = "0.50.0" @@ -4373,8 +4809,46 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" dependencies = [ + "aes 0.8.2", "byteorder", + "bzip2", + "constant_time_eq", "crc32fast", "crossbeam-utils", "flate2", + "hmac 0.12.1", + "pbkdf2", + "sha1", + "time", + "zstd", +] + +[[package]] +name = "zstd" +version = "0.11.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "5.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.8+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +dependencies = [ + "cc", + "libc", + "pkg-config", ] diff --git a/Cargo.toml b/Cargo.toml index 9937404a..d1443fc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,14 @@ dunce = "1.0.3" futures = "0.3.28" if-addrs = "0.10.1" Inflector = "0.11.4" +influxive-otel-atomic-obs = "=0.0.1-alpha.11" +influxive-child-svc = "=0.0.1-alpha.11" +influxive = "=0.0.1-alpha.11" lair_keystore_api = "0.2.4" libc = "0.2.141" libloading = "0.8.0" once_cell = "1.17.1" +opentelemetry_api = { version = "=0.20.0-beta.1", features = [ "metrics" ], package = "ts_opentelemetry_api" } ouroboros = "0.15.6" parking_lot = "0.12.1" prometheus = "0.13.3" @@ -54,7 +58,7 @@ tx5-go-pion-sys = { version = "0.0.1-alpha.12", path = "crates/tx5-go-pion-sys" tx5-go-pion = { version = "0.0.1-alpha.11", path = "crates/tx5-go-pion" } tx5-signal-srv = { version = "0.0.1-alpha.9", path = "crates/tx5-signal-srv" } tx5-signal = { version = "0.0.1-alpha.10", path = "crates/tx5-signal" } -tx5 = { version = "0.0.1-alpha.18", path = "crates/tx5" } +tx5 = { version = "0.0.1-alpha.19", path = "crates/tx5" } url = { version = "2.3.1", features = [ "serde" ] } warp = { version = "0.3.4", features = [ "websocket" ] } webpki-roots = { version = "0.23.0" } diff --git a/Makefile b/Makefile index 618601d3..c5264f3b 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ publish: test: static tools cargo build --all-targets - RUST_BACKTRACE=1 cargo test -- --nocapture + RUST_BACKTRACE=1 cargo test -- --test-threads 1 --nocapture static: docs tools cargo fmt -- --check diff --git a/crates/tx5-demo/Cargo.toml b/crates/tx5-demo/Cargo.toml index 8ff7b131..73c0b941 100644 --- a/crates/tx5-demo/Cargo.toml +++ b/crates/tx5-demo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tx5-demo" -version = "0.0.1-alpha.18" +version = "0.0.1-alpha.19" edition = "2021" description = "Demo crate showing off Tx5 WebRTC functionality" license = "MIT OR Apache-2.0" @@ -15,9 +15,12 @@ base64 = { workspace = true } bytes = { workspace = true } clap = { workspace = true } futures = { workspace = true } +influxive = { workspace = true } +opentelemetry_api = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/tx5-demo/src/influxive-dashboards/tx5.json b/crates/tx5-demo/src/influxive-dashboards/tx5.json new file mode 100644 index 00000000..db49d598 --- /dev/null +++ b/crates/tx5-demo/src/influxive-dashboards/tx5.json @@ -0,0 +1 @@ +[{"apiVersion":"influxdata.com/v2alpha1","kind":"Dashboard","metadata":{"name":"friendly-newton-bcb001"},"spec":{"charts":[{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"19KQVrepsKKiJharUsQRL","name":"Nineteen Eighty Four","type":"scale","hex":"#31C0F6"},{"id":"FhLy7t4V0RZX2QmEFw08Y","name":"Nineteen Eighty Four","type":"scale","hex":"#A500A5"},{"id":"v8IB6UJ2wuRwD8QxSsVxS","name":"Nineteen Eighty Four","type":"scale","hex":"#FF7E27"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Message Count Received","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"influxive\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"tx5.conn.data.recv.message.count\")\n |> drop(columns: [\"conn_uniq\"]) \n |> sort(columns: [\"_time\"], desc: false)"}],"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":4,"widthRatio":1,"xCol":"_time","yCol":"_value"},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"19KQVrepsKKiJharUsQRL","name":"Nineteen Eighty Four","type":"scale","hex":"#31C0F6"},{"id":"FhLy7t4V0RZX2QmEFw08Y","name":"Nineteen Eighty Four","type":"scale","hex":"#A500A5"},{"id":"v8IB6UJ2wuRwD8QxSsVxS","name":"Nineteen Eighty Four","type":"scale","hex":"#FF7E27"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Bytes Received","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"influxive\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"tx5.conn.data.recv.By\")\n |> drop(columns: [\"conn_uniq\"]) \n |> sort(columns: [\"_time\"], desc: false)"}],"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":4,"widthRatio":1,"xCol":"_time","yCol":"_value","yPos":4},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"19KQVrepsKKiJharUsQRL","name":"Nineteen Eighty Four","type":"scale","hex":"#31C0F6"},{"id":"FhLy7t4V0RZX2QmEFw08Y","name":"Nineteen Eighty Four","type":"scale","hex":"#A500A5"},{"id":"v8IB6UJ2wuRwD8QxSsVxS","name":"Nineteen Eighty Four","type":"scale","hex":"#FF7E27"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Message Count Sent","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"influxive\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"tx5.conn.data.send.message.count\")\n |> drop(columns: [\"conn_uniq\"]) \n |> sort(columns: [\"_time\"], desc: false)"}],"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":4,"widthRatio":1,"xCol":"_time","xPos":4,"yCol":"_value"},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"19KQVrepsKKiJharUsQRL","name":"Nineteen Eighty Four","type":"scale","hex":"#31C0F6"},{"id":"FhLy7t4V0RZX2QmEFw08Y","name":"Nineteen Eighty Four","type":"scale","hex":"#A500A5"},{"id":"v8IB6UJ2wuRwD8QxSsVxS","name":"Nineteen Eighty Four","type":"scale","hex":"#FF7E27"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Bytes Sent","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"influxive\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"tx5.conn.data.send.By\")\n |> drop(columns: [\"conn_uniq\"]) \n |> sort(columns: [\"_time\"], desc: false)"}],"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":4,"widthRatio":1,"xCol":"_time","xPos":4,"yCol":"_value","yPos":4},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"O3aLWnkE56AGRCu6zCxUy","name":"Nineteen Eighty Four","type":"scale","hex":"#31C0F6"},{"id":"kzi9LOJHiS97efftC_3Kw","name":"Nineteen Eighty Four","type":"scale","hex":"#A500A5"},{"id":"dCwK-lfuiQaxty6sIoicN","name":"Nineteen Eighty Four","type":"scale","hex":"#FF7E27"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Connection Count","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"influxive\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"tx5.endpoint.conn.count\")"}],"shade":true,"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":4,"widthRatio":1,"xCol":"_time","xPos":8,"yCol":"_value"}],"description":"Tx5 Metrics Dashboard","name":"Tx5"}}] \ No newline at end of file diff --git a/crates/tx5-demo/src/main.rs b/crates/tx5-demo/src/main.rs index af8a8660..41b7c86d 100644 --- a/crates/tx5-demo/src/main.rs +++ b/crates/tx5-demo/src/main.rs @@ -3,6 +3,8 @@ #![allow(clippy::needless_range_loop)] //! tx5-demo +const DASH_TX5: &[u8] = include_bytes!("influxive-dashboards/tx5.json"); + use bytes::Buf; use clap::Parser; use std::collections::HashMap; @@ -296,6 +298,23 @@ async fn main_err() -> Result<()> { .init(); } + let tmp = tempfile::tempdir()?; + + let (i, meter_provider) = + influxive::influxive_child_process_meter_provider( + influxive::InfluxiveChildSvcConfig::default() + .with_database_path(Some(tmp.path().to_owned())), + influxive::InfluxiveMeterProviderConfig::default(), + ) + .await?; + if let Ok(cur) = i.list_dashboards().await { + if cur.contains("\"dashboards\": []") { + let _ = i.apply(DASH_TX5).await; + } + } + opentelemetry_api::global::set_meter_provider(meter_provider); + d!(info, "METRICS", "{}", i.get_host()); + let sig_url = Tx5Url::new(sig_url)?; let (ep, mut evt) = tx5::Ep::new().await?; @@ -384,7 +403,7 @@ async fn main_err() -> Result<()> { })) => { node.add_known_peer(rem_cli_url.clone()); match Message::decode(data.copy_to_bytes(data.remaining())) { - Err(err) => panic!("{err:?}"), + Err(err) => d!(error, "RECV_ERROR", "{err:?}"), Ok(Message::Hello { known_peers: kp }) => { for peer in kp { node.add_known_peer(Tx5Url::new(peer).unwrap()); diff --git a/crates/tx5/Cargo.toml b/crates/tx5/Cargo.toml index b76a3b8a..5a910702 100644 --- a/crates/tx5/Cargo.toml +++ b/crates/tx5/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tx5" -version = "0.0.1-alpha.18" +version = "0.0.1-alpha.19" edition = "2021" description = "The main holochain tx5 webrtc networking crate" license = "MIT OR Apache-2.0" @@ -22,11 +22,13 @@ backend-webrtc-rs = [ "webrtc" ] [dependencies] bytes = { workspace = true } futures = { workspace = true } +influxive-otel-atomic-obs = { workspace = true } once_cell = { workspace = true } +opentelemetry_api = { workspace = true } parking_lot = { workspace = true } -prometheus = { workspace = true } rand = { workspace = true } rand-utf8 = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = [ "rt", "sync" ] } tracing = { workspace = true } @@ -38,6 +40,9 @@ webrtc = { workspace = true, optional = true } [dev-dependencies] criterion = { workspace = true } +influxive-child-svc = { workspace = true } +influxive = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = [ "macros", "rt", "rt-multi-thread", "sync" ] } tracing-subscriber = { workspace = true } tx5-signal-srv = { workspace = true } diff --git a/crates/tx5/src/config.rs b/crates/tx5/src/config.rs index 373d2101..1cb27181 100644 --- a/crates/tx5/src/config.rs +++ b/crates/tx5/src/config.rs @@ -19,9 +19,6 @@ pub trait Config: 'static + Send + Sync { /// Get the max init (connect) time for a connection. fn max_conn_init(&self) -> std::time::Duration; - /// Request the prometheus registry used by this config. - fn metrics(&self) -> &prometheus::Registry; - /// Request the lair client associated with this config. fn lair_client(&self) -> &LairClient; @@ -89,7 +86,6 @@ struct DefConfigBuilt { max_recv_bytes: u32, max_conn_count: u32, max_conn_init: std::time::Duration, - metrics: prometheus::Registry, _lair_keystore: Option, lair_client: LairClient, lair_tag: Arc, @@ -142,10 +138,6 @@ impl Config for DefConfigBuilt { self.max_conn_init } - fn metrics(&self) -> &prometheus::Registry { - &self.metrics - } - fn lair_client(&self) -> &LairClient { &self.lair_client } @@ -202,7 +194,6 @@ pub struct DefConfig { max_recv_bytes: Option, max_conn_count: Option, max_conn_init: Option, - metrics: Option, lair_client: Option, lair_tag: Option>, on_new_sig_cb: Option< @@ -258,9 +249,6 @@ impl IntoConfig for DefConfig { let max_conn_init = self .max_conn_init .unwrap_or(std::time::Duration::from_secs(60)); - let metrics = self - .metrics - .unwrap_or_else(|| prometheus::default_registry().clone()); let mut lair_keystore = None; let lair_tag = self.lair_tag.unwrap_or_else(|| { @@ -335,7 +323,6 @@ impl IntoConfig for DefConfig { max_recv_bytes, max_conn_count, max_conn_init, - metrics, _lair_keystore: lair_keystore, lair_client, lair_tag, @@ -402,18 +389,6 @@ impl DefConfig { self } - /// Set the prometheus metrics registry to use. - /// The default is the global static default registry. - pub fn set_metrics(&mut self, metrics: prometheus::Registry) { - self.metrics = Some(metrics); - } - - /// See `set_metrics()`, this is the builder version. - pub fn with_metrics(mut self, metrics: prometheus::Registry) -> Self { - self.set_metrics(metrics); - self - } - /// Set the lair client. /// The default is a generated in-process, in-memory only keystore. pub fn set_lair_client(&mut self, lair_client: LairClient) { diff --git a/crates/tx5/src/endpoint.rs b/crates/tx5/src/endpoint.rs index 2c80b969..b0df40b5 100644 --- a/crates/tx5/src/endpoint.rs +++ b/crates/tx5/src/endpoint.rs @@ -1,6 +1,8 @@ //! Tx5 endpoint. use crate::*; +use opentelemetry_api::{metrics::Unit, KeyValue}; +use std::collections::HashMap; use std::sync::Arc; use tx5_core::Tx5Url; @@ -331,6 +333,19 @@ async fn new_sig_task( tracing::warn!("signal connection CLOSED"); } +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct BackendMetrics { + #[serde(default)] + messages_sent: u64, + #[serde(default)] + messages_received: u64, + #[serde(default)] + bytes_sent: u64, + #[serde(default)] + bytes_received: u64, +} + #[cfg(feature = "backend-go-pion")] pub(crate) fn on_new_conn( config: DynConfig, @@ -351,6 +366,11 @@ async fn new_conn_task( use tx5_go_pion::PeerConnectionState as PeerState; enum MultiEvt { + Stats( + tokio::sync::oneshot::Sender< + Option>, + >, + ), Peer(PeerEvt), Data(DataEvt), } @@ -383,6 +403,132 @@ async fn new_conn_task( Ok(r) => r, }; + let state_uniq = conn_state.meta().state_uniq.clone(); + let conn_uniq = conn_state.meta().conn_uniq.clone(); + let rem_id = conn_state.meta().cli_url.id().unwrap(); + + struct Unregister( + Option>, + ); + impl Drop for Unregister { + fn drop(&mut self) { + if let Some(mut unregister) = self.0.take() { + let _ = unregister.unregister(); + } + } + } + + let slot: Arc>>> = + Arc::new(std::sync::Mutex::new(None)); + let weak_slot = Arc::downgrade(&slot); + let peer_snd_task = peer_snd.clone(); + tokio::task::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + if let Some(slot) = weak_slot.upgrade() { + let (s, r) = tokio::sync::oneshot::channel(); + if peer_snd_task.send(MultiEvt::Stats(s)).is_err() { + break; + } + if let Ok(stats) = r.await { + *slot.lock().unwrap() = stats; + } + } else { + break; + } + } + }); + + let weak_slot = Arc::downgrade(&slot); + let _unregister = { + use opentelemetry_api::metrics::MeterProvider; + + let meter = opentelemetry_api::global::meter_provider() + .versioned_meter( + "tx5", + None::<&'static str>, + None::<&'static str>, + Some(vec![ + KeyValue::new("state_uniq", state_uniq.to_string()), + KeyValue::new("conn_uniq", conn_uniq.to_string()), + KeyValue::new("remote_id", rem_id.to_string()), + ]), + ); + let ice_snd = meter + .u64_observable_counter("tx5.conn.ice.send") + .with_description("Bytes sent on ice channel") + .with_unit(Unit::new("By")) + .init(); + let ice_rcv = meter + .u64_observable_counter("tx5.conn.ice.recv") + .with_description("Bytes received on ice channel") + .with_unit(Unit::new("By")) + .init(); + let data_snd = meter + .u64_observable_counter("tx5.conn.data.send") + .with_description("Bytes sent on data channel") + .with_unit(Unit::new("By")) + .init(); + let data_rcv = meter + .u64_observable_counter("tx5.conn.data.recv") + .with_description("Bytes received on data channel") + .with_unit(Unit::new("By")) + .init(); + let data_snd_msg = meter + .u64_observable_counter("tx5.conn.data.send.message.count") + .with_description("Message count sent on data channel") + .init(); + let data_rcv_msg = meter + .u64_observable_counter("tx5.conn.data.recv.message.count") + .with_description("Message count received on data channel") + .init(); + let unregister = match meter.register_callback( + &[data_snd.as_any(), data_rcv.as_any()], + move |obs| { + if let Some(slot) = weak_slot.upgrade() { + let guard = slot.lock().unwrap(); + if let Some(slot) = &*guard { + for (k, v) in slot.iter() { + if k.starts_with("DataChannel") { + obs.observe_u64(&data_snd, v.bytes_sent, &[]); + obs.observe_u64( + &data_rcv, + v.bytes_received, + &[], + ); + obs.observe_u64( + &data_snd_msg, + v.messages_sent, + &[], + ); + obs.observe_u64( + &data_rcv_msg, + v.messages_received, + &[], + ); + } else if k.starts_with("iceTransport") { + obs.observe_u64(&ice_snd, v.bytes_sent, &[]); + obs.observe_u64( + &ice_rcv, + v.bytes_received, + &[], + ); + } + } + } + } + }, + ) { + Ok(unregister) => Some(unregister), + Err(err) => { + tracing::warn!(?err, "unable to register connection metrics"); + None + } + }; + Unregister(unregister) + }; + let mut data_chan: Option = None; tracing::debug!("PEER CON OPEN"); @@ -395,6 +541,17 @@ async fn new_conn_task( conn_state.close(Error::id("PeerConClosed")); break; } + Some(MultiEvt::Stats(resp)) => { + if let Ok(mut buf) = peer.stats().await.map(BackBuf::from_raw) { + if let Ok(val) = buf.to_json() { + let _ = resp.send(Some(val)); + } else { + let _ = resp.send(None); + } + } else { + let _ = resp.send(None); + } + } Some(MultiEvt::Peer(PeerEvt::Error(err))) => { conn_state.close(err); break; diff --git a/crates/tx5/src/state.rs b/crates/tx5/src/state.rs index 0bd50a71..e12b4406 100644 --- a/crates/tx5/src/state.rs +++ b/crates/tx5/src/state.rs @@ -8,6 +8,9 @@ use std::collections::{hash_map, HashMap}; use std::future::Future; use std::sync::Arc; +use influxive_otel_atomic_obs::*; +use opentelemetry_api::metrics::MeterProvider; + use tx5_core::{Id, Tx5Url}; mod sig; @@ -33,64 +36,6 @@ const MAX_CON_TIME: std::time::Duration = const CON_CLOSE_SEND_GRACE: std::time::Duration = std::time::Duration::from_secs(30); -// TODO - creates too many time series, just aggregate the full counts -pub(crate) fn bad_uniq() -> u64 { - use std::sync::atomic::{AtomicU64, Ordering}; - static UNIQ: AtomicU64 = AtomicU64::new(1); - UNIQ.fetch_add(1, Ordering::Relaxed) -} - -#[derive(Clone, Debug)] -pub(crate) struct MetricTimestamp( - Arc>, - prometheus::IntGauge, -); - -impl MetricTimestamp { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - let now = Arc::new(parking_lot::Mutex::new(std::time::Instant::now())); - let metric = - prometheus::IntGauge::new(name, help).map_err(Error::err)?; - Ok(Self(now, metric)) - } - - pub fn reset(&self) { - let now = std::time::Instant::now(); - *self.0.lock() = now; - self.1.set(0); - } - - pub fn elapsed(&self) -> std::time::Duration { - self.0.lock().elapsed() - } - - fn priv_update(&self) { - let elapsed = self.elapsed().as_micros(); - self.1.set(elapsed as i64); - } -} - -impl prometheus::core::Collector for MetricTimestamp { - fn desc(&self) -> Vec<&prometheus::core::Desc> { - self.1.desc() - } - - fn collect(&self) -> Vec { - self.priv_update(); - self.1.collect() - } -} - -impl prometheus::core::Metric for MetricTimestamp { - fn metric(&self) -> prometheus::proto::Metric { - self.priv_update(); - self.1.metric() - } -} - /// Respond type. #[must_use] pub struct OneSnd( @@ -241,8 +186,6 @@ struct StateData { state_uniq: Uniq, this_id: Option, this: StateWeak, - state_prefix: Arc, - metrics: prometheus::Registry, meta: StateMeta, evt: StateEvtSnd, signal_map: HashMap, @@ -262,9 +205,6 @@ impl Drop for StateData { impl StateData { fn shutdown(&mut self, err: std::io::Error) { tracing::trace!(state_uniq = %self.state_uniq, this_id = ?self.this_id, "StateShutdown"); - let _ = self - .metrics - .unregister(Box::new(self.meta.metric_conn_count.clone())); for (_, sig) in self.signal_map.drain() { if let Some(sig) = sig.upgrade() { sig.close(err.err_clone()); @@ -380,7 +320,7 @@ impl StateData { let meta = conn.meta(); tot_snd_bytes += meta.metric_bytes_snd.get(); tot_rcv_bytes += meta.metric_bytes_rcv.get(); - tot_age += meta.metric_age.elapsed().as_secs_f64(); + tot_age += meta.created_at.elapsed().as_secs_f64(); age_cnt += 1.0; } @@ -406,11 +346,8 @@ impl StateData { .load(std::sync::atomic::Ordering::SeqCst), this_snd_bytes: meta.metric_bytes_snd.get(), this_rcv_bytes: meta.metric_bytes_rcv.get(), - this_age_s: meta.metric_age.elapsed().as_secs_f64(), - this_last_active_s: meta - .metric_last_active - .elapsed() - .as_secs_f64(), + this_age_s: meta.created_at.elapsed().as_secs_f64(), + this_last_active_s: meta.last_active_at.elapsed().as_secs_f64(), }; if let drop_consider::DropConsiderResult::MustDrop = @@ -536,11 +473,10 @@ impl StateData { let conn = match ConnState::new_and_publish( self.meta.config.clone(), self.meta.conn_limit.clone(), - self.state_prefix.clone(), - self.metrics.clone(), self.meta.metric_conn_count.clone(), self.this.clone(), sig, + self.state_uniq.clone(), conn_uniq, self.this_id.unwrap(), cli_url.clone(), @@ -912,8 +848,6 @@ async fn state_task( mut rcv: ManyRcv, state_uniq: Uniq, this: StateWeak, - state_prefix: Arc, - metrics: prometheus::Registry, meta: StateMeta, evt: StateEvtSnd, recv_limit: Arc, @@ -922,8 +856,6 @@ async fn state_task( state_uniq, this_id: None, this, - state_prefix, - metrics, meta, evt, signal_map: HashMap::new(), @@ -954,7 +886,7 @@ pub(crate) struct StateMeta { pub(crate) config: DynConfig, pub(crate) conn_limit: Arc, pub(crate) snd_limit: Arc, - pub(crate) metric_conn_count: prometheus::IntGauge, + pub(crate) metric_conn_count: AtomicObservableUpDownCounterI64, pub(crate) snd_ident: Arc, } @@ -984,8 +916,6 @@ impl Eq for State {} impl State { /// Construct a new state instance. pub fn new(config: DynConfig) -> Result<(Self, ManyRcv)> { - let metrics = config.metrics().clone(); - let conn_limit = Arc::new(tokio::sync::Semaphore::new( config.max_conn_count() as usize, )); @@ -999,17 +929,20 @@ impl State { let state_uniq = Uniq::default(); - let bad_uniq = bad_uniq(); - let state_prefix = format!("tx5_ep_{bad_uniq}").into_boxed_str().into(); - - let metric_conn_count = prometheus::IntGauge::new( - format!("{state_prefix}_conn_count"), - "active connection count", - ) - .map_err(Error::err)?; - metrics - .register(Box::new(metric_conn_count.clone())) - .map_err(Error::err)?; + let metric_conn_count = opentelemetry_api::global::meter_provider() + .versioned_meter( + "tx5", + None::<&'static str>, + None::<&'static str>, + Some(vec![opentelemetry_api::KeyValue::new( + "state_uniq", + state_uniq.to_string(), + )]), + ) + .i64_observable_up_down_counter_atomic("tx5.endpoint.conn.count", 0) + .with_description("Count of open connections managed by endpoint") + .init() + .0; let meta = StateMeta { state_uniq: state_uniq.clone(), @@ -1028,8 +961,6 @@ impl State { rcv, state_uniq, StateWeak(this, meta.clone()), - state_prefix, - metrics, meta, StateEvtSnd(state_snd), rcv_limit, diff --git a/crates/tx5/src/state/conn.rs b/crates/tx5/src/state/conn.rs index 3d81ede1..43cda1d8 100644 --- a/crates/tx5/src/state/conn.rs +++ b/crates/tx5/src/state/conn.rs @@ -241,8 +241,7 @@ impl ConnStateEvtSnd { struct ConnStateData { conn_uniq: Uniq, this: ConnStateWeak, - metrics: prometheus::Registry, - metric_conn_count: prometheus::IntGauge, + metric_conn_count: AtomicObservableUpDownCounterI64, meta: ConnStateMeta, state: StateWeak, this_id: Id, @@ -260,6 +259,7 @@ struct ConnStateData { impl Drop for ConnStateData { fn drop(&mut self) { + self.metric_conn_count.add(-1); self.shutdown(Error::id("Dropped")); } } @@ -277,19 +277,6 @@ impl ConnStateData { rem_id = ?self.rem_id, "ConnShutdown", ); - self.metric_conn_count.dec(); - let _ = self - .metrics - .unregister(Box::new(self.meta.metric_bytes_snd.clone())); - let _ = self - .metrics - .unregister(Box::new(self.meta.metric_bytes_rcv.clone())); - let _ = self - .metrics - .unregister(Box::new(self.meta.metric_age.clone())); - let _ = self - .metrics - .unregister(Box::new(self.meta.metric_last_active.clone())); if let Some(state) = self.state.upgrade() { state.close_conn(self.rem_id, self.this.clone(), err.err_clone()); } @@ -334,8 +321,7 @@ impl ConnStateData { } async fn tick_1s(&mut self) -> Result<()> { - if self.meta.metric_last_active.elapsed() - > self.meta.config.max_conn_init() + if self.meta.last_active_at.elapsed() > self.meta.config.max_conn_init() && !self.connected() { self.shutdown(Error::id("InactivityTimeout")); @@ -531,7 +517,7 @@ impl ConnStateData { // if we are within the close time send grace period // do not send any new messages so we can try to shut // down gracefully - if self.meta.metric_age.elapsed() + if self.meta.created_at.elapsed() > (MAX_CON_TIME - CON_CLOSE_SEND_GRACE) { return Ok(()); @@ -558,8 +544,8 @@ impl ConnStateData { tracing::trace!(conn_uniq = %self.conn_uniq, %msg_uniq, "conn send"); - self.meta.metric_last_active.reset(); - self.meta.metric_bytes_snd.inc_by(data.len()? as u64); + self.meta.last_active_at = std::time::Instant::now(); + self.meta.metric_bytes_snd.add(data.len()? as u64); self.conn_evt.snd_data( self.this.clone(), @@ -610,9 +596,9 @@ impl ConnStateData { permit: tokio::sync::OwnedSemaphorePermit, ) -> Result<()> { let len = data.len(); - self.meta.metric_last_active.reset(); + self.meta.last_active_at = std::time::Instant::now(); - self.meta.metric_bytes_rcv.inc_by(len as u64); + self.meta.metric_bytes_rcv.add(len as u64); let is_finish = ident.is_finish(); let ident = ident.unset_finish(); @@ -699,8 +685,7 @@ enum ConnCmd { #[allow(clippy::too_many_arguments)] async fn conn_state_task( conn_limit: Arc, - metrics: prometheus::Registry, - metric_conn_count: prometheus::IntGauge, + metric_conn_count: AtomicObservableUpDownCounterI64, meta: ConnStateMeta, strong: ConnState, conn_rcv: ManyRcv, @@ -714,12 +699,11 @@ async fn conn_state_task( sig_state: SigStateWeak, sig_ready: tokio::sync::oneshot::Receiver>, ) -> Result<()> { - metric_conn_count.inc(); + metric_conn_count.add(1); let mut data = ConnStateData { conn_uniq, this, - metrics, metric_conn_count, meta, state, @@ -784,17 +768,17 @@ async fn conn_state_task( #[derive(Clone)] pub(crate) struct ConnStateMeta { - created_at: std::time::Instant, - cli_url: Tx5Url, + pub(crate) created_at: std::time::Instant, + pub(crate) last_active_at: std::time::Instant, + pub(crate) cli_url: Tx5Url, + pub(crate) state_uniq: Uniq, pub(crate) conn_uniq: Uniq, pub(crate) config: DynConfig, pub(crate) connected: Arc, _conn_snd: ConnStateEvtSnd, pub(crate) rcv_limit: Arc, - pub(crate) metric_bytes_snd: prometheus::IntCounter, - pub(crate) metric_bytes_rcv: prometheus::IntCounter, - pub(crate) metric_age: MetricTimestamp, - pub(crate) metric_last_active: MetricTimestamp, + pub(crate) metric_bytes_snd: AtomicObservableCounterU64, + pub(crate) metric_bytes_rcv: AtomicObservableCounterU64, snd_ident: Arc, } @@ -854,6 +838,10 @@ impl ConnState { } */ + pub(crate) fn meta(&self) -> &ConnStateMeta { + &self.1 + } + /// Get a weak version of this ConnState instance. pub fn weak(&self) -> ConnStateWeak { ConnStateWeak(self.0.weak(), self.1.clone()) @@ -973,11 +961,10 @@ impl ConnState { pub(crate) fn new_and_publish( config: DynConfig, conn_limit: Arc, - state_prefix: Arc, - metrics: prometheus::Registry, - metric_conn_count: prometheus::IntGauge, + metric_conn_count: AtomicObservableUpDownCounterI64, state: StateWeak, sig_state: SigStateWeak, + state_uniq: Uniq, conn_uniq: Uniq, this_id: Id, cli_url: Tx5Url, @@ -990,48 +977,63 @@ impl ConnState { let (conn_snd, conn_rcv) = tokio::sync::mpsc::unbounded_channel(); let conn_snd = ConnStateEvtSnd(conn_snd); - // TODO - creates too many time series, just aggregate the full counts - let bad_uniq = bad_uniq(); - - let metric_bytes_snd = prometheus::IntCounter::new( - format!("{state_prefix}_conn_{bad_uniq}_bytes_snd"), - "bytes sent out of this connection", - ) - .map_err(Error::err)?; - metrics - .register(Box::new(metric_bytes_snd.clone())) - .map_err(Error::err)?; - - let metric_bytes_rcv = prometheus::IntCounter::new( - format!("{state_prefix}_conn_{bad_uniq}_bytes_rcv"), - "incoming bytes received by this connection", - ) - .map_err(Error::err)?; - metrics - .register(Box::new(metric_bytes_rcv.clone())) - .map_err(Error::err)?; - - let metric_age = MetricTimestamp::new( - format!("{state_prefix}_conn_{bad_uniq}_age"), - "microseconds since this connection was created", - ) - .map_err(Error::err)?; - metrics - .register(Box::new(metric_age.clone())) - .map_err(Error::err)?; - - let metric_last_active = MetricTimestamp::new( - format!("{state_prefix}_conn_{bad_uniq}_last_active"), - "microseconds since we last sent or received data on this connection", - ) - .map_err(Error::err)?; - metrics - .register(Box::new(metric_last_active.clone())) - .map_err(Error::err)?; + let metric_bytes_snd = opentelemetry_api::global::meter_provider() + .versioned_meter( + "tx5", + None::<&'static str>, + None::<&'static str>, + Some(vec![ + opentelemetry_api::KeyValue::new( + "state_uniq", + state_uniq.to_string(), + ), + opentelemetry_api::KeyValue::new( + "conn_uniq", + conn_uniq.to_string(), + ), + opentelemetry_api::KeyValue::new( + "remote_id", + rem_id.to_string(), + ), + ]), + ) + .u64_observable_counter_atomic("tx5.endpoint.conn.send", 0) + .with_description("Outgoing bytes sent on this connection") + .with_unit(opentelemetry_api::metrics::Unit::new("By")) + .init() + .0; + + let metric_bytes_rcv = opentelemetry_api::global::meter_provider() + .versioned_meter( + "tx5", + None::<&'static str>, + None::<&'static str>, + Some(vec![ + opentelemetry_api::KeyValue::new( + "state_uniq", + state_uniq.to_string(), + ), + opentelemetry_api::KeyValue::new( + "conn_uniq", + conn_uniq.to_string(), + ), + opentelemetry_api::KeyValue::new( + "remote_id", + rem_id.to_string(), + ), + ]), + ) + .u64_observable_counter_atomic("tx5.endpoint.conn.recv", 0) + .with_description("Incoming bytes received on this connection") + .with_unit(opentelemetry_api::metrics::Unit::new("By")) + .init() + .0; let meta = ConnStateMeta { created_at: std::time::Instant::now(), + last_active_at: std::time::Instant::now(), cli_url, + state_uniq, conn_uniq: conn_uniq.clone(), config: config.clone(), connected: Arc::new(atomic::AtomicBool::new(false)), @@ -1039,8 +1041,6 @@ impl ConnState { rcv_limit, metric_bytes_snd, metric_bytes_rcv, - metric_age, - metric_last_active, snd_ident, }; @@ -1056,7 +1056,6 @@ impl ConnState { let strong = ConnState(this.upgrade().unwrap(), meta.clone()); conn_state_task( conn_limit, - metrics, metric_conn_count, meta.clone(), strong, diff --git a/crates/tx5/src/state/test.rs b/crates/tx5/src/state/test.rs index a3472a09..fb8a7f58 100644 --- a/crates/tx5/src/state/test.rs +++ b/crates/tx5/src/state/test.rs @@ -13,6 +13,7 @@ fn init_tracing() { struct Test { shutdown: bool, + influxive: Arc, cli_a: Tx5Url, id_a: Id, cli_b: Tx5Url, @@ -35,6 +36,23 @@ impl Drop for Test { impl Test { pub async fn new(as_a: bool) -> Self { + init_tracing(); + + let tmp = tempfile::tempdir().unwrap(); + + let (influxive, meter_provider) = + influxive::influxive_child_process_meter_provider( + influxive::InfluxiveChildSvcConfig::default() + .with_database_path(Some(tmp.path().to_owned())), + influxive::InfluxiveMeterProviderConfig::default() + .with_observable_report_interval(Some( + std::time::Duration::from_millis(1), + )), + ) + .await + .unwrap(); + opentelemetry_api::global::set_meter_provider(meter_provider); + let sig: Tx5Url = Tx5Url::new("wss://s").unwrap(); let cli_a: Tx5Url = Tx5Url::new( "wss://s/tx5-ws/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", @@ -91,6 +109,7 @@ impl Test { Self { shutdown: false, + influxive, cli_a, id_a, cli_b, @@ -105,12 +124,19 @@ impl Test { pub async fn shutdown(mut self) { self.shutdown = true; - let enc = prometheus::TextEncoder::new(); - let mut buf = Vec::new(); - use prometheus::Encoder; - enc.encode(&prometheus::default_registry().gather(), &mut buf) + let result = self + .influxive + .query( + r#"from(bucket: "influxive") + |> range(start: -15m, stop: now()) + "#, + ) + .await .unwrap(); - println!("{}", String::from_utf8_lossy(&buf)); + + println!("{result}"); + + self.influxive.shutdown(); self.state.close(Error::id("TestShutdown")); @@ -143,8 +169,6 @@ impl Test { #[tokio::test(flavor = "multi_thread")] async fn extended_outgoing() { - init_tracing(); - let mut test = Test::new(true).await; // -- send data to a "peer" (causes connecting to that peer) -- // diff --git a/crates/tx5/src/test.rs b/crates/tx5/src/test.rs index f0d43d9f..f67a6307 100644 --- a/crates/tx5/src/test.rs +++ b/crates/tx5/src/test.rs @@ -16,6 +16,23 @@ fn init_tracing() { async fn endpoint_sanity() { init_tracing(); + let tmp = tempfile::tempdir().unwrap(); + + let (influxive, meter_provider) = + influxive::influxive_child_process_meter_provider( + influxive::InfluxiveChildSvcConfig::default() + .with_database_path(Some(tmp.path().to_owned())), + influxive::InfluxiveMeterProviderConfig::default() + .with_observable_report_interval(Some( + std::time::Duration::from_millis(200), + )), + ) + .await + .unwrap(); + opentelemetry_api::global::set_meter_provider(meter_provider); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let mut srv_config = tx5_signal_srv::Config::default(); srv_config.port = 0; srv_config.demo = true; @@ -71,6 +88,25 @@ async fn endpoint_sanity() { "{}", serde_json::to_string_pretty(&ep2.get_stats().await.unwrap()).unwrap() ); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + let metrics = influxive + .query( + r#"from(bucket: "influxive") +|> range(start: -15m, stop: now()) +"#, + ) + .await + .unwrap(); + + println!("@@@@@@\n{metrics}\n@@@@@@"); + + assert!(metrics.matches("tx5.endpoint.conn.count").count() > 0); + assert!(metrics.matches("tx5.endpoint.conn.recv.By").count() > 0); + assert!(metrics.matches("tx5.endpoint.conn.send.By").count() > 0); + + influxive.shutdown(); } #[tokio::test(flavor = "multi_thread")]