diff --git a/include/raft.h b/include/raft.h index 2ff46bed..53bff120 100644 --- a/include/raft.h +++ b/include/raft.h @@ -14,6 +14,7 @@ #define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3 #define RAFT_ERR_SHUTDOWN -4 #define RAFT_ERR_NOMEM -5 +#define RAFT_ERR_LAST -100 #define RAFT_REQUESTVOTE_ERR_GRANTED 1 #define RAFT_REQUESTVOTE_ERR_NOT_GRANTED 0 @@ -148,7 +149,8 @@ typedef struct /* Having the following fields allows us to do less book keeping in * regards to full fledged RPC */ - /** This is the highest log IDX we've received and appended to our log */ + /** If success, this is the highest log IDX we've received and appended to + * our log; otherwise, this is the our currentIndex */ int current_idx; /** The first idx that we received within the appendentries message */ @@ -619,18 +621,24 @@ void* raft_get_udata(raft_server_t* me); /** Vote for a server. * This should be used to reload persistent state, ie. the voted-for field. - * @param[in] node The server to vote for */ -void raft_vote(raft_server_t* me_, raft_node_t* node); + * @param[in] node The server to vote for + * @return + * 0 on success */ +int raft_vote(raft_server_t* me_, raft_node_t* node); /** Vote for a server. * This should be used to reload persistent state, ie. the voted-for field. - * @param[in] nodeid The server to vote for by nodeid */ -void raft_vote_for_nodeid(raft_server_t* me_, const int nodeid); + * @param[in] nodeid The server to vote for by nodeid + * @return + * 0 on success */ +int raft_vote_for_nodeid(raft_server_t* me_, const int nodeid); /** Set the current term. * This should be used to reload persistent state, ie. the current_term field. - * @param[in] term The new current term */ -void raft_set_current_term(raft_server_t* me, const int term); + * @param[in] term The new current term + * @return + * 0 on success */ +int raft_set_current_term(raft_server_t* me, const int term); /** Set the commit idx. * This should be used to reload persistent state, ie. the commit_idx field. diff --git a/include/raft_log.h b/include/raft_log.h index 4d978155..4bccf917 100644 --- a/include/raft_log.h +++ b/include/raft_log.h @@ -24,16 +24,15 @@ int log_count(log_t* me_); /** * Delete all logs from this log onwards */ -void log_delete(log_t* me_, int idx); +int log_delete(log_t* me_, int idx); /** * Empty the queue. */ void log_empty(log_t * me_); /** - * Remove oldest entry - * @return oldest entry */ -void *log_poll(log_t * me_); + * Remove oldest entry. Set *etyp to oldest entry on success. */ +int log_poll(log_t * me_, void** etyp); raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys); @@ -43,8 +42,6 @@ raft_entry_t* log_get_at_idx(log_t* me_, int idx); * @return youngest entry */ raft_entry_t *log_peektail(log_t * me_); -void log_delete(log_t* me_, int idx); - int log_get_current_idx(log_t* me_); #endif /* RAFT_LOG_H_ */ diff --git a/include/raft_private.h b/include/raft_private.h index 23b1cc9e..b4c2a123 100644 --- a/include/raft_private.h +++ b/include/raft_private.h @@ -71,16 +71,12 @@ typedef struct { int connected; } raft_server_private_t; -void raft_election_start(raft_server_t* me); +int raft_election_start(raft_server_t* me); -void raft_become_candidate(raft_server_t* me); +int raft_become_candidate(raft_server_t* me); void raft_become_follower(raft_server_t* me); -void raft_vote(raft_server_t* me, raft_node_t* node); - -void raft_set_current_term(raft_server_t* me,int term); - void raft_randomize_election_timeout(raft_server_t* me_); /** diff --git a/src/raft_log.c b/src/raft_log.c index 89c37e69..0cf75b25 100644 --- a/src/raft_log.c +++ b/src/raft_log.c @@ -119,9 +119,9 @@ int log_append_entry(log_t* me_, raft_entry_t* c) { void* ud = raft_get_udata(me->raft); e = me->cb->log_offer(me->raft, ud, &me->entries[me->back], idx); - raft_offer_log(me->raft, &me->entries[me->back], idx); - if (e == RAFT_ERR_SHUTDOWN) + if (0 != e) return e; + raft_offer_log(me->raft, &me->entries[me->back], idx); } me->count++; @@ -182,7 +182,7 @@ int log_count(log_t* me_) return ((log_private_t*)me_)->count; } -void log_delete(log_t* me_, int idx) +int log_delete(log_t* me_, int idx) { log_private_t* me = (log_private_t*)me_; int end; @@ -194,31 +194,39 @@ void log_delete(log_t* me_, int idx) for (end = log_count(me_); idx < end; idx++) { int idx_tmp = me->base + me->count; - if (me->cb && me->cb->log_pop) - me->cb->log_pop(me->raft, raft_get_udata(me->raft), - &me->entries[me->back - 1], idx_tmp); + if (me->cb && me->cb->log_pop) { + int e = me->cb->log_pop(me->raft, raft_get_udata(me->raft), + &me->entries[me->back - 1], idx_tmp); + if (0 != e) + return e; + } raft_pop_log(me->raft, &me->entries[me->back - 1], idx_tmp); me->back--; me->count--; } + return 0; } -void *log_poll(log_t * me_) +int log_poll(log_t * me_, void** etyp) { log_private_t* me = (log_private_t*)me_; int idx = me->base + 1; if (0 == log_count(me_)) - return NULL; + return -1; const void *elem = &me->entries[me->front]; - if (me->cb && me->cb->log_poll) - me->cb->log_poll(me->raft, raft_get_udata(me->raft), - &me->entries[me->front], idx); + if (me->cb && me->cb->log_poll) { + int e = me->cb->log_poll(me->raft, raft_get_udata(me->raft), + &me->entries[me->front], idx); + if (0 != e) + return e; + } me->front++; me->count--; me->base++; - return (void*)elem; + *etyp = (void*)elem; + return 0; } raft_entry_t *log_peektail(log_t * me_) diff --git a/src/raft_server.c b/src/raft_server.c index b9c2962d..9ab1907d 100644 --- a/src/raft_server.c +++ b/src/raft_server.c @@ -109,7 +109,7 @@ void raft_clear(raft_server_t* me_) log_clear(me->log); } -void raft_delete_entry_from_idx(raft_server_t* me_, int idx) +int raft_delete_entry_from_idx(raft_server_t* me_, int idx) { raft_server_private_t* me = (raft_server_private_t*)me_; @@ -118,10 +118,10 @@ void raft_delete_entry_from_idx(raft_server_t* me_, int idx) if (idx <= me->voting_cfg_change_log_idx) me->voting_cfg_change_log_idx = -1; - log_delete(me->log, idx); + return log_delete(me->log, idx); } -void raft_election_start(raft_server_t* me_) +int raft_election_start(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; @@ -129,7 +129,7 @@ void raft_election_start(raft_server_t* me_) me->election_timeout_rand, me->timeout_elapsed, me->current_term, raft_get_current_idx(me_)); - raft_become_candidate(me_); + return raft_become_candidate(me_); } void raft_become_leader(raft_server_t* me_) @@ -153,14 +153,16 @@ void raft_become_leader(raft_server_t* me_) } } -void raft_become_candidate(raft_server_t* me_) +int raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; __log(me_, NULL, "becoming candidate"); - raft_set_current_term(me_, raft_get_current_term(me_) + 1); + int e = raft_set_current_term(me_, raft_get_current_term(me_) + 1); + if (0 != e) + return e; for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); @@ -173,6 +175,7 @@ void raft_become_candidate(raft_server_t* me_) for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); + return 0; } void raft_become_follower(raft_server_t* me_) @@ -206,7 +209,11 @@ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { if (1 < raft_get_num_voting_nodes(me_) && raft_node_is_voting(raft_get_my_node(me_))) - raft_election_start(me_); + { + int e = raft_election_start(me_); + if (0 != e) + return e; + } } if (me->last_applied_idx < me->commit_idx) @@ -249,7 +256,9 @@ int raft_recv_appendentries_response(raft_server_t* me_, and convert to follower (§5.3) */ if (me->current_term < r->term) { - raft_set_current_term(me_, r->term); + int e = raft_set_current_term(me_, r->term); + if (0 != e) + return e; raft_become_follower(me_); me->current_leader = NULL; return 0; @@ -257,16 +266,18 @@ int raft_recv_appendentries_response(raft_server_t* me_, else if (me->current_term != r->term) return 0; - /* Stale response -- ignore */ - if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) - return 0; + int match_idx = raft_node_get_match_idx(node); if (0 == r->success) { /* If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3) */ int next_idx = raft_node_get_next_idx(node); - assert(0 <= next_idx); + assert(0 < next_idx); + /* Stale response -- ignore */ + assert(match_idx <= next_idx - 1); + if (match_idx == next_idx - 1) + return 0; if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); else @@ -277,6 +288,9 @@ int raft_recv_appendentries_response(raft_server_t* me_, return 0; } + if (r->current_idx <= match_idx) + return 0; + assert(r->current_idx <= raft_get_current_idx(me_)); raft_node_set_next_idx(node, r->current_idx + 1); @@ -345,7 +359,7 @@ int raft_recv_appendentries( ae->prev_log_term, ae->n_entries); - r->term = me->current_term; + r->success = 0; if (raft_is_candidate(me_) && me->current_term == ae->term) { @@ -353,8 +367,9 @@ int raft_recv_appendentries( } else if (me->current_term < ae->term) { - raft_set_current_term(me_, ae->term); - r->term = ae->term; + e = raft_set_current_term(me_, ae->term); + if (0 != e) + goto out; raft_become_follower(me_); } else if (ae->term < me->current_term) @@ -362,7 +377,7 @@ int raft_recv_appendentries( /* 1. Reply false if term < currentTerm (§5.1) */ __log(me_, node, "AE term %d is less than current term %d", ae->term, me->current_term); - goto fail_with_current_idx; + goto out; } /* update current leader because ae->term is up to date */ @@ -381,7 +396,7 @@ int raft_recv_appendentries( if (!ety) { __log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx); - goto fail_with_current_idx; + goto out; } if (ety->term != ae->prev_log_term) @@ -389,12 +404,12 @@ int raft_recv_appendentries( __log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d pli:%d", ety->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx); /* Delete all the following log entries because they don't match */ - raft_delete_entry_from_idx(me_, ae->prev_log_idx); - r->current_idx = ae->prev_log_idx - 1; - goto fail; + e = raft_delete_entry_from_idx(me_, ae->prev_log_idx); + goto out; } } + r->success = 1; r->current_idx = ae->prev_log_idx; /* 3. If an existing entry conflicts with a new one (same index @@ -406,30 +421,24 @@ int raft_recv_appendentries( raft_entry_t* ety = &ae->entries[i]; int ety_index = ae->prev_log_idx + 1 + i; raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index); - r->current_idx = ety_index; if (existing_ety && existing_ety->term != ety->term && me->commit_idx < ety_index) { - raft_delete_entry_from_idx(me_, ety_index); + e = raft_delete_entry_from_idx(me_, ety_index); + if (0 != e) + goto out; break; } else if (!existing_ety) break; + r->current_idx = ety_index; } /* Pick up remainder in case of mismatch or missing entry */ for (; i < ae->n_entries; i++) { e = raft_append_entry(me_, &ae->entries[i]); - if (RAFT_ERR_SHUTDOWN == e) - { - r->success = 0; - r->first_idx = 0; - return RAFT_ERR_SHUTDOWN; - } - else if (0 != e) - { - goto fail_with_current_idx; - } + if (0 != e) + goto out; r->current_idx = ae->prev_log_idx + 1 + i; } @@ -441,15 +450,11 @@ int raft_recv_appendentries( raft_set_commit_idx(me_, min(last_log_idx, ae->leader_commit)); } - r->success = 1; +out: + r->term = me->current_term; + if (0 == r->success) + r->current_idx = raft_get_current_idx(me_); r->first_idx = ae->prev_log_idx + 1; - return 0; - -fail_with_current_idx: - r->current_idx = raft_get_current_idx(me_); -fail: - r->success = 0; - r->first_idx = 0; return e; } @@ -499,13 +504,18 @@ int raft_recv_requestvote(raft_server_t* me_, msg_requestvote_response_t *r) { raft_server_private_t* me = (raft_server_private_t*)me_; + int e = 0; if (!node) node = raft_get_node(me_, vr->candidate_id); if (raft_get_current_term(me_) < vr->term) { - raft_set_current_term(me_, vr->term); + e = raft_set_current_term(me_, vr->term); + if (0 != e) { + r->vote_granted = 0; + goto done; + } raft_become_follower(me_); me->current_leader = NULL; } @@ -516,8 +526,11 @@ int raft_recv_requestvote(raft_server_t* me_, * Both states would have voted for themselves */ assert(!(raft_is_leader(me_) || raft_is_candidate(me_))); - raft_vote_for_nodeid(me_, vr->candidate_id); - r->vote_granted = 1; + e = raft_vote_for_nodeid(me_, vr->candidate_id); + if (0 == e) + r->vote_granted = 1; + else + r->vote_granted = 0; /* there must be in an election. */ me->current_leader = NULL; @@ -547,7 +560,7 @@ int raft_recv_requestvote(raft_server_t* me_, r->vote_granted == 0 ? "not granted" : "unknown"); r->term = raft_get_current_term(me_); - return 0; + return e; } int raft_votes_is_majority(const int num_nodes, const int nvotes) @@ -574,7 +587,9 @@ int raft_recv_requestvote_response(raft_server_t* me_, } else if (raft_get_current_term(me_) < r->term) { - raft_set_current_term(me_, r->term); + int e = raft_set_current_term(me_, r->term); + if (0 != e) + return e; raft_become_follower(me_); me->current_leader = NULL; return 0; @@ -890,18 +905,22 @@ int raft_get_nvotes_for_me(raft_server_t* me_) return votes; } -void raft_vote(raft_server_t* me_, raft_node_t* node) +int raft_vote(raft_server_t* me_, raft_node_t* node) { - raft_vote_for_nodeid(me_, node ? raft_node_get_id(node) : -1); + return raft_vote_for_nodeid(me_, node ? raft_node_get_id(node) : -1); } -void raft_vote_for_nodeid(raft_server_t* me_, const int nodeid) +int raft_vote_for_nodeid(raft_server_t* me_, const int nodeid) { raft_server_private_t* me = (raft_server_private_t*)me_; + if (me->cb.persist_vote) { + int e = me->cb.persist_vote(me_, me->udata, nodeid); + if (0 != e) + return e; + } me->voted_for = nodeid; - if (me->cb.persist_vote) - me->cb.persist_vote(me_, me->udata, nodeid); + return 0; } int raft_msg_entry_response_committed(raft_server_t* me_, diff --git a/src/raft_server_properties.c b/src/raft_server_properties.c index 468062d2..05531edc 100644 --- a/src/raft_server_properties.c +++ b/src/raft_server_properties.c @@ -82,16 +82,22 @@ int raft_get_voted_for(raft_server_t* me_) return me->voted_for; } -void raft_set_current_term(raft_server_t* me_, const int term) +int raft_set_current_term(raft_server_t* me_, const int term) { raft_server_private_t* me = (raft_server_private_t*)me_; if (me->current_term < term) { - me->current_term = term; - me->voted_for = -1; + int voted_for = -1; if (me->cb.persist_term) - me->cb.persist_term(me_, me->udata, term, me->voted_for); + { + int e = me->cb.persist_term(me_, me->udata, term, voted_for); + if (0 != e) + return e; + } + me->current_term = term; + me->voted_for = voted_for; } + return 0; } int raft_get_current_term(raft_server_t* me_) diff --git a/tests/test_server.c b/tests/test_server.c index 9fdd2f5b..03600d13 100644 --- a/tests/test_server.c +++ b/tests/test_server.c @@ -14,7 +14,6 @@ // TODO: leader doesn't timeout and cause election - static int __raft_persist_term( raft_server_t* raft, void *udata, @@ -1458,6 +1457,127 @@ void TestRaft_follower_recv_appendentries_does_not_add_dupe_entries_already_in_l CuAssertIntEquals(tc, 2, raft_get_log_count(r)); } +typedef enum { + __RAFT_NO_ERR = 0, + __RAFT_LOG_OFFER_ERR, + __RAFT_LOG_POP_ERR +} __raft_error_type_e; + +typedef struct { + __raft_error_type_e type; + int idx; +} __raft_error_t; + +static int __raft_log_offer_error( + raft_server_t* raft, + void *user_data, + raft_entry_t *entry, + int entry_idx) +{ + __raft_error_t *error = user_data; + + if (__RAFT_LOG_OFFER_ERR == error->type && entry_idx == error->idx) + return RAFT_ERR_NOMEM; + return 0; +} + +static int __raft_log_pop_error( + raft_server_t* raft, + void *user_data, + raft_entry_t *entry, + int entry_idx) +{ + __raft_error_t *error = user_data; + + if (__RAFT_LOG_POP_ERR == error->type && entry_idx == error->idx) + return RAFT_ERR_NOMEM; + return 0; +} + +void TestRaft_follower_recv_appendentries_partial_failures( + CuTest * tc) +{ + raft_cbs_t funcs = { + .persist_term = __raft_persist_term, + .log_offer = __raft_log_offer_error, + .log_pop = __raft_log_pop_error + }; + + void *r = raft_new(); + __raft_error_t error = {}; + raft_set_callbacks(r, &funcs, &error); + + raft_add_node(r, NULL, 1, 1); + raft_add_node(r, NULL, 2, 0); + raft_set_current_term(r, 1); + + /* Append entry 1 and 2 of term 1. */ + raft_entry_t ety = {}; + ety.data.buf = "1aa"; + ety.data.len = 3; + ety.id = 1; + ety.term = 1; + raft_append_entry(r, &ety); + ety.data.buf = "1bb"; + ety.data.len = 3; + ety.id = 2; + ety.term = 1; + raft_append_entry(r, &ety); + CuAssertIntEquals(tc, 2, raft_get_current_idx(r)); + + msg_appendentries_t ae; + msg_appendentries_response_t aer; + + /* Receive entry 2 and 3 of term 2. */ + memset(&ae, 0, sizeof(msg_appendentries_t)); + ae.term = 2; + ae.prev_log_idx = 1; + ae.prev_log_term = 1; + msg_entry_t e[2]; + memset(&e, 0, sizeof(msg_entry_t) * 2); + e[0].term = 2; + e[0].id = 2; + e[1].term = 2; + e[1].id = 3; + ae.entries = e; + ae.n_entries = 2; + + /* Ask log_pop to fail at entry 2. */ + error.type = __RAFT_LOG_POP_ERR; + error.idx = 2; + memset(&aer, 0, sizeof(aer)); + int err = raft_recv_appendentries(r, raft_get_node(r, 2), &ae, &aer); + CuAssertIntEquals(tc, RAFT_ERR_NOMEM, err); + CuAssertTrue(tc, 1 == aer.success); + CuAssertIntEquals(tc, 1, aer.current_idx); + CuAssertIntEquals(tc, 2, raft_get_current_idx(r)); + raft_entry_t *tmp = raft_get_entry_from_idx(r, 2); + CuAssertTrue(tc, NULL != tmp); + CuAssertIntEquals(tc, 1, tmp->term); + + /* Ask log_offer to fail at entry 3. */ + error.type = __RAFT_LOG_OFFER_ERR; + error.idx = 3; + memset(&aer, 0, sizeof(aer)); + err = raft_recv_appendentries(r, raft_get_node(r, 2), &ae, &aer); + CuAssertIntEquals(tc, RAFT_ERR_NOMEM, err); + CuAssertTrue(tc, 1 == aer.success); + CuAssertIntEquals(tc, 2, aer.current_idx); + CuAssertIntEquals(tc, 2, raft_get_current_idx(r)); + tmp = raft_get_entry_from_idx(r, 2); + CuAssertTrue(tc, NULL != tmp); + CuAssertIntEquals(tc, 2, tmp->term); + + /* No more errors. */ + memset(&error, 0, sizeof(error)); + memset(&aer, 0, sizeof(aer)); + err = raft_recv_appendentries(r, raft_get_node(r, 2), &ae, &aer); + CuAssertIntEquals(tc, 0, err); + CuAssertTrue(tc, 1 == aer.success); + CuAssertIntEquals(tc, 3, aer.current_idx); + CuAssertIntEquals(tc, 3, raft_get_current_idx(r)); +} + /* If leaderCommit > commitidx, set commitidx = * min(leaderCommit, last log idx) */ void TestRaft_follower_recv_appendentries_set_commitidx_to_prevLogIdx(