From 3ee2c4636a4553e7d55e793be51c0adfb43815c5 Mon Sep 17 00:00:00 2001 From: Willem Thiart Date: Sun, 12 Jun 2016 11:59:33 +0200 Subject: [PATCH] Add log compaction --- Makefile | 2 +- README.rst | 28 +- include/raft.h | 137 +++++++- include/raft_log.h | 7 + include/raft_private.h | 40 ++- src/raft_log.c | 40 ++- src/raft_node.c | 64 +++- src/raft_server.c | 372 ++++++++++++++++++--- src/raft_server_properties.c | 32 +- tests/test_log.c | 47 +++ tests/test_snapshotting.c | 608 +++++++++++++++++++++++++++++++++++ 11 files changed, 1296 insertions(+), 81 deletions(-) create mode 100644 tests/test_snapshotting.c diff --git a/Makefile b/Makefile index b0b925be..47cb30e5 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ static: $(OBJECTS) ar -r libraft.a $(OBJECTS) .PHONY: tests -tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c +tests: src/raft_server.c src/raft_server_properties.c src/raft_log.c src/raft_node.c $(TEST_DIR)/main_test.c $(TEST_DIR)/test_server.c $(TEST_DIR)/test_node.c $(TEST_DIR)/test_log.c $(TEST_DIR)/test_snapshotting.c $(TEST_DIR)/test_scenario.c $(TEST_DIR)/mock_send_functions.c $(TEST_DIR)/CuTest.c $(LLQUEUE_DIR)/linked_list_queue.c $(CC) $(CFLAGS) -o tests_main $^ ./tests_main gcov raft_server.c diff --git a/README.rst b/README.rst index 368700f5..bfd6320c 100644 --- a/README.rst +++ b/README.rst @@ -428,7 +428,27 @@ It's highly recommended that when a node is added to the cluster that its node I 3. Once the ``RAFT_LOGTYPE_REMOVE_NODE`` configuration change log is applied in the ``applylog`` callback we shutdown the server if it is to be removed. -Todo -==== - -- Log compaction +Log Compaction +-------------- +The log compaction method supported is called "Snapshotting for memory-based state machines" (Ongaro, 2014) + +This library does not send snapshots (ie. there are NO send_snapshot, recv_snapshot callbacks to implement). The user has to send the snapshot outside of this library. The implementor has to serialize and deserialize the snapshot. + +The process works like this: + +1. Begin snapshotting with ``raft_begin_snapshot``. +2. Save the current membership details to the snapshot. +3. Save the finite state machine to the snapshot. +4. End snapshotting with ``raft_end_snapshot``. +5. When the ``send_snapshot`` callback fires, the user must propogate the snapshot to the other node. +6. Once the peer has the snapshot, they call ``raft_begin_load_snapshot``. +7. Peer calls ``raft_add_node`` to add nodes as per the snapshot's membership info. +8. Peer call s``raft_node_set_voting`` to nodes as per the snapshot's membership info. +9. Peer calls ``raft_node_set_active`` to nodes as per the snapshot's membership info. +10. Finally, peer calls ``raft_node_set_active`` to nodes as per the snapshot's membership info. + +When a node receives a snapshot it could reuse that snapshot itself for other nodes. + +References +========== +Ongaro, D. (2014). Consensus: bridging theory and practice. Retrieved from https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf diff --git a/include/raft.h b/include/raft.h index 1fb11b2b..7a8953c7 100644 --- a/include/raft.h +++ b/include/raft.h @@ -14,6 +14,9 @@ #define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3 #define RAFT_ERR_SHUTDOWN -4 #define RAFT_ERR_NOMEM -5 +#define RAFT_ERR_NEEDS_SNAPSHOT -6 +#define RAFT_ERR_SNAPSHOT_IN_PROGRESS -7 +#define RAFT_ERR_SNAPSHOT_ALREADY_LOADED -8 #define RAFT_ERR_LAST -100 #define RAFT_REQUESTVOTE_ERR_GRANTED 1 @@ -59,6 +62,7 @@ typedef enum { * Removing nodes is a 2 step process: first demote, then remove. */ RAFT_LOGTYPE_REMOVE_NODE, + RAFT_LOGTYPE_SNAPSHOT, /** * Users can piggyback the entry mechanism by specifying log types that * are higher than RAFT_LOGTYPE_NUM. @@ -220,6 +224,22 @@ typedef int ( msg_appendentries_t* msg ); +/** + * Log compaction + * Callback for telling the user to send a snapshot. + * + * @param[in] raft Raft server making this callback + * @param[in] user_data User data that is passed from Raft server + * @param[in] node Node's ID that needs a snapshot sent to + **/ +typedef int ( +*func_send_snapshot_f +) ( + raft_server_t* raft, + void *user_data, + raft_node_t* node + ); + /** Callback for detecting when non-voting nodes have obtained enough logs. * This triggers only when there are no pending configuration changes. * @param[in] raft The Raft server making this callback @@ -320,6 +340,9 @@ typedef struct /** Callback for sending appendentries messages */ func_send_appendentries_f send_appendentries; + /** Callback for notifying user that a node needs a snapshot sent */ + func_send_snapshot_f send_snapshot; + /** Callback for finite state machine application * Return 0 on success. * Return RAFT_ERR_SHUTDOWN if you want the server to shutdown. */ @@ -464,7 +487,10 @@ int raft_periodic(raft_server_t* me, int msec_elapsed); * @param[in] node The node who sent us this message * @param[in] ae The appendentries message * @param[out] r The resulting response - * @return 0 on success */ + * @return + * 0 on success + * RAFT_ERR_NEEDS_SNAPSHOT + * */ int raft_recv_appendentries(raft_server_t* me, raft_node_t* node, msg_appendentries_t* ae, @@ -710,6 +736,10 @@ void raft_node_set_voting(raft_node_t* node, int voting); * @return 1 if this is a voting node. Otherwise 0. */ int raft_node_is_voting(raft_node_t* me_); +/** Check if a node has sufficient logs to be able to join the cluster. + **/ +int raft_node_has_sufficient_logs(raft_node_t* me_); + /** Apply all entries up to the commit index * @return * 0 on success; @@ -735,4 +765,109 @@ int raft_entry_is_voting_cfg_change(raft_entry_t* ety); * @return 1 if this is a configuration change. */ int raft_entry_is_cfg_change(raft_entry_t* ety); +raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_); + +/** Begin snapshotting. + * + * While snapshotting, raft will: + * - not apply log entries + * - not start elections + * + * @return 0 on success + * + **/ +int raft_begin_snapshot(raft_server_t *me_); + +/** Stop snapshotting. + * + * The user MUST include membership changes inside the snapshot. This means + * that membership changes are included in the size of the snapshot. For peers + * that load the snapshot, the user needs to deserialize the snapshot to + * obtain the membership changes. + * + * The user MUST compact the log up to the commit index. This means all + * log entries up to the commit index MUST be deleted (aka polled). + * + * @return + * 0 on success + * -1 on failure + **/ +int raft_end_snapshot(raft_server_t *me_); + +/** Get the entry index of the entry that was snapshotted + **/ +int raft_get_snapshot_entry_idx(raft_server_t *me_); + +/** Check is a snapshot is in progress + **/ +int raft_snapshot_is_in_progress(raft_server_t *me_); + +/** Remove the first log entry. + * This should be used for compacting logs. + * @return 0 on success + **/ +int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety); + +/** Get last applied entry + **/ +raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_); + +int raft_get_first_entry_idx(raft_server_t* me_); + +/** Start loading snapshot + * + * This is usually the result of a snapshot being loaded. + * We need to send an appendentries response. + * + * @param[in] last_included_term Term of the last log of the snapshot + * @param[in] last_included_index Index of the last log of the snapshot + * + * @return + * 0 on success + * -1 on failure + * RAFT_ERR_SNAPSHOT_ALREADY_LOADED + **/ +int raft_begin_load_snapshot(raft_server_t *me_, + int last_included_term, + int last_included_index); + +/** Stop loading snapshot. + * + * @return + * 0 on success + * -1 on failure + **/ +int raft_end_load_snapshot(raft_server_t *me_); + +int raft_get_snapshot_last_idx(raft_server_t *me_); + +int raft_get_snapshot_last_term(raft_server_t *me_); + +void raft_set_snapshot_metadata(raft_server_t *me_, int term, int idx); + +/** Check if a node is active. + * Active nodes could become voting nodes. + * This should be used for creating the membership snapshot. + **/ +int raft_node_is_active(raft_node_t* me_); + +/** Make the node active. + * + * The user sets this to 1 between raft_begin_load_snapshot and + * raft_end_load_snapshot. + * + * @param[in] active Set a node as active if this is 1 + **/ +void raft_node_set_active(raft_node_t* me_, int active); + +/** Check if a node's voting status has been committed. + * This should be used for creating the membership snapshot. + **/ +int raft_node_is_voting_committed(raft_node_t* me_); + +/** Check if a node's membership to the cluster has been committed. + * This should be used for creating the membership snapshot. + **/ +int raft_node_is_addition_committed(raft_node_t* me_); + #endif /* RAFT_H_ */ diff --git a/include/raft_log.h b/include/raft_log.h index c1a5d1c8..e5ddd6a5 100644 --- a/include/raft_log.h +++ b/include/raft_log.h @@ -36,6 +36,9 @@ void log_empty(log_t * me_); * Remove oldest entry. Set *etyp to oldest entry on success. */ int log_poll(log_t * me_, void** etyp); +/** Get an array of entries from this index onwards. + * This is used for batching. + */ raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys); raft_entry_t* log_get_at_idx(log_t* me_, int idx); @@ -46,4 +49,8 @@ raft_entry_t *log_peektail(log_t * me_); int log_get_current_idx(log_t* me_); +int log_load_from_snapshot(log_t *me_, int idx, int term); + +int log_get_base(log_t* me_); + #endif /* RAFT_LOG_H_ */ diff --git a/include/raft_private.h b/include/raft_private.h index 08481ffd..fd341b19 100644 --- a/include/raft_private.h +++ b/include/raft_private.h @@ -1,3 +1,12 @@ +/** + * Copyright (c) 2013, Willem-Hendrik Thiart + * Use of this source code is governed by a BSD-style license that can be + * found in the LICENSE file. + * + * @file + * @author Willem Thiart himself@willemthiart.com + */ + #ifndef RAFT_PRIVATE_H_ #define RAFT_PRIVATE_H_ @@ -8,15 +17,6 @@ enum { RAFT_NODE_STATUS_DISCONNECTING }; -/** - * Copyright (c) 2013, Willem-Hendrik Thiart - * Use of this source code is governed by a BSD-style license that can be - * found in the LICENSE file. - * - * @file - * @author Willem Thiart himself@willemthiart.com - */ - typedef struct { /* Persistent state: */ @@ -44,7 +44,7 @@ typedef struct { /* amount of time left till timeout */ int timeout_elapsed; - + raft_node_t* nodes; int num_nodes; @@ -66,9 +66,15 @@ typedef struct { /* the log which has a voting cfg change, otherwise -1 */ int voting_cfg_change_log_idx; - /* our membership with the cluster is confirmed (ie. configuration log was + /* Our membership with the cluster is confirmed (ie. configuration log was * committed) */ int connected; + + int snapshot_in_progress; + + /* Last compacted snapshot */ + int snapshot_last_idx; + int snapshot_last_term; } raft_server_private_t; int raft_election_start(raft_server_t* me); @@ -118,12 +124,18 @@ int raft_node_has_vote_for_me(raft_node_t* me_); void raft_node_set_has_sufficient_logs(raft_node_t* me_); -int raft_node_has_sufficient_logs(raft_node_t* me_); - int raft_votes_is_majority(const int nnodes, const int nvotes); +void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx); + void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx); -void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx); +int raft_get_num_snapshottable_logs(raft_server_t* me_); + +int raft_node_is_active(raft_node_t* me_); + +void raft_node_set_voting_committed(raft_node_t* me_, int voting); + +int raft_node_set_addition_committed(raft_node_t* me_, int committed); #endif /* RAFT_PRIVATE_H_ */ diff --git a/src/raft_log.c b/src/raft_log.c index 7e64fefa..f8dfa56b 100644 --- a/src/raft_log.c +++ b/src/raft_log.c @@ -75,6 +75,30 @@ static int __ensurecapacity(log_private_t * me) return 0; } +int log_load_from_snapshot(log_t *me_, int idx, int term) +{ + log_private_t* me = (log_private_t*)me_; + + log_clear(me_); + + raft_entry_t ety; + ety.data.len = 0; + ety.id = 1; + ety.term = term; + ety.type = RAFT_LOGTYPE_SNAPSHOT; + + int e = log_append_entry(me_, &ety); + if (e != 0) + { + assert(0); + return e; + } + + me->base = idx - 1; + + return 0; +} + log_t* log_alloc(int initial_size) { log_private_t* me = (log_private_t*)calloc(1, sizeof(log_private_t)); @@ -112,7 +136,8 @@ void log_clear(log_t* me_) me->base = 0; } -int log_append_entry(log_t* me_, raft_entry_t* c) +/** TODO: rename log_append */ +int log_append_entry(log_t* me_, raft_entry_t* ety) { log_private_t* me = (log_private_t*)me_; int idx = me->base + me->count + 1; @@ -122,7 +147,7 @@ int log_append_entry(log_t* me_, raft_entry_t* c) if (e != 0) return e; - memcpy(&me->entries[me->back], c, sizeof(raft_entry_t)); + memcpy(&me->entries[me->back], ety, sizeof(raft_entry_t)); if (me->cb && me->cb->log_offer) { @@ -177,7 +202,7 @@ raft_entry_t* log_get_at_idx(log_t* me_, int idx) if (idx == 0) return NULL; - if (idx < me->base) + if (idx <= me->base) return NULL; if (me->base + me->count < idx) @@ -186,12 +211,8 @@ raft_entry_t* log_get_at_idx(log_t* me_, int idx) /* idx starts at 1 */ idx -= 1; - if (idx - me->base < me->front) - return NULL; - i = (me->front + idx - me->base) % me->size; return &me->entries[i]; - } int log_count(log_t* me_) @@ -288,3 +309,8 @@ int log_get_current_idx(log_t* me_) log_private_t* me = (log_private_t*)me_; return log_count(me_) + me->base; } + +int log_get_base(log_t* me_) +{ + return ((log_private_t*)me_)->base; +} diff --git a/src/raft_node.c b/src/raft_node.c index 4eeea079..82a654a7 100644 --- a/src/raft_node.c +++ b/src/raft_node.c @@ -16,9 +16,12 @@ #include "raft.h" -#define RAFT_NODE_VOTED_FOR_ME 1 -#define RAFT_NODE_VOTING (1 << 1) -#define RAFT_NODE_HAS_SUFFICIENT_LOG (1 << 2) +#define RAFT_NODE_VOTED_FOR_ME (1 << 0) +#define RAFT_NODE_VOTING (1 << 1) +#define RAFT_NODE_HAS_SUFFICIENT_LOG (1 << 2) +#define RAFT_NODE_INACTIVE (1 << 3) +#define RAFT_NODE_VOTING_COMMITTED (1 << 4) +#define RAFT_NODE_ADDITION_COMMITTED (1 << 5) typedef struct { @@ -107,9 +110,15 @@ void raft_node_set_voting(raft_node_t* me_, int voting) { raft_node_private_t* me = (raft_node_private_t*)me_; if (voting) + { + assert(!raft_node_is_voting(me_)); me->flags |= RAFT_NODE_VOTING; + } else + { + assert(raft_node_is_voting(me_)); me->flags &= ~RAFT_NODE_VOTING; + } } int raft_node_is_voting(raft_node_t* me_) @@ -118,16 +127,46 @@ int raft_node_is_voting(raft_node_t* me_) return (me->flags & RAFT_NODE_VOTING) != 0; } +int raft_node_has_sufficient_logs(raft_node_t* me_) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + return (me->flags & RAFT_NODE_HAS_SUFFICIENT_LOG) != 0; +} + void raft_node_set_has_sufficient_logs(raft_node_t* me_) { raft_node_private_t* me = (raft_node_private_t*)me_; me->flags |= RAFT_NODE_HAS_SUFFICIENT_LOG; } -int raft_node_has_sufficient_logs(raft_node_t* me_) +void raft_node_set_active(raft_node_t* me_, int active) { raft_node_private_t* me = (raft_node_private_t*)me_; - return (me->flags & RAFT_NODE_HAS_SUFFICIENT_LOG) != 0; + if (!active) + me->flags |= RAFT_NODE_INACTIVE; + else + me->flags &= ~RAFT_NODE_INACTIVE; +} + +int raft_node_is_active(raft_node_t* me_) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + return (me->flags & RAFT_NODE_INACTIVE) == 0; +} + +void raft_node_set_voting_committed(raft_node_t* me_, int voting) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + if (voting) + me->flags |= RAFT_NODE_VOTING_COMMITTED; + else + me->flags &= ~RAFT_NODE_VOTING_COMMITTED; +} + +int raft_node_is_voting_committed(raft_node_t* me_) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + return (me->flags & RAFT_NODE_VOTING_COMMITTED) != 0; } int raft_node_get_id(raft_node_t* me_) @@ -135,3 +174,18 @@ int raft_node_get_id(raft_node_t* me_) raft_node_private_t* me = (raft_node_private_t*)me_; return me->id; } + +void raft_node_set_addition_committed(raft_node_t* me_, int committed) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + if (committed) + me->flags |= RAFT_NODE_ADDITION_COMMITTED; + else + me->flags &= ~RAFT_NODE_ADDITION_COMMITTED; +} + +int raft_node_is_addition_committed(raft_node_t* me_) +{ + raft_node_private_t* me = (raft_node_private_t*)me_; + return (me->flags & RAFT_NODE_ADDITION_COMMITTED) != 0; +} diff --git a/src/raft_server.c b/src/raft_server.c index 6fce1f24..fad848e1 100644 --- a/src/raft_server.c +++ b/src/raft_server.c @@ -70,6 +70,10 @@ raft_server_t* raft_new() me->voting_cfg_change_log_idx = -1; raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER); me->current_leader = NULL; + + me->snapshot_in_progress = 0; + raft_set_snapshot_metadata((raft_server_t*)me, 0, 0); + return (raft_server_t*)me; } @@ -113,7 +117,7 @@ int raft_delete_entry_from_idx(raft_server_t* me_, int idx) { raft_server_private_t* me = (raft_server_private_t*)me_; - assert(me->commit_idx < idx); + assert(raft_get_commit_idx(me_) < idx); if (idx <= me->voting_cfg_change_log_idx) me->voting_cfg_change_log_idx = -1; @@ -143,10 +147,11 @@ void raft_become_leader(raft_server_t* me_) me->timeout_elapsed = 0; for (i = 0; i < me->num_nodes; i++) { - if (me->node == me->nodes[i]) + raft_node_t* node = me->nodes[i]; + + if (me->node == node || !raft_node_is_active(node)) continue; - raft_node_t* node = me->nodes[i]; raft_node_set_next_idx(node, raft_get_current_idx(me_) + 1); raft_node_set_match_idx(node, 0); raft_send_appendentries(me_, node); @@ -173,8 +178,16 @@ int raft_become_candidate(raft_server_t* me_) me->timeout_elapsed = 0; for (i = 0; i < me->num_nodes; i++) - if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) - raft_send_requestvote(me_, me->nodes[i]); + { + raft_node_t* node = me->nodes[i]; + + if (me->node != node && + raft_node_is_active(node) && + raft_node_is_voting(node)) + { + raft_send_requestvote(me_, node); + } + } return 0; } @@ -205,7 +218,11 @@ int raft_periodic(raft_server_t* me_, int msec_since_last_period) if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } - else if (me->election_timeout_rand <= me->timeout_elapsed) + + else if (me->election_timeout_rand <= me->timeout_elapsed && + /* Don't become the leader when building snapshots or bad things will + * happen when we get a client request */ + !raft_snapshot_is_in_progress(me_)) { if (1 < raft_get_num_voting_nodes(me_) && raft_node_is_voting(raft_get_my_node(me_))) @@ -216,8 +233,13 @@ int raft_periodic(raft_server_t* me_, int msec_since_last_period) } } - if (me->last_applied_idx < me->commit_idx) - return raft_apply_entry(me_); + if (me->last_applied_idx < raft_get_commit_idx(me_) && + !raft_snapshot_is_in_progress(me_)) + { + int e = raft_apply_entry(me_); + if (-1 != e) + return e; + } return 0; } @@ -299,6 +321,7 @@ int raft_recv_appendentries_response(raft_server_t* me_, if (!raft_node_is_voting(node) && !raft_voting_change_is_in_progress(me_) && raft_get_current_idx(me_) <= r->current_idx + 1 && + !raft_node_is_voting_committed(node) && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) @@ -318,9 +341,11 @@ int raft_recv_appendentries_response(raft_server_t* me_, int i, votes = 1; for (i = 0; i < me->num_nodes; i++) { - if (me->node != me->nodes[i] && - raft_node_is_voting(me->nodes[i]) && - point <= raft_node_get_match_idx(me->nodes[i])) + raft_node_t* node = me->nodes[i]; + if (me->node != node && + raft_node_is_active(node) && + raft_node_is_voting(node) && + point <= raft_node_get_match_idx(node)) { votes++; } @@ -401,8 +426,9 @@ int raft_recv_appendentries( if (ety->term != ae->prev_log_term) { - __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d pli:%d", - ety->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx); + __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d comi:%d lcomi:%d pli:%d", + ety->term, ae->prev_log_term, raft_get_current_idx(me_), + raft_get_commit_idx(me_), ae->leader_commit, ae->prev_log_idx); /* Delete all the following log entries because they don't match */ e = raft_delete_entry_from_idx(me_, max(ae->prev_log_idx, ae->leader_commit + 1)); @@ -642,11 +668,18 @@ int raft_recv_entry(raft_server_t* me_, raft_server_private_t* me = (raft_server_private_t*)me_; int i; - /* Only one voting cfg change at a time */ if (raft_entry_is_voting_cfg_change(ety)) + { + /* Only one voting cfg change at a time */ if (raft_voting_change_is_in_progress(me_)) return RAFT_ERR_ONE_VOTING_CHANGE_ONLY; + /* Multi-threading: need to fail here because user might be + * snapshotting membership settings. */ + if (raft_snapshot_is_in_progress(me_)) + return RAFT_ERR_SNAPSHOT_IN_PROGRESS; + } + if (!raft_is_leader(me_)) return RAFT_ERR_NOT_LEADER; @@ -659,16 +692,20 @@ int raft_recv_entry(raft_server_t* me_, return e; for (i = 0; i < me->num_nodes; i++) { - if (me->node == me->nodes[i] || !me->nodes[i] || - !raft_node_is_voting(me->nodes[i])) + raft_node_t* node = me->nodes[i]; + + if (me->node == node || + !node || + !raft_node_is_active(node) || + !raft_node_is_voting(node)) continue; /* Only send new entries. * Don't send the entry to peers who are behind, to prevent them from * becoming congested. */ - int next_idx = raft_node_get_next_idx(me->nodes[i]); + int next_idx = raft_node_get_next_idx(node); if (next_idx == raft_get_current_idx(me_)) - raft_send_appendentries(me_, me->nodes[i]); + raft_send_appendentries(me_, node); } /* if we're the only node, we can consider the entry committed */ @@ -719,8 +756,11 @@ int raft_apply_entry(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; + if (raft_snapshot_is_in_progress(me_)) + return -1; + /* Don't apply after the commit_idx */ - if (me->last_applied_idx == me->commit_idx) + if (me->last_applied_idx == raft_get_commit_idx(me_)) return -1; int log_idx = me->last_applied_idx + 1; @@ -740,19 +780,40 @@ int raft_apply_entry(raft_server_t* me_) return RAFT_ERR_SHUTDOWN; } - /* Membership Change: confirm connection with cluster */ - if (RAFT_LOGTYPE_ADD_NODE == ety->type) - { - int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx); - raft_node_set_has_sufficient_logs(raft_get_node(me_, node_id)); - if (node_id == raft_get_nodeid(me_)) - me->connected = RAFT_NODE_STATUS_CONNECTED; - } - /* voting cfg change is now complete */ if (log_idx == me->voting_cfg_change_log_idx) me->voting_cfg_change_log_idx = -1; + if (!raft_entry_is_cfg_change(ety)) + return 0; + + int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx); + raft_node_t* node = raft_get_node(me_, node_id); + + switch (ety->type) { + case RAFT_LOGTYPE_ADD_NODE: + raft_node_set_addition_committed(node, 1); + raft_node_set_voting_committed(node, 1); + /* Membership Change: confirm connection with cluster */ + raft_node_set_has_sufficient_logs(node); + if (node_id == raft_get_nodeid(me_)) + me->connected = RAFT_NODE_STATUS_CONNECTED; + break; + case RAFT_LOGTYPE_ADD_NONVOTING_NODE: + raft_node_set_addition_committed(node, 1); + break; + case RAFT_LOGTYPE_DEMOTE_NODE: + if (node) + raft_node_set_voting_committed(node, 0); + break; + case RAFT_LOGTYPE_REMOVE_NODE: + if (node) + raft_remove_node(me_, node); + break; + default: + break; + } + return 0; } @@ -780,7 +841,17 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) int next_idx = raft_node_get_next_idx(node); + /* figure out if the client needs a snapshot sent */ + if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx) + { + if (me->cb.send_snapshot) + me->cb.send_snapshot(me_, me->udata, node); + return RAFT_ERR_NEEDS_SNAPSHOT; + } + ae.entries = raft_get_entries_from_idx(me_, next_idx, &ae.n_entries); + assert((!ae.entries && 0 == ae.n_entries) || + (ae.entries && 0 < ae.n_entries)); /* previous log is the log just before the new logs */ if (1 < next_idx) @@ -789,6 +860,8 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node) ae.prev_log_idx = next_idx - 1; if (prev_ety) ae.prev_log_term = prev_ety->term; + else + ae.prev_log_term = me->snapshot_last_term; } __log(me_, node, "sending appendentries node: ci:%d comi:%d t:%d lc:%d pli:%d plt:%d", @@ -810,12 +883,12 @@ int raft_send_appendentries_all(raft_server_t* me_) me->timeout_elapsed = 0; for (i = 0; i < me->num_nodes; i++) { - if (me->node != me->nodes[i]) - { - e = raft_send_appendentries(me_, me->nodes[i]); - if (0 != e) - return e; - } + if (me->node == me->nodes[i] || !raft_node_is_active(me->nodes[i])) + continue; + + e = raft_send_appendentries(me_, me->nodes[i]); + if (0 != e) + return e; } return 0; @@ -842,13 +915,12 @@ raft_node_t* raft_add_node(raft_server_t* me_, void* udata, int id, int is_self) node = raft_node_new(udata, id); if (!node) return NULL; - me->num_nodes++; - void* p = realloc(me->nodes, sizeof(void*) * me->num_nodes); + void* p = realloc(me->nodes, sizeof(void*) * (me->num_nodes + 1)); if (!p) { - me->num_nodes--; raft_node_free(node); return NULL; } + me->num_nodes++; me->nodes = p; me->nodes[me->num_nodes - 1] = node; if (is_self) @@ -874,8 +946,10 @@ void raft_remove_node(raft_server_t* me_, raft_node_t* node) { raft_server_private_t* me = (raft_server_private_t*)me_; + assert(node); + int i, found = 0; - for (i = 0; inum_nodes; i++) + for (i = 0; i < me->num_nodes; i++) { if (me->nodes[i] == node) { @@ -896,9 +970,15 @@ int raft_get_nvotes_for_me(raft_server_t* me_) int i, votes; for (i = 0, votes = 0; i < me->num_nodes; i++) - if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) - if (raft_node_has_vote_for_me(me->nodes[i])) - votes += 1; + { + if (me->node != me->nodes[i] && + raft_node_is_active(me->nodes[i]) && + raft_node_is_voting(me->nodes[i]) && + raft_node_has_vote_for_me(me->nodes[i])) + { + votes += 1; + } + } if (me->voted_for == raft_get_nodeid(me_)) votes += 1; @@ -939,6 +1019,9 @@ int raft_msg_entry_response_committed(raft_server_t* me_, int raft_apply_all(raft_server_t* me_) { + if (raft_snapshot_is_in_progress(me_)) + return 0; + while (raft_get_last_applied_idx(me_) < raft_get_commit_idx(me_)) { int e = raft_apply_entry(me_); @@ -980,8 +1063,15 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx) case RAFT_LOGTYPE_ADD_NONVOTING_NODE: if (!is_self) { - raft_node_t* node = raft_add_non_voting_node(me_, NULL, node_id, is_self); - assert(node); + if (node && !raft_node_is_active(node)) + { + raft_node_set_active(node, 1); + } + else if (!node) + { + raft_node_t* node = raft_add_non_voting_node(me_, NULL, node_id, is_self); + assert(node); + } } break; @@ -992,12 +1082,13 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* ety, const int idx) break; case RAFT_LOGTYPE_DEMOTE_NODE: - raft_node_set_voting(node, 0); + if (node) + raft_node_set_voting(node, 0); break; case RAFT_LOGTYPE_REMOVE_NODE: if (node) - raft_remove_node(me_, node); + raft_node_set_active(node, 0); break; default: @@ -1025,9 +1116,8 @@ void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx) case RAFT_LOGTYPE_REMOVE_NODE: { - int is_self = node_id == raft_get_nodeid(me_); - raft_node_t* node = raft_add_non_voting_node(me_, NULL, node_id, is_self); - assert(node); + raft_node_t* node = raft_get_node(me_, node_id); + raft_node_set_active(node, 1); } break; @@ -1053,3 +1143,189 @@ void raft_pop_log(raft_server_t* me_, raft_entry_t* ety, const int idx) break; } } + +int raft_poll_entry(raft_server_t* me_, raft_entry_t **ety) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + + int e = log_poll(me->log, (void*)ety); + if (e != 0) + return e; + assert(*ety != NULL); + + return 0; +} + +int raft_get_first_entry_idx(raft_server_t* me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + + assert(0 < raft_get_current_idx(me_)); + + if (me->snapshot_last_idx == 0) + return 1; + + return me->snapshot_last_idx; +} + +int raft_get_num_snapshottable_logs(raft_server_t *me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + if (raft_get_log_count(me_) <= 1) + return 0; + return raft_get_commit_idx(me_) - log_get_base(me->log); +} + +int raft_begin_snapshot(raft_server_t *me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + + if (raft_get_num_snapshottable_logs(me_) == 0) + return -1; + + int snapshot_target = raft_get_commit_idx(me_); + if (!snapshot_target || snapshot_target == 0) + return -1; + + raft_entry_t* ety = raft_get_entry_from_idx(me_, snapshot_target); + if (!ety) + return -1; + + /* we need to get all the way to the commit idx */ + int e = raft_apply_all(me_); + if (e != 0) + return e; + + assert(raft_get_commit_idx(me_) == raft_get_last_applied_idx(me_)); + + raft_set_snapshot_metadata(me_, ety->term, snapshot_target); + me->snapshot_in_progress = 1; + + __log(me_, NULL, + "begin snapshot sli:%d slt:%d slogs:%d\n", + me->snapshot_last_idx, + me->snapshot_last_term, + raft_get_num_snapshottable_logs(me_)); + + return 0; +} + +int raft_end_snapshot(raft_server_t *me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + + if (!me->snapshot_in_progress || me->snapshot_last_idx == 0) + return -1; + + /* If needed, remove compacted logs */ + int i = raft_get_first_entry_idx(me_), end = raft_get_commit_idx(me_); + for (; i <= end; i++) + { + raft_entry_t* _ety; + int e = raft_poll_entry(me_, &_ety); + if (e != 0) + return -1; + } + + me->snapshot_in_progress = 0; + + if (!raft_is_leader(me_)) + return 0; + + for (i = 0; i < me->num_nodes; i++) + { + raft_node_t* node = me->nodes[i]; + + if (me->node == node || !raft_node_is_active(node)) + continue; + + int next_idx = raft_node_get_next_idx(node); + + /* figure out if the client needs a snapshot sent */ + if (0 < me->snapshot_last_idx && next_idx < me->snapshot_last_idx) + { + if (me->cb.send_snapshot) + me->cb.send_snapshot(me_, me->udata, node); + } + } + + return 0; +} + +int raft_begin_load_snapshot( + raft_server_t *me_, + int last_included_term, + int last_included_index) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + + if (last_included_index == -1) + return -1; + + if (last_included_index == 0 || last_included_index == 0) + return -1; + + /* loading the snapshot will break cluster safety */ + if (last_included_index < me->last_applied_idx) + return -1; + + /* snapshot was unnecessary */ + if (last_included_index < raft_get_current_idx(me_)) + return -1; + + if (last_included_term == me->snapshot_last_term && last_included_index == me->snapshot_last_idx) + return RAFT_ERR_SNAPSHOT_ALREADY_LOADED; + + me->current_term = last_included_term; + me->voted_for = -1; + raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER); + me->current_leader = NULL; + + log_load_from_snapshot(me->log, last_included_index, last_included_term); + + if (raft_get_commit_idx(me_) < last_included_index) + raft_set_commit_idx(me_, last_included_index); + + me->last_applied_idx = last_included_index; + raft_set_snapshot_metadata(me_, last_included_term, me->last_applied_idx); + + /* remove all nodes but self */ + int i, my_node_by_idx = 0; + for (i = 0; i < me->num_nodes; i++) + { + if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i])) + my_node_by_idx = i; + else + raft_node_set_active(me->nodes[i], 0); + } + + /* this will be realloc'd by a raft_add_node */ + me->nodes[0] = me->nodes[my_node_by_idx]; + me->num_nodes = 1; + + __log(me_, NULL, + "loaded snapshot sli:%d slt:%d slogs:%d\n", + me->snapshot_last_idx, + me->snapshot_last_term, + raft_get_num_snapshottable_logs(me_)); + + return 0; +} + +int raft_end_load_snapshot(raft_server_t *me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + int i; + + /* Set nodes' voting status as committed */ + for (i = 0; i < me->num_nodes; i++) + { + raft_node_t* node = me->nodes[i]; + raft_node_set_voting_committed(node, raft_node_is_voting(node)); + raft_node_set_addition_committed(node, 1); + if (raft_node_is_voting(node)) + raft_node_set_has_sufficient_logs(node); + } + + return 0; +} diff --git a/src/raft_server_properties.c b/src/raft_server_properties.c index 05531edc..d441a7bd 100644 --- a/src/raft_server_properties.c +++ b/src/raft_server_properties.c @@ -60,7 +60,7 @@ int raft_get_num_voting_nodes(raft_server_t* me_) raft_server_private_t* me = (raft_server_private_t*)me_; int i, num = 0; for (i = 0; i < me->num_nodes; i++) - if (raft_node_is_voting(me->nodes[i])) + if (raft_node_is_active(me->nodes[i]) && raft_node_is_voting(me->nodes[i])) num++; return num; } @@ -229,3 +229,33 @@ int raft_is_connected(raft_server_t* me_) { return ((raft_server_private_t*)me_)->connected; } + +int raft_snapshot_is_in_progress(raft_server_t *me_) +{ + return ((raft_server_private_t*)me_)->snapshot_in_progress; +} + +raft_entry_t *raft_get_last_applied_entry(raft_server_t *me_) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + if (raft_get_last_applied_idx(me_) == 0) + return NULL; + return log_get_at_idx(me->log, raft_get_last_applied_idx(me_)); +} + +int raft_get_snapshot_last_idx(raft_server_t *me_) +{ + return ((raft_server_private_t*)me_)->snapshot_last_idx; +} + +int raft_get_snapshot_last_term(raft_server_t *me_) +{ + return ((raft_server_private_t*)me_)->snapshot_last_term; +} + +void raft_set_snapshot_metadata(raft_server_t *me_, int term, int idx) +{ + raft_server_private_t* me = (raft_server_private_t*)me_; + me->snapshot_last_term = term; + me->snapshot_last_idx = idx; +} diff --git a/tests/test_log.c b/tests/test_log.c index c2cb22e7..a9244812 100644 --- a/tests/test_log.c +++ b/tests/test_log.c @@ -322,7 +322,10 @@ void TestLog_poll(CuTest * tc) CuAssertTrue(tc, NULL != ety); CuAssertIntEquals(tc, 2, log_count(l)); CuAssertIntEquals(tc, ety->id, 1); + CuAssertIntEquals(tc, 1, log_get_base(l)); CuAssertTrue(tc, NULL == log_get_at_idx(l, 1)); + CuAssertTrue(tc, NULL != log_get_at_idx(l, 2)); + CuAssertTrue(tc, NULL != log_get_at_idx(l, 3)); CuAssertIntEquals(tc, 3, log_get_current_idx(l)); /* remove 2nd */ @@ -331,7 +334,9 @@ void TestLog_poll(CuTest * tc) CuAssertTrue(tc, NULL != ety); CuAssertIntEquals(tc, 1, log_count(l)); CuAssertIntEquals(tc, ety->id, 2); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 1)); CuAssertTrue(tc, NULL == log_get_at_idx(l, 2)); + CuAssertTrue(tc, NULL != log_get_at_idx(l, 3)); CuAssertIntEquals(tc, 3, log_get_current_idx(l)); /* remove 3rd */ @@ -340,6 +345,8 @@ void TestLog_poll(CuTest * tc) CuAssertTrue(tc, NULL != ety); CuAssertIntEquals(tc, 0, log_count(l)); CuAssertIntEquals(tc, ety->id, 3); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 1)); + CuAssertTrue(tc, NULL == log_get_at_idx(l, 2)); CuAssertTrue(tc, NULL == log_get_at_idx(l, 3)); CuAssertIntEquals(tc, 3, log_get_current_idx(l)); } @@ -379,6 +386,46 @@ void T_estlog_cant_append_duplicates(CuTest * tc) } #endif +void TestLog_load_from_snapshot(CuTest * tc) +{ + void *l; + raft_entry_t e1, e2, e3; + + memset(&e1, 0, sizeof(raft_entry_t)); + memset(&e2, 0, sizeof(raft_entry_t)); + memset(&e3, 0, sizeof(raft_entry_t)); + + l = log_new(); + CuAssertIntEquals(tc, 0, log_get_current_idx(l)); + CuAssertIntEquals(tc, 0, log_load_from_snapshot(l, 10, 5)); + CuAssertIntEquals(tc, 10, log_get_current_idx(l)); + + /* this is just a marker + * it should never be sent to any nodes because it is part of a snapshot */ + CuAssertIntEquals(tc, 1, log_count(l)); +} + +void TestLog_load_from_snapshot_clears_log(CuTest * tc) +{ + void *l; + raft_entry_t e1, e2, e3; + + memset(&e1, 0, sizeof(raft_entry_t)); + memset(&e2, 0, sizeof(raft_entry_t)); + memset(&e3, 0, sizeof(raft_entry_t)); + + l = log_new(); + + CuAssertIntEquals(tc, 0, log_append_entry(l, &e1)); + CuAssertIntEquals(tc, 0, log_append_entry(l, &e2)); + CuAssertIntEquals(tc, 2, log_count(l)); + CuAssertIntEquals(tc, 2, log_get_current_idx(l)); + + CuAssertIntEquals(tc, 0, log_load_from_snapshot(l, 10, 5)); + CuAssertIntEquals(tc, 1, log_count(l)); + CuAssertIntEquals(tc, 10, log_get_current_idx(l)); +} + void TestLog_front_pushes_across_boundary(CuTest * tc) { void* r = __set_up(); diff --git a/tests/test_snapshotting.c b/tests/test_snapshotting.c new file mode 100644 index 00000000..7c0d3a94 --- /dev/null +++ b/tests/test_snapshotting.c @@ -0,0 +1,608 @@ +#include +#include +#include +#include +#include +#include +#include "CuTest.h" + +#include "raft.h" +#include "raft_log.h" +#include "raft_private.h" +#include "mock_send_functions.h" + +static int __raft_persist_term( + raft_server_t* raft, + void *udata, + int term, + int vote + ) +{ + return 0; +} + +static int __raft_persist_vote( + raft_server_t* raft, + void *udata, + int vote + ) +{ + return 0; +} + +static int __raft_applylog( + raft_server_t* raft, + void *udata, + raft_entry_t *ety, + int idx + ) +{ + return 0; +} + +static int __raft_send_requestvote(raft_server_t* raft, + void* udata, + raft_node_t* node, + msg_requestvote_t* msg) +{ + return 0; +} + +static int __raft_send_appendentries(raft_server_t* raft, + void* udata, + raft_node_t* node, + msg_appendentries_t* msg) +{ + return 0; +} + +static int __raft_send_appendentries_capture(raft_server_t* raft, + void* udata, + raft_node_t* node, + msg_appendentries_t* msg) +{ + msg_appendentries_t* msg_captured = (msg_appendentries_t*)udata; + memcpy(msg_captured, msg, sizeof(msg_appendentries_t)); + return 0; +} + +/* static raft_cbs_t generic_funcs = { */ +/* .persist_term = __raft_persist_term, */ +/* .persist_vote = __raft_persist_vote, */ +/* }; */ + +static int max_election_timeout(int election_timeout) +{ + return 2 * election_timeout; +} + +// TODO: don't apply logs while snapshotting +// TODO: don't cause elections while snapshotting + +void TestRaft_leader_begin_snapshot_fails_if_no_logs_to_compact(CuTest * tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 2; + raft_recv_entry(r, &ety, &cr); + CuAssertIntEquals(tc, 2, raft_get_log_count(r)); + CuAssertIntEquals(tc, -1, raft_begin_snapshot(r)); + + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); +} + +void TestRaft_leader_will_not_apply_entry_if_snapshot_is_in_progress(CuTest * tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 1; + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 2, raft_get_log_count(r)); + + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r)); + raft_set_commit_idx(r, 2); + CuAssertIntEquals(tc, -1, raft_apply_entry(r)); + CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r)); +} + +void TestRaft_leader_snapshot_end_fails_if_snapshot_not_in_progress(CuTest * tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + CuAssertIntEquals(tc, -1, raft_end_snapshot(r)); +} + +void TestRaft_leader_snapshot_begin_fails_if_less_than_2_logs_to_compact(CuTest * tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, -1, raft_begin_snapshot(r)); +} + +void TestRaft_leader_snapshot_end_succeeds_if_log_compacted(CuTest * tc) +{ + raft_cbs_t funcs = { + .persist_term = __raft_persist_term, + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + raft_set_current_term(r, 1); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 2; + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 2, raft_get_log_count(r)); + CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_logs(r)); + + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + + raft_entry_t* _ety; + int i = raft_get_first_entry_idx(r); + for (; i < raft_get_commit_idx(r); i++) + CuAssertIntEquals(tc, 0, raft_poll_entry(r, &_ety)); + + CuAssertIntEquals(tc, 0, raft_end_snapshot(r)); + CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r)); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, 1, raft_get_commit_idx(r)); + CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r)); + CuAssertIntEquals(tc, 0, raft_periodic(r, 1000)); +} + +void TestRaft_leader_snapshot_end_succeeds_if_log_compacted2(CuTest * tc) +{ + raft_cbs_t funcs = { + .persist_term = __raft_persist_term, + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + raft_set_current_term(r, 1); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 2; + raft_recv_entry(r, &ety, &cr); + ety.id = 3; + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 2); + CuAssertIntEquals(tc, 3, raft_get_log_count(r)); + CuAssertIntEquals(tc, 2, raft_get_num_snapshottable_logs(r)); + + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + + raft_entry_t* _ety; + int i = raft_get_first_entry_idx(r); + for (; i <= raft_get_commit_idx(r); i++) + CuAssertIntEquals(tc, 0, raft_poll_entry(r, &_ety)); + + CuAssertIntEquals(tc, 0, raft_end_snapshot(r)); + CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r)); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, 2, raft_get_commit_idx(r)); + CuAssertIntEquals(tc, 2, raft_get_last_applied_idx(r)); + CuAssertIntEquals(tc, 0, raft_periodic(r, 1000)); +} + +void TestRaft_joinee_needs_to_get_snapshot(CuTest * tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 2; + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 2, raft_get_log_count(r)); + CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_logs(r)); + + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r)); + CuAssertIntEquals(tc, -1, raft_apply_entry(r)); + CuAssertIntEquals(tc, 1, raft_get_last_applied_idx(r)); +} + +void TestRaft_follower_load_from_snapshot(CuTest * tc) +{ + raft_cbs_t funcs = { + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + raft_set_state(r, RAFT_STATE_FOLLOWER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 5, 5)); + CuAssertIntEquals(tc, 0, raft_end_load_snapshot(r)); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r)); + CuAssertIntEquals(tc, 5, raft_get_commit_idx(r)); + CuAssertIntEquals(tc, 5, raft_get_last_applied_idx(r)); + + CuAssertIntEquals(tc, 0, raft_periodic(r, 1000)); + + /* current idx means snapshot was unnecessary */ + ety.id = 2; + raft_append_entry(r, &ety); + ety.id = 3; + raft_append_entry(r, &ety); + raft_set_commit_idx(r, 7); + CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 6, 5)); + CuAssertIntEquals(tc, 7, raft_get_commit_idx(r)); +} + +void TestRaft_follower_load_from_snapshot_fails_if_already_loaded(CuTest * tc) +{ + raft_cbs_t funcs = { + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + raft_set_state(r, RAFT_STATE_FOLLOWER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 5, 5)); + CuAssertIntEquals(tc, 0, raft_end_load_snapshot(r)); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r)); + CuAssertIntEquals(tc, 5, raft_get_commit_idx(r)); + CuAssertIntEquals(tc, 5, raft_get_last_applied_idx(r)); + + CuAssertIntEquals(tc, RAFT_ERR_SNAPSHOT_ALREADY_LOADED, raft_begin_load_snapshot(r, 5, 5)); +} + +void TestRaft_follower_load_from_snapshot_does_not_break_cluster_safety(CuTest * tc) +{ + raft_cbs_t funcs = { + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + raft_set_state(r, RAFT_STATE_FOLLOWER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + raft_append_entry(r, &ety); + + ety.id = 2; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + raft_append_entry(r, &ety); + + ety.id = 3; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + raft_append_entry(r, &ety); + + CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 2, 2)); +} + +void TestRaft_follower_load_from_snapshot_fails_if_log_is_newer(CuTest * tc) +{ + raft_cbs_t funcs = { + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + raft_set_state(r, RAFT_STATE_FOLLOWER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + raft_set_last_applied_idx(r, 5); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + CuAssertIntEquals(tc, -1, raft_begin_load_snapshot(r, 2, 2)); +} + +void TestRaft_leader_sends_appendentries_when_node_next_index_was_compacted(CuTest* tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries_capture, + }; + + msg_appendentries_t ae; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, &ae); + + raft_node_t* node; + raft_add_node(r, NULL, 1, 1); + node = raft_add_node(r, NULL, 2, 0); + raft_add_node(r, NULL, 3, 0); + + /* entry 1 */ + char *str = "aaa"; + raft_entry_t ety = {}; + ety.term = 1; + ety.id = 1; + ety.data.buf = str; + ety.data.len = 3; + raft_append_entry(r, &ety); + + /* entry 2 */ + ety.term = 1; + ety.id = 2; + ety.data.buf = str; + ety.data.len = 3; + raft_append_entry(r, &ety); + + /* entry 3 */ + ety.term = 1; + ety.id = 3; + ety.data.buf = str; + ety.data.len = 3; + raft_append_entry(r, &ety); + CuAssertIntEquals(tc, 3, raft_get_current_idx(r)); + + /* compact entry 1 & 2 */ + CuAssertIntEquals(tc, 0, raft_begin_load_snapshot(r, 2, 3)); + CuAssertIntEquals(tc, 0, raft_end_load_snapshot(r)); + CuAssertIntEquals(tc, 3, raft_get_current_idx(r)); + + /* node wants an entry that was compacted */ + raft_node_set_next_idx(node, raft_get_current_idx(r)); + + raft_set_state(r, RAFT_STATE_LEADER); + raft_set_current_term(r, 2); + CuAssertIntEquals(tc, 0, raft_send_appendentries(r, node)); + CuAssertIntEquals(tc, 2, ae.term); + CuAssertIntEquals(tc, 2, ae.prev_log_idx); + CuAssertIntEquals(tc, 2, ae.prev_log_term); +} + +void TestRaft_recv_entry_fails_if_snapshot_in_progress(CuTest* tc) +{ + raft_cbs_t funcs = { + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + ety.id = 2; + raft_recv_entry(r, &ety, &cr); + CuAssertIntEquals(tc, 2, raft_get_log_count(r)); + + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + + ety.id = 3; + ety.type = RAFT_LOGTYPE_ADD_NODE; + CuAssertIntEquals(tc, RAFT_ERR_SNAPSHOT_IN_PROGRESS, raft_recv_entry(r, &ety, &cr)); +} + +void TxestRaft_full_snapshot(CuTest * tc) +{ + raft_cbs_t funcs = { + .persist_term = __raft_persist_term, + .send_appendentries = __raft_send_appendentries, + }; + + void *r = raft_new(); + raft_set_callbacks(r, &funcs, NULL); + + msg_entry_response_t cr; + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + + /* I am the leader */ + raft_set_state(r, RAFT_STATE_LEADER); + raft_set_current_term(r, 1); + CuAssertIntEquals(tc, 0, raft_get_log_count(r)); + + /* entry message */ + msg_entry_t ety = {}; + ety.id = 1; + ety.data.buf = "entry"; + ety.data.len = strlen("entry"); + + /* receive entry */ + raft_recv_entry(r, &ety, &cr); + raft_set_commit_idx(r, 1); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); + CuAssertIntEquals(tc, 1, raft_get_num_snapshottable_logs(r)); + + CuAssertIntEquals(tc, 0, raft_begin_snapshot(r)); + + raft_entry_t* _ety; + int i = raft_get_first_entry_idx(r); + for (; i < raft_get_commit_idx(r); i++) + CuAssertIntEquals(tc, 0, raft_poll_entry(r, &_ety)); + + CuAssertIntEquals(tc, 0, raft_end_snapshot(r)); + CuAssertIntEquals(tc, 0, raft_get_num_snapshottable_logs(r)); + CuAssertIntEquals(tc, 1, raft_get_log_count(r)); +}