Skip to content

Commit

Permalink
Merge pull request #2301 from mavlink/pr-timeout-fixes
Browse files Browse the repository at this point in the history
TimeoutHandler fixes
  • Loading branch information
julianoes committed May 16, 2024
2 parents e815b8b + 683d2ac commit 8fb12e3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 90 deletions.
11 changes: 11 additions & 0 deletions src/mavsdk/core/mavlink_command_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ MavlinkCommandSender::MavlinkCommandSender(SystemImpl& system_impl) : _system_im

MavlinkCommandSender::~MavlinkCommandSender()
{
if (_command_debugging) {
LogDebug() << "CommandSender destroyed";
}
_system_impl.unregister_all_mavlink_message_handlers(this);

LockedQueue<Work>::Guard work_queue_guard(_work_queue);
for (const auto& work : _work_queue) {
_system_impl.unregister_timeout_handler(work->timeout_cookie);
}
}

MavlinkCommandSender::Result
Expand Down Expand Up @@ -285,6 +293,9 @@ void MavlinkCommandSender::receive_command_ack(mavlink_message_t message)

void MavlinkCommandSender::receive_timeout(const CommandIdentification& identification)
{
if (_command_debugging) {
LogDebug() << "Got timeout!";
}
bool found_command = false;
CommandResultCallback temp_callback = nullptr;
std::pair<Result, float> temp_result{Result::UnknownError, NAN};
Expand Down
57 changes: 13 additions & 44 deletions src/mavsdk/core/system_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,6 @@ void SystemImpl::init(uint8_t system_id, uint8_t comp_id)
[this](const mavlink_message_t& message) { process_autopilot_version(message); },
this);

// register_mavlink_command_handler(
// MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES,
// [this](const MavlinkCommandReceiver::CommandLong& command) {
// return process_autopilot_version_request(command);
// },
// this);

//// TO-DO!
// register_mavlink_command_handler(
// MAV_CMD_REQUEST_MESSAGE,
// [this](const MavlinkCommandReceiver::CommandLong& command) {
// return make_command_ack_message(command, MAV_RESULT::MAV_RESULT_UNSUPPORTED);
// },
// this);

add_new_component(comp_id);
}

Expand Down Expand Up @@ -313,17 +298,6 @@ void SystemImpl::system_thread()
}
}

// std::optional<mavlink_message_t>
// SystemImpl::process_autopilot_version_request(const MavlinkCommandReceiver::CommandLong& command)
//{
// if (_should_send_autopilot_version) {
// send_autopilot_version();
// return make_command_ack_message(command, MAV_RESULT::MAV_RESULT_ACCEPTED);
// }
//
// return {};
//}

std::string SystemImpl::component_name(uint8_t component_id)
{
switch (component_id) {
Expand Down Expand Up @@ -495,22 +469,6 @@ bool SystemImpl::queue_message(
}

void SystemImpl::send_autopilot_version_request()
{
auto prom = std::promise<MavlinkCommandSender::Result>();
auto fut = prom.get_future();

send_autopilot_version_request_async(
[&prom](MavlinkCommandSender::Result result, float) { prom.set_value(result); });

if (fut.get() == MavlinkCommandSender::Result::Unsupported) {
_old_message_520_supported = false;
LogWarn()
<< "Trying alternative command REQUEST_MESSAGE instead of REQUEST_AUTOPILOT_CAPABILITIES next.";
}
}

void SystemImpl::send_autopilot_version_request_async(
const MavlinkCommandSender::CommandResultCallback& callback)
{
MavlinkCommandSender::CommandLong command{};
command.target_component_id = get_autopilot_id();
Expand All @@ -525,7 +483,18 @@ void SystemImpl::send_autopilot_version_request_async(
command.params.maybe_param1 = {static_cast<float>(MAVLINK_MSG_ID_AUTOPILOT_VERSION)};
}

send_command_async(command, callback);
send_command_async(command, [this](MavlinkCommandSender::Result result, float) {
receive_autopilot_version_request_ack(result);
});
}

void SystemImpl::receive_autopilot_version_request_ack(MavlinkCommandSender::Result result)
{
if (result == MavlinkCommandSender::Result::Unsupported) {
_old_message_520_supported = false;
LogWarn()
<< "Trying alternative command REQUEST_MESSAGE instead of REQUEST_AUTOPILOT_CAPABILITIES next.";
}
}

void SystemImpl::set_connected()
Expand Down Expand Up @@ -567,7 +536,7 @@ void SystemImpl::set_connected()

if (enable_needed) {
if (has_autopilot()) {
send_autopilot_version_request_async(nullptr);
send_autopilot_version_request();
}

std::lock_guard<std::mutex> lock(_plugin_impls_mutex);
Expand Down
5 changes: 1 addition & 4 deletions src/mavsdk/core/system_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,6 @@ class SystemImpl {
const std::string& filename, int linenumber, const std::function<void()>& func);

void send_autopilot_version_request();
void send_autopilot_version_request_async(
const MavlinkCommandSender::CommandResultCallback& callback);

MavlinkMissionTransferClient& mission_transfer_client() { return _mission_transfer_client; };

Expand All @@ -307,8 +305,7 @@ class SystemImpl {
void set_connected();
void set_disconnected();

std::optional<mavlink_message_t>
process_autopilot_version_request(const MavlinkCommandReceiver::CommandLong& command);
void receive_autopilot_version_request_ack(MavlinkCommandSender::Result result);

static std::string component_name(uint8_t component_id);
static System::ComponentType component_type(uint8_t component_id);
Expand Down
71 changes: 36 additions & 35 deletions src/mavsdk/core/timeout_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@ TimeoutHandler::TimeoutHandler(Time& time) : _time(time) {}

void TimeoutHandler::add(std::function<void()> callback, double duration_s, void** cookie)
{
auto new_timeout = std::make_shared<Timeout>();
new_timeout->callback = callback;
new_timeout->time = _time.steady_time_in_future(duration_s);
new_timeout->duration_s = duration_s;

void* new_cookie = static_cast<void*>(new_timeout.get());

{
std::lock_guard<std::mutex> lock(_timeouts_mutex);
_timeouts.insert(std::pair<void*, std::shared_ptr<Timeout>>(new_cookie, new_timeout));
}
std::lock_guard<std::mutex> lock(_timeouts_mutex);
auto new_timeout = Timeout{};
new_timeout.callback = callback;
new_timeout.time = _time.steady_time_in_future(duration_s);
new_timeout.duration_s = duration_s;
new_timeout.cookie = _next_cookie++;
_timeouts.push_back(new_timeout);

if (cookie != nullptr) {
*cookie = new_cookie;
// TODO: change this API in the future
*cookie = reinterpret_cast<void*>(new_timeout.cookie);
}
_iterator_invalidated = true;
}

void TimeoutHandler::refresh(const void* cookie)
Expand All @@ -31,10 +29,12 @@ void TimeoutHandler::refresh(const void* cookie)

std::lock_guard<std::mutex> lock(_timeouts_mutex);

auto it = _timeouts.find(const_cast<void*>(cookie));
auto it = std::find_if(_timeouts.begin(), _timeouts.end(), [&](const Timeout& timeout) {
return timeout.cookie == reinterpret_cast<uint64_t>(cookie);
});
if (it != _timeouts.end()) {
auto future_time = _time.steady_time_in_future(it->second->duration_s);
it->second->time = future_time;
auto future_time = _time.steady_time_in_future(it->duration_s);
it->time = future_time;
}
}

Expand All @@ -46,46 +46,47 @@ void TimeoutHandler::remove(const void* cookie)

std::lock_guard<std::mutex> lock(_timeouts_mutex);

auto it = _timeouts.find(const_cast<void*>(cookie));
auto it = std::find_if(_timeouts.begin(), _timeouts.end(), [&](auto& timeout) {
return timeout.cookie == reinterpret_cast<uint64_t>(cookie);
});

if (it != _timeouts.end()) {
_timeouts.erase(const_cast<void*>(cookie));
_timeouts.erase(it);
_iterator_invalidated = true;
}
}

void TimeoutHandler::run_once()
{
_timeouts_mutex.lock();
std::unique_lock<std::mutex> lock(_timeouts_mutex);

auto now = _time.steady_time();

for (auto it = _timeouts.begin(); it != _timeouts.end(); /* no ++it */) {
// If time is passed, call timeout callback.
if (it->second->time < now) {
if (it->second->callback) {
// Get a copy for the callback because we will remove it.
std::function<void()> callback = it->second->callback;
if (it->time < now) {
// Get a copy for the callback because we will remove it.
auto callback = it->callback;

// Self-destruct before calling to avoid locking issues.
_timeouts.erase(it++);
// Self-destruct before calling to avoid locking issues.
it = _timeouts.erase(it);

// Unlock while we callback because it might in turn want to add timeouts.
_timeouts_mutex.unlock();
if (callback) {
// Unlock while we call back because it might in turn want to add timeouts.
lock.unlock();
callback();
_timeouts_mutex.lock();
}
lock.lock();

// We start over if anyone has messed with this while we called the callback.
if (_iterator_invalidated) {
_iterator_invalidated = false;
it = _timeouts.begin();
}
}
} else {
++it;
}

// We start over if anyone has messed with this while we called the callback.
if (_iterator_invalidated) {
_iterator_invalidated = false;
it = _timeouts.begin();
}
}
_timeouts_mutex.unlock();
}

} // namespace mavsdk
11 changes: 8 additions & 3 deletions src/mavsdk/core/timeout_handler.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#pragma once

#include "mavsdk_time.h"

#include <cstdint>
#include <mutex>
#include <memory>
#include <functional>
#include <unordered_map>
#include "mavsdk_time.h"
#include <vector>

namespace mavsdk {

Expand All @@ -30,13 +32,16 @@ class TimeoutHandler {
std::function<void()> callback{};
SteadyTimePoint time{};
double duration_s{0.0};
uint64_t cookie{0};
};

std::unordered_map<void*, std::shared_ptr<Timeout>> _timeouts{};
std::vector<Timeout> _timeouts{};
std::mutex _timeouts_mutex{};
bool _iterator_invalidated{false};

Time& _time;

uint64_t _next_cookie{1};
};

} // namespace mavsdk
6 changes: 2 additions & 4 deletions src/mavsdk/plugins/gimbal/gimbal_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ Gimbal::ControlHandle GimbalImpl::subscribe_control(const Gimbal::ControlCallbac

if (result.first) {
wait_for_protocol_async([=]() {
// We don't lock here because we don't expect the protocol to change once it has been
// set.
std::lock_guard<std::mutex> lock(_mutex);
_gimbal_protocol->control_async([this](Gimbal::ControlStatus status) {
_control_subscriptions.queue(
status, [this](const auto& func) { _system_impl->call_user_callback(func); });
Expand Down Expand Up @@ -254,8 +253,7 @@ Gimbal::AttitudeHandle GimbalImpl::subscribe_attitude(const Gimbal::AttitudeCall

if (result.first) {
wait_for_protocol_async([=]() {
// We don't lock here because we don't expect the protocol to change once it has been
// set.
std::lock_guard<std::mutex> lock(_mutex);
_gimbal_protocol->attitude_async([this](Gimbal::Attitude attitude) {
_attitude_subscriptions.queue(
attitude, [this](const auto& func) { _system_impl->call_user_callback(func); });
Expand Down

0 comments on commit 8fb12e3

Please sign in to comment.