diff --git a/amqpcpp.h b/amqpcpp.h index 3f74d53b..8940e73a 100644 --- a/amqpcpp.h +++ b/amqpcpp.h @@ -37,6 +37,8 @@ #include // amqp types +#include +#include #include #include #include diff --git a/include/buffer.h b/include/buffer.h new file mode 100644 index 00000000..73eaf13e --- /dev/null +++ b/include/buffer.h @@ -0,0 +1,42 @@ +/** + * Buffer.h + * + * Interface that can be implemented by client applications and that + * is parsed to the Connection::parse() method. + * + * Normally, the Connection::parse() method is fed with a byte + * array. However, if you're receiving big frames, it may be inconvenient + * to copy these big frames into continguous byte arrays, and you + * prefer using objects that internally use linked lists or other + * ways to store the bytes. In such sitations, you can implement this + * interface and pass that to the connection. + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class Buffer +{ + + +}; + +/** + * End of namespace + */ +} + + diff --git a/include/bytebuffer.h b/include/bytebuffer.h new file mode 100644 index 00000000..7074ee00 --- /dev/null +++ b/include/bytebuffer.h @@ -0,0 +1,57 @@ +/** + * ByteByffer.h + * + * Very simple implementation of the buffer class that simply wraps + * around a buffer of bytes + * + * @author Emiel Bruijntjes + * @copyright 2014 Copernica BV + */ + +/** + * Include guard + */ +#pragma once + +/** + * Open namespace + */ +namespace AMQP { + +/** + * Class definition + */ +class ByteBuffer : public Buffer +{ +private: + /** + * The actual byte buffer + * @var const char * + */ + const char *_data; + + /** + * Size of the buffer + * @var size_t + */ + size_t _size; + +public: + /** + * Constructor + * @param data + * @param size + */ + ByteBuffer(const char *data, size_t size) : _data(data), _size(size) {} + + /** + * Destructor + */ + virtual ~ByteBuffer() {} + +}; + +/** + * End namespace + */ +} diff --git a/include/channel.h b/include/channel.h index e4da8bb7..0473c5ed 100644 --- a/include/channel.h +++ b/include/channel.h @@ -320,6 +320,7 @@ class Channel bool publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope) { return _implementation.publish(exchange, routingKey, envelope); } bool publish(const std::string &exchange, const std::string &routingKey, const std::string &message) { return _implementation.publish(exchange, routingKey, Envelope(message)); } bool publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size) { return _implementation.publish(exchange, routingKey, Envelope(message, size)); } + bool publish(const std::string &exchange, const std::string &routingKey, const char *message) { return _implementation.publish(exchange, routingKey, Envelope(message, strlen(message))); } /** * Set the Quality of Service (QOS) for this channel diff --git a/include/connection.h b/include/connection.h index a67e6ad4..c1fccea8 100644 --- a/include/connection.h +++ b/include/connection.h @@ -77,8 +77,32 @@ class Connection */ size_t parse(const char *buffer, size_t size) { - return _implementation.parse(buffer, size); + //return _implementation.parse(ByteBuffer(buffer, size)); + return _implementation.parse(buffer, size); } + + /** + * Parse data that was recevied from RabbitMQ + * + * Every time that data comes in from RabbitMQ, you should call this method to parse + * the incoming data, and let it handle by the AMQP library. This method returns the number + * of bytes that were processed. + * + * If not all bytes could be processed because it only contained a partial frame, you should + * call this same method later on when more data is available. The AMQP library does not do + * any buffering, so it is up to the caller to ensure that the old data is also passed in that + * later call. + * + * This method accepts a buffer object. This is an interface that is defined by the AMQP + * library, that can be implemented by you to allow faster access to a buffer. + * + * @param buffer buffer to decode + * @return number of bytes that were processed + */ + //size_t parse(const Buffer &buffer) + //{ + // return _implementation.parse(buffer); + //} /** * Close the connection diff --git a/src/basicgetokframe.h b/src/basicgetokframe.h index 6945adba..ebccaccd 100644 --- a/src/basicgetokframe.h +++ b/src/basicgetokframe.h @@ -177,8 +177,8 @@ class BasicGetOKFrame : public BasicFrame // construct the message channel->message(*this); - // we're synchronized - channel->synchronized(); + // notice that the channel is not yet synchronized here, because + // we first have to receive the entire body // done return true; diff --git a/src/channelimpl.cpp b/src/channelimpl.cpp index 1027cd28..2664216c 100644 --- a/src/channelimpl.cpp +++ b/src/channelimpl.cpp @@ -703,6 +703,15 @@ void ChannelImpl::reportMessage() // skip if there is no message if (!_message) return; + // after the report the channel may be destructed, monitor that + Monitor monitor(this); + + // synchronize the channel if this comes from a basic.get frame + if (_message->consumer().empty()) synchronized(); + + // syncing the channel may destruct the channel + if (!monitor.valid()) return; + // look for the consumer auto iter = _consumers.find(_message->consumer()); if (iter == _consumers.end()) return; @@ -710,9 +719,6 @@ void ChannelImpl::reportMessage() // is this a valid callback method if (!iter->second) return; - // after the report the channel may be destructed, monitor that - Monitor monitor(this); - // call the callback _message->report(iter->second);