diff --git a/include/amqpcpp/libboostasio.h b/include/amqpcpp/libboostasio.h index 5f7cabae..0df9011f 100644 --- a/include/amqpcpp/libboostasio.h +++ b/include/amqpcpp/libboostasio.h @@ -83,6 +83,12 @@ class LibBoostAsioHandler : public virtual TcpHandler */ boost::asio::posix::stream_descriptor _socket; + /** + * The boost asynchronous deadline timer. + * @var class boost::asio::deadline_timer + */ + boost::asio::deadline_timer _timer; + /** * A boolean that indicates if the watcher is monitoring for read events. * @var _read True if reads are being monitored else false. @@ -109,6 +115,7 @@ class LibBoostAsioHandler : public virtual TcpHandler using handler_cb = boost::function; using io_handler = boost::function; + using timer_handler = boost::function; /** * Builds a io handler callback that executes the io callback in a strand. @@ -167,6 +174,35 @@ class LibBoostAsioHandler : public virtual TcpHandler return get_dispatch_wrapper(fn); } + /** + * Binds and returns a lamba function handler for the io operation. + * @param connection The connection being watched. + * @param timeout The file descripter being watched. + * @return handler callback + */ + timer_handler get_timer_handler(TcpConnection *const connection, const uint16_t timeout) + { + const auto fn = boost::bind(&Watcher::timeout_handler, + this, + boost::placeholders::_1, + PTR_FROM_THIS(Watcher), + connection, + timeout); + + const strand_weak_ptr wpstrand = _wpstrand; + + return [fn, wpstrand](const boost::system::error_code &ec) + { + const strand_shared_ptr strand = wpstrand.lock(); + if (!strand) + { + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); + return; + } + boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec)); + }; + } + /** * Handler method that is called by boost's io_context when the socket pumps a read event. * @param ec The status of the callback. @@ -235,6 +271,39 @@ class LibBoostAsioHandler : public virtual TcpHandler } } + /** + * Callback method that is called by libev when the timer expires + * @param ec error code returned from loop + * @param loop The loop in which the event was triggered + * @param connection + * @param timeout + */ + void timeout_handler(const boost::system::error_code &ec, + std::weak_ptr awpThis, + TcpConnection *const connection, + const uint16_t timeout) + { + // Resolve any potential problems with dangling pointers + // (remember we are using async). + const std::shared_ptr apTimer = awpThis.lock(); + if (!apTimer) { return; } + + if (!ec) + { + if (connection) + { + // send the heartbeat + connection->heartbeat(); + } + + // Reschedule the timer for the future: + _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); + + // Posts the timer event + _timer.async_wait(get_timer_handler(connection, timeout)); + } + } + public: /** * Constructor- initialises the watcher and assigns the filedescriptor to @@ -248,7 +317,8 @@ class LibBoostAsioHandler : public virtual TcpHandler const int fd) : _iocontext(io_context), _wpstrand(wpstrand), - _socket(io_context) + _socket(io_context), + _timer(io_context) { _socket.assign(fd); @@ -271,6 +341,7 @@ class LibBoostAsioHandler : public virtual TcpHandler _read = false; _write = false; _socket.release(); + stop_timer(); } /** @@ -305,156 +376,32 @@ class LibBoostAsioHandler : public virtual TcpHandler get_write_handler(connection, fd)); } } - }; - - /** - * Timer class to periodically fire a heartbeat - */ - class Timer : public std::enable_shared_from_this - { - private: - - /** - * The boost asio io_context which is responsible for detecting events. - * @var class boost::asio::io_context& - */ - boost::asio::io_context & _iocontext; - - using strand_weak_ptr = std::weak_ptr; - - /** - * The boost asio io_context::strand managed pointer. - * @var class std::shared_ptr - */ - strand_weak_ptr _wpstrand; - - /** - * The boost asynchronous deadline timer. - * @var class boost::asio::deadline_timer - */ - boost::asio::deadline_timer _timer; - - using handler_fn = boost::function; - - /** - * Binds and returns a lamba function handler for the io operation. - * @param connection The connection being watched. - * @param timeout The file descripter being watched. - * @return handler callback - */ - handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout) - { - const auto fn = boost::bind(&Timer::timeout, - this, - boost::placeholders::_1, - PTR_FROM_THIS(Timer), - connection, - timeout); - - const strand_weak_ptr wpstrand = _wpstrand; - - return [fn, wpstrand](const boost::system::error_code &ec) - { - const strand_shared_ptr strand = wpstrand.lock(); - if (!strand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); - return; - } - boost::asio::dispatch(strand->context().get_executor(), boost::bind(fn, ec)); - }; - } /** - * Callback method that is called by libev when the timer expires - * @param ec error code returned from loop - * @param loop The loop in which the event was triggered + * Change the expire time * @param connection * @param timeout */ - void timeout(const boost::system::error_code &ec, - std::weak_ptr awpThis, - TcpConnection *const connection, - const uint16_t timeout) + void set_timer(TcpConnection *connection, uint16_t timeout) { - // Resolve any potential problems with dangling pointers - // (remember we are using async). - const std::shared_ptr apTimer = awpThis.lock(); - if (!apTimer) { return; } - - if (!ec) - { - if (connection) - { - // send the heartbeat - connection->heartbeat(); - } + // stop timer in case it was already set + stop_timer(); - // Reschedule the timer for the future: - _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); + // Reschedule the timer for the future: + _timer.expires_from_now(boost::posix_time::seconds(timeout)); - // Posts the timer event - _timer.async_wait(get_handler(connection, timeout)); - } + // Posts the timer event + _timer.async_wait(get_timer_handler(connection, timeout)); } /** * Stop the timer */ - void stop() + void stop_timer() { // do nothing if it was never set _timer.cancel(); } - - public: - /** - * Constructor - * @param io_context The boost asio io_context. - * @param wpstrand A weak pointer to a io_context::strand instance. - */ - Timer(boost::asio::io_context &io_context, - const strand_weak_ptr wpstrand) : - _iocontext(io_context), - _wpstrand(wpstrand), - _timer(io_context) - { - - } - - /** - * Timers cannot be copied or moved - * - * @param that The object to not move or copy - */ - Timer(Timer &&that) = delete; - Timer(const Timer &that) = delete; - - /** - * Destructor - */ - ~Timer() - { - // stop the timer - stop(); - } - - /** - * Change the expire time - * @param connection - * @param timeout - */ - void set(TcpConnection *connection, uint16_t timeout) - { - // stop timer in case it was already set - stop(); - - // Reschedule the timer for the future: - _timer.expires_from_now(boost::posix_time::seconds(timeout)); - - // Posts the timer event - _timer.async_wait(get_handler(connection, timeout)); - } }; /** @@ -477,13 +424,6 @@ class LibBoostAsioHandler : public virtual TcpHandler */ std::map > _watchers; - /** - * The boost asio io_context::deadline_timer managed pointer. - * THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION - * @var class std::shared_ptr - */ - //std::shared_ptr _timer; - /** * Method that is called by AMQP-CPP to register a filedescriptor for readability or writability * @param connection The TCP connection object that is reporting @@ -531,20 +471,22 @@ class LibBoostAsioHandler : public virtual TcpHandler * @param interval The suggested interval from the server * @return uint16_t The interval to use */ - /* THIS IS DISABLED FOR NOW BECAUSE THIS BREAKS IF THERE IS MORE THAN ONE CONNECTION */ - /* virtual uint16_t onNegotiate(TcpConnection *connection, uint16_t interval) override { // skip if no heartbeats are needed if (interval == 0) return 0; + const auto fd = connection->fileno(); + + auto iter = _watchers.find(fd); + if (iter == _watchers.end()) return 0; + // set the timer - _timer->set(connection, interval); + iter->second->set_timer(connection, interval); // we agree with the interval return interval; } - */ public: