- Environment
- Multinode Hello World
  - Installing rebar template for riak_core
  - Hello Multinode!!!
  - Consistent Hashing
- Implementing simple crawler
  - Implementing downloader part
  - Implementing storage part
- Handoff
  - What is handoff
  - Handling handoff
  - See handoff in action
- Fault tolerance
To skip setting up an environment there is already one prepared for this
tutorial: riak_core_env. In the following chapters I assume that you have
the environment running and do all the work in the RIAK_CORE_ENV/synced/
directory mentioned in the link. The commands in this chapter have to be
invoked from the VM.
Folks from Basho were kind enough to prepare a rebar template for creating riak_core apps. Apart from creating an application structure it also creates a script for administering the cluster.
Clone the template and install it (under ~/.rebar/templates):
git clone https://github.com/basho/rebar_riak_core
cd rebar_riak_core && make install
The template provided by Basho is quite old. However, there are a lot of forks, and this one seems to be adjusted to the newest stable version of riak_core, which is 1.4.10 at the time of writing this tutorial. For other versions see releases.
Once we have the template, let's use it to generate an Erlang app. Enter
the multinode directory and invoke rebar:
cd ~/synced/multinode && ./rebar create template=riak_core appid=hwmn nodeid=hwmn
Next, tweak it a little bit so that we work on the newest stable release
of the beast. We also need a newer lager version. Go and modify the freshly
created rebar.config:
{deps, [
    {lager, "2.0.1", {git, "git://github.com/basho/lager", {tag, "2.0.1"}}},
    {riak_core, "1.4.10", {git, "git://github.com/basho/riak_core", {tag, "1.4.10"}}}
]}.
We are ready to generate a release with 4 nodes and play with them:
make devrel
for d in dev/dev*; do $d/bin/hwmn start; done
To make sure that we're up and running do:
for d in dev/dev*; do $d/bin/hwmn ping; done
If you're not getting pongs... well I'm sorry - it worked for me.
But do our nodes know anything about each other? Let's check it using
an admin utility:
./dev/dev1/bin/hwmn-admin member_status
The output from the above command should look like this:
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 100.0% -- 'hwmn1@127.0.0.1'
-------------------------------------------------------------------------------
This simply means that the nodes ARE NOT in any relation - node hwmn1
knows only about itself. But as you probably already know, the riak_core
machinery was invented to actually help nodes live together. To join
them, issue:
for d in dev/dev{2,3,4}; do $d/bin/hwmn-admin cluster join hwmn1@127.0.0.1; done
But this is not enough. We've just staged changes to the ring. Before
they take effect we have to confirm the plan and commit. Yeah,
complicated... but move forward:
dev/dev1/bin/hwmn-admin cluster plan
We've just been informed by riak_core what will happen. Trust me and
agree by committing:
dev/dev1/bin/hwmn-admin cluster commit
And check the node's relations again:
./dev/dev1/bin/hwmn-admin member_status
If your output is similar to the following you managed to make a family:
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 75.0% 25.0% 'hwmn1@127.0.0.1'
valid 9.4% 25.0% 'hwmn2@127.0.0.1'
valid 7.8% 25.0% 'hwmn3@127.0.0.1'
valid 7.8% 25.0% 'hwmn4@127.0.0.1'
-------------------------------------------------------------------------------
Valid:4 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
Look at the Ring column. It indicates how much of the key space is
allocated to a particular node. Over time, each node should cover a
proportional percentage of the ring.
What is it? According to the Riak Glossary:
Consistent hashing is a technique used to limit the reshuffling of keys when a hash-table data structure is rebalanced (when slots are added or removed). Riak uses consistent hashing to organize its data storage and replication. Specifically, the vnodes in the Riak Ring responsible for storing each object are determined using the consistent hashing technique.
Basically, if we want to perform an operation on a particular riak_core virtual node (I'll try to explain the mysterious virtual node - vnode - later) and we always want it to be the same vnode for a particular input, we use consistent hashing. And the resulting hash value for a given input stays the same regardless of changes to the ring.
As an exercise we can compute a hash value for some input and make
sure that it's the same over the ring. To do so attach to one of
the nodes:
./dev/dev1/bin/hwmn attach
and run the following snippet:
F = fun() ->
        Hashes = [begin
                      Node = "hwmn" ++ integer_to_list(N) ++ "@127.0.0.1",
                      rpc:call(list_to_atom(Node), riak_core_util, chash_key,
                               [{<<"please">>, <<"bleed">>}])
                  end || N <- [1,2,3]],
        [OneHash] = lists:usort(Hashes),
        OneHash
    end.
(Hash = F()) == F().
What is a vnode?
A vnode is a virtual node, as opposed to physical node
- Each vnode is responsible for one partition on the ring
- A vnode is an Erlang process
- A vnode is a behaviour written on top of the gen_fsm behaviour
- A vnode handles incoming requests
- A vnode potentially stores data to be retrieved later
- A vnode is the unit of concurrency, replication, and fault tolerance
- Typically many vnodes will run on each physical node
Each machine has a vnode master whose purpose is to keep track of all active vnodes on its node.
How can we make use of a computed Hash? We can get a list of vnodes
on which we can perform/store something.
riak_core_apl:get_apl(Hash, _N = 2, hwmn).
apl stands for active preference list. The value of _N indicates how many
vnodes we want to involve in performing some operation associated with
Hash. For example, we might want to save an object on two vnodes.
The output from the call looks like this:
[{1301649895747835411525156804137939564381064921088, 'hwmn2@127.0.0.1'},
{1324485858831130769622089379649131486563188867072, 'hwmn3@127.0.0.1'}]
How to read this? The first element of each tuple is a partition (remember
that a vnode is responsible for one partition in the ring?) which is
dedicated to the Hash, and as you guessed, the second element is the node
on which the partition sits!
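As a sketch of how such a {Partition, Node} pair is typically used, here is roughly what the generated hwmn:ping/0 does - it hashes an ever-changing key, picks one primary vnode from the preference list and sends it a synchronous command (your generated code may differ in details):
ping() ->
    %% hash an ever-changing key so each ping hits a different partition
    DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(now())}),
    %% ask for one primary entry from the preference list
    [{IndexNode, _Type}] = riak_core_apl:get_primary_apl(DocIdx, 1, hwmn),
    %% IndexNode is a {Partition, Node} pair, just like in the output above
    riak_core_vnode_master:sync_spawn_command(IndexNode, ping, hwmn_vnode_master).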
When you're done with playing with the cluster stop the nodes:
for d in dev/dev*; do $d/bin/hwmn stop; done
Awesome, congratulations, great, sweet just fantastic. Hello Multinode completed!
The plan for the next step is to implement a distributed internet crawler that will be able to download websites and store them for later retrieval. The design is as follows:
- downloading will take place on random vnodes in the cluster;
- a particular vnode will store the content of a given URL;
- an old version of a website will be replaced by a new one;
- the API will be implemented in an sc.erl module.
This part of the tutorial requires you to implement missing parts of
the simple_crawler application that can be found in
RIAK_CORE_ENV/synced/crawler/simple_crawler. The application structure
is compliant with Erlang/OTP so all the modules are in apps/sc/src.
The API for the downloader is already implemented in sc:download/1, so
we only need to add a vnode that will handle the actual download tasks.
A skeleton for the vnode is already there in sc_downloader_vnode.erl.
Note that a vnode has to implement the riak_core_vnode behaviour. Here
we're focusing on the handle_command/3 callback, which will be invoked
by riak_core_vnode_master:command/3.
More information on the
riak_core_vnode
callbacks can be found here.
Let's get to coding. First of all, add the asynchronous API to the
vnode. Edit sc_downloader_vnode.erl:
-export([start_vnode/1,
         download/2]).
...
-define(MASTER, sc_downloader_vnode_master).
...
-spec download({chash:index_as_int(), node()}, binary()) -> term().
download(IdxNode, URL) ->
    riak_core_vnode_master:command(IdxNode, {download, URL}, ?MASTER).
MASTER indicates the ID of the master vnode for downloader vnodes.
Next, implement the command:
...
handle_command({download, URL} = Req, _Sender, State) ->
    print_request_info(State#state.partition, node(), Req),
    try
        Content = download(URL),
        store(URL, Content)
    catch
        throw:{download_error, Reason} ->
            ?PRINT({request_failed, Req, Reason})
    end,
    {noreply, State};
...
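For reference, download/1 and store/2 called inside the try block are internal helpers that the skeleton already provides. Purely as an illustration of their shape (the bodies below are assumptions - a string URL, the inets application running, and sc:store/2 as the storage entry point), they could look like this:
download(URL) ->
    %% URL is assumed to be a string, as in the shell examples below;
    %% on any failure we throw {download_error, Reason} so that
    %% handle_command/3 above can catch it
    case httpc:request(get, {URL, []}, [], [{body_format, binary}]) of
        {ok, {{_, 200, _}, _Headers, Body}} ->
            Body;
        {ok, {{_, Code, _}, _Headers, _Body}} ->
            throw({download_error, {http_status, Code}});
        {error, Reason} ->
            throw({download_error, Reason})
    end.

store(URL, Content) ->
    %% hand the downloaded content over to the storage part described below
    sc:store(URL, Content).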
In the final step provide a specification for sc_downloader_vnode_master
in sc_sup.erl and add it to the supervisor's child list:
...
VDownloaderMaster =
    {sc_downloader_vnode_master,
     {riak_core_vnode_master, start_link, [sc_downloader_vnode]},
     permanent, 5000, worker, [riak_core_vnode_master]},
...
{ok, {{one_for_one, 5, 10}, [VMaster, VDownloaderMaster]}}.
Additionally, register the vnode in sc_app.erl:
...
ok = riak_core:register([{vnode_module, sc_downloader_vnode}]),
....
To test our new functionality stop the whole cluster, clean the project, build devrel and form the cluster:
for d in dev/dev*; do $d/bin/sc stop; done
make devclean && make devrel
for d in dev/dev*; do $d/bin/sc start; done
for d in dev/dev{2,3}; do $d/bin/sc-admin join sc1@127.0.0.1; done
This time, to make things simpler, we won't be staging and committing changes to the riak_core ring - this riak_core rebar template implements such a simplified behaviour. We will also get by with 3 nodes.
Once the whole setup is up and running, attach to one of the nodes and observe the logs of the other two nodes:
dev/dev1/bin/sc attach
tail -f dev/dev2/log/erlang.log.1
tail -f dev/dev3/log/erlang.log.1
Run the above commands from separate consoles.
Experiment a bit with the sc:download/1 API:
[sc:download("http://www.erlang.org") || _ <- lists:seq(1,10)].
Note that the requests are served by random partitions on different nodes. Effectively this means that requests hit different vnodes (a vnode is responsible for one partition, right?).
"The randomness" is achieved by picking a vnode for random document
index. See sc:get_random_dument_index/0
how it works.
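If you're curious, such a function can be as simple as hashing an ever-changing term so that consecutive calls land on different partitions. A hypothetical sketch (the real body lives in sc.erl and may differ):
get_random_document_index() ->
    %% hashing a constantly changing term yields a "random" ring position
    riak_core_util:chash_key({<<"download">>, term_to_binary(now())}).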
Let's move on to our storage system. As above, the API is already
implemented in sc:store/2 and sc:get_content/1 (uncomment all the lines
in these functions). Recall from the design description that in this
case the same vnode will be chosen for storing or retrieving data for
a particular URL.
Similarly to the previous example, we need a vnode to do our job.
There is already such a vnode implemented in sc_storage_vnode.erl.
Please have a look at its get_content/3 API function. It invokes
the command using riak_core_vnode_master:sync_spawn_command/3, which
is synchronous but does not block the vnode. The difference is also
in the command for retrieving the content, as we return a reply.
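To make the "we return a reply" part concrete, the command clause for retrieving content looks roughly like this (a sketch - the command name and the dict-based store are assumptions based on the description in the handoff section below):
handle_command({get_content, URL}, _Sender, State) ->
    %% look the URL up in the vnode's dict and reply to the caller,
    %% unlike the download command which returns noreply
    Reply = case dict:find(URL, State#state.store) of
                {ok, Content} -> Content;
                error -> not_found
            end,
    {reply, Reply, State};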
To get it working you have to take care of the master vnode for storage
in sc_sup.erl and register sc_storage_vnode.erl in sc_app.erl -
analogously to the downloader vnode. The ID in the child specification
that you need to provide in sc_sup.erl (for example
sc_storage_vnode_master) must match the 3rd argument in the call to
riak_core_vnode_master:command/3.
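Those additions mirror the downloader ones; as a sketch (the variable name VStorageMaster is just an example):
%% in sc_sup.erl:
VStorageMaster =
    {sc_storage_vnode_master,
     {riak_core_vnode_master, start_link, [sc_storage_vnode]},
     permanent, 5000, worker, [riak_core_vnode_master]},
...
{ok, {{one_for_one, 5, 10}, [VMaster, VDownloaderMaster, VStorageMaster]}}.

%% in sc_app.erl:
ok = riak_core:register([{vnode_module, sc_storage_vnode}]),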
When you're done restart the whole machinery, attach to one node and "tailf" the other nodes' logs:
for d in dev/dev*; do $d/bin/sc stop; done
make devclean && make devrel
for d in dev/dev*; do $d/bin/sc start; done
for d in dev/dev{2,3}; do $d/bin/sc-admin join sc1@127.0.0.1; done
dev/dev1/bin/sc attach
tail -f dev/dev2/log/erlang.log.1
tail -f dev/dev3/log/erlang.log.1
Then download your favorite website and retrieve its content:
sc:download("http://joemonster.org/").
sc:download("http://joemonster.org/").
sc:get_content("http://joemonster.org/").
You would expect download requests to be served by different vnodes,
and each store and get_content request by the same vnode.
But, hey! What if get_content/1 returns nothing but ok even though
the request matches the right partition?! Well, it's possible...
The explanation behind this behavior is that when you start your first node it serves all the partitions, which in practice means that it runs all the vnodes of each kind (by default 64 partitions are created). When new nodes join the cluster the partitions are spread across them, but it happens in the background - strictly speaking, while the cluster is serving a request it's moving vnodes to other physical nodes at the same time. But riak_core has no idea how to move our data, so it's just lost! Terrible, huh?
To observe the whole system working as expected you need to wait for
the cluster to come into "stable state". Just check the status:
./dev/dev1/bin/sc-admin member_status
When there are no pending changes it means that no partitions will be moved. Now you can experiment again and make sure that requests are served by the appropriate partitions, vnodes and nodes.
In the next part I'm going to explain how not to lose data while moving a vnode to another Erlang node: the so-called handoff.
A handoff occurs when a vnode realizes that it's not on the proper Erlang node. Such a situation can take place when:
- a node is added to or removed from the ring,
- a node comes alive after it has been down.
In riak_core there's a periodic "home check" that verifies whether a vnode runs on the correct physical node. If that's not true for some vnode, it will go into handoff mode and its data will be transferred.
When riak_core decides to perform a handoff it calls two functions:
Mod:handoff_starting/2 and Mod:is_empty/1. Through the first one
a vnode can agree to, or refuse, proceeding with the handoff. The second
one indicates if there's any data to be transferred, and this one is
interesting for us. The vnode process started in sc_storage_vnode saves
all the webpages' content into a dict data structure. Thus when a handoff
occurs we want this dict to be transferred.
So let's code sc_storage_vnode:is_empty/1:
is_empty(State) ->
    case dict:size(State#state.store) of
        0 ->
            {true, State};
        _ ->
            {false, State}
    end.
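Mod:handoff_starting/2, mentioned above, usually just agrees to the transfer; the generated skeleton already contains a stub along these lines (shown only for illustration):
handoff_starting(_TargetNode, State) ->
    %% returning {false, State} would refuse (postpone) the handoff
    {true, State}.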
When the framework decides to start a handoff it sends ?FOLD_REQ, which
tells the vnode how to fold over its data. This request is supposed
to be handled in Mod:handle_handoff_command/3 and contains the "folding
function" along with an initial accumulator. We should implement
handle_handoff_command/3 as follows:
handle_handoff_command(?FOLD_REQ{foldfun = Fun, acc0 = Acc0},
                       _Sender, State) ->
    Acc = dict:fold(Fun, Acc0, State#state.store),
    {reply, Acc, State}.
If you're like me and this ?FOLD_REQ looks strange to you, have a look at riak_core_vnode.hrl, which reveals that the macro expands to a record.
So, what's next with this magic handoff? Well, at this point things
are simple: each iteration of the "folding function" calls
Mod:encode_handoff_item/2, which just does what it is supposed to do:
encode data before sending it to the target vnode. The target vnode
decodes the data in Mod:handle_handoff_data/2. In this tutorial we are
using an extremely complex method of encoding, so write the following code
in your storage vnode really carefully:
encode_handoff_item(URL, Content) ->
    term_to_binary({URL, Content}).
...
handle_handoff_data(Data, State) ->
    {URL, Content} = binary_to_term(Data),
    Dict = dict:store(URL, Content, State#state.store),
    {reply, ok, State#state{store = Dict}}.
And that's it. Our system is now "move-vnode-resistant". One more thing
worth noting here: after the handoff is completed Mod:delete/1
is called and the vnode will be terminated just after this call.
During the termination Mod:terminate/2
will be called too.
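A plausible delete/1 for this vnode simply drops the in-memory dict (again, the skeleton may already provide a stub; this body is an assumption):
delete(State) ->
    %% called after a successful handoff, right before the vnode terminates
    {ok, State#state{store = dict:new()}}.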
I did not present all the callbacks related to handoff. For more information go to a great tutorial here. If you need more details look at the basho wiki.
Now that we have handoff implemented, build devrel, start the cluster, but only join dev2 to dev1. We want to observe how the partitions are moved:
for d in dev/dev*; do $d/bin/sc stop; done
make devclean && make devrel
for d in dev/dev*; do $d/bin/sc start; done
dev/dev2/bin/sc-admin join sc1@127.0.0.1
Wait for the ring to get populated symmetrically across the two nodes.
Use ./dev/dev1/bin/sc-admin member_status
to check the status.
Next, attach to the console of one node from the cluster and "tailf" the logs of the other node:
dev/dev1/bin/sc attach
tail -f dev/dev2/log/erlang.log.1
Download some websites using sc:get_links/0. This function will return
a list of 50 URLs:
[begin sc:download(L), timer:sleep(500) end || L <- sc:get_links()].
After that join the 3rd node and "tailf" its log. Wait for the cluster to get balanced and try to retrieve previously downloaded content using the attached console:
dev/dev3/bin/sc-admin join sc1@127.0.0.1
tail -f dev/dev3/log/erlang.log.1
spawn(fun() -> [begin
                    sc:get_content(L),
                    timer:sleep(500)
                end || L <- sc:get_links()] end).
You should see that some URL's content is served by the 3rd node, although it joined the cluster after all the sites had been downloaded.
Without destroying the previous setup, stop one of the nodes that you know
holds content for some website. Then try to get the content of that website.
You should end up with a not_found response:
(sc1@127.0.0.1)9> sc:get_content("http://en.wikipedia.org/").
not_found
To tackle this problem we need to store our data on more than one vnode.
Let's code it by changing sc:get_index_node/1 so that it takes the
number of vnodes as an argument:
get_index_node(DocIdx, N) ->
    riak_core_apl:get_apl(DocIdx, N, sc).
It also requires us to repair sc:download/1, sc:store/2 and
sc:get_content/1. Let's say that we want to store data on 3 vnodes:
download(URL) ->
    DocIdx = get_random_document_index(),
    IdxNodes = get_index_node(DocIdx, 1),
    sc_downloader_vnode:download(IdxNodes, URL).

store(URL, Content) ->
    DocIdx = get_index_for_url(URL),
    IdxNodes = get_index_node(DocIdx, 3),
    sc_storage_vnode:store(IdxNodes, {URL, Content}).

get_content(URL) ->
    DocIdx = get_index_for_url(URL),
    IdxNodes = get_index_node(DocIdx, 3),
    R0 = [sc_storage_vnode:get_content(IN, URL) || IN <- IdxNodes],
    R1 = lists:filter(fun(not_found) ->
                              false;
                         (_) ->
                              true
                      end, R0),
    case R1 of
        [] ->
            not_found;
        _ ->
            hd(R1)
    end.
Check that it really works. First start the cluster, join the nodes,
attach to sc1 and observe the logs:
for d in dev/dev*; do $d/bin/sc stop; done
make devclean && make devrel
for d in dev/dev*; do $d/bin/sc start; done
for d in dev/dev{2,3}; do $d/bin/sc-admin join sc1@127.0.0.1; done
dev/dev1/bin/sc attach
tail -f dev/dev2/log/erlang.log.1
tail -f dev/dev3/log/erlang.log.1
Then wait for the ring to get synced and download some sites. In the logs you should see that the same data is duplicated over several vnodes (for the data to be safe at least 2 vnodes should be located on different physical nodes):
[begin sc:download(L), timer:sleep(500) end || L <- sc:get_links()].
Find a request for one website in a node, bring that node down and try to query that website. It should still be available on another node that holds a vnode for that website:
sc:get_content("http://en.wikipedia.org/").