Skip to content

Commit

Permalink
Add Hooks For Reading/Writing Non-Standard Cluster Fields (#917)
Browse files Browse the repository at this point in the history
Add hooks to mtev_clustering to allow applications that use this to read and write custom fields in the cluster and node XML fields. This allows users to set things relevant to them, but are not necessarily relevant to all clusters.
  • Loading branch information
pamaddox authored Feb 14, 2024
1 parent 1d5c32c commit bcfa6a6
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 14 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## 2.5

* Add various hooks into `mtev_clustering` to allow applications to use custom fields.

## 2.5.4

* Fix issue in mtev_net_heartbeat where there would be attempts to use invalid messages
Expand Down
79 changes: 65 additions & 14 deletions src/mtev_cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "mtev_defines.h"

#include <libxml/tree.h>
#include <inttypes.h>

#include "mtev_uuid.h"
Expand Down Expand Up @@ -157,6 +156,48 @@ MTEV_HOOK_IMPL(mtev_cluster_handle_node_update,
struct timeval old_boot_time),
(closure,node_change,updated_node,cluster,old_boot_time))

MTEV_HOOK_IMPL(mtev_cluster_on_write_extra_cluster_config_cleanup,
(mtev_cluster_t *cluster, xmlNodePtr node),
void *, closure,
(void *closure, mtev_cluster_t *cluster, xmlNodePtr parent),
(closure, cluster, node));

MTEV_HOOK_IMPL(mtev_cluster_write_extra_cluster_config_xml,
(mtev_cluster_t *cluster, xmlNodePtr node),
void *, closure,
(void *closure, mtev_cluster_t *cluster, xmlNodePtr node),
(closure, cluster, node));

MTEV_HOOK_IMPL(mtev_cluster_write_extra_node_config_xml,
(mtev_cluster_t *cluster, uuid_t node_id, xmlNodePtr node),
void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_id, xmlNodePtr node),
(closure, cluster, node_id, node));

MTEV_HOOK_IMPL(mtev_cluster_write_extra_cluster_config_json,
(mtev_cluster_t *cluster, struct json_object *obj),
void *, closure,
(void *closure, mtev_cluster_t *cluster, struct json_object *obj),
(closure, cluster, obj));

MTEV_HOOK_IMPL(mtev_cluster_write_extra_node_config_json,
(mtev_cluster_t *cluster, uuid_t node_id, struct json_object *obj),
void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_id, struct json_object *obj),
(closure, cluster, node_id, obj));

MTEV_HOOK_IMPL(mtev_cluster_read_extra_cluster_config,
(mtev_cluster_t *cluster, mtev_conf_section_t *conf),
void *, closure,
(void *closure, mtev_cluster_t *cluster, mtev_conf_section_t *conf),
(closure, cluster, conf));

MTEV_HOOK_IMPL(mtev_cluster_read_extra_node_config,
(mtev_cluster_t *cluster, uuid_t node_uuid, mtev_conf_section_t *conf),
void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_uuid, mtev_conf_section_t *conf),
(closure, cluster, node_uuid, conf));

mtev_boolean
mtev_cluster_node_is_dead(mtev_cluster_node_t *node) {
return compare_timeval(node->boot_time, boot_time_of_dead_node) == 0;
Expand Down Expand Up @@ -533,6 +574,7 @@ mtev_cluster_write_config(mtev_cluster_t *cluster) {
xmlUnsetProp(parent, (xmlChar *)"maturity");
xmlUnsetProp(parent, (xmlChar *)"key");
xmlUnsetProp(parent, (xmlChar *)"seq");
mtev_cluster_on_write_extra_cluster_config_cleanup_hook_invoke(cluster, parent);
while(NULL != (child = parent->children)) {
xmlUnlinkNode(child);
xmlFreeNode(child);
Expand All @@ -556,6 +598,7 @@ mtev_cluster_write_config(mtev_cluster_t *cluster) {
xmlSetProp(parent, (xmlChar *)"key", (xmlChar *)cluster->key);
snprintf(new_seq_str, sizeof(new_seq_str), "%"PRId64, cluster->config_seq);
xmlSetProp(parent, (xmlChar *)"seq", (xmlChar *)new_seq_str);
mtev_cluster_write_extra_cluster_config_xml_hook_invoke(cluster, parent);
if(cluster->node_cnt > 0) mtevAssert(cluster->nodes);
for(i=0;i<cluster->node_cnt;i++) {
xmlNodePtr node;
Expand All @@ -564,6 +607,7 @@ mtev_cluster_write_config(mtev_cluster_t *cluster) {
mtev_uuid_unparse_lower(cluster->nodes[i].id, uuid_str);
xmlSetProp(node, (xmlChar *)"id", (xmlChar *)uuid_str);
xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)cluster->nodes[i].cn);
mtev_cluster_write_extra_node_config_xml_hook_invoke(cluster, cluster->nodes[i].id, node);
if(cluster->nodes[i].addr.addr4.sin_family == AF_INET) {
inet_ntop(AF_INET, &cluster->nodes[i].addr.addr4.sin_addr,
ipstr, sizeof(ipstr));
Expand Down Expand Up @@ -664,6 +708,16 @@ int mtev_cluster_update_internal(mtev_conf_section_t cluster) {
}
}

new_cluster = mtev_memory_safe_calloc(1, sizeof(*new_cluster));
new_cluster->name = name; name = NULL;
new_cluster->key = key; key = NULL;
new_cluster->config_seq = seq;
new_cluster->port = port;
new_cluster->period = period;
new_cluster->timeout = timeout;
new_cluster->maturity = maturity;
mtev_cluster_read_extra_cluster_config_hook_invoke(new_cluster, &cluster);

nodes = mtev_conf_get_sections_write(cluster, "node", &n_nodes);
if(n_nodes > 0) {
nlist = mtev_memory_safe_calloc(n_nodes, sizeof(*nlist));
Expand All @@ -678,22 +732,23 @@ int mtev_cluster_update_internal(mtev_conf_section_t cluster) {

if(!mtev_conf_get_stringbuf(nodes[i], "@id", uuid_str, sizeof(uuid_str)) ||
mtev_uuid_parse(uuid_str, nlist[i].id) != 0) {
mtevL(cerror, "Cluster '%s' node %d has no (or bad) id\n", name, i);
mtevL(cerror, "Cluster '%s' node %d has no (or bad) id\n", new_cluster->name, i);
goto bail;
}
if(!mtev_conf_get_stringbuf(nodes[i], "@cn",
nlist[i].cn, sizeof(nlist[i].cn))) {
mtevL(cerror, "Cluster '%s' node %d has no cn\n", name, i);
mtevL(cerror, "Cluster '%s' node %d has no cn\n", new_cluster->name, i);
goto bail;
}
if(!mtev_conf_get_int32(nodes[i], "@port", &port) || port < 0 || port > 0xffff) {
mtevL(cerror, "Cluster '%s' node %d has no (or bad) port\n", name, i);
mtevL(cerror, "Cluster '%s' node %d has no (or bad) port\n", new_cluster->name, i);
goto bail;
}
if(!mtev_conf_get_stringbuf(nodes[i], "@address", bufstr, sizeof(bufstr))) {
mtevL(cerror, "Cluster '%s' node %d has no address\n", name, i);
mtevL(cerror, "Cluster '%s' node %d has no address\n", new_cluster->name, i);
goto bail;
}
mtev_cluster_read_extra_node_config_hook_invoke(new_cluster, nlist[i].id, &nodes[i]);

family = AF_INET;
rv = inet_pton(family, bufstr, &a);
Expand All @@ -702,7 +757,7 @@ int mtev_cluster_update_internal(mtev_conf_section_t cluster) {
rv = inet_pton(family, bufstr, &a);
if(rv != 1) {
mtevL(cerror, "Cluster '%s' node '%s' has bad address '%s'\n",
name, uuid_str, bufstr);
new_cluster->name, uuid_str, bufstr);
goto bail;
}
else {
Expand All @@ -721,14 +776,6 @@ int mtev_cluster_update_internal(mtev_conf_section_t cluster) {
}
}

new_cluster = mtev_memory_safe_calloc(1, sizeof(*new_cluster));
new_cluster->name = name; name = NULL;
new_cluster->key = key; key = NULL;
new_cluster->config_seq = seq;
new_cluster->port = port;
new_cluster->period = period;
new_cluster->timeout = timeout;
new_cluster->maturity = maturity;
if (nlist != NULL) {
qsort(nlist, n_nodes, sizeof(*nlist), mtev_cluster_node_compare);
}
Expand Down Expand Up @@ -1031,6 +1078,7 @@ mtev_cluster_to_json(mtev_cluster_t *c) {
MJ_KV(obj, "period", MJ_INT(c->period));
MJ_KV(obj, "timeout", MJ_INT(c->timeout));
MJ_KV(obj, "maturity", MJ_INT(c->maturity));
mtev_cluster_write_extra_cluster_config_json_hook_invoke(c, obj);

char uuid_str[UUID_STR_LEN+1];
if(c->oldest_node && !mtev_uuid_is_null(c->oldest_node->id)) {
Expand All @@ -1050,6 +1098,7 @@ mtev_cluster_to_json(mtev_cluster_t *c) {
MJ_KV(node, "reference_time", MJ_UINT64(now.tv_sec));
MJ_KV(node, "last_contact", MJ_UINT64(n->last_contact.tv_sec));
MJ_KV(node, "boot_time", MJ_UINT64(n->boot_time.tv_sec));
mtev_cluster_write_extra_node_config_json_hook_invoke(c, n->id, node);

if(n->addr.addr4.sin_family == AF_INET) {
inet_ntop(AF_INET, &n->addr.addr4.sin_addr,
Expand Down Expand Up @@ -1130,6 +1179,7 @@ mtev_cluster_to_xmlnode(mtev_cluster_t *c) {
xmlSetProp(cluster, (xmlChar *)"timeout", (xmlChar *)timeout);
snprintf(maturity, sizeof(maturity), "%d", c->maturity);
xmlSetProp(cluster, (xmlChar *)"maturity", (xmlChar *)maturity);
mtev_cluster_write_extra_cluster_config_xml_hook_invoke(c, cluster);

if(c->oldest_node && !mtev_uuid_is_null(c->oldest_node->id)) {
xmlNodePtr node;
Expand All @@ -1153,6 +1203,7 @@ mtev_cluster_to_xmlnode(mtev_cluster_t *c) {
xmlSetProp(node, (xmlChar *)"last_contact", (xmlChar *)time);
snprintf(time, sizeof(time), "%lu", (unsigned long)n->boot_time.tv_sec);
xmlSetProp(node, (xmlChar *)"boot_time", (xmlChar *)time);
mtev_cluster_write_extra_node_config_xml_hook_invoke(c, n->id, node);

if(n->addr.addr4.sin_family == AF_INET) {
inet_ntop(AF_INET, &n->addr.addr4.sin_addr,
Expand Down
28 changes: 28 additions & 0 deletions src/mtev_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <libxml/tree.h>
#include <mtev_conf.h>

#ifdef __cplusplus
Expand Down Expand Up @@ -351,6 +352,33 @@ MTEV_HOOK_PROTO(mtev_cluster_handle_node_update,
(void *closure, mtev_cluster_node_changes_t node_changes, mtev_cluster_node_t *updated_node, mtev_cluster_t *cluster,
struct timeval old_boot_time));

MTEV_HOOK_PROTO(mtev_cluster_on_write_extra_cluster_config_cleanup,
(mtev_cluster_t *cluster, xmlNodePtr node), void *, closure,
(void *closure, mtev_cluster_t *cluster, xmlNodePtr node));

MTEV_HOOK_PROTO(mtev_cluster_write_extra_cluster_config_xml,
(mtev_cluster_t *cluster, xmlNodePtr node), void *, closure,
(void *closure, mtev_cluster_t *cluster, xmlNodePtr node));

MTEV_HOOK_PROTO(mtev_cluster_write_extra_node_config_xml,
(mtev_cluster_t *cluster, uuid_t node_id, xmlNodePtr node), void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_id, xmlNodePtr node));

MTEV_HOOK_PROTO(mtev_cluster_write_extra_cluster_config_json,
(mtev_cluster_t *cluster, struct json_object *obj), void *, closure,
(void *closure, mtev_cluster_t *cluster, struct json_object *obj));

MTEV_HOOK_PROTO(mtev_cluster_write_extra_node_config_json,
(mtev_cluster_t *cluster, uuid_t node_id, struct json_object *obj), void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_id, struct json_object *obj));

MTEV_HOOK_PROTO(mtev_cluster_read_extra_cluster_config,
(mtev_cluster_t *cluster, mtev_conf_section_t *conf), void *, closure,
(void *closure, mtev_cluster_t *cluster, mtev_conf_section_t *conf));

MTEV_HOOK_PROTO(mtev_cluster_read_extra_node_config,
(mtev_cluster_t *cluster, uuid_t node_uuid, mtev_conf_section_t *conf), void *, closure,
(void *closure, mtev_cluster_t *cluster, uuid_t node_uuid, mtev_conf_section_t *conf));

#ifdef __cplusplus
}
Expand Down

0 comments on commit bcfa6a6

Please sign in to comment.