Skip to content

Commit

Permalink
Add log compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
willemt committed Jan 8, 2018
1 parent 65351cb commit 3ee2c46
Show file tree
Hide file tree
Showing 11 changed files with 1,296 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static: $(OBJECTS)
ar -r libraft.a $(OBJECTS)

.PHONY: tests
tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c
tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_snapshotting.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c
$(CC) $(CFLAGS) -o tests_main $^
./tests_main
gcov raft_server.c
Expand Down
28 changes: 24 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,27 @@ It's highly recommended that when a node is added to the cluster that its node I

3. Once the ``RAFT_LOGTYPE_REMOVE_NODE`` configuration change log is applied in the ``applylog`` callback we shutdown the server if it is to be removed.

Todo
====

- Log compaction
Log Compaction
--------------
The log compaction method supported is called "Snapshotting for memory-based state machines" (Ongaro, 2014)

This library does not send snapshots (ie. there are NO send_snapshot, recv_snapshot callbacks to implement). The user has to send the snapshot outside of this library. The implementor has to serialize and deserialize the snapshot.

The process works like this:

1. Begin snapshotting with ``raft_begin_snapshot``.
2. Save the current membership details to the snapshot.
3. Save the finite state machine to the snapshot.
4. End snapshotting with ``raft_end_snapshot``.
5. When the ``send_snapshot`` callback fires, the user must propogate the snapshot to the other node.
6. Once the peer has the snapshot, they call ``raft_begin_load_snapshot``.
7. Peer calls ``raft_add_node`` to add nodes as per the snapshot's membership info.
8. Peer call s``raft_node_set_voting`` to nodes as per the snapshot's membership info.
9. Peer calls ``raft_node_set_active`` to nodes as per the snapshot's membership info.
10. Finally, peer calls ``raft_node_set_active`` to nodes as per the snapshot's membership info.

When a node receives a snapshot it could reuse that snapshot itself for other nodes.

References
==========
Ongaro, D. (2014). Consensus: bridging theory and practice. Retrieved from https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf
137 changes: 136 additions & 1 deletion include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3
#define RAFT_ERR_SHUTDOWN -4
#define RAFT_ERR_NOMEM -5
#define RAFT_ERR_NEEDS_SNAPSHOT -6
#define RAFT_ERR_SNAPSHOT_IN_PROGRESS -7
#define RAFT_ERR_SNAPSHOT_ALREADY_LOADED -8
#define RAFT_ERR_LAST -100

#define RAFT_REQUESTVOTE_ERR_GRANTED 1
Expand Down Expand Up @@ -59,6 +62,7 @@ typedef enum {
* Removing nodes is a 2 step process: first demote, then remove.
*/
RAFT_LOGTYPE_REMOVE_NODE,
RAFT_LOGTYPE_SNAPSHOT,
/**
* Users can piggyback the entry mechanism by specifying log types that
* are higher than RAFT_LOGTYPE_NUM.
Expand Down Expand Up @@ -220,6 +224,22 @@ typedef int (
msg_appendentries_t* msg
);

/**
* Log compaction
* Callback for telling the user to send a snapshot.
*
* @param[in] raft Raft server making this callback
* @param[in] user_data User data that is passed from Raft server
* @param[in] node Node's ID that needs a snapshot sent to
**/
typedef int (
*func_send_snapshot_f
) (
raft_server_t* raft,
void *user_data,
raft_node_t* node
);

/** Callback for detecting when non-voting nodes have obtained enough logs.
* This triggers only when there are no pending configuration changes.
* @param[in] raft The Raft server making this callback
Expand Down Expand Up @@ -320,6 +340,9 @@ typedef struct
/** Callback for sending appendentries messages */
func_send_appendentries_f send_appendentries;

/** Callback for notifying user that a node needs a snapshot sent */
func_send_snapshot_f send_snapshot;

/** Callback for finite state machine application
* Return 0 on success.
* Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */
Expand Down Expand Up @@ -464,7 +487,10 @@ int raft_periodic(raft_server_t* me, int msec_elapsed);
* @param[in] node The node who sent us this message
* @param[in] ae The appendentries message
* @param[out] r The resulting response
* @return 0 on success */
* @return
* 0 on success
* RAFT_ERR_NEEDS_SNAPSHOT
* */
int raft_recv_appendentries(raft_server_t* me,
raft_node_t* node,
msg_appendentries_t* ae,
Expand Down Expand Up @@ -710,6 +736,10 @@ void raft_node_set_voting(raft_node_t* node, int voting);
* @return 1 if this is a voting node. Otherwise 0. */
int raft_node_is_voting(raft_node_t* me_);

/** Check if a node has sufficient logs to be able to join the cluster.
**/
int raft_node_has_sufficient_logs(raft_node_t* me_);

/** Apply all entries up to the commit index
* @return
* 0 on success;
Expand All @@ -735,4 +765,109 @@ int raft_entry_is_voting_cfg_change(raft_entry_t* ety);
* @return 1 if this is a configuration change. */
int raft_entry_is_cfg_change(raft_entry_t* ety);

raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);

/** Begin snapshotting.
*
* While snapshotting, raft will:
* - not apply log entries
* - not start elections
*
* @return 0 on success
*
**/
int raft_begin_snapshot(raft_server_t *me_);

/** Stop snapshotting.
*
* The user MUST include membership changes inside the snapshot. This means
* that membership changes are included in the size of the snapshot. For peers
* that load the snapshot, the user needs to deserialize the snapshot to
* obtain the membership changes.
*
* The user MUST compact the log up to the commit index. This means all
* log entries up to the commit index MUST be deleted (aka polled).
*
* @return
* 0 on success
* -1 on failure
**/
int raft_end_snapshot(raft_server_t *me_);

/** Get the entry index of the entry that was snapshotted
**/
int raft_get_snapshot_entry_idx(raft_server_t *me_);

/** Check is a snapshot is in progress
**/
int raft_snapshot_is_in_progress(raft_server_t *me_);

/** Remove the first log entry.
* This should be used for compacting logs.
* @return 0 on success
**/
int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety);

/** Get last applied entry
**/
raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_);

int raft_get_first_entry_idx(raft_server_t* me_);

/** Start loading snapshot
*
* This is usually the result of a snapshot being loaded.
* We need to send an appendentries response.
*
* @param[in] last_included_term Term of the last log of the snapshot
* @param[in] last_included_index Index of the last log of the snapshot
*
* @return
* 0 on success
* -1 on failure
* RAFT_ERR_SNAPSHOT_ALREADY_LOADED
**/
int raft_begin_load_snapshot(raft_server_t *me_,
int last_included_term,
int last_included_index);

/** Stop loading snapshot.
*
* @return
* 0 on success
* -1 on failure
**/
int raft_end_load_snapshot(raft_server_t *me_);

int raft_get_snapshot_last_idx(raft_server_t *me_);

int raft_get_snapshot_last_term(raft_server_t *me_);

void raft_set_snapshot_metadata(raft_server_t *me_, int term, int idx);

/** Check if a node is active.
* Active nodes could become voting nodes.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_active(raft_node_t* me_);

/** Make the node active.
*
* The user sets this to 1 between raft_begin_load_snapshot and
* raft_end_load_snapshot.
*
* @param[in] active Set a node as active if this is 1
**/
void raft_node_set_active(raft_node_t* me_, int active);

/** Check if a node's voting status has been committed.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_voting_committed(raft_node_t* me_);

/** Check if a node's membership to the cluster has been committed.
* This should be used for creating the membership snapshot.
**/
int raft_node_is_addition_committed(raft_node_t* me_);

#endif /* RAFT_H_ */
7 changes: 7 additions & 0 deletions include/raft_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ void log_empty(log_t * me_);
* Remove oldest entry. Set *etyp to oldest entry on success. */
int log_poll(log_t * me_, void** etyp);

/** Get an array of entries from this index onwards.
* This is used for batching.
*/
raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys);

raft_entry_t* log_get_at_idx(log_t* me_, int idx);
Expand All @@ -46,4 +49,8 @@ raft_entry_t *log_peektail(log_t * me_);

int log_get_current_idx(log_t* me_);

int log_load_from_snapshot(log_t *me_, int idx, int term);

int log_get_base(log_t* me_);

#endif /* RAFT_LOG_H_ */
40 changes: 26 additions & 14 deletions include/raft_private.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) 2013, Willem-Hendrik Thiart
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* @file
* @author Willem Thiart himself@willemthiart.com
*/

#ifndef RAFT_PRIVATE_H_
#define RAFT_PRIVATE_H_

Expand All @@ -8,15 +17,6 @@ enum {
RAFT_NODE_STATUS_DISCONNECTING
};

/**
* Copyright (c) 2013, Willem-Hendrik Thiart
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* @file
* @author Willem Thiart himself@willemthiart.com
*/

typedef struct {
/* Persistent state: */

Expand Down Expand Up @@ -44,7 +44,7 @@ typedef struct {

/* amount of time left till timeout */
int timeout_elapsed;

raft_node_t* nodes;
int num_nodes;

Expand All @@ -66,9 +66,15 @@ typedef struct {
/* the log which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;

/* our membership with the cluster is confirmed (ie. configuration log was
/* Our membership with the cluster is confirmed (ie. configuration log was
* committed) */
int connected;

int snapshot_in_progress;

/* Last compacted snapshot */
int snapshot_last_idx;
int snapshot_last_term;
} raft_server_private_t;

int raft_election_start(raft_server_t* me);
Expand Down Expand Up @@ -118,12 +124,18 @@ int raft_node_has_vote_for_me(raft_node_t* me_);

void raft_node_set_has_sufficient_logs(raft_node_t* me_);

int raft_node_has_sufficient_logs(raft_node_t* me_);

int raft_votes_is_majority(const int nnodes, const int nvotes);

void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx);

void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx);

void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx);
int raft_get_num_snapshottable_logs(raft_server_t* me_);

int raft_node_is_active(raft_node_t* me_);

void raft_node_set_voting_committed(raft_node_t* me_, int voting);

int raft_node_set_addition_committed(raft_node_t* me_, int committed);

#endif /* RAFT_PRIVATE_H_ */
Loading

0 comments on commit 3ee2c46

Please sign in to comment.