diff --git a/CHANGES b/CHANGES index d93f874f..c09a8ba2 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,12 @@ +2.0.2 | 2022-02-21 16:36:16 +0100 + + * Push all asynchronous activity to the main thread to avoid most + inter-thread locking. + + * Fix configure's `--sanitizer` argument. + + * In `.schema` output, break out table parameters separately. + 2.0.1 | 2022-02-21 15:26:44 +0100 * Add a test build of the source code tarball to CI. diff --git a/VERSION b/VERSION index 38f77a65..e9307ca5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0.1 +2.0.2 diff --git a/configure b/configure index cd75f789..3a11043b 100755 --- a/configure +++ b/configure @@ -99,7 +99,7 @@ while [ $# -ne 0 ]; do --enable-debug) cmake_build_type="Debug";; --enable-osx-universal) cmake_osx_architectures="'arm64;x86_64'";; --enable-sanitizer) cmake_use_sanitizers="address";; - --enable-sanitizer=*) cmake_use_sanitizers="${optargs}";; + --enable-sanitizer=*) cmake_use_sanitizers="${optarg}";; --enable-static) cmake_use_static_linking="yes";; --enable-werror) cmake_use_werror="yes";; --generator=*) cmake_generator="${optarg}";; diff --git a/src/core/configuration.cc b/src/core/configuration.cc index 2375f0e2..f61c1d18 100644 --- a/src/core/configuration.cc +++ b/src/core/configuration.cc @@ -395,14 +395,9 @@ Configuration::~Configuration() { logger()->set_level(pimpl()->old_log_level); } -const Options& Configuration::options() const { - Synchronize _(this); - return pimpl()->_options; -} +const Options& Configuration::options() const { return pimpl()->_options; } Result Configuration::initFromArgv(int argc, const char* const* argv) { - Synchronize _(this); - std::vector vargv; vargv.reserve(argc); for ( auto i = 0; i < argc; i++ ) @@ -413,13 +408,11 @@ Result Configuration::initFromArgv(int argc, const char* const* argv) { } Result Configuration::read(const filesystem::path& path) { - Synchronize _(this); ZEEK_AGENT_DEBUG("configuration", "reading file {}", path.native()); return 
pimpl()->read(path); } Result Configuration::read(std::istream& in, const filesystem::path& path) { - Synchronize _(this); ZEEK_AGENT_DEBUG("configuration", "reading stream associated with file {}", path.native()); return pimpl()->read(in, path); } diff --git a/src/core/configuration.h b/src/core/configuration.h index be0a71eb..f6560a43 100644 --- a/src/core/configuration.h +++ b/src/core/configuration.h @@ -3,9 +3,9 @@ #pragma once #include "util/filesystem.h" +#include "util/helpers.h" #include "util/pimpl.h" #include "util/result.h" -#include "util/threading.h" #include #include @@ -107,7 +107,7 @@ struct Options { * * All public methods in this class are thread-safe. */ -class Configuration : public Pimpl, protected SynchronizedBase { +class Configuration : public Pimpl { public: Configuration(); ~Configuration(); diff --git a/src/core/database.cc b/src/core/database.cc index 06386815..60a6b6e7 100644 --- a/src/core/database.cc +++ b/src/core/database.cc @@ -7,7 +7,6 @@ #include "sqlite.h" #include "util/helpers.h" #include "util/testing.h" -#include "util/threading.h" #include #include @@ -59,8 +58,6 @@ struct Pimpl::Implementation { // Helper to lookup scheduled query. std::optional::iterator> lookupQuery(query::ID); - SynchronizedBase* _synchronized = - nullptr; // database's synchronizer, so that we can grab it during callback execution const Configuration* _configuration = nullptr; // configuration object, as passed into constructor Scheduler* _scheduler = nullptr; // scheduler as passed into constructor std::unique_ptr _sqlite; // SQLite backend for performing queries @@ -127,9 +124,7 @@ void Database::Implementation::expire() { continue; if ( (*i)->query.callback_done ) - _synchronized->unlockWhile([&, id = id, regular_shutdown = regular_shutdown]() { - (*(*i)->query.callback_done)(id, ! regular_shutdown); - }); + (*(*i)->query.callback_done)(id, ! 
regular_shutdown); } for ( const auto& [id, regular_shutdown] : _cancelled_queries ) { @@ -231,15 +226,12 @@ static auto newRows(std::vector> old, std::vectorquery.cancelled ) // already gone, or will be cleaned up shortly return 0s; - auto sql_result = _synchronized->unlockWhile( - [&]() { return _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution); }); + auto sql_result = _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution); // re-lookup because we released the lock i = lookupQuery(id); @@ -282,14 +274,12 @@ Interval Database::Implementation::timerCallback(timer::ID id) { #endif if ( (*i)->query.callback_result ) { - _synchronized->unlockWhile([&]() { - auto query_result = query::Result{.columns = sql_result->columns, - .rows = std::move(rows), - .cookie = (*i)->query.cookie, - .initial_result = ! (*i)->previous_result.has_value()}; + auto query_result = query::Result{.columns = sql_result->columns, + .rows = std::move(rows), + .cookie = (*i)->query.cookie, + .initial_result = ! 
(*i)->previous_result.has_value()}; - (*(*i)->query.callback_result)(id, query_result); - }); + (*(*i)->query.callback_result)(id, query_result); // repeat search in case map was modified by callback i = lookupQuery(id); @@ -328,7 +318,6 @@ Table* Database::Implementation::table(const std::string& name) { Database::Database(Configuration* configuration, Scheduler* scheduler) { ZEEK_AGENT_DEBUG("database", "creating instance"); - pimpl()->_synchronized = this; pimpl()->_configuration = configuration; pimpl()->_scheduler = scheduler; pimpl()->_sqlite = std::make_unique(); @@ -349,15 +338,9 @@ Time Database::currentTime() const { return pimpl()->_scheduler->currentTime(); } -size_t Database::numberQueries() const { - Synchronize _(this); - return pimpl()->_queries.size(); -} +size_t Database::numberQueries() const { return pimpl()->_queries.size(); } -Table* Database::table(const std::string& name) { - Synchronize _(this); - return pimpl()->table(name); -} +Table* Database::table(const std::string& name) { return pimpl()->table(name); } std::vector Database::tables() { std::vector out; @@ -371,7 +354,6 @@ std::vector Database::tables() { Result> Database::query(const Query& q) { ZEEK_AGENT_DEBUG("database", "new query: {} ", q.sql_stmt); - Synchronize _(this); auto id = pimpl()->query(q); if ( id ) { @@ -388,26 +370,21 @@ Result> Database::query(const Query& q) { void Database::cancel(query::ID id) { ZEEK_AGENT_DEBUG("database", "canceling query {}", id); - Synchronize _(this); return pimpl()->cancel(id, false); } void Database::poll() { ZEEK_AGENT_DEBUG("database", "polling database"); - Synchronize _(this); pimpl()->poll(); pimpl()->expire(); } void Database::expire() { ZEEK_AGENT_DEBUG("database", "expiring database state"); - Synchronize _(this); pimpl()->expire(); } void Database::addTable(Table* t) { - Synchronize _(this); - t->setDatabase(this); if ( configuration().options().use_mock_data ) diff --git a/src/core/database.h b/src/core/database.h index 
25e5a056..af7148b7 100644 --- a/src/core/database.h +++ b/src/core/database.h @@ -5,7 +5,6 @@ #include "table.h" #include "util/pimpl.h" #include "util/result.h" -#include "util/threading.h" #include #include @@ -124,9 +123,9 @@ class Scheduler; * Tables are typically registered at agent startup, and then remain available * for querying through corresponding SQL statements. * - * All public methods in this class are thread-safe. + * Note that database methods are not safe against access from different threads. */ -class Database : public Pimpl, public SynchronizedBase { +class Database : public Pimpl { public: /** * Constructor. diff --git a/src/core/scheduler.cc b/src/core/scheduler.cc index dda15804..fc42b206 100644 --- a/src/core/scheduler.cc +++ b/src/core/scheduler.cc @@ -7,6 +7,7 @@ #include "util/testing.h" #include "util/threading.h" +#include #include #include #include @@ -37,27 +38,36 @@ struct Pimpl::Implementation { // Advances the current time, fireing all timers now expired. bool advance(Time now); - // Runs all update hooks. + // Signals an externally visible state change. void updated(); - SynchronizedBase* _synchronized = - nullptr; // scheduler's synchronizer, so that we can release it for callback execution - timer::ID _next_id = 1; // counter for creating timer IDs - Time _now = 0_time; // current time + // Executes scheduled activity up to current wall clock. 
+ bool loop(); + + timer::ID _next_id = 1; // counter for creating timer IDs + Time _now = 0_time; // current time + bool _terminating; // true once termination has been requested; ok to access wo/ lock + ConditionVariable _interrupt_loop; // notify to interrupt idle waiting in loop() + + mutable std::mutex + _timers_mutex; // mutex protecting access to the current set of timers (_timers_by_id and _timers) std::unordered_map _timers_by_id; // maps IDs to their timers std::priority_queue, TimerComparator> _timers; // timers sorted by time - std::vector> _update_callbacks; // registered update callbacks - std::atomic _terminating; // true once termination has been requested; ok to access wo/ lock }; timer::ID Scheduler::Implementation::schedule(timer::ID id, Time t, timer::Callback cb) { + std::scoped_lock lock(_timers_mutex); + auto timer = Timer{.due = t, .id = id, .callback = std::move(cb)}; auto x = _timers_by_id.emplace(id, std::move(timer)); _timers.push(&x.first->second); + updated(); return id; } void Scheduler::Implementation::cancel(timer::ID id) { + std::scoped_lock lock(_timers_mutex); + if ( auto t = _timers_by_id.find(id); t != _timers_by_id.end() ) // mark as canceled, expiration will eventually delete it t->second.canceled = true; @@ -68,6 +78,7 @@ bool Scheduler::Implementation::advance(Time now) { return false; _now = now; + std::unique_lock lock(_timers_mutex); while ( _timers.size() && _timers.top()->due <= _now ) { auto t = std::move(*_timers.top()); // copy it out so that we can remove it before running the callback @@ -78,86 +89,93 @@ bool Scheduler::Implementation::advance(Time now) { _timers_by_id.erase(t.id); if ( ! t.canceled ) { - // Release lock before running callback, so that that can access the scheduler. 
- if ( auto reschedule = _synchronized->unlockWhile([&]() { return t.callback(t.id); }); reschedule > 0s ) { + // Release lock before running callback + lock.unlock(); + + if ( auto reschedule = t.callback(t.id); reschedule > 0s ) { auto due = _now + reschedule; ZEEK_AGENT_DEBUG("scheduler", "rescheduling timer {} for t={}", t.id, to_string(due)); schedule(t.id, due, t.callback); } + + lock.lock(); } } return true; } -void Scheduler::Implementation::updated() { - _synchronized->unlockWhile([&]() { - for ( const auto& cb : _update_callbacks ) - cb(); - }); -} +void Scheduler::Implementation::updated() { _interrupt_loop.notify(); } + +bool Scheduler::Implementation::loop() { + Interval timeout = 0s; + + { + std::scoped_lock lock(_timers_mutex); -Scheduler::Scheduler() { - ZEEK_AGENT_DEBUG("scheduler", "creating instance"); - pimpl()->_synchronized = this; + if ( _timers.empty() ) + timeout = 5s; // TODO: Make max timeout configurable + else + timeout = std::max(Interval(0s), _timers.top()->due - std::chrono::system_clock::now()); + } + + if ( timeout > 0s ) { + ZEEK_AGENT_DEBUG("scheduler", "sleeping with timeout={}", to_string(timeout)); + _interrupt_loop.wait(timeout); + } + + advance(std::chrono::system_clock::now()); + return _terminating; } +Scheduler::Scheduler() { ZEEK_AGENT_DEBUG("scheduler", "creating instance"); } + Scheduler::~Scheduler() { ZEEK_AGENT_DEBUG("scheduler", "destroying instance"); } timer::ID Scheduler::schedule(Time t, timer::Callback cb) { - Synchronize _(this); auto id = pimpl()->schedule(pimpl()->_next_id++, t, std::move(cb)); - ZEEK_AGENT_DEBUG("scheduler", "scheduled timer {} for t={}", id, to_string(t)); - pimpl()->updated(); + ZEEK_AGENT_DEBUG("scheduler", "scheduling timer {} for t={}", id, to_string(t)); return id; } +void Scheduler::schedule(task::Callback cb) { + auto id = pimpl()->schedule(pimpl()->_next_id++, currentTime(), [cb = std::move(cb)](timer::ID) -> Interval { + cb(); + return Interval(0); + }); + + 
ZEEK_AGENT_DEBUG("scheduler", "scheduling task {} for immediate execution", id); +} + void Scheduler::cancel(timer::ID id) { - Synchronize _(this); ZEEK_AGENT_DEBUG("scheduler", "canceling timer {}", id); pimpl()->cancel(id); pimpl()->updated(); } +bool Scheduler::loop() { + ZEEK_AGENT_DEBUG("scheduler", "executing pending activity"); + return pimpl()->loop(); +} + void Scheduler::advance(Time t) { - Synchronize _(this); ZEEK_AGENT_DEBUG("scheduler", "advancing time to t={}", to_string(t)); if ( pimpl()->advance(t) ) pimpl()->updated(); } -void Scheduler::registerUpdateCallback(std::function cb) { - Synchronize _(this); - pimpl()->_update_callbacks.push_back(std::move(cb)); -} - void Scheduler::terminate() { - Synchronize _(this); ZEEK_AGENT_DEBUG("scheduler", "got request to terminate"); pimpl()->_terminating = true; pimpl()->updated(); } -bool Scheduler::terminating() const { - Synchronize _(this); - return pimpl()->_terminating; -} - -Time Scheduler::currentTime() const { - Synchronize _(this); - return pimpl()->_now; -} +bool Scheduler::terminating() const { return pimpl()->_terminating; } -Time Scheduler::nextTimer() const { - Synchronize _(this); - if ( pimpl()->_timers.empty() ) - return 0_time; - else - return pimpl()->_timers.top()->due; -} +Time Scheduler::currentTime() const { return pimpl()->_now; } size_t Scheduler::pendingTimers() const { - Synchronize _(this); + std::scoped_lock lock(pimpl()->_timers_mutex); return pimpl()->_timers_by_id.size(); } @@ -184,7 +202,6 @@ TEST_CASE("timer management") { return 0s; }); - CHECK_EQ(scheduler.nextTimer(), 5_time); CHECK_EQ(scheduler.pendingTimers(), 2); scheduler.advance(3_time); @@ -269,24 +286,4 @@ TEST_CASE("timer management") { scheduler.terminate(); CHECK(scheduler.terminating()); } - - SUBCASE("update callback") { - // Checking all non-const methods here (except callback registration) - Scheduler scheduler; - uint64_t counter = 0; - scheduler.registerUpdateCallback([&]() { counter += 1; }); - - 
scheduler.advance(20_time); - scheduler.advance(40_time); - CHECK_EQ(counter, 2); - - auto id = scheduler.schedule(50_time, [](timer::ID) { return 0s; }); - CHECK_EQ(counter, 3); - - scheduler.cancel(id); - CHECK_EQ(counter, 4); - - scheduler.terminate(); - CHECK_EQ(counter, 5); - } } diff --git a/src/core/scheduler.h b/src/core/scheduler.h index 91c6b85a..af9c7742 100644 --- a/src/core/scheduler.h +++ b/src/core/scheduler.h @@ -2,8 +2,8 @@ #pragma once +#include "util/helpers.h" #include "util/pimpl.h" -#include "util/threading.h" #include @@ -21,6 +21,11 @@ using ID = uint64_t; using Callback = std::function; } // namespace timer +namespace task { +/** Callback for an operation queued for execution. */ +using Callback = std::function; +} // namespace task + /** * Manages a set of scheduled timers with associated callbacks to eventually * execute. The scheduler has an internal notion of time and will execute the @@ -33,10 +38,9 @@ using Callback = std::function; * for example through a deterministic sequence of fixed steps. The latter is * particularly useful for unit testing. * - * All public methods in this class are thread-safe. Callbacks will run from - * the inside the thread that advances the schedule's time. + * Note that methods of this class aren't thread-safe unless stated otherwise. */ -class Scheduler : public Pimpl, SynchronizedBase { +class Scheduler : public Pimpl { public: Scheduler(); ~Scheduler(); @@ -44,6 +48,9 @@ class Scheduler : public Pimpl, SynchronizedBase { /** * Schedules a new timer to a specific point of time in the future. * + * This method is thread-safe and may be called from any thread. The + * callback will always be executed on the main thread. 
+ * @param t time to schedule the timer for * @param cb callback to execute when `t` has been reached * @returns a unique ID for the scheduled timer, which will remain valid @@ -51,6 +58,17 @@ class Scheduler : public Pimpl, SynchronizedBase { */ timer::ID schedule(Time t, timer::Callback cb); + /** + * Enqueues a callback to execute at the next possible opportunity from the + * scheduler's thread. + * + * This method is thread-safe and may be called from any thread. The + * callback will always be executed on the main thread. + * + * @param cb callback to execute as soon as possible + */ + void schedule(task::Callback cb); + /** * Cancels a previously installed timer. The timer will be deleted without * its callback executing. @@ -80,12 +98,8 @@ class Scheduler : public Pimpl, SynchronizedBase { Time currentTime() const; /** - * Returns the time that the most recent upcoming timer will fire at. - * Returns time zero if there's no timer currently scheduled. + * Returns the number of timers/tasks currently scheduled for execution. */ - Time nextTimer() const; - - /** Returns the number of timers currently scheduled for execution. */ size_t pendingTimers() const; /** @@ -100,15 +114,14 @@ class Scheduler : public Pimpl, SynchronizedBase { bool terminating() const; /** - * Registers a callback to execute whenever something about the scheduler's - * state may have changed, including new or canceled timers, or advances in - * time. Callbacks will execute from inside the same thread that triggered - * the update. They may run in cases where there's no actual state - * change. + * Executes pending activity up to current wall clock. If nothing is + * pending, blocks for a little while (with new activity interrupting the + * block). 
* - * @param callback callback to execute after any change in state - **/ - void registerUpdateCallback(std::function cb); + * @return true if processing is to continue; false if the scheduler has + * been asked to terminate + */ + bool loop(); }; } // namespace zeek::agent diff --git a/src/core/signal.cc b/src/core/signal.cc index 3774e71b..b7065563 100644 --- a/src/core/signal.cc +++ b/src/core/signal.cc @@ -5,7 +5,6 @@ #include "logger.h" #include "util/fmt.h" #include "util/testing.h" -#include "util/threading.h" #include #include @@ -117,8 +116,8 @@ SignalManager::~SignalManager() { TEST_SUITE("Signal manager") { TEST_CASE("signal callbacks") { - int count1 = 0; - int count2 = 0; + std::atomic count1 = 0; + std::atomic count2 = 0; // TODO: For unknown reasons, the first kill() below sometimes gets // lost, at least on macOS (sigwait() returns, but it doesn't set a diff --git a/src/core/sqlite.cc b/src/core/sqlite.cc index ca02b4d3..d91facaf 100644 --- a/src/core/sqlite.cc +++ b/src/core/sqlite.cc @@ -585,7 +585,6 @@ SQLite::~SQLite() { Result> SQLite::prepareStatement(const std::string& stmt) { ZEEK_AGENT_DEBUG("sqlite", "preparing statement: \"{}\"", stmt); - Synchronize _(this); return pimpl()->prepareStatement(stmt); } @@ -593,19 +592,16 @@ Result SQLite::runStatement(const sqlite::PreparedStatement& stm ZEEK_AGENT_DEBUG("sqlite", "executing compiled statement: \"{}\"", ::sqlite3_sql(stmt.statement())); assert(stmt.statement()); - Synchronize _(this); return pimpl()->runStatement(stmt, t); } Result SQLite::runStatement(const std::string& stmt, std::optional