Skip to content
Takatoshi Kondo edited this page May 23, 2019 · 7 revisions

Persistence

mqtt_cpp provides persistence functionality. This means that even if the client process is killed, the session can be continued when the process is run again.

This example uses leveldb.

https://github.com/google/leveldb

Here is the minimal leveldb wrapper.

// leveldb wrapper

#include <string>
#include <memory>
#include <leveldb/db.h>


/// Minimal wrapper that owns a leveldb::DB handle.
///
/// The handle is opened in the constructor and deleted in the destructor.
/// Every operation forwards its arguments to the leveldb::DB member of the
/// same name and hands back the leveldb::Status it produced.
///
/// NOTE(review): the overloads that take explicit options are only selected
/// when the options argument is a const lvalue; for a non-const lvalue or a
/// temporary the options-less variadic overload is the better match.
class ldb {
public:
    /// Opens a database by forwarding `args` to leveldb::DB::Open().
    /// @throws std::runtime_error carrying the status text when Open fails.
    template <typename... Args>
    ldb(Args&&... args) {
        auto status = leveldb::DB::Open(std::forward<Args>(args)..., &db_);
        if (!status.ok()) {
            throw std::runtime_error(status.ToString());
        }
    }

    // This object owns db_; a copy would delete the same handle twice.
    ldb(ldb const&) = delete;
    ldb& operator=(ldb const&) = delete;

    ~ldb() {
        delete db_;
    }

    /// Stores a record using the given write options.
    template <typename... Args>
    leveldb::Status put(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Put(opt, std::forward<Args>(args)...);
    }

    /// Stores a record using default write options.
    template <typename... Args>
    leveldb::Status put(Args&&... args) {
        return db_->Put(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Stores a record with WriteOptions::sync set to true.
    template <typename... Args>
    leveldb::Status put_sync(Args&&... args) {
        leveldb::WriteOptions wo;
        wo.sync = true;
        return db_->Put(wo, std::forward<Args>(args)...);
    }

    /// Reads a record using the given read options.
    /// @return the status of the lookup and the value that was read.
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(leveldb::ReadOptions const& opt, Args&&... args) {
        std::string s;
        // Honor the caller's options instead of silently using the defaults.
        leveldb::Status ret = db_->Get(opt, std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Reads a record using default read options.
    /// @return the status of the lookup and the value that was read.
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(Args&&... args) {
        std::string s;
        leveldb::Status ret = db_->Get(leveldb::ReadOptions(), std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Deletes a record using the given write options.
    template <typename... Args>
    leveldb::Status del(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Delete(opt, std::forward<Args>(args)...);
    }

    /// Deletes a record using default write options.
    template <typename... Args>
    leveldb::Status del(Args&&... args) {
        return db_->Delete(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Calls `f(key, value)` for every record, passing both as std::string.
    /// @throws std::bad_alloc when no iterator could be created.
    /// @throws std::runtime_error when the scan stopped because of an error.
    template <typename Func>
    void for_each(Func&& f) {
        std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(leveldb::ReadOptions()));
        if (!it) throw std::bad_alloc();

        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            if (it->status().ok()) {
                // f is invoked once per record, so it is called as an lvalue
                // rather than being forwarded again on every iteration.
                f(it->key().ToString(), it->value().ToString());
            }
        }
        // The loop also ends when the iterator hits an error; report that
        // instead of returning a silently truncated result.
        if (!it->status().ok()) {
            throw std::runtime_error(it->status().ToString());
        }
    }

private:
    leveldb::DB* db_ = nullptr;
};

Here is persistent client code:

#include <iostream>
#include <mqtt_client_cpp.hpp>

int main() {
    boost::asio::io_context ioc;
    auto c = mqtt::make_sync_client(ioc, "test.mosquitto.org", 1883);

    c->set_client_id("cid1");
    c->set_clean_session(false);

    c->set_connack_handler(
        [&c](bool sp, std::uint8_t connack_return_code) {
            std::cout << "Connack handler called" << std::endl;
            std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
            std::cout << "Connack Return Code: "
                      << mqtt::connect_return_code_to_str(connack_return_code) << std::endl;
            std::cout << "Do you publish new packet? [y/n]" << std::endl;
            char input;
            std::cin >> input;
            if (input == 'y') {
                c->publish_at_least_once("mqtt_cpp_topic1", "test1");
            }
            std::cout << "Do you shutdown the process here? [y/n]" << std::endl;
            std::cin >> input;
            if (input == 'y') {
                std::terminate();
            }
            return true;
        }
    );
    c->set_close_handler(
        [] {
            std::cout << "closed" << std::endl;
        }
    );
    c->set_error_handler(
        []
        (boost::system::error_code const& ec) {
            std::cout << ec.message() << std::endl;
        }
    );
    c->set_puback_handler(
        [&c](std::uint16_t packet_id) {
            std::cout << "puback" << std::endl;
            c->disconnect();
            return true;
        }
    );

    // create leveldb
    leveldb::Options options;
    options.create_if_missing = true;
    ldb db(options, "./testdb");

    // serialize order
    std::uint64_t count = 0;
    c->set_serialize_handlers(
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "add:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "upd:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db](std::uint16_t packet_id) {
            std::cout << "del:" << packet_id << std::endl;
            db.del(std::to_string(packet_id));
        }
    );

    std::cout << "restore" << std::endl;
    std::vector<std::pair<std::uint16_t, std::string>> buf;

    // leveldb doesn't keep order
    db.for_each(
        [&buf](std::string const& key, std::string&& val) {
            buf.emplace_back(boost::lexical_cast<std::uint16_t>(key), std::move(val));
        }
    );

    // sort by order
    std::sort(
        buf.begin(), buf.end(),
        [&] (auto const& lhs, auto const& rhs) {
            auto lpos = lhs.second.find_first_of(':');
            auto lc = boost::lexical_cast<std::uint64_t>(lhs.second.substr(0, lpos));
            auto rpos = rhs.second.find_first_of(':');
            auto rc = boost::lexical_cast<std::uint64_t>(rhs.second.substr(0, rpos));
            return lc < rc;
        }
    );

    for (auto const& e : buf) {
        // restore payload
        auto pos = e.second.find_first_of(':');
        c->restore_serialized_message(e.first, e.second.begin() + pos + 1, e.second.end());
    }
    c->connect();

    ioc.run();
}

Let's see step by step.

Create leveldb instance.

    // create leveldb
    // Opens the database at ./testdb with create_if_missing enabled.
    // ldb's constructor throws std::runtime_error when the open fails.
    leveldb::Options options;
    options.create_if_missing = true;
    ldb db(options, "./testdb");

Set the serialize handlers using leveldb. The function set_serialize_handlers takes three parameters, all of which are callback handlers.

http://redboltz.github.io/contents/mqtt/classmqtt_1_1endpoint.html#a051193db6f0bd16f4ec8c6c05547f33a

The first handler is called just before sending publish packet. You can store the packet to leveldb. The key is packet id. The value is the whole packet (QoS1 and QoS2 only).

The second handler is called just before sending pubrel packet (QoS2 only). You can update the record that has the same packet id to the pubrel packet.

The third handler is called when puback (QoS1) or pubcomp (QoS2) is received. You can delete the record that has the packet_id.

    // serialize order
    // Every stored value is "<count>:<serialized packet>"; count records the
    // insertion/update order.
    std::uint64_t count = 0;
    c->set_serialize_handlers(
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "add:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            auto status = db.put_sync(std::to_string(packet_id), s);
            if (!status.ok()) {
                // A packet that was not stored cannot be restored after a crash.
                std::cerr << "add failed:" << status.ToString() << std::endl;
            }
        },
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "upd:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            auto status = db.put_sync(std::to_string(packet_id), s);
            if (!status.ok()) {
                std::cerr << "upd failed:" << status.ToString() << std::endl;
            }
        },
        [&db](std::uint16_t packet_id) {
            std::cout << "del:" << packet_id << std::endl;
            auto status = db.del(std::to_string(packet_id));
            if (!status.ok()) {
                std::cerr << "del failed:" << status.ToString() << std::endl;
            }
        }
    );

Thanks to these handlers, the message is kept in the storage.

Unfortunately, leveldb cannot keep the insertion order, so I use the following hack:

            // Prefix the serialized packet with "<count>:" to record its order.
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);

count represents the insertion and update order. I added the stringized count + ':' as a prefix of the stored packet. You can use other types such as a timestamp.

Here is the code that reads all records from leveldb. The data is stored in buf.

    // (packet id, stored value) pairs read back from leveldb.
    // Each stored value still carries its "<count>:" order prefix.
    std::vector<std::pair<std::uint16_t, std::string>> buf;

    // leveldb doesn't keep insertion order
    db.for_each(
        [&buf](std::string const& key, std::string&& val) {
            // The key is the packet id that was written with std::to_string().
            buf.emplace_back(boost::lexical_cast<std::uint16_t>(key), std::move(val));
        }
    );

Then sort buf by insertion/updating order.

    // sort by order
    std::sort(
        buf.begin(), buf.end(),
        [&] (auto const& lhs, auto const& rhs) {
            auto lpos = lhs.second.find_first_of(':');
            auto lc = boost::lexical_cast<std::uint64_t>(lhs.second.substr(0, lpos));
            auto rpos = rhs.second.find_first_of(':');
            auto rc = boost::lexical_cast<std::uint64_t>(rhs.second.substr(0, rpos));
            return lc < rc;
        }
    );

Finally, call restore_serialized_message(). The order prefix is removed.

http://redboltz.github.io/contents/mqtt/classmqtt_1_1endpoint.html#a27bcb5ad4b1292e52318ebea9ebca95f

    for (auto const& e : buf) {
        // restore payload
        // Skip the "<count>:" prefix; the remainder is the serialized packet.
        auto pos = e.second.find_first_of(':');
        c->restore_serialized_message(e.first, e.second.begin() + pos + 1, e.second.end());
    }

Here is the whole code:

// leveldb wrapper

#include <string>
#include <memory>
#include <leveldb/db.h>


/// Minimal wrapper that owns a leveldb::DB handle.
///
/// The handle is opened in the constructor and deleted in the destructor.
/// Every operation forwards its arguments to the leveldb::DB member of the
/// same name and hands back the leveldb::Status it produced.
///
/// NOTE(review): the overloads that take explicit options are only selected
/// when the options argument is a const lvalue; for a non-const lvalue or a
/// temporary the options-less variadic overload is the better match.
class ldb {
public:
    /// Opens a database by forwarding `args` to leveldb::DB::Open().
    /// @throws std::runtime_error carrying the status text when Open fails.
    template <typename... Args>
    ldb(Args&&... args) {
        auto status = leveldb::DB::Open(std::forward<Args>(args)..., &db_);
        if (!status.ok()) {
            throw std::runtime_error(status.ToString());
        }
    }

    // This object owns db_; a copy would delete the same handle twice.
    ldb(ldb const&) = delete;
    ldb& operator=(ldb const&) = delete;

    ~ldb() {
        delete db_;
    }

    /// Stores a record using the given write options.
    template <typename... Args>
    leveldb::Status put(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Put(opt, std::forward<Args>(args)...);
    }

    /// Stores a record using default write options.
    template <typename... Args>
    leveldb::Status put(Args&&... args) {
        return db_->Put(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Stores a record with WriteOptions::sync set to true.
    template <typename... Args>
    leveldb::Status put_sync(Args&&... args) {
        leveldb::WriteOptions wo;
        wo.sync = true;
        return db_->Put(wo, std::forward<Args>(args)...);
    }

    /// Reads a record using the given read options.
    /// @return the status of the lookup and the value that was read.
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(leveldb::ReadOptions const& opt, Args&&... args) {
        std::string s;
        // Honor the caller's options instead of silently using the defaults.
        leveldb::Status ret = db_->Get(opt, std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Reads a record using default read options.
    /// @return the status of the lookup and the value that was read.
    template <typename... Args>
    std::pair<leveldb::Status, std::string> get(Args&&... args) {
        std::string s;
        leveldb::Status ret = db_->Get(leveldb::ReadOptions(), std::forward<Args>(args)..., &s);
        return { ret, std::move(s) };
    }

    /// Deletes a record using the given write options.
    template <typename... Args>
    leveldb::Status del(leveldb::WriteOptions const& opt, Args&&... args) {
        return db_->Delete(opt, std::forward<Args>(args)...);
    }

    /// Deletes a record using default write options.
    template <typename... Args>
    leveldb::Status del(Args&&... args) {
        return db_->Delete(leveldb::WriteOptions(), std::forward<Args>(args)...);
    }

    /// Calls `f(key, value)` for every record, passing both as std::string.
    /// @throws std::bad_alloc when no iterator could be created.
    /// @throws std::runtime_error when the scan stopped because of an error.
    template <typename Func>
    void for_each(Func&& f) {
        std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(leveldb::ReadOptions()));
        if (!it) throw std::bad_alloc();

        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            if (it->status().ok()) {
                // f is invoked once per record, so it is called as an lvalue
                // rather than being forwarded again on every iteration.
                f(it->key().ToString(), it->value().ToString());
            }
        }
        // The loop also ends when the iterator hits an error; report that
        // instead of returning a silently truncated result.
        if (!it->status().ok()) {
            throw std::runtime_error(it->status().ToString());
        }
    }

private:
    leveldb::DB* db_ = nullptr;
};


#include <iostream>
#include <mqtt_client_cpp.hpp>

int main() {
    boost::asio::io_context ioc;
    auto c = mqtt::make_client(ioc, "test.mosquitto.org", 1883);

    c->set_client_id("cid1");
    c->set_clean_session(false);

    c->set_connack_handler(
        [&c](bool sp, std::uint8_t connack_return_code) {
            std::cout << "Connack handler called" << std::endl;
            std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
            std::cout << "Connack Return Code: "
                      << mqtt::connect_return_code_to_str(connack_return_code) << std::endl;
            std::cout << "Do you publish new packet? [y/n]" << std::endl;
            char input;
            std::cin >> input;
            if (input == 'y') {
                c->publish_at_least_once("mqtt_cpp_topic1", "test1");
            }
            std::cout << "Do you shutdown the process here? [y/n]" << std::endl;
            std::cin >> input;
            if (input == 'y') {
                std::terminate();
            }
            return true;
        }
    );
    c->set_close_handler(
        [] {
            std::cout << "closed" << std::endl;
        }
    );
    c->set_error_handler(
        []
        (boost::system::error_code const& ec) {
            std::cout << ec.message() << std::endl;
        }
    );
    c->set_puback_handler(
        [&c](std::uint16_t packet_id) {
            std::cout << "puback" << std::endl;
            c->disconnect();
            return true;
        }
    );

    // create leveldb
    leveldb::Options options;
    options.create_if_missing = true;
    ldb db(options, "./testdb");

    // serialize order
    std::uint64_t count = 0;
    c->set_serialize_handlers(
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "add:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db, &count](std::uint16_t packet_id, char const* data, std::size_t size) {
            std::cout << "upd:" << packet_id << std::endl;
            std::string s = std::to_string(count++) + ':';
            s += std::string(data, size);
            db.put_sync(std::to_string(packet_id), s);
        },
        [&db](std::uint16_t packet_id) {
            std::cout << "del:" << packet_id << std::endl;
            db.del(std::to_string(packet_id));
        }
    );

    std::cout << "restore" << std::endl;
    std::vector<std::pair<std::uint16_t, std::string>> buf;

    // leveldb doesn't keep order
    db.for_each(
        [&buf](std::string const& key, std::string&& val) {
            buf.emplace_back(boost::lexical_cast<std::uint16_t>(key), std::move(val));
        }
    );

    // sort by order
    std::sort(
        buf.begin(), buf.end(),
        [&] (auto const& lhs, auto const& rhs) {
            auto lpos = lhs.second.find_first_of(':');
            auto lc = boost::lexical_cast<std::uint64_t>(lhs.second.substr(0, lpos));
            auto rpos = rhs.second.find_first_of(':');
            auto rc = boost::lexical_cast<std::uint64_t>(rhs.second.substr(0, rpos));
            return lc < rc;
        }
    );

    for (auto const& e : buf) {
        // restore payload
        auto pos = e.second.find_first_of(':');
        c->restore_serialized_message(e.first, e.second.begin() + pos + 1, e.second.end());
    }
    c->connect();

    ioc.run();
}

When you compile the code, you need to add -lleveldb option.

clang++ -std=c++14 -D MQTT_NO_TLS -I path_to_mqtt_cpp persistent.cpp -lboost_system -lpthread -lleveldb

Test the program.

First, run Simple Subscriber.

Then you get the following output:

Connack handler called
Session Present: false
Connack Return Code: accepted
suback received. packet_id: 2
subscribe success: exactly_once

Run the program.

Then you get the following output:

restore
Connack handler called
Session Present: true
Connack Return Code: accepted
Do you publish new packet? [y/n]

Press y and enter key to publish.

Simple Subscriber output:

publish received. dup: false pos: at_least_once retain: false
packet_id: 1
topic_name: mqtt_cpp_topic1
contents: test1

The program output:

y
add:1
Do you shutdown the process here? [y/n]

Press y and the Enter key to shut down.

Then the program is terminated

y
terminate called without an active exception
[1]    5371 abort (core dumped)  ./a.out

Then run the program again.

Simple Subscriber output:

publish received. dup: false pos: at_least_once retain: false
packet_id: 2
topic_name: mqtt_cpp_topic1
contents: test1

Enter n twice to the program.

restore
Connack handler called
Session Present: true
Connack Return Code: accepted
Do you publish new packet? [y/n]
n
Do you shutdown the process here? [y/n]
n
del:1
puback
closed