Commit

added digest mechanism

mdorier committed Apr 4, 2024
1 parent 9e8da31 commit f0a88a7
Showing 6 changed files with 163 additions and 11 deletions.
121 changes: 120 additions & 1 deletion include/flock/flock-group-view.h
@@ -59,6 +59,20 @@ MERCURY_GEN_PROC(flock_metadata_t,

/**
* @brief Group view.
*
* A group view contains a dynamic array of members (flock_member_t),
* a dynamic array of metadata (flock_metadata_t), a digest, and a
* mutex to protect access to the view's fields.
*
 * Important: while the fields can be read without the need for the
 * below flock_* functions and FLOCK_* macros, they SHOULD NOT BE
 * MODIFIED without calling these functions/macros. This is because
 * (1) these functions keep the digest up-to-date when the view is
 * modified, and (2) they enforce invariants on the content of the
 * view, such as the fact that members are sorted by rank, ranks are
 * unique, metadata entries are sorted by key, keys are unique, and
 * so on. Any direct modification of these fields risks breaking
 * these invariants.
*/
typedef struct {
// Dynamic array of members (sorted by rank)
@@ -73,10 +87,13 @@ typedef struct {
uint64_t capacity;
flock_metadata_t* data;
} metadata;
// Digest of the group's content
uint64_t digest;
// Mutex to protect access to the group view
ABT_mutex_memory mtx;
} flock_group_view_t;

#define FLOCK_GROUP_VIEW_INITIALIZER {{0,0,NULL},{0,0,NULL},ABT_MUTEX_INITIALIZER}
#define FLOCK_GROUP_VIEW_INITIALIZER {{0,0,NULL},{0,0,NULL},0,ABT_MUTEX_INITIALIZER}
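As a quick illustration of the usage rule stated in the comment above (a sketch, not part of the diff; the rank, provider ID, and address values are hypothetical), a view is meant to be populated and cleared only through the helper API, which keeps the digest and the sorting/uniqueness invariants consistent:

    flock_group_view_t view = FLOCK_GROUP_VIEW_INITIALIZER;
    // go through the helpers so the digest follows every change
    flock_group_view_add_member(&view, 0, 42, "ofi+tcp://10.0.0.1:1234");
    flock_group_view_add_metadata(&view, "protocol", "ofi+tcp");
    // view.digest now reflects both entries; clearing resets it to 0
    flock_group_view_clear(&view);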

/**
* @brief This macro takes two pointers to flock_group_view_t and moves
@@ -89,10 +106,12 @@ typedef struct {
* @param __dst__ flock_group_view_t* into which to move.
*/
#define FLOCK_GROUP_VIEW_MOVE(__src__, __dst__) do { \
(__dst__)->digest = (__src__)->digest; \
memcpy(&(__dst__)->members, &(__src__)->members, sizeof((__src__)->members)); \
memcpy(&(__dst__)->metadata, &(__src__)->metadata, sizeof((__src__)->metadata)); \
memset(&(__src__)->members, 0, sizeof((__src__)->members)); \
memset(&(__src__)->metadata, 0, sizeof((__src__)->metadata)); \
(__src__)->digest = 0; \
} while(0)
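A minimal sketch of the move semantics (again not part of the diff; example values are hypothetical): after the macro runs, the destination owns the member and metadata arrays as well as the digest, and the source is left empty with a zero digest:

    flock_group_view_t src = FLOCK_GROUP_VIEW_INITIALIZER;
    flock_group_view_t dst = FLOCK_GROUP_VIEW_INITIALIZER;
    flock_group_view_add_member(&src, 1, 7, "ofi+tcp://10.0.0.2:1234");
    FLOCK_GROUP_VIEW_MOVE(&src, &dst);
    // dst.members.size == 1 and dst.digest carries the member's hash;
    // src.members.size == 0 and src.digest == 0, so src can safely be reused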

#define FLOCK_GROUP_VIEW_LOCK(view) do { \
@@ -126,6 +145,8 @@ static inline void flock_group_view_clear(flock_group_view_t *view)
view->metadata.data = NULL;
view->metadata.capacity = 0;
view->metadata.size = 0;

view->digest = 0;
}

/**
Expand Down Expand Up @@ -154,6 +175,69 @@ static inline ssize_t flock_group_view_members_binary_search(
return -1; // Rank not found
}


/**
* @brief This function is used to compute string hashes to
* update a group view's digest.
*
* @param str string to hash.
*
* @return a uint64_t hash.
*/
static inline uint64_t flock_djb2_hash(const char *str)
{
uint64_t hash = 5381;
int c;

while((c = *str++))
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */

return hash;
}
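As a worked example, flock_djb2_hash("ab") evaluates to (5381 * 33 + 'a') * 33 + 'b' = (5381 * 33 + 97) * 33 + 98 = 177670 * 33 + 98 = 5863208.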

/**
* @brief This function is used to compute hashes to
* update a group view's digest when a member is added
* or removed.
*
* @param rank Member rank.
* @param provider_id Member provider ID.
* @param address Member address.
*
* @return a uint64_t hash.
*/
static inline uint64_t flock_hash_member(
uint64_t rank,
uint16_t provider_id,
const char *address)
{
uint64_t hash = flock_djb2_hash(address);
for(unsigned i=0; i < sizeof(rank); ++i)
hash = ((hash << 5) + hash) + ((char*)&rank)[i];
for(unsigned i=0; i < sizeof(provider_id); ++i)
hash = ((hash << 5) + hash) + ((char*)&provider_id)[i];
return hash;
}

/**
 * @brief This function is used to compute metadata hashes to
 * update a group view's digest when a metadata entry is added,
 * updated, or removed.
*
* @param key Key of the metadata to hash.
* @param val Value of the metadata to hash.
*
* @return a uint64_t hash.
*/
static inline uint64_t flock_hash_metadata(const char *key, const char *val)
{
uint64_t kh = flock_djb2_hash(key);
uint64_t vh = flock_djb2_hash(val);
    // To avoid a (key,value) pair hashing the same as the (value,key) pair,
    // we rotate the value's hash left by 3 bits
vh = (vh << 3) | (vh >> ((sizeof(vh) * CHAR_BIT - 3) % (sizeof(vh) * CHAR_BIT)));
return kh ^ vh;
}
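The add/remove functions below maintain the view's digest by XOR-ing these per-entry hashes in and out, so insertions and removals are symmetric and the resulting digest does not depend on the order of operations. A small sketch of that property (not part of the diff; the member and metadata values are hypothetical, and <assert.h> plus this header are assumed to be included):

    uint64_t digest = 0;
    uint64_t mh = flock_hash_member(3, 42, "ofi+tcp://10.0.0.3:1234");
    uint64_t kh = flock_hash_metadata("protocol", "ofi+tcp");
    digest ^= mh;          // member added
    digest ^= kh;          // metadata entry added
    digest ^= mh;          // the same member removed
    assert(digest == kh);  // only the metadata entry is still accounted for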

/**
* @brief Add a member to the view.
*
@@ -173,6 +257,9 @@ static inline bool flock_group_view_add_member(
uint16_t provider_id,
const char *address)
{
// Compute the new member's hash
uint64_t member_hash = flock_hash_member(rank, provider_id, address);

// Check if there is enough capacity, if not, resize
if (view->members.size == view->members.capacity) {
if (view->members.capacity == 0)
@@ -206,6 +293,9 @@ static inline bool flock_group_view_add_member(

++view->members.size;

// Update digest
view->digest ^= member_hash;

return true;
}

@@ -222,6 +312,12 @@ static inline bool flock_group_view_remove_member(flock_group_view_t *view, uint
ssize_t idx = flock_group_view_members_binary_search(view, rank);
if (idx == -1) return false;

// Compute the hash of the member to remove
uint64_t member_hash = flock_hash_member(
view->members.data[idx].rank,
view->members.data[idx].provider_id,
view->members.data[idx].address);

// Free the memory allocated for the address
free(view->members.data[idx].address);

@@ -231,6 +327,9 @@

--view->members.size;

// Update digest
view->digest ^= member_hash;

return true;
}

@@ -294,11 +393,19 @@ static inline bool flock_group_view_add_metadata(
const char* key,
const char* value)
{
// Compute the hash of the key and value
uint64_t metadata_hash = flock_hash_metadata(key, value);

// Try to find existing entry
ssize_t idx = flock_group_view_metadata_binary_search(view, key);
if (idx != -1) {
// Compute old metadata hash
uint64_t old_metadata_hash = flock_hash_metadata(key, view->metadata.data[idx].value);
// Free old value
free(view->metadata.data[idx].value);
view->metadata.data[idx].value = strdup(value);
// Update the digest
view->digest ^= old_metadata_hash ^ metadata_hash;
return true;
}

@@ -339,6 +446,9 @@ static inline bool flock_group_view_add_metadata(

++view->metadata.size;

// Update the digest
view->digest ^= metadata_hash;

return true;
}

@@ -355,6 +465,10 @@ static inline bool flock_group_view_remove_metadata(flock_group_view_t *view, co
ssize_t idx = flock_group_view_metadata_binary_search(view, key);
if (idx == -1) return false;

// Compute the hash of the key and value
uint64_t metadata_hash = flock_hash_metadata(
view->metadata.data[idx].key, view->metadata.data[idx].value);

// Free the memory allocated for the key and value
free(view->metadata.data[idx].key);
free(view->metadata.data[idx].value);
@@ -365,6 +479,9 @@

--view->metadata.size;

// Update the digest
view->digest ^= metadata_hash;

return true;
}

@@ -395,6 +512,8 @@ static inline const char *flock_group_view_find_metadata(const flock_group_view_
*/
static inline hg_return_t hg_proc_flock_group_view_t(hg_proc_t proc, flock_group_view_t* view) {
hg_return_t ret = HG_SUCCESS;
ret = hg_proc_uint64_t(proc, &view->digest);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &view->members.size);
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_hg_size_t(proc, &view->metadata.size);
4 changes: 2 additions & 2 deletions src/centralized/centralized-backend.c
@@ -45,8 +45,6 @@ static flock_return_t centralized_create_group(
ctx->is_primary = (primary_member->provider_id == args->provider_id)
&& (strcmp(primary_member->address, self_address_string) == 0);

FLOCK_GROUP_VIEW_MOVE(&args->initial_view, &ctx->view);

flock_return_t ret = flock_client_init(mid, args->pool, &ctx->client);
if(ret != FLOCK_SUCCESS) {
centralized_destroy_group(ctx);
@@ -71,6 +69,8 @@

ctx->config = json_object_new_object();

FLOCK_GROUP_VIEW_MOVE(&args->initial_view, &ctx->view);

*context = ctx;
return FLOCK_SUCCESS;
}
17 changes: 11 additions & 6 deletions src/client.c
@@ -24,7 +24,7 @@ flock_return_t flock_client_init(margo_instance_id mid, ABT_pool pool, flock_cli
if(flag == HG_TRUE) {
margo_registered_name(mid, "flock_get_view", &c->get_view_id, &flag);
} else {
c->get_view_id = MARGO_REGISTER(mid, "flock_get_view", void, get_view_out_t, NULL);
c->get_view_id = MARGO_REGISTER(mid, "flock_get_view", get_view_in_t, get_view_out_t, NULL);
}

*client = c;
@@ -62,10 +62,12 @@ static flock_return_t flock_group_update_view_cb(flock_request_t req)
}
ret = out.ret;

FLOCK_GROUP_VIEW_LOCK(&req->group_handle->view);
flock_group_view_clear(&req->group_handle->view);
FLOCK_GROUP_VIEW_MOVE(&out.view, &req->group_handle->view);
FLOCK_GROUP_VIEW_UNLOCK(&req->group_handle->view);
if(!out.no_change) {
FLOCK_GROUP_VIEW_LOCK(&req->group_handle->view);
flock_group_view_clear(&req->group_handle->view);
FLOCK_GROUP_VIEW_MOVE(&out.view, &req->group_handle->view);
FLOCK_GROUP_VIEW_UNLOCK(&req->group_handle->view);
}

margo_free_output(req->rpc_handle, &out);

@@ -91,7 +93,10 @@ flock_return_t flock_group_update_view(
tmp_req->group_handle = handle;
tmp_req->on_completion = flock_group_update_view_cb;

hret = margo_provider_iforward(handle->provider_id, h, NULL, &tmp_req->request);
get_view_in_t in;
in.digest = handle->view.digest;

hret = margo_provider_iforward(handle->provider_id, h, &in, &tmp_req->request);
if(hret != HG_SUCCESS) {
margo_destroy(h);
return FLOCK_ERR_FROM_MERCURY;
11 changes: 9 additions & 2 deletions src/provider.c
@@ -105,7 +105,7 @@ flock_return_t flock_provider_register(
/* Client RPCs */

id = MARGO_REGISTER_PROVIDER(mid, "flock_get_view",
void, get_view_out_t,
get_view_in_t, get_view_out_t,
flock_get_view_ult, provider_id, p->pool);
margo_register_data(mid, id, (void*)p, NULL);
p->get_view_id = id;
@@ -278,9 +278,16 @@ flock_return_t flock_provider_register_backend(
static void get_view_callback(void* uargs, const flock_group_view_t* view)
{
hg_handle_t handle = (hg_handle_t)uargs;
get_view_in_t in = {0};
margo_get_input(handle, &in);
get_view_out_t out = {0};
out.view = *view;
if(in.digest == view->digest) {
out.no_change = 1;
} else {
out.view = *view;
}
margo_respond(handle, &out);
margo_free_input(handle, &in);
}

static void flock_get_view_ult(hg_handle_t h)
4 changes: 4 additions & 0 deletions src/types.h
@@ -16,7 +16,11 @@

/* Client RPC types */

MERCURY_GEN_PROC(get_view_in_t,
((uint64_t)(digest)))

MERCURY_GEN_PROC(get_view_out_t,
((uint8_t)(no_change))\
((flock_group_view_t)(view))\
((int32_t)(ret)))
