Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

19 reconstruct+format #74

Merged
merged 11 commits into from
Oct 6, 2020
56 changes: 42 additions & 14 deletions src/chash.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,21 @@

-module(chash).

-export([contains_name/2, fresh/2, lookup/2, key_of/1,
members/1, merge_rings/2, next_index/2, nodes/1,
predecessors/2, predecessors/3, ring_increment/1,
size/1, successors/2, successors/3, update/3]).
-export([contains_name/2,
fresh/2,
lookup/2,
key_of/1,
members/1,
merge_rings/2,
next_index/2,
nodes/1,
predecessors/2,
predecessors/3,
ring_increment/1,
size/1,
successors/2,
successors/3,
update/3]).

-export_type([chash/0, index/0, index_as_int/0]).

Expand Down Expand Up @@ -144,7 +155,9 @@ next_index(IntegerKey, {NumPartitions, _}) ->
%% @doc Return the entire set of NodeEntries in the ring.
-spec nodes(CHash :: chash()) -> [node_entry()].

nodes(CHash) -> {_NumPartitions, Nodes} = CHash, Nodes.
nodes(CHash) ->
{_NumPartitions, Nodes} = CHash,
Nodes.

%% @doc Given an object key, return all NodeEntries in order starting at Index.
-spec ordered_from(Index :: index(),
Expand Down Expand Up @@ -190,7 +203,8 @@ ring_increment(NumPartitions) ->
-spec size(CHash :: chash()) -> integer().

size(CHash) ->
{_NumPartitions, Nodes} = CHash, length(Nodes).
{_NumPartitions, Nodes} = CHash,
length(Nodes).

%% @doc Given an object key, return all NodeEntries in order starting at Index.
-spec successors(Index :: index(),
Expand All @@ -210,7 +224,9 @@ successors(Index, CHash, N) ->
Ordered = ordered_from(Index, CHash),
{NumPartitions, _Nodes} = CHash,
if Num =:= NumPartitions -> Ordered;
true -> {Res, _} = lists:split(Num, Ordered), Res
true ->
{Res, _} = lists:split(Num, Ordered),
Res
end.

%% @doc Make the partition beginning at IndexAsInt owned by Name'd node.
Expand All @@ -219,7 +235,9 @@ successors(Index, CHash, N) ->

update(IndexAsInt, Name, CHash) ->
{NumPartitions, Nodes} = CHash,
NewNodes = lists:keyreplace(IndexAsInt, 1, Nodes,
NewNodes = lists:keyreplace(IndexAsInt,
1,
Nodes,
{IndexAsInt, Name}),
{NumPartitions, NewNodes}.

Expand Down Expand Up @@ -255,18 +273,27 @@ update_test() ->
% Create a fresh ring...
CHash = chash:fresh(5, Node),
GetNthIndex = fun (N, {_, Nodes}) ->
{Index, _} = lists:nth(N, Nodes), Index
{Index, _} = lists:nth(N, Nodes),
Index
end,
% Test update...
FirstIndex = GetNthIndex(1, CHash),
ThirdIndex = GetNthIndex(3, CHash),
{5,
[{_, NewNode}, {_, Node}, {_, Node}, {_, Node},
{_, Node}, {_, Node}]} =
[{_, NewNode},
{_, Node},
{_, Node},
{_, Node},
{_, Node},
{_, Node}]} =
update(FirstIndex, NewNode, CHash),
{5,
[{_, Node}, {_, Node}, {_, NewNode}, {_, Node},
{_, Node}, {_, Node}]} =
[{_, Node},
{_, Node},
{_, NewNode},
{_, Node},
{_, Node},
{_, Node}]} =
update(ThirdIndex, NewNode, CHash).

contains_test() ->
Expand Down Expand Up @@ -300,7 +327,8 @@ inverse_pred_test() ->

merge_test() ->
CHashA = chash:fresh(8, node_one),
CHashB = chash:update(0, node_one,
CHashB = chash:update(0,
node_one,
chash:fresh(8, node_two)),
CHash = chash:merge_rings(CHashA, CHashB),
?assertEqual(node_one, (chash:lookup(0, CHash))).
Expand Down
85 changes: 49 additions & 36 deletions src/chashbin.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@
%% -------------------------------------------------------------------
-module(chashbin).

-export([create/1, to_chash/1, to_list/1,
to_list_filter/2, responsible_index/2,
responsible_position/2, index_owner/2,
-export([create/1,
to_chash/1,
to_list/1,
to_list_filter/2,
responsible_index/2,
responsible_position/2,
index_owner/2,
num_partitions/1]).

-export([iterator/2, exact_iterator/2, itr_value/1,
itr_pop/2, itr_next/1, itr_next_while/2]).
-export([iterator/2,
exact_iterator/2,
itr_value/1,
itr_pop/2,
itr_next/1,
itr_next_while/2]).

-export_type([chashbin/0]).

Expand All @@ -51,22 +59,25 @@
-ifndef(namespaced_types).

-record(chashbin,
{size :: pos_integer(), owners :: owners_bin(),
nodes :: erlang:tuple(node())}).
{size :: pos_integer(),
owners :: owners_bin(),
nodes :: erlang:tuple(node())}).

-else.

-record(chashbin,
{size :: pos_integer(), owners :: owners_bin(),
nodes :: erlang:tuple(node())}).
{size :: pos_integer(),
owners :: owners_bin(),
nodes :: erlang:tuple(node())}).

-endif.

-type chashbin() :: #chashbin{}.

-record(iterator,
{pos :: non_neg_integer(), start :: non_neg_integer(),
chbin :: chashbin()}).
{pos :: non_neg_integer(),
start :: non_neg_integer(),
chbin :: chashbin()}).

-type iterator() :: #iterator{}.

Expand All @@ -90,7 +101,8 @@ create({Size, Owners}) ->
-spec to_chash(chashbin()) -> chash:chash().

to_chash(CHBin = #chashbin{size = Size}) ->
L = to_list(CHBin), {Size, L}.
L = to_list(CHBin),
{Size, L}.

%% @doc Convert a `chashbin' to a list of `{Index, Owner}' pairs
-spec to_list(chashbin()) -> [{index(), node()}].
Expand Down Expand Up @@ -136,10 +148,10 @@ responsible_position(HashKey, #chashbin{size = Size}) ->

index_owner(Idx, CHBin) ->
case itr_value(exact_iterator(Idx, CHBin)) of
{Idx, Owner} -> Owner;
_ ->
%% Match the behavior for riak_core_ring:index_owner/2
exit({badmatch, false})
{Idx, Owner} -> Owner;
_ ->
%% Match the behavior for riak_core_ring:index_owner/2
exit({badmatch, false})
end.

%% @doc Return the number of partitions in a given `chashbin'
Expand Down Expand Up @@ -182,8 +194,8 @@ itr_next(Itr = #iterator{pos = Pos, start = Start,
chbin = CHBin}) ->
Pos2 = (Pos + 1) rem CHBin#chashbin.size,
case Pos2 of
Start -> done;
_ -> Itr#iterator{pos = Pos2}
Start -> done;
_ -> Itr#iterator{pos = Pos2}
end.

%% @doc
Expand All @@ -197,21 +209,21 @@ itr_pop(N, Itr = #iterator{pos = Pos, chbin = CHBin}) ->
#chashbin{size = Size, owners = Bin, nodes = Nodes} =
CHBin,
L = case Bin of
<<_:Pos/binary-unit:176, Bin2:N/binary-unit:176,
_/binary>> ->
[{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin2];
_ ->
Left = N + Pos - Size,
Skip = Pos - Left,
<<Bin3:Left/binary-unit:176, _:Skip/binary-unit:176,
Bin2/binary>> =
Bin,
L1 = [{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin2],
L2 = [{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin3],
L1 ++ L2
<<_:Pos/binary-unit:176, Bin2:N/binary-unit:176,
_/binary>> ->
[{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin2];
_ ->
Left = N + Pos - Size,
Skip = Pos - Left,
<<Bin3:Left/binary-unit:176, _:Skip/binary-unit:176,
Bin2/binary>> =
Bin,
L1 = [{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin2],
L2 = [{Idx, element(Id, Nodes)}
|| <<Idx:160/integer, Id:16/integer>> <= Bin3],
L1 ++ L2
end,
Pos2 = (Pos + N) rem Size,
Itr2 = Itr#iterator{pos = Pos2},
Expand All @@ -223,8 +235,8 @@ itr_pop(N, Itr = #iterator{pos = Pos, chbin = CHBin}) ->

itr_next_while(Pred, Itr) ->
case Pred(itr_value(Itr)) of
false -> Itr;
true -> itr_next_while(Pred, itr_next(Itr))
false -> Itr;
true -> itr_next_while(Pred, itr_next(Itr))
end.

%% ===================================================================
Expand All @@ -245,7 +257,8 @@ create_bin([{Idx, Owner} | Owners], Nodes, Bin) ->
index_position(<<Idx:160/integer>>, CHBin) ->
index_position(Idx, CHBin);
index_position(Idx, #chashbin{size = Size}) ->
Inc = chash:ring_increment(Size), Idx div Inc rem Size.
Inc = chash:ring_increment(Size),
Idx div Inc rem Size.

%% Return iterator pointing to the given index
exact_iterator(<<Idx:160/integer>>, CHBin) ->
Expand Down
8 changes: 6 additions & 2 deletions src/gen_nb_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
-export([start_link/4]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

-define(SERVER, ?MODULE).

Expand Down
27 changes: 14 additions & 13 deletions src/riak_core_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ start(_StartType, _StartArgs) ->
start_riak_core_sup().

stop(_State) ->
logger:info("Stopped application riak_core", []), ok.
logger:info("Stopped application riak_core", []),
ok.

validate_ring_state_directory_exists() ->
riak_core_util:start_app_deps(riak_core),
Expand All @@ -45,23 +46,23 @@ validate_ring_state_directory_exists() ->
case filelib:ensure_dir(filename:join(RingStateDir,
"dummy"))
of
ok -> ok;
{error, RingReason} ->
logger:critical("Ring state directory ~p does not exist, "
"and could not be created: ~p",
[RingStateDir,
riak_core_util:posix_error(RingReason)]),
throw({error, invalid_ring_state_dir})
ok -> ok;
{error, RingReason} ->
logger:critical("Ring state directory ~p does not exist, "
"and could not be created: ~p",
[RingStateDir,
riak_core_util:posix_error(RingReason)]),
throw({error, invalid_ring_state_dir})
end.

start_riak_core_sup() ->
%% Spin up the supervisor; prune ring files as necessary
case riak_core_sup:start_link() of
{ok, Pid} ->
ok = register_applications(),
ok = add_ring_event_handler(),
{ok, Pid};
{error, Reason} -> {error, Reason}
{ok, Pid} ->
ok = register_applications(),
ok = add_ring_event_handler(),
{ok, Pid};
{error, Reason} -> {error, Reason}
end.

register_applications() -> ok.
Expand Down
Loading