Skip to content

Commit

Permalink
Remove malloc from entry appending
Browse files Browse the repository at this point in the history
Added documentation to mention that user is responsible for managing
memory.
  • Loading branch information
willemt committed Sep 26, 2015
1 parent 3b91d4b commit 050268f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
57 changes: 43 additions & 14 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ typedef int (
);

/** Callback for saving who we voted for to disk.
* This callback MUST flush the change to disk.
* For safety reasons this callback MUST flush the change to disk.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] voted_for The node we voted for
Expand All @@ -220,16 +220,21 @@ typedef int (
);

/** Callback for saving log entry changes.
*
* This callback is used for:
* <ul>
* <li>Adding entries to the log (ie. offer)
* <li>Removing the first entry from the log (ie. polling)
* <li>Removing the last entry from the log (ie. popping)
* </ul>
* This callback MUST flush the change to disk.
*
* For safety reasons this callback MUST flush the change to disk.
*
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] entry The entry that the event is happening to
* @param[in] entry The entry that the event is happening to.
* The user is allowed to change the memory pointed to in the
* raft_entry_data_t struct. This MUST be done if the memory is temporary.
* @param[in] entry_idx The entries index in the log
* @return 0 on success */
typedef int (
Expand All @@ -253,23 +258,27 @@ typedef struct
func_applylog_f applylog;

/** Callback for persisting vote data
* This callback MUST flush the change to disk. */
* For safety reasons this callback MUST flush the change to disk. */
func_persist_int_f persist_vote;

/** Callback for persisting term data
* This callback MUST flush the change to disk. */
* For safety reasons this callback MUST flush the change to disk. */
func_persist_int_f persist_term;

/** Callback for adding an entry to the log
* This callback MUST flush the change to disk. */
* For safety reasons this callback MUST flush the change to disk. */
func_logentry_event_f log_offer;

/** Callback for removing the oldest entry from the log
* This callback MUST flush the change to disk. */
* For safety reasons this callback MUST flush the change to disk.
* @note If memory was malloc'd in log_offer then this should be the right
* time to free the memory. */
func_logentry_event_f log_poll;

/** Callback for removing the youngest entry from the log
* This callback MUST flush the change to disk. */
* For safety reasons this callback MUST flush the change to disk.
* @note If memory was malloc'd in log_offer then this should be the right
* time to free the memory. */
func_logentry_event_f log_pop;

/** Callback for catching debugging log messages
Expand Down Expand Up @@ -318,11 +327,11 @@ __attribute__ ((deprecated));
/** Add node.
*
* @note This library does not yet support membership changes.
* Once raft_periodic has been run this will fail.
* Once raft_periodic has been run this will fail.
*
* @note The order this call is made is important.
* This call MUST be made in the same order as the other raft nodes.
* This is because the node ID is assigned depending on when this call is made
* This call MUST be made in the same order as the other raft nodes.
* This is because the node ID is assigned depending on when this call is made
*
* @param[in] user_data The user data for the node.
* This is obtained using raft_node_get_udata.
Expand Down Expand Up @@ -351,7 +360,18 @@ void raft_set_request_timeout(raft_server_t* me, int msec);
int raft_periodic(raft_server_t* me, int msec_elapsed);

/** Receive an appendentries message.
* This function will block if it needs to append the message.
*
* Will block (ie. by syncing to disk) if we need to append a message.
*
* Might call malloc once to increase the log entry array size.
*
* The log_offer callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) for each msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
* log_offer callback.
*
* @param[in] node Index of the node who sent us this message
* @param[in] ae The appendentries message
* @param[out] r The resulting response
Expand Down Expand Up @@ -387,11 +407,20 @@ int raft_recv_requestvote_response(raft_server_t* me,
int node,
msg_requestvote_response_t* r);

/** Receive an entry message from client.
/** Receive an entry message from the client.
*
* Append the entry to the log and send appendentries to followers.
*
* This function will block if it needs to append the message.
* Will block (ie. by syncing to disk) if we need to append a message.
*
* Might call malloc once to increase the log entry array size.
*
* The log_offer callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) in msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
* log_offer callback.
*
* Will fail:
* <ul>
Expand Down
23 changes: 9 additions & 14 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,11 @@ int raft_recv_appendentries(
{
msg_entry_t* cmd = &ae->entries[i];

/* TODO: replace malloc with mempoll/arena */
raft_entry_t* c = (raft_entry_t*)malloc(sizeof(raft_entry_t));
c->term = cmd->term;
memcpy(&c->data, &cmd->data, sizeof(raft_entry_data_t));
c->data.buf = (unsigned char*)malloc(cmd->data.len);
memcpy(c->data.buf, cmd->data.buf, cmd->data.len);
c->id = cmd->id;
int e = raft_append_entry(me_, c);
raft_entry_t ety;
ety.term = cmd->term;
ety.id = cmd->id;
memcpy(&ety.data, &cmd->data, sizeof(raft_entry_data_t));
int e = raft_append_entry(me_, &ety);
if (-1 == e)
{
__log(me_, "AE failure; couldn't append entry");
Expand Down Expand Up @@ -423,19 +420,17 @@ int raft_recv_entry(raft_server_t* me_, int node, msg_entry_t* e,
msg_entry_response_t *r)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
raft_entry_t ety;
int i;

if (!raft_is_leader(me_))
return -1;

__log(me_, "received entry from: %d", node);

raft_entry_t ety;
ety.term = me->current_term;
ety.id = e->id;
ety.data.len = e->data.len;
ety.data.buf = malloc(e->data.len);
memcpy(ety.data.buf, e->data.buf, e->data.len);
memcpy(&ety.data, &e->data, sizeof(raft_entry_data_t));
raft_append_entry(me_, &ety);
for (i = 0; i < me->num_nodes; i++)
if (me->nodeid != i)
Expand Down Expand Up @@ -463,10 +458,10 @@ int raft_send_requestvote(raft_server_t* me_, int node)
return 0;
}

int raft_append_entry(raft_server_t* me_, raft_entry_t* c)
int raft_append_entry(raft_server_t* me_, raft_entry_t* ety)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
return log_append_entry(me->log, c);
return log_append_entry(me->log, ety);
}

int raft_apply_entry(raft_server_t* me_)
Expand Down

0 comments on commit 050268f

Please sign in to comment.