From ed345e5b161754709c389c494cc5d8d0743236ad Mon Sep 17 00:00:00 2001
From: cabolanos
Date: Thu, 19 Feb 2015 15:02:25 -0500
Subject: [PATCH 1/8] [Doc Fix] Instructions to run tests were added.

---
 README.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/README.md b/README.md
index 20a3784..e91dacd 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,20 @@ being in the top level directory:

     make all blog

+## Running Tests
+
+To run tests successfully, you need to follow these steps first:
+
+ * Start the database engines: MySQL, PostgreSQL, MongoDB and ElasticSearch
+
+ * For MySQL, PostgreSQL and MongoDB, you need to:
+   - Create a user (or use the defaults) and configure it in the `test/test.config`
+     file.
+   - Create the test database `sumo_test` on each DB.
+
+NOTE: For MongoDB, first create the test database and then create a user with
+access to that DB. For more information, visit the [MongoDB Tutorial](http://docs.mongodb.org/manual/tutorial).
+
 ## Change Log

 All notable changes to this project will be documented in the

From ef8209df3c0e06be6e6bb8e27d384f4a9b7e3572 Mon Sep 17 00:00:00 2001
From: cabolanos
Date: Wed, 25 Feb 2015 07:32:14 -0500
Subject: [PATCH 2/8] Riak backend implementation - first commit.

---
 Makefile                       |   5 +-
 rebar.config                   |   3 +-
 src/sumo_backend_riak.erl      | 132 ++++++++++++++++
 src/sumo_store_riak.erl        | 267 +++++++++++++++++++++++++++++++++
 src/sumo_store_riak_mapred.erl | 127 ++++++++++++++++
 test/sumo_basic_SUITE.erl      |  14 +-
 test/sumo_test_people_riak.erl |  29 ++++
 test/test.config               |  19 ++-
 8 files changed, 586 insertions(+), 10 deletions(-)
 create mode 100644 src/sumo_backend_riak.erl
 create mode 100644 src/sumo_store_riak.erl
 create mode 100644 src/sumo_store_riak_mapred.erl
 create mode 100644 test/sumo_test_people_riak.erl

diff --git a/Makefile b/Makefile
index 601297c..a2bd4a9 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,14 @@
 PROJECT = sumo_db

-DEPS = lager emysql emongo tirerl epgsql wpool
+DEPS = lager emysql emongo tirerl epgsql wpool riakc

 dep_lager = git https://github.com/basho/lager.git 2.1.0
 dep_emysql = git https://github.com/Eonblast/Emysql.git v0.4.1
 dep_emongo = git https://github.com/inaka/emongo.git v0.2.1
 dep_tirerl = git https://github.com/inaka/tirerl 0.1.0
 dep_epgsql = git https://github.com/epgsql/epgsql 2.0.0
-dep_wpool = git https://github.com/inaka/worker_pool.git 1.0
+dep_wpool = git https://github.com/inaka/worker_pool.git 1.0.1
+dep_riakc = git https://github.com/basho/riak-erlang-client.git 2.0.1

 TEST_DEPS = mixer
 dep_mixer = git git://github.com/inaka/mixer.git 0.1.2

diff --git a/rebar.config b/rebar.config
index f5f9911..823f04d 100644
--- a/rebar.config
+++ b/rebar.config
@@ -25,7 +25,8 @@
   {tirerl, ".*", {git, "git://github.com/inaka/tirerl", "0.1.0"}},
   {epgsql, ".*", {git, "git://github.com/epgsql/epgsql", "2.0.0"}},
   {mixer, ".*", {git, "git://github.com/inaka/mixer", "0.1.2"}},
-  {worker_pool, ".*", {git, "git://github.com/inaka/worker_pool.git", "1.0"}}
+  {worker_pool, ".*", {git, "git://github.com/inaka/worker_pool.git", "1.0.1"}},
+  {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", "2.0.1"}}
 ]}.
 {xref_warnings, true}.
 {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, deprecated_functions]}.

diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl
new file mode 100644
index 0000000..d8c1048
--- /dev/null
+++ b/src/sumo_backend_riak.erl
@@ -0,0 +1,132 @@
+%%% @hidden
+%%% @doc Riak storage backend implementation.
+%%% +%%% Copyright 2012 Inaka <hello@inaka.net> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Inaka +%%% +-module(sumo_backend_riak). +-author("Carlos Andres Bolanos "). +-license("Apache License 2.0"). + +-behaviour(gen_server). +-behaviour(sumo_backend). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Exports. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%%% Public API. +-export( + [ get_connection/1, + get_state/1 + ]). + +%%% Exports for sumo_backend +-export( + [ start_link/2 + ]). + +%%% Exports for gen_server +-export( + [ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Types. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, {conn :: pid(), encoding :: atom(), bucket :: binary()}). +-type state() :: #state{}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% External API. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec start_link(atom(), proplists:proplist()) -> {ok, pid()}|term(). +start_link(Name, Options) -> + gen_server:start_link({local, Name}, ?MODULE, Options, []). + +-spec get_connection(atom() | pid()) -> atom(). +get_connection(Name) -> + gen_server:call(Name, get_connection). + +-spec get_state(atom() | pid()) -> state(). +get_state(Name) -> + gen_server:call(Name, get_state). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% gen_server stuff. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec init([term()]) -> {ok, state()}. +init(Options) -> + %% Get connection parameters + Host = proplists:get_value(host, Options, "127.0.0.1"), + Port = proplists:get_value(port, Options, 8087), + Opts = riak_opts(Options), + Bucket = iolist_to_binary( + proplists:get_value(bucket, Options, <<"default">>)), + %% Encoding + Encoding = proplists:get_value(encoding, Options, json), + %% Place Riak connection + {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + %% Initial state + {ok, #state{conn = Conn, encoding = Encoding, bucket = Bucket}}. + +-spec handle_call(term(), term(), state()) -> {reply, term(), state()}. +handle_call(get_connection, _From, State = #state{conn = Conn}) -> + {reply, Conn, State}; +handle_call(get_state, _From, State) -> + {reply, State, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Unused Callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec handle_cast(term(), state()) -> {noreply, state()}. +handle_cast(_Msg, State) -> {noreply, State}. + +-spec handle_info(term(), state()) -> {noreply, state()}. +handle_info(_Msg, State) -> {noreply, State}. + +-spec terminate(term(), state()) -> ok. 
+terminate(_Reason, _State) -> ok. + +-spec code_change(term(), state(), term()) -> {ok, state()}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% gen_server stuff. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec riak_opts([term()]) -> [term()]. +riak_opts(Options) -> + User = proplists:get_value(username, Options), + Pass = proplists:get_value(password, Options), + Opts0 = case User /= undefined andalso Pass /= undefined of + true -> [{credentials, User, Pass}]; + _ -> [] + end, + Opts1 = case lists:keyfind(connect_timeout, 1, Options) of + {_, V1} -> [{connect_timeout, V1}, {auto_reconnect, true}] ++ Opts0; + _ -> [{auto_reconnect, true}] ++ Opts0 + end, + Opts1. diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl new file mode 100644 index 0000000..c6d346a --- /dev/null +++ b/src/sumo_store_riak.erl @@ -0,0 +1,267 @@ +%%% @hidden +%%% @doc Riak store implementation. +%%% +%%% Copyright 2012 Inaka <hello@inaka.net> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Inaka +%%% +-module(sumo_store_riak). +-author("Carlos Andres Bolanos "). +-github("https://github.com/inaka"). +-license("Apache License 2.0"). + +-behavior(sumo_store). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Exports. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Public API. +-export([init/1]). +-export([create_schema/2]). +-export([persist/2]). +-export([delete_by/3, delete_all/2]). +-export([find_all/2, find_all/5, find_by/3, find_by/5, find_by/6]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Types. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(state, {conn :: pid(), encoding :: atom(), bucket :: binary()}). +-type state() :: #state{}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% External API. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec init(term()) -> {ok, term()}. +init(Options) -> + % The storage backend key in the options specifies the name of the process + % which creates and initializes the storage backend. + Backend = proplists:get_value(storage_backend, Options), + State = sumo_backend_riak:get_state(Backend), + {ok, State}. + +-spec persist(sumo_internal:doc(), state()) -> + sumo_store:result(sumo_internal:doc(), state()). 
+persist(Doc, #state{conn = Conn, encoding = Enc, bucket = Bucket} = State) -> + DocName = sumo_internal:doc_name(Doc), + IdField = sumo_internal:id_field_name(DocName), + Id = case sumo_internal:get_field(IdField, Doc) of + undefined -> next_id(32); + Id0 -> Id0 + end, + NewDoc = sumo_internal:set_field(IdField, Id, Doc), + Fields = sumo_internal:doc_fields(NewDoc), + EncObj = enc(Fields, Enc), + %% Create Riak Object + Object = riakc_obj:new(Bucket, iolist_to_binary(Id), EncObj, ctype(Enc)), + %% Store the object + case riakc_pb_socket:put(Conn, Object) of + {error, Error} -> + {error, Error, State}; + _ -> + {ok, NewDoc, State} + end. + +-spec delete_by(sumo:schema_name(), sumo:conditions(), state()) -> + sumo_store:result(sumo_store:affected_rows(), state()). +delete_by(DocName, Conditions, #state{conn = Conn, bucket = Bucket} = State) -> + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case riakc_pb_socket:delete(Conn, Bucket, iolist_to_binary(Key)) of + ok -> + {ok, 1, State}; + {error, Error} -> + {error, Error, State} + end; + _ -> + %% @todo query keys that match with conditions and then delete them. + {ok, 1, State} + end. + +-spec delete_all(sumo:schema_name(), state()) -> + sumo_store:result(sumo_store:affected_rows(), state()). +delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> + %% @todo Optimization required -- this should be a batch op. + %% Asking Riak to generate a list of all keys in a production environment + %% is generally a bad idea. It's an expensive operation. + Delete = fun(K, Acc) -> + case riakc_pb_socket:delete(Conn, Bucket, K) of + ok -> Acc + 1; + {error, _} -> Acc + end + end, + Keys = case riakc_pb_socket:list_keys(Conn, Bucket) of + {ok, LK} -> LK; + _ -> [] + end, + Count = lists:foldl(Delete, 0, Keys), + {ok, Count, State}. + +-spec find_all(sumo:schema_name(), state()) -> + sumo_store:result([sumo_internal:doc()], state()). +find_all(DocName, + #state{conn = Conn, bucket = Bucket, encoding = Enc} = State) -> + %% @todo Optimization required -- this should be a batch op. + %% Asking Riak to generate a list of all keys in a production environment + %% is generally a bad idea. It's an expensive operation. + case riakc_pb_socket:list_keys(Conn, Bucket) of + {ok, Keys} -> + Fun = fun(Item) -> map_to_doc(DocName, Item) end, + Docs = lists:map(Fun, fetch_bulk(Conn, Bucket, Enc, Keys)), + {ok, Docs, State}; + _ -> + {ok, [], State} + end. + +-spec find_all( + sumo:schema_name(), + term(), + non_neg_integer(), + non_neg_integer(), + state() +) -> sumo_store:result([sumo_internal:doc()], state()). +find_all(DocName, _SortFields, Limit, Offset, State) -> + find_by(DocName, [], Limit, Offset, State). + +-spec find_by(sumo:schema_name(), sumo:conditions(), state()) -> + sumo_store:result([sumo_internal:doc()], state()). +find_by(DocName, Conditions, State) -> + find_by(DocName, Conditions, 0, 0, State). + +-spec find_by( + sumo:schema_name(), + sumo:conditions(), + non_neg_integer(), + non_neg_integer(), + state() +) -> sumo_store:result([sumo_internal:doc()], state()). 
+find_by(DocName,
+        Conditions,
+        _Limit,
+        _Offset,
+        #state{conn = Conn, bucket = Bucket, encoding = Enc} = State) ->
+  IdField = sumo_internal:id_field_name(DocName),
+  case lists:keyfind(IdField, 1, Conditions) of
+    {_K, Key} ->
+      case riakc_pb_socket:get(Conn, Bucket, iolist_to_binary(Key)) of
+        {ok, RiakObj} ->
+          Val = map_to_doc(DocName, dec(riakc_obj:get_value(RiakObj), Enc)),
+          {ok, [Val], State};
+        {error, Error} ->
+          {error, Error, State}
+      end;
+    _ ->
+      Query = build_query(Conditions),
+      case riakc_pb_socket:mapred_bucket(Conn, Bucket, Query) of
+        {ok, [{_, Results}]} ->
+          F = fun(Val, Acc) -> [map_to_doc(DocName, dec(Val, Enc)) | Acc] end,
+          NewRes = lists:foldl(F, [], Results),
+          {ok, NewRes, State};
+        {error, Error} ->
+          {error, Error, State}
+      end
+  end.
+
+%% XXX We should have a DSL here, to allow querying in a known language
+%% to be translated by each driver into its own.
+-spec find_by(
+  sumo:schema_name(),
+  sumo:conditions(),
+  term(),
+  non_neg_integer(),
+  non_neg_integer(),
+  state()
+) -> sumo_store:result([sumo_internal:doc()], state()).
+find_by(_DocName, _Conditions, _SortFields, _Limit, _Offset, State) ->
+  {error, not_supported, State}.
+
+%% XXX: Refactor:
+%% Requires {length, X} to be the first field attribute in order to form the
+%% correct query. :P
+%% If no indexes are defined, will put an extra comma :P
+%% Maybe it would be better to just use ALTER statements instead of trying to
+%% create the schema on the 1st pass. Also, ALTER statements might be better
+%% for when we have migrations.
+-spec create_schema(sumo:schema(), state()) -> sumo_store:result(state()).
+create_schema(_Schema, State) ->
+  %% @todo Search Schema (Solr), and probably create 2i into the given schema
+  {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Private API.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+ctype(bert) -> "application/x-erlang-binary";
+ctype(_) -> "application/json".
+
+enc(Data, Encoding) ->
+  case Encoding of
+    bert ->
+      term_to_binary(Data, [compressed]);
+    _ ->
+      jiffy:encode(Data, [uescape])
+  end.
+
+dec(Data, Encoding) ->
+  case Encoding of
+    bert ->
+      binary_to_term(Data);
+    _ ->
+      jiffy:decode(Data, [return_maps])
+  end.
+
+map_to_doc(DocName, Item) ->
+  Fun = fun (Key, Doc) ->
+          FieldName = binary_to_atom(Key, utf8),
+          Value = maps:get(Key, Item),
+          sumo_internal:set_field(FieldName, Value, Doc)
+        end,
+  Keys = maps:keys(Item),
+  lists:foldl(Fun, sumo_internal:new_doc(DocName), Keys).
+
+next_id(Len) ->
+  <<A1:32, A2:32, A3:32>> = crypto:strong_rand_bytes(12),
+  random:seed({A1, A2, A3}),
+  Chrs = list_to_tuple("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"),
+  ChrsSize = size(Chrs),
+  F = fun(_, R) -> [element(random:uniform(ChrsSize), Chrs) | R] end,
+  lists:foldl(F, "", lists:seq(1, Len)).
+
+fetch_bulk(Conn, Bucket, Enc, Keys) ->
+  F = fun(K, Acc) ->
+        case riakc_pb_socket:get(Conn, Bucket, K) of
+          {ok, RiakObj} ->
+            Val = riakc_obj:get_value(RiakObj),
+            [dec(Val, Enc) | Acc];
+          {error, _} ->
+            Acc
+        end
+      end,
+  lists:foldl(F, [], Keys).
+
+norm_conditions(Conditions) ->
+  F = fun({X, Y}, Acc) when is_list(Y) ->
+        [{atom_to_binary(X, utf8), iolist_to_binary(Y)} | Acc];
+        ({X, Y}, Acc) ->
+        [{atom_to_binary(X, utf8), Y} | Acc]
+      end,
+  lists:foldl(F, [], Conditions).
+
+build_query(Conditions) ->
+  [{map, {modfun, sumo_store_riak_mapred, map_object_value_by_index},
+    norm_conditions(Conditions), true}].
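A note on the query path above: for anything other than a lookup by id, the
store builds a single MapReduce map phase that runs
sumo_store_riak_mapred:map_object_value_by_index/3 on the Riak nodes, with the
normalized conditions as the phase argument. Because the phase is a
{modfun, Mod, Fun} reference, that module must be compiled and loadable on
every node of the cluster for the job to run. A minimal sketch of the term
handed to riakc_pb_socket:mapred_bucket/3, assuming Conn is an open riakc
connection and the backend's default <<"default">> bucket:

    %% Conditions as sumo passes them in, e.g. from sumo:find_by/2.
    Conditions = [{last_name, "D"}],
    %% norm_conditions/1 turns them into binary pairs for the phase argument.
    Query = [{map,
              {modfun, sumo_store_riak_mapred, map_object_value_by_index},
              [{<<"last_name">>, <<"D">>}],
              true}],  %% true: include this phase's output in the result
    {ok, [{0, Values}]} = riakc_pb_socket:mapred_bucket(Conn, <<"default">>, Query).

Each element of Values is then decoded with dec/2 and folded into a sumo doc
by map_to_doc/2.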
diff --git a/src/sumo_store_riak_mapred.erl b/src/sumo_store_riak_mapred.erl new file mode 100644 index 0000000..8cf8357 --- /dev/null +++ b/src/sumo_store_riak_mapred.erl @@ -0,0 +1,127 @@ +%%% @hidden +%%% @doc convenience functions for defining common map/reduce phases +%%% +%%% Copyright 2012 Inaka <hello@inaka.net> +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% @end +%%% @copyright Inaka +%%% +-module(sumo_store_riak_mapred). +-author("Carlos Andres Bolanos "). +-github("https://github.com/inaka"). +-license("Apache License 2.0"). + +%% API +-export([map_object_key/3, + map_object_value/3, + map_object_kv/3, + map_object_key_by_index/3, + map_object_value_by_index/3, + map_object_kv_by_index/3]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% API - Map/Reduce Functions. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @spec map_object_key(riak_object:riak_object(), term(), term()) -> [term()] +%% @doc map phase function +%% Return a list that contains the key of each object named by +%% BucketKeys. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_key({error, notfound}, _KeyData, _Arg) -> + []; +map_object_key(Object, _KeyData, _Arg) -> + [return(k, Object)]. + +%% @spec map_object_value(riak_object:riak_object(), term(), term()) -> [term()] +%% @doc map phase function +%% Return a list that contains the value of each object named by +%% BucketKeys. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_value({error, notfound}, _KeyData, _Arg) -> + []; +map_object_value(Object, _KeyData, _Arg) -> + [return(v, Object)]. + +%% @spec map_object_kv(riak_object:riak_object(), term(), term()) -> [term()] +%% @doc map phase function +%% Return a list that contains the key/value pair of each object named by +%% BucketKeys. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_kv({error, notfound}, _KeyData, _Arg) -> + []; +map_object_kv(Object, _KeyData, _Arg) -> + [return(kv, Object)]. + +%% @spec map_object_key_by_index(riak_object:riak_object(), term(), term()) -> +%% [term()] +%% @doc map phase function +%% Return a list that contains the key of each object named by +%% BucketKeys, that match with the specified indices. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_key_by_index({error, notfound}, _KeyData, _Arg) -> + []; +map_object_key_by_index(Object, KeyData, Arg) -> + find_by_index(Object, KeyData, Arg, k). 
+ +%% @spec map_object_value_by_index(riak_object:riak_object(), term(), term()) -> +%% [term()] +%% @doc map phase function +%% Return a list that contains the value of each object named by +%% BucketKeys, that match with the specified indices. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_value_by_index({error, notfound}, _KeyData, _Arg) -> + []; +map_object_value_by_index(Object, KeyData, Arg) -> + find_by_index(Object, KeyData, Arg, v). + +%% @spec map_object_kv_by_index(riak_object:riak_object(), term(), term()) -> +%% [term()] +%% @doc map phase function +%% Return a list that contains the key/value pair of each object named by +%% BucketKeys, that match with the specified indices. +%% If the RiakObject is the tuple {error, notfound}, the +%% behavior of this function is produce no output (literally []). +map_object_kv_by_index({error, notfound}, _KeyData, _Arg) -> + []; +map_object_kv_by_index(Object, KeyData, Arg) -> + find_by_index(Object, KeyData, Arg, kv). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Internals. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @private +find_by_index(O, _, IdxL, Ret) -> + {struct, Map} = mochijson2:decode(riak_object:get_value(O)), + Deleted = dict:is_key(<<"X-Riak-Deleted">>, riak_object:get_metadata(O)), + F = fun({X, Y}, Acc) -> (proplists:get_value(X, Map) =:= Y) and Acc end, + Eval = lists:foldl(F, not Deleted, IdxL), + case Eval of + true -> [return(Ret, O)]; + false -> [] + end. + +%% @private +return(k, O) -> + riak_object:key(O); +return(v, O) -> + riak_object:get_value(O); +return(kv, O) -> + {riak_object:key(O), riak_object:get_value(O)}. diff --git a/test/sumo_basic_SUITE.erl b/test/sumo_basic_SUITE.erl index f9a1356..d8414e2 100644 --- a/test/sumo_basic_SUITE.erl +++ b/test/sumo_basic_SUITE.erl @@ -87,15 +87,20 @@ init_store(Module) -> sumo:persist(Module, Module:new(<<"E">>, <<"A">>, 2)), sumo:persist(Module, Module:new(<<"F">>, <<"E">>, 1)), - %% This is necessary to get elasticsearch in - %% particular to index its stuff. - timer:sleep(1000). + %% 1. Necessary to get elasticsearch in particular to index its stuff. + %% 2. Necessary to Riak. + %% Riak clusters will retain deleted objects for some period of time + %% (3 seconds by default), and the MapReduce framework does not conceal + %% these from submitted jobs. + %% @see + timer:sleep(3000). find_all_module(Module) -> 6 = length(sumo:find_all(Module)). find_by_module(Module) -> Results = sumo:find_by(Module, [{last_name, <<"D">>}]), + 2 = length(Results), SortFun = fun(A, B) -> Module:name(A) < Module:name(B) end, [First, Second | _] = lists:sort(SortFun, Results), @@ -121,7 +126,8 @@ run_all_stores(Fun) -> Modules = [sumo_test_people_mysql, sumo_test_people_mongo, sumo_test_people_elasticsearch, - sumo_test_people_pgsql], + sumo_test_people_pgsql, + sumo_test_people_riak], lists:foreach(Fun, Modules). -spec to_str(any()) -> string(). diff --git a/test/sumo_test_people_riak.erl b/test/sumo_test_people_riak.erl new file mode 100644 index 0000000..c760248 --- /dev/null +++ b/test/sumo_test_people_riak.erl @@ -0,0 +1,29 @@ +-module(sumo_test_people_riak). + +-behavior(sumo_doc). + +-include_lib("mixer/include/mixer.hrl"). +-mixin([{sumo_test_people, + [sumo_wakeup/1, + sumo_sleep/1, + new/2, + new/3, + new/4, + name/1, + id/1 + ] + } + ]). + +-export([sumo_schema/0]). + +-spec sumo_schema() -> sumo:schema(). 
+sumo_schema() -> + Fields = + [sumo:new_field(id, string, [id, not_null, auto_increment]), + sumo:new_field(name, string, [{length, 255}, not_null]), + sumo:new_field(last_name, string, [{length, 255}, not_null]), + sumo:new_field(age, integer), + sumo:new_field(address, string, [{length, 255}]) + ], + sumo:new_schema(?MODULE, Fields). diff --git a/test/test.config b/test/test.config index ecd77fb..b9d8220 100644 --- a/test/test.config +++ b/test/test.config @@ -36,8 +36,15 @@ [{host, "127.0.0.1"}, {port, 5432}, {database, "sumo_test"}, - {username, "jfacorro"}, - {password, ""}] + {username, "root"}, + {password, "pass"}] + }, + {sumo_test_backend_riak, + sumo_backend_riak, + [{host, "127.0.0.1"}, + {port, 8087}, + {bucket, "sumo_test"}, + {encoding, json}] } ] }, @@ -61,6 +68,11 @@ sumo_store_pgsql, [{storage_backend, sumo_test_backend_pgsql}, {workers, 10}] + }, + {sumo_test_riak, + sumo_store_riak, + [{storage_backend, sumo_test_backend_riak}, + {workers, 10}] } ] }, @@ -69,7 +81,8 @@ {sumo_test_people_mysql, sumo_test_mysql}, {sumo_test_people_mongo, sumo_test_mongo}, {sumo_test_people_elasticsearch, sumo_test_elasticsearch}, - {sumo_test_people_pgsql, sumo_test_pgsql} + {sumo_test_people_pgsql, sumo_test_pgsql}, + {sumo_test_people_riak, sumo_test_riak} ] }, {events, From 28bbad5a96a9b53df3046dc13f350aba30b044bc Mon Sep 17 00:00:00 2001 From: cabolanos Date: Thu, 26 Feb 2015 08:59:16 -0500 Subject: [PATCH 3/8] Refactoring to Riak Data Types. --- src/sumo_backend_riak.erl | 12 +- src/sumo_store_riak.erl | 250 +++++++++++++++--------- src/sumo_store_riak_mapred.erl | 127 ------------ test/sumo_basic_SUITE.erl | 11 +- test/sumo_test_people.erl | 28 ++- test/sumo_test_people_elasticsearch.erl | 3 +- test/sumo_test_people_mongo.erl | 3 +- test/sumo_test_people_mysql.erl | 3 +- test/sumo_test_people_pgsql.erl | 3 +- test/sumo_test_people_riak.erl | 5 +- test/test.config | 3 +- 11 files changed, 211 insertions(+), 237 deletions(-) delete mode 100644 src/sumo_store_riak_mapred.erl diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index d8c1048..32a34dc 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -53,7 +53,7 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), encoding :: atom(), bucket :: binary()}). +-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -82,14 +82,18 @@ init(Options) -> Host = proplists:get_value(host, Options, "127.0.0.1"), Port = proplists:get_value(port, Options, 8087), Opts = riak_opts(Options), + BucketType = iolist_to_binary( + proplists:get_value(bucket_type, Options)), Bucket = iolist_to_binary( - proplists:get_value(bucket, Options, <<"default">>)), + proplists:get_value(bucket, Options, <<"sumo_test">>)), + Index = iolist_to_binary( + proplists:get_value(index, Options, <<"sumo_test_index">>)), %% Encoding - Encoding = proplists:get_value(encoding, Options, json), + %%Encoding = proplists:get_value(encoding, Options, json), %% Place Riak connection {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), %% Initial state - {ok, #state{conn = Conn, encoding = Encoding, bucket = Bucket}}. + {ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}. -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. 
handle_call(get_connection, _From, State = #state{conn = Conn}) -> diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index c6d346a..c8639ab 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -36,12 +36,53 @@ -export([find_all/2, find_all/5, find_by/3, find_by/5, find_by/6]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Types. +%% Types and Macros. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), encoding :: atom(), bucket :: binary()}). +-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). -type state() :: #state{}. +-define(KEY_FIND(Key_, TupleList_), + case lists:keyfind(Key_, 1, TupleList_) of + {_, V_} -> V_; + _ -> undefined + end +). + +-define(NORM_DOC_FIELD(Src_), + re:replace( + Src_, <<"_register|_set|_counter|_flag|_map">>, <<"">>, + [{return, binary}, global]) +). + +-define(REG(Val_), + fun(R_) -> riakc_register:set(Val_, R_) end +). + +-define(RMAP_FETCH(Key_, Map_), + case riakc_map:find({Key_, register}, Map_) of + {ok, Val_} -> Val_; + _ -> undefined + end +). + +-define(RMAP_UPDATE(KV_, Map_), + {Key_, Val_} = KV_, + riakc_map:update({Key_, register}, ?REG(Val_), Map_) +). + +-define(FETCH_MAP(Conn_, Bucket_, Key_), + riakc_pb_socket:fetch_type(Conn_,Bucket_, Key_) +). + +-define(UPDATE_MAP(Conn_, Bucket_, Key_, Map_), + riakc_pb_socket:update_type(Conn_, Bucket_, Key_, riakc_map:to_op(Map_)) +). + +-define(SEARCH(Conn_, Index_, Query_), + riakc_pb_socket:search(Conn_, Index_, Query_) +). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -54,30 +95,24 @@ init(Options) -> State = sumo_backend_riak:get_state(Backend), {ok, State}. --spec persist(sumo_internal:doc(), state()) -> - sumo_store:result(sumo_internal:doc(), state()). -persist(Doc, #state{conn = Conn, encoding = Enc, bucket = Bucket} = State) -> - DocName = sumo_internal:doc_name(Doc), +-spec persist( + sumo_internal:doc(), state() +) -> sumo_store:result(sumo_internal:doc(), state()). +persist(Doc, #state{conn = Conn, bucket = Bucket} = State) -> + NewDoc = new_doc(Doc), + DocName = sumo_internal:doc_name(NewDoc), IdField = sumo_internal:id_field_name(DocName), - Id = case sumo_internal:get_field(IdField, Doc) of - undefined -> next_id(32); - Id0 -> Id0 - end, - NewDoc = sumo_internal:set_field(IdField, Id, Doc), - Fields = sumo_internal:doc_fields(NewDoc), - EncObj = enc(Fields, Enc), - %% Create Riak Object - Object = riakc_obj:new(Bucket, iolist_to_binary(Id), EncObj, ctype(Enc)), - %% Store the object - case riakc_pb_socket:put(Conn, Object) of + Id = iolist_to_binary(sumo_internal:get_field(IdField, NewDoc)), + case ?UPDATE_MAP(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of {error, Error} -> {error, Error, State}; _ -> {ok, NewDoc, State} end. --spec delete_by(sumo:schema_name(), sumo:conditions(), state()) -> - sumo_store:result(sumo_store:affected_rows(), state()). +-spec delete_by( + sumo:schema_name(), sumo:conditions(), state() +) -> sumo_store:result(sumo_store:affected_rows(), state()). delete_by(DocName, Conditions, #state{conn = Conn, bucket = Bucket} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of @@ -93,8 +128,9 @@ delete_by(DocName, Conditions, #state{conn = Conn, bucket = Bucket} = State) -> {ok, 1, State} end. 
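To make the data-type refactor concrete: persist/2 now ships a riakc_map
update instead of an opaque Riak object. A minimal sketch of the structure
doc_to_rmap/1 builds, assuming riakc 2.x and a connection Conn against a map
bucket type such as {<<"maps">>, <<"sumo_test">>} (the key and field values
here are illustrative):

    %% Every document field becomes a register holding the binary form of
    %% its value -- this is what ?RMAP_UPDATE expands to per field.
    M0 = riakc_map:new(),
    M1 = riakc_map:update({<<"name">>, register},
                          fun(R) -> riakc_register:set(<<"A">>, R) end, M0),
    M2 = riakc_map:update({<<"age">>, register},
                          fun(R) -> riakc_register:set(<<"6">>, R) end, M1),
    %% riakc_map:to_op/1 produces the operation that update_type/4 sends.
    riakc_pb_socket:update_type(Conn, {<<"maps">>, <<"sumo_test">>},
                                <<"some_key">>, riakc_map:to_op(M2)).

Note the trade-off this encodes: every value is flattened to a binary on the
way in, which is why the test models below grow a from_bin/2 step to restore
integer fields on the way out.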
--spec delete_all(sumo:schema_name(), state()) -> - sumo_store:result(sumo_store:affected_rows(), state()). +-spec delete_all( + sumo:schema_name(), state() +) -> sumo_store:result(sumo_store:affected_rows(), state()). delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> %% @todo Optimization required -- this should be a batch op. %% Asking Riak to generate a list of all keys in a production environment @@ -112,17 +148,17 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> Count = lists:foldl(Delete, 0, Keys), {ok, Count, State}. --spec find_all(sumo:schema_name(), state()) -> - sumo_store:result([sumo_internal:doc()], state()). -find_all(DocName, - #state{conn = Conn, bucket = Bucket, encoding = Enc} = State) -> +-spec find_all( + sumo:schema_name(), state() +) -> sumo_store:result([sumo_internal:doc()], state()). +find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) -> %% @todo Optimization required -- this should be a batch op. %% Asking Riak to generate a list of all keys in a production environment %% is generally a bad idea. It's an expensive operation. case riakc_pb_socket:list_keys(Conn, Bucket) of {ok, Keys} -> - Fun = fun(Item) -> map_to_doc(DocName, Item) end, - Docs = lists:map(Fun, fetch_bulk(Conn, Bucket, Enc, Keys)), + F = fun(Item, Acc) -> [rmap_to_doc(DocName, Item) | Acc] end, + Docs = lists:foldl(F, [], fetch_bulk(Conn, Bucket, Keys)), {ok, Docs, State}; _ -> {ok, [], State} @@ -154,22 +190,22 @@ find_by(DocName, Conditions, _Limit, _Offset, - #state{conn = Conn, bucket = Bucket, encoding = Enc} = State) -> + #state{conn = Conn, bucket = Bucket, index = Index} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> - case riakc_pb_socket:get(Conn, Bucket, iolist_to_binary(Key)) of - {ok, RiakObj} -> - Val = map_to_doc(DocName, dec(riakc_obj:get_value(RiakObj), Enc)), + case ?FETCH_MAP(Conn, Bucket, iolist_to_binary(Key)) of + {ok, RMap} -> + Val = rmap_to_doc(DocName, RMap), {ok, [Val], State}; {error, Error} -> {error, Error, State} end; _ -> Query = build_query(Conditions), - case riakc_pb_socket:mapred_bucket(Conn, Bucket, Query) of - {ok, [{_, Results}]} -> - F = fun(Val, Acc) -> [map_to_doc(DocName, dec(Val, Enc)) | Acc] end, + case ?SEARCH(Conn, Index, Query) of + {ok, {search_results, Results, _, _Count}} -> + F = fun({_, KV}, Acc) -> [kv_to_doc(DocName, KV) | Acc] end, NewRes = lists:foldl(F, [], Results), {ok, NewRes, State}; {error, Error} -> @@ -177,8 +213,6 @@ find_by(DocName, end end. -%% XXX We should have a DSL here, to allow querying in a known language -%% to be translated by each driver into its own. -spec find_by( sumo:schema_name(), sumo:conditions(), @@ -190,13 +224,6 @@ find_by(DocName, find_by(_DocName, _Conditions, _SortFields, _Limit, _Offset, State) -> {error, not_supported, State}. -%% XXX: Refactor: -%% Requires {length, X} to be the first field attribute in order to form the -%% correct query. :P -%% If no indexes are defined, will put an extra comma :P -%% Maybe it would be better to just use ALTER statements instead of trying to -%% create the schema on the 1st pass. Also, ALTER statements might be better -%% for when we have migrations. -spec create_schema(sumo:schema(), state()) -> sumo_store:result(state()). create_schema(_Schema, State) -> %% @todo Search Schema (Solr), and probably create 2i into the given schema @@ -206,34 +233,89 @@ create_schema(_Schema, State) -> %% Private API. 
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-ctype(bert) -> "application/x-erlang-binary";
-ctype(_) -> "application/json".
+%% @private
+new_doc(Doc) ->
+  DocName = sumo_internal:doc_name(Doc),
+  IdField = sumo_internal:id_field_name(DocName),
+  Id = case sumo_internal:get_field(IdField, Doc) of
+         undefined -> next_id(32);
+         Id0 -> Id0
+       end,
+  sumo_internal:set_field(IdField, Id, Doc).

-enc(Data, Encoding) ->
-  case Encoding of
-    bert ->
-      term_to_binary(Data, [compressed]);
-    _ ->
-      jiffy:encode(Data, [uescape])
-  end.
+%% @private
+doc_to_rmap(Doc) ->
+  Fields = sumo_internal:doc_fields(Doc),
+  F = fun({K, V}, Acc) ->
+        ?RMAP_UPDATE({atom_to_binary(K, utf8), to_bin(V)}, Acc)
+      end,
+  lists:foldl(F, riakc_map:new(), maps:to_list(Fields)).

-dec(Data, Encoding) ->
-  case Encoding of
-    bert ->
-      binary_to_term(Data);
-    _ ->
-      jiffy:decode(Data, [return_maps])
-  end.
+%% @private
+rmap_to_doc(DocName, RMap) ->
+  F = fun({{K, _}, V}, Acc) ->
+        sumo_internal:set_field(binary_to_atom(K, utf8), V, Acc)
+      end,
+  lists:foldl(F, sumo_internal:new_doc(DocName), riakc_map:value(RMap)).
+
+%% @private
+kv_to_doc(DocName, KV) ->
+  F = fun({K, V}, Acc) ->
+        NK = ?NORM_DOC_FIELD(K),
+        sumo_internal:set_field(binary_to_atom(NK, utf8), V, Acc)
+      end,
+  lists:foldl(F, sumo_internal:new_doc(DocName), KV).
+
+%% @private
+fetch_bulk(Conn, Bucket, Keys) ->
+  F = fun(K, Acc) ->
+        case ?FETCH_MAP(Conn, Bucket, K) of
+          {ok, RMap} -> [RMap | Acc];
+          _ -> Acc
+        end
+      end,
+  lists:foldl(F, [], Keys).
+
+%% @private
+build_query({q_str, Q}) when is_binary(Q) ->
+  Q;
+build_query(PL) when is_list(PL) ->
+  build_query1(PL, <<"">>);
+build_query(_) ->
+  <<"*:*">>.

-map_to_doc(DocName, Item) ->
-  Fun = fun (Key, Doc) ->
-          FieldName = binary_to_atom(Key, utf8),
-          Value = maps:get(Key, Item),
-          sumo_internal:set_field(FieldName, Value, Doc)
-        end,
-  Keys = maps:keys(Item),
-  lists:foldl(Fun, sumo_internal:new_doc(DocName), Keys).
+build_query1([], Acc) ->
+  Acc;
+build_query1([{_, [{_, _} | _T0]} = KV | T], <<"">>) ->
+  build_query1(T, <<(<<"(">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
+build_query1([{_, [{_, _} | _T0]} = KV | T], Acc) ->
+  build_query1(T, <<Acc/binary, (<<" AND (">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
+build_query1([{K, V} | T], <<"">>) ->
+  build_query1(T, <<(query_eq(K, V))/binary>>);
+build_query1([{K, V} | T], Acc) ->
+  build_query1(T, <<Acc/binary, (<<" AND ">>)/binary, (query_eq(K, V))/binary>>).
+query_eq(K, V) ->
+  <<(atom_to_binary(K, utf8))/binary,
+    (<<"_register:">>)/binary,
+    (to_bin(V))/binary>>.
+
+build_query2({K, [{_, _} | _T] = V}) ->
+  F = fun({K_, V_}, Acc) ->
+        Eq = <<(atom_to_binary(K_, utf8))/binary,
+               (<<"_register:">>)/binary,
+               (to_bin(V_))/binary>>,
+        case Acc of
+          <<"">> ->
+            Eq;
+          _ ->
+            <<Acc/binary, (<<" ">>)/binary, (to_bin(K))/binary,
+              (<<" ">>)/binary, Eq/binary>>
+        end
+      end,
+  lists:foldl(F, <<"">>, V).
+
+%% @private
 next_id(Len) ->
   <<A1:32, A2:32, A3:32>> = crypto:strong_rand_bytes(12),
   random:seed({A1, A2, A3}),
   Chrs = list_to_tuple("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"),
   ChrsSize = size(Chrs),
   F = fun(_, R) -> [element(random:uniform(ChrsSize), Chrs) | R] end,
   lists:foldl(F, "", lists:seq(1, Len)).

-fetch_bulk(Conn, Bucket, Enc, Keys) ->
-  F = fun(K, Acc) ->
-        case riakc_pb_socket:get(Conn, Bucket, K) of
-          {ok, RiakObj} ->
-            Val = riakc_obj:get_value(RiakObj),
-            [dec(Val, Enc) | Acc];
-          {error, _} ->
-            Acc
-        end
-      end,
-  lists:foldl(F, [], Keys).
- -norm_conditions(Conditions) -> - F = fun({X, Y}, Acc) when is_list(Y) -> - [{atom_to_binary(X, utf8), iolist_to_binary(Y)} | Acc]; - ({X, Y}, Acc) -> - [{atom_to_binary(X, utf8), Y} | Acc] - end, - lists:foldl(F, [], Conditions). - -build_query(Conditions) -> - [{map, {modfun, sumo_store_riak_mapred, map_object_value_by_index}, - norm_conditions(Conditions), true}]. +%% @private +to_bin(Data) when is_integer(Data) -> + integer_to_binary(Data); +to_bin(Data) when is_float(Data) -> + float_to_binary(Data); +to_bin(Data) when is_atom(Data) -> + atom_to_binary(Data, utf8); +to_bin(Data) when is_list(Data) -> + iolist_to_binary(Data); +to_bin(Data) -> + Data. diff --git a/src/sumo_store_riak_mapred.erl b/src/sumo_store_riak_mapred.erl deleted file mode 100644 index 8cf8357..0000000 --- a/src/sumo_store_riak_mapred.erl +++ /dev/null @@ -1,127 +0,0 @@ -%%% @hidden -%%% @doc convenience functions for defining common map/reduce phases -%%% -%%% Copyright 2012 Inaka <hello@inaka.net> -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%% @end -%%% @copyright Inaka -%%% --module(sumo_store_riak_mapred). --author("Carlos Andres Bolanos "). --github("https://github.com/inaka"). --license("Apache License 2.0"). - -%% API --export([map_object_key/3, - map_object_value/3, - map_object_kv/3, - map_object_key_by_index/3, - map_object_value_by_index/3, - map_object_kv_by_index/3]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% API - Map/Reduce Functions. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% @spec map_object_key(riak_object:riak_object(), term(), term()) -> [term()] -%% @doc map phase function -%% Return a list that contains the key of each object named by -%% BucketKeys. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_key({error, notfound}, _KeyData, _Arg) -> - []; -map_object_key(Object, _KeyData, _Arg) -> - [return(k, Object)]. - -%% @spec map_object_value(riak_object:riak_object(), term(), term()) -> [term()] -%% @doc map phase function -%% Return a list that contains the value of each object named by -%% BucketKeys. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_value({error, notfound}, _KeyData, _Arg) -> - []; -map_object_value(Object, _KeyData, _Arg) -> - [return(v, Object)]. - -%% @spec map_object_kv(riak_object:riak_object(), term(), term()) -> [term()] -%% @doc map phase function -%% Return a list that contains the key/value pair of each object named by -%% BucketKeys. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_kv({error, notfound}, _KeyData, _Arg) -> - []; -map_object_kv(Object, _KeyData, _Arg) -> - [return(kv, Object)]. 
- -%% @spec map_object_key_by_index(riak_object:riak_object(), term(), term()) -> -%% [term()] -%% @doc map phase function -%% Return a list that contains the key of each object named by -%% BucketKeys, that match with the specified indices. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_key_by_index({error, notfound}, _KeyData, _Arg) -> - []; -map_object_key_by_index(Object, KeyData, Arg) -> - find_by_index(Object, KeyData, Arg, k). - -%% @spec map_object_value_by_index(riak_object:riak_object(), term(), term()) -> -%% [term()] -%% @doc map phase function -%% Return a list that contains the value of each object named by -%% BucketKeys, that match with the specified indices. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_value_by_index({error, notfound}, _KeyData, _Arg) -> - []; -map_object_value_by_index(Object, KeyData, Arg) -> - find_by_index(Object, KeyData, Arg, v). - -%% @spec map_object_kv_by_index(riak_object:riak_object(), term(), term()) -> -%% [term()] -%% @doc map phase function -%% Return a list that contains the key/value pair of each object named by -%% BucketKeys, that match with the specified indices. -%% If the RiakObject is the tuple {error, notfound}, the -%% behavior of this function is produce no output (literally []). -map_object_kv_by_index({error, notfound}, _KeyData, _Arg) -> - []; -map_object_kv_by_index(Object, KeyData, Arg) -> - find_by_index(Object, KeyData, Arg, kv). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Internals. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% @private -find_by_index(O, _, IdxL, Ret) -> - {struct, Map} = mochijson2:decode(riak_object:get_value(O)), - Deleted = dict:is_key(<<"X-Riak-Deleted">>, riak_object:get_metadata(O)), - F = fun({X, Y}, Acc) -> (proplists:get_value(X, Map) =:= Y) and Acc end, - Eval = lists:foldl(F, not Deleted, IdxL), - case Eval of - true -> [return(Ret, O)]; - false -> [] - end. - -%% @private -return(k, O) -> - riak_object:key(O); -return(v, O) -> - riak_object:get_value(O); -return(kv, O) -> - {riak_object:key(O), riak_object:get_value(O)}. diff --git a/test/sumo_basic_SUITE.erl b/test/sumo_basic_SUITE.erl index d8414e2..9c38313 100644 --- a/test/sumo_basic_SUITE.erl +++ b/test/sumo_basic_SUITE.erl @@ -87,13 +87,8 @@ init_store(Module) -> sumo:persist(Module, Module:new(<<"E">>, <<"A">>, 2)), sumo:persist(Module, Module:new(<<"F">>, <<"E">>, 1)), - %% 1. Necessary to get elasticsearch in particular to index its stuff. - %% 2. Necessary to Riak. - %% Riak clusters will retain deleted objects for some period of time - %% (3 seconds by default), and the MapReduce framework does not conceal - %% these from submitted jobs. - %% @see - timer:sleep(3000). + %% Necessary to get elasticsearch in particular to index its stuff. + timer:sleep(1000). find_all_module(Module) -> 6 = length(sumo:find_all(Module)). @@ -104,6 +99,8 @@ find_by_module(Module) -> SortFun = fun(A, B) -> Module:name(A) < Module:name(B) end, [First, Second | _] = lists:sort(SortFun, Results), + true = is_integer(Module:age(First)), + "B" = to_str(Module:name(First)), "D" = to_str(Module:name(Second)). 
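With MapReduce gone, lookups now go through Yokozuna (Riak Search), so
conditions are compiled to Solr query strings over the CRDT field names. The
store's build_query/1 is private, so the following standalone sketch
(hypothetical module and helper names) mirrors only its flat, AND-joined case
to show the strings being searched -- e.g. [{last_name, <<"D">>}] becomes
<<"last_name_register:D">>:

    -module(query_sketch).
    -export([build/1]).

    %% Flat conditions only; the real build_query/1 also accepts a raw
    %% {q_str, Binary} passthrough and nested condition lists.
    build([]) ->
      <<"*:*">>;
    build([H | T]) ->
      lists:foldl(
        fun(KV, Acc) -> <<Acc/binary, " AND ", (eq(KV))/binary>> end,
        eq(H), T).

    %% One field match: field name + "_register" suffix + binary value.
    eq({K, V}) ->
      <<(atom_to_binary(K, utf8))/binary, "_register:", (to_bin(V))/binary>>.

    to_bin(V) when is_integer(V) -> integer_to_binary(V);
    to_bin(V) when is_atom(V) -> atom_to_binary(V, utf8);
    to_bin(V) when is_list(V) -> iolist_to_binary(V);
    to_bin(V) -> V.

For example, query_sketch:build([{age, 2}, {last_name, <<"D">>}]) yields
<<"age_register:2 AND last_name_register:D">>.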
diff --git a/test/sumo_test_people.erl b/test/sumo_test_people.erl index 57a80be..6b5e7a5 100644 --- a/test/sumo_test_people.erl +++ b/test/sumo_test_people.erl @@ -8,7 +8,7 @@ sumo_sleep/1 ]). --export([new/2, new/3, new/4, name/1, id/1]). +-export([new/2, new/3, new/4, name/1, id/1, age/1]). -record(person, {id :: integer(), name :: string(), @@ -36,7 +36,7 @@ sumo_wakeup(Person) -> id = maps:get(id, Person), name = maps:get(name, Person), last_name = maps:get(last_name, Person), - age = maps:get(age, Person), + age = from_bin(maps:get(age, Person), integer), address = maps:get(address, Person) }. @@ -64,3 +64,27 @@ name(Person) -> id(Person) -> Person#person.id. + +age(Person) -> + Person#person.age. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internals +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% @private +%% @doc This helper function is needed by Riak, because data in Riak is stored +%% as Riak Maps, and values in the them must be binary, so when a data is +%% stored in Riak, all values in the map are converted to binary, because +%% of that, is necessary convert values to original types when data is +%% returned. +from_bin(Bin, integer) when is_binary(Bin) -> + binary_to_integer(Bin); +from_bin(Bin, float) when is_binary(Bin) -> + binary_to_float(Bin); +from_bin(Bin, atom) when is_binary(Bin) -> + binary_to_atom(Bin, utf8); +from_bin(Bin, string) when is_binary(Bin) -> + binary_to_list(Bin); +from_bin(Bin, _) -> + Bin. diff --git a/test/sumo_test_people_elasticsearch.erl b/test/sumo_test_people_elasticsearch.erl index 5b022ab..7b7f8b2 100644 --- a/test/sumo_test_people_elasticsearch.erl +++ b/test/sumo_test_people_elasticsearch.erl @@ -10,7 +10,8 @@ new/3, new/4, name/1, - id/1 + id/1, + age/1 ] } ]). diff --git a/test/sumo_test_people_mongo.erl b/test/sumo_test_people_mongo.erl index 8317eef..1c06e23 100644 --- a/test/sumo_test_people_mongo.erl +++ b/test/sumo_test_people_mongo.erl @@ -10,7 +10,8 @@ new/3, new/4, name/1, - id/1 + id/1, + age/1 ] } ]). diff --git a/test/sumo_test_people_mysql.erl b/test/sumo_test_people_mysql.erl index 1543a4d..01d81a7 100644 --- a/test/sumo_test_people_mysql.erl +++ b/test/sumo_test_people_mysql.erl @@ -10,7 +10,8 @@ new/3, new/4, name/1, - id/1 + id/1, + age/1 ] } ]). diff --git a/test/sumo_test_people_pgsql.erl b/test/sumo_test_people_pgsql.erl index 3e15c01..16676c3 100644 --- a/test/sumo_test_people_pgsql.erl +++ b/test/sumo_test_people_pgsql.erl @@ -10,7 +10,8 @@ new/3, new/4, name/1, - id/1 + id/1, + age/1 ] } ]). diff --git a/test/sumo_test_people_riak.erl b/test/sumo_test_people_riak.erl index c760248..5d7694c 100644 --- a/test/sumo_test_people_riak.erl +++ b/test/sumo_test_people_riak.erl @@ -10,7 +10,8 @@ new/3, new/4, name/1, - id/1 + id/1, + age/1 ] } ]). @@ -20,7 +21,7 @@ -spec sumo_schema() -> sumo:schema(). 
sumo_schema() -> Fields = - [sumo:new_field(id, string, [id, not_null, auto_increment]), + [sumo:new_field(id, string, [id, {length, 255}, not_null]), sumo:new_field(name, string, [{length, 255}, not_null]), sumo:new_field(last_name, string, [{length, 255}, not_null]), sumo:new_field(age, integer), diff --git a/test/test.config b/test/test.config index b9d8220..0449b71 100644 --- a/test/test.config +++ b/test/test.config @@ -43,8 +43,9 @@ sumo_backend_riak, [{host, "127.0.0.1"}, {port, 8087}, + {bucket_type, "maps"}, {bucket, "sumo_test"}, - {encoding, json}] + {index, "sumo_test_index"}] } ] }, From 924fe28323008676fe2b58ed6af6dd0611d5b3aa Mon Sep 17 00:00:00 2001 From: cabolanos Date: Thu, 26 Feb 2015 17:54:47 -0500 Subject: [PATCH 4/8] Refactoring to Riak Data Types. Query/Bulk optimizations using Search and 2i APIs. --- src/sumo_store_riak.erl | 295 +++++++++++++++++++++----------------- test/sumo_basic_SUITE.erl | 22 ++- 2 files changed, 182 insertions(+), 135 deletions(-) diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index c8639ab..2a518db 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -36,58 +36,19 @@ -export([find_all/2, find_all/5, find_by/3, find_by/5, find_by/6]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Types and Macros. +%% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). -type state() :: #state{}. --define(KEY_FIND(Key_, TupleList_), - case lists:keyfind(Key_, 1, TupleList_) of - {_, V_} -> V_; - _ -> undefined - end -). - --define(NORM_DOC_FIELD(Src_), - re:replace( - Src_, <<"_register|_set|_counter|_flag|_map">>, <<"">>, - [{return, binary}, global]) -). - --define(REG(Val_), - fun(R_) -> riakc_register:set(Val_, R_) end -). - --define(RMAP_FETCH(Key_, Map_), - case riakc_map:find({Key_, register}, Map_) of - {ok, Val_} -> Val_; - _ -> undefined - end -). - --define(RMAP_UPDATE(KV_, Map_), - {Key_, Val_} = KV_, - riakc_map:update({Key_, register}, ?REG(Val_), Map_) -). - --define(FETCH_MAP(Conn_, Bucket_, Key_), - riakc_pb_socket:fetch_type(Conn_,Bucket_, Key_) -). - --define(UPDATE_MAP(Conn_, Bucket_, Key_, Map_), - riakc_pb_socket:update_type(Conn_, Bucket_, Key_, riakc_map:to_op(Map_)) -). - --define(SEARCH(Conn_, Index_, Query_), - riakc_pb_socket:search(Conn_, Index_, Query_) -). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% External API. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec init(term()) -> {ok, term()}. +-spec init( + term() +) -> {ok, term()}. init(Options) -> % The storage backend key in the options specifies the name of the process % which creates and initializes the storage backend. @@ -99,11 +60,8 @@ init(Options) -> sumo_internal:doc(), state() ) -> sumo_store:result(sumo_internal:doc(), state()). 
persist(Doc, #state{conn = Conn, bucket = Bucket} = State) -> - NewDoc = new_doc(Doc), - DocName = sumo_internal:doc_name(NewDoc), - IdField = sumo_internal:id_field_name(DocName), - Id = iolist_to_binary(sumo_internal:get_field(IdField, NewDoc)), - case ?UPDATE_MAP(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of + {Id, NewDoc} = new_doc(Doc, State), + case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of {error, Error} -> {error, Error, State}; _ -> @@ -113,55 +71,58 @@ persist(Doc, #state{conn = Conn, bucket = Bucket} = State) -> -spec delete_by( sumo:schema_name(), sumo:conditions(), state() ) -> sumo_store:result(sumo_store:affected_rows(), state()). -delete_by(DocName, Conditions, #state{conn = Conn, bucket = Bucket} = State) -> +delete_by(DocName, + Conditions, + #state{conn = Conn, bucket = Bucket, index = Index} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> - case riakc_pb_socket:delete(Conn, Bucket, iolist_to_binary(Key)) of + case delete_map(Conn, Bucket, iolist_to_binary(Key)) of ok -> {ok, 1, State}; {error, Error} -> {error, Error, State} end; _ -> - %% @todo query keys that match with conditions and then delete them. - {ok, 1, State} + Query = build_query(Conditions), + case search_docs_by(DocName, Conn, Index, Query, 0, 0) of + {ok, {Total, Res}} -> + delete_docs(Conn, Bucket, Res), + {ok, Total, State}; + {error, Error} -> + {error, Error, State} + end end. -spec delete_all( sumo:schema_name(), state() ) -> sumo_store:result(sumo_store:affected_rows(), state()). delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> - %% @todo Optimization required -- this should be a batch op. - %% Asking Riak to generate a list of all keys in a production environment - %% is generally a bad idea. It's an expensive operation. - Delete = fun(K, Acc) -> - case riakc_pb_socket:delete(Conn, Bucket, K) of - ok -> Acc + 1; - {error, _} -> Acc - end - end, - Keys = case riakc_pb_socket:list_keys(Conn, Bucket) of - {ok, LK} -> LK; - _ -> [] - end, - Count = lists:foldl(Delete, 0, Keys), - {ok, Count, State}. + Del = fun({C, B, Kst}, Acc) -> + lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst), + Acc + length(Kst) + end, + case stream_keys(Conn, Bucket, Del, 0) of + {ok, Count} -> {ok, Count, State}; + {_, Count} -> {error, Count, State} + end. -spec find_all( sumo:schema_name(), state() ) -> sumo_store:result([sumo_internal:doc()], state()). find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) -> - %% @todo Optimization required -- this should be a batch op. - %% Asking Riak to generate a list of all keys in a production environment - %% is generally a bad idea. It's an expensive operation. - case riakc_pb_socket:list_keys(Conn, Bucket) of - {ok, Keys} -> - F = fun(Item, Acc) -> [rmap_to_doc(DocName, Item) | Acc] end, - Docs = lists:foldl(F, [], fetch_bulk(Conn, Bucket, Keys)), - {ok, Docs, State}; - _ -> - {ok, [], State} + Get = fun({C, B, Kst}, Acc) -> + Fun = fun(K, Acc) -> + case fetch_map(C, B, K) of + {ok, M} -> [rmap_to_doc(DocName, M) | Acc]; + _ -> Acc + end + end, + lists:foldl(Fun, [], Kst) ++ Acc + end, + case stream_keys(Conn, Bucket, Get, []) of + {ok, Docs} -> {ok, Docs, State}; + {_, Docs} -> {error, Docs, State} end. -spec find_all( @@ -172,6 +133,7 @@ find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) -> state() ) -> sumo_store:result([sumo_internal:doc()], state()). 
find_all(DocName, _SortFields, Limit, Offset, State) -> + %% @todo implement search with sort parameters. find_by(DocName, [], Limit, Offset, State). -spec find_by(sumo:schema_name(), sumo:conditions(), state()) -> @@ -188,13 +150,13 @@ find_by(DocName, Conditions, State) -> ) -> sumo_store:result([sumo_internal:doc()], state()). find_by(DocName, Conditions, - _Limit, - _Offset, + Limit, + Offset, #state{conn = Conn, bucket = Bucket, index = Index} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> - case ?FETCH_MAP(Conn, Bucket, iolist_to_binary(Key)) of + case fetch_map(Conn, Bucket, iolist_to_binary(Key)) of {ok, RMap} -> Val = rmap_to_doc(DocName, RMap), {ok, [Val], State}; @@ -203,13 +165,9 @@ find_by(DocName, end; _ -> Query = build_query(Conditions), - case ?SEARCH(Conn, Index, Query) of - {ok, {search_results, Results, _, _Count}} -> - F = fun({_, KV}, Acc) -> [kv_to_doc(DocName, KV) | Acc] end, - NewRes = lists:foldl(F, [], Results), - {ok, NewRes, State}; - {error, Error} -> - {error, Error, State} + case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of + {ok, {_, Res}} -> {ok, Res, State}; + {error, Error} -> {error, Error, State} end end. @@ -224,9 +182,10 @@ find_by(DocName, find_by(_DocName, _Conditions, _SortFields, _Limit, _Offset, State) -> {error, not_supported, State}. --spec create_schema(sumo:schema(), state()) -> sumo_store:result(state()). +-spec create_schema( + sumo:schema(), state() +) -> sumo_store:result(state()). create_schema(_Schema, State) -> - %% @todo Search Schema (Solr), and probably create 2i into the given schema {ok, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -234,20 +193,32 @@ create_schema(_Schema, State) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% @private -new_doc(Doc) -> +doc_id(Doc) -> + DocName = sumo_internal:doc_name(Doc), + IdField = sumo_internal:id_field_name(DocName), + sumo_internal:get_field(IdField, Doc). + +%% @private +new_doc(Doc, #state{conn = Conn, bucket = Bucket}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of - undefined -> next_id(32); - Id0 -> Id0 + undefined -> + case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc)) of + {ok, RiakMapId} -> RiakMapId; + {error, Error} -> throw(Error); + _ -> throw(unexpected) + end; + Id0 -> + to_bin(Id0) end, - sumo_internal:set_field(IdField, Id, Doc). + {Id, sumo_internal:set_field(IdField, Id, Doc)}. %% @private doc_to_rmap(Doc) -> Fields = sumo_internal:doc_fields(Doc), F = fun({K, V}, Acc) -> - ?RMAP_UPDATE({atom_to_binary(K, utf8), to_bin(V)}, Acc) + rmap_update({atom_to_binary(K, utf8), to_bin(V)}, Acc) end, lists:foldl(F, riakc_map:new(), maps:to_list(Fields)). @@ -261,77 +232,137 @@ rmap_to_doc(DocName, RMap) -> %% @private kv_to_doc(DocName, KV) -> F = fun({K, V}, Acc) -> - NK = ?NORM_DOC_FIELD(K), + NK = normalize_doc_fields(K), sumo_internal:set_field(binary_to_atom(NK, utf8), V, Acc) end, lists:foldl(F, sumo_internal:new_doc(DocName), KV). %% @private -fetch_bulk(Conn, Bucket, Keys) -> - F = fun(K, Acc) -> - case ?FETCH_MAP(Conn, Bucket, K) of - {ok, RMap} -> [RMap | Acc]; - _ -> Acc - end +normalize_doc_fields(Src) -> + re:replace( + Src, <<"_register|_set|_counter|_flag|_map">>, <<"">>, + [{return, binary}, global]). 
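Going the other way, search results come back with those CRDT-typed field
names, which is what normalize_doc_fields/1 above strips before kv_to_doc/2
rebuilds the document. The same regex in isolation, as a shell sketch:

    1> re:replace(<<"last_name_register">>,
                  <<"_register|_set|_counter|_flag|_map">>, <<"">>,
                  [{return, binary}, global]).
    <<"last_name">>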
+
+%% @private
+rmap_update({K, V}, Map) ->
+  riakc_map:update({K, register}, fun(R) -> riakc_register:set(V, R) end, Map).
+
+%% @private
+fetch_map(Conn, Bucket, Key) ->
+  riakc_pb_socket:fetch_type(Conn, Bucket, Key).
+
+%% @private
+delete_map(Conn, Bucket, Key) ->
+  riakc_pb_socket:delete(Conn, Bucket, Key).
+
+%% @private
+update_map(Conn, Bucket, Key, Map) ->
+  riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map)).
+
+%% @private
+search(Conn, Index, Query, 0, 0) ->
+  riakc_pb_socket:search(Conn, Index, Query);
+search(Conn, Index, Query, Limit, Offset) ->
+  riakc_pb_socket:search(Conn, Index, Query, [{start, Offset}, {rows, Limit}]).
+
+%% @private
+stream_keys(Conn, Bucket, F, Acc) ->
+  {ok, Ref} = riakc_pb_socket:get_index_eq(
+    Conn, Bucket, <<"$bucket">>, <<"">>, [{stream, true}]),
+  receive_stream(Ref, Conn, Bucket, F, Acc).
+
+%% @private
+receive_stream(Ref, Conn, Bucket, F, Acc) ->
+  receive
+    {Ref, {_, Stream, _}} ->
+      receive_stream(Ref, Conn, Bucket, F, F({Conn, Bucket, Stream}, Acc));
+    {Ref, {done, _}} ->
+      {ok, Acc};
+    _ ->
+      {error, Acc}
+  after
+    30000 -> {timeout, Acc}
+  end.
+
+%% @private
+search_docs_by(DocName, Conn, Index, Query, Limit, Offset) ->
+  case search(Conn, Index, Query, Limit, Offset) of
+    {ok, {search_results, Results, _, Total}} ->
+      F = fun({_, KV}, Acc) -> [kv_to_doc(DocName, KV) | Acc] end,
+      NewRes = lists:foldl(F, [], Results),
+      {ok, {Total, NewRes}};
+    {error, Error} ->
+      {error, Error}
+  end.
+
+%% @private
+delete_docs(Conn, Bucket, Docs) ->
+  F = fun(D) ->
+    K = doc_id(D),
+    delete_map(Conn, Bucket, K)
   end,
-  lists:foldl(F, [], Keys).
+  lists:foreach(F, Docs).
+
+%% @private
+to_bin(Data) when is_integer(Data) ->
+  integer_to_binary(Data);
+to_bin(Data) when is_float(Data) ->
+  float_to_binary(Data);
+to_bin(Data) when is_atom(Data) ->
+  atom_to_binary(Data, utf8);
+to_bin(Data) when is_list(Data) ->
+  iolist_to_binary(Data);
+to_bin(Data) ->
+  Data.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Private API - Query Builder.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

 %% @private
 build_query({q_str, Q}) when is_binary(Q) ->
   Q;
+build_query([]) ->
+  <<"*:*">>;
 build_query(PL) when is_list(PL) ->
   build_query1(PL, <<"">>);
 build_query(_) ->
   <<"*:*">>.

+%% @private
 build_query1([], Acc) ->
   Acc;
 build_query1([{_, [{_, _} | _T0]} = KV | T], <<"">>) ->
-  build_query1(T, <<(<<"(">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
+  build_query1(T,
+    <<(<<"(">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
 build_query1([{_, [{_, _} | _T0]} = KV | T], Acc) ->
-  build_query1(T, <<Acc/binary, (<<" AND (">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
+  build_query1(T,
+    <<Acc/binary, (<<" AND (">>)/binary,
+      (build_query2(KV))/binary, (<<")">>)/binary>>);
 build_query1([{K, V} | T], <<"">>) ->
   build_query1(T, <<(query_eq(K, V))/binary>>);
 build_query1([{K, V} | T], Acc) ->
-  build_query1(T, <<Acc/binary, (<<" AND ">>)/binary, (query_eq(K, V))/binary>>).
-
-query_eq(K, V) ->
-  <<(atom_to_binary(K, utf8))/binary,
-    (<<"_register:">>)/binary,
-    (to_bin(V))/binary>>.
+  build_query1(T,
+    <<Acc/binary, (<<" AND ">>)/binary, (query_eq(K, V))/binary>>).
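+
+%% For instance, a condition list such as [{name, <<"foo">>}, {age, 30}]
+%% should be folded by build_query/1 into the Solr query
+%% <<"name_register:foo AND age_register:30">> (illustrative values), since
+%% query_eq/2 appends the `_register' suffix to each field name.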
+%% @private
 build_query2({K, [{_, _} | _T] = V}) ->
   F = fun({K_, V_}, Acc) ->
         Eq = <<(atom_to_binary(K_, utf8))/binary,
-               (<<"_register:">>)/binary,
-               (to_bin(V_))/binary>>,
+          (<<"_register:">>)/binary,
+          (to_bin(V_))/binary>>,
         case Acc of
           <<"">> -> Eq;
           _ -> <<Acc/binary, (<<" ">>)/binary, (to_bin(K))/binary,
-                (<<" ">>)/binary, Eq/binary>>
+            (<<" ">>)/binary, Eq/binary>>
         end
       end,
   lists:foldl(F, <<"">>, V).

 %% @private
-next_id(Len) ->
-  <<A1:32, A2:32, A3:32>> = crypto:strong_rand_bytes(12),
-  random:seed({A1, A2, A3}),
-  Chrs = list_to_tuple("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"),
-  ChrsSize = size(Chrs),
-  F = fun(_, R) -> [element(random:uniform(ChrsSize), Chrs) | R] end,
-  lists:foldl(F, "", lists:seq(1, Len)).
-
-%% @private
-to_bin(Data) when is_integer(Data) ->
-  integer_to_binary(Data);
-to_bin(Data) when is_float(Data) ->
-  float_to_binary(Data);
-to_bin(Data) when is_atom(Data) ->
-  atom_to_binary(Data, utf8);
-to_bin(Data) when is_list(Data) ->
-  iolist_to_binary(Data);
-to_bin(Data) ->
-  Data.
+query_eq(K, V) ->
+  <<(atom_to_binary(K, utf8))/binary,
+    (<<"_register:">>)/binary,
+    (to_bin(V))/binary>>.
diff --git a/test/sumo_basic_SUITE.erl b/test/sumo_basic_SUITE.erl
index 9c38313..55060f6 100644
--- a/test/sumo_basic_SUITE.erl
+++ b/test/sumo_basic_SUITE.erl
@@ -87,8 +87,17 @@ init_store(Module) ->
   sumo:persist(Module, Module:new(<<"E">>, <<"A">>, 2)),
   sumo:persist(Module, Module:new(<<"F">>, <<"E">>, 1)),

-  %% Necessary to get elasticsearch in particular to index its stuff.
-  timer:sleep(1000).
+  %% Sync Timeout.
+  %% 1. Necessary to get elasticsearch in particular to index its stuff.
+  %% 2. Necessary for Riak.
+  %% Riak clusters will retain deleted objects for some period of time
+  %% (3 seconds by default), and the MapReduce framework does not conceal
+  %% these from submitted jobs.
+  %% @see
+  case Module of
+    sumo_test_people_riak -> timer:sleep(5000);
+    _ -> timer:sleep(1000)
+  end.

 find_all_module(Module) ->
   6 = length(sumo:find_all(Module)).
@@ -102,7 +111,14 @@ find_by_module(Module) ->
   true = is_integer(Module:age(First)),

   "B" = to_str(Module:name(First)),
-  "D" = to_str(Module:name(Second)).
+  "D" = to_str(Module:name(Second)),
+
+  %% Check find_by ID
+  [First1] = sumo:find_by(Module, [{id, Module:id(First)}]),
+  First1 = First,
+  %% Check pagination
+  Results1 = sumo:find_by(Module, [], 3, 1),
+  3 = length(Results1).

 delete_all_module(Module) ->
   sumo:delete_all(Module),
From 1ea1e6d161b6a970da1925739edc9620041f138a Mon Sep 17 00:00:00 2001
From: cabolanos
Date: Fri, 27 Feb 2015 17:59:16 -0500
Subject: [PATCH 5/8] Refactoring to riak data types finished. Performance
 optimizations. Modify some functions to support multilevel entities. Riak
 documentation added to README.

---
 README.md               | 64 ++++++++++++++++++++++++++++++++++++++---
 src/sumo_store_riak.erl | 61 ++++++++++++++++++++++++++++---------
 2 files changed, 108 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index e91dacd..d1640de 100644
--- a/README.md
+++ b/README.md
@@ -75,15 +75,71 @@ being in the top level directory:

 To run tests successfully, you need to follow these steps first:

- * Start the database engines: MySQL, PostgreSQL, MongoDB and ElasticSearch
+ * Start the database engines: **MySQL**, **PostgreSQL**, **MongoDB** and
+   **ElasticSearch**

- * For MySQL, PostgreSQL and MongoDB, you need to:
+ * For **MySQL**, **PostgreSQL** and **MongoDB**, you need to:
   - Create an user (or use defaults) and configure it on `test/test.config`
     file.
   - Create test database `sumo_test` on each DB.
-NOTE: For MongoDB you first create the test database and then create an user
-to access that DB. For more information visit [MongoDB Tutorial](http://docs.mongodb.org/manual/tutorial).
+> **Note:**
+
+> - For **MongoDB** you first create the test database and then create an user
+    to access that DB. For more information visit [MongoDB Tutorial](http://docs.mongodb.org/manual/tutorial).
+> - For **Riak** please follow instruction below first ([ Riak](#riak)).
+
+## Riak
+
+### Install Riak
+
+To install/upgrade **Riak** please follow the instructions in this link:
+[Installing and Upgrading Riak](http://docs.basho.com/riak/latest/ops/building/installing).
+
+### Initial Configurations
+
+Due to **Riak** comes with default configuration, we need to change some
+parameters required by `sumo_db`.
+
+**Riak** has a main configuration file `riak.conf`, which you can find into
+your installation path `$YOUR_INSTALL_PATH/etc/riak.conf`.
+
+> **Note:** For more information check this link [Configuration Files](http://docs.basho.com/riak/latest/ops/advanced/configs/configuration-files).
+
+First parameter to change is the default **Riak** backend from **Bitcask** to
+**LevelDB**. This change also eblables to use [Riak Secondary Indexes](http://docs.basho.com/riak/latest/ops/advanced/configs/secondary-index/).
+
+    storage_backend = leveldb
+
+Then proceed to enable search capabilities:
+
+    search = on
+
+> **Note:** For more information check this link [Riak Search Settings](http://docs.basho.com/riak/latest/ops/advanced/configs/search/).
+
+### Configuring Riak Data Types and Search
+
+First, let's create and activate a bucket type simply called maps that is set up
+to store Riak maps:
+
+    $ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
+    $ riak-admin bucket-type activate maps
+
+Now, let's create a search index called `sumo_test_index` using the default
+schema:
+
+    $ curl -XPUT $RIAK_HOST/search/index/sumo_test_index \
+      -H 'Content-Type: application/json' \
+      -d '{"schema":"_yz_default"}'
+
+With our index created, we can associate our new `sumo_test_index` index with
+our `maps` bucket type:
+
+    $ riak-admin bucket-type update maps '{"props":{"search_index":"sumo_test_index"}}'
+
+Now we can start to working with **Riak** from `sumo_db`.
+
+> **Note:** For more information check this link [Riak Data Types and Search](http://docs.basho.com/riak/latest/dev/search/search-data-types/#Maps-Example).

 ## Change Log

diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl
index 2a518db..e1dc934 100644
--- a/src/sumo_store_riak.erl
+++ b/src/sumo_store_riak.erl
@@ -112,10 +112,10 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) ->
 ) -> sumo_store:result([sumo_internal:doc()], state()).
 find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) ->
   Get = fun({C, B, Kst}, Acc) ->
-    Fun = fun(K, Acc) ->
+    Fun = fun(K, Acc0) ->
       case fetch_map(C, B, K) of
-        {ok, M} -> [rmap_to_doc(DocName, M) | Acc];
-        _ -> Acc
+        {ok, M} -> [rmap_to_doc(DocName, M) | Acc0];
+        _ -> Acc0
       end
     end,
     lists:foldl(Fun, [], Kst) ++ Acc
@@ -217,23 +217,52 @@ new_doc(Doc, #state{conn = Conn, bucket = Bucket}) ->

 %% @private
 doc_to_rmap(Doc) ->
   Fields = sumo_internal:doc_fields(Doc),
+  map_to_rmap(Fields).
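+
+%% For instance, a doc with fields #{id => <<"1">>, name => <<"foo">>} should
+%% translate to a riakc map holding two register entries, and a nested map
+%% value to a nested riakc map (illustrative values).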
+ +%% @private +map_to_rmap(Map) -> F = fun({K, V}, Acc) -> - rmap_update({atom_to_binary(K, utf8), to_bin(V)}, Acc) + case V of + V when is_map(V) -> + NewV = map_to_rmap(V), + riakc_map:update({to_bin(K), map}, fun(_M) -> NewV end, Acc); + V when is_list(V) -> + case io_lib:printable_list(V) of + true -> + riakc_map:update( + {to_bin(K), register}, + fun(R) -> riakc_register:set(to_bin(V), R) end, + Acc); + false -> + riakc_map:update({to_bin(K), set}, fun(_S) -> V end, Acc) + end; + _ -> + riakc_map:update( + {to_bin(K), register}, + fun(R) -> riakc_register:set(to_bin(V), R) end, + Acc) + end end, - lists:foldl(F, riakc_map:new(), maps:to_list(Fields)). + lists:foldl(F, riakc_map:new(), maps:to_list(Map)). %% @private rmap_to_doc(DocName, RMap) -> - F = fun({{K, _}, V}, Acc) -> - sumo_internal:set_field(binary_to_atom(K, utf8), V, Acc) + sumo_internal:new_doc(DocName, rmap_to_map(RMap)). + +%% @private +rmap_to_map(RMap) -> + F = fun({{K, map}, V}, Acc) -> + maps:put(to_atom(K), rmap_to_map(V), Acc); + ({{K, _}, V}, Acc) -> + maps:put(to_atom(K), V, Acc) end, - lists:foldl(F, sumo_internal:new_doc(DocName), riakc_map:value(RMap)). + lists:foldl(F, #{}, riakc_map:value(RMap)). %% @private kv_to_doc(DocName, KV) -> F = fun({K, V}, Acc) -> NK = normalize_doc_fields(K), - sumo_internal:set_field(binary_to_atom(NK, utf8), V, Acc) + sumo_internal:set_field(to_atom(NK), V, Acc) end, lists:foldl(F, sumo_internal:new_doc(DocName), KV). @@ -243,10 +272,6 @@ normalize_doc_fields(Src) -> Src, <<"_register|_set|_counter|_flag|_map">>, <<"">>, [{return, binary}, global]). -%% @private -rmap_update({K, V}, Map) -> - riakc_map:update({K, register}, fun(R) -> riakc_register:set(V, R) end, Map). - %% @private fetch_map(Conn, Bucket, Key) -> riakc_pb_socket:fetch_type(Conn, Bucket, Key). @@ -315,6 +340,16 @@ to_bin(Data) when is_list(Data) -> to_bin(Data) -> Data. +%% @private +to_atom(Data) when is_binary(Data) -> + binary_to_atom(Data, utf8); +to_atom(Data) when is_list(Data) -> + list_to_atom(Data); +to_atom(Data) when is_pid(Data); is_reference(Data); is_tuple(Data) -> + list_to_atom(integer_to_list(erlang:phash2(Data))); +to_atom(Data) -> + Data. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Private API - Query Builder. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% From 3424faf24d86641781636005067ac3b761a89134 Mon Sep 17 00:00:00 2001 From: cabolanos Date: Mon, 2 Mar 2015 10:04:26 -0500 Subject: [PATCH 6/8] Riak implementation notes (and some TODOs) were added. --- src/sumo_store_riak.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index e1dc934..00dc7ff 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -1,5 +1,17 @@ %%% @hidden %%% @doc Riak store implementation. +%%% Implementation Notes: +%%%
+%%% <ul>
+%%% <li> Riak Data Types as main structures to push/pull data.</li>
+%%% <li> Bulk operations (such as: delete_all and find_all) were
+%%% optimized using streaming. Records are streamed in portions
+%%% (using Riak 2i to stream keys first), and then the current
+%%% operation (e.g.: delete the record or accumulate the values
+%%% to return them later) is applied. This allows better memory
+%%% and cpu efficiency.</li>
+%%% <li> Querys were implemented using Riak Search on Data Types,
+%%% to get better performance and flexibility.</li>
+%%% </ul>
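+%%%
+%%% For instance, a call such as sumo:find_by(people, [{age, 30}]) should
+%%% reach Riak as the Solr query "age_register:30" over the configured
+%%% search index (illustrative doc name and values).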
 %%%
 %%% Copyright 2012 Inaka <hello@inaka.net>
 %%%
@@ -220,6 +232,7 @@ doc_to_rmap(Doc) ->
   map_to_rmap(Fields).

 %% @private
+%% Support multi-level structures.
 map_to_rmap(Map) ->
   F = fun({K, V}, Acc) ->
@@ -250,6 +263,7 @@ rmap_to_doc(DocName, RMap) ->
   sumo_internal:new_doc(DocName, rmap_to_map(RMap)).

 %% @private
+%% Support multi-level structures.
 rmap_to_map(RMap) ->
   F = fun({{K, map}, V}, Acc) ->
@@ -310,6 +324,10 @@ receive_stream(Ref, Conn, Bucket, F, Acc) ->
   end.

 %% @private
+%% @todo Add multi-level support.
+%% Instead of transforming the search result into a doc, get just the key
+%% and fetch the value (`fetch_map`), then apply `rmap_to_doc` on that
+%% value, since that function supports multi-level structures.
 search_docs_by(DocName, Conn, Index, Query, Limit, Offset) ->
   case search(Conn, Index, Query, Limit, Offset) of
     {ok, {search_results, Results, _, Total}} ->
From 8a525632e58a6bafe3b325a761db7a7fef5e7e92 Mon Sep 17 00:00:00 2001
From: cabolanos
Date: Tue, 3 Mar 2015 11:18:02 -0500
Subject: [PATCH 7/8] Style fixes according to Elvis, and general fixes to doc.

---
 README.md                 | 10 +++----
 src/sumo_backend_riak.erl |  3 +-
 src/sumo_store_riak.erl   | 63 ++++++++++++++++++++++-----------------
 test/sumo_basic_SUITE.erl |  2 +-
 4 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index d1640de..cccddb9 100644
--- a/README.md
+++ b/README.md
@@ -87,7 +87,7 @@ To run tests successfully, you need to follow these steps first:

 > - For **MongoDB** you first create the test database and then create an user
     to access that DB. For more information visit [MongoDB Tutorial](http://docs.mongodb.org/manual/tutorial).
-> - For **Riak** please follow instruction below first ([ Riak](#riak)).
+> - For **Riak** please follow instructions below ([ Riak](#riak)).

 ## Riak

@@ -98,8 +98,8 @@ To install/upgrade **Riak** please follow the instructions in this link:
 [Installing and Upgrading Riak](http://docs.basho.com/riak/latest/ops/building/installing).

 ### Initial Configurations

-Due to **Riak** comes with default configuration, we need to change some
-parameters required by `sumo_db`.
+Due to the fact that **Riak** comes with default configuration, we need to
+change some parameters required by `sumo_db`.

 **Riak** has a main configuration file `riak.conf`, which you can find into
 your installation path `$YOUR_INSTALL_PATH/etc/riak.conf`.
@@ -107,7 +107,7 @@ your installation path `$YOUR_INSTALL_PATH/etc/riak.conf`.
 > **Note:** For more information check this link [Configuration Files](http://docs.basho.com/riak/latest/ops/advanced/configs/configuration-files).

 First parameter to change is the default **Riak** backend from **Bitcask** to
-**LevelDB**. This change also eblables to use [Riak Secondary Indexes](http://docs.basho.com/riak/latest/ops/advanced/configs/secondary-index/).
+**LevelDB**. This change also enables the use of [Riak Secondary Indexes](http://docs.basho.com/riak/latest/ops/advanced/configs/secondary-index/).

     storage_backend = leveldb

@@ -137,7 +137,7 @@ our `maps` bucket type:

     $ riak-admin bucket-type update maps '{"props":{"search_index":"sumo_test_index"}}'

-Now we can start to working with **Riak** from `sumo_db`.
+Now we can start working with **Riak** from `sumo_db`.

 > **Note:** For more information check this link [Riak Data Types and Search](http://docs.basho.com/riak/latest/dev/search/search-data-types/#Maps-Example).
diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl
index 32a34dc..527a9be 100644
--- a/src/sumo_backend_riak.erl
+++ b/src/sumo_backend_riak.erl
@@ -82,14 +82,13 @@ init(Options) ->
   Host = proplists:get_value(host, Options, "127.0.0.1"),
   Port = proplists:get_value(port, Options, 8087),
   Opts = riak_opts(Options),
+  %% Get DB parameters
   BucketType = iolist_to_binary(
     proplists:get_value(bucket_type, Options)),
   Bucket = iolist_to_binary(
     proplists:get_value(bucket, Options, <<"sumo_test">>)),
   Index = iolist_to_binary(
     proplists:get_value(index, Options, <<"sumo_test_index">>)),
-  %% Encoding
-  %%Encoding = proplists:get_value(encoding, Options, json),
   %% Place Riak connection
   {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
   %% Initial state
diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl
index 00dc7ff..a1e994d 100644
--- a/src/sumo_store_riak.erl
+++ b/src/sumo_store_riak.erl
@@ -9,7 +9,7 @@
 %%% operation (e.g.: delete the record or accumulate the values
 %%% to return them later) is applied. This allows better memory
 %%% and cpu efficiency.</li>
-%%% <li> Querys were implemented using Riak Search on Data Types,
+%%% <li> Query functions were implemented using Riak Search on Data Types,
 %%% to get better performance and flexibility.</li>
 %%% </ul>
 %%%
@@ -124,13 +124,7 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) ->
 ) -> sumo_store:result([sumo_internal:doc()], state()).
 find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) ->
   Get = fun({C, B, Kst}, Acc) ->
-    Fun = fun(K, Acc0) ->
-      case fetch_map(C, B, K) of
-        {ok, M} -> [rmap_to_doc(DocName, M) | Acc0];
-        _ -> Acc0
-      end
-    end,
-    lists:foldl(Fun, [], Kst) ++ Acc
+    fetch_map_bulk(DocName, C, B, Kst) ++ Acc
   end,
   case stream_keys(Conn, Bucket, Get, []) of
     {ok, Docs} -> {ok, Docs, State};
@@ -227,6 +221,7 @@ new_doc(Doc, #state{conn = Conn, bucket = Bucket}) ->
   {Id, sumo_internal:set_field(IdField, Id, Doc)}.

 %% @private
+%% Support multi-level structures.
 doc_to_rmap(Doc) ->
   Fields = sumo_internal:doc_fields(Doc),
   map_to_rmap(Fields).
@@ -235,30 +230,32 @@ doc_to_rmap(Doc) ->
 map_to_rmap(Map) ->
   F = fun({K, V}, Acc) ->
-    case V of
-      V when is_map(V) ->
-        NewV = map_to_rmap(V),
-        riakc_map:update({to_bin(K), map}, fun(_M) -> NewV end, Acc);
-      V when is_list(V) ->
-        case io_lib:printable_list(V) of
-          true ->
-            riakc_map:update(
-              {to_bin(K), register},
-              fun(R) -> riakc_register:set(to_bin(V), R) end,
-              Acc);
-          false ->
-            riakc_map:update({to_bin(K), set}, fun(_S) -> V end, Acc)
-        end;
-      _ ->
-        riakc_map:update(
-          {to_bin(K), register},
-          fun(R) -> riakc_register:set(to_bin(V), R) end,
-          Acc)
-    end
+    rmap_update(K, V, Acc)
   end,
   lists:foldl(F, riakc_map:new(), maps:to_list(Map)).

 %% @private
+rmap_update(K, V, RMap) when is_map(V) ->
+  NewV = map_to_rmap(V),
+  riakc_map:update({to_bin(K), map}, fun(_M) -> NewV end, RMap);
+rmap_update(K, V, RMap) when is_list(V) ->
+  case io_lib:printable_list(V) of
+    true ->
+      riakc_map:update(
+        {to_bin(K), register},
+        fun(R) -> riakc_register:set(to_bin(V), R) end,
+        RMap);
+    false ->
+      riakc_map:update({to_bin(K), set}, fun(_S) -> V end, RMap)
+  end;
+rmap_update(K, V, RMap) ->
+  riakc_map:update(
+    {to_bin(K), register},
+    fun(R) -> riakc_register:set(to_bin(V), R) end,
+    RMap).
+
+%% @private
+%% Support multi-level structures.
 rmap_to_doc(DocName, RMap) ->
   sumo_internal:new_doc(DocName, rmap_to_map(RMap)).

@@ -290,6 +287,16 @@ normalize_doc_fields(Src) ->
 fetch_map(Conn, Bucket, Key) ->
   riakc_pb_socket:fetch_type(Conn, Bucket, Key).

+%% @private
+fetch_map_bulk(DocName, Conn, Bucket, Keys) ->
+  Fun = fun(K, Acc) ->
+    case fetch_map(Conn, Bucket, K) of
+      {ok, M} -> [rmap_to_doc(DocName, M) | Acc];
+      _ -> Acc
+    end
+  end,
+  lists:foldl(Fun, [], Keys).
+
 %% @private
 delete_map(Conn, Bucket, Key) ->
   riakc_pb_socket:delete(Conn, Bucket, Key).
diff --git a/test/sumo_basic_SUITE.erl b/test/sumo_basic_SUITE.erl
index 55060f6..5128a62 100644
--- a/test/sumo_basic_SUITE.erl
+++ b/test/sumo_basic_SUITE.erl
@@ -93,7 +93,7 @@ init_store(Module) ->
   %% Riak clusters will retain deleted objects for some period of time
   %% (3 seconds by default), and the MapReduce framework does not conceal
   %% these from submitted jobs.
-  %% @see
+  %% @see
   case Module of
     sumo_test_people_riak -> timer:sleep(5000);
     _ -> timer:sleep(1000)
   end.
From 58f50f95f97580a3d4bc466a682d0438137de86d Mon Sep 17 00:00:00 2001
From: cabolanos
Date: Tue, 3 Mar 2015 13:19:33 -0500
Subject: [PATCH 8/8] General fixes.
--- src/sumo_store_riak.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index a1e994d..b94df24 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -229,16 +229,13 @@ doc_to_rmap(Doc) -> %% @private %% Support multi-level structures. map_to_rmap(Map) -> - F = fun({K, V}, Acc) -> - rmap_update(K, V, Acc) - end, - lists:foldl(F, riakc_map:new(), maps:to_list(Map)). + lists:foldl(fun rmap_update/2, riakc_map:new(), maps:to_list(Map)). %% @private -rmap_update(K, V, RMap) when is_map(V) -> +rmap_update({K, V}, RMap) when is_map(V) -> NewV = map_to_rmap(V), riakc_map:update({to_bin(K), map}, fun(_M) -> NewV end, RMap); -rmap_update(K, V, RMap) when is_list(V) -> +rmap_update({K, V}, RMap) when is_list(V) -> case io_lib:printable_list(V) of true -> riakc_map:update( @@ -248,7 +245,7 @@ rmap_update(K, V, RMap) when is_list(V) -> false -> riakc_map:update({to_bin(K), set}, fun(_S) -> V end, RMap) end; -rmap_update(K, V, RMap) -> +rmap_update({K, V}, RMap) -> riakc_map:update( {to_bin(K), register}, fun(R) -> riakc_register:set(to_bin(V), R) end,