diff --git a/include/amqpcpp/connectionhandler.h b/include/amqpcpp/connectionhandler.h index ceceb25e..d024f90e 100644 --- a/include/amqpcpp/connectionhandler.h +++ b/include/amqpcpp/connectionhandler.h @@ -192,6 +192,39 @@ class ConnectionHandler // make sure compilers dont complain about unused parameters (void) connection; } + + /** + * Method that is called when the AMQP connection was blocked. + * + * This method is called, when the server connection gets blocked for the first + * time due to the broker running low on a resource (memory or disk). For + * example, when a RabbitMQ node detects that it is low on RAM, it sends a + * notification to all connected publishing clients supporting this feature. + * If before the connections are unblocked the node also starts running low on + * disk space, another notification will not be sent. + * + * @param connection The connection that was blocked + * @param reason Why was the connection blocked + */ + virtual void onBlocked(Connection *connection, const char *reason) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } + + /** + * Method that is called when the AMQP connection is no longer blocked. + * + * This method is called when all resource alarms have cleared and the + * connection is fully unblocked. + * + * @param connection The connection that is no longer blocked + */ + virtual void onUnblocked(Connection *connection) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } }; /** diff --git a/include/amqpcpp/connectionimpl.h b/include/amqpcpp/connectionimpl.h index 6d9e32ab..c38b7231 100644 --- a/include/amqpcpp/connectionimpl.h +++ b/include/amqpcpp/connectionimpl.h @@ -66,7 +66,7 @@ class ConnectionImpl : public Watchable state_handshake, // busy with the handshake to open the connection state_connected, // connection is set up and ready for communication state_closing, // connection is busy closing (we have sent the close frame) - state_closed // connection is closed + state_closed, // connection is closed } _state = state_protocol; /** @@ -428,6 +428,25 @@ class ConnectionImpl : public Watchable _handler->onClosed(_parent); } + /** + * Report that the connection is blocked + * @param reason + */ + void reportBlocked(const char *reason) + { + // inform the handler + _handler->onBlocked(_parent, reason); + } + + /** + * Report that the connection is unblocked + */ + void reportUnblocked() + { + // inform the handler + _handler->onUnblocked(_parent); + } + /** * Retrieve the amount of channels this connection has * @return std::size_t diff --git a/include/amqpcpp/linux_tcp/tcpconnection.h b/include/amqpcpp/linux_tcp/tcpconnection.h index 95e3f4d7..7cfd533e 100644 --- a/include/amqpcpp/linux_tcp/tcpconnection.h +++ b/include/amqpcpp/linux_tcp/tcpconnection.h @@ -116,7 +116,28 @@ class TcpConnection : * @param connection The connection that was closed and that is now unusable */ virtual void onClosed(Connection *connection) override; - + + /** + * Method that is called when the AMQP connection was blocked. + * @param connection The connection that was blocked + * @param reason Why was the connection blocked + */ + virtual void onBlocked(Connection *connection, const char *reason) override + { + // pass to user space + if (_handler) _handler->onBlocked(this, reason); + } + + /** + * Method that is called when the AMQP connection is no longer blocked. + * @param connection The connection that is no longer blocked + */ + virtual void onUnblocked(Connection *connection) + { + // pass to user space + if (_handler) _handler->onUnblocked(this); + } + /** * Method that is called when the tcp connection has been established * @param state diff --git a/include/amqpcpp/linux_tcp/tcphandler.h b/include/amqpcpp/linux_tcp/tcphandler.h index 2322a02c..2389e4c8 100644 --- a/include/amqpcpp/linux_tcp/tcphandler.h +++ b/include/amqpcpp/linux_tcp/tcphandler.h @@ -190,7 +190,40 @@ class TcpHandler // make sure compilers dont complain about unused parameters (void) connection; } - + + /** + * Method that is called when the AMQP connection was blocked. + * + * This method is called, when the server connection gets blocked for the first + * time due to the broker running low on a resource (memory or disk). For + * example, when a RabbitMQ node detects that it is low on RAM, it sends a + * notification to all connected publishing clients supporting this feature. + * If before the connections are unblocked the node also starts running low on + * disk space, another notification will not be sent. + * + * @param connection The connection that was blocked + * @param reason Why was the connection blocked + */ + virtual void onBlocked(TcpConnection *connection, const char *reason) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } + + /** + * Method that is called when the AMQP connection is no longer blocked. + * + * This method is called when all resource alarms have cleared and the + * connection is fully unblocked. + * + * @param connection The connection that is no longer blocked + */ + virtual void onUnblocked(TcpConnection *connection) + { + // make sure compilers dont complain about unused parameters + (void) connection; + } + /** * Method that is called when the TCP connection is lost or closed. This * is always called if you have also received a call to onConnected(). diff --git a/src/connectionblockframe.h b/src/connectionblockframe.h new file mode 100644 index 00000000..ab789df0 --- /dev/null +++ b/src/connectionblockframe.h @@ -0,0 +1,108 @@ +/** + * Class describing a connection blocked frame + * + * This frame is sent by the server to the client, when their connection gets + * blocked for the first time due to the broker running low on a resource + * (memory or disk). For example, when a RabbitMQ node detects that it is low + * on RAM, it sends a notification to all connected publishing clients + * supporting this feature. If before the connections are unblocked the node + * also starts running low on disk space, another notification will not be sent. + * + * @copyright 2023 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConnectionBlockFrame : public ConnectionFrame +{ +private: + /** + * The reason for blocking + * @var ShortString + */ + ShortString _reason; + +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConnectionFrame::fill(buffer); + + // encode the field + _reason.fill(buffer); + } + +public: + /** + * Construct a connection blocked frame from a received frame + * + * @param frame received frame + */ + ConnectionBlockFrame(ReceivedFrame &frame) : + ConnectionFrame(frame), + _reason(frame) + {} + + /** + * Construct a connection blocked frame + * + * @param reason the reason for blocking + */ + ConnectionBlockFrame(uint16_t code, std::string reason) : + ConnectionFrame((uint32_t)(reason.length() + 1)), // 1 for extra string byte + _reason(std::move(reason)) + {} + + /** + * Destructor + */ + virtual ~ConnectionBlockFrame() {} + + /** + * Method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 60; + } + + /** + * Get the reason for blocking + * @return string + */ + const std::string& reason() const + { + return _reason; + } + + /** + * Process the frame + * @param connection + */ + virtual bool process(ConnectionImpl *connection) override + { + // report that it is blocked + connection->reportBlocked(this->reason().c_str()); + + // done + return true; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/connectionstartframe.h b/src/connectionstartframe.h index 1fdb5cb4..55c90c77 100644 --- a/src/connectionstartframe.h +++ b/src/connectionstartframe.h @@ -219,6 +219,9 @@ class ConnectionStartFrame : public ConnectionFrame // queue lives dies, we want to receive a notification that the consumer is no longer alive) capabilities["consumer_cancel_notify"] = true; + // when the rabbitmq server reaches its max capacity, it can send a notification to us, we want them + capabilities["connection.blocked"] = true; + // fill the peer properties if (!properties.contains("version")) properties["version"] = "AMQP-CPP " VERSION_NAME; if (!properties.contains("copyright")) properties["copyright"] = "Copernica AMQP-CPP library :: Copyright 2015-2023 Copernica BV"; diff --git a/src/connectionunblockframe.h b/src/connectionunblockframe.h new file mode 100644 index 00000000..997af566 --- /dev/null +++ b/src/connectionunblockframe.h @@ -0,0 +1,81 @@ +/** + * Class describing a connection unblocked frame + * + * This frame is sent by the server to the client, when all resource alarms + * have cleared and the connection is fully unblocked. + * + * @copyright 2023 Copernica BV + */ + +/** + * Set up namespace + */ +namespace AMQP { + +/** + * Class implementation + */ +class ConnectionUnblockFrame : public ConnectionFrame +{ +protected: + /** + * Encode a frame on a string buffer + * + * @param buffer buffer to write frame to + */ + virtual void fill(OutBuffer& buffer) const override + { + // call base + ConnectionFrame::fill(buffer); + } + +public: + /** + * Construct a connection unblocked frame from a received frame + * + * @param frame received frame + */ + ConnectionUnblockFrame(ReceivedFrame &frame) : + ConnectionFrame(frame) + {} + + /** + * Construct a connection unblocked frame + */ + ConnectionUnblockFrame(uint16_t code, std::string reason) : + ConnectionFrame(0) + {} + + /** + * Destructor + */ + virtual ~ConnectionUnblockFrame() {} + + /** + * Method id + * @return uint16_t + */ + virtual uint16_t methodID() const override + { + return 61; + } + + /** + * Process the frame + * @param connection + */ + virtual bool process(ConnectionImpl *connection) override + { + // report that it is no longer blocked + connection->reportUnblocked(); + + // done + return true; + } +}; + +/** + * end namespace + */ +} + diff --git a/src/receivedframe.cpp b/src/receivedframe.cpp index 742274ab..62ba98a2 100644 --- a/src/receivedframe.cpp +++ b/src/receivedframe.cpp @@ -19,6 +19,8 @@ #include "connectiontuneframe.h" #include "connectioncloseokframe.h" #include "connectioncloseframe.h" +#include "connectionblockframe.h" +#include "connectionunblockframe.h" #include "channelopenframe.h" #include "channelopenokframe.h" #include "channelflowframe.h" @@ -195,6 +197,8 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection) case 41: return ConnectionOpenOKFrame(*this).process(connection); case 50: return ConnectionCloseFrame(*this).process(connection); case 51: return ConnectionCloseOKFrame(*this).process(connection); + case 60: return ConnectionBlockFrame(*this).process(connection); + case 61: return ConnectionUnblockFrame(*this).process(connection); } // this is a problem