From 1cea84f4da4b2d839e90e4bb961ce31e23396dde Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Mon, 21 Feb 2022 13:31:02 +0100 Subject: [PATCH 1/7] Push all asynchronous activity back to the main thread. The scheduler now accepts, in addition timers, also "tasks" for immediate execution at the next opportunity. If any of our threads needs to do work, we now schedule that back to the scheduler as such a task. It will then be processed as part of the main loop. This means that thread-safety becomes much less of an issue, and hence require less locking. We'll clean lock up in a subsequent thread. --- src/core/scheduler.cc | 46 +++++++++++++++++++++++++++++++++++++++++-- src/core/scheduler.h | 22 +++++++++++++++++++++ src/io/console.cc | 28 ++++++++------------------ src/io/zeek.cc | 38 ++++++++++++++++++----------------- src/main.cc | 19 +----------------- zeek-agent | 2 +- 6 files changed, 96 insertions(+), 59 deletions(-) diff --git a/src/core/scheduler.cc b/src/core/scheduler.cc index dda15804..c1de8de1 100644 --- a/src/core/scheduler.cc +++ b/src/core/scheduler.cc @@ -7,6 +7,7 @@ #include "util/testing.h" #include "util/threading.h" +#include #include #include #include @@ -40,6 +41,9 @@ struct Pimpl::Implementation { // Runs all update hooks. void updated(); + // Executes scheduled activity up to current wall clock. + bool loop(); + SynchronizedBase* _synchronized = nullptr; // scheduler's synchronizer, so that we can release it for callback execution timer::ID _next_id = 1; // counter for creating timer IDs @@ -48,6 +52,7 @@ struct Pimpl::Implementation { std::priority_queue, TimerComparator> _timers; // timers sorted by time std::vector> _update_callbacks; // registered update callbacks std::atomic _terminating; // true once termination has been requested; ok to access wo/ lock + ConditionVariable interrupt_loop; }; timer::ID Scheduler::Implementation::schedule(timer::ID id, Time t, timer::Callback cb) { @@ -90,11 +95,31 @@ bool Scheduler::Implementation::advance(Time now) { return true; } +bool Scheduler::Implementation::loop() { + Interval timeout = 0s; + + if ( _timers.empty() ) + timeout = 15s; // TODO: Make max timoeut configurable + else + timeout = std::max(Interval(0s), _timers.top()->due - std::chrono::system_clock::now()); + + if ( timeout > 0s ) { + ZEEK_AGENT_DEBUG("scheduler", "sleeping with timeout={}", to_string(timeout)); + _synchronized->unlockWhile([&]() { interrupt_loop.wait(timeout); }); + } + + advance(std::chrono::system_clock::now()); + return _terminating; +} + void Scheduler::Implementation::updated() { _synchronized->unlockWhile([&]() { for ( const auto& cb : _update_callbacks ) cb(); }); + + ZEEK_AGENT_DEBUG("scheduler", "updated"); + interrupt_loop.notify(); } Scheduler::Scheduler() { @@ -112,6 +137,17 @@ timer::ID Scheduler::schedule(Time t, timer::Callback cb) { return id; } +void Scheduler::schedule(task::Callback cb) { + Synchronize _(this); + auto id = pimpl()->schedule(pimpl()->_next_id++, currentTime(), [cb = std::move(cb)](timer::ID) -> Interval { + cb(); + return Interval(0); + }); + + ZEEK_AGENT_DEBUG("scheduler", "scheduled task {} for immediate execution", id); + pimpl()->updated(); +} + void Scheduler::cancel(timer::ID id) { Synchronize _(this); ZEEK_AGENT_DEBUG("scheduler", "canceling timer {}", id); @@ -119,6 +155,12 @@ void Scheduler::cancel(timer::ID id) { pimpl()->updated(); } +bool Scheduler::loop() { + Synchronize _(this); + ZEEK_AGENT_DEBUG("scheduler", "executing pending activity"); + return pimpl()->loop(); +} + void Scheduler::advance(Time t) { Synchronize _(this); ZEEK_AGENT_DEBUG("scheduler", "advancing time to t={}", to_string(t)); @@ -139,12 +181,12 @@ void Scheduler::terminate() { } bool Scheduler::terminating() const { - Synchronize _(this); + // Synchronize _(this); return pimpl()->_terminating; } Time Scheduler::currentTime() const { - Synchronize _(this); + // Synchronize _(this); return pimpl()->_now; } diff --git a/src/core/scheduler.h b/src/core/scheduler.h index 91c6b85a..181cd435 100644 --- a/src/core/scheduler.h +++ b/src/core/scheduler.h @@ -21,6 +21,11 @@ using ID = uint64_t; using Callback = std::function; } // namespace timer +namespace task { +/** Callback for an operation queued for execution. */ +using Callback = std::function; +} // namespace task + /** * Manages a set of scheduled timers with associated callbacks to eventually * execute. The scheduler has an internal notion of time and will execute the @@ -51,6 +56,13 @@ class Scheduler : public Pimpl, SynchronizedBase { */ timer::ID schedule(Time t, timer::Callback cb); + /** + * Enqueues a callback to execute at the next possible opportunity fro the scheduler's thread. + + * @param cb callback to execute as soon as possible from the scheduler's thread. + */ + void schedule(task::Callback cb); + /** * Cancels a previously installed timer. The timer will be deleted without * its callback executing. @@ -60,6 +72,16 @@ class Scheduler : public Pimpl, SynchronizedBase { */ void cancel(timer::ID id); + /** + * Executes pending activity up to current wall clock. If nothing is + * pending, blocks for a little while (with new activity interrupting the + * block). + * + * @return true if processing is to continue; false if the scheduler has + * been asked to terminate + */ + bool loop(); + /** * Advances to scheduler's notion of the current time. This will let all * timers fire that are currently scheduled for a time <= `now`. Their diff --git a/src/io/console.cc b/src/io/console.cc index fc6fd158..7d09e51a 100644 --- a/src/io/console.cc +++ b/src/io/console.cc @@ -34,9 +34,6 @@ struct Pimpl::Implementation { // Performance a query against the database. void query(const std::string& stmt, std::optional subscription, bool terminate = false); - // Cancels the current query. - void cancelQuery(); - // Prints a message to the console. void message(const std::string& msg); @@ -70,11 +67,6 @@ struct Pimpl::Implementation { replxx::Replxx _rx; // instance of the REPL }; -void Console::Implementation::cancelQuery() { - // Note we can come here from a signal handler. - _query_done.notify(); -} - void Console::Implementation::execute(const std::string& cmd, bool terminate) { ZEEK_AGENT_DEBUG("console", "executing: {}", cmd); @@ -234,20 +226,16 @@ void Console::Implementation::query(const std::string& stmt, std::optional sigint_handler; if ( ! terminate ) // Temporarily install our our SIGINT handler while the query is running. - sigint_handler = std::make_unique(_signal_mgr, SIGINT, [this]() { - cancelQuery(); - std::cout << std::endl; - }); + sigint_handler = std::make_unique(_signal_mgr, SIGINT, [this]() { _query_done.notify(); }); _query_done.reset(); - if ( auto id = _db->query(query) ) { - _query_done.wait(); - if ( *id ) - _db->cancel(**id); - } - else - error(id.error()); + _scheduler->schedule([this, query]() { + if ( auto id = _db->query(query); ! id ) + error(id.error()); + }); + + _query_done.wait(); } void Console::Implementation::message(const std::string& msg) { _rx.print("%s\n", msg.c_str()); } @@ -315,7 +303,7 @@ void Console::stop() { if ( pimpl()->_thread ) { ZEEK_AGENT_DEBUG("console", "stopping"); - pimpl()->cancelQuery(); + pimpl()->_query_done.notify(); pimpl()->_thread->join(); } } diff --git a/src/io/zeek.cc b/src/io/zeek.cc index 24156b0d..28131e47 100644 --- a/src/io/zeek.cc +++ b/src/io/zeek.cc @@ -177,24 +177,26 @@ Result BrokerConnection::connect(const std::string& destination) { _endpoint.subscribe_nosync( topics, []() { /* nop */ }, [this](const broker::data_message& msg) { - if ( broker::get_topic(msg) == broker::topic::statuses_str ) { - Synchronize _(this); - auto x = broker::to(broker::get_data(msg)); - assert(x); // NOLINT(bugprone-lambda-function-name) - processStatus(*x); - } - else if ( broker::get_topic(msg) == broker::topic::errors_str ) { - Synchronize _(this); - auto x = broker::to(broker::get_data(msg)); - assert(x); // NOLINT(bugprone-lambda-function-name) - processError(*x); - } - else { - Synchronize _(this); - processEvent(msg); - } - }, // - [](const broker::error&) { /* nop */ }); // + _scheduler->schedule([this, msg]() { // process message on the main thread + if ( broker::get_topic(msg) == broker::topic::statuses_str ) { + Synchronize _(this); + auto x = broker::to(broker::get_data(msg)); + assert(x); // NOLINT(bugprone-lambda-function-name) + processStatus(*x); + } + else if ( broker::get_topic(msg) == broker::topic::errors_str ) { + Synchronize _(this); + auto x = broker::to(broker::get_data(msg)); + assert(x); // NOLINT(bugprone-lambda-function-name) + processError(*x); + } + else { + Synchronize _(this); + processEvent(msg); + } + }); + }, + [](const broker::error&) { /* nop */ }); _subscriber = _endpoint.make_subscriber(topics); _status_subscriber = _endpoint.make_status_subscriber(true); diff --git a/src/main.cc b/src/main.cc index b658d594..ecc1f967 100644 --- a/src/main.cc +++ b/src/main.cc @@ -41,7 +41,6 @@ int main(int argc, char** argv) { if ( geteuid() != 0 && ! cfg.options().use_mock_data ) logger()->warn("not running as root, information may be incomplete"); - ConditionVariable main_loop; Scheduler scheduler; SignalManager signal_mgr({SIGINT}); signal::Handler sigint(&signal_mgr, SIGINT, [&]() { scheduler.terminate(); }); @@ -68,29 +67,13 @@ int main(int argc, char** argv) { ZEEK_AGENT_DEBUG("main", "looping until terminated"); - scheduler.registerUpdateCallback([&main_loop]() { main_loop.notify(); }); - - while ( ! scheduler.terminating() ) { + while ( ! scheduler.loop() ) { db.poll(); if ( zeek ) zeek->poll(); - if ( auto next_timer = scheduler.nextTimer(); next_timer == 0_time ) { - // TODO: make timeout configurable - auto t = Interval(15s); - ZEEK_AGENT_DEBUG("main", "completely idle, sleeping with timeout={}", to_string(t)); - main_loop.wait(t); - } - - else if ( auto t = next_timer - std::chrono::system_clock::now(); t > 0s ) { - ZEEK_AGENT_DEBUG("main", "sleeping with timeout={}", to_string(t)); - main_loop.wait(t); - } - - scheduler.advance(std::chrono::system_clock::now()); db.expire(); - main_loop.reset(); // clear any updates that were just flagged } return 0; diff --git a/zeek-agent b/zeek-agent index becd56f4..0caff56e 160000 --- a/zeek-agent +++ b/zeek-agent @@ -1 +1 @@ -Subproject commit becd56f4e0168eb033304c289db5f8024e874a4a +Subproject commit 0caff56e17928fa9e6c452737db41bffac0ac030 From dac6a52963b20fce12189294412aaa5543b77402 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Mon, 21 Feb 2022 13:32:38 +0100 Subject: [PATCH 2/7] Remove locking from most classes. They are no longer accessed from different threads. --- src/core/configuration.cc | 9 +-------- src/core/configuration.h | 4 ++-- src/core/database.cc | 41 +++++++++------------------------------ src/core/database.h | 5 ++--- src/core/signal.cc | 1 - src/core/sqlite.cc | 4 ---- src/core/sqlite.h | 3 +-- src/io/console.cc | 38 +++++++++++++++++++++++------------- src/io/console.h | 3 +-- src/io/zeek.cc | 16 +-------------- src/io/zeek.h | 3 +-- src/main.cc | 1 - zeek-agent | 2 +- 13 files changed, 43 insertions(+), 87 deletions(-) diff --git a/src/core/configuration.cc b/src/core/configuration.cc index 2375f0e2..f61c1d18 100644 --- a/src/core/configuration.cc +++ b/src/core/configuration.cc @@ -395,14 +395,9 @@ Configuration::~Configuration() { logger()->set_level(pimpl()->old_log_level); } -const Options& Configuration::options() const { - Synchronize _(this); - return pimpl()->_options; -} +const Options& Configuration::options() const { return pimpl()->_options; } Result Configuration::initFromArgv(int argc, const char* const* argv) { - Synchronize _(this); - std::vector vargv; vargv.reserve(argc); for ( auto i = 0; i < argc; i++ ) @@ -413,13 +408,11 @@ Result Configuration::initFromArgv(int argc, const char* const* argv) { } Result Configuration::read(const filesystem::path& path) { - Synchronize _(this); ZEEK_AGENT_DEBUG("configuration", "reading file {}", path.native()); return pimpl()->read(path); } Result Configuration::read(std::istream& in, const filesystem::path& path) { - Synchronize _(this); ZEEK_AGENT_DEBUG("configuration", "reading stream associated with file {}", path.native()); return pimpl()->read(in, path); } diff --git a/src/core/configuration.h b/src/core/configuration.h index be0a71eb..f6560a43 100644 --- a/src/core/configuration.h +++ b/src/core/configuration.h @@ -3,9 +3,9 @@ #pragma once #include "util/filesystem.h" +#include "util/helpers.h" #include "util/pimpl.h" #include "util/result.h" -#include "util/threading.h" #include #include @@ -107,7 +107,7 @@ struct Options { * * All public methods in this class are thread-safe. */ -class Configuration : public Pimpl, protected SynchronizedBase { +class Configuration : public Pimpl { public: Configuration(); ~Configuration(); diff --git a/src/core/database.cc b/src/core/database.cc index 06386815..60a6b6e7 100644 --- a/src/core/database.cc +++ b/src/core/database.cc @@ -7,7 +7,6 @@ #include "sqlite.h" #include "util/helpers.h" #include "util/testing.h" -#include "util/threading.h" #include #include @@ -59,8 +58,6 @@ struct Pimpl::Implementation { // Helper to lookup scheduled query. std::optional::iterator> lookupQuery(query::ID); - SynchronizedBase* _synchronized = - nullptr; // database's synchronizer, so that we can grab it during callback execution const Configuration* _configuration = nullptr; // configuration object, as passed into constructor Scheduler* _scheduler = nullptr; // scheduler as passed into constructor std::unique_ptr _sqlite; // SQLite backend for performing queries @@ -127,9 +124,7 @@ void Database::Implementation::expire() { continue; if ( (*i)->query.callback_done ) - _synchronized->unlockWhile([&, id = id, regular_shutdown = regular_shutdown]() { - (*(*i)->query.callback_done)(id, ! regular_shutdown); - }); + (*(*i)->query.callback_done)(id, ! regular_shutdown); } for ( const auto& [id, regular_shutdown] : _cancelled_queries ) { @@ -231,15 +226,12 @@ static auto newRows(std::vector> old, std::vectorquery.cancelled ) // already gone, or will be cleaned up shortly return 0s; - auto sql_result = _synchronized->unlockWhile( - [&]() { return _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution); }); + auto sql_result = _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution); // re-lookup because we released the lock i = lookupQuery(id); @@ -282,14 +274,12 @@ Interval Database::Implementation::timerCallback(timer::ID id) { #endif if ( (*i)->query.callback_result ) { - _synchronized->unlockWhile([&]() { - auto query_result = query::Result{.columns = sql_result->columns, - .rows = std::move(rows), - .cookie = (*i)->query.cookie, - .initial_result = ! (*i)->previous_result.has_value()}; + auto query_result = query::Result{.columns = sql_result->columns, + .rows = std::move(rows), + .cookie = (*i)->query.cookie, + .initial_result = ! (*i)->previous_result.has_value()}; - (*(*i)->query.callback_result)(id, query_result); - }); + (*(*i)->query.callback_result)(id, query_result); // repeat search in case map was modified by callback i = lookupQuery(id); @@ -328,7 +318,6 @@ Table* Database::Implementation::table(const std::string& name) { Database::Database(Configuration* configuration, Scheduler* scheduler) { ZEEK_AGENT_DEBUG("database", "creating instance"); - pimpl()->_synchronized = this; pimpl()->_configuration = configuration; pimpl()->_scheduler = scheduler; pimpl()->_sqlite = std::make_unique(); @@ -349,15 +338,9 @@ Time Database::currentTime() const { return pimpl()->_scheduler->currentTime(); } -size_t Database::numberQueries() const { - Synchronize _(this); - return pimpl()->_queries.size(); -} +size_t Database::numberQueries() const { return pimpl()->_queries.size(); } -Table* Database::table(const std::string& name) { - Synchronize _(this); - return pimpl()->table(name); -} +Table* Database::table(const std::string& name) { return pimpl()->table(name); } std::vector Database::tables() { std::vector out; @@ -371,7 +354,6 @@ std::vector Database::tables() { Result> Database::query(const Query& q) { ZEEK_AGENT_DEBUG("database", "new query: {} ", q.sql_stmt); - Synchronize _(this); auto id = pimpl()->query(q); if ( id ) { @@ -388,26 +370,21 @@ Result> Database::query(const Query& q) { void Database::cancel(query::ID id) { ZEEK_AGENT_DEBUG("database", "canceling query {}", id); - Synchronize _(this); return pimpl()->cancel(id, false); } void Database::poll() { ZEEK_AGENT_DEBUG("database", "polling database"); - Synchronize _(this); pimpl()->poll(); pimpl()->expire(); } void Database::expire() { ZEEK_AGENT_DEBUG("database", "expiring database state"); - Synchronize _(this); pimpl()->expire(); } void Database::addTable(Table* t) { - Synchronize _(this); - t->setDatabase(this); if ( configuration().options().use_mock_data ) diff --git a/src/core/database.h b/src/core/database.h index 25e5a056..af7148b7 100644 --- a/src/core/database.h +++ b/src/core/database.h @@ -5,7 +5,6 @@ #include "table.h" #include "util/pimpl.h" #include "util/result.h" -#include "util/threading.h" #include #include @@ -124,9 +123,9 @@ class Scheduler; * Tables are typically registered at agent startup, and then remain available * for querying through corresponding SQL statements. * - * All public methods in this class are thread-safe. + * Note that database methods are not safe against access from different threads. */ -class Database : public Pimpl, public SynchronizedBase { +class Database : public Pimpl { public: /** * Constructor. diff --git a/src/core/signal.cc b/src/core/signal.cc index 3774e71b..6cd2f806 100644 --- a/src/core/signal.cc +++ b/src/core/signal.cc @@ -5,7 +5,6 @@ #include "logger.h" #include "util/fmt.h" #include "util/testing.h" -#include "util/threading.h" #include #include diff --git a/src/core/sqlite.cc b/src/core/sqlite.cc index ca02b4d3..d91facaf 100644 --- a/src/core/sqlite.cc +++ b/src/core/sqlite.cc @@ -585,7 +585,6 @@ SQLite::~SQLite() { Result> SQLite::prepareStatement(const std::string& stmt) { ZEEK_AGENT_DEBUG("sqlite", "preparing statement: \"{}\"", stmt); - Synchronize _(this); return pimpl()->prepareStatement(stmt); } @@ -593,19 +592,16 @@ Result SQLite::runStatement(const sqlite::PreparedStatement& stm ZEEK_AGENT_DEBUG("sqlite", "executing compiled statement: \"{}\"", ::sqlite3_sql(stmt.statement())); assert(stmt.statement()); - Synchronize _(this); return pimpl()->runStatement(stmt, t); } Result SQLite::runStatement(const std::string& stmt, std::optional