Skip to content

Commit

Permalink
updated C++ and python wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jun 7, 2024
1 parent a2f5f3e commit 5a65e03
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 473 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ endif ()
pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo)
# search for json-c
pkg_check_modules (json-c REQUIRED IMPORTED_TARGET json-c)
# search for thallium
find_package (thallium REQUIRED)

if (ENABLE_MPI)
find_package (MPI REQUIRED)
Expand Down
15 changes: 14 additions & 1 deletion include/flock/cxx/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef __FLOCK_CLIENT_HPP
#define __FLOCK_CLIENT_HPP

#include <thallium.hpp>
#include <flock/flock-client.h>
#include <flock/flock-group.h>
#include <flock/cxx/exception.hpp>
Expand All @@ -19,11 +20,18 @@ class Client {

Client() = default;

Client(margo_instance_id mid, ABT_pool pool = ABT_POOL_NULL) {
Client(margo_instance_id mid, ABT_pool pool = ABT_POOL_NULL)
: m_engine{mid} {
auto err = flock_client_init(mid, pool, &m_client);
FLOCK_CONVERT_AND_THROW(err);
}

Client(const thallium::engine& engine, thallium::pool pool = thallium::pool())
: m_engine{engine} {
auto err = flock_client_init(engine.get_margo_instance(), pool.native_handle(), &m_client);
FLOCK_CONVERT_AND_THROW(err);
}

~Client() {
if(m_client != FLOCK_CLIENT_NULL) {
flock_client_finalize(m_client);
Expand Down Expand Up @@ -68,8 +76,13 @@ class Client {
return m_client;
}

auto engine() const {
return m_engine;
}

private:

thallium::engine m_engine;
flock_client_t m_client = FLOCK_CLIENT_NULL;
};

Expand Down
160 changes: 100 additions & 60 deletions include/flock/cxx/group-view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <flock/flock-group.h>
#include <flock/cxx/exception.hpp>
#include <stdexcept>
#include <memory>
#include <map>
#include <vector>
Expand All @@ -27,9 +28,98 @@ class GroupView {
public:

struct Member {
uint64_t rank;
uint16_t provider_id;
std::string address;
uint16_t provider_id;
};

struct Metadata {
std::string key;
std::string value;
};

struct MembersProxy {

private:

friend class GroupView;

GroupView& m_owner;

MembersProxy(GroupView& gv)
: m_owner(gv) {}

public:

MembersProxy(MembersProxy&&) = default;

void add(const char* address, uint16_t provider_id) {
flock_group_view_add_member(&m_owner.m_view, address, provider_id);
}

void remove(size_t index) {
auto member = flock_group_view_member_at(&m_owner.m_view, index);
if(!flock_group_view_remove_member(&m_owner.m_view, member))
throw Exception{FLOCK_ERR_NO_MEMBER};
}

void remove(const char* address, uint16_t provider_id) {
auto member = flock_group_view_find_member(&m_owner.m_view, address, provider_id);
if(!flock_group_view_remove_member(&m_owner.m_view, member))
throw Exception{FLOCK_ERR_NO_MEMBER};
}

bool exists(const char* address, uint16_t provider_id) const {
return static_cast<bool>(flock_group_view_find_member(
const_cast<flock_group_view_t*>(&m_owner.m_view), address, provider_id));
}

size_t count() const {
return flock_group_view_member_count(&m_owner.m_view);
}

Member operator[](size_t i) const {
if(i >= count()) throw std::out_of_range{"Invalid member index"};
auto member = flock_group_view_member_at(&m_owner.m_view, i);
return Member{member->address, member->provider_id};
}
};

struct MetadataProxy {

private:

friend class GroupView;

GroupView& m_owner;

MetadataProxy(GroupView& gv)
: m_owner(gv) {}

public:

MetadataProxy(MetadataProxy&&) = default;

void add(const char* key, const char* value) {
flock_group_view_add_metadata(&m_owner.m_view, key, value);
}

void remove(const char* key) {
flock_group_view_remove_metadata(&m_owner.m_view, key);
}

size_t count() const {
return flock_group_view_metadata_count(&m_owner.m_view);
}

Metadata operator[](size_t i) const {
if(i >= count()) throw std::out_of_range{"Invalid metadata index"};
auto metadata = flock_group_view_metadata_at(&m_owner.m_view, i);
return Metadata{metadata->key, metadata->value};
}

const char* operator[](const char* key) const {
return flock_group_view_find_metadata(&m_owner.m_view, key);
}
};

GroupView() = default;
Expand All @@ -39,7 +129,7 @@ class GroupView {
}

~GroupView() {
flock_group_view_clear(&m_view);
clear();
}

GroupView(GroupView&& other) {
Expand All @@ -48,7 +138,7 @@ class GroupView {

GroupView& operator=(GroupView&& other) {
if(this == &other) return *this;
flock_group_view_clear(&m_view);
clear();
FLOCK_GROUP_VIEW_MOVE(&other.m_view, &m_view);
return *this;
}
Expand All @@ -69,68 +159,18 @@ class GroupView {
flock_group_view_clear(&m_view);
}

void addMember(uint64_t rank, const char* address, uint16_t provider_id) {
auto member = flock_group_view_add_member(&m_view, rank, provider_id, address);
if(!member) throw Exception{FLOCK_ERR_RANK_USED};
}

void removeMember(uint64_t rank) {
if(!flock_group_view_remove_member(&m_view, rank))
throw Exception{FLOCK_ERR_NO_MEMBER};
auto members() {
return MembersProxy{*this};
}

Member findMember(uint64_t rank) const {
auto member = flock_group_view_find_member(&m_view, rank);
if(!member) throw Exception{FLOCK_ERR_NO_MEMBER};
return Member{member->rank, member->provider_id, member->address};
}

std::vector<Member> members() const {
std::vector<Member> result;
result.reserve(m_view.members.size);
for(size_t i = 0; i < m_view.members.size; ++i) {
result.push_back(Member{
m_view.members.data[i].rank,
m_view.members.data[i].provider_id,
m_view.members.data[i].address
});
}
return result;
}

size_t maxNumMembers() const {
return m_view.members.data[m_view.members.size-1].rank;
}

size_t numLiveMembers() const {
return m_view.members.size;
}

void setMetadata(const char* key, const char* value) {
flock_group_view_add_metadata(&m_view, key, value);
}

void removeMetadata(const char* key) {
if(!flock_group_view_remove_metadata(&m_view, key))
throw Exception{FLOCK_ERR_NO_METADATA};
}

std::map<std::string, std::string> metadata() const {
std::map<std::string, std::string> result;
for(size_t i = 0; i < m_view.metadata.size; ++i) {
result.insert(
std::make_pair<const std::string, std::string>(
m_view.metadata.data[i].key,
m_view.metadata.data[i].value));
}
return result;
auto metadata() {
return MetadataProxy{*this};
}

std::string toString(margo_instance_id mid, uint64_t credentials = 0) const {
operator std::string() const {
std::string result;
flock_return_t ret = flock_group_view_serialize(
mid, credentials, &m_view,
[](void* ctx, const char* content, size_t size) {
&m_view, [](void* ctx, const char* content, size_t size) {
auto str = static_cast<decltype(&result)>(ctx);
str->assign(content, size);
}, static_cast<void*>(&result));
Expand Down
56 changes: 0 additions & 56 deletions include/flock/cxx/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,69 +111,13 @@ class GroupHandle {
return GroupHandle{gh};
}

operator std::string() const {
std::string result;
auto err = flock_group_serialize(m_gh,
[](void* uargs, const char* content, size_t size) -> void {
auto s = static_cast<std::string*>(uargs);
*s = std::string{content, size};
}, &result);
FLOCK_CONVERT_AND_THROW(err);
return result;
}

template<typename Function>
void serialize(const Function& function) const {
struct {
const Function& cb;
std::exception_ptr ex;
} context{function, nullptr};
auto err = flock_group_serialize(m_gh,
[](void* uargs, const char* content, size_t size) -> void {
auto context_ptr = static_cast<decltype(context)*>(uargs);
try {
(context_ptr->cb)(std::string_view{content, size});
} catch(...) {
context_ptr->ex = std::current_exception();
}
}, &context);
if(context.ex) std::rethrow_exception(context.ex);
FLOCK_CONVERT_AND_THROW(err);
}

void serializeToFile(const char* filename) const {
auto err = flock_group_serialize_to_file(m_gh, filename);
FLOCK_CONVERT_AND_THROW(err);
}

uint64_t digest() const {
uint64_t d;
auto err = flock_group_digest(m_gh, &d);
FLOCK_CONVERT_AND_THROW(err);
return d;
}

GroupView view() const {
GroupView v;
auto err = flock_group_get_view(m_gh, &v.m_view);
FLOCK_CONVERT_AND_THROW(err);
return v;
}

size_t maxNumMembers() const {
size_t count;
auto err = flock_group_size(m_gh, &count);
FLOCK_CONVERT_AND_THROW(err);
return count;
}

size_t numLiveMembers() const {
size_t count;
auto err = flock_group_live_member_count(m_gh, &count);
FLOCK_CONVERT_AND_THROW(err);
return count;
}

void update() const {
auto err = flock_group_update_view(m_gh, NULL);
FLOCK_CONVERT_AND_THROW(err);
Expand Down
22 changes: 16 additions & 6 deletions include/flock/cxx/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
#ifndef __FLOCK_CLIENT_HPP
#define __FLOCK_CLIENT_HPP

#include <thallium.hpp>
#include <flock/flock-server.h>
#include <flock/cxx/exception.hpp>
#include <flock/cxx/group-view.hpp>
#include <string>

namespace flock {

Expand All @@ -23,7 +25,6 @@ class Observer {

virtual void onMembershipUpdate(
flock_update_t update,
uint64_t rank,
const char* address,
uint16_t provider_id) = 0;

Expand All @@ -43,12 +44,10 @@ class Provider {
uint16_t provider_id,
const char* config,
GroupView& initial_view,
ABT_pool pool = ABT_POOL_NULL,
uint64_t credentials = 0) {
ABT_pool pool = ABT_POOL_NULL) {
m_mid = mid;
flock_provider_args args;
args.backend = nullptr;
args.credentials = credentials;
args.initial_view = &initial_view.m_view;
args.pool = pool;
auto err = flock_provider_register(mid, provider_id, config, &args, &m_provider);
Expand All @@ -60,6 +59,17 @@ class Provider {
}, this);
}

Provider(const thallium::engine& engine,
uint16_t provider_id,
const char* config,
GroupView& initial_view,
const thallium::pool& pool = thallium::pool())
: Provider(engine.get_margo_instance(),
provider_id,
config,
initial_view,
pool.native_handle()) {}

~Provider() {
if(m_provider != FLOCK_PROVIDER_NULL) {
margo_provider_pop_finalize_callback(m_mid, this);
Expand Down Expand Up @@ -99,8 +109,8 @@ class Provider {

void addObserver(Observer* observer) {
auto membership_update_fn =
[](void* ctx, flock_update_t update, size_t rank, const char* address, uint16_t provider_id) {
static_cast<Observer*>(ctx)->onMembershipUpdate(update, rank, address, provider_id);
[](void* ctx, flock_update_t update, const char* address, uint16_t provider_id) {
static_cast<Observer*>(ctx)->onMembershipUpdate(update, address, provider_id);
};
auto metadata_update_fn =
[](void* ctx, const char* key, const char* value) {
Expand Down
Loading

0 comments on commit 5a65e03

Please sign in to comment.