Skip to content

Commit

Permalink
Merge pull request #442 from matwey/libboostasio/heartbeat
Browse files Browse the repository at this point in the history
libboostasio: Reimplement heartbeats
  • Loading branch information
EmielBruijntjes authored Jan 12, 2022
2 parents d5a7f3c + 7aa321e commit fc89bb8
Showing 1 changed file with 87 additions and 145 deletions.
232 changes: 87 additions & 145 deletions include/amqpcpp/libboostasio.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class LibBoostAsioHandler : public virtual TcpHandler
*/
boost::asio::posix::stream_descriptor _socket;

/**
* The boost asynchronous deadline timer.
* @var class boost::asio::deadline_timer
*/
boost::asio::deadline_timer _timer;

/**
* A boolean that indicates if the watcher is monitoring for read events.
* @var _read True if reads are being monitored else false.
Expand All @@ -109,6 +115,7 @@ class LibBoostAsioHandler : public virtual TcpHandler

using handler_cb = boost::function<void(boost::system::error_code,std::size_t)>;
using io_handler = boost::function<void(const boost::system::error_code&, const std::size_t)>;
using timer_handler = boost::function<void(boost::system::error_code)>;

/**
* Builds a io handler callback that executes the io callback in a strand.
Expand Down Expand Up @@ -167,6 +174,35 @@ class LibBoostAsioHandler : public virtual TcpHandler
return get_dispatch_wrapper(fn);
}

/**
* Binds and returns a lamba function handler for the io operation.
* @param connection The connection being watched.
* @param timeout The file descripter being watched.
* @return handler callback
*/
timer_handler get_timer_handler(TcpConnection *const connection, const uint16_t timeout)
{
const auto fn = boost::bind(&Watcher::timeout_handler,
this,
boost::placeholders::_1,
PTR_FROM_THIS(Watcher),
connection,
timeout);

const strand_weak_ptr wpstrand = _wpstrand;

return [fn, wpstrand](const boost::system::error_code &ec)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
};
}

/**
* Handler method that is called by boost's io_context when the socket pumps a read event.
* @param ec The status of the callback.
Expand Down Expand Up @@ -235,6 +271,39 @@ class LibBoostAsioHandler : public virtual TcpHandler
}
}

/**
* Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param loop The loop in which the event was triggered
* @param connection
* @param timeout
*/
void timeout_handler(const boost::system::error_code &ec,
std::weak_ptr<Watcher> awpThis,
TcpConnection *const connection,
const uint16_t timeout)
{
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Watcher> apTimer = awpThis.lock();
if (!apTimer) { return; }

if (!ec)
{
if (connection)
{
// send the heartbeat
connection->heartbeat();
}

// Reschedule the timer for the future:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));

// Posts the timer event
_timer.async_wait(get_timer_handler(connection, timeout));
}
}

public:
/**
* Constructor- initialises the watcher and assigns the filedescriptor to
Expand All @@ -248,7 +317,8 @@ class LibBoostAsioHandler : public virtual TcpHandler
const int fd) :
_iocontext(io_context),
_wpstrand(wpstrand),
_socket(io_context)
_socket(io_context),
_timer(io_context)
{
_socket.assign(fd);

Expand All @@ -271,6 +341,7 @@ class LibBoostAsioHandler : public virtual TcpHandler
_read = false;
_write = false;
_socket.release();
stop_timer();
}

/**
Expand Down Expand Up @@ -305,156 +376,32 @@ class LibBoostAsioHandler : public virtual TcpHandler
get_write_handler(connection, fd));
}
}
};

/**
* Timer class to periodically fire a heartbeat
*/
class Timer : public std::enable_shared_from_this<Timer>
{
private:

/**
* The boost asio io_context which is responsible for detecting events.
* @var class boost::asio::io_context&
*/
boost::asio::io_context & _iocontext;

using strand_weak_ptr = std::weak_ptr<boost::asio::io_context::strand>;

/**
* The boost asio io_context::strand managed pointer.
* @var class std::shared_ptr<boost::asio::io_context>
*/
strand_weak_ptr _wpstrand;

/**
* The boost asynchronous deadline timer.
* @var class boost::asio::deadline_timer
*/
boost::asio::deadline_timer _timer;

using handler_fn = boost::function<void(boost::system::error_code)>;

/**
* Binds and returns a lamba function handler for the io operation.
* @param connection The connection being watched.
* @param timeout The file descripter being watched.
* @return handler callback
*/
handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout)
{
const auto fn = boost::bind(&Timer::timeout,
this,
boost::placeholders::_1,
PTR_FROM_THIS(Timer),
connection,
timeout);

const strand_weak_ptr wpstrand = _wpstrand;

return [fn, wpstrand](const boost::system::error_code &ec)
{
const strand_shared_ptr strand = wpstrand.lock();
if (!strand)
{
fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled));
return;
}
boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec));
};
}

/**
* Callback method that is called by libev when the timer expires
* @param ec error code returned from loop
* @param loop The loop in which the event was triggered
* Change the expire time
* @param connection
* @param timeout
*/
void timeout(const boost::system::error_code &ec,
std::weak_ptr<Timer> awpThis,
TcpConnection *const connection,
const uint16_t timeout)
void set_timer(TcpConnection *connection, uint16_t timeout)
{
// Resolve any potential problems with dangling pointers
// (remember we are using async).
const std::shared_ptr<Timer> apTimer = awpThis.lock();
if (!apTimer) { return; }

if (!ec)
{
if (connection)
{
// send the heartbeat
connection->heartbeat();
}
// stop timer in case it was already set
stop_timer();

// Reschedule the timer for the future:
_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout));
// Reschedule the timer for the future:
_timer.expires_from_now(boost::posix_time::seconds(timeout));

// Posts the timer event
_timer.async_wait(get_handler(connection, timeout));
}
// Posts the timer event
_timer.async_wait(get_timer_handler(connection, timeout));
}

/**
* Stop the timer
*/
void stop()
void stop_timer()
{
// do nothing if it was never set
_timer.cancel();
}

public:
/**
* Constructor
* @param io_context The boost asio io_context.
* @param wpstrand A weak pointer to a io_context::strand instance.
*/
Timer(boost::asio::io_context &io_context,
const strand_weak_ptr wpstrand) :
_iocontext(io_context),
_wpstrand(wpstrand),
_timer(io_context)
{

}

/**
* Timers cannot be copied or moved
*
* @param that The object to not move or copy
*/
Timer(Timer &&that) = delete;
Timer(const Timer &that) = delete;

/**
* Destructor
*/
~Timer()
{
// stop the timer
stop();
}

/**
* Change the expire time
* @param connection
* @param timeout
*/
void set(TcpConnection *connection, uint16_t timeout)
{
// stop timer in case it was already set
stop();

// Reschedule the timer for the future:
_timer.expires_from_now(boost::posix_time::seconds(timeout));

// Posts the timer event
_timer.async_wait(get_handler(connection, timeout));
}
};

/**
Expand All @@ -477,13 +424,6 @@ class LibBoostAsioHandler : public virtual TcpHandler
*/
std::map<int, std::shared_ptr<Watcher> > _watchers;

/**
* The boost asio io_context::deadline_timer managed pointer.
* THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION
* @var class std::shared_ptr<Timer>
*/
//std::shared_ptr<Timer> _timer;

/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
Expand Down Expand Up @@ -531,20 +471,22 @@ class LibBoostAsioHandler : public virtual TcpHandler
* @param interval The suggested interval from the server
* @return uint16_t The interval to use
*/
/* THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION */
/*
virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override
{
// skip if no heartbeats are needed
if (interval == 0) return 0;

const auto fd = connection->fileno();

auto iter = _watchers.find(fd);
if (iter == _watchers.end()) return 0;

// set the timer
_timer->set(connection, interval);
iter->second->set_timer(connection, interval);

// we agree with the interval
return interval;
}
*/

public:

Expand Down

0 comments on commit fc89bb8

Please sign in to comment.