From 5bd439271d41fed86af83073461e6f0d83c226fc Mon Sep 17 00:00:00 2001 From: "Dirk-Jan C. Binnema" Date: Wed, 5 Jun 2024 12:21:24 +0300 Subject: [PATCH] store-worker: temporarily revert Of course, after merging some problems come up. Let's fix those first. This reverts commit f2f01595a51380ae38aafb4cd11a0d3c17a33a10. --- lib/mu-indexer.cc | 318 +++++++++++++++++++++++++------------------- lib/mu-server.cc | 74 ++--------- mu/mu-cmd-server.cc | 5 +- mu4e/mu4e-server.el | 18 +-- mu4e/mu4e-update.el | 6 +- mu4e/mu4e.el | 65 +++++---- 6 files changed, 234 insertions(+), 252 deletions(-) diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index bfeac4870..283a11b89 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2024 Dirk-Jan C. Binnema +** Copyright (C) 2020-2023 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,7 +29,6 @@ #include #include #include -#include #include using namespace std::chrono_literals; @@ -43,28 +42,49 @@ using namespace std::chrono_literals; using namespace Mu; -// states +struct IndexState { + enum State { Idle, + Scanning, + Finishing, + Cleaning + }; + static const char* name(State s) { + switch (s) { + case Idle: + return "idle"; + case Scanning: + return "scanning"; + case Finishing: + return "finishing"; + case Cleaning: + return "cleaning"; + default: + return ""; + } + } -enum struct State { Idle, Scanning, Draining }; -constexpr std::string_view -format_as(State s) -{ - switch (s) { - case State::Idle: return "idle"; - case State::Scanning: return "scanning"; - case State::Draining: return "draining"; - default: return ""; + bool operator==(State rhs) const { + return state_.load() == rhs; } -} + bool operator!=(State rhs) const { + return state_.load() != rhs; + } + void change_to(State new_state) { + mu_debug("changing indexer state {}->{}", name((State)state_), + name((State)new_state)); + state_.store(new_state); + } + +private: + std::atomic state_{Idle}; +}; struct Indexer::Private { Private(Mu::Store& store) - : store_{store}, - store_worker_{store.store_worker()}, - scanner_{store_.root_maildir(), + : store_{store}, scanner_{store_.root_maildir(), [this](auto&& path, auto&& statbuf, auto&& info) { - return scan_handler(path, statbuf, info); + return handler(path, statbuf, info); }}, max_message_size_{store_.config().get()}, was_empty_{store.empty()} { @@ -77,55 +97,27 @@ struct Indexer::Private { store.config().get()); } - ~Private() { force_cleanup(); } - - void force_cleanup() { - switch_state(State::Idle); - scanner_.stop(); - if (scanner_worker_.joinable()) - scanner_worker_.join(); - msg_paths_.clear(); - for (auto&& w : workers_) - if (w.joinable()) - w.join(); - store_worker_.clear(); + ~Private() { + stop(); } bool dir_predicate(const std::string& path, const struct dirent* dirent) const; - bool scan_handler(const std::string& fullpath, struct stat* statbuf, - Scanner::HandleType htype); + bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); void maybe_start_worker(); void item_worker(); void scan_worker(); + bool add_message(const std::string& path); + + bool cleanup(); bool start(const Indexer::Config& conf, bool block); bool stop(); - bool is_running() const { return state_ != State::Idle; } - void switch_state(State new_state) { - mu_debug("changing indexer state {}->{}", state_, new_state); - state_ = new_state; - } - - // pace a bit so scan_items queue doesn't get too big. - void pace_scan_worker() { - while (msg_paths_.size() > 8 * max_workers_) { - std::this_thread::sleep_for(25ms); - continue; - } - } - // pace a bit so store-worker queue doesn't get too big. - void pace_store_worker() { - while (store_worker_.size() > 8) { - std::this_thread::sleep_for(25ms); - continue; - } - } + bool is_running() const { return state_ != IndexState::Idle; } Indexer::Config conf_; - const Store& store_; - StoreWorker& store_worker_; + Store& store_; Scanner scanner_; const size_t max_message_size_; @@ -134,18 +126,27 @@ struct Indexer::Private { std::vector workers_; std::thread scanner_worker_; - AsyncQueue msg_paths_; + struct WorkItem { + std::string full_path; + enum Type { + Dir, + File + }; + Type type; + }; + + AsyncQueue todos_; Progress progress_{}; - std::atomic state_{State::Idle}; + IndexState state_{}; std::mutex lock_, w_lock_; std::atomic completed_{}; bool was_empty_{}; }; bool -Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf, - Scanner::HandleType htype) +Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, + Scanner::HandleType htype) { switch (htype) { case Scanner::HandleType::EnterDir: @@ -159,7 +160,7 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf return false; } - // in lazy-mode, we ignore this dir if its dirstamp suggests it + // in lazy-mode, we ignore this dir if its dirstamp suggest it // is up-to-date (this is _not_ always true; hence we call it // lazy-mode); only for actual message dirs, since the dir // tstamps may not bubble up.U @@ -192,16 +193,14 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf return true; } case Scanner::HandleType::LeaveDir: { - // directly push to store worker, bypass scan-items queue - pace_store_worker(); - store_worker_.push(StoreWorker::SetDirStamp{fullpath, ::time({})}); + todos_.push({fullpath, WorkItem::Type::Dir}); return true; } case Scanner::HandleType::File: { ++progress_.checked; - if (static_cast(statbuf->st_size) > max_message_size_) { + if ((size_t)statbuf->st_size > max_message_size_) { mu_debug("skip {} (too big: {} bytes)", fullpath, statbuf->st_size); return false; } @@ -211,10 +210,9 @@ Indexer::Private::scan_handler(const std::string& fullpath, struct stat* statbuf if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) return false; - // push the remaining messages to our "scan-items" queue for + // push the remaining messages to our "todo" queue for // (re)parsing and adding/updating to the database. - pace_scan_worker(); - msg_paths_.push(std::string{fullpath}); // move? + todos_.push({fullpath, WorkItem::Type::File}); return true; } default: @@ -228,94 +226,158 @@ Indexer::Private::maybe_start_worker() { std::lock_guard lock{w_lock_}; - if (msg_paths_.size() > workers_.size() && workers_.size() < max_workers_) { + if (todos_.size() > workers_.size() && workers_.size() < max_workers_) { workers_.emplace_back(std::thread([this] { item_worker(); })); mu_debug("added worker {}", workers_.size()); } } +bool +Indexer::Private::add_message(const std::string& path) +{ + /* + * Having the lock here makes things a _lot_ slower. + * + * The reason for having the lock is some helgrind warnings; + * but it believed those are _false alarms_ + * https://gitlab.gnome.org/GNOME/glib/-/issues/2662 + */ + //std::unique_lock lock{w_lock_}; + auto msg{Message::make_from_path(path, store_.message_options())}; + if (!msg) { + mu_warning("failed to create message from {}: {}", path, msg.error().what()); + return false; + } + // if the store was empty, we know that the message is completely new + // and can use the fast path (Xapians 'add_document' rather than + // 'replace_document) + auto res = store_.consume_message(std::move(msg.value()), was_empty_); + if (!res) { + mu_warning("failed to add message @ {}: {}", path, res.error().what()); + return false; + } + + return true; +} + void Indexer::Private::item_worker() { - mu_debug("started worker"); - - while (state_ == State::Scanning || - (state_ == State::Draining && !msg_paths_.empty())) { + WorkItem item; - std::string msgpath; - if (!msg_paths_.pop(msgpath, 250ms)) - continue; + mu_debug("started worker"); - auto msg{Message::make_from_path(msgpath, store_.message_options())}; - if (!msg) { - mu_warning("failed to create message from {}: {}", - msgpath, msg.error().what()); + while (state_ == IndexState::Scanning) { + if (!todos_.pop(item, 250ms)) continue; + try { + switch (item.type) { + case WorkItem::Type::File: { + if (G_LIKELY(add_message(item.full_path))) + ++progress_.updated; + } break; + case WorkItem::Type::Dir: + store_.set_dirstamp(item.full_path, ::time(NULL)); + break; + default: + g_warn_if_reached(); + break; + } + } catch (const Mu::Error& er) { + mu_warning("error adding message @ {}: {}", item.full_path, er.what()); } - pace_store_worker(); /* slow down if store-worker q gets too big */ + maybe_start_worker(); + std::this_thread::yield(); + } +} - // if the store was empty, we know that the message is - // completely new and can use the fast path (Xapians - // 'add_document' rather than 'replace_document) - if (was_empty_) - store_worker_.push(StoreWorker::AddMessage{std::move(*msg)}); - else - store_worker_.push(StoreWorker::UpdateMessage{std::move(*msg)}); - ++progress_.updated; +bool +Indexer::Private::cleanup() +{ + mu_debug("starting cleanup"); + + size_t n{}; + std::vector orphans; // store messages without files. + store_.for_each_message_path([&](Store::Id id, const std::string& path) { + ++n; + if (::access(path.c_str(), R_OK) != 0) { + mu_debug("cannot read {} (id={}); queuing for removal from store", + path, id); + orphans.emplace_back(id); + } - maybe_start_worker(); + return state_ == IndexState::Cleaning; + }); + + if (orphans.empty()) + mu_debug("nothing to clean up"); + else { + mu_debug("removing {} stale message(s) from store", orphans.size()); + store_.remove_messages(orphans); + progress_.removed += orphans.size(); } + + return true; } void Indexer::Private::scan_worker() { + progress_.reset(); if (conf_.scan) { mu_debug("starting scanner"); if (!scanner_.start()) { // blocks. mu_warning("failed to start scanner"); - switch_state(State::Idle); + state_.change_to(IndexState::Idle); return; } - mu_debug("scanner finished with {} file(s) in queue", msg_paths_.size()); + mu_debug("scanner finished with {} file(s) in queue", todos_.size()); } - if (conf_.cleanup) { - mu_debug("starting cleanup with work-item(s) left: {}", - store_worker_.size()); - - std::vector orphans; // store messages without files. - store_.for_each_message_path([&](Store::Id id, const std::string& path) { - if (::access(path.c_str(), R_OK) != 0) { - mu_debug("orphan: cannot read {} (id={})", path, id); - orphans.emplace_back(id); - } - return true; + // now there may still be messages in the work queue... + // finish those; this is a bit ugly; perhaps we should + // handle SIGTERM etc. + + if (!todos_.empty()) { + const auto workers_size = std::invoke([this] { + std::lock_guard lock{w_lock_}; + return workers_.size(); }); - progress_.removed = orphans.size(); - if (!orphans.empty()) { - mu_info("removing {} orphan message(s)", orphans.size()); - store_worker_.push(StoreWorker::RemoveMessages{std::move(orphans)}); - } + mu_debug("process {} remaining message(s) with {} worker(s)", + todos_.size(), workers_size); + while (!todos_.empty()) + std::this_thread::sleep_for(100ms); } + // and let the worker finish their work. + state_.change_to(IndexState::Finishing); + for (auto&& w : workers_) + if (w.joinable()) + w.join(); - completed_ = ::time({}); - store_worker_.push(StoreWorker::SetLastIndex{completed_}); + if (conf_.cleanup) { + mu_debug("starting cleanup"); + state_.change_to(IndexState::Cleaning); + cleanup(); + mu_debug("cleanup finished"); + } - stop(); + completed_ = ::time({}); + store_.config().set(completed_); + state_.change_to(IndexState::Idle); } bool Indexer::Private::start(const Indexer::Config& conf, bool block) { - force_cleanup(); + stop(); conf_ = conf; if (conf_.max_threads == 0) { /* benchmarking suggests that ~4 threads is the fastest (the * real bottleneck is the database, so adding more threads just - * slows things down) */ + * slows things down) + */ max_workers_ = std::min(4U, std::thread::hardware_concurrency()); } else max_workers_ = conf.max_threads; @@ -329,14 +391,11 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) mu_debug("indexing: {}; clean-up: {}", conf_.scan ? "yes" : "no", conf_.cleanup ? "yes" : "no"); - progress_.reset(); - switch_state(State::Scanning); + state_.change_to(IndexState::Scanning); /* kick off the first worker, which will spawn more if needed. */ - workers_.emplace_back(std::thread([this]{item_worker();})); - /* kick the file-system-scanner thread */ - if (scanner_worker_.joinable()) - scanner_worker_.join(); // kill old one - scanner_worker_ = std::thread([this]{scan_worker();}); + workers_.emplace_back(std::thread([this] { item_worker(); })); + /* kick the disk-scanner thread */ + scanner_worker_ = std::thread([this] { scan_worker(); }); mu_debug("started indexer in {}-mode", block ? "blocking" : "non-blocking"); if (block) { @@ -352,33 +411,18 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) bool Indexer::Private::stop() { - switch_state(State::Draining); - scanner_.stop(); - // cannot join scanner_worker_ here since it may be our - // current thread. - // wait for completion. - while (!msg_paths_.empty()) { - mu_debug("scan-items left: {}", msg_paths_.size()); - std::this_thread::sleep_for(250ms); - } + todos_.clear(); + if (scanner_worker_.joinable()) + scanner_worker_.join(); + state_.change_to(IndexState::Idle); for (auto&& w : workers_) if (w.joinable()) w.join(); workers_.clear(); - store_worker_.push(StoreWorker::EndTransaction()); - - // wait for completion. - while (!store_worker_.empty()) { - mu_debug("work-items left: {}", store_worker_.size()); - std::this_thread::sleep_for(250ms); - } - - switch_state(State::Idle); - return true; } @@ -425,7 +469,7 @@ Indexer::is_running() const const Indexer::Progress& Indexer::progress() const { - priv_->progress_.running = priv_->state_ == State::Idle ? false : true; + priv_->progress_.running = priv_->state_ == IndexState::Idle ? false : true; return priv_->progress_; } diff --git a/lib/mu-server.cc b/lib/mu-server.cc index e18ae3907..62c9ca0c6 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -1,5 +1,5 @@ /* -** Copyright (C) 2020-2024 Dirk-Jan C. Binnema +** Copyright (C) 2020-2023 Dirk-Jan C. Binnema ** ** This program is free software; you can redistribute it and/or modify it ** under the terms of the GNU General Public License as published by the @@ -29,12 +29,9 @@ #include #include #include -#include #include #include - - #include #include #include @@ -119,7 +116,7 @@ struct OutputStream { } private: - std::string fname_; + std::string fname_; using OutType = std::variant; OutType out_; }; @@ -129,21 +126,11 @@ struct OutputStream { /// @brief object to manage the server-context for all commands. struct Server::Private { Private(Store& store, const Server::Options& opts, Output output) - : store_{store}, - store_worker_{store.store_worker()}, - options_{opts}, output_{output}, + : store_{store}, options_{opts}, output_{output}, command_handler_{make_command_map()}, keep_going_{true}, - tmp_dir_{unwrap(make_temp_dir())} { - - // tell the store-worker that we (this class) can handle - // sexp strings. - store_worker_.install_sexp_handler( - [this](const std::string& sexp) { - this->invoke(sexp); - }); - - } + tmp_dir_{unwrap(make_temp_dir())} + {} ~Private() { indexer().stop(); @@ -161,10 +148,13 @@ struct Server::Private { // acccessors Store& store() { return store_; } const Store& store() const { return store_; } - StoreWorker& store_worker() { return store_worker_; } Indexer& indexer() { return store().indexer(); } //CommandMap& command_map() const { return command_map_; } + // + // invoke + // + bool invoke(const std::string& expr) noexcept; // // output @@ -196,19 +186,7 @@ struct Server::Private { void remove_handler(const Command& cmd); void view_handler(const Command& cmd); - bool keep_going() const { return keep_going_; } - void set_keep_going(bool going) { keep_going_ = going; } - - // make main thread wait until done with the command. - std::mutex done_lock_; - std::condition_variable done_cond_; - private: - // - // invoke - // - bool invoke(const std::string& expr) noexcept; - void move_docid(Store::Id docid, Option flagstr, bool new_name, bool no_view); @@ -231,7 +209,6 @@ struct Server::Private { std::ofstream make_temp_file_stream(std::string& fname) const; Store& store_; - StoreWorker& store_worker_; Server::Options options_; Server::Output output_; const CommandHandler command_handler_; @@ -503,10 +480,6 @@ Server::Private::invoke(const std::string& expr) noexcept keep_going_ = false; } - // tell main thread we're done with the command. - std::lock_guard l{done_lock_}; - done_cond_.notify_one(); - return keep_going_; } @@ -586,9 +559,7 @@ Server::Private::data_handler(const Command& cmd) { const auto request_type{unwrap(cmd.symbol_arg(":kind"))}; - if (request_type == "doccount") { - output(mu_format("(:doccount {})", store().size())); - } else if (request_type == "maildirs") { + if (request_type == "maildirs") { auto&& out{make_output_stream()}; mu_print(out, "("); for (auto&& mdir: store().maildirs()) @@ -719,6 +690,9 @@ Server::Private::find_handler(const Command& cmd) StopWatch sw{mu_format("{} (indexing: {})", __func__, indexer().is_running() ? "yes" : "no")}; + // we need to _lock_ the store while querying (which likely consists of + // multiple actual queries) + grabbing the results. + std::lock_guard l{store_.lock()}; auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)}; if (!qres) throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what()); @@ -1107,27 +1081,7 @@ Server::~Server() = default; bool Server::invoke(const std::string& expr) noexcept { - /* a _little_ hacky; handle _quit_ directly to properly - * shut down the server */ - if (expr == "(quit)") { - mu_debug("quitting"); - priv_->set_keep_going(false); - return false; - } - - /* - * feed the command to the queue; it'll get executed in the - * store-worker thread; however, sync on its completion - * so we get its keep_going() result - * - * as an added bonus, this ensures mu server shell doesn't require an - * extra user RET to get back the prompt - */ - std::unique_lock done_lock{priv_->done_lock_}; - priv_->store_worker().push(StoreWorker::SexpCommand{expr}); - priv_->done_cond_.wait(done_lock); - - return priv_->keep_going(); + return priv_->invoke(expr); } /* LCOV_EXCL_STOP */ diff --git a/mu/mu-cmd-server.cc b/mu/mu-cmd-server.cc index 50c7c010f..3a694566a 100644 --- a/mu/mu-cmd-server.cc +++ b/mu/mu-cmd-server.cc @@ -79,12 +79,11 @@ cookie(size_t n) ::printf(COOKIE_PRE "%x" COOKIE_POST, num); } + + static void output_stdout(const std::string& str, Server::OutputFlags flags) { - // Note: with the StoreWorker, we _always_ need to flush - flags |= Server::OutputFlags::Flush; - cookie(str.size() + 1); if (G_UNLIKELY(::puts(str.c_str()) < 0)) { mu_critical("failed to write output '{}'", str); diff --git a/mu4e/mu4e-server.el b/mu4e/mu4e-server.el index 0790501da..fae835148 100644 --- a/mu4e/mu4e-server.el +++ b/mu4e/mu4e-server.el @@ -385,10 +385,7 @@ The server output is as follows: ((plist-get sexp :info) (funcall mu4e-info-func sexp)) - ;; get some data XXX generalize - ((plist-get sexp :doccount) - (plist-put mu4e--server-props :doccount - (mu4e--server-plist-get sexp :doccount))) + ;; get some data ((plist-get sexp :maildirs) (setq mu4e-maildir-list (mu4e--server-plist-get sexp :maildirs))) @@ -559,16 +556,9 @@ get at most MAX contacts." (defun mu4e--server-data (kind) "Request data of some KIND. -KIND is a symbol or a list of symbols. Currently supported kinds: - `maildirs', `doccount'." - (pcase kind - ((pred (lambda (k) (memq k '(maildirs doccount)))) - (mu4e--server-call-mu `(data :kind ,kind))) - ((pred listp) - (when kind - (mu4e--server-data (car kind)) - (mu4e--server-data (cdr kind)))) - (_ (mu4e-error "Unexpected kind %s" kind)))) +KIND is a symbol. Currently supported kinds: maildirs." + (mu4e--server-call-mu + `(data :kind ,kind))) (defun mu4e--server-find (query threads sortfield sortdir maxnum skip-dups include-related) diff --git a/mu4e/mu4e-update.el b/mu4e/mu4e-update.el index 11228fac2..5bb9e1da1 100644 --- a/mu4e/mu4e-update.el +++ b/mu4e/mu4e-update.el @@ -130,9 +130,9 @@ changed") If non-nil, this is a plist of the form: \( :checked (checked whether up-to-date) -:updated -:cleaned-up -:stamp ") +:updated ;; Maintainer: Dirk-Jan C. Binnema @@ -203,47 +203,42 @@ Otherwise, check requirements, then start mu4e. When successful, invoke (_ (mu4e-error "Error %d: %s" errcode errmsg)))) (defun mu4e--update-status (info) - "Update `mu4e-index-update-status' with INFO. -Return the former. As an additional side-effect, updates -doccount in server-properties." + "Update the status message with INFO." (setq mu4e-index-update-status - `(:tstamp ,(current-time) - :checked ,(or (plist-get info :checked) 0) - :updated ,(or (plist-get info :updated) 0) - :cleaned-up ,(or (plist-get info :cleaned-up) 0))) - mu4e-index-update-status) + `(:tstamp ,(current-time) + :checked ,(plist-get info :checked) + :updated ,(plist-get info :updated) + :cleaned-up ,(plist-get info :cleaned-up)))) (defun mu4e--info-handler (info) "Handler function for (:INFO ...) sexps received from server." - (let* ((type (plist-get info :info))) + (let* ((type (plist-get info :info)) + (checked (plist-get info :checked)) + (updated (plist-get info :updated)) + (cleaned-up (plist-get info :cleaned-up))) (cond ((eq type 'add) t) ;; do nothing ((eq type 'index) - (let* ((info (mu4e--update-status info)) - (checked (plist-get info :checked)) - (updated (plist-get info :updated)) - (cleaned-up (plist-get info :cleaned-up))) - (if (eq (plist-get info :status) 'running) - (mu4e-index-message - "Indexing... checked %d, updated %d" - checked updated) - (progn ;; i.e. 'complete - (mu4e-index-message - "%s completed; checked %d, updated %d, cleaned-up %d" - (if mu4e-index-lazy-check "Lazy indexing" "Indexing") - checked updated cleaned-up) - ;; index done; grab updated queries - (mu4e--query-items-refresh) - (run-hooks 'mu4e-index-updated-hook) - ;; backward compatibility... - (unless (zerop (+ updated cleaned-up)) - mu4e-message-changed-hook) - (unless (and (not (string= mu4e--contacts-tstamp "0")) - (zerop (plist-get info :updated))) - (mu4e--request-contacts-maybe) - (mu4e--server-data '(maildirs doccount))) - ;; update maildir list & doccount - (mu4e--main-redraw))))) + (if (eq (plist-get info :status) 'running) + (mu4e-index-message + "Indexing... checked %d, updated %d" checked updated) + (progn ;; i.e. 'complete + (mu4e--update-status info) + (mu4e-index-message + "%s completed; checked %d, updated %d, cleaned-up %d" + (if mu4e-index-lazy-check "Lazy indexing" "Indexing") + checked updated cleaned-up) + ;; index done; grab updated queries + (mu4e--query-items-refresh) + (run-hooks 'mu4e-index-updated-hook) + ;; backward compatibility... + (unless (zerop (+ updated cleaned-up)) + mu4e-message-changed-hook) + (unless (and (not (string= mu4e--contacts-tstamp "0")) + (zerop (plist-get info :updated))) + (mu4e--request-contacts-maybe) + (mu4e--server-data 'maildirs)) ;; update maildir list + (mu4e--main-redraw)))) ((plist-get info :message) (mu4e-index-message "%s" (plist-get info :message))))))