diff --git a/Makefile b/Makefile
index 601297c..a2bd4a9 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,14 @@
PROJECT = sumo_db
-DEPS = lager emysql emongo tirerl epgsql wpool
+DEPS = lager emysql emongo tirerl epgsql wpool riakc
dep_lager = git https://github.com/basho/lager.git 2.1.0
dep_emysql = git https://github.com/Eonblast/Emysql.git v0.4.1
dep_emongo = git https://github.com/inaka/emongo.git v0.2.1
dep_tirerl = git https://github.com/inaka/tirerl 0.1.0
dep_epgsql = git https://github.com/epgsql/epgsql 2.0.0
-dep_wpool = git https://github.com/inaka/worker_pool.git 1.0
+dep_wpool = git https://github.com/inaka/worker_pool.git 1.0.1
+dep_riakc = git https://github.com/basho/riak-erlang-client.git 2.0.1
TEST_DEPS = mixer
dep_mixer = git git://github.com/inaka/mixer.git 0.1.2
diff --git a/README.md b/README.md
index 573e005..28a72e4 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,76 @@ being in the top level directory:
make all blog
+## Running Tests
+
+To run tests successfully, you need to follow these steps first:
+
+ * Start the database engines: **MySQL**, **PostgreSQL**, **MongoDB** and
+ **ElasticSearch**
+
+ * For **MySQL**, **PostgreSQL** and **MongoDB**, you need to:
+ - Create a user (or use defaults) and configure it in the `test/test.config`
+ file.
+ - Create test database `sumo_test` on each DB.
+
+> **Note:**
+
+> - For **MongoDB** you first create the test database and then create a user
+ to access that DB. For more information visit [MongoDB Tutorial](http://docs.mongodb.org/manual/tutorial).
+> - For **Riak** please follow instructions below ([Riak](#riak)).
+
+## Riak
+
+### Install Riak
+
+To install/upgrade **Riak** please follow the instructions in this link:
+[Installing and Upgrading Riak](http://docs.basho.com/riak/latest/ops/building/installing).
+
+### Initial Configurations
+
+Due to the fact that **Riak** comes with default configuration, we need to
+change some parameters required by `sumo_db`.
+
+**Riak** has a main configuration file `riak.conf`, which you can find into
+your installation path `$YOUR_INSTALL_PATH/etc/riak.conf`.
+
+> **Note:** For more information check this link [Configuration Files](http://docs.basho.com/riak/latest/ops/advanced/configs/configuration-files).
+
+First parameter to change is the default **Riak** backend from **Bitcask** to
+**LevelDB**. This change also enables the use of [Riak Secondary Indexes](http://docs.basho.com/riak/latest/ops/advanced/configs/secondary-index/).
+
+ storage_backend = leveldb
+
+Then proceed to enable search capabilities:
+
+ search = on
+
+> **Note:** For more information check this link [Riak Search Settings](http://docs.basho.com/riak/latest/ops/advanced/configs/search/).
+
+### Configuring Riak Data Types and Search
+
+First, let's create and activate a bucket type simply called maps that is set up
+to store Riak maps:
+
+ $ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
+ $ riak-admin bucket-type activate maps
+
+Now, let's create a search index called `sumo_test_index` using the default
+schema:
+
+ $ curl -XPUT $RIAK_HOST/search/index/sumo_test_index \
+ -H 'Content-Type: application/json' \
+ -d '{"schema":"_yz_default"}'
+
+With our index created, we can associate our new `sumo_test_index` index with
+our `maps` bucket type:
+
+ $ riak-admin bucket-type update maps '{"props":{"search_index":"sumo_test_index"}}'
+
+Now we can start working with **Riak** from `sumo_db`.
+
+> **Note:** For more information check this link [Riak Data Types and Search](http://docs.basho.com/riak/latest/dev/search/search-data-types/#Maps-Example).
+
## Change Log
All notable changes to this project will be documented in the
diff --git a/rebar.config b/rebar.config
index f5f9911..823f04d 100644
--- a/rebar.config
+++ b/rebar.config
@@ -25,7 +25,8 @@
{tirerl, ".*", {git, "git://github.com/inaka/tirerl", "0.1.0"}},
{epgsql, ".*", {git, "git://github.com/epgsql/epgsql", "2.0.0"}},
{mixer, ".*", {git, "git://github.com/inaka/mixer", "0.1.2"}},
- {worker_pool, ".*", {git, "git://github.com/inaka/worker_pool.git", "1.0"}}
+ {worker_pool, ".*", {git, "git://github.com/inaka/worker_pool.git", "1.0.1"}},
+ {riakc, ".*", {git, "git://github.com/basho/riak-erlang-client", "2.0.1"}}
]}.
{xref_warnings, true}.
{xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, deprecated_functions]}.
diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl
new file mode 100644
index 0000000..527a9be
--- /dev/null
+++ b/src/sumo_backend_riak.erl
@@ -0,0 +1,135 @@
+%%% @hidden
+%%% @doc Riak storage backend implementation.
+%%%
+%%% Copyright 2012 Inaka <hello@inaka.net>
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%% @end
+%%% @copyright Inaka
+%%%
+-module(sumo_backend_riak).
+-author("Carlos Andres Bolanos ").
+-license("Apache License 2.0").
+
+-behaviour(gen_server).
+-behaviour(sumo_backend).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Exports.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%%% Public API.
+-export(
+ [ get_connection/1,
+ get_state/1
+ ]).
+
+%%% Exports for sumo_backend
+-export(
+ [ start_link/2
+ ]).
+
+%%% Exports for gen_server
+-export(
+ [ init/1
+ , handle_call/3
+ , handle_cast/2
+ , handle_info/2
+ , terminate/2
+ , code_change/3
+ ]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Types.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}).
+-type state() :: #state{}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% External API.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec start_link(atom(), proplists:proplist()) -> {ok, pid()}|term().
+start_link(Name, Options) ->
+ gen_server:start_link({local, Name}, ?MODULE, Options, []).
+
+-spec get_connection(atom() | pid()) -> atom().
+get_connection(Name) ->
+ gen_server:call(Name, get_connection).
+
+-spec get_state(atom() | pid()) -> state().
+get_state(Name) ->
+ gen_server:call(Name, get_state).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% gen_server stuff.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec init([term()]) -> {ok, state()}.
+init(Options) ->
+ %% Get connection parameters
+ Host = proplists:get_value(host, Options, "127.0.0.1"),
+ Port = proplists:get_value(port, Options, 8087),
+ Opts = riak_opts(Options),
+ %% Get DB parameters
+ BucketType = iolist_to_binary(
+ proplists:get_value(bucket_type, Options)),
+ Bucket = iolist_to_binary(
+ proplists:get_value(bucket, Options, <<"sumo_test">>)),
+ Index = iolist_to_binary(
+ proplists:get_value(index, Options, <<"sumo_test_index">>)),
+ %% Place Riak connection
+ {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
+ %% Initial state
+ {ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}.
+
+-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
+handle_call(get_connection, _From, State = #state{conn = Conn}) ->
+ {reply, Conn, State};
+handle_call(get_state, _From, State) ->
+ {reply, State, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Unused Callbacks
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec handle_cast(term(), state()) -> {noreply, state()}.
+handle_cast(_Msg, State) -> {noreply, State}.
+
+-spec handle_info(term(), state()) -> {noreply, state()}.
+handle_info(_Msg, State) -> {noreply, State}.
+
+-spec terminate(term(), state()) -> ok.
+terminate(_Reason, _State) -> ok.
+
+-spec code_change(term(), state(), term()) -> {ok, state()}.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% gen_server stuff.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec riak_opts([term()]) -> [term()].
+riak_opts(Options) ->
+ User = proplists:get_value(username, Options),
+ Pass = proplists:get_value(password, Options),
+ Opts0 = case User /= undefined andalso Pass /= undefined of
+ true -> [{credentials, User, Pass}];
+ _ -> []
+ end,
+ Opts1 = case lists:keyfind(connect_timeout, 1, Options) of
+ {_, V1} -> [{connect_timeout, V1}, {auto_reconnect, true}] ++ Opts0;
+ _ -> [{auto_reconnect, true}] ++ Opts0
+ end,
+ Opts1.
diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl
new file mode 100644
index 0000000..b94df24
--- /dev/null
+++ b/src/sumo_store_riak.erl
@@ -0,0 +1,425 @@
+%%% @hidden
+%%% @doc Riak store implementation.
+%%% Implementation Notes:
+%%%
+%%% - Riak Data Types as main structures to push/pull data.
+%%% - Bulk operations (such as: delete_all and find_all) were
+%%% optimized using streaming. Records are streamed in portions
+%%% (using Riak 2i to stream keys first), and then the current
+%%% operation (e.g.: delete the record or accumulate the values
+%%% to return them later) is applied. This allows better memory
+%%% and cpu efficiency.
+%%% - Query functions were implemented using Riak Search on Data Types,
+%%% to get better performance and flexibility.
+%%%
+%%%
+%%% Copyright 2012 Inaka <hello@inaka.net>
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%% @end
+%%% @copyright Inaka
+%%%
+-module(sumo_store_riak).
+-author("Carlos Andres Bolanos ").
+-github("https://github.com/inaka").
+-license("Apache License 2.0").
+
+-behavior(sumo_store).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Exports.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% Public API.
+-export([init/1]).
+-export([create_schema/2]).
+-export([persist/2]).
+-export([delete_by/3, delete_all/2]).
+-export([find_all/2, find_all/5, find_by/3, find_by/5, find_by/6]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Types.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}).
+-type state() :: #state{}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% External API.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec init(
+ term()
+) -> {ok, term()}.
+init(Options) ->
+ % The storage backend key in the options specifies the name of the process
+ % which creates and initializes the storage backend.
+ Backend = proplists:get_value(storage_backend, Options),
+ State = sumo_backend_riak:get_state(Backend),
+ {ok, State}.
+
+-spec persist(
+ sumo_internal:doc(), state()
+) -> sumo_store:result(sumo_internal:doc(), state()).
+persist(Doc, #state{conn = Conn, bucket = Bucket} = State) ->
+ {Id, NewDoc} = new_doc(Doc, State),
+ case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of
+ {error, Error} ->
+ {error, Error, State};
+ _ ->
+ {ok, NewDoc, State}
+ end.
+
+-spec delete_by(
+ sumo:schema_name(), sumo:conditions(), state()
+) -> sumo_store:result(sumo_store:affected_rows(), state()).
+delete_by(DocName,
+ Conditions,
+ #state{conn = Conn, bucket = Bucket, index = Index} = State) ->
+ IdField = sumo_internal:id_field_name(DocName),
+ case lists:keyfind(IdField, 1, Conditions) of
+ {_K, Key} ->
+ case delete_map(Conn, Bucket, iolist_to_binary(Key)) of
+ ok ->
+ {ok, 1, State};
+ {error, Error} ->
+ {error, Error, State}
+ end;
+ _ ->
+ Query = build_query(Conditions),
+ case search_docs_by(DocName, Conn, Index, Query, 0, 0) of
+ {ok, {Total, Res}} ->
+ delete_docs(Conn, Bucket, Res),
+ {ok, Total, State};
+ {error, Error} ->
+ {error, Error, State}
+ end
+ end.
+
+-spec delete_all(
+ sumo:schema_name(), state()
+) -> sumo_store:result(sumo_store:affected_rows(), state()).
+delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) ->
+ Del = fun({C, B, Kst}, Acc) ->
+ lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst),
+ Acc + length(Kst)
+ end,
+ case stream_keys(Conn, Bucket, Del, 0) of
+ {ok, Count} -> {ok, Count, State};
+ {_, Count} -> {error, Count, State}
+ end.
+
+-spec find_all(
+ sumo:schema_name(), state()
+) -> sumo_store:result([sumo_internal:doc()], state()).
+find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) ->
+ Get = fun({C, B, Kst}, Acc) ->
+ fetch_map_bulk(DocName, C, B, Kst) ++ Acc
+ end,
+ case stream_keys(Conn, Bucket, Get, []) of
+ {ok, Docs} -> {ok, Docs, State};
+ {_, Docs} -> {error, Docs, State}
+ end.
+
+-spec find_all(
+ sumo:schema_name(),
+ term(),
+ non_neg_integer(),
+ non_neg_integer(),
+ state()
+) -> sumo_store:result([sumo_internal:doc()], state()).
+find_all(DocName, _SortFields, Limit, Offset, State) ->
+ %% @todo implement search with sort parameters.
+ find_by(DocName, [], Limit, Offset, State).
+
+-spec find_by(sumo:schema_name(), sumo:conditions(), state()) ->
+ sumo_store:result([sumo_internal:doc()], state()).
+find_by(DocName, Conditions, State) ->
+ find_by(DocName, Conditions, 0, 0, State).
+
+-spec find_by(
+ sumo:schema_name(),
+ sumo:conditions(),
+ non_neg_integer(),
+ non_neg_integer(),
+ state()
+) -> sumo_store:result([sumo_internal:doc()], state()).
+find_by(DocName,
+ Conditions,
+ Limit,
+ Offset,
+ #state{conn = Conn, bucket = Bucket, index = Index} = State) ->
+ IdField = sumo_internal:id_field_name(DocName),
+ case lists:keyfind(IdField, 1, Conditions) of
+ {_K, Key} ->
+ case fetch_map(Conn, Bucket, iolist_to_binary(Key)) of
+ {ok, RMap} ->
+ Val = rmap_to_doc(DocName, RMap),
+ {ok, [Val], State};
+ {error, Error} ->
+ {error, Error, State}
+ end;
+ _ ->
+ Query = build_query(Conditions),
+ case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of
+ {ok, {_, Res}} -> {ok, Res, State};
+ {error, Error} -> {error, Error, State}
+ end
+ end.
+
+-spec find_by(
+ sumo:schema_name(),
+ sumo:conditions(),
+ term(),
+ non_neg_integer(),
+ non_neg_integer(),
+ state()
+) -> sumo_store:result([sumo_internal:doc()], state()).
+find_by(_DocName, _Conditions, _SortFields, _Limit, _Offset, State) ->
+ {error, not_supported, State}.
+
+-spec create_schema(
+ sumo:schema(), state()
+) -> sumo_store:result(state()).
+create_schema(_Schema, State) ->
+ {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Private API.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% @private
+doc_id(Doc) ->
+ DocName = sumo_internal:doc_name(Doc),
+ IdField = sumo_internal:id_field_name(DocName),
+ sumo_internal:get_field(IdField, Doc).
+
+%% @private
+new_doc(Doc, #state{conn = Conn, bucket = Bucket}) ->
+ DocName = sumo_internal:doc_name(Doc),
+ IdField = sumo_internal:id_field_name(DocName),
+ Id = case sumo_internal:get_field(IdField, Doc) of
+ undefined ->
+ case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc)) of
+ {ok, RiakMapId} -> RiakMapId;
+ {error, Error} -> throw(Error);
+ _ -> throw(unexpected)
+ end;
+ Id0 ->
+ to_bin(Id0)
+ end,
+ {Id, sumo_internal:set_field(IdField, Id, Doc)}.
+
+%% @private
+%% Support multi-level structures.
+doc_to_rmap(Doc) ->
+ Fields = sumo_internal:doc_fields(Doc),
+ map_to_rmap(Fields).
+
+%% @private
+%% Support multi-level structures.
+map_to_rmap(Map) ->
+ lists:foldl(fun rmap_update/2, riakc_map:new(), maps:to_list(Map)).
+
+%% @private
+rmap_update({K, V}, RMap) when is_map(V) ->
+ NewV = map_to_rmap(V),
+ riakc_map:update({to_bin(K), map}, fun(_M) -> NewV end, RMap);
+rmap_update({K, V}, RMap) when is_list(V) ->
+ case io_lib:printable_list(V) of
+ true ->
+ riakc_map:update(
+ {to_bin(K), register},
+ fun(R) -> riakc_register:set(to_bin(V), R) end,
+ RMap);
+ false ->
+ riakc_map:update({to_bin(K), set}, fun(_S) -> V end, RMap)
+ end;
+rmap_update({K, V}, RMap) ->
+ riakc_map:update(
+ {to_bin(K), register},
+ fun(R) -> riakc_register:set(to_bin(V), R) end,
+ RMap).
+
+%% @private
+%% Support multi-level structures.
+rmap_to_doc(DocName, RMap) ->
+ sumo_internal:new_doc(DocName, rmap_to_map(RMap)).
+
+%% @private
+%% Support multi-level structures.
+rmap_to_map(RMap) ->
+ F = fun({{K, map}, V}, Acc) ->
+ maps:put(to_atom(K), rmap_to_map(V), Acc);
+ ({{K, _}, V}, Acc) ->
+ maps:put(to_atom(K), V, Acc)
+ end,
+ lists:foldl(F, #{}, riakc_map:value(RMap)).
+
+%% @private
+kv_to_doc(DocName, KV) ->
+ F = fun({K, V}, Acc) ->
+ NK = normalize_doc_fields(K),
+ sumo_internal:set_field(to_atom(NK), V, Acc)
+ end,
+ lists:foldl(F, sumo_internal:new_doc(DocName), KV).
+
+%% @private
+normalize_doc_fields(Src) ->
+ re:replace(
+ Src, <<"_register|_set|_counter|_flag|_map">>, <<"">>,
+ [{return, binary}, global]).
+
+%% @private
+fetch_map(Conn, Bucket, Key) ->
+ riakc_pb_socket:fetch_type(Conn, Bucket, Key).
+
+%% @private
+fetch_map_bulk(DocName, Conn, Bucket, Keys) ->
+ Fun = fun(K, Acc) ->
+ case fetch_map(Conn, Bucket, K) of
+ {ok, M} -> [rmap_to_doc(DocName, M) | Acc];
+ _ -> Acc
+ end
+ end,
+ lists:foldl(Fun, [], Keys).
+
+%% @private
+delete_map(Conn, Bucket, Key) ->
+ riakc_pb_socket:delete(Conn, Bucket, Key).
+
+%% @private
+update_map(Conn, Bucket, Key, Map) ->
+ riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map)).
+
+%% @private
+search(Conn, Index, Query, 0, 0) ->
+ riakc_pb_socket:search(Conn, Index, Query);
+search(Conn, Index, Query, Limit, Offset) ->
+ riakc_pb_socket:search(Conn, Index, Query, [{start, Offset}, {rows, Limit}]).
+
+%% @private
+stream_keys(Conn, Bucket, F, Acc) ->
+ {ok, Ref} = riakc_pb_socket:get_index_eq(
+ Conn, Bucket, <<"$bucket">>, <<"">>, [{stream, true}]),
+ receive_stream(Ref, Conn, Bucket, F, Acc).
+
+%% @private
+receive_stream(Ref, Conn, Bucket, F, Acc) ->
+ receive
+ {Ref, {_, Stream, _}} ->
+ receive_stream(Ref, Conn, Bucket, F, F({Conn, Bucket, Stream}, Acc));
+ {Ref, {done, _}} ->
+ {ok, Acc};
+ _ ->
+ {error, Acc}
+ after
+ 30000 -> {timeout, Acc}
+ end.
+
+%% @private
+%% @todo Add multi-level support.
+%% Instead of transform search result to doc, get just the key and
+%% fetch the value (`fetch_map`), then apply `rmap_to_doc` on that
+%% value, since this function supports multi-level.
+search_docs_by(DocName, Conn, Index, Query, Limit, Offset) ->
+ case search(Conn, Index, Query, Limit, Offset) of
+ {ok, {search_results, Results, _, Total}} ->
+ F = fun({_, KV}, Acc) -> [kv_to_doc(DocName, KV) | Acc] end,
+ NewRes = lists:foldl(F, [], Results),
+ {ok, {Total, NewRes}};
+ {error, Error} ->
+ {error, Error}
+ end.
+
+%% @private
+delete_docs(Conn, Bucket, Docs) ->
+ F = fun(D) ->
+ K = doc_id(D),
+ delete_map(Conn, Bucket, K)
+ end,
+ lists:foreach(F, Docs).
+
+%% @private
+to_bin(Data) when is_integer(Data) ->
+ integer_to_binary(Data);
+to_bin(Data) when is_float(Data) ->
+ float_to_binary(Data);
+to_bin(Data) when is_atom(Data) ->
+ atom_to_binary(Data, utf8);
+to_bin(Data) when is_list(Data) ->
+ iolist_to_binary(Data);
+to_bin(Data) ->
+ Data.
+
+%% @private
+to_atom(Data) when is_binary(Data) ->
+ binary_to_atom(Data, utf8);
+to_atom(Data) when is_list(Data) ->
+ list_to_atom(Data);
+to_atom(Data) when is_pid(Data); is_reference(Data); is_tuple(Data) ->
+ list_to_atom(integer_to_list(erlang:phash2(Data)));
+to_atom(Data) ->
+ Data.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Private API - Query Builder.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% @private
+build_query({q_str, Q}) when is_binary(Q) ->
+ Q;
+build_query([]) ->
+ <<"*:*">>;
+build_query(PL) when is_list(PL) ->
+ build_query1(PL, <<"">>);
+build_query(_) ->
+ <<"*:*">>.
+
+%% @private
+build_query1([], Acc) ->
+ Acc;
+build_query1([{_, [{_, _} | _T0]} = KV | T], <<"">>) ->
+ build_query1(T,
+ <<(<<"(">>)/binary, (build_query2(KV))/binary, (<<")">>)/binary>>);
+build_query1([{_, [{_, _} | _T0]} = KV | T], Acc) ->
+ build_query1(T,
+    <<Acc/binary, (<<" AND (">>)/binary,
+ (build_query2(KV))/binary, (<<")">>)/binary>>);
+build_query1([{K, V} | T], <<"">>) ->
+ build_query1(T, <<(query_eq(K, V))/binary>>);
+build_query1([{K, V} | T], Acc) ->
+ build_query1(T,
+    <<Acc/binary, (<<" AND ">>)/binary, (query_eq(K, V))/binary>>).
+
+%% @private
+build_query2({K, [{_, _} | _T] = V}) ->
+ F = fun({K_, V_}, Acc) ->
+ Eq = <<(atom_to_binary(K_, utf8))/binary,
+ (<<"_register:">>)/binary,
+ (to_bin(V_))/binary>>,
+ case Acc of
+ <<"">> ->
+ Eq;
+ _ ->
+          <<Acc/binary, (<<" ">>)/binary, (to_bin(K))/binary,
+ (<<" ">>)/binary, Eq/binary>>
+ end
+ end,
+ lists:foldl(F, <<"">>, V).
+
+%% @private
+query_eq(K, V) ->
+ <<(atom_to_binary(K, utf8))/binary,
+ (<<"_register:">>)/binary,
+ (to_bin(V))/binary>>.
diff --git a/test/sumo_basic_SUITE.erl b/test/sumo_basic_SUITE.erl
index f9a1356..5128a62 100644
--- a/test/sumo_basic_SUITE.erl
+++ b/test/sumo_basic_SUITE.erl
@@ -87,20 +87,38 @@ init_store(Module) ->
sumo:persist(Module, Module:new(<<"E">>, <<"A">>, 2)),
sumo:persist(Module, Module:new(<<"F">>, <<"E">>, 1)),
- %% This is necessary to get elasticsearch in
- %% particular to index its stuff.
- timer:sleep(1000).
+ %% Sync Timeout.
+ %% 1. Necessary to get elasticsearch in particular to index its stuff.
+ %% 2. Necessary to Riak.
+ %% Riak clusters will retain deleted objects for some period of time
+ %% (3 seconds by default), and the MapReduce framework does not conceal
+ %% these from submitted jobs.
+ %% See the Riak docs on object deletion/tombstones for details.
+ case Module of
+ sumo_test_people_riak -> timer:sleep(5000);
+ _ -> timer:sleep(1000)
+ end.
find_all_module(Module) ->
6 = length(sumo:find_all(Module)).
find_by_module(Module) ->
Results = sumo:find_by(Module, [{last_name, <<"D">>}]),
+ 2 = length(Results),
SortFun = fun(A, B) -> Module:name(A) < Module:name(B) end,
[First, Second | _] = lists:sort(SortFun, Results),
+ true = is_integer(Module:age(First)),
+
"B" = to_str(Module:name(First)),
- "D" = to_str(Module:name(Second)).
+ "D" = to_str(Module:name(Second)),
+
+ %% Check find_by ID
+ [First1] = sumo:find_by(Module, [{id, Module:id(First)}]),
+ First1 = First,
+ %% Check pagination
+ Results1 = sumo:find_by(Module, [], 3, 1),
+ 3 = length(Results1).
delete_all_module(Module) ->
sumo:delete_all(Module),
@@ -121,7 +139,8 @@ run_all_stores(Fun) ->
Modules = [sumo_test_people_mysql,
sumo_test_people_mongo,
sumo_test_people_elasticsearch,
- sumo_test_people_pgsql],
+ sumo_test_people_pgsql,
+ sumo_test_people_riak],
lists:foreach(Fun, Modules).
-spec to_str(any()) -> string().
diff --git a/test/sumo_test_people.erl b/test/sumo_test_people.erl
index 57a80be..6b5e7a5 100644
--- a/test/sumo_test_people.erl
+++ b/test/sumo_test_people.erl
@@ -8,7 +8,7 @@
sumo_sleep/1
]).
--export([new/2, new/3, new/4, name/1, id/1]).
+-export([new/2, new/3, new/4, name/1, id/1, age/1]).
-record(person, {id :: integer(),
name :: string(),
@@ -36,7 +36,7 @@ sumo_wakeup(Person) ->
id = maps:get(id, Person),
name = maps:get(name, Person),
last_name = maps:get(last_name, Person),
- age = maps:get(age, Person),
+ age = from_bin(maps:get(age, Person), integer),
address = maps:get(address, Person)
}.
@@ -64,3 +64,27 @@ name(Person) ->
id(Person) ->
Person#person.id.
+
+age(Person) ->
+ Person#person.age.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Internals
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% @private
+%% @doc This helper function is needed by Riak, because data in Riak is stored
+%%      as Riak Maps, and the values in them must be binaries. When data is
+%%      stored in Riak, all values in the map are converted to binaries;
+%%      because of that, it is necessary to convert the values back to their
+%%      original types when the data is returned.
+from_bin(Bin, integer) when is_binary(Bin) ->
+ binary_to_integer(Bin);
+from_bin(Bin, float) when is_binary(Bin) ->
+ binary_to_float(Bin);
+from_bin(Bin, atom) when is_binary(Bin) ->
+ binary_to_atom(Bin, utf8);
+from_bin(Bin, string) when is_binary(Bin) ->
+ binary_to_list(Bin);
+from_bin(Bin, _) ->
+ Bin.
diff --git a/test/sumo_test_people_elasticsearch.erl b/test/sumo_test_people_elasticsearch.erl
index 5b022ab..7b7f8b2 100644
--- a/test/sumo_test_people_elasticsearch.erl
+++ b/test/sumo_test_people_elasticsearch.erl
@@ -10,7 +10,8 @@
new/3,
new/4,
name/1,
- id/1
+ id/1,
+ age/1
]
}
]).
diff --git a/test/sumo_test_people_mongo.erl b/test/sumo_test_people_mongo.erl
index 8317eef..1c06e23 100644
--- a/test/sumo_test_people_mongo.erl
+++ b/test/sumo_test_people_mongo.erl
@@ -10,7 +10,8 @@
new/3,
new/4,
name/1,
- id/1
+ id/1,
+ age/1
]
}
]).
diff --git a/test/sumo_test_people_mysql.erl b/test/sumo_test_people_mysql.erl
index 1543a4d..01d81a7 100644
--- a/test/sumo_test_people_mysql.erl
+++ b/test/sumo_test_people_mysql.erl
@@ -10,7 +10,8 @@
new/3,
new/4,
name/1,
- id/1
+ id/1,
+ age/1
]
}
]).
diff --git a/test/sumo_test_people_pgsql.erl b/test/sumo_test_people_pgsql.erl
index 3e15c01..16676c3 100644
--- a/test/sumo_test_people_pgsql.erl
+++ b/test/sumo_test_people_pgsql.erl
@@ -10,7 +10,8 @@
new/3,
new/4,
name/1,
- id/1
+ id/1,
+ age/1
]
}
]).
diff --git a/test/sumo_test_people_riak.erl b/test/sumo_test_people_riak.erl
new file mode 100644
index 0000000..5d7694c
--- /dev/null
+++ b/test/sumo_test_people_riak.erl
@@ -0,0 +1,30 @@
+-module(sumo_test_people_riak).
+
+-behavior(sumo_doc).
+
+-include_lib("mixer/include/mixer.hrl").
+-mixin([{sumo_test_people,
+ [sumo_wakeup/1,
+ sumo_sleep/1,
+ new/2,
+ new/3,
+ new/4,
+ name/1,
+ id/1,
+ age/1
+ ]
+ }
+ ]).
+
+-export([sumo_schema/0]).
+
+-spec sumo_schema() -> sumo:schema().
+sumo_schema() ->
+ Fields =
+ [sumo:new_field(id, string, [id, {length, 255}, not_null]),
+ sumo:new_field(name, string, [{length, 255}, not_null]),
+ sumo:new_field(last_name, string, [{length, 255}, not_null]),
+ sumo:new_field(age, integer),
+ sumo:new_field(address, string, [{length, 255}])
+ ],
+ sumo:new_schema(?MODULE, Fields).
diff --git a/test/test.config b/test/test.config
index ecd77fb..0449b71 100644
--- a/test/test.config
+++ b/test/test.config
@@ -36,8 +36,16 @@
[{host, "127.0.0.1"},
{port, 5432},
{database, "sumo_test"},
- {username, "jfacorro"},
- {password, ""}]
+ {username, "root"},
+ {password, "pass"}]
+ },
+ {sumo_test_backend_riak,
+ sumo_backend_riak,
+ [{host, "127.0.0.1"},
+ {port, 8087},
+ {bucket_type, "maps"},
+ {bucket, "sumo_test"},
+ {index, "sumo_test_index"}]
}
]
},
@@ -61,6 +69,11 @@
sumo_store_pgsql,
[{storage_backend, sumo_test_backend_pgsql},
{workers, 10}]
+ },
+ {sumo_test_riak,
+ sumo_store_riak,
+ [{storage_backend, sumo_test_backend_riak},
+ {workers, 10}]
}
]
},
@@ -69,7 +82,8 @@
{sumo_test_people_mysql, sumo_test_mysql},
{sumo_test_people_mongo, sumo_test_mongo},
{sumo_test_people_elasticsearch, sumo_test_elasticsearch},
- {sumo_test_people_pgsql, sumo_test_pgsql}
+ {sumo_test_people_pgsql, sumo_test_pgsql},
+ {sumo_test_people_riak, sumo_test_riak}
]
},
{events,