Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batchify API #51

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 86 additions & 24 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,14 @@ typedef int (
raft_node_t* node
);

/** Callback for detecting when non-voting nodes have obtained enough logs.
/** Callback for detecting when non-voting nodes have obtained enough entries.
* This triggers only when there are no pending configuration changes.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] node The node
* @return 0 does not want to be notified again; otherwise -1 */
typedef int (
*func_node_has_sufficient_logs_f
*func_node_has_sufficient_entries_f
) (
raft_server_t* raft,
void *user_data,
Expand Down Expand Up @@ -303,6 +303,21 @@ typedef int (
int vote
);

/** Callback for applying log entries to the state machine.
* @param[in] raft The Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] entries Array of entries to be applied
* @param[in] n_entries Number of entries to be applied
* @return 0 on success */
typedef int (
*func_apply_entries_f
) (
raft_server_t* raft,
void *user_data,
raft_entry_t* entries,
int n_entries
);

/** Callback for saving log entry changes.
*
* This callback is used for:
Expand All @@ -322,8 +337,19 @@ typedef int (
* memory pointed to in the raft_entry_data_t struct. This MUST be done if
* the memory is temporary.
* @param[in] entry_idx The entries index in the log
* @param[in] n_entries The numberof entries.
* @return 0 on success */
typedef int (
*func_logentries_event_f
) (
raft_server_t* raft,
void *user_data,
raft_entry_t *entry,
int entry_idx,
int n_entries
);

typedef int (
*func_logentry_event_f
) (
raft_server_t* raft,
Expand All @@ -343,11 +369,6 @@ typedef struct
/** Callback for notifying user that a node needs a snapshot sent */
func_send_snapshot_f send_snapshot;

/** Callback for finite state machine application
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
func_logentry_event_f applylog;

/** Callback for persisting vote data
* For safety reasons this callback MUST flush the change to disk. */
func_persist_vote_f persist_vote;
Expand All @@ -357,10 +378,40 @@ typedef struct
* disk atomically. */
func_persist_term_f persist_term;

/** Callback for adding an entry to the log
/** Callback for adding entries to the log
* For safety reasons this callback MUST flush the change to disk.
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
func_logentries_event_f offer_entries;

/** Callback for removing the oldest entries from the log
* For safety reasons this callback MUST flush the change to disk.
* @note If memory was malloc'd in log_offer then this should be the right
* time to free the memory. */
func_logentries_event_f poll_entries;

/** Callback for removing the youngest entries from the log
* For safety reasons this callback MUST flush the change to disk.
* @note If memory was malloc'd in log_offer then this should be the right
* time to free the memory. */
func_logentries_event_f pop_entries;

/** Callback for determining which node this configuration log entry
* affects. This call only applies to configuration change log entries.
* @return the node ID of the node */
func_logentry_event_f entry_get_node_id;

/** Callback for detecting when a non-voting node has sufficient entries. */
func_node_has_sufficient_entries_f node_has_sufficient_entries;

/** Callback for catching debugging log messages
* This callback is optional */
func_log_f log;

/** WARNING: the below callbacks are deprecated */

/** Callback for adding an entry to the log
* For safety reasons this callback MUST flush the change to disk. */
func_logentry_event_f log_offer;

/** Callback for removing the oldest entry from the log
Expand All @@ -375,17 +426,18 @@ typedef struct
* time to free the memory. */
func_logentry_event_f log_pop;

/** Callback for finite state machine application
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
func_logentry_event_f applylog;

/** Callback for detecting when a non-voting node has sufficient entries. */
func_node_has_sufficient_entries_f node_has_sufficient_logs;

/** Callback for determining which node this configuration log entry
* affects. This call only applies to configuration change log entries.
* @return the node ID of the node */
func_logentry_event_f log_get_node_id;

/** Callback for detecting when a non-voting node has sufficient logs. */
func_node_has_sufficient_logs_f node_has_sufficient_logs;

/** Callback for catching debugging log messages
* This callback is optional */
func_log_f log;
} raft_cbs_t;

typedef struct
Expand Down Expand Up @@ -477,12 +529,12 @@ int raft_periodic(raft_server_t* me, int msec_elapsed);
*
* Might call malloc once to increase the log entry array size.
*
* The log_offer callback will be called.
* The offer_entries callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) for each msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
* log_offer callback.
* offer_entries callback.
*
* @param[in] node The node who sent us this message
* @param[in] ae The appendentries message
Expand Down Expand Up @@ -535,12 +587,12 @@ int raft_recv_requestvote_response(raft_server_t* me,
*
* Might call malloc once to increase the log entry array size.
*
* The log_offer callback will be called.
* The offer_entries callback will be called.
*
* @note The memory pointer (ie. raft_entry_data_t) in msg_entry_t is
* copied directly. If the memory is temporary you MUST either make the
* memory permanent (ie. via malloc) OR re-assign the memory within the
* log_offer callback.
* offer_entries callback.
*
* Will fail:
* <ul>
Expand Down Expand Up @@ -710,6 +762,8 @@ void raft_set_commit_idx(raft_server_t* me, int commit_idx);
* RAFT_ERR_NOMEM memory allocation failure */
int raft_append_entry(raft_server_t* me, raft_entry_t* ety);

int raft_append_entries(raft_server_t* me_, raft_entry_t* etys, int n_etys);

/** Confirm if a msg_entry_response has been committed.
* @param[in] r The response we want to check */
int raft_msg_entry_response_committed(raft_server_t* me_,
Expand All @@ -736,9 +790,11 @@ void raft_node_set_voting(raft_node_t* node, int voting);
* @return 1 if this is a voting node. Otherwise 0. */
int raft_node_is_voting(raft_node_t* me_);

/** Check if a node has sufficient logs to be able to join the cluster.
/** Check if a node has sufficient entries to be able to join the cluster.
**/
int raft_node_has_sufficient_logs(raft_node_t* me_);
int raft_node_has_sufficient_entries(raft_node_t* me_);

#define raft_node_has_sufficient_logs raft_node_has_sufficient_entries

/** Apply all entries up to the commit index
* @return
Expand Down Expand Up @@ -803,11 +859,17 @@ int raft_get_snapshot_entry_idx(raft_server_t *me_);
int raft_snapshot_is_in_progress(raft_server_t *me_);

/** Remove the first log entry.
* This should be used for compacting logs.
* This should be used for compacting entries.
* @return 0 on success
**/
int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety);

/** Remove the first log entries.
* This should be used for compacting entries.
* @return 0 on success
**/
int raft_poll_entries(raft_server_t* me_, raft_entry_t **ety, const int len);

/** Get last applied entry
**/
raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);
Expand All @@ -819,8 +881,8 @@ int raft_get_first_entry_idx(raft_server_t* me_);
* This is usually the result of a snapshot being loaded.
* We need to send an appendentries response.
*
* @param[in] last_included_term Term of the last log of the snapshot
* @param[in] last_included_index Index of the last log of the snapshot
* @param[in] last_included_term Term of the last entry of the snapshot
* @param[in] last_included_index Index of the last entry of the snapshot
*
* @return
* 0 on success
Expand Down
16 changes: 11 additions & 5 deletions include/raft_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ typedef struct {
/* my node ID */
raft_node_t* node;

/* the log which has a voting cfg change, otherwise -1 */
/* log entry which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;

/* Our membership with the cluster is confirmed (ie. configuration log was
* committed) */
/* Our membership with the cluster is confirmed (ie. configuration log
* entry was committed) */
int connected;

int snapshot_in_progress;
Expand Down Expand Up @@ -122,15 +122,21 @@ void raft_node_vote_for_me(raft_node_t* me_, const int vote);

int raft_node_has_vote_for_me(raft_node_t* me_);

void raft_node_set_has_sufficient_logs(raft_node_t* me_);
void raft_node_set_has_sufficient_entries(raft_node_t* me_);

int raft_votes_is_majority(const int nnodes, const int nvotes);

/* DEPRECATED */
void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx);

/* DEPRECATED */
void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx);

int raft_get_num_snapshottable_logs(raft_server_t* me_);
void raft_offer_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, int len);

void raft_pop_entries(raft_server_t* me_, raft_entry_t* ety, const int idx, int len);

int raft_get_num_snapshottable_entries(raft_server_t* me_);

int raft_node_is_active(raft_node_t* me_);

Expand Down
13 changes: 11 additions & 2 deletions src/raft_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* found in the LICENSE file.
*
* @file
* @brief ADT for managing Raft log entries (aka entries)
* @brief ADT for managing Raft log entries
* @author Willem Thiart himself@willemthiart.com
*/

Expand Down Expand Up @@ -136,7 +136,6 @@ void log_clear(log_t* me_)
me->base = 0;
}

/** TODO: rename log_append */
int log_append_entry(log_t* me_, raft_entry_t* ety)
{
log_private_t* me = (log_private_t*)me_;
Expand Down Expand Up @@ -165,6 +164,11 @@ int log_append_entry(log_t* me_, raft_entry_t* ety)
return 0;
}

int log_append_entries(log_t* me_, raft_entry_t* ety, int len)
{
return 0;
}

raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys)
{
log_private_t* me = (log_private_t*)me_;
Expand Down Expand Up @@ -274,6 +278,11 @@ int log_poll(log_t * me_, void** etyp)
return 0;
}

int log_poll_entries(log_t * me_, void** etyp, const int len)
{
return 0;
}

raft_entry_t *log_peektail(log_t * me_)
{
log_private_t* me = (log_private_t*)me_;
Expand Down
20 changes: 10 additions & 10 deletions src/raft_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

#include "raft.h"

#define RAFT_NODE_VOTED_FOR_ME (1 << 0)
#define RAFT_NODE_VOTING (1 << 1)
#define RAFT_NODE_HAS_SUFFICIENT_LOG (1 << 2)
#define RAFT_NODE_INACTIVE (1 << 3)
#define RAFT_NODE_VOTING_COMMITTED (1 << 4)
#define RAFT_NODE_ADDITION_COMMITTED (1 << 5)
#define RAFT_NODE_VOTED_FOR_ME (1 << 0)
#define RAFT_NODE_VOTING (1 << 1)
#define RAFT_NODE_HAS_SUFFICIENT_ENTRIES (1 << 2)
#define RAFT_NODE_INACTIVE (1 << 3)
#define RAFT_NODE_VOTING_COMMITTED (1 << 4)
#define RAFT_NODE_ADDITION_COMMITTED (1 << 5)

typedef struct
{
Expand Down Expand Up @@ -127,16 +127,16 @@ int raft_node_is_voting(raft_node_t* me_)
return (me->flags & RAFT_NODE_VOTING) != 0;
}

int raft_node_has_sufficient_logs(raft_node_t* me_)
int raft_node_has_sufficient_entries(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
return (me->flags & RAFT_NODE_HAS_SUFFICIENT_LOG) != 0;
return (me->flags & RAFT_NODE_HAS_SUFFICIENT_ENTRIES) != 0;
}

void raft_node_set_has_sufficient_logs(raft_node_t* me_)
void raft_node_set_has_sufficient_entries(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
me->flags |= RAFT_NODE_HAS_SUFFICIENT_LOG;
me->flags |= RAFT_NODE_HAS_SUFFICIENT_ENTRIES;
}

void raft_node_set_active(raft_node_t* me_, int active)
Expand Down
Loading