From 99a1ca36969208b765c9bec9557776f748204831 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Thu, 6 Jun 2024 09:39:09 +0100 Subject: [PATCH 01/10] started python and C++ interface --- CMakeLists.txt | 11 ++ build.sh | 2 +- include/flock/cxx/client.hpp | 78 +++++++++ include/flock/cxx/exception.hpp | 46 ++++++ include/flock/cxx/group-view.hpp | 148 +++++++++++++++++ include/flock/cxx/group.hpp | 198 ++++++++++++++++++++++ include/flock/cxx/server.hpp | 143 ++++++++++++++++ python/CMakeLists.txt | 24 +++ python/src/.py-flock-common.cpp.swp | Bin 0 -> 16384 bytes python/src/py-flock-client.cpp | 153 +++++++++++++++++ python/src/py-flock-common.cpp | 41 +++++ python/src/py-flock-server.cpp | 248 ++++++++++++++++++++++++++++ spack.yaml | 2 + tests/spack.yaml | 2 + 14 files changed, 1095 insertions(+), 1 deletion(-) create mode 100644 include/flock/cxx/client.hpp create mode 100644 include/flock/cxx/exception.hpp create mode 100644 include/flock/cxx/group-view.hpp create mode 100644 include/flock/cxx/group.hpp create mode 100644 include/flock/cxx/server.hpp create mode 100644 python/CMakeLists.txt create mode 100644 python/src/.py-flock-common.cpp.swp create mode 100644 python/src/py-flock-client.cpp create mode 100644 python/src/py-flock-common.cpp create mode 100644 python/src/py-flock-server.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 60bd3fd..071ce5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,8 @@ # See COPYRIGHT in top-level directory. cmake_minimum_required (VERSION 3.8) project (flock C CXX) +set (CMAKE_CXX_STANDARD 17) +set (CMAKE_CXX_STANDARD_REQUIRED ON) enable_testing () add_definitions (-Wextra -Wall -Wpedantic) @@ -13,6 +15,7 @@ option (ENABLE_EXAMPLES "Build examples" OFF) option (ENABLE_BEDROCK "Build bedrock module" OFF) option (ENABLE_COVERAGE "Build with coverage" OFF) option (ENABLE_MPI "Build with MPI support" OFF) +option (ENABLE_PYTHON "Build with Python support" OFF) # library version set here (e.g. 
for shared libs). set (FLOCK_VERSION_MAJOR 0) @@ -67,6 +70,14 @@ if (ENABLE_MPI) find_package (MPI REQUIRED) endif () +# search for Python +if (ENABLE_PYTHON) + set (Python3_FIND_STRATEGY LOCATION) + find_package (Python3 COMPONENTS Interpreter Development REQUIRED) + find_package (pybind11 REQUIRED) + add_subdirectory (python) +endif () + add_subdirectory (src) if (${ENABLE_TESTS}) enable_testing () diff --git a/build.sh b/build.sh index 92573ba..93061d6 100755 --- a/build.sh +++ b/build.sh @@ -1,4 +1,4 @@ #!/bin/bash SCRIPT_DIR=$(dirname "$0") -cmake .. -DENABLE_TESTS=ON -DENABLE_BEDROCK=ON -DENABLE_EXAMPLES=ON -DENABLE_MPI=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXPORT_COMPILE_COMMANDS=ON +cmake .. -DENABLE_TESTS=ON -DENABLE_BEDROCK=ON -DENABLE_EXAMPLES=ON -DENABLE_MPI=ON -DENABLE_PYTHON=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXPORT_COMPILE_COMMANDS=ON make diff --git a/include/flock/cxx/client.hpp b/include/flock/cxx/client.hpp new file mode 100644 index 0000000..c1f0a83 --- /dev/null +++ b/include/flock/cxx/client.hpp @@ -0,0 +1,78 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. 
+ */ +#ifndef __FLOCK_CLIENT_HPP +#define __FLOCK_CLIENT_HPP + +#include +#include +#include +#include + +namespace flock { + +class Client { + + public: + + Client() = default; + + Client(margo_instance_id mid, ABT_pool pool = ABT_POOL_NULL) { + auto err = flock_client_init(mid, pool, &m_client); + FLOCK_CONVERT_AND_THROW(err); + } + + ~Client() { + if(m_client != FLOCK_CLIENT_NULL) { + flock_client_finalize(m_client); + } + } + + Client(Client&& other) + : m_client(other.m_client) { + other.m_client = FLOCK_CLIENT_NULL; + } + + Client(const Client&) = delete; + + Client& operator=(const Client&) = delete; + + Client& operator=(Client&& other) { + if(this == &other) return *this; + if(m_client != FLOCK_CLIENT_NULL) { + flock_client_finalize(m_client); + } + m_client = other.m_client; + other.m_client = FLOCK_CLIENT_NULL; + return *this; + } + + GroupHandle makeGroupHandle( + hg_addr_t addr, + uint16_t provider_id, + uint32_t mode = 0) const { + flock_group_handle_t gh; + auto err = flock_group_handle_create( + m_client, addr, provider_id, mode, &gh); + FLOCK_CONVERT_AND_THROW(err); + return GroupHandle(gh, false); + } + + auto handle() const { + return m_client; + } + + operator flock_client_t() const { + return m_client; + } + + private: + + flock_client_t m_client = FLOCK_CLIENT_NULL; +}; + +} + +#endif diff --git a/include/flock/cxx/exception.hpp b/include/flock/cxx/exception.hpp new file mode 100644 index 0000000..f80973e --- /dev/null +++ b/include/flock/cxx/exception.hpp @@ -0,0 +1,46 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. 
+ */ +#ifndef __FLOCK_EXCEPTION_HPP +#define __FLOCK_EXCEPTION_HPP + +#include +#include + +namespace flock { + +class Exception : public std::exception { + + public: + + Exception(flock_return_t code) + : m_code(code) {} + + const char* what() const noexcept override { + #define X(__err__, __msg__) case __err__: return __msg__; + switch(m_code) { + FLOCK_RETURN_VALUES + } + #undef X + return "Unknown error"; + } + + auto code() const { + return m_code; + } + + private: + + flock_return_t m_code; +}; + +#define FLOCK_CONVERT_AND_THROW(__err__) do { \ + if((__err__) != FLOCK_SUCCESS) { \ + throw ::flock::Exception((__err__)); \ + } \ +} while(0) + +} +#endif diff --git a/include/flock/cxx/group-view.hpp b/include/flock/cxx/group-view.hpp new file mode 100644 index 0000000..3fd69fe --- /dev/null +++ b/include/flock/cxx/group-view.hpp @@ -0,0 +1,148 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __FLOCK_GROUP_VIEW_HPP +#define __FLOCK_GROUP_VIEW_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace flock { + +class GroupHandle; +class Provider; + +class GroupView { + + friend class GroupHandle; + friend class Provider; + + public: + + struct Member { + uint64_t rank; + uint16_t provider_id; + std::string address; + }; + + GroupView() = default; + + GroupView(flock_group_view_t view) { + FLOCK_GROUP_VIEW_MOVE(&view, &m_view); + } + + ~GroupView() { + flock_group_view_clear(&m_view); + } + + GroupView(GroupView&& other) { + FLOCK_GROUP_VIEW_MOVE(&other.m_view, &m_view); + } + + GroupView& operator=(GroupView&& other) { + if(this == &other) return *this; + flock_group_view_clear(&m_view); + FLOCK_GROUP_VIEW_MOVE(&other.m_view, &m_view); + return *this; + } + + auto digest() const { + return m_view.digest; + } + + void lock() { + FLOCK_GROUP_VIEW_LOCK(&m_view); + } + + void unlock() { + FLOCK_GROUP_VIEW_UNLOCK(&m_view); + } + + void clear() { + flock_group_view_clear(&m_view); + 
} + + void addMember(uint64_t rank, const char* address, uint16_t provider_id) { + auto member = flock_group_view_add_member(&m_view, rank, provider_id, address); + if(!member) throw Exception{FLOCK_ERR_RANK_USED}; + } + + void removeMember(uint64_t rank) { + if(!flock_group_view_remove_member(&m_view, rank)) + throw Exception{FLOCK_ERR_NO_MEMBER}; + } + + Member findMember(uint64_t rank) const { + auto member = flock_group_view_find_member(&m_view, rank); + if(!member) throw Exception{FLOCK_ERR_NO_MEMBER}; + return Member{member->rank, member->provider_id, member->address}; + } + + std::vector members() const { + std::vector result; + result.reserve(m_view.members.size); + for(size_t i = 0; i < m_view.members.size; ++i) { + result.push_back(Member{ + m_view.members.data[i].rank, + m_view.members.data[i].provider_id, + m_view.members.data[i].address + }); + } + return result; + } + + size_t maxNumMembers() const { + return m_view.members.data[m_view.members.size-1].rank; + } + + size_t numLiveMembers() const { + return m_view.members.size; + } + + void setMetadata(const char* key, const char* value) { + flock_group_view_add_metadata(&m_view, key, value); + } + + void removeMetadata(const char* key) { + if(!flock_group_view_remove_metadata(&m_view, key)) + throw Exception{FLOCK_ERR_NO_METADATA}; + } + + std::map metadata() const { + std::map result; + for(size_t i = 0; i < m_view.metadata.size; ++i) { + result.insert( + std::make_pair( + m_view.metadata.data[i].key, + m_view.metadata.data[i].value)); + } + return result; + } + + std::string toString(margo_instance_id mid, uint64_t credentials = 0) const { + std::string result; + flock_return_t ret = flock_group_view_serialize( + mid, credentials, &m_view, + [](void* ctx, const char* content, size_t size) { + auto str = static_cast(ctx); + str->assign(content, size); + }, static_cast(&result)); + if(ret != FLOCK_SUCCESS) throw Exception{ret}; + return result; + } + + private: + + flock_group_view_t m_view = 
FLOCK_GROUP_VIEW_INITIALIZER; +}; + +} + +#endif diff --git a/include/flock/cxx/group.hpp b/include/flock/cxx/group.hpp new file mode 100644 index 0000000..31bebbd --- /dev/null +++ b/include/flock/cxx/group.hpp @@ -0,0 +1,198 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#ifndef __FLOCK_GROUP_HPP +#define __FLOCK_GROUP_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace flock { + +class Client; + +class GroupHandle { + + friend class Client; + + public: + + GroupHandle() = default; + + GroupHandle(flock_group_handle_t gh, bool copy=true) + : m_gh(gh) { + if(copy && (m_gh != FLOCK_GROUP_HANDLE_NULL)) { + auto err = flock_group_handle_ref_incr(m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + } + + GroupHandle(flock_client_t client, + hg_addr_t addr, + uint16_t provider_id, + bool check = true) + { + auto err = flock_group_handle_create(client, + addr, provider_id, check, &m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + + GroupHandle(const GroupHandle& other) + : m_gh(other.m_gh) { + if(m_gh != FLOCK_GROUP_HANDLE_NULL) { + auto err = flock_group_handle_ref_incr(m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + } + + GroupHandle(GroupHandle&& other) + : m_gh(other.m_gh) { + other.m_gh = FLOCK_GROUP_HANDLE_NULL; + } + + GroupHandle& operator=(const GroupHandle& other) { + if(m_gh == other.m_gh || &other == this) + return *this; + if(m_gh != FLOCK_GROUP_HANDLE_NULL) { + auto err = flock_group_handle_release(m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + m_gh = other.m_gh; + if(m_gh != FLOCK_GROUP_HANDLE_NULL) { + auto err = flock_group_handle_ref_incr(m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + return *this; + } + + GroupHandle& operator=(GroupHandle&& other) { + if(m_gh == other.m_gh || &other == this) + return *this; + if(m_gh != FLOCK_GROUP_HANDLE_NULL) { + auto err = flock_group_handle_release(m_gh); + FLOCK_CONVERT_AND_THROW(err); + } + m_gh = other.m_gh; + other.m_gh = 
FLOCK_GROUP_HANDLE_NULL; + return *this; + } + + ~GroupHandle() { + if(m_gh != FLOCK_GROUP_HANDLE_NULL) { + flock_group_handle_release(m_gh); + } + } + + static GroupHandle FromFile( + flock_client_t client, + const char* filename, + uint32_t mode = 0) { + flock_group_handle_t gh; + auto err = flock_group_handle_create_from_file(client, filename, mode, &gh); + FLOCK_CONVERT_AND_THROW(err); + return GroupHandle{gh}; + } + + static GroupHandle FromSerialized( + flock_client_t client, + std::string_view serialized_view, + uint32_t mode = 0) { + flock_group_handle_t gh; + auto err = flock_group_handle_create_from_serialized( + client, serialized_view.data(), serialized_view.size(), mode, &gh); + FLOCK_CONVERT_AND_THROW(err); + return GroupHandle{gh}; + } + + operator std::string() const { + std::string result; + auto err = flock_group_serialize(m_gh, + [](void* uargs, const char* content, size_t size) -> void { + auto s = static_cast(uargs); + *s = std::string{content, size}; + }, &result); + FLOCK_CONVERT_AND_THROW(err); + return result; + } + + template + void serialize(const Function& function) const { + struct { + const Function& cb; + std::exception_ptr ex; + } context{function, nullptr}; + auto err = flock_group_serialize(m_gh, + [](void* uargs, const char* content, size_t size) -> void { + auto context_ptr = static_cast(uargs); + try { + (context_ptr->cb)(std::string_view{content, size}); + } catch(...) 
{ + context_ptr->ex = std::current_exception(); + } + }, &context); + if(context.ex) std::rethrow_exception(context.ex); + FLOCK_CONVERT_AND_THROW(err); + } + + void serializeToFile(const char* filename) const { + auto err = flock_group_serialize_to_file(m_gh, filename); + FLOCK_CONVERT_AND_THROW(err); + } + + uint64_t digest() const { + uint64_t d; + auto err = flock_group_digest(m_gh, &d); + FLOCK_CONVERT_AND_THROW(err); + return d; + } + + GroupView view() const { + GroupView v; + auto err = flock_group_get_view(m_gh, &v.m_view); + FLOCK_CONVERT_AND_THROW(err); + return v; + } + + size_t maxNumMembers() const { + size_t count; + auto err = flock_group_size(m_gh, &count); + FLOCK_CONVERT_AND_THROW(err); + return count; + } + + size_t numLiveMembers() const { + size_t count; + auto err = flock_group_live_member_count(m_gh, &count); + FLOCK_CONVERT_AND_THROW(err); + return count; + } + + void update() const { + auto err = flock_group_update_view(m_gh, NULL); + FLOCK_CONVERT_AND_THROW(err); + } + + auto handle() const { + return m_gh; + } + + operator flock_group_handle_t() const { + return m_gh; + } + + private: + + flock_group_handle_t m_gh = FLOCK_GROUP_HANDLE_NULL; + +}; + +} + +#endif diff --git a/include/flock/cxx/server.hpp b/include/flock/cxx/server.hpp new file mode 100644 index 0000000..43d175c --- /dev/null +++ b/include/flock/cxx/server.hpp @@ -0,0 +1,143 @@ +/* + * (C) 2024 The University of Chicago + * + * See COPYRIGHT in top-level directory. 
+ */ +#ifndef __FLOCK_CLIENT_HPP +#define __FLOCK_CLIENT_HPP + +#include +#include +#include + +namespace flock { + +class Observer { + + public: + + Observer(const Observer&) = delete; + Observer(Observer&&) = delete; + + virtual ~Observer() = default; + + virtual void onMembershipUpdate( + flock_update_t update, + uint64_t rank, + const char* address, + uint16_t provider_id) = 0; + + virtual void onMetadataUpdate( + const char* key, + const char* value) = 0; + +}; + +class Provider { + + public: + + Provider() = default; + + Provider(margo_instance_id mid, + uint16_t provider_id, + const char* config, + GroupView& initial_view, + ABT_pool pool = ABT_POOL_NULL, + uint64_t credentials = 0) { + m_mid = mid; + flock_provider_args args; + args.backend = nullptr; + args.credentials = credentials; + args.initial_view = &initial_view.m_view; + args.pool = pool; + auto err = flock_provider_register(mid, provider_id, config, &args, &m_provider); + FLOCK_CONVERT_AND_THROW(err); + margo_provider_push_finalize_callback( + mid, this, [](void* ctx) { + auto self = static_cast(ctx); + self->m_provider = FLOCK_PROVIDER_NULL; + }, this); + } + + ~Provider() { + if(m_provider != FLOCK_PROVIDER_NULL) { + margo_provider_pop_finalize_callback(m_mid, this); + flock_provider_destroy(m_provider); + } + } + + Provider(Provider&& other) + : m_provider(other.m_provider) { + margo_provider_pop_finalize_callback(other.m_mid, &other); + other.m_provider = FLOCK_PROVIDER_NULL; + margo_provider_push_finalize_callback( + m_mid, this, [](void* ctx) { + auto self = static_cast(ctx); + self->m_provider = FLOCK_PROVIDER_NULL; + }, this); + } + + Provider(const Provider&) = delete; + + Provider& operator=(const Provider&) = delete; + + Provider& operator=(Provider&& other) { + if(this == &other) return *this; + this->~Provider(); + m_provider = other.m_provider; + other.m_provider = FLOCK_PROVIDER_NULL; + margo_provider_pop_finalize_callback(other.m_mid, &other); + other.m_provider = 
FLOCK_PROVIDER_NULL; + margo_provider_push_finalize_callback( + m_mid, this, [](void* ctx) { + auto self = static_cast(ctx); + self->m_provider = FLOCK_PROVIDER_NULL; + }, this); + return *this; + } + + void addObserver(Observer* observer) { + auto membership_update_fn = + [](void* ctx, flock_update_t update, size_t rank, const char* address, uint16_t provider_id) { + static_cast(ctx)->onMembershipUpdate(update, rank, address, provider_id); + }; + auto metadata_update_fn = + [](void* ctx, const char* key, const char* value) { + static_cast(ctx)->onMetadataUpdate(key, value); + }; + auto err = flock_provider_add_update_callbacks( + m_provider, membership_update_fn, metadata_update_fn, observer); + FLOCK_CONVERT_AND_THROW(err); + } + + void removeObserver(Observer* observer) { + auto err = flock_provider_remove_update_callbacks(m_provider, observer); + FLOCK_CONVERT_AND_THROW(err); + } + + std::string config() const { + auto cfg = flock_provider_get_config(m_provider); + if(!cfg) throw Exception{FLOCK_ERR_OTHER}; + auto result = std::string{cfg}; + free(cfg); + return result; + } + + auto handle() const { + return m_provider; + } + + operator flock_provider_t() const { + return handle(); + } + + private: + + margo_instance_id m_mid = MARGO_INSTANCE_NULL; + flock_provider_t m_provider = FLOCK_PROVIDER_NULL; +}; + +} + +#endif diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt new file mode 100644 index 0000000..1719652 --- /dev/null +++ b/python/CMakeLists.txt @@ -0,0 +1,24 @@ +add_library (pyflock_client MODULE src/py-flock-client.cpp) +target_link_libraries (pyflock_client PRIVATE pybind11::module flock-client PRIVATE coverage_config) +pybind11_extension (pyflock_client) +pybind11_strip (pyflock_client) + +add_library (pyflock_server MODULE src/py-flock-server.cpp) +target_link_libraries (pyflock_server PRIVATE pybind11::module flock-server PRIVATE coverage_config) +pybind11_extension (pyflock_server) +pybind11_strip (pyflock_server) + +add_library 
(pyflock_common MODULE src/py-flock-common.cpp) +target_link_libraries (pyflock_common PRIVATE pybind11::module flock-client PRIVATE coverage_config) +pybind11_extension (pyflock_common) +pybind11_strip (pyflock_common) + +set (PY_VERSION ${Python3_VERSION_MAJOR}.${Python3_VERSION_MINOR}) + +install (TARGETS pyflock_client pyflock_server pyflock_common + EXPORT flock-targets + ARCHIVE DESTINATION lib/python${PY_VERSION}/site-packages + LIBRARY DESTINATION lib/python${PY_VERSION}/site-packages) + +install (DIRECTORY mochi/bedrock + DESTINATION lib/python${PY_VERSION}/site-packages/mochi) diff --git a/python/src/.py-flock-common.cpp.swp b/python/src/.py-flock-common.cpp.swp new file mode 100644 index 0000000000000000000000000000000000000000..5a35f8630a9d3beb1dd944280754847a6b086947 GIT binary patch literal 16384 zcmeI2TZ|J`7{`y`g%xm(CSG_jL)WbBy4%vl80f+(E(jY~SgwXdXlAFU?ZkFwn3=9y z&0pyw=bOt}I<{oT@QAQHnI`z$NXYz(n1AJLCwc!mLMmew#jG$x@TaUwMLAef zv+lJ5riRoUP1T(G@+DPzB&Ai&qN=0}UBXxET%V+t%c_!;G%ZBrND^fr%0OTSDrUZK z*+R0kKfQ$Mx?+okn{V14=n@&D3`7}-G7x1T%0QHXC<9Rjq6|bC_4uFTjEr5;nGRFKJ_yD{B3Sa|R z3Fd-h^9XqpjDbbq8~mQ(Ab1+cU^z&D)7KL65qK3;zY`)To~{LX;;rWYwk0np$LLeOctm)#0Kh%oOQ%I#-#_7*42z#mB;($ey!s@sMsygN8r%P|wrV0fc$2sptS zMttMg8wm=jbjt$cajq?wZ-A>y)ndrpE?n2lKbn2cjKa?(q|z-5?vG=2mf9U8xFmw} z8yF{`f9_So(L3=uCf%{sjy!?&AmHEPnN#~dMsbFn)(dlX;`pNYn#9va!&!Z9_I>v( zk8k^#a@;2q&bQi;6Uec1Os?@ds%BA`G>pdgX27cfcNo{88ZQaS9L;AnUDc>=RlM0dzu?Sd*6M0S+ac3OGa1|1cwb>r;<3D}8)l!LyIzN2P%_fQ6P;vHJTKK zmEcUq7A1T&)`)^?8<%rb&&oOew46vTp?8k1I@Lv`$?`c-m?A_LG8xOk%7*pbNrja@ zofc%n#JK5%L^V@Z6(^NzoD-IFy@~0KR?*6C!OyJaGF_wVs%|HyoXn|GJRz(UVh^&) z5v-p>3)U~C1v+6;#Xuji0W!Mtf#Hp-)9LKUrq$awticM`mN091^a|yK@HqA%xCUi~ z3PY<#w?frFGQ2vT%}SzX;I?lB0sPs3~b;B2{3wyuNYV^O8kP z8{}&0#o(7Xu~UXwN*3MDb&K14$ueYfpgg7e+;UC?fVyj^Vs8m1&)BH0NeMcK?nF5d;F8&D0mX=2DgLD*yCRU z(I?8le<=f#q>Cy!IbVCAHy+6B@jlrbXcS_Gq|2Ha%ZiO>FY{v``&}0A0+ld9!e~4l zwQvYB2TVA~4Ue+=d}mviAYmignGPWjlmr^5G0}Pfgbi#tw5PTz%lfxoU+siwrEO)e 
zm3t@=yCy;li8HZwJ=)bf{nRf{e*k~G;UEN=rgLdbvtn9~Bg8mENNRVru)66^#m;Wq z-N0`;K2~@23kPrOruTjyKCG{MU(;1Jv-J$|3^lMj`{w1-_?mzB2(9>mow{zaQ||tJ zNB-GbSCj55uwIV!Sq7|m$e=!%uUYR1UuNm0!L#6Y{J>s4`Sf2Od{65v-&X4~L|Uik zJv}crCTA&U9szG=r5QyDat?UM;+1MpC`TVggIB~BGQZj1wo8p2X0wt5yW!umTqD6y Tyyb1#S_BO;*Ve}tOp?C +#include +#include +#include +#include + +namespace py11 = pybind11; +using namespace pybind11::literals; + +struct margo_instance; +typedef struct margo_instance* margo_instance_id; +typedef py11::capsule pymargo_instance_id; + +#define MID2CAPSULE(__mid) py11::capsule((void*)(__mid), "margo_instance_id") +#define CAPSULE2MID(__caps) (margo_instance_id)(__caps) + +PYBIND11_MODULE(pyflock_client, m) { + m.doc() = "Flock client python extension"; +#if 0 + py11::register_exception(m, "Exception", PyExc_RuntimeError); + py11::class_(m, "Client") + .def(py11::init()) + .def_property_readonly("margo_instance_id", + [](const Client& client) { + return MID2CAPSULE(client.engine().get_margo_instance()); + }) + .def("make_service_handle", + &Client::makeServiceHandle, + "Create a ServiceHandle instance", + "address"_a, + "provider_id"_a=0) + .def("make_service_group_handle", + [](const Client& client, + const std::string& groupfile, + uint16_t provider_id) { + return client.makeServiceGroupHandle(groupfile, provider_id); + }, + "Create a ServiceHandle instance", + "group_file"_a, + "provider_id"_a=0) + .def("make_service_group_handle", + [](const Client& client, + const std::vector& addresses, + uint16_t provider_id) { + return client.makeServiceGroupHandle(addresses, provider_id); + }, + "Create a ServiceHandle instance", + "addresses"_a, + "provider_id"_a=0) + .def("make_service_group_handle", + [](const Client& client, + uint64_t gid, + uint16_t provider_id) { + return client.makeServiceGroupHandle(gid, provider_id); + }, + "Create a ServiceHandle instance", + "group_id"_a, + "provider_id"_a=0); + py11::class_(m, "ServiceHandle") + .def_property_readonly("client", 
&ServiceHandle::client) + .def_property_readonly("address", [](const ServiceHandle& sh) { + return static_cast(sh.providerHandle()); + }) + .def_property_readonly("provider_id", [](const ServiceHandle& sh) { + return sh.providerHandle().provider_id(); + }) + .def("get_config", + [](const ServiceHandle& sh) { + std::string config; + sh.getConfig(&config); + return config; + }) + .def("query_config", + [](const ServiceHandle& sh, const std::string& script) { + std::string result; + sh.queryConfig(script, &result); + return result; + }, "script"_a) + .def("load_module", + [](const ServiceHandle& sh, + const std::string& name, + const std::string& path) { + sh.loadModule(name, path); + }, "name"_a, "path"_a) + .def("add_provider", + [](const ServiceHandle& sh, + const std::string& description) { + uint16_t provider_id_out; + sh.addProvider(description, &provider_id_out); + return provider_id_out; + }, "description"_a) + .def("change_provider_pool", + [](const ServiceHandle& sh, + const std::string& provider_name, + const std::string& pool_name) { + sh.changeProviderPool(provider_name, pool_name); + }) + .def("add_client", + [](const ServiceHandle& sh, + const std::string& description) { + sh.addClient(description); + }, "description"_a) + .def("add_abtio_instance", + [](const ServiceHandle& sh, + const std::string& description) { + sh.addABTioInstance(description); + }, "description"_a = std::string("{}")) + .def("add_ssg_group", [](const ServiceHandle& sh, const std::string& config) { + sh.addSSGgroup(config); + }, + "description"_a) + .def("add_pool", [](const ServiceHandle& sh, const std::string& config) { + sh.addPool(config); + }, + "description"_a) + .def("remove_pool", [](const ServiceHandle& sh, const std::string& pool_name) { + sh.removePool(pool_name); + }, + "name"_a) + .def("add_xstream", [](const ServiceHandle& sh, const std::string& config) { + sh.addXstream(config); + }, + "description"_a) + .def("remove_xstream", [](const ServiceHandle& sh, const 
std::string& es_name) { + sh.removeXstream(es_name); + }, + "name"_a) + ; + py11::class_(m, "ServiceGroupHandle") + .def("refresh", &ServiceGroupHandle::refresh) + .def_property_readonly("size", &ServiceGroupHandle::size) + .def_property_readonly("client", &ServiceGroupHandle::client) + .def("__getitem__", &ServiceGroupHandle::operator[]) + .def("get_config", + [](const ServiceGroupHandle& sh) { + std::string config; + sh.getConfig(&config); + return config; + }) + .def("query_config", + [](const ServiceGroupHandle& sh, const std::string& script) { + std::string result; + sh.queryConfig(script, &result); + return result; + }, "script"_a) + ; +#endif +} diff --git a/python/src/py-flock-common.cpp b/python/src/py-flock-common.cpp new file mode 100644 index 0000000..bf3d090 --- /dev/null +++ b/python/src/py-flock-common.cpp @@ -0,0 +1,41 @@ +/* + * (C) 2018 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include +#include +#include + +namespace py11 = pybind11; +using namespace pybind11::literals; + +struct margo_instance; +typedef struct margo_instance* margo_instance_id; +typedef py11::capsule pymargo_instance_id; + +#define MID2CAPSULE(__mid) py11::capsule((void*)(__mid), "margo_instance_id") +#define CAPSULE2MID(__caps) (margo_instance_id)(__caps) + +PYBIND11_MODULE(pyflock_common, m) { + m.doc() = "Flock common python extension"; + py11::register_exception(m, "Exception", PyExc_RuntimeError); + + py11::class_(m, "Member") + .def_readonly("rank", &flock::GroupView::Member::rank) + .def_readonly("provider_id", &flock::GroupView::Member::provider_id) + .def_readonly("address", &flock::GroupView::Member::address); + + py11::class_(m, "GroupView") + .def(py11::init<>()) + .def_property_readonly("digest", &flock::GroupView::digest) + .def("clear", &flock::GroupView::clear) + .def("lock", &flock::GroupView::lock) + .def("unlock", &flock::GroupView::unlock) + .def("add_member", &flock::GroupView::addMember) + .def("remove_member", 
&flock::GroupView::removeMember) + .def("find_member", &flock::GroupView::findMember) + .def_property_readonly("members", &flock::GroupView::members) + ; +} diff --git a/python/src/py-flock-server.cpp b/python/src/py-flock-server.cpp new file mode 100644 index 0000000..d5ae1c5 --- /dev/null +++ b/python/src/py-flock-server.cpp @@ -0,0 +1,248 @@ +/* + * (C) 2018 The University of Chicago + * + * See COPYRIGHT in top-level directory. + */ +#include +#include + +namespace py11 = pybind11; +using namespace pybind11::literals; + +struct margo_instance; +typedef struct margo_instance* margo_instance_id; +typedef struct hg_addr* hg_addr_t; +typedef py11::capsule pymargo_instance_id; +typedef py11::capsule pyhg_addr_t; + +#define MID2CAPSULE(__mid) py11::capsule((void*)(__mid), "margo_instance_id") +#define CAPSULE2MID(__caps) (margo_instance_id)(__caps) + +#define ADDR2CAPSULE(__addr) py11::capsule((void*)(__addr), "hg_addr_t") +#define CAPSULE2ADDR(__caps) (hg_add_t)(__caps) + +PYBIND11_MODULE(pybedrock_server, m) { + m.doc() = "Flock server python extension"; +#if 0 + py11::register_exception(m, "Exception", PyExc_RuntimeError); + + py11::class_> named_dep(m, "NamedDependency"); + named_dep + .def_property_readonly("name", + [](std::shared_ptr nd) { return nd->getName(); }) + .def_property_readonly("type", + [](std::shared_ptr nd) { return nd->getType(); }) + .def_property_readonly("handle", + [](std::shared_ptr nd) { return nd->getHandle(); }) + ; + + py11::class_> (m, "ProviderDependency", named_dep) + .def_property_readonly("provider_id", + [](std::shared_ptr nd) { return nd->getProviderID(); }) + ; + + py11::class_> server(m, "Server"); + + py11::enum_(server, "ConfigType") + .value("JSON", ConfigType::JSON) + .value("JX9", ConfigType::JX9) + .export_values(); + + server + .def(py11::init([](const std::string& address, const std::string& config, + ConfigType configType, + const Jx9ParamMap& jx9Params) { + return std::make_shared(address, config, configType, 
jx9Params); + }), "address"_a, "config"_a="{}", "config_type"_a=ConfigType::JSON, + "jx9_params"_a=Jx9ParamMap{}) + .def("wait_for_finalize", + [](std::shared_ptr server) { + server->waitForFinalize(); + }) + .def("finalize", + [](std::shared_ptr server) { + server->finalize(); + }) + .def_property_readonly("margo_instance_id", + [](std::shared_ptr server) { + return MID2CAPSULE(server->getMargoManager().getMargoInstance()); + }) + .def_property_readonly("config", + [](std::shared_ptr server) { + return server->getCurrentConfig(); + }) + .def_property_readonly("margo_manager", + [](std::shared_ptr server) { + return server->getMargoManager(); + }) + .def_property_readonly("abtio_manager", + [](std::shared_ptr server) { + return server->getABTioManager(); + }) + .def_property_readonly("provider_manager", + [](std::shared_ptr server) { + return server->getProviderManager(); + }) + .def_property_readonly("client_manager", + [](std::shared_ptr server) { + return server->getClientManager(); + }) + .def_property_readonly("ssg_manager", + [](std::shared_ptr server) { + return server->getSSGManager(); + }) + ; + + py11::class_ (m, "MargoManager") + .def_property_readonly("margo_instance_id", [](const MargoManager& m) { + return MID2CAPSULE(m.getMargoInstance()); + }) + .def_property_readonly("default_handler_pool", &MargoManager::getDefaultHandlerPool) + .def_property_readonly("config", &MargoManager::getCurrentConfig) + .def("get_pool", [](const MargoManager& m, const std::string& pool_name) { + return m.getPool(pool_name); + }, "name"_a) + .def("get_pool", [](const MargoManager& m, uint32_t index) { + return m.getPool(index); + }, "index"_a) + .def("add_pool", &MargoManager::addPool, + "config"_a) + .def("remove_pool", [](MargoManager& m, const std::string& pool_name) { + return m.removePool(pool_name); + }, "name"_a) + .def("remove_pool", [](MargoManager& m, uint32_t index) { + return m.removePool(index); + }, "index"_a) + .def_property_readonly("num_pools", 
&MargoManager::getNumPools) + .def("get_xstream", [](const MargoManager& m, const std::string& es_name) { + return m.getXstream(es_name); + }, "name"_a) + .def("get_xstream", [](const MargoManager& m, uint32_t index) { + return m.getXstream(index); + }, "index"_a) + .def("add_xstream", &MargoManager::addXstream, + "config"_a) + .def("remove_xstream", [](MargoManager& m, const std::string& es_name) { + return m.removeXstream(es_name); + }, "name"_a) + .def("remove_xstream", [](MargoManager& m, uint32_t index) { + return m.removeXstream(index); + }, "index"_a) + .def_property_readonly("num_xstreams", &MargoManager::getNumXstreams) + ; + + py11::class_ (m, "SSGManager") + .def_property_readonly("config", [](const SSGManager& manager) { + return manager.getCurrentConfig().dump(); + }) + .def_property_readonly("num_groups", &SSGManager::getNumGroups) + .def("get_group", [](const SSGManager& ssg, const std::string& name) { + return ssg.getGroup(name); + }, "name_a") + .def("get_group", [](const SSGManager& ssg, size_t index) { + return ssg.getGroup(index); + }, "index_a") + .def("add_group", + [](SSGManager& ssg, + const std::string& name, + const py11::dict& config, + const std::shared_ptr& pool, + const std::string& bootstrap_method, + const std::string& group_file, + int64_t credential) { +#ifdef ENABLE_SSG + ssg_group_config_t cfg = SSG_GROUP_CONFIG_INITIALIZER; + cfg.ssg_credential = credential; +#define GET_SSG_FIELD(__field__) do { \ + if(config.contains(#__field__)) \ + cfg.swim_##__field__ = config[#__field__].cast(); \ + } while(0) + GET_SSG_FIELD(period_length_ms); + GET_SSG_FIELD(suspect_timeout_periods); + GET_SSG_FIELD(subgroup_member_count); + GET_SSG_FIELD(disabled); +#undef GET_SSG_FIELD + return ssg.addGroup(name, cfg, pool, bootstrap_method, group_file); +#else + throw Exception{"Bedrock was not compiled with SSG support"}; +#endif + }, "name"_a, "swim"_a=py11::dict{}, + "pool"_a=nullptr, "bootstrap"_a="init", + "group_file"_a="", "credential"_a=-1) + 
.def("resolve_address", [](const SSGManager& ssg, const std::string& address) { + return ADDR2CAPSULE(ssg.resolveAddress(address)); + }, "address"_a) + ; + + py11::class_ (m, "ABTioManager") + .def_property_readonly("config", [](const ABTioManager& manager) { + return manager.getCurrentConfig().dump(); + }) + .def_property_readonly("num_abtio_instances", &ABTioManager::numABTioInstances) + .def("get_abtio_instance", [](const ABTioManager& abtio, const std::string& name) { + return abtio.getABTioInstance(name); + }, "name"_a) + .def("get_abtio_instance", [](const ABTioManager& abtio, size_t index) { + return abtio.getABTioInstance(index); + }, "index"_a) + .def("add_abtio_instance", &ABTioManager::addABTioInstance) + ; + + py11::class_ (m, "ProviderManager") + .def_property_readonly("config", &ProviderManager::getCurrentConfig) + .def_property_readonly("num_providers", &ProviderManager::numProviders) + .def("get_provider", [](const ProviderManager& pm, size_t index) { + return pm.getProvider(index); + }, "index"_a) + .def("get_provider", [](const ProviderManager& pm, const std::string& name) { + return pm.getProvider(name); + }, "name"_a) + .def("lookup_provider", &ProviderManager::lookupProvider, + "spec"_a) + .def("deregister_provider", + &ProviderManager::deregisterProvider, + "spec"_a) + .def("add_provider", + &ProviderManager::addProviderFromJSON, + "description"_a) + .def("change_pool", + &ProviderManager::changeProviderPool, + "provider"_a, "pool"_a) + .def("migrate_provider", + &ProviderManager::migrateProvider, + "provider"_a, "dest_addr"_a, "dest_provider_id"_a, + "migration_config"_a, "remove_source"_a) + .def("snapshot_provider", + &ProviderManager::snapshotProvider, + "provider"_a, "dest_path"_a, "snapshot_config"_a, "remove_source"_a) + .def("restore_provider", + &ProviderManager::restoreProvider, + "provider"_a, "src_path"_a, "restore_config"_a) + ; + + py11::class_ (m, "ClientManager") + .def_property_readonly("config", 
&ClientManager::getCurrentConfig) + .def("get_client", [](const ClientManager& cm, const std::string& name) { + return cm.getClient(name); + }, + "name"_a) + .def("get_client", [](const ClientManager& cm, size_t index) { + return cm.getClient(index); + }, + "index"_a) + .def_property_readonly("num_clients", &ClientManager::numClients) + .def("remove_client", [](ClientManager& cm, const std::string& name) { + return cm.removeClient(name); + }, + "name"_a) + .def("remove_client", [](ClientManager& cm, size_t index) { + return cm.removeClient(index); + }, + "index"_a) + .def("get_client_or_create", &ClientManager::getOrCreateAnonymous, + "type"_a) + .def("add_client", &ClientManager::addClientFromJSON, + "description"_a) + ; +#endif +} diff --git a/spack.yaml b/spack.yaml index 9f4a121..a0d2189 100644 --- a/spack.yaml +++ b/spack.yaml @@ -5,6 +5,8 @@ spack: - mochi-margo - json-c - mochi-bedrock + - py-pybind11 + - py-mochi-margo - mpi concretizer: unify: true diff --git a/tests/spack.yaml b/tests/spack.yaml index f5218e9..74adbb7 100644 --- a/tests/spack.yaml +++ b/tests/spack.yaml @@ -6,6 +6,8 @@ spack: - json-c - mochi-bedrock - mpich + - py-pybind11 + - py-mochi-margo concretizer: unify: true reuse: true From a2f5f3e5673ecc9451421baea770e12fce772cb3 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Thu, 6 Jun 2024 09:40:47 +0100 Subject: [PATCH 02/10] removed swap file --- python/src/.py-flock-common.cpp.swp | Bin 16384 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 python/src/.py-flock-common.cpp.swp diff --git a/python/src/.py-flock-common.cpp.swp b/python/src/.py-flock-common.cpp.swp deleted file mode 100644 index 5a35f8630a9d3beb1dd944280754847a6b086947..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeI2TZ|J`7{`y`g%xm(CSG_jL)WbBy4%vl80f+(E(jY~SgwXdXlAFU?ZkFwn3=9y z&0pyw=bOt}I<{oT@QAQHnI`z$NXYz(n1AJLCwc!mLMmew#jG$x@TaUwMLAef 
zv+lJ5riRoUP1T(G@+DPzB&Ai&qN=0}UBXxET%V+t%c_!;G%ZBrND^fr%0OTSDrUZK z*+R0kKfQ$Mx?+okn{V14=n@&D3`7}-G7x1T%0QHXC<9Rjq6|bC_4uFTjEr5;nGRFKJ_yD{B3Sa|R z3Fd-h^9XqpjDbbq8~mQ(Ab1+cU^z&D)7KL65qK3;zY`)To~{LX;;rWYwk0np$LLeOctm)#0Kh%oOQ%I#-#_7*42z#mB;($ey!s@sMsygN8r%P|wrV0fc$2sptS zMttMg8wm=jbjt$cajq?wZ-A>y)ndrpE?n2lKbn2cjKa?(q|z-5?vG=2mf9U8xFmw} z8yF{`f9_So(L3=uCf%{sjy!?&AmHEPnN#~dMsbFn)(dlX;`pNYn#9va!&!Z9_I>v( zk8k^#a@;2q&bQi;6Uec1Os?@ds%BA`G>pdgX27cfcNo{88ZQaS9L;AnUDc>=RlM0dzu?Sd*6M0S+ac3OGa1|1cwb>r;<3D}8)l!LyIzN2P%_fQ6P;vHJTKK zmEcUq7A1T&)`)^?8<%rb&&oOew46vTp?8k1I@Lv`$?`c-m?A_LG8xOk%7*pbNrja@ zofc%n#JK5%L^V@Z6(^NzoD-IFy@~0KR?*6C!OyJaGF_wVs%|HyoXn|GJRz(UVh^&) z5v-p>3)U~C1v+6;#Xuji0W!Mtf#Hp-)9LKUrq$awticM`mN091^a|yK@HqA%xCUi~ z3PY<#w?frFGQ2vT%}SzX;I?lB0sPs3~b;B2{3wyuNYV^O8kP z8{}&0#o(7Xu~UXwN*3MDb&K14$ueYfpgg7e+;UC?fVyj^Vs8m1&)BH0NeMcK?nF5d;F8&D0mX=2DgLD*yCRU z(I?8le<=f#q>Cy!IbVCAHy+6B@jlrbXcS_Gq|2Ha%ZiO>FY{v``&}0A0+ld9!e~4l zwQvYB2TVA~4Ue+=d}mviAYmignGPWjlmr^5G0}Pfgbi#tw5PTz%lfxoU+siwrEO)e zm3t@=yCy;li8HZwJ=)bf{nRf{e*k~G;UEN=rgLdbvtn9~Bg8mENNRVru)66^#m;Wq z-N0`;K2~@23kPrOruTjyKCG{MU(;1Jv-J$|3^lMj`{w1-_?mzB2(9>mow{zaQ||tJ zNB-GbSCj55uwIV!Sq7|m$e=!%uUYR1UuNm0!L#6Y{J>s4`Sf2Od{65v-&X4~L|Uik zJv}crCTA&U9szG=r5QyDat?UM;+1MpC`TVggIB~BGQZj1wo8p2X0wt5yW!umTqD6y Tyyb1#S_BO;*Ve}tOp?C Date: Fri, 7 Jun 2024 15:38:29 +0100 Subject: [PATCH 03/10] updated C++ and python wrappers --- CMakeLists.txt | 2 + include/flock/cxx/client.hpp | 15 +- include/flock/cxx/group-view.hpp | 160 ++++++++++++++-------- include/flock/cxx/group.hpp | 56 -------- include/flock/cxx/server.hpp | 22 ++- python/CMakeLists.txt | 6 +- python/src/py-flock-client.cpp | 144 ++++---------------- python/src/py-flock-common.cpp | 39 +++++- python/src/py-flock-server.cpp | 226 +------------------------------ spack.yaml | 1 + src/CMakeLists.txt | 8 +- tests/spack.yaml | 1 + 12 files changed, 207 insertions(+), 473 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 071ce5d..a07ff11 100644 --- a/CMakeLists.txt +++ 
b/CMakeLists.txt @@ -65,6 +65,8 @@ endif () pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo) # search for json-c pkg_check_modules (json-c REQUIRED IMPORTED_TARGET json-c) +# search for thallium +find_package (thallium REQUIRED) if (ENABLE_MPI) find_package (MPI REQUIRED) diff --git a/include/flock/cxx/client.hpp b/include/flock/cxx/client.hpp index c1f0a83..91ad41c 100644 --- a/include/flock/cxx/client.hpp +++ b/include/flock/cxx/client.hpp @@ -6,6 +6,7 @@ #ifndef __FLOCK_CLIENT_HPP #define __FLOCK_CLIENT_HPP +#include #include #include #include @@ -19,11 +20,18 @@ class Client { Client() = default; - Client(margo_instance_id mid, ABT_pool pool = ABT_POOL_NULL) { + Client(margo_instance_id mid, ABT_pool pool = ABT_POOL_NULL) + : m_engine{mid} { auto err = flock_client_init(mid, pool, &m_client); FLOCK_CONVERT_AND_THROW(err); } + Client(const thallium::engine& engine, thallium::pool pool = thallium::pool()) + : m_engine{engine} { + auto err = flock_client_init(engine.get_margo_instance(), pool.native_handle(), &m_client); + FLOCK_CONVERT_AND_THROW(err); + } + ~Client() { if(m_client != FLOCK_CLIENT_NULL) { flock_client_finalize(m_client); @@ -68,8 +76,13 @@ class Client { return m_client; } + auto engine() const { + return m_engine; + } + private: + thallium::engine m_engine; flock_client_t m_client = FLOCK_CLIENT_NULL; }; diff --git a/include/flock/cxx/group-view.hpp b/include/flock/cxx/group-view.hpp index 3fd69fe..07e91d1 100644 --- a/include/flock/cxx/group-view.hpp +++ b/include/flock/cxx/group-view.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -27,9 +28,98 @@ class GroupView { public: struct Member { - uint64_t rank; - uint16_t provider_id; std::string address; + uint16_t provider_id; + }; + + struct Metadata { + std::string key; + std::string value; + }; + + struct MembersProxy { + + private: + + friend class GroupView; + + GroupView& m_owner; + + MembersProxy(GroupView& gv) + : m_owner(gv) {} + + public: + + 
MembersProxy(MembersProxy&&) = default; + + void add(const char* address, uint16_t provider_id) { + flock_group_view_add_member(&m_owner.m_view, address, provider_id); + } + + void remove(size_t index) { + auto member = flock_group_view_member_at(&m_owner.m_view, index); + if(!flock_group_view_remove_member(&m_owner.m_view, member)) + throw Exception{FLOCK_ERR_NO_MEMBER}; + } + + void remove(const char* address, uint16_t provider_id) { + auto member = flock_group_view_find_member(&m_owner.m_view, address, provider_id); + if(!flock_group_view_remove_member(&m_owner.m_view, member)) + throw Exception{FLOCK_ERR_NO_MEMBER}; + } + + bool exists(const char* address, uint16_t provider_id) const { + return static_cast(flock_group_view_find_member( + const_cast(&m_owner.m_view), address, provider_id)); + } + + size_t count() const { + return flock_group_view_member_count(&m_owner.m_view); + } + + Member operator[](size_t i) const { + if(i >= count()) throw std::out_of_range{"Invalid member index"}; + auto member = flock_group_view_member_at(&m_owner.m_view, i); + return Member{member->address, member->provider_id}; + } + }; + + struct MetadataProxy { + + private: + + friend class GroupView; + + GroupView& m_owner; + + MetadataProxy(GroupView& gv) + : m_owner(gv) {} + + public: + + MetadataProxy(MetadataProxy&&) = default; + + void add(const char* key, const char* value) { + flock_group_view_add_metadata(&m_owner.m_view, key, value); + } + + void remove(const char* key) { + flock_group_view_remove_metadata(&m_owner.m_view, key); + } + + size_t count() const { + return flock_group_view_metadata_count(&m_owner.m_view); + } + + Metadata operator[](size_t i) const { + if(i >= count()) throw std::out_of_range{"Invalid metadata index"}; + auto metadata = flock_group_view_metadata_at(&m_owner.m_view, i); + return Metadata{metadata->key, metadata->value}; + } + + const char* operator[](const char* key) const { + return flock_group_view_find_metadata(&m_owner.m_view, key); + } }; 
GroupView() = default; @@ -39,7 +129,7 @@ class GroupView { } ~GroupView() { - flock_group_view_clear(&m_view); + clear(); } GroupView(GroupView&& other) { @@ -48,7 +138,7 @@ class GroupView { GroupView& operator=(GroupView&& other) { if(this == &other) return *this; - flock_group_view_clear(&m_view); + clear(); FLOCK_GROUP_VIEW_MOVE(&other.m_view, &m_view); return *this; } @@ -69,68 +159,18 @@ class GroupView { flock_group_view_clear(&m_view); } - void addMember(uint64_t rank, const char* address, uint16_t provider_id) { - auto member = flock_group_view_add_member(&m_view, rank, provider_id, address); - if(!member) throw Exception{FLOCK_ERR_RANK_USED}; - } - - void removeMember(uint64_t rank) { - if(!flock_group_view_remove_member(&m_view, rank)) - throw Exception{FLOCK_ERR_NO_MEMBER}; + auto members() { + return MembersProxy{*this}; } - Member findMember(uint64_t rank) const { - auto member = flock_group_view_find_member(&m_view, rank); - if(!member) throw Exception{FLOCK_ERR_NO_MEMBER}; - return Member{member->rank, member->provider_id, member->address}; - } - - std::vector members() const { - std::vector result; - result.reserve(m_view.members.size); - for(size_t i = 0; i < m_view.members.size; ++i) { - result.push_back(Member{ - m_view.members.data[i].rank, - m_view.members.data[i].provider_id, - m_view.members.data[i].address - }); - } - return result; - } - - size_t maxNumMembers() const { - return m_view.members.data[m_view.members.size-1].rank; - } - - size_t numLiveMembers() const { - return m_view.members.size; - } - - void setMetadata(const char* key, const char* value) { - flock_group_view_add_metadata(&m_view, key, value); - } - - void removeMetadata(const char* key) { - if(!flock_group_view_remove_metadata(&m_view, key)) - throw Exception{FLOCK_ERR_NO_METADATA}; - } - - std::map metadata() const { - std::map result; - for(size_t i = 0; i < m_view.metadata.size; ++i) { - result.insert( - std::make_pair( - m_view.metadata.data[i].key, - 
m_view.metadata.data[i].value)); - } - return result; + auto metadata() { + return MetadataProxy{*this}; } - std::string toString(margo_instance_id mid, uint64_t credentials = 0) const { + operator std::string() const { std::string result; flock_return_t ret = flock_group_view_serialize( - mid, credentials, &m_view, - [](void* ctx, const char* content, size_t size) { + &m_view, [](void* ctx, const char* content, size_t size) { auto str = static_cast(ctx); str->assign(content, size); }, static_cast(&result)); diff --git a/include/flock/cxx/group.hpp b/include/flock/cxx/group.hpp index 31bebbd..8182599 100644 --- a/include/flock/cxx/group.hpp +++ b/include/flock/cxx/group.hpp @@ -111,48 +111,6 @@ class GroupHandle { return GroupHandle{gh}; } - operator std::string() const { - std::string result; - auto err = flock_group_serialize(m_gh, - [](void* uargs, const char* content, size_t size) -> void { - auto s = static_cast(uargs); - *s = std::string{content, size}; - }, &result); - FLOCK_CONVERT_AND_THROW(err); - return result; - } - - template - void serialize(const Function& function) const { - struct { - const Function& cb; - std::exception_ptr ex; - } context{function, nullptr}; - auto err = flock_group_serialize(m_gh, - [](void* uargs, const char* content, size_t size) -> void { - auto context_ptr = static_cast(uargs); - try { - (context_ptr->cb)(std::string_view{content, size}); - } catch(...) 
{ - context_ptr->ex = std::current_exception(); - } - }, &context); - if(context.ex) std::rethrow_exception(context.ex); - FLOCK_CONVERT_AND_THROW(err); - } - - void serializeToFile(const char* filename) const { - auto err = flock_group_serialize_to_file(m_gh, filename); - FLOCK_CONVERT_AND_THROW(err); - } - - uint64_t digest() const { - uint64_t d; - auto err = flock_group_digest(m_gh, &d); - FLOCK_CONVERT_AND_THROW(err); - return d; - } - GroupView view() const { GroupView v; auto err = flock_group_get_view(m_gh, &v.m_view); @@ -160,20 +118,6 @@ class GroupHandle { return v; } - size_t maxNumMembers() const { - size_t count; - auto err = flock_group_size(m_gh, &count); - FLOCK_CONVERT_AND_THROW(err); - return count; - } - - size_t numLiveMembers() const { - size_t count; - auto err = flock_group_live_member_count(m_gh, &count); - FLOCK_CONVERT_AND_THROW(err); - return count; - } - void update() const { auto err = flock_group_update_view(m_gh, NULL); FLOCK_CONVERT_AND_THROW(err); diff --git a/include/flock/cxx/server.hpp b/include/flock/cxx/server.hpp index 43d175c..5dd9559 100644 --- a/include/flock/cxx/server.hpp +++ b/include/flock/cxx/server.hpp @@ -6,9 +6,11 @@ #ifndef __FLOCK_CLIENT_HPP #define __FLOCK_CLIENT_HPP +#include #include #include #include +#include namespace flock { @@ -23,7 +25,6 @@ class Observer { virtual void onMembershipUpdate( flock_update_t update, - uint64_t rank, const char* address, uint16_t provider_id) = 0; @@ -43,12 +44,10 @@ class Provider { uint16_t provider_id, const char* config, GroupView& initial_view, - ABT_pool pool = ABT_POOL_NULL, - uint64_t credentials = 0) { + ABT_pool pool = ABT_POOL_NULL) { m_mid = mid; flock_provider_args args; args.backend = nullptr; - args.credentials = credentials; args.initial_view = &initial_view.m_view; args.pool = pool; auto err = flock_provider_register(mid, provider_id, config, &args, &m_provider); @@ -60,6 +59,17 @@ class Provider { }, this); } + Provider(const thallium::engine& engine, + 
uint16_t provider_id, + const char* config, + GroupView& initial_view, + const thallium::pool& pool = thallium::pool()) + : Provider(engine.get_margo_instance(), + provider_id, + config, + initial_view, + pool.native_handle()) {} + ~Provider() { if(m_provider != FLOCK_PROVIDER_NULL) { margo_provider_pop_finalize_callback(m_mid, this); @@ -99,8 +109,8 @@ class Provider { void addObserver(Observer* observer) { auto membership_update_fn = - [](void* ctx, flock_update_t update, size_t rank, const char* address, uint16_t provider_id) { - static_cast(ctx)->onMembershipUpdate(update, rank, address, provider_id); + [](void* ctx, flock_update_t update, const char* address, uint16_t provider_id) { + static_cast(ctx)->onMembershipUpdate(update, address, provider_id); }; auto metadata_update_fn = [](void* ctx, const char* key, const char* value) { diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 1719652..90505ef 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -1,15 +1,15 @@ add_library (pyflock_client MODULE src/py-flock-client.cpp) -target_link_libraries (pyflock_client PRIVATE pybind11::module flock-client PRIVATE coverage_config) +target_link_libraries (pyflock_client PRIVATE pybind11::module flock-cxx-headers PRIVATE coverage_config) pybind11_extension (pyflock_client) pybind11_strip (pyflock_client) add_library (pyflock_server MODULE src/py-flock-server.cpp) -target_link_libraries (pyflock_server PRIVATE pybind11::module flock-server PRIVATE coverage_config) +target_link_libraries (pyflock_server PRIVATE pybind11::module flock-cxx-headers PRIVATE coverage_config) pybind11_extension (pyflock_server) pybind11_strip (pyflock_server) add_library (pyflock_common MODULE src/py-flock-common.cpp) -target_link_libraries (pyflock_common PRIVATE pybind11::module flock-client PRIVATE coverage_config) +target_link_libraries (pyflock_common PRIVATE pybind11::module flock-cxx-headers PRIVATE coverage_config) pybind11_extension (pyflock_common) 
pybind11_strip (pyflock_common) diff --git a/python/src/py-flock-client.cpp b/python/src/py-flock-client.cpp index 9f54a31..bffb2cd 100644 --- a/python/src/py-flock-client.cpp +++ b/python/src/py-flock-client.cpp @@ -21,133 +21,41 @@ typedef py11::capsule pymargo_instance_id; PYBIND11_MODULE(pyflock_client, m) { m.doc() = "Flock client python extension"; -#if 0 - py11::register_exception(m, "Exception", PyExc_RuntimeError); - py11::class_(m, "Client") + py11::module_::import("pyflock_common"); + + py11::class_(m, "Client") .def(py11::init()) .def_property_readonly("margo_instance_id", - [](const Client& client) { + [](const flock::Client& client) { return MID2CAPSULE(client.engine().get_margo_instance()); }) - .def("make_service_handle", - &Client::makeServiceHandle, - "Create a ServiceHandle instance", - "address"_a, - "provider_id"_a=0) - .def("make_service_group_handle", - [](const Client& client, - const std::string& groupfile, + .def("make_group_handle", + [](const flock::Client& client, + const std::string& address, uint16_t provider_id) { - return client.makeServiceGroupHandle(groupfile, provider_id); + auto addr = client.engine().lookup(address); + return client.makeGroupHandle(addr.get_addr(), provider_id); }, - "Create a ServiceHandle instance", - "group_file"_a, + "Create a GroupHandle instance", + "address"_a, "provider_id"_a=0) - .def("make_service_group_handle", - [](const Client& client, - const std::vector& addresses, - uint16_t provider_id) { - return client.makeServiceGroupHandle(addresses, provider_id); + .def("make_group_handle_from_file", + [](const flock::Client& client, + const std::string& filename) { + return flock::GroupHandle::FromFile(client, filename.c_str()); }, - "Create a ServiceHandle instance", - "addresses"_a, - "provider_id"_a=0) - .def("make_service_group_handle", - [](const Client& client, - uint64_t gid, - uint16_t provider_id) { - return client.makeServiceGroupHandle(gid, provider_id); + "Create a GroupHandle instance", + 
"filename"_a) + .def("make_group_handle_from_serialized", + [](const flock::Client& client, + std::string_view serialized) { + return flock::GroupHandle::FromSerialized(client, serialized); }, - "Create a ServiceHandle instance", - "group_id"_a, - "provider_id"_a=0); - py11::class_(m, "ServiceHandle") - .def_property_readonly("client", &ServiceHandle::client) - .def_property_readonly("address", [](const ServiceHandle& sh) { - return static_cast(sh.providerHandle()); - }) - .def_property_readonly("provider_id", [](const ServiceHandle& sh) { - return sh.providerHandle().provider_id(); - }) - .def("get_config", - [](const ServiceHandle& sh) { - std::string config; - sh.getConfig(&config); - return config; - }) - .def("query_config", - [](const ServiceHandle& sh, const std::string& script) { - std::string result; - sh.queryConfig(script, &result); - return result; - }, "script"_a) - .def("load_module", - [](const ServiceHandle& sh, - const std::string& name, - const std::string& path) { - sh.loadModule(name, path); - }, "name"_a, "path"_a) - .def("add_provider", - [](const ServiceHandle& sh, - const std::string& description) { - uint16_t provider_id_out; - sh.addProvider(description, &provider_id_out); - return provider_id_out; - }, "description"_a) - .def("change_provider_pool", - [](const ServiceHandle& sh, - const std::string& provider_name, - const std::string& pool_name) { - sh.changeProviderPool(provider_name, pool_name); - }) - .def("add_client", - [](const ServiceHandle& sh, - const std::string& description) { - sh.addClient(description); - }, "description"_a) - .def("add_abtio_instance", - [](const ServiceHandle& sh, - const std::string& description) { - sh.addABTioInstance(description); - }, "description"_a = std::string("{}")) - .def("add_ssg_group", [](const ServiceHandle& sh, const std::string& config) { - sh.addSSGgroup(config); - }, - "description"_a) - .def("add_pool", [](const ServiceHandle& sh, const std::string& config) { - sh.addPool(config); - }, - 
"description"_a) - .def("remove_pool", [](const ServiceHandle& sh, const std::string& pool_name) { - sh.removePool(pool_name); - }, - "name"_a) - .def("add_xstream", [](const ServiceHandle& sh, const std::string& config) { - sh.addXstream(config); - }, - "description"_a) - .def("remove_xstream", [](const ServiceHandle& sh, const std::string& es_name) { - sh.removeXstream(es_name); - }, - "name"_a) + "Create a GroupHandle instance", + "serialized"_a) ; - py11::class_(m, "ServiceGroupHandle") - .def("refresh", &ServiceGroupHandle::refresh) - .def_property_readonly("size", &ServiceGroupHandle::size) - .def_property_readonly("client", &ServiceGroupHandle::client) - .def("__getitem__", &ServiceGroupHandle::operator[]) - .def("get_config", - [](const ServiceGroupHandle& sh) { - std::string config; - sh.getConfig(&config); - return config; - }) - .def("query_config", - [](const ServiceGroupHandle& sh, const std::string& script) { - std::string result; - sh.queryConfig(script, &result); - return result; - }, "script"_a) + py11::class_(m, "GroupHandle") + .def("update", &flock::GroupHandle::update) + .def_property_readonly("view", &flock::GroupHandle::view) ; -#endif } diff --git a/python/src/py-flock-common.cpp b/python/src/py-flock-common.cpp index bf3d090..850cb8d 100644 --- a/python/src/py-flock-common.cpp +++ b/python/src/py-flock-common.cpp @@ -23,19 +23,48 @@ PYBIND11_MODULE(pyflock_common, m) { py11::register_exception(m, "Exception", PyExc_RuntimeError); py11::class_(m, "Member") - .def_readonly("rank", &flock::GroupView::Member::rank) .def_readonly("provider_id", &flock::GroupView::Member::provider_id) .def_readonly("address", &flock::GroupView::Member::address); + py11::class_(m, "Metadata") + .def_readonly("key", &flock::GroupView::Metadata::key) + .def_readonly("value", &flock::GroupView::Metadata::value); + + py11::class_(m, "MembersProxy") + .def("__len__", &flock::GroupView::MembersProxy::count) + .def("count", &flock::GroupView::MembersProxy::count) + 
.def("add", &flock::GroupView::MembersProxy::add) + .def("remove", [](flock::GroupView::MembersProxy& proxy, size_t i) { + proxy.remove(i); + }) + .def("remove", [](flock::GroupView::MembersProxy& proxy, const char* address, uint16_t provider_id) { + proxy.remove(address, provider_id); + }) + .def("exists", &flock::GroupView::MembersProxy::exists) + .def("__getitem__", &flock::GroupView::MembersProxy::operator[]) + .def("__delitem__", [](flock::GroupView::MembersProxy& proxy, size_t i) { + proxy.remove(i); + }) + ; + + py11::class_(m, "MetadataProxy") + .def("__len__", &flock::GroupView::MetadataProxy::count) + .def("count", &flock::GroupView::MetadataProxy::count) + .def("add", &flock::GroupView::MetadataProxy::add) + .def("remove", &flock::GroupView::MetadataProxy::remove) + .def("__getitem__", [](flock::GroupView::MetadataProxy& proxy, const char* key) { + return proxy[key]; + }) + .def("__delitem__", &flock::GroupView::MetadataProxy::remove) + ; + py11::class_(m, "GroupView") .def(py11::init<>()) .def_property_readonly("digest", &flock::GroupView::digest) .def("clear", &flock::GroupView::clear) .def("lock", &flock::GroupView::lock) .def("unlock", &flock::GroupView::unlock) - .def("add_member", &flock::GroupView::addMember) - .def("remove_member", &flock::GroupView::removeMember) - .def("find_member", &flock::GroupView::findMember) - .def_property_readonly("members", &flock::GroupView::members) + .def_property_readonly("members", &flock::GroupView::members, py11::keep_alive<0, 1>()) + .def_property_readonly("metadata", &flock::GroupView::metadata, py11::keep_alive<0, 1>()) ; } diff --git a/python/src/py-flock-server.cpp b/python/src/py-flock-server.cpp index d5ae1c5..d02b111 100644 --- a/python/src/py-flock-server.cpp +++ b/python/src/py-flock-server.cpp @@ -5,6 +5,7 @@ */ #include #include +#include namespace py11 = pybind11; using namespace pybind11::literals; @@ -18,231 +19,10 @@ typedef py11::capsule pyhg_addr_t; #define MID2CAPSULE(__mid) 
py11::capsule((void*)(__mid), "margo_instance_id") #define CAPSULE2MID(__caps) (margo_instance_id)(__caps) -#define ADDR2CAPSULE(__addr) py11::capsule((void*)(__addr), "hg_addr_t") -#define CAPSULE2ADDR(__caps) (hg_add_t)(__caps) - PYBIND11_MODULE(pybedrock_server, m) { m.doc() = "Flock server python extension"; -#if 0 - py11::register_exception(m, "Exception", PyExc_RuntimeError); - - py11::class_> named_dep(m, "NamedDependency"); - named_dep - .def_property_readonly("name", - [](std::shared_ptr nd) { return nd->getName(); }) - .def_property_readonly("type", - [](std::shared_ptr nd) { return nd->getType(); }) - .def_property_readonly("handle", - [](std::shared_ptr nd) { return nd->getHandle(); }) - ; - - py11::class_> (m, "ProviderDependency", named_dep) - .def_property_readonly("provider_id", - [](std::shared_ptr nd) { return nd->getProviderID(); }) - ; - - py11::class_> server(m, "Server"); - - py11::enum_(server, "ConfigType") - .value("JSON", ConfigType::JSON) - .value("JX9", ConfigType::JX9) - .export_values(); - - server - .def(py11::init([](const std::string& address, const std::string& config, - ConfigType configType, - const Jx9ParamMap& jx9Params) { - return std::make_shared(address, config, configType, jx9Params); - }), "address"_a, "config"_a="{}", "config_type"_a=ConfigType::JSON, - "jx9_params"_a=Jx9ParamMap{}) - .def("wait_for_finalize", - [](std::shared_ptr server) { - server->waitForFinalize(); - }) - .def("finalize", - [](std::shared_ptr server) { - server->finalize(); - }) - .def_property_readonly("margo_instance_id", - [](std::shared_ptr server) { - return MID2CAPSULE(server->getMargoManager().getMargoInstance()); - }) - .def_property_readonly("config", - [](std::shared_ptr server) { - return server->getCurrentConfig(); - }) - .def_property_readonly("margo_manager", - [](std::shared_ptr server) { - return server->getMargoManager(); - }) - .def_property_readonly("abtio_manager", - [](std::shared_ptr server) { - return server->getABTioManager(); 
- }) - .def_property_readonly("provider_manager", - [](std::shared_ptr server) { - return server->getProviderManager(); - }) - .def_property_readonly("client_manager", - [](std::shared_ptr server) { - return server->getClientManager(); - }) - .def_property_readonly("ssg_manager", - [](std::shared_ptr server) { - return server->getSSGManager(); - }) - ; - - py11::class_ (m, "MargoManager") - .def_property_readonly("margo_instance_id", [](const MargoManager& m) { - return MID2CAPSULE(m.getMargoInstance()); - }) - .def_property_readonly("default_handler_pool", &MargoManager::getDefaultHandlerPool) - .def_property_readonly("config", &MargoManager::getCurrentConfig) - .def("get_pool", [](const MargoManager& m, const std::string& pool_name) { - return m.getPool(pool_name); - }, "name"_a) - .def("get_pool", [](const MargoManager& m, uint32_t index) { - return m.getPool(index); - }, "index"_a) - .def("add_pool", &MargoManager::addPool, - "config"_a) - .def("remove_pool", [](MargoManager& m, const std::string& pool_name) { - return m.removePool(pool_name); - }, "name"_a) - .def("remove_pool", [](MargoManager& m, uint32_t index) { - return m.removePool(index); - }, "index"_a) - .def_property_readonly("num_pools", &MargoManager::getNumPools) - .def("get_xstream", [](const MargoManager& m, const std::string& es_name) { - return m.getXstream(es_name); - }, "name"_a) - .def("get_xstream", [](const MargoManager& m, uint32_t index) { - return m.getXstream(index); - }, "index"_a) - .def("add_xstream", &MargoManager::addXstream, - "config"_a) - .def("remove_xstream", [](MargoManager& m, const std::string& es_name) { - return m.removeXstream(es_name); - }, "name"_a) - .def("remove_xstream", [](MargoManager& m, uint32_t index) { - return m.removeXstream(index); - }, "index"_a) - .def_property_readonly("num_xstreams", &MargoManager::getNumXstreams) - ; - - py11::class_ (m, "SSGManager") - .def_property_readonly("config", [](const SSGManager& manager) { - return 
manager.getCurrentConfig().dump(); - }) - .def_property_readonly("num_groups", &SSGManager::getNumGroups) - .def("get_group", [](const SSGManager& ssg, const std::string& name) { - return ssg.getGroup(name); - }, "name_a") - .def("get_group", [](const SSGManager& ssg, size_t index) { - return ssg.getGroup(index); - }, "index_a") - .def("add_group", - [](SSGManager& ssg, - const std::string& name, - const py11::dict& config, - const std::shared_ptr& pool, - const std::string& bootstrap_method, - const std::string& group_file, - int64_t credential) { -#ifdef ENABLE_SSG - ssg_group_config_t cfg = SSG_GROUP_CONFIG_INITIALIZER; - cfg.ssg_credential = credential; -#define GET_SSG_FIELD(__field__) do { \ - if(config.contains(#__field__)) \ - cfg.swim_##__field__ = config[#__field__].cast(); \ - } while(0) - GET_SSG_FIELD(period_length_ms); - GET_SSG_FIELD(suspect_timeout_periods); - GET_SSG_FIELD(subgroup_member_count); - GET_SSG_FIELD(disabled); -#undef GET_SSG_FIELD - return ssg.addGroup(name, cfg, pool, bootstrap_method, group_file); -#else - throw Exception{"Bedrock was not compiled with SSG support"}; -#endif - }, "name"_a, "swim"_a=py11::dict{}, - "pool"_a=nullptr, "bootstrap"_a="init", - "group_file"_a="", "credential"_a=-1) - .def("resolve_address", [](const SSGManager& ssg, const std::string& address) { - return ADDR2CAPSULE(ssg.resolveAddress(address)); - }, "address"_a) - ; - - py11::class_ (m, "ABTioManager") - .def_property_readonly("config", [](const ABTioManager& manager) { - return manager.getCurrentConfig().dump(); - }) - .def_property_readonly("num_abtio_instances", &ABTioManager::numABTioInstances) - .def("get_abtio_instance", [](const ABTioManager& abtio, const std::string& name) { - return abtio.getABTioInstance(name); - }, "name"_a) - .def("get_abtio_instance", [](const ABTioManager& abtio, size_t index) { - return abtio.getABTioInstance(index); - }, "index"_a) - .def("add_abtio_instance", &ABTioManager::addABTioInstance) - ; - - py11::class_ (m, 
"ProviderManager") - .def_property_readonly("config", &ProviderManager::getCurrentConfig) - .def_property_readonly("num_providers", &ProviderManager::numProviders) - .def("get_provider", [](const ProviderManager& pm, size_t index) { - return pm.getProvider(index); - }, "index"_a) - .def("get_provider", [](const ProviderManager& pm, const std::string& name) { - return pm.getProvider(name); - }, "name"_a) - .def("lookup_provider", &ProviderManager::lookupProvider, - "spec"_a) - .def("deregister_provider", - &ProviderManager::deregisterProvider, - "spec"_a) - .def("add_provider", - &ProviderManager::addProviderFromJSON, - "description"_a) - .def("change_pool", - &ProviderManager::changeProviderPool, - "provider"_a, "pool"_a) - .def("migrate_provider", - &ProviderManager::migrateProvider, - "provider"_a, "dest_addr"_a, "dest_provider_id"_a, - "migration_config"_a, "remove_source"_a) - .def("snapshot_provider", - &ProviderManager::snapshotProvider, - "provider"_a, "dest_path"_a, "snapshot_config"_a, "remove_source"_a) - .def("restore_provider", - &ProviderManager::restoreProvider, - "provider"_a, "src_path"_a, "restore_config"_a) - ; - py11::class_ (m, "ClientManager") - .def_property_readonly("config", &ClientManager::getCurrentConfig) - .def("get_client", [](const ClientManager& cm, const std::string& name) { - return cm.getClient(name); - }, - "name"_a) - .def("get_client", [](const ClientManager& cm, size_t index) { - return cm.getClient(index); - }, - "index"_a) - .def_property_readonly("num_clients", &ClientManager::numClients) - .def("remove_client", [](ClientManager& cm, const std::string& name) { - return cm.removeClient(name); - }, - "name"_a) - .def("remove_client", [](ClientManager& cm, size_t index) { - return cm.removeClient(index); - }, - "index"_a) - .def("get_client_or_create", &ClientManager::getOrCreateAnonymous, - "type"_a) - .def("add_client", &ClientManager::addClientFromJSON, - "description"_a) + py11::class_(m, "Provider") + .def(py11::init()) ; 
-#endif } diff --git a/spack.yaml b/spack.yaml index a0d2189..0f9d184 100644 --- a/spack.yaml +++ b/spack.yaml @@ -3,6 +3,7 @@ spack: - cmake - pkgconfig - mochi-margo + - mochi-thallium - json-c - mochi-bedrock - py-pybind11 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8524c2c..9396032 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,6 +76,12 @@ set_target_properties (flock-bedrock-module SOVERSION ${FLOCK_VERSION_MAJOR}) endif () +add_library (flock-cxx-headers INTERFACE) +add_library (flock::headers ALIAS flock-cxx-headers) +target_link_libraries (flock-cxx-headers + INTERFACE PkgConfig::margo thallium flock-client flock-server) +target_include_directories (flock-cxx-headers INTERFACE $) + # installation stuff (packaging and install commands) write_basic_package_version_file ( "flock-config-version.cmake" @@ -101,7 +107,7 @@ configure_file ("flock-client.pc.in" "flock-client.pc" @ONLY) configure_file ("config.h.in" "config.h" @ONLY) # "make install" rules -install (TARGETS flock-server flock-client +install (TARGETS flock-server flock-client flock-cxx-headers EXPORT flock-targets ARCHIVE DESTINATION lib LIBRARY DESTINATION lib) diff --git a/tests/spack.yaml b/tests/spack.yaml index 74adbb7..de35b49 100644 --- a/tests/spack.yaml +++ b/tests/spack.yaml @@ -3,6 +3,7 @@ spack: - cmake - pkgconfig - mochi-margo ^mercury~boostsys~checksum ^libfabric fabrics=tcp,rxm + - mochi-thallium - json-c - mochi-bedrock - mpich From fc55d5a99732dbf5403f900dc69eee7cf619da6d Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 7 Jun 2024 16:25:32 +0100 Subject: [PATCH 04/10] added better python APIs --- python/mochi/flock/client.py | 81 ++++++++++++++++++++++++++++++++++++ python/mochi/flock/common.py | 18 ++++++++ python/mochi/flock/server.py | 26 ++++++++++++ python/mochi/flock/view.py | 18 ++++++++ 4 files changed, 143 insertions(+) create mode 100644 python/mochi/flock/client.py create mode 100644 python/mochi/flock/common.py create mode 100644 
python/mochi/flock/server.py create mode 100644 python/mochi/flock/view.py diff --git a/python/mochi/flock/client.py b/python/mochi/flock/client.py new file mode 100644 index 0000000..c18e682 --- /dev/null +++ b/python/mochi/flock/client.py @@ -0,0 +1,81 @@ +# (C) 2024 The University of Chicago +# See COPYRIGHT in top-level directory. + + +""" +.. module:: client + :synopsis: This package provides access to the Flock C++ wrapper + +.. moduleauthor:: Matthieu Dorier + + +""" + + +import pyflock_common +import pyflock_client +import pymargo.core +import pymargo + + +class GroupHandle: + + def __init__(self, internal, client): + self._internal = internal + self._client = client + + @property + def client(self): + return self._client + + def update(self): + self._internal.update() + + def view(self): + return self._internal.view + + +class Client: + + def __init__(self, arg): + if isinstance(arg, pymargo.core.Engine): + self._engine = arg + self._owns_engine = False + elif isinstance(arg, str): + self._engine = pymargo.core.Engine(arg, pymargo.client) + self._owns_engine = True + else: + raise TypeError(f'Invalid argument type {type(arg)}') + self._internal = pyflock_client.Client(self._engine.get_internal_mid()) + + def __del__(self): + del self._internal + if self._owns_engine: + self._engine.finalize() + del self._engine + + @property + def mid(self): + return self._internal.margo_instance_id + + @property + def engine(self): + return self._engine + + def make_group_handle(self, address: str|pymargo.core.Address, provider_id: int = 0): + if isinstance(address, pymargo.core.Address): + address = str(address) + return GroupHandle( + self._internal.make_service_handle(address=address, provider_id=provider_id), + self) + + def make_group_handle_from_file(self, filename: str): + return GroupHandle( + self._internal.make_group_handle_from_file(filename=filename), + self) + + def make_group_handle_from_serialized(self, serialized: str): + return GroupHandle( + 
self._internal.make_group_handle_from_serialized(serialized=serialized), + self) + diff --git a/python/mochi/flock/common.py b/python/mochi/flock/common.py new file mode 100644 index 0000000..5563936 --- /dev/null +++ b/python/mochi/flock/common.py @@ -0,0 +1,18 @@ +# (C) 2024 The University of Chicago +# See COPYRIGHT in top-level directory. + + +""" +.. module:: common + :synopsis: This package provides access to the Flock C++ wrapper + +.. moduleauthor:: Matthieu Dorier + + +""" + + +import pyflock_common + + +FlockException = pyflock_common.Exception diff --git a/python/mochi/flock/server.py b/python/mochi/flock/server.py new file mode 100644 index 0000000..5eac9d6 --- /dev/null +++ b/python/mochi/flock/server.py @@ -0,0 +1,26 @@ +# (C) 2024 The University of Chicago +# See COPYRIGHT in top-level directory. + + +""" +.. module:: server + :synopsis: This package provides access to the Flock C++ wrapper + +.. moduleauthor:: Matthieu Dorier + + +""" + + +import pyflock_common +from .view import GroupView +import pyflock_server +import pymargo.core +import pymargo + + +class Provider: + + def __init__(self, engine: pymargo.core.Engine, provider_id: int, config: str, initial_view: GroupView): + self._internal = pyflock_server.Provider( + self._engine.get_internal_mid(), provider_id, config, initial_view) diff --git a/python/mochi/flock/view.py b/python/mochi/flock/view.py new file mode 100644 index 0000000..bc261bc --- /dev/null +++ b/python/mochi/flock/view.py @@ -0,0 +1,18 @@ +# (C) 2024 The University of Chicago +# See COPYRIGHT in top-level directory. + + +""" +.. module:: view + :synopsis: This package provides access to the Flock C++ wrapper + +.. 
moduleauthor:: Matthieu Dorier + + +""" + + +import pyflock_common + + +GroupView = pyflock_common.GroupView From a89ce9ed5636829ff15d67027840c25d51dd1806 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 09:11:27 +0100 Subject: [PATCH 05/10] started adding unit tests for python module --- python/mochi/flock/__init__.py | 0 python/mochi/flock/test_client.py | 17 +++++++++++++++++ tests/CMakeLists.txt | 25 +++++++++++++++++++++++++ tests/spack.yaml | 1 + 4 files changed, 43 insertions(+) create mode 100644 python/mochi/flock/__init__.py create mode 100644 python/mochi/flock/test_client.py diff --git a/python/mochi/flock/__init__.py b/python/mochi/flock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/mochi/flock/test_client.py b/python/mochi/flock/test_client.py new file mode 100644 index 0000000..4535fb5 --- /dev/null +++ b/python/mochi/flock/test_client.py @@ -0,0 +1,17 @@ +import unittest +import mochi.flock.client as mfc +from pymargo.core import Engine + +class TestClient(unittest.TestCase): + + def test_init_client_from_address(self): + client = mfc.Client("na+sm") + + def test_init_client_from_engine(self): + with Engine("na+sm") as engine: + client = mfc.Client(engine) + del client + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d1417a5..8e7a9fa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -15,3 +15,28 @@ foreach (test-source ${test-sources}) add_test (NAME ${name} COMMAND ./${name}) endif () endforeach () + +if (ENABLE_PYTHON) + # Set the path to the python directory + set (PYTHON_MODULE_DIR ${CMAKE_SOURCE_DIR}/python) + # Use file(GLOB_RECURSE ...) 
to find all files matching the test_*.py pattern + file (GLOB_RECURSE PYTHON_TEST_FILES "${PYTHON_MODULE_DIR}/test_*.py") + + foreach (PYTHON_TEST_FILE ${PYTHON_TEST_FILES}) + # Remove the directory part + file (RELATIVE_PATH PYTHON_TEST_FILE_REL ${PYTHON_MODULE_DIR} ${PYTHON_TEST_FILE}) + # Remove the file extension + string (REPLACE ".py" "" PYTHON_TEST_FILE_BASE ${PYTHON_TEST_FILE_REL}) + # Replace slashes with dots + string (REPLACE "/" "." PYTHON_TEST_NAME ${PYTHON_TEST_FILE_BASE}) + # Add the test + if (${ENABLE_COVERAGE}) + message (STATUS "${PYTHON_TEST_NAME} test will run with code coverage") + add_test (NAME ${PYTHON_TEST_NAME} COMMAND coverage run -a -m unittest ${PYTHON_TEST_NAME}) + else () + add_test (NAME ${PYTHON_TEST_NAME} COMMAND python -m unittest ${PYTHON_TEST_NAME}) + endif () + set_property (TEST ${PYTHON_TEST_NAME} PROPERTY ENVIRONMENT + PYTHONPATH=${CMAKE_SOURCE_DIR}/python/:${CMAKE_BINARY_DIR}/python:$ENV{PYTHONPATH}) + endforeach () +endif () diff --git a/tests/spack.yaml b/tests/spack.yaml index de35b49..3477a46 100644 --- a/tests/spack.yaml +++ b/tests/spack.yaml @@ -9,6 +9,7 @@ spack: - mpich - py-pybind11 - py-mochi-margo + - py-coverage concretizer: unify: true reuse: true From 3fd77d7611deaa64d722b5bf6b55336d85f3f1af Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 12:31:58 +0100 Subject: [PATCH 06/10] Added python tests for views --- python/mochi/flock/test_view.py | 75 +++++++++++++++++++++++++++++++++ python/src/py-flock-common.cpp | 37 ++++++++++------ 2 files changed, 98 insertions(+), 14 deletions(-) create mode 100644 python/mochi/flock/test_view.py diff --git a/python/mochi/flock/test_view.py b/python/mochi/flock/test_view.py new file mode 100644 index 0000000..c9e0c16 --- /dev/null +++ b/python/mochi/flock/test_view.py @@ -0,0 +1,75 @@ +import unittest +from mochi.flock.view import GroupView + + +class TestGroupView(unittest.TestCase): + + def test_init(self): + view = GroupView() + + def 
test_members(self): + view = GroupView() + # add 5 members + for i in range(0, 5): + view.members.add( + address=f"address{i}", + provider_id=i) + # check that the count is now 5 + self.assertEqual(len(view.members), 5) + self.assertEqual(view.members.count, 5) + # check the members + for i in range(0, 5): + assert view.members.exists(f"address{i}", i) + assert not view.members.exists(f"address{i+1}", i) + assert not view.members.exists(f"address{i}", i+1) + self.assertEqual(view.members[i].address, f"address{i}") + self.assertEqual(view.members[i].provider_id, i) + # erase member 3 via __delitem__ + del view.members[3] + self.assertEqual(view.members.count, 4) + assert not view.members.exists("address3", 3) + # erase member 2 via remove(index) + view.members.remove(2) + self.assertEqual(view.members.count, 3) + assert not view.members.exists("address2", 2) + # erase member 1 via remove(address, provider_id) + view.members.remove("address1", 1) + self.assertEqual(view.members.count, 2) + assert not view.members.exists("address1", 1) + + def test_metadata(self): + view = GroupView() + # add 5 metadata + for i in range(0, 5): + view.metadata.add( + key=f"key{i}", + value=f"value{i}") + # check that the count is now 5 + self.assertEqual(len(view.metadata), 5) + self.assertEqual(view.metadata.count, 5) + # check the metadata + for i in range(0, 5): + self.assertEqual(view.metadata[f"key{i}"], f"value{i}") + self.assertEqual(view.metadata[i].key, f"key{i}") + self.assertEqual(view.metadata[i].value, f"value{i}") + # erase key3 via __delitem__ + del view.metadata["key3"] + self.assertEqual(view.metadata.count, 4) + self.assertIsNone(view.metadata["key3"]) + # erase key2 via remove(key) + view.metadata.remove("key2") + self.assertEqual(view.metadata.count, 3) + self.assertIsNone(view.metadata["key2"]) + + def test_digest(self): + view = GroupView() + self.assertEqual(view.digest, 0) + view.members.add("address", 1) + self.assertNotEqual(view.digest, 0) + d = view.digest + 
view.metadata.add("key", "value") + self.assertNotEqual(view.digest, 0) + self.assertNotEqual(view.digest, d) + +if __name__ == '__main__': + unittest.main() diff --git a/python/src/py-flock-common.cpp b/python/src/py-flock-common.cpp index 850cb8d..89d09ab 100644 --- a/python/src/py-flock-common.cpp +++ b/python/src/py-flock-common.cpp @@ -32,30 +32,39 @@ PYBIND11_MODULE(pyflock_common, m) { py11::class_(m, "MembersProxy") .def("__len__", &flock::GroupView::MembersProxy::count) - .def("count", &flock::GroupView::MembersProxy::count) - .def("add", &flock::GroupView::MembersProxy::add) + .def_property_readonly("count", &flock::GroupView::MembersProxy::count) + .def("add", &flock::GroupView::MembersProxy::add, + "address"_a, "provider_id"_a) .def("remove", [](flock::GroupView::MembersProxy& proxy, size_t i) { proxy.remove(i); - }) + }, "index"_a) .def("remove", [](flock::GroupView::MembersProxy& proxy, const char* address, uint16_t provider_id) { proxy.remove(address, provider_id); - }) - .def("exists", &flock::GroupView::MembersProxy::exists) - .def("__getitem__", &flock::GroupView::MembersProxy::operator[]) + }, "address"_a, "provider_id"_a) + .def("exists", &flock::GroupView::MembersProxy::exists, + "address"_a, "provider_id"_a) + .def("__getitem__", &flock::GroupView::MembersProxy::operator[], + "index"_a) .def("__delitem__", [](flock::GroupView::MembersProxy& proxy, size_t i) { proxy.remove(i); - }) + }, "index"_a) ; py11::class_(m, "MetadataProxy") .def("__len__", &flock::GroupView::MetadataProxy::count) - .def("count", &flock::GroupView::MetadataProxy::count) - .def("add", &flock::GroupView::MetadataProxy::add) - .def("remove", &flock::GroupView::MetadataProxy::remove) - .def("__getitem__", [](flock::GroupView::MetadataProxy& proxy, const char* key) { - return proxy[key]; - }) - .def("__delitem__", &flock::GroupView::MetadataProxy::remove) + .def_property_readonly("count", &flock::GroupView::MetadataProxy::count) + .def("add", 
&flock::GroupView::MetadataProxy::add, + "key"_a, "value"_a) + .def("remove", &flock::GroupView::MetadataProxy::remove, + "key"_a) + .def("__getitem__", [](flock::GroupView::MetadataProxy& proxy, const std::string& key) { + return proxy[key.c_str()]; + }, "key"_a) + .def("__getitem__", [](flock::GroupView::MetadataProxy& proxy, size_t index) { + return proxy[index]; + }, "index"_a) + .def("__delitem__", &flock::GroupView::MetadataProxy::remove, + "key"_a) ; py11::class_(m, "GroupView") From 523802c08c9353e8f0add872fd1f7dcb7aea82eb Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 13:30:09 +0100 Subject: [PATCH 07/10] added python tests for provider --- python/mochi/flock/server.py | 2 +- python/mochi/flock/test_client.py | 1 + python/mochi/flock/test_provider.py | 26 ++++++++++++++++++++++++++ python/src/py-flock-server.cpp | 2 +- 4 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 python/mochi/flock/test_provider.py diff --git a/python/mochi/flock/server.py b/python/mochi/flock/server.py index 5eac9d6..0d60f39 100644 --- a/python/mochi/flock/server.py +++ b/python/mochi/flock/server.py @@ -23,4 +23,4 @@ class Provider: def __init__(self, engine: pymargo.core.Engine, provider_id: int, config: str, initial_view: GroupView): self._internal = pyflock_server.Provider( - self._engine.get_internal_mid(), provider_id, config, initial_view) + engine.get_internal_mid(), provider_id, config, initial_view) diff --git a/python/mochi/flock/test_client.py b/python/mochi/flock/test_client.py index 4535fb5..6f316cf 100644 --- a/python/mochi/flock/test_client.py +++ b/python/mochi/flock/test_client.py @@ -2,6 +2,7 @@ import mochi.flock.client as mfc from pymargo.core import Engine + class TestClient(unittest.TestCase): def test_init_client_from_address(self): diff --git a/python/mochi/flock/test_provider.py b/python/mochi/flock/test_provider.py new file mode 100644 index 0000000..d6316bb --- /dev/null +++ b/python/mochi/flock/test_provider.py @@ 
-0,0 +1,26 @@ +import unittest +import json +import mochi.flock.server as mfs +import mochi.flock.view as view +import pymargo.core + +class TestClient(unittest.TestCase): + + def test_init_provider(self): + with pymargo.core.Engine("na+sm", pymargo.core.server) as engine: + address = str(engine.address) + config = { + "group": { + "type": "static", + "config": {} + } + } + initial_view = view.GroupView() + initial_view.members.add(address, 42) + provider = mfs.Provider(engine, 42, json.dumps(config), initial_view) + del provider + engine.finalize() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/src/py-flock-server.cpp b/python/src/py-flock-server.cpp index d02b111..d518cb6 100644 --- a/python/src/py-flock-server.cpp +++ b/python/src/py-flock-server.cpp @@ -19,7 +19,7 @@ typedef py11::capsule pyhg_addr_t; #define MID2CAPSULE(__mid) py11::capsule((void*)(__mid), "margo_instance_id") #define CAPSULE2MID(__caps) (margo_instance_id)(__caps) -PYBIND11_MODULE(pybedrock_server, m) { +PYBIND11_MODULE(pyflock_server, m) { m.doc() = "Flock server python extension"; py11::class_(m, "Provider") From 7d7e95c0152eeff8610c94af1855717f9b4bf921 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 13:49:14 +0100 Subject: [PATCH 08/10] enabled python in github workflows --- .github/workflows/codecov.yml | 1 + .github/workflows/test.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 61fd6b2..2dfcda3 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -44,6 +44,7 @@ jobs: -DENABLE_EXAMPLES=ON \ -DENABLE_BEDROCK=ON \ -DENABLE_MPI=ON \ + -DENABLE_PYTHON=ON \ -DCMAKE_BUILD_TYPE=Debug make make test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8886278..9490430 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -52,6 +52,7 @@ jobs: -DENABLE_EXAMPLES=ON \ -DENABLE_BEDROCK=ON \ -DENABLE_MPI=ON \ + 
-DENABLE_PYTHON=ON \ -DCMAKE_BUILD_TYPE=RelWithDebInfo make make test From d577b925d716992b747200438e5ff17ba9b566c7 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 14:41:00 +0100 Subject: [PATCH 09/10] added __str__ for GroupView --- python/mochi/flock/test_view.py | 28 ++++++++++++++++++++++++++++ python/src/py-flock-common.cpp | 3 +++ 2 files changed, 31 insertions(+) diff --git a/python/mochi/flock/test_view.py b/python/mochi/flock/test_view.py index c9e0c16..07af987 100644 --- a/python/mochi/flock/test_view.py +++ b/python/mochi/flock/test_view.py @@ -1,4 +1,5 @@ import unittest +import json from mochi.flock.view import GroupView @@ -61,6 +62,33 @@ def test_metadata(self): self.assertEqual(view.metadata.count, 3) self.assertIsNone(view.metadata["key2"]) + def test_str(self): + view = GroupView() + # add 5 metadata + for i in range(0, 5): + view.metadata.add( + key=f"key{i}", + value=f"value{i}") + view = GroupView() + # add 5 members + for i in range(0, 5): + view.members.add( + address=f"address{i}", + provider_id=i) + v = json.loads(str(view)) + self.assertIn("members", v) + self.assertIn("metadata", v) + self.assertIsInstance(v["members"], list) + self.assertIsInstance(v["metadata"], dict) + for i, member in enumerate(v["members"]): + self.assertEqual(member["address"], f"address{i}") + self.assertEqual(member["provider_id"], i) + i = 0 + for key, val in v["metadata"]: + self.assertEqual(key, f"key{i}") + self.assertEqual(val, f"value{i}") + i = i + 1 + def test_digest(self): view = GroupView() self.assertEqual(view.digest, 0) diff --git a/python/src/py-flock-common.cpp b/python/src/py-flock-common.cpp index 89d09ab..8d1b93d 100644 --- a/python/src/py-flock-common.cpp +++ b/python/src/py-flock-common.cpp @@ -75,5 +75,8 @@ PYBIND11_MODULE(pyflock_common, m) { .def("unlock", &flock::GroupView::unlock) .def_property_readonly("members", &flock::GroupView::members, py11::keep_alive<0, 1>()) .def_property_readonly("metadata", 
&flock::GroupView::metadata, py11::keep_alive<0, 1>()) + .def("__str__", [](const flock::GroupView& gv) { + return static_cast(gv); + }) ; } From 231876aa1b54726702e6fb59808f66733ee2e543 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 10 Jun 2024 15:27:57 +0100 Subject: [PATCH 10/10] polished python API and added tests --- include/flock/cxx/group-view.hpp | 13 ++++++ python/mochi/flock/client.py | 3 +- python/mochi/flock/test_group_handle.py | 53 +++++++++++++++++++++++++ python/src/py-flock-common.cpp | 1 + src/provider.c | 19 ++++----- 5 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 python/mochi/flock/test_group_handle.py diff --git a/include/flock/cxx/group-view.hpp b/include/flock/cxx/group-view.hpp index 07e91d1..44a9c8d 100644 --- a/include/flock/cxx/group-view.hpp +++ b/include/flock/cxx/group-view.hpp @@ -167,6 +167,19 @@ class GroupView { return MetadataProxy{*this}; } + auto copy() const { + auto result = GroupView{}; + auto members = const_cast(this)->members(); + for(size_t i = 0; i < members.count(); ++i) { + result.members().add(members[i].address.c_str(), members[i].provider_id); + } + auto metadata = const_cast(this)->metadata(); + for(size_t i = 0; i < metadata.count(); ++i) { + result.metadata().add(metadata[i].key.c_str(), metadata[i].value.c_str()); + } + return result; + } + operator std::string() const { std::string result; flock_return_t ret = flock_group_view_serialize( diff --git a/python/mochi/flock/client.py b/python/mochi/flock/client.py index c18e682..05170a5 100644 --- a/python/mochi/flock/client.py +++ b/python/mochi/flock/client.py @@ -31,6 +31,7 @@ def client(self): def update(self): self._internal.update() + @property def view(self): return self._internal.view @@ -66,7 +67,7 @@ def make_group_handle(self, address: str|pymargo.core.Address, provider_id: int if isinstance(address, pymargo.core.Address): address = str(address) return GroupHandle( - self._internal.make_service_handle(address=address, 
provider_id=provider_id), + self._internal.make_group_handle(address=address, provider_id=provider_id), self) def make_group_handle_from_file(self, filename: str): diff --git a/python/mochi/flock/test_group_handle.py b/python/mochi/flock/test_group_handle.py new file mode 100644 index 0000000..d10850c --- /dev/null +++ b/python/mochi/flock/test_group_handle.py @@ -0,0 +1,53 @@ +import unittest +import json +import mochi.flock.client as mfc +import mochi.flock.server as mfs +from mochi.flock.view import GroupView +import pymargo.core + +class TestClient(unittest.TestCase): + + def setUp(self): + self.engine = pymargo.core.Engine("na+sm", pymargo.core.server) + self.address = str(self.engine.address) + config = { + "group": { + "type": "static", + "config": {} + } + } + self.initial_view = GroupView() + for i in range(0, 5): + self.initial_view.members.add(self.address, i) + self.initial_view.metadata.add("mykey", "myvalue") + self.providers = [] + for i in range(0, 5): + self.providers.append( + mfs.Provider(self.engine, i, json.dumps(config), self.initial_view.copy())) + self.client = mfc.Client(self.engine) + + def tearDown(self): + del self.client + del self.providers + self.engine.finalize() + del self.engine + + def test_view(self): + gh = self.client.make_group_handle(self.address, 3) + gh.update() + view = gh.view + self.assertIsInstance(view, GroupView) + self.assertEqual(len(view.members), 5) + for i in range(0, 5): + self.assertEqual(view.members[i].address, self.address) + self.assertEqual(view.members[i].provider_id, i) + self.assertEqual(view.metadata["mykey"], "myvalue") + print(view) + + def test_update(self): + gh = self.client.make_group_handle(self.address, 3) + gh.update() + + +if __name__ == '__main__': + unittest.main() diff --git a/python/src/py-flock-common.cpp b/python/src/py-flock-common.cpp index 8d1b93d..1b3a8a7 100644 --- a/python/src/py-flock-common.cpp +++ b/python/src/py-flock-common.cpp @@ -78,5 +78,6 @@ 
PYBIND11_MODULE(pyflock_common, m) { .def("__str__", [](const flock::GroupView& gv) { return static_cast(gv); }) + .def("copy", &flock::GroupView::copy) ; } diff --git a/src/provider.c b/src/provider.c index 53e2d04..f9f30c0 100644 --- a/src/provider.c +++ b/src/provider.c @@ -74,7 +74,8 @@ flock_return_t flock_provider_register( .initial_view = FLOCK_GROUP_VIEW_INITIALIZER, .callback_context = NULL, .member_update_callback = dispatch_member_update, - .metadata_update_callback = dispatch_metadata_update + .metadata_update_callback = dispatch_metadata_update, + .join = false }; FLOCK_GROUP_VIEW_MOVE(a.initial_view, &backend_init_args.initial_view); @@ -221,17 +222,17 @@ flock_return_t flock_provider_register( goto finish; // LCOV_EXCL_STOP } - backend_init_args.join = true; bool is_first = false; - for(size_t i = 0; i < backend_init_args.initial_view.members.size; ++i) { - flock_member_t* member = &backend_init_args.initial_view.members.data[i]; - if(member->provider_id != provider_id) continue; - if(strcmp(member->address, self_addr_str) != 0) continue; - backend_init_args.join = false; - is_first = i == 0; - break; + flock_member_t* mem = flock_group_view_find_member( + &backend_init_args.initial_view, self_addr_str, provider_id); + if(mem) { + if(mem == backend_init_args.initial_view.members.data) + is_first = true; + } else { + backend_init_args.join = true; } + /* create the new group's context */ void* context = NULL; ret = a.backend->init_group(&backend_init_args, &context);