diff --git a/examples/client.c b/examples/client.c index 05a3ebb..f47295e 100644 --- a/examples/client.c +++ b/examples/client.c @@ -43,7 +43,7 @@ int main(int argc, char** argv) flock_group_handle_t flock_rh; margo_info(mid, "Creating FLOCK client"); - ret = flock_client_init(mid, &flock_clt); + ret = flock_client_init(mid, ABT_POOL_NULL, &flock_clt); if(ret != FLOCK_SUCCESS) { FATAL(mid,"flock_client_init failed (ret = %d)", ret); } @@ -54,14 +54,6 @@ int main(int argc, char** argv) FATAL(mid,"flock_group_handle_create failed (ret = %d)", ret); } - margo_info(mid, "Computing sum"); - int32_t result; - ret = flock_compute_sum(flock_rh, 45, 23, &result); - if(ret != FLOCK_SUCCESS) { - FATAL(mid,"flock_compute_sum failed (ret = %d)", ret); - } - margo_info(mid, "45 + 23 = %d", result); - margo_info(mid, "Releasing group handle"); ret = flock_group_handle_release(flock_rh); if(ret != FLOCK_SUCCESS) { diff --git a/include/flock/flock-client.h b/include/flock/flock-client.h index 67f18f7..3d6011b 100644 --- a/include/flock/flock-client.h +++ b/include/flock/flock-client.h @@ -20,11 +20,15 @@ typedef struct flock_client* flock_client_t; * @brief Creates a FLOCK client. * * @param[in] mid Margo instance + * @param[in] pool Pool in which to run operations such as updates * @param[out] client FLOCK client * * @return FLOCK_SUCCESS or error code defined in flock-common.h */ -flock_return_t flock_client_init(margo_instance_id mid, flock_client_t* client); +flock_return_t flock_client_init( + margo_instance_id mid, + ABT_pool pool, + flock_client_t* client); /** * @brief Finalizes a FLOCK client. diff --git a/include/flock/flock-common.h b/include/flock/flock-common.h index a614fd0..1b0f4df 100644 --- a/include/flock/flock-common.h +++ b/include/flock/flock-common.h @@ -7,11 +7,21 @@ #define __FLOCK_COMMON_H #include +#include #ifdef __cplusplus extern "C" { #endif +typedef enum flock_update_t { + FLOCK_MEMBER_JOINED, + FLOCK_MEMBER_LEFT, + FLOCK_MEMBER_DIED, +} flock_update_t; + +#define FLOCK_MODE_INIT_UPDATE 0x1 /* Update the group on initialization */ +#define FLOCK_MODE_SUBSCRIBE 0x2 /* Subscribe to the group on initialization */ + /** * @brief Error codes that can be returned by FLOCK functions. */ @@ -20,18 +30,42 @@ typedef enum flock_return_t { FLOCK_ERR_ALLOCATION, /* Allocation error */ FLOCK_ERR_INVALID_ARGS, /* Invalid argument */ FLOCK_ERR_INVALID_PROVIDER, /* Invalid provider id */ - FLOCK_ERR_INVALID_GROUP, /* Invalid group id */ + FLOCK_ERR_INVALID_GROUP, /* Invalid group id */ FLOCK_ERR_INVALID_BACKEND, /* Invalid backend type */ FLOCK_ERR_INVALID_CONFIG, /* Invalid configuration */ - FLOCK_ERR_INVALID_TOKEN, /* Invalid token */ FLOCK_ERR_FROM_MERCURY, /* Mercurt error */ FLOCK_ERR_FROM_ARGOBOTS, /* Argobots error */ FLOCK_ERR_OP_UNSUPPORTED, /* Unsupported operation */ FLOCK_ERR_OP_FORBIDDEN, /* Forbidden operation */ + FLOCK_ERR_NO_MEMBER, /* No member at this rank */ + FLOCK_ERR_NO_METADATA, /* Invalid metadata key */ /* ... TODO add more error codes here if needed */ FLOCK_ERR_OTHER /* Other error */ } flock_return_t; +/** + * @brief Type of function called when a member joins, leaves, or dies. + * + * @param void* User-provided context + * @param flock_update_t Update type + * @param size_t Rank of the member + * @param hg_addr_t Address of the member + * @param uint16_t Provider ID of the member + */ +typedef void (*flock_membership_update_fn)(void*, flock_update_t, size_t, hg_addr_t, uint16_t); + +/** + * @brief Type of function called when a key/value pair in the metadata of + * a group is updated. + * + * @param void* User-provided context + * @param const char* Metadata key + * @param size_t Size of the metadata key + * @param const char* Metadata value + * @param size_t Size of the metadata value + */ +typedef void (*flock_metadata_update_fn)(void*, const char*, size_t, const char*, size_t); + #ifdef __cplusplus } #endif diff --git a/include/flock/flock-group.h b/include/flock/flock-group.h index 92bd795..4c34ef4 100644 --- a/include/flock/flock-group.h +++ b/include/flock/flock-group.h @@ -18,12 +18,18 @@ typedef struct flock_group_handle *flock_group_handle_t; #define FLOCK_GROUP_HANDLE_NULL ((flock_group_handle_t)NULL) /** - * @brief Creates a FLOCK group handle. + * @brief Handle for a non-blocking request. + */ +typedef struct flock_request* flock_request_t; + +/** + * @brief Creates a FLOCK group handle by contacting the group member + * specified by the given address and provider ID. * * @param[in] client FLOCK client responsible for the group handle * @param[in] addr Mercury address of the provider * @param[in] provider_id id of the provider - * @param[in] check If true, will send an RPC to check that the provider exists + * @param[in] mode Optional mode * @param[out] handle group handle * * @return FLOCK_SUCCESS or error code defined in flock-common.h @@ -32,7 +38,43 @@ flock_return_t flock_group_handle_create( flock_client_t client, hg_addr_t addr, uint16_t provider_id, - bool check, + uint32_t mode, + flock_group_handle_t* handle); + +/** + * @brief Creates a FLOCK group handle by reading the specified file. + * + * @param[in] client FLOCK client responsible for the group handle + * @param[in] filename File name of the group file + * @param[in] mode Optional mode + * @param[out] handle group handle + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_handle_create_from_file( + flock_client_t client, + const char* filename, + uint32_t mode, + flock_group_handle_t* handle); + +/** + * @brief Creates a FLOCK group handle from a serialized view. + * This serialized view must have been generated using + * flock_group_handle_serialize. + * + * @param[in] client FLOCK client responsible for the group handle + * @param[in] addr Mercury address of the provider + * @param[in] provider_id id of the provider + * @param[in] mode Optional mode + * @param[out] handle group handle + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_handle_create_from_serialized( + flock_client_t client, + const char* serialized_view, + size_t view_size, + uint32_t mode, flock_group_handle_t* handle); /** @@ -57,21 +99,238 @@ flock_return_t flock_group_handle_ref_incr( flock_return_t flock_group_handle_release(flock_group_handle_t handle); /** - * @brief Makes the target FLOCK group compute the sum of the - * two numbers and return the result. + * @brief Serialize the current group handle and pass the resulting + * string representation to the serializer function pointer. + * + * @param handle Group handle + * @param serializer Serializer function + * @param context Context to pass as first argument of the serializer function + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_serialize( + flock_group_handle_t handle, + void (*serializer)(void*, const char*, size_t), + void* context); + +/** + * @brief Get the size of the group. + * + * @warning The size of the group is NOT the current number of processes, + * it is N where N-1 is the maximum rank that a process of the group has + * ever been associated with. + * + * This function will not incure any communication. The size returned + * is the last size known to this client. + * + * @param[in] handle Group handle + * @param[out] size Current known size + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_size( + flock_group_handle_t handle, + size_t* size); + +/** + * @brief Get the currently known number of live members. + * + * @param handle Group handle + * @param count Number of live members known + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_live_member_count( + flock_group_handle_t handle, + size_t* count); + +/** + * @brief Function type used to access member information. + * + * @param void* User-provided context + * @param size_t Member rank + * @param hg_addr_t Address of the member + * @param uint16_t Provider ID of the member + * + * @return true to continue iterating, false to break + */ +typedef bool (*flock_member_access_fn)(void*, size_t, hg_addr_t, uint16_t); + +/** + * @brief Iterate over the members of the group. The iteration + * is garanteed to be from rank 0 to N-1, where N is the size of the group, + * skipping ranks that are not associated with a live member. + * + * @param handle Group handle + * @param access_fn Function to call on each member + * @param context Context to pass to the callback + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_member_iterate( + flock_group_handle_t handle, + flock_member_access_fn access_fn, + void* context); + +/** + * @brief Get the address of a member at a given rank. + * + * @important The caller is responsible for calling margo_addr_free + * on the resuling hg_addr_t. + * + * If no member exist at that rank, the address will be set to HG_ADDR_NULL + * and the function will return FLOCK_ERR_NO_MEMBER. + * + * @param[in] handle Group handle + * @param[in] rank Rank of the process + * @param[out] address Address of the process + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_member_get_address( + flock_group_handle_t handle, + size_t rank, + hg_addr_t* address); + +/** + * @brief Get the provider ID of a member at a given rank. + * + * If no member exist at that rank, the function will return FLOCK_ERR_NO_MEMBER. + * + * @param[in] handle Group handle + * @param[in] rank Rank of the process + * @param[out] provider_id Provider ID + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_member_get_provider_id( + flock_group_handle_t handle, + size_t rank, + uint16_t* provider_id); + +/** + * @brief Get the rank of a member from its address and provider ID. + * + * @param[in] handle Group handle + * @param[in] address Address of the member + * @param[in] provider_id Provider ID of the member + * @param[out] rank Rank of the member + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_member_get_rank( + flock_group_handle_t handle, + hg_addr_t address, + uint16_t provider_id, + size_t* rank); + +/** + * @brief Function type used to access the metadata of the group. + * + * @param void* User-provided context + * @param const char* Metadata key + * @param size_t Size of the metadata key + * @param const char* Metadata value + * @param size_t Size of the metadata value + * + * @return true to continue iterating, false to break + */ +typedef bool (*flock_metadata_access_fn)(void*, const char*, size_t, const char*, size_t); + +/** + * @brief Iterate over the metadata associated with the group. + * + * @param handle Group handle + * @param access_fn Function to call on each key/value pair + * @param context Context to pass to the callback + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_metadata_iterate( + flock_group_handle_t handle, + flock_metadata_access_fn access_fn, + void* context); + +/** + * @brief Get the value associated with a given key and pass + * the key/value pair to the provided access function. + * + * @param handle Group handle + * @param key Metadata key + * @param key_size Size of the key + * @param access_fn Function to call on the metadata + * @param context Context to pass to the callback + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_metadata_access( + flock_group_handle_t handle, + const char* key, + size_t key_size, + flock_metadata_access_fn access_fn, + void* context); + +/** + * @brief Update the internal view of the group. * - * @param[in] handle group handle. - * @param[in] x first number. - * @param[in] y second number. - * @param[out] result resulting value. + * If req != NULL, the operation will be non-blocking + * + * @param handle Group handle. + * @param req Optional request pointer. * * @return FLOCK_SUCCESS or error code defined in flock-common.h */ -flock_return_t flock_compute_sum( +flock_return_t flock_group_update( flock_group_handle_t handle, - int32_t x, - int32_t y, - int32_t* result); + flock_request_t* req); + +/** + * @brief Wait for completion of a request. + * + * @param req Request to wait on. + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_request_wait(flock_request_t req); + +/** + * @brief Test for completion of a request. + * + * @param req Request to test. + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_request_test(flock_request_t req, bool* completed); + +/** + * @brief Subscribe to updates from the group. The consistency of such + * updates depend on the backend implementation of the group's fault detection protocol. + * + * @important The client's margo_instance_id must have been initialized as a server + * for this functionality to be available. + * + * @param handle Group handle + * @param member_update_fn Function to call when a member is updated + * @param metadata_update_fn Function to call when a metadata is updated + * @param context Context to pass to the above functions + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_subscribe( + flock_group_handle_t handle, + flock_membership_update_fn member_update_fn, + flock_metadata_update_fn metadata_update_fn, + void* context); + +/** + * @brief Stop being notified about updates from this group. + * + * @param handle Group handle + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_group_unsubscribe( + flock_group_handle_t handle); #ifdef __cplusplus } diff --git a/include/flock/flock-server.h b/include/flock/flock-server.h index c454105..9be186d 100644 --- a/include/flock/flock-server.h +++ b/include/flock/flock-server.h @@ -50,6 +50,40 @@ flock_return_t flock_provider_register( const struct flock_provider_args* args, flock_provider_t* provider); +/** + * @brief Register callbacks that the provider will call when + * a member is updated or when the metadata is changed. + * + * @param provider Provider + * @param member_update_fn Function to call when a member is updated + * @param metadata_update_fn Function to call when a metadata is updated + * @param context Context to pass to the above functions + * + * @note The context argument is what will uniquely identify this + * callback registration and can be used in flock_provider_remove_update_callback + * to deregister these callbacks. It is valid to call flock_provider_add_update_callbacks + * multiple times with distinct contexts. + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_provider_add_update_callbacks( + flock_provider_t provider, + flock_membership_update_fn member_update_fn, + flock_metadata_update_fn metadata_update_fn, + void* context); + +/** + * @brief Remove the callbask associated with the given context. + * + * @param provider Provider + * @param context Context + * + * @return FLOCK_SUCCESS or error code defined in flock-common.h + */ +flock_return_t flock_provider_remove_update_callbacks( + flock_provider_t provider, + void* context); + /** * @brief Destroys the Alpha provider and deregisters its RPC. * diff --git a/src/bedrock-module.c b/src/bedrock-module.c index c2941cf..db3a4fe 100644 --- a/src/bedrock-module.c +++ b/src/bedrock-module.c @@ -41,7 +41,7 @@ static int flock_init_client( bedrock_module_client_t* client) { margo_instance_id mid = bedrock_args_get_margo_instance(args); - return flock_client_init(mid, (flock_client_t*)client); + return flock_client_init(mid, ABT_POOL_NULL, (flock_client_t*)client); } static int flock_finalize_client( diff --git a/src/client.c b/src/client.c index 202d520..eecb314 100644 --- a/src/client.c +++ b/src/client.c @@ -7,7 +7,7 @@ #include "client.h" #include "flock/flock-client.h" -flock_return_t flock_client_init(margo_instance_id mid, flock_client_t* client) +flock_return_t flock_client_init(margo_instance_id mid, ABT_pool pool, flock_client_t* client) { flock_client_t c = (flock_client_t)calloc(1, sizeof(*c)); if(!c) return FLOCK_ERR_ALLOCATION; @@ -43,7 +43,7 @@ flock_return_t flock_group_handle_create( flock_client_t client, hg_addr_t addr, uint16_t provider_id, - bool check, + uint32_t mode, flock_group_handle_t* handle) { if(client == FLOCK_CLIENT_NULL) @@ -51,7 +51,7 @@ flock_return_t flock_group_handle_create( hg_return_t ret; - if(check) { + if(mode & FLOCK_MODE_INIT_UPDATE) { char buffer[sizeof("flock")]; size_t bufsize = sizeof("flock"); ret = margo_provider_get_identity(client->mid, addr, provider_id, buffer, &bufsize); @@ -102,6 +102,7 @@ flock_return_t flock_group_handle_release(flock_group_handle_t handle) return FLOCK_SUCCESS; } +#if 0 flock_return_t flock_compute_sum( flock_group_handle_t handle, int32_t x, @@ -143,3 +144,4 @@ flock_return_t flock_compute_sum( margo_destroy(h); return ret; } +#endif diff --git a/tests/test-client.cpp b/tests/test-client.cpp index da2e1c6..6ac562b 100644 --- a/tests/test-client.cpp +++ b/tests/test-client.cpp @@ -45,7 +45,7 @@ TEST_CASE("Test client interface", "[client]") { flock_client_t client; flock_return_t ret; // test that we can create a client object - ret = flock_client_init(context->mid, &client); + ret = flock_client_init(context->mid, ABT_POOL_NULL, &client); REQUIRE(ret == FLOCK_SUCCESS); SECTION("Open group") { @@ -61,14 +61,6 @@ TEST_CASE("Test client interface", "[client]") { context->addr, provider_id + 123, true, &rh2); REQUIRE(ret == FLOCK_ERR_INVALID_PROVIDER); - SECTION("Send sum RPC") { - // test that we can send a sum RPC to the group - int32_t result = 0; - ret = flock_compute_sum(rh, 45, 55, &result); - REQUIRE(ret == FLOCK_SUCCESS); - REQUIRE(result == 100); - } - // test that we can increase the ref count ret = flock_group_handle_ref_incr(rh); REQUIRE(ret == FLOCK_SUCCESS);