Skip to content

Commit

Permalink
Merge pull request #26 from dynatrace-oss/one_poll
Browse files Browse the repository at this point in the history
All events handlers moved to bpf_events.
  • Loading branch information
pawsten authored Oct 10, 2022
2 parents 0712060 + ac16ad9 commit ba41e1f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 86 deletions.
66 changes: 31 additions & 35 deletions libnettracer/src/bpf_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "bpf_generic/src/perf_event.h"
#include <algorithm>
#include <exception>
#include <iostream>
#include <poll.h>
#include <stdint.h>
#include <unistd.h>
Expand All @@ -21,23 +22,17 @@ void bpf_events::stop() {

std::vector<pollfd> bpf_events::create_pfds() {
std::vector<pollfd> fds;

std::transform(ipv4_event_observer.md.pfd.begin(), ipv4_event_observer.md.pfd.end(), std::back_inserter(fds), [](auto& it) {
return pollfd{it, POLLIN, 0};
});
std::transform(ipv6_event_observer.md.pfd.begin(), ipv6_event_observer.md.pfd.end(), std::back_inserter(fds), [](auto& it) {
return pollfd{it, POLLIN, 0};
});
std::transform(log_event_observer.md.pfd.begin(), log_event_observer.md.pfd.end(), std::back_inserter(fds), [](auto& it) {
return pollfd{it, POLLIN, 0};
});
fds.push_back(pollfd{STDIN_FILENO, POLLIN, 0});
for (const auto& ito : observers) {
std::transform(ito.md.pfd.begin(), ito.md.pfd.end(), std::back_inserter(fds), [](auto& it) { return pollfd{it, POLLIN, 0}; });
}
return fds;
}

void bpf_events::loop() {
using namespace std::chrono_literals;
int page_size = getpagesize();
std::vector<pollfd> fds = create_pfds();
std::vector<pollfd> fds = create_pfds();
while (running) {
int res = poll(fds.data(), fds.size(), 100);
if (res < 0) {
Expand All @@ -52,37 +47,38 @@ void bpf_events::loop() {
if (!(fd.revents & POLLIN)) {
continue;
}
if (fd.fd == STDIN_FILENO) {
char tmp[128];
if (!std::cin.read(tmp, std::min(128, res))) {
exit(1);
}
kbhit_observer();
} else {

auto ac = fd_to_evtype(fd.fd);
const size_t cpu = ac.first;
std::visit(
[page_size, cpu](auto&& arg) {
using atype = typename std::decay<decltype(arg)>::type;
auto events = bpf::deserializeEvent<typename atype::evt_type>(arg.md, page_size, cpu);
std::sort(events.begin(), events.end(), [](auto const& a, auto const& b) { return a.timestamp < b.timestamp; });
std::for_each(events.begin(), events.end(), arg.action);
},
ac.second);
auto ac = fd_to_evtype(fd.fd);
const size_t cpu = ac.first;
std::visit(
[page_size, cpu, &ac](auto&& arg) {
using atype = typename std::decay<decltype(arg)>::type::argument_type;
auto events = bpf::deserializeEvent<typename std::decay<atype>::type>(ac.second.md, page_size, cpu);
std::sort(events.begin(), events.end(), [](auto const& a, auto const& b) { return a.timestamp < b.timestamp; });
std::for_each(events.begin(), events.end(), arg);
},
ac.second.action);
}

fd.events = POLLIN;
fd.revents = 0;
}
}
}

bpf_events::evt_variant bpf_events::fd_to_evtype(
int fd) {
auto it = std::find(ipv4_event_observer.md.pfd.begin(), ipv4_event_observer.md.pfd.end(), fd);
if (it != ipv4_event_observer.md.pfd.end())
return {std::distance(ipv4_event_observer.md.pfd.begin(), it), ipv4_event_observer};

auto it2 = std::find(ipv6_event_observer.md.pfd.begin(), ipv6_event_observer.md.pfd.end(), fd);
if (it2 != ipv6_event_observer.md.pfd.end())
return {std::distance( ipv6_event_observer.md.pfd.begin(), it2), ipv6_event_observer};

auto it3 = std::find(log_event_observer.md.pfd.begin(), log_event_observer.md.pfd.end(), fd);
if (it3 != log_event_observer.md.pfd.end())
return {std::distance( log_event_observer.md.pfd.begin(), it3), log_event_observer};

bpf_events::evt_source bpf_events::fd_to_evtype(int fd) {
for (const auto& it : observers) {
auto ft = std::find(it.md.pfd.begin(), it.md.pfd.end(), fd);
if (ft != it.md.pfd.end())
return {std::distance(it.md.pfd.begin(), ft), it};
}
throw std::range_error("no type conversion");
}

34 changes: 19 additions & 15 deletions libnettracer/src/bpf_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,40 @@
struct pollfd;

template <typename T>
using f_ac = std::function<void(const T&)>;
using actions = std::variant<f_ac<tcp_ipv4_event_t>, f_ac<tcp_ipv6_event_t>, f_ac<bpf_log_event_t>>;

struct evt_descr {
bpf::map_data md;
std::function<void(const T&)> action;
using evt_type = T;
actions action;
};

class bpf_events {
std::thread reader;
bool running = false;
void read_loop();
evt_descr<tcp_ipv4_event_t> ipv4_event_observer;
evt_descr<tcp_ipv6_event_t> ipv6_event_observer;
evt_descr<bpf_log_event_t> log_event_observer;
std::vector<pollfd> create_pfds();
std::vector<evt_descr> observers;
std::vector<pollfd> create_pfds();
std::function<void()> kbhit_observer;

public:
void add_observer(evt_descr<tcp_ipv4_event_t>&& o) {
ipv4_event_observer = o;
template <typename T>
void add_observer(const bpf::map_data md, f_ac<T> ac) {
evt_descr tmp;
tmp.md = md;
tmp.action = ac;
observers.push_back(tmp);
}
void add_observer(evt_descr<tcp_ipv6_event_t>&& o) {
ipv6_event_observer = o;
}
void add_observer(evt_descr<bpf_log_event_t>&& o) {
log_event_observer = o;

void set_kbhit_observer(std::function<void()>&& f) {
kbhit_observer = f;
}
void start();
void stop();
void loop();

private:
using evt_variant = std::pair<int, std::variant<evt_descr<tcp_ipv4_event_t>, evt_descr<tcp_ipv6_event_t>, evt_descr<bpf_log_event_t>>>;
evt_variant fd_to_evtype(int fd);
using evt_source = std::pair<int, evt_descr>;
evt_source fd_to_evtype(int fd);
};

32 changes: 5 additions & 27 deletions libnettracer/src/netstat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ void NetStat::init() {
static bpf::BPFMapsWrapper wrapper;
mapsWrapper = &wrapper;

kbhit_t = std::thread(&NetStat::kbhit_check, this);
}

system_clock::time_point NetStat::getCurrentTimeFromSystemClock() const {
Expand All @@ -327,34 +326,13 @@ NetStat::NetStat(ExitCtrl& e, bool deltaMode, bool headerMode, bool nonInteracti
}

NetStat::~NetStat() {
if (kbhit_t.joinable()) {
kbhit_t.join();
}
}

// kbhit should enforce refersh data display
void NetStat::kbhit_check() {
char tmp[128];
std::vector<pollfd> pfd(1);
while (exitCtrl.running) {
pfd[0].fd = 0;
pfd[0].events = POLLIN;

int res = poll(pfd.data(), pfd.size(), 200);
if (res < 0) {
break;
} else if (res == 0) {
continue;
}

if (!std::cin.read(tmp, std::min(128, res))) {
exit(1);
}
std::unique_lock<std::mutex> ul(exitCtrl.m);
kbhit = true;
ul.unlock();
exitCtrl.cv.notify_all();
}
void NetStat::set_kbhit() {
std::unique_lock<std::mutex> ul(exitCtrl.m);
kbhit = true;
ul.unlock();
exitCtrl.cv.notify_all();
}

} // namespace netstat
5 changes: 1 addition & 4 deletions libnettracer/src/netstat.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class NetStat {
ConnectionsIPv6 aggr6_;
std::mutex mx;
bool kbhit;
std::thread kbhit_t;
const int max_map_size = 1024;
ExitCtrl& exitCtrl;
bool incremental;
Expand All @@ -63,8 +62,6 @@ class NetStat {
template <typename IPTYPE>
inline auto& connections(); // no default instantiation

void kbhit_check();

template<typename IPTYPE>
void initConnection(const tcpTable<IPTYPE>&);
void initConnections();
Expand Down Expand Up @@ -93,7 +90,7 @@ class NetStat {
public:
explicit NetStat(ExitCtrl& e, bool deltaMode, bool headerMode, bool nonInteractive);
virtual ~NetStat();

void set_kbhit();
void init();

void map_loop(const bpf_fds& fdsIPv4, const bpf_fds& fdsIPv6);
Expand Down
11 changes: 6 additions & 5 deletions nettracersrv/nettracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ int main(int argc, char* argv[]) {
netstat::NetStat netst(exitCtrl, vm.count("incremental"), vm.count("header"), vm.count("noninteractive"));
netst.init();
bpf_events bevents;
bevents.set_kbhit_observer( std::bind(&netstat::NetStat::set_kbhit, &netst));

std::function<void(const tcp_ipv4_event_t&)> ipv4_event_update;
std::function<void(const tcp_ipv6_event_t&)> ipv6_event_update;
Expand Down Expand Up @@ -258,22 +259,22 @@ int main(int argc, char* argv[]) {
auto log_pmap = ebpf.get_perf_map("bpf_logs");
if (!log_pmap.pfd.empty() && !noStdoutLog) {
LOG_INFO("Starting BPF log events");
bevents.add_observer({ log_pmap,bpf_log_event_update});
bevents.add_observer<bpf_log_event_t>(log_pmap, bpf_log_event_update);
}

auto ipv4_pmap = ebpf.get_perf_map("tcp_event_ipv4");
auto ipv4_pmap = ebpf.get_perf_map("tcp_event_ipv4");
if (!ipv4_pmap.pfd.empty()) {
LOG_INFO("Starting TCP IPv4 events");
bevents.add_observer({ipv4_pmap, ipv4_event_update});
bevents.add_observer<tcp_ipv4_event_t>(ipv4_pmap, ipv4_event_update);
}

auto ipv6_pmap = ebpf.get_perf_map("tcp_event_ipv6");
if (!ipv6_pmap.pfd.empty() && monitorIPv6) {
LOG_INFO("Starting TCP IPv6 events");
bevents.add_observer({ipv6_pmap, ipv6_event_update});
bevents.add_observer<tcp_ipv6_event_t>(ipv6_pmap, ipv6_event_update);
}

bevents.start();
bevents.start();
auto map_reader = std::thread{map_reading};

if (map_reader.joinable()) {
Expand Down

0 comments on commit ba41e1f

Please sign in to comment.