Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/topic/robin/threading-reorg'
Browse files Browse the repository at this point in the history
* origin/topic/robin/threading-reorg:
  Small test case fix reported by thread sanitizer.
  Fix configure's `--sanitizer` argument.
  In `.schema` output, break out table parameters separately.
  Remove helper class we no longer need.
  Remove most locking from scheduler.
  Remove locking from most classes.
  Push all asynchronous activity back to the main thread.
  • Loading branch information
rsmmr committed Feb 21, 2022
2 parents ffe978a + 12ac2ec commit 6959d07
Show file tree
Hide file tree
Showing 19 changed files with 195 additions and 273 deletions.
9 changes: 9 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
2.0.2 | 2022-02-21 16:36:16 +0100

* Push all asynchronous activity to the main thread to avoid most
inter-thread locking.

* Fix configure's `--sanitizer` argument.

* In `.schema` output, break out table parameters separately.

2.0.1 | 2022-02-21 15:26:44 +0100

* Add a test build of the source code tarball to CI.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.1
2.0.2
2 changes: 1 addition & 1 deletion configure
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ while [ $# -ne 0 ]; do
--enable-debug) cmake_build_type="Debug";;
--enable-osx-universal) cmake_osx_architectures="'arm64;x86_64'";;
--enable-sanitizer) cmake_use_sanitizers="address";;
--enable-sanitizer=*) cmake_use_sanitizers="${optargs}";;
--enable-sanitizer=*) cmake_use_sanitizers="${optarg}";;
--enable-static) cmake_use_static_linking="yes";;
--enable-werror) cmake_use_werror="yes";;
--generator=*) cmake_generator="${optarg}";;
Expand Down
9 changes: 1 addition & 8 deletions src/core/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,9 @@ Configuration::~Configuration() {
logger()->set_level(pimpl()->old_log_level);
}

const Options& Configuration::options() const {
Synchronize _(this);
return pimpl()->_options;
}
const Options& Configuration::options() const { return pimpl()->_options; }

Result<Nothing> Configuration::initFromArgv(int argc, const char* const* argv) {
Synchronize _(this);

std::vector<std::string> vargv;
vargv.reserve(argc);
for ( auto i = 0; i < argc; i++ )
Expand All @@ -413,13 +408,11 @@ Result<Nothing> Configuration::initFromArgv(int argc, const char* const* argv) {
}

Result<Nothing> Configuration::read(const filesystem::path& path) {
Synchronize _(this);
ZEEK_AGENT_DEBUG("configuration", "reading file {}", path.native());
return pimpl()->read(path);
}

Result<Nothing> Configuration::read(std::istream& in, const filesystem::path& path) {
Synchronize _(this);
ZEEK_AGENT_DEBUG("configuration", "reading stream associated with file {}", path.native());
return pimpl()->read(in, path);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#pragma once

#include "util/filesystem.h"
#include "util/helpers.h"
#include "util/pimpl.h"
#include "util/result.h"
#include "util/threading.h"

#include <optional>
#include <string>
Expand Down Expand Up @@ -107,7 +107,7 @@ struct Options {
*
* All public methods in this class are thread-safe.
*/
class Configuration : public Pimpl<Configuration>, protected SynchronizedBase {
class Configuration : public Pimpl<Configuration> {
public:
Configuration();
~Configuration();
Expand Down
41 changes: 9 additions & 32 deletions src/core/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "sqlite.h"
#include "util/helpers.h"
#include "util/testing.h"
#include "util/threading.h"

#include <algorithm>
#include <list>
Expand Down Expand Up @@ -59,8 +58,6 @@ struct Pimpl<Database>::Implementation {
// Helper to lookup scheduled query.
std::optional<std::list<ScheduledQuery>::iterator> lookupQuery(query::ID);

SynchronizedBase* _synchronized =
nullptr; // database's synchronizer, so that we can grab it during callback execution
const Configuration* _configuration = nullptr; // configuration object, as passed into constructor
Scheduler* _scheduler = nullptr; // scheduler as passed into constructor
std::unique_ptr<SQLite> _sqlite; // SQLite backend for performing queries
Expand Down Expand Up @@ -127,9 +124,7 @@ void Database::Implementation::expire() {
continue;

if ( (*i)->query.callback_done )
_synchronized->unlockWhile([&, id = id, regular_shutdown = regular_shutdown]() {
(*(*i)->query.callback_done)(id, ! regular_shutdown);
});
(*(*i)->query.callback_done)(id, ! regular_shutdown);
}

for ( const auto& [id, regular_shutdown] : _cancelled_queries ) {
Expand Down Expand Up @@ -231,15 +226,12 @@ static auto newRows(std::vector<std::vector<Value>> old, std::vector<std::vector
}

Interval Database::Implementation::timerCallback(timer::ID id) {
SynchronizedBase::Synchronize _(_synchronized);

auto i = lookupQuery(id);
if ( ! i || (*i)->query.cancelled )
// already gone, or will be cleaned up shortly
return 0s;

auto sql_result = _synchronized->unlockWhile(
[&]() { return _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution); });
auto sql_result = _sqlite->runStatement(*(*i)->prepared_query, (*i)->previous_execution);

// re-lookup because we released the lock
i = lookupQuery(id);
Expand Down Expand Up @@ -282,14 +274,12 @@ Interval Database::Implementation::timerCallback(timer::ID id) {
#endif

if ( (*i)->query.callback_result ) {
_synchronized->unlockWhile([&]() {
auto query_result = query::Result{.columns = sql_result->columns,
.rows = std::move(rows),
.cookie = (*i)->query.cookie,
.initial_result = ! (*i)->previous_result.has_value()};
auto query_result = query::Result{.columns = sql_result->columns,
.rows = std::move(rows),
.cookie = (*i)->query.cookie,
.initial_result = ! (*i)->previous_result.has_value()};

(*(*i)->query.callback_result)(id, query_result);
});
(*(*i)->query.callback_result)(id, query_result);

// repeat search in case map was modified by callback
i = lookupQuery(id);
Expand Down Expand Up @@ -328,7 +318,6 @@ Table* Database::Implementation::table(const std::string& name) {

Database::Database(Configuration* configuration, Scheduler* scheduler) {
ZEEK_AGENT_DEBUG("database", "creating instance");
pimpl()->_synchronized = this;
pimpl()->_configuration = configuration;
pimpl()->_scheduler = scheduler;
pimpl()->_sqlite = std::make_unique<SQLite>();
Expand All @@ -349,15 +338,9 @@ Time Database::currentTime() const {
return pimpl()->_scheduler->currentTime();
}

size_t Database::numberQueries() const {
Synchronize _(this);
return pimpl()->_queries.size();
}
size_t Database::numberQueries() const { return pimpl()->_queries.size(); }

Table* Database::table(const std::string& name) {
Synchronize _(this);
return pimpl()->table(name);
}
Table* Database::table(const std::string& name) { return pimpl()->table(name); }

std::vector<const Table*> Database::tables() {
std::vector<const Table*> out;
Expand All @@ -371,7 +354,6 @@ std::vector<const Table*> Database::tables() {

Result<std::optional<query::ID>> Database::query(const Query& q) {
ZEEK_AGENT_DEBUG("database", "new query: {} ", q.sql_stmt);
Synchronize _(this);

auto id = pimpl()->query(q);
if ( id ) {
Expand All @@ -388,26 +370,21 @@ Result<std::optional<query::ID>> Database::query(const Query& q) {

void Database::cancel(query::ID id) {
ZEEK_AGENT_DEBUG("database", "canceling query {}", id);
Synchronize _(this);
return pimpl()->cancel(id, false);
}

void Database::poll() {
ZEEK_AGENT_DEBUG("database", "polling database");
Synchronize _(this);
pimpl()->poll();
pimpl()->expire();
}

void Database::expire() {
ZEEK_AGENT_DEBUG("database", "expiring database state");
Synchronize _(this);
pimpl()->expire();
}

void Database::addTable(Table* t) {
Synchronize _(this);

t->setDatabase(this);

if ( configuration().options().use_mock_data )
Expand Down
5 changes: 2 additions & 3 deletions src/core/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "table.h"
#include "util/pimpl.h"
#include "util/result.h"
#include "util/threading.h"

#include <map>
#include <memory>
Expand Down Expand Up @@ -124,9 +123,9 @@ class Scheduler;
* Tables are typically registered at agent startup, and then remain available
* for querying through corresponding SQL statements.
*
* All public methods in this class are thread-safe.
* Note that database methods are not safe against access from different threads.
*/
class Database : public Pimpl<Database>, public SynchronizedBase {
class Database : public Pimpl<Database> {
public:
/**
* Constructor.
Expand Down
Loading

0 comments on commit 6959d07

Please sign in to comment.