Skip to content

Commit

Permalink
DAOS-333 server: Handle storage access errors
Browse files Browse the repository at this point in the history
User storage callbacks may return negative errors smaller than
RAFT_ERR_LAST. These will be returned all the way to the API methods,
which report the errors back to users, so that appropriate handlings may
be performed.

Signed-off-by: Li Wei <wei.g.li@intel.com>
  • Loading branch information
liw authored and willemt committed Nov 21, 2017
1 parent 77fb611 commit 76d56d0
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 86 deletions.
22 changes: 15 additions & 7 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#define RAFT_ERR_ONE_VOTING_CHANGE_ONLY -3
#define RAFT_ERR_SHUTDOWN -4
#define RAFT_ERR_NOMEM -5
#define RAFT_ERR_LAST -100

#define RAFT_REQUESTVOTE_ERR_GRANTED 1
#define RAFT_REQUESTVOTE_ERR_NOT_GRANTED 0
Expand Down Expand Up @@ -148,7 +149,8 @@ typedef struct
/* Having the following fields allows us to do less book keeping in
* regards to full fledged RPC */

/** This is the highest log IDX we've received and appended to our log */
/** If success, this is the highest log IDX we've received and appended to
* our log; otherwise, this is the our currentIndex */
int current_idx;

/** The first idx that we received within the appendentries message */
Expand Down Expand Up @@ -619,18 +621,24 @@ void* raft_get_udata(raft_server_t* me);

/** Vote for a server.
* This should be used to reload persistent state, ie. the voted-for field.
* @param[in] node The server to vote for */
void raft_vote(raft_server_t* me_, raft_node_t* node);
* @param[in] node The server to vote for
* @return
* 0 on success */
int raft_vote(raft_server_t* me_, raft_node_t* node);

/** Vote for a server.
* This should be used to reload persistent state, ie. the voted-for field.
* @param[in] nodeid The server to vote for by nodeid */
void raft_vote_for_nodeid(raft_server_t* me_, const int nodeid);
* @param[in] nodeid The server to vote for by nodeid
* @return
* 0 on success */
int raft_vote_for_nodeid(raft_server_t* me_, const int nodeid);

/** Set the current term.
* This should be used to reload persistent state, ie. the current_term field.
* @param[in] term The new current term */
void raft_set_current_term(raft_server_t* me, const int term);
* @param[in] term The new current term
* @return
* 0 on success */
int raft_set_current_term(raft_server_t* me, const int term);

/** Set the commit idx.
* This should be used to reload persistent state, ie. the commit_idx field.
Expand Down
9 changes: 3 additions & 6 deletions include/raft_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ int log_count(log_t* me_);

/**
* Delete all logs from this log onwards */
void log_delete(log_t* me_, int idx);
int log_delete(log_t* me_, int idx);

/**
* Empty the queue. */
void log_empty(log_t * me_);

/**
* Remove oldest entry
* @return oldest entry */
void *log_poll(log_t * me_);
* Remove oldest entry. Set *etyp to oldest entry on success. */
int log_poll(log_t * me_, void** etyp);

raft_entry_t* log_get_from_idx(log_t* me_, int idx, int *n_etys);

Expand All @@ -43,8 +42,6 @@ raft_entry_t* log_get_at_idx(log_t* me_, int idx);
* @return youngest entry */
raft_entry_t *log_peektail(log_t * me_);

void log_delete(log_t* me_, int idx);

int log_get_current_idx(log_t* me_);

#endif /* RAFT_LOG_H_ */
8 changes: 2 additions & 6 deletions include/raft_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,12 @@ typedef struct {
int connected;
} raft_server_private_t;

void raft_election_start(raft_server_t* me);
int raft_election_start(raft_server_t* me);

void raft_become_candidate(raft_server_t* me);
int raft_become_candidate(raft_server_t* me);

void raft_become_follower(raft_server_t* me);

void raft_vote(raft_server_t* me, raft_node_t* node);

void raft_set_current_term(raft_server_t* me,int term);

void raft_randomize_election_timeout(raft_server_t* me_);

/**
Expand Down
32 changes: 20 additions & 12 deletions src/raft_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ int log_append_entry(log_t* me_, raft_entry_t* c)
{
void* ud = raft_get_udata(me->raft);
e = me->cb->log_offer(me->raft, ud, &me->entries[me->back], idx);
raft_offer_log(me->raft, &me->entries[me->back], idx);
if (e == RAFT_ERR_SHUTDOWN)
if (0 != e)
return e;
raft_offer_log(me->raft, &me->entries[me->back], idx);
}

me->count++;
Expand Down Expand Up @@ -182,7 +182,7 @@ int log_count(log_t* me_)
return ((log_private_t*)me_)->count;
}

void log_delete(log_t* me_, int idx)
int log_delete(log_t* me_, int idx)
{
log_private_t* me = (log_private_t*)me_;
int end;
Expand All @@ -194,31 +194,39 @@ void log_delete(log_t* me_, int idx)
for (end = log_count(me_); idx < end; idx++)
{
int idx_tmp = me->base + me->count;
if (me->cb && me->cb->log_pop)
me->cb->log_pop(me->raft, raft_get_udata(me->raft),
&me->entries[me->back - 1], idx_tmp);
if (me->cb && me->cb->log_pop) {
int e = me->cb->log_pop(me->raft, raft_get_udata(me->raft),
&me->entries[me->back - 1], idx_tmp);
if (0 != e)
return e;
}
raft_pop_log(me->raft, &me->entries[me->back - 1], idx_tmp);
me->back--;
me->count--;
}
return 0;
}

void *log_poll(log_t * me_)
int log_poll(log_t * me_, void** etyp)
{
log_private_t* me = (log_private_t*)me_;
int idx = me->base + 1;

if (0 == log_count(me_))
return NULL;
return -1;

const void *elem = &me->entries[me->front];
if (me->cb && me->cb->log_poll)
me->cb->log_poll(me->raft, raft_get_udata(me->raft),
&me->entries[me->front], idx);
if (me->cb && me->cb->log_poll) {
int e = me->cb->log_poll(me->raft, raft_get_udata(me->raft),
&me->entries[me->front], idx);
if (0 != e)
return e;
}
me->front++;
me->count--;
me->base++;
return (void*)elem;
*etyp = (void*)elem;
return 0;
}

raft_entry_t *log_peektail(log_t * me_)
Expand Down
Loading

0 comments on commit 76d56d0

Please sign in to comment.