-
Notifications
You must be signed in to change notification settings - Fork 107
Persistence
mqtt_cpp provides persistence functionality. This means that even if the client process is killed, the session can be continued after the process is run again.
This example uses leveldb.
https://github.com/google/leveldb
Here is the minimal leveldb wrapper.
// leveldb wrapper
#include <string>
#include <memory>
#include <stdexcept>
#include <leveldb/db.h>
/// Minimal owning wrapper around a leveldb::DB handle.
///
/// The constructor opens the database and the destructor deletes the handle.
/// Every other member forwards its arguments unchanged to the leveldb::DB
/// member function it wraps.
class ldb {
public:
    /// Opens a database by forwarding `args` to leveldb::DB::Open
    /// (in this tutorial: a leveldb::Options and a path).
    /// @throws std::runtime_error carrying the leveldb status text if opening fails.
    template <typename... Args>
    ldb(Args&&... args) {
        auto status = leveldb::DB::Open(std::forward<Args>(args)..., &db_);
        if (!status.ok()) {
            throw std::runtime_error(status.ToString());
        }
    }

    ~ldb() {
        delete db_;
    }

    // This object owns db_; a copy would delete the same handle twice.
    ldb(ldb const&) = delete;
    ldb& operator=(ldb const&) = delete;

    /// Forwards to leveldb::DB::Put with caller supplied write options.
    template <typename... Args>
    leveldb::Status put(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Put(opt, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Put with default write options.
    template <typename... Args>
    leveldb::Status put(Args&&... args) {
        return db_->Put(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Put with WriteOptions::sync set to true.
    template <typename... Args>
    leveldb::Status put_sync(Args&&... args) {
        leveldb::WriteOptions wo;
        wo.sync = true;
        return db_->Put(wo, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Get with caller supplied read options.
    /// @return the status and the value that Get stored (empty if Get stored nothing).
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(leveldb::ReadOptions const& opt, Args&&... args) {
        std::string s;
        // Honor the caller's options; they used to be silently replaced by defaults.
        leveldb::Status ret = db_->Get(opt, std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Forwards to leveldb::DB::Get with default read options.
    /// @return the status and the value that Get stored (empty if Get stored nothing).
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(Args&&... args) {
        std::string s;
        leveldb::Status ret = db_->Get(leveldb::ReadOptions(), std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Forwards to leveldb::DB::Delete with caller supplied write options.
    template <typename... Args>
    leveldb::Status del(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Delete(opt, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Delete with default write options.
    template <typename... Args>
    leveldb::Status del(Args&&... args) {
        return db_->Delete(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Calls `f(key, value)` for each entry the iterator yields, front to back.
    /// Both arguments are passed as temporary std::string objects. An entry is
    /// skipped when the iterator status is not ok at that position.
    /// @throws std::bad_alloc if no iterator could be obtained.
    template <typename Func>
    void for_each(Func&& f) {
        std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(leveldb::ReadOptions()));
        if (!it) throw std::bad_alloc();
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            if (it->status().ok()) {
                // f is invoked many times, so it is called as an lvalue rather
                // than re-forwarded on every iteration.
                f(it->key().ToString(), it->value().ToString());
            }
        }
    }

private:
    leveldb::DB* db_ = nullptr;
};
Here is persistent client code:
#include <iostream>
#include <mqtt_client_cpp.hpp>
/// Persistent MQTT client example.
///
/// Every outgoing QoS1/QoS2 packet is written to leveldb as
/// "<order>:<packet bytes>" under its packet id. On start-up the stored
/// packets are read back, sorted by <order>, and handed to the client with
/// restore_serialized_message() before connecting.
int main() {
    boost::asio::io_context ioc;
    auto c = mqtt::make_client(ioc, "test.mosquitto.org", 1883);
    c->set_client_id("cid1");
    c->set_clean_session(false);

    c->set_connack_handler(
        [&c](bool sp, mqtt::connect_return_code connack_return_code){
            std::cout << "Connack handler called" << std::endl;
            std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
            std::cout << "Connack Return Code: " << connack_return_code << std::endl;
            std::cout << "Do you publish new packet? [y/n]" << std::endl;
            char input;
            std::cin >> input;
            if (input == 'y') {
                c->publish("mqtt_cpp_topic1", "test1", mqtt::qos::at_least_once);
            }
            std::cout << "Do you shutdown the process here? [y/n]" << std::endl;
            std::cin >> input;
            if (input == 'y') {
                // Simulates a crash: the process dies without a clean disconnect.
                std::terminate();
            }
            return true;
        }
    );
    c->set_close_handler(
        [] {
            std::cout << "closed" << std::endl;
        }
    );
    c->set_error_handler(
        []
        (boost::system::error_code const& ec) {
            std::cout << ec.message() << std::endl;
        }
    );
    c->set_puback_handler(
        [&c](std::uint16_t /*packet_id*/) {
            std::cout << "puback" << std::endl;
            c->disconnect();
            return true;
        }
    );

    // create leveldb
    leveldb::Options options;
    options.create_if_missing = true;
    ldb db(options, "./testdb");

    // serialize order
    // The handlers capture count by reference, so the value assigned after
    // loading the stored records (below) is the one they continue from.
    std::uint64_t count = 0;
    c->set_serialize_handlers(
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "add:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "upd:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db](std::uint16_t packet_id) {
            std::cout << "del:" << packet_id << std::endl;
            db.del(std::to_string(packet_id));
        }
    );

    std::cout << "restore" << std::endl;

    // Parses the "<order>" prefix of a stored "<order>:<packet bytes>" value.
    auto order_of = [](std::string const& val) {
        return boost::lexical_cast<std::uint64_t>(val.substr(0, val.find(':')));
    };

    std::vector<std::pair<std::uint16_t, std::string>> buf;
    // leveldb doesn't keep order
    db.for_each(
        [&buf](std::string const& key, std::string&& val) {
            // Without a ':' there is no order prefix to strip; restoring such a
            // value would feed the whole string to the client as a packet.
            if (val.find(':') == std::string::npos) {
                std::cout << "skip malformed record:" << key << std::endl;
                return;
            }
            buf.emplace_back(boost::lexical_cast<std::uint16_t>(key), std::move(val));
        }
    );

    // sort by order
    std::sort(
        buf.begin(), buf.end(),
        [&order_of] (auto const& lhs, auto const& rhs) {
            return order_of(lhs.second) < order_of(rhs.second);
        }
    );

    // Continue numbering after the newest stored record. Restarting from 0
    // would give new packets the same order values as the restored ones.
    if (!buf.empty()) {
        count = order_of(buf.back().second) + 1;
    }

    for (auto const& e : buf) {
        // restore payload
        auto pos = e.second.find(':');
        c->restore_serialized_message(e.second.begin() + pos + 1, e.second.end());
    }

    c->connect();
    ioc.run();
}
Let's see step by step.
Create leveldb instance.
// create leveldb
// create_if_missing is set so that the first run, when ./testdb does not exist yet, can open it
leveldb::Options options;
options.create_if_missing = true;
// ldb forwards both arguments to leveldb::DB::Open and throws std::runtime_error on failure
ldb db(options, "./testdb");
Set the serialize handlers using leveldb. The function set_serialize_handlers
takes 3 parameters. They are callback handlers.
http://redboltz.github.io/contents/mqtt/classmqtt_1_1endpoint.html#a051193db6f0bd16f4ec8c6c05547f33a
The first handler is called just before sending publish packet. You can store the packet to leveldb. The key is packet id. The value is the whole packet (QoS1 and QoS2 only).
The second handler is called just before sending pubrel packet (QoS2 only). You can update the record that has the same packet id to the pubrel packet.
The third handler is called when puback (QoS1) or pubcomp (QoS2) is received. You can delete the record that has the packet_id.
// serialize order
std::uint64_t count = 0;
c->set_serialize_handlers(
[&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
std::cout << "add:" << packet_id << std::endl;
std::string s = std::to_string(count++) + ':';
s += std::string(data, size);
db.put_sync(std::to_string(packet_id), s);
},
[&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
std::cout << "upd:" << packet_id << std::endl;
std::string s = std::to_string(count++) + ':';
s += std::string(data, size);
db.put_sync(std::to_string(packet_id), s);
},
[&db](std::uint16_t packet_id) {
std::cout << "del:" << packet_id << std::endl;
db.del(std::to_string(packet_id));
}
);
Thanks to these handlers, the message is kept in the storage.
Unfortunately, leveldb cannot keep the insertion order, so I use the following hack:
// Prefix the packet bytes with "<count>:" so that the write order can be recovered when loading.
std::string s = std::to_string(count++) + ':';
// data is not NUL-terminated text; copy exactly size bytes.
s += std::string(data, size);
count
represents insertion and updating order. I added stringized count + ':' as the prefix of the payload.
You can use other types such as timestamp.
Here is the code that reads all records from leveldb. The data is stored in buf
.
// Loaded records: (packet id, "<order>:<packet bytes>")
std::vector<std::pair<std::uint16_t, std::string>> buf;
// leveldb doesn't keep order
db.for_each(
    [&buf](std::string const& key, std::string&& val) {
        // keys were written with std::to_string(packet_id)
        auto const packet_id = boost::lexical_cast<std::uint16_t>(key);
        buf.emplace_back(packet_id, std::move(val));
    }
);
Then sort buf
by insertion/updating order.
// sort by order
std::sort(
buf.begin(), buf.end(),
[&] (auto const& lhs, auto const& rhs) {
auto lpos = lhs.second.find_first_of(':');
auto lc = boost::lexical_cast<std::uint64_t>(lhs.second.substr(0, lpos));
auto rpos = rhs.second.find_first_of(':');
auto rc = boost::lexical_cast<std::uint64_t>(rhs.second.substr(0, rpos));
return lc < rc;
}
);
Finally, call restore_serialized_message()
. The order prefix is removed.
http://redboltz.github.io/contents/mqtt/classmqtt_1_1endpoint.html#a27bcb5ad4b1292e52318ebea9ebca95f
for (auto const& e : buf) {
    // restore payload: everything after the first ':' is the serialized packet
    auto const& stored = e.second;
    auto packet_begin = stored.begin() + stored.find(':') + 1;
    c->restore_serialized_message(packet_begin, stored.end());
}
Here is the whole code:
// leveldb wrapper
#include <string>
#include <memory>
#include <stdexcept>
#include <leveldb/db.h>
/// Minimal owning wrapper around a leveldb::DB handle.
///
/// The constructor opens the database and the destructor deletes the handle.
/// Every other member forwards its arguments unchanged to the leveldb::DB
/// member function it wraps.
class ldb {
public:
    /// Opens a database by forwarding `args` to leveldb::DB::Open
    /// (in this tutorial: a leveldb::Options and a path).
    /// @throws std::runtime_error carrying the leveldb status text if opening fails.
    template <typename... Args>
    ldb(Args&&... args) {
        auto status = leveldb::DB::Open(std::forward<Args>(args)..., &db_);
        if (!status.ok()) {
            throw std::runtime_error(status.ToString());
        }
    }

    ~ldb() {
        delete db_;
    }

    // This object owns db_; a copy would delete the same handle twice.
    ldb(ldb const&) = delete;
    ldb& operator=(ldb const&) = delete;

    /// Forwards to leveldb::DB::Put with caller supplied write options.
    template <typename... Args>
    leveldb::Status put(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Put(opt, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Put with default write options.
    template <typename... Args>
    leveldb::Status put(Args&&... args) {
        return db_->Put(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Put with WriteOptions::sync set to true.
    template <typename... Args>
    leveldb::Status put_sync(Args&&... args) {
        leveldb::WriteOptions wo;
        wo.sync = true;
        return db_->Put(wo, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Get with caller supplied read options.
    /// @return the status and the value that Get stored (empty if Get stored nothing).
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(leveldb::ReadOptions const& opt, Args&&... args) {
        std::string s;
        // Honor the caller's options; they used to be silently replaced by defaults.
        leveldb::Status ret = db_->Get(opt, std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Forwards to leveldb::DB::Get with default read options.
    /// @return the status and the value that Get stored (empty if Get stored nothing).
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(Args&&... args) {
        std::string s;
        leveldb::Status ret = db_->Get(leveldb::ReadOptions(), std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Forwards to leveldb::DB::Delete with caller supplied write options.
    template <typename... Args>
    leveldb::Status del(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Delete(opt, std::forward<Args>(args)...);
    }

    /// Forwards to leveldb::DB::Delete with default write options.
    template <typename... Args>
    leveldb::Status del(Args&&... args) {
        return db_->Delete(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Calls `f(key, value)` for each entry the iterator yields, front to back.
    /// Both arguments are passed as temporary std::string objects. An entry is
    /// skipped when the iterator status is not ok at that position.
    /// @throws std::bad_alloc if no iterator could be obtained.
    template <typename Func>
    void for_each(Func&& f) {
        std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(leveldb::ReadOptions()));
        if (!it) throw std::bad_alloc();
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            if (it->status().ok()) {
                // f is invoked many times, so it is called as an lvalue rather
                // than re-forwarded on every iteration.
                f(it->key().ToString(), it->value().ToString());
            }
        }
    }

private:
    leveldb::DB* db_ = nullptr;
};
#include <iostream>
#include <mqtt_client_cpp.hpp>
/// Persistent MQTT client example.
///
/// Every outgoing QoS1/QoS2 packet is written to leveldb as
/// "<order>:<packet bytes>" under its packet id. On start-up the stored
/// packets are read back, sorted by <order>, and handed to the client with
/// restore_serialized_message() before connecting.
int main() {
    boost::asio::io_context ioc;
    auto c = mqtt::make_client(ioc, "test.mosquitto.org", 1883);
    c->set_client_id("cid1");
    c->set_clean_session(false);

    c->set_connack_handler(
        [&c](bool sp, mqtt::connect_return_code connack_return_code){
            std::cout << "Connack handler called" << std::endl;
            std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
            std::cout << "Connack Return Code: " << connack_return_code << std::endl;
            std::cout << "Do you publish new packet? [y/n]" << std::endl;
            char input;
            std::cin >> input;
            if (input == 'y') {
                c->publish("mqtt_cpp_topic1", "test1", mqtt::qos::at_least_once);
            }
            std::cout << "Do you shutdown the process here? [y/n]" << std::endl;
            std::cin >> input;
            if (input == 'y') {
                // Simulates a crash: the process dies without a clean disconnect.
                std::terminate();
            }
            return true;
        }
    );
    c->set_close_handler(
        [] {
            std::cout << "closed" << std::endl;
        }
    );
    c->set_error_handler(
        []
        (boost::system::error_code const& ec) {
            std::cout << ec.message() << std::endl;
        }
    );
    c->set_puback_handler(
        [&c](std::uint16_t /*packet_id*/) {
            std::cout << "puback" << std::endl;
            c->disconnect();
            return true;
        }
    );

    // create leveldb
    leveldb::Options options;
    options.create_if_missing = true;
    ldb db(options, "./testdb");

    // serialize order
    // The handlers capture count by reference, so the value assigned after
    // loading the stored records (below) is the one they continue from.
    std::uint64_t count = 0;
    c->set_serialize_handlers(
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "add:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "upd:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db](std::uint16_t packet_id) {
            std::cout << "del:" << packet_id << std::endl;
            db.del(std::to_string(packet_id));
        }
    );

    std::cout << "restore" << std::endl;

    // Parses the "<order>" prefix of a stored "<order>:<packet bytes>" value.
    auto order_of = [](std::string const& val) {
        return boost::lexical_cast<std::uint64_t>(val.substr(0, val.find(':')));
    };

    std::vector<std::pair<std::uint16_t, std::string>> buf;
    // leveldb doesn't keep order
    db.for_each(
        [&buf](std::string const& key, std::string&& val) {
            // Without a ':' there is no order prefix to strip; restoring such a
            // value would feed the whole string to the client as a packet.
            if (val.find(':') == std::string::npos) {
                std::cout << "skip malformed record:" << key << std::endl;
                return;
            }
            buf.emplace_back(boost::lexical_cast<std::uint16_t>(key), std::move(val));
        }
    );

    // sort by order
    std::sort(
        buf.begin(), buf.end(),
        [&order_of] (auto const& lhs, auto const& rhs) {
            return order_of(lhs.second) < order_of(rhs.second);
        }
    );

    // Continue numbering after the newest stored record. Restarting from 0
    // would give new packets the same order values as the restored ones.
    if (!buf.empty()) {
        count = order_of(buf.back().second) + 1;
    }

    for (auto const& e : buf) {
        // restore payload
        auto pos = e.second.find(':');
        c->restore_serialized_message(e.second.begin() + pos + 1, e.second.end());
    }

    c->connect();
    ioc.run();
}
When you compile the code, you need to add the -lleveldb
option.
clang++ -std=c++14 -I path_to_mqtt_cpp persistent.cpp -lboost_system -lpthread -lleveldb
Test the program.
First, run Simple Subscriber.
Then you get the following output:
Connack handler called
Session Present: false
Connack Return Code: accepted
suback received. packet_id: 2
subscribe success: exactly_once
Next, run the persistent client program. You get the following output:
restore
Connack handler called
Session Present: true
Connack Return Code: accepted
Do you publish new packet? [y/n]
Enter y
and press the Enter key to publish.
Simple Subscriber output:
publish received. dup: false pos: at_least_once retain: false
packet_id: 1
topic_name: mqtt_cpp_topic1
contents: test1
The program output:
y
add:1
Do you shutdown the process here? [y/n]
Enter y
and press the Enter key to shut down.
Then the program is terminated
y
terminate called without an active exception
[1] 5371 abort (core dumped) ./a.out
Simple Subscriber output:
publish received. dup: false pos: at_least_once retain: false
packet_id: 2
topic_name: mqtt_cpp_topic1
contents: test1
Enter n
twice to the program.
restore
Connack handler called
Session Present: true
Connack Return Code: accepted
Do you publish new packet? [y/n]
n
Do you shutdown the process here? [y/n]
n
del:1
puback
closed
- Requirements
- Config
- Tutorial
- Authentication and Authorization
- Advanced topics
- Examples
- API Reference
- Versioning Policy
- How to contribute