Skip to content

Commit

Permalink
Improve summary (#677)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 19, 2024
1 parent a297836 commit 4ddbbb7
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 41 deletions.
14 changes: 8 additions & 6 deletions include/cinatra/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ YLT_REFL(json_summary_t, name, help, type, labels_name, quantiles_key, metrics);
class summary_t : public static_metric {
public:
summary_t(std::string name, std::string help, std::vector<double> quantiles,
std::chrono::seconds max_age = std::chrono::seconds{36000})
std::chrono::seconds max_age = std::chrono::seconds{0})
: static_metric(MetricType::Summary, std::move(name), std::move(help)),
quantiles_(std::move(quantiles)),
impl_(quantiles_,
Expand All @@ -48,7 +48,7 @@ class summary_t : public static_metric {

summary_t(std::string name, std::string help, std::vector<double> quantiles,
std::map<std::string, std::string> static_labels,
std::chrono::seconds max_age = std::chrono::seconds{36000})
std::chrono::seconds max_age = std::chrono::seconds{60})
: static_metric(MetricType::Summary, std::move(name), std::move(help),
std::move(static_labels)),
quantiles_(std::move(quantiles)),
Expand Down Expand Up @@ -133,20 +133,22 @@ class summary_t : public static_metric {

private:
std::vector<double> quantiles_;
ylt::metric::detail::summary_impl<> impl_;
ylt::metric::detail::summary_impl<uint64_t> impl_;
};

template <size_t N>
class basic_dynamic_summary
: public dynamic_metric_impl<ylt::metric::detail::summary_impl<>, N> {
: public dynamic_metric_impl<ylt::metric::detail::summary_impl<uint32_t>,
N> {
private:
using Base = dynamic_metric_impl<ylt::metric::detail::summary_impl<>, N>;
using Base =
dynamic_metric_impl<ylt::metric::detail::summary_impl<uint32_t>, N>;

public:
basic_dynamic_summary(
std::string name, std::string help, std::vector<double> quantiles,
std::array<std::string, N> labels_name,
std::chrono::milliseconds max_age = std::chrono::seconds{36000})
std::chrono::milliseconds max_age = std::chrono::seconds{60})
: Base(MetricType::Summary, std::move(name), std::move(help),
std::move(labels_name)),
quantiles_(std::move(quantiles)),
Expand Down
63 changes: 36 additions & 27 deletions include/cinatra/ylt/metric/summary_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
#include <iterator>
#include <limits>
#include <memory>
#include <type_traits>
#include <vector>

namespace ylt::metric::detail {

template <std::size_t frac_bit = 6>
template <typename uint_type, std::size_t frac_bit = 6>
class summary_impl {
static_assert(sizeof(uint_type) >= 4);
static_assert(std::is_unsigned_v<uint_type>);
constexpr static uint32_t decode_impl(uint16_t float16_value) {
float16_value <<= (8 - frac_bit);
uint32_t sign = float16_value >> 15;
Expand Down Expand Up @@ -57,7 +60,8 @@ class summary_impl {
static constexpr float float16_max = (1ull << 63) * 2.0f; // 2^64

static uint16_t encode(float flt) {
unsigned int& fltInt32 = *(unsigned int*)&flt;
static_assert(sizeof(float) == 4);
uint32_t& fltInt32 = *(uint32_t*)&flt;
if (std::abs(flt) >= float16_max || std::isnan(flt)) {
flt = (fltInt32 & 0x8000'0000) ? (-float16_max) : (float16_max);
}
Expand Down Expand Up @@ -88,9 +92,9 @@ class summary_impl {

struct data_t {
static constexpr size_t piece_size = bucket_size / piece_cnt;
using piece_t = std::array<std::atomic<uint32_t>, piece_size>;
using piece_t = std::array<std::atomic<uint_type>, piece_size>;

std::atomic<uint32_t>& operator[](std::size_t index) {
std::atomic<uint_type>& operator[](std::size_t index) {
piece_t* piece = arr[index / piece_size];
if (piece == nullptr) {
auto ptr = new piece_t{};
Expand Down Expand Up @@ -122,7 +126,7 @@ class summary_impl {
}
template <bool inc_order>
void stat_impl(uint64_t& count,
std::vector<std::pair<int16_t, uint32_t>>& result, int i) {
std::vector<std::pair<int16_t, uint_type>>& result, int i) {
auto piece = arr[i].load(std::memory_order_relaxed);
if (piece) {
if constexpr (inc_order) {
Expand All @@ -146,7 +150,7 @@ class summary_impl {
}
}
void stat(uint64_t& count,
std::vector<std::pair<int16_t, uint32_t>>& result) {
std::vector<std::pair<int16_t, uint_type>>& result) {
for (int i = piece_cnt - 1; i >= piece_cnt / 2; --i) {
stat_impl<false>(count, result, i);
}
Expand Down Expand Up @@ -182,36 +186,38 @@ class summary_impl {
static inline const unsigned long ms_count =
std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count();

constexpr static unsigned int near_uint32_max = 4290000000U;
constexpr static uint32_t near_uint32_max = 4290000000U;

void increase(data_t& arr, uint16_t pos) {
if (arr[pos].fetch_add(1, std::memory_order::relaxed) >
near_uint32_max) /*no overflow*/ [[likely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
auto res = arr[pos].fetch_add(1, std::memory_order::relaxed);
if constexpr (std::is_same_v<uint_type, uint32_t>) {
if (res > near_uint32_max) /*no overflow*/ [[likely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
if (pos - delta >= lower) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
if (pos - delta >= lower) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
}
}
}

struct data_copy_t {
std::vector<std::pair<int16_t, uint32_t>> arr[2];
std::vector<std::pair<int16_t, uint_type>> arr[2];
int index[2] = {}, smaller_one;
void init() {
if (arr[0][0] <= arr[1][0]) {
Expand All @@ -231,7 +237,7 @@ class summary_impl {
}
}
int16_t value() { return arr[smaller_one][index[smaller_one]].first; }
uint32_t count() { return arr[smaller_one][index[smaller_one]].second; }
uint_type count() { return arr[smaller_one][index[smaller_one]].second; }
};

public:
Expand Down Expand Up @@ -304,6 +310,9 @@ class summary_impl {
e = 1;
}
auto target_count = std::min<double>(e * count, count);
if (e == 0) {
target_count = std::min(uint64_t{1}, count);
}
while (true) {
if (target_count <= count_now) [[unlikely]] {
result.push_back(v);
Expand Down
15 changes: 10 additions & 5 deletions tests/test_cinatra_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,24 @@ TEST_CASE("test send after server stop") {

TEST_CASE("test read write in different threads") {
cinatra::coro_http_server server(1, 8090);
size_t count = 0;
std::promise<void> promise;
server.set_http_handler<cinatra::GET>(
"/",
[](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
[&](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
CHECK(req.get_content_type() == content_type::websocket);
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}

count++;
if (count == 100) {
promise.set_value();
break;
}
auto ec = co_await req.get_conn()->write_websocket(result.data);
if (ec) {
break;
Expand Down Expand Up @@ -326,8 +332,7 @@ TEST_CASE("test read write in different threads") {

async_simple::coro::syncAwait(lazy());

std::this_thread::sleep_for(std::chrono::milliseconds(300));

promise.get_future().wait_for(std::chrono::seconds(2));
server.stop();
}

Expand Down
24 changes: 21 additions & 3 deletions tests/test_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,15 +950,15 @@ TEST_CASE("test summary with illegal quantities") {
CHECK(str.find("test_summary_sum") != std::string::npos);
CHECK(str.find("test_summary{quantile=\"") != std::string::npos);
CHECK(result[0] < 0);
CHECK(result[1] < 0);
CHECK(result[1] == 0);
CHECK(result[result.size() - 1] > result[result.size() - 2]);

#ifdef CINATRA_ENABLE_METRIC_JSON
std::string str_json;
summary.serialize_to_json(str_json);
std::cout << str_json << "\n";
std::cout << str_json.size() << std::endl;
CHECK(str_json.size() == 233);
CHECK(str_json.size() == 222);
#endif
}

Expand Down Expand Up @@ -994,7 +994,7 @@ TEST_CASE("test summary with many quantities") {
summary.serialize_to_json(str_json);
std::cout << str_json << "\n";
std::cout << str_json.size() << std::endl;
CHECK(str_json.size() == 8868);
CHECK(str_json.size() == 8857);
#endif
}

Expand Down Expand Up @@ -2023,6 +2023,24 @@ TEST_CASE("test remove label value") {
CHECK(!counter.has_label_value(std::vector<std::string>{}));
}

TEST_CASE("test static summary with 0 and 1 quantiles") {
{
ylt::metric::summary_t s("test", "help", {0, 1});
for (uint64_t i = 0; i < 100ull; ++i) {
s.observe(1);
}
auto result = s.get_rates();
CHECK(result[0] == 1);
CHECK(result[1] == 1);
}
{
ylt::metric::summary_t s("test", "help", {0, 1});
auto result = s.get_rates();
CHECK(result[0] == 0);
CHECK(result[1] == 0);
}
}

DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007)
int main(int argc, char** argv) { return doctest::Context(argc, argv).run(); }
DOCTEST_MSVC_SUPPRESS_WARNING_POP

0 comments on commit 4ddbbb7

Please sign in to comment.