Asynchronous I/O communication framework between multiple threads, with a section sequence manager.
Threads communicate N-to-N using patterns such as request-reply and notify.
$ git clone https://github.com/ysan/thread_manager.git
$ cd thread_manager
$ make
$ sudo make install INSTALLDIR=/usr/local/
$ sudo ldconfig
Installation files:
/usr/local/
├── include
│ └── threadmgr
│ ├── threadmgr_if.h
│ ├── threadmgr_util.h
│ ├── ThreadMgrpp.h
│ ├── ThreadMgr.h
│ ├── ThreadMgrBase.h
│ ├── ThreadMgrExternalIf.h
│ └── ThreadMgrIf.h
└── lib
└── libthreadmgr.so
└── libthreadmgrpp.so
Uninstall:
$ sudo make clean INSTALLDIR=/usr/local/
Here is an excerpt of samples
.
reqrep sequence diagram
Also, one can look at the example implementations in the testpp
folder to get an idea of how the thread manager is integrated into an application.
#include <threadmgr/ThreadMgrpp.h>
Register your instances with CThreadMgr::setup
.
Also, your class should extend CThreadMgrBase
. It has one posix thread context.
// Identifiers for the registered modules (threads).
// Each value corresponds to the position at which the matching instance is
// pushed into the vector handed to CThreadMgr::setup (see the push_back
// order in main), and is used as the destination in request_async().
// NOTE(review): names starting with an underscore followed by an uppercase
// letter are reserved identifiers in C++ -- consider renaming in real code.
enum {
_MODULE_A = 0,
_MODULE_B,
_MODULE_C,
.
.
};
int main (void) {
threadmgr::CThreadMgr *p_mgr = threadmgr::CThreadMgr::get_instance();
// create your class instances (maximum of queue buffer size: 100)
auto module_a = std::make_shared <module_a>(std::move("module_a"), 10); // (name, queue buffer size)
auto module_b = std::make_shared <module_b>(std::move("module_b"), 10);
auto module_c = std::make_shared <module_c>(std::move("module_c"), 10);
.
.
std::vector<std::shared_ptr<threadmgr::CThreadMgrBase>> threads;
threads.push_back(module_a); // enum _MODULE_A
threads.push_back(module_b); // enum _MODULE_B
threads.push_back(module_c); // enum _MODULE_C
.
.
// register your instances (maximum of registration: 32)
p_mgr->setup(threads);
// ThreadManager-framework was ready.
// thread return wait, as daemon process.
p_mgr->wait();
p_mgr->teardown();
return 0;
}
Your class should extend CThreadMgrBase
. It has one posix thread context.
Then register your sequences with CThreadMgrBase::set_sequences
in the constructor.
// Identifiers for the sequences of a module.
// Each value corresponds to the position at which the matching entry is
// pushed into the vector handed to CThreadMgrBase::set_sequences (see the
// push_back order in the module_a constructor).
// NOTE(review): names starting with an underscore followed by an uppercase
// letter are reserved identifiers in C++ -- consider renaming in real code.
enum {
_SEQ_1 = 0,
_SEQ_2,
_SEQ_3,
.
.
};
// Example module. Derives from threadmgr::CThreadMgrBase and registers its
// sequences in the constructor. The position of each entry pushed into
// `sequences` corresponds to the _SEQ_n enum value noted on that line.
class module_a : public threadmgr::CThreadMgrBase
{
public:
    // name    : module name, forwarded to CThreadMgrBase as a C string
    // que_max : forwarded to CThreadMgrBase (queue buffer size in the main example)
    module_a (std::string name, uint8_t que_max) : CThreadMgrBase (name.c_str(), que_max) {
        std::vector<threadmgr::sequence_t> sequences;
        // The callbacks are handed to set_sequences(), so capture only `this`
        // rather than every local by reference.
        sequences.push_back ({[this](threadmgr::CThreadMgrIf *p_if){sequence1(p_if);}, "sequence1"}); // enum _SEQ_1
        sequences.push_back ({[this](threadmgr::CThreadMgrIf *p_if){sequence2(p_if);}, "sequence2"}); // enum _SEQ_2
        // _SEQ_3 must dispatch to sequence3 (it previously called sequence2).
        sequences.push_back ({[this](threadmgr::CThreadMgrIf *p_if){sequence3(p_if);}, "sequence3"}); // enum _SEQ_3
        .
        .
        // register your sequences (maximum of registration: 64)
        set_sequences (sequences);
    }

    // Unregisters the sequences that the constructor registered.
    virtual ~module_a (void) {
        reset_sequences ();
    }

private:
    // implements your sequences (member functions)
    void sequence1 (threadmgr::CThreadMgrIf *p_if) {
        .
        .
    }
    void sequence2 (threadmgr::CThreadMgrIf *p_if) {
        .
        .
    }
    void sequence3 (threadmgr::CThreadMgrIf *p_if) {
        .
        .
    }
    .
    .
};
class module_b : public threadmgr::CThreadMgrBase
{
.
.
class module_c : public threadmgr::CThreadMgrBase
{
.
.
// 1-shot sequence (simple-echo)
// Sends the received request message straight back as the reply and then
// finishes the sequence in a single pass.
void module_a::sequence1 (threadmgr::CThreadMgrIf *p_if) {
    // Pick up the payload of the incoming request and its length.
    char *request_body = reinterpret_cast<char*>(p_if->get_source().get_message().data());
    size_t request_len = p_if->get_source().get_message().length();

    // Echo the payload back (maximum of message size: 256bytes).
    p_if->reply (
        threadmgr::result::success,
        reinterpret_cast<uint8_t*>(request_body),
        request_len
    );

    // A sequence ends by setting threadmgr::section_id::init together with
    // threadmgr::action::done through set_section_id().
    p_if->set_section_id (threadmgr::section_id::init, threadmgr::action::done);
}
// separated section sequence
// The sequence is entered once per section: it issues an asynchronous request,
// waits for the reply, and finally replies to its own requester. The section
// to run is read from p_if->get_sect_id(), and the next section and action are
// stored with p_if->set_section_id() before returning.
void module_b::sequence2 (threadmgr::CThreadMgrIf *p_if) {
    enum {
        SECTID_REQ_MODB_SEQ2 = threadmgr::section_id::init,
        SECTID_WAIT_MODB_SEQ2,
        SECTID_END,
    };

    // Initialized so that no path can hand an indeterminate value to
    // set_section_id() (the default branch used to leave it unset).
    threadmgr::action action = threadmgr::action::done;
    threadmgr::section_id::type section_id = p_if->get_sect_id();

    switch (section_id) {
    case SECTID_REQ_MODB_SEQ2: {
        // request to module_b::sequence2
        request_async(_MODULE_B, _SEQ_2);
        std::cout << __PRETTY_FUNCTION__ << " request module_b::sequence2" << std::endl;

        // set next section_id and action
        section_id = SECTID_WAIT_MODB_SEQ2;
        // wait for reply
        // while waiting this module can execute other sequences.
        action = threadmgr::action::wait;

        // If you don't want to execute other sequences, call lock() in advance.
        //p_if->lock();
        }
        break;

    case SECTID_WAIT_MODB_SEQ2: {
        // The reply to the request issued in the previous section has arrived.
        threadmgr::result rslt = p_if->get_source().get_result();
        std::cout << __PRETTY_FUNCTION__ << " reply module_b::sequence2 [" << static_cast<int>(rslt) << "]" << std::endl; // "[1]" --> success

        // set next section_id and action
        section_id = SECTID_END;
        action = threadmgr::action::continue_;

        // don't forget to unlock() when you call lock()
        //p_if->unlock();
        }
        break;

    case SECTID_END:
        // send reply
        p_if->reply (threadmgr::result::success);

        // at the end of sequence,
        // set threadmgr::section_id::init, threadmgr::action::done with set_section_id().
        section_id = threadmgr::section_id::init;
        action = threadmgr::action::done;
        break;

    default:
        // Unexpected section id: finish the sequence instead of passing an
        // undefined action on. No reply is sent on this path.
        section_id = threadmgr::section_id::init;
        action = threadmgr::action::done;
        break;
    }

    p_if->set_section_id (section_id, action);
}
Experimental implementation using thread manager.
auto_rec_mini
Here is a typical gcc link command.
Include <threadmgr/threadmgr_if.h>
<threadmgr/threadmgr_util.h>
in your application and link with libthreadmgr
and libpthread
.
$ gcc myapp.c -o myapp -lthreadmgr -lpthread
Include <threadmgr/ThreadMgrpp.h>
in your application and link with libthreadmgr
libthreadmgrpp
and libpthread
.
$ g++ myapp.cpp -o myapp -lthreadmgr -lthreadmgrpp -lpthread -std=c++11
Works on generic Linux distributions (confirmed working on Ubuntu, Fedora, and Raspberry Pi OS).
MIT