Skip to content

Commit

Permalink
Merge pull request CopernicaMarketingSoftware#502 from CopernicaMarke…
Browse files Browse the repository at this point in the history
…tingSoftware/connection-blocked-notification

Implement connection.blocked notification handling
  • Loading branch information
EmielBruijntjes authored Jul 3, 2023
2 parents 12f4314 + 89ba913 commit 19b7136
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 3 deletions.
33 changes: 33 additions & 0 deletions include/amqpcpp/connectionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,39 @@ class ConnectionHandler
// make sure compilers dont complain about unused parameters
(void) connection;
}

/**
* Method that is called when the AMQP connection was blocked.
*
* This method is called, when the server connection gets blocked for the first
* time due to the broker running low on a resource (memory or disk). For
* example, when a RabbitMQ node detects that it is low on RAM, it sends a
* notification to all connected publishing clients supporting this feature.
* If before the connections are unblocked the node also starts running low on
* disk space, another notification will not be sent.
*
* @param connection The connection that was blocked
* @param reason Why was the connection blocked
*/
virtual void onBlocked(Connection *connection, const char *reason)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}

/**
* Method that is called when the AMQP connection is no longer blocked.
*
* This method is called when all resource alarms have cleared and the
* connection is fully unblocked.
*
* @param connection The connection that is no longer blocked
*/
virtual void onUnblocked(Connection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}
};

/**
Expand Down
21 changes: 20 additions & 1 deletion include/amqpcpp/connectionimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConnectionImpl : public Watchable
state_handshake, // busy with the handshake to open the connection
state_connected, // connection is set up and ready for communication
state_closing, // connection is busy closing (we have sent the close frame)
state_closed // connection is closed
state_closed, // connection is closed
} _state = state_protocol;

/**
Expand Down Expand Up @@ -428,6 +428,25 @@ class ConnectionImpl : public Watchable
_handler->onClosed(_parent);
}

/**
* Report that the connection is blocked
* @param reason
*/
void reportBlocked(const char *reason)
{
// inform the handler
_handler->onBlocked(_parent, reason);
}

/**
* Report that the connection is unblocked
*/
void reportUnblocked()
{
// inform the handler
_handler->onUnblocked(_parent);
}

/**
* Retrieve the amount of channels this connection has
* @return std::size_t
Expand Down
23 changes: 22 additions & 1 deletion include/amqpcpp/linux_tcp/tcpconnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,28 @@ class TcpConnection :
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(Connection *connection) override;


/**
* Method that is called when the AMQP connection was blocked.
* @param connection The connection that was blocked
* @param reason Why was the connection blocked
*/
virtual void onBlocked(Connection *connection, const char *reason) override
{
// pass to user space
if (_handler) _handler->onBlocked(this, reason);
}

/**
* Method that is called when the AMQP connection is no longer blocked.
* @param connection The connection that is no longer blocked
*/
virtual void onUnblocked(Connection *connection)
{
// pass to user space
if (_handler) _handler->onUnblocked(this);
}

/**
* Method that is called when the tcp connection has been established
* @param state
Expand Down
35 changes: 34 additions & 1 deletion include/amqpcpp/linux_tcp/tcphandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,40 @@ class TcpHandler
// make sure compilers dont complain about unused parameters
(void) connection;
}


/**
* Method that is called when the AMQP connection was blocked.
*
* This method is called, when the server connection gets blocked for the first
* time due to the broker running low on a resource (memory or disk). For
* example, when a RabbitMQ node detects that it is low on RAM, it sends a
* notification to all connected publishing clients supporting this feature.
* If before the connections are unblocked the node also starts running low on
* disk space, another notification will not be sent.
*
* @param connection The connection that was blocked
* @param reason Why was the connection blocked
*/
virtual void onBlocked(TcpConnection *connection, const char *reason)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}

/**
* Method that is called when the AMQP connection is no longer blocked.
*
* This method is called when all resource alarms have cleared and the
* connection is fully unblocked.
*
* @param connection The connection that is no longer blocked
*/
virtual void onUnblocked(TcpConnection *connection)
{
// make sure compilers dont complain about unused parameters
(void) connection;
}

/**
* Method that is called when the TCP connection is lost or closed. This
* is always called if you have also received a call to onConnected().
Expand Down
108 changes: 108 additions & 0 deletions src/connectionblockframe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Class describing a connection blocked frame
*
* This frame is sent by the server to the client, when their connection gets
* blocked for the first time due to the broker running low on a resource
* (memory or disk). For example, when a RabbitMQ node detects that it is low
* on RAM, it sends a notification to all connected publishing clients
* supporting this feature. If before the connections are unblocked the node
* also starts running low on disk space, another notification will not be sent.
*
* @copyright 2023 Copernica BV
*/

/**
* Set up namespace
*/
namespace AMQP {

/**
* Class implementation
*/
class ConnectionBlockFrame : public ConnectionFrame
{
private:
/**
* The reason for blocking
* @var ShortString
*/
ShortString _reason;

protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConnectionFrame::fill(buffer);

// encode the field
_reason.fill(buffer);
}

public:
/**
* Construct a connection blocked frame from a received frame
*
* @param frame received frame
*/
ConnectionBlockFrame(ReceivedFrame &frame) :
ConnectionFrame(frame),
_reason(frame)
{}

/**
* Construct a connection blocked frame
*
* @param reason the reason for blocking
*/
ConnectionBlockFrame(uint16_t code, std::string reason) :
ConnectionFrame((uint32_t)(reason.length() + 1)), // 1 for extra string byte
_reason(std::move(reason))
{}

/**
* Destructor
*/
virtual ~ConnectionBlockFrame() {}

/**
* Method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 60;
}

/**
* Get the reason for blocking
* @return string
*/
const std::string& reason() const
{
return _reason;
}

/**
* Process the frame
* @param connection
*/
virtual bool process(ConnectionImpl *connection) override
{
// report that it is blocked
connection->reportBlocked(this->reason().c_str());

// done
return true;
}
};

/**
* end namespace
*/
}

3 changes: 3 additions & 0 deletions src/connectionstartframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class ConnectionStartFrame : public ConnectionFrame
// queue lives dies, we want to receive a notification that the consumer is no longer alive)
capabilities["consumer_cancel_notify"] = true;

// when the rabbitmq server reaches its max capacity, it can send a notification to us, we want them
capabilities["connection.blocked"] = true;

// fill the peer properties
if (!properties.contains("version")) properties["version"] = "AMQP-CPP " VERSION_NAME;
if (!properties.contains("copyright")) properties["copyright"] = "Copernica AMQP-CPP library :: Copyright 2015-2023 Copernica BV";
Expand Down
81 changes: 81 additions & 0 deletions src/connectionunblockframe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Class describing a connection unblocked frame
*
* This frame is sent by the server to the client, when all resource alarms
* have cleared and the connection is fully unblocked.
*
* @copyright 2023 Copernica BV
*/

/**
* Set up namespace
*/
namespace AMQP {

/**
* Class implementation
*/
class ConnectionUnblockFrame : public ConnectionFrame
{
protected:
/**
* Encode a frame on a string buffer
*
* @param buffer buffer to write frame to
*/
virtual void fill(OutBuffer& buffer) const override
{
// call base
ConnectionFrame::fill(buffer);
}

public:
/**
* Construct a connection unblocked frame from a received frame
*
* @param frame received frame
*/
ConnectionUnblockFrame(ReceivedFrame &frame) :
ConnectionFrame(frame)
{}

/**
* Construct a connection unblocked frame
*/
ConnectionUnblockFrame(uint16_t code, std::string reason) :
ConnectionFrame(0)
{}

/**
* Destructor
*/
virtual ~ConnectionUnblockFrame() {}

/**
* Method id
* @return uint16_t
*/
virtual uint16_t methodID() const override
{
return 61;
}

/**
* Process the frame
* @param connection
*/
virtual bool process(ConnectionImpl *connection) override
{
// report that it is no longer blocked
connection->reportUnblocked();

// done
return true;
}
};

/**
* end namespace
*/
}

4 changes: 4 additions & 0 deletions src/receivedframe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "connectiontuneframe.h"
#include "connectioncloseokframe.h"
#include "connectioncloseframe.h"
#include "connectionblockframe.h"
#include "connectionunblockframe.h"
#include "channelopenframe.h"
#include "channelopenokframe.h"
#include "channelflowframe.h"
Expand Down Expand Up @@ -195,6 +197,8 @@ bool ReceivedFrame::processConnectionFrame(ConnectionImpl *connection)
case 41: return ConnectionOpenOKFrame(*this).process(connection);
case 50: return ConnectionCloseFrame(*this).process(connection);
case 51: return ConnectionCloseOKFrame(*this).process(connection);
case 60: return ConnectionBlockFrame(*this).process(connection);
case 61: return ConnectionUnblockFrame(*this).process(connection);
}

// this is a problem
Expand Down

0 comments on commit 19b7136

Please sign in to comment.