From 58b89c2eb4a6489982036e45ce0407714ac5002a Mon Sep 17 00:00:00 2001 From: cabol Date: Tue, 10 Mar 2015 14:34:37 -0500 Subject: [PATCH 1/4] Riak backend config was modified to allow multiple stores to share one backend. Also, R/W quorums can be configured now within the store. --- README.md | 2 +- src/sumo_backend_riak.erl | 43 +++++++++--------------------- src/sumo_store_riak.erl | 56 ++++++++++++++++++++++++++------------- test/test.config | 12 +++++---- 4 files changed, 58 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 28a72e4..76cf018 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ First, let's create and activate a bucket type simply called maps that is set up to store Riak maps: $ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}' - $ riak-admin bucket-type activate mapsdmin bucket-type activate maps + $ riak-admin bucket-type activate maps Now, let's create a search index called `sumo_test_index` using the default schema: diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index 527a9be..f5d7bef 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -29,31 +29,25 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Public API. --export( - [ get_connection/1, - get_state/1 - ]). +-export([get_connection/1]). %%% Exports for sumo_backend --export( - [ start_link/2 - ]). +-export([start_link/2]). %%% Exports for gen_server --export( - [ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). +-record(state, {conn :: pid()}). 
-type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -68,10 +62,6 @@ start_link(Name, Options) -> get_connection(Name) -> gen_server:call(Name, get_connection). --spec get_state(atom() | pid()) -> state(). -get_state(Name) -> - gen_server:call(Name, get_state). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -82,23 +72,14 @@ init(Options) -> Host = proplists:get_value(host, Options, "127.0.0.1"), Port = proplists:get_value(port, Options, 8087), Opts = riak_opts(Options), - %% Get DB parameters - BucketType = iolist_to_binary( - proplists:get_value(bucket_type, Options)), - Bucket = iolist_to_binary( - proplists:get_value(bucket, Options, <<"sumo_test">>)), - Index = iolist_to_binary( - proplists:get_value(index, Options, <<"sumo_test_index">>)), %% Place Riak connection {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), %% Initial state - {ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}. + {ok, #state{conn = Conn}}. -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. handle_call(get_connection, _From, State = #state{conn = Conn}) -> - {reply, Conn, State}; -handle_call(get_state, _From, State) -> - {reply, State, State}. + {reply, Conn, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Unused Callbacks diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index b94df24..aee89c3 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -51,7 +51,14 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid(), bucket :: binary(), index :: binary()}). +-type r_param() :: r | pr | notfound_ok. +-type w_param() :: w | pw | dw | returnbody. 
+ +-record(state, {conn :: pid(), + bucket :: binary(), + index :: binary(), + r_args :: [{r_param(), integer() | (true | false)}], + w_args :: [{w_param(), integer() | (true | false)}]}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -61,19 +68,29 @@ -spec init( term() ) -> {ok, term()}. -init(Options) -> +init(Opts) -> % The storage backend key in the options specifies the name of the process % which creates and initializes the storage backend. - Backend = proplists:get_value(storage_backend, Options), - State = sumo_backend_riak:get_state(Backend), + Backend = proplists:get_value(storage_backend, Opts), + Conn = sumo_backend_riak:get_connection(Backend), + BucketType = iolist_to_binary(proplists:get_value(bucket_type, Opts)), + Bucket = iolist_to_binary(proplists:get_value(bucket, Opts, <<"sumo">>)), + Index = iolist_to_binary(proplists:get_value(index, Opts, <<"sumo_idx">>)), + R = proplists:get_value(r_args, Opts, []), + W = proplists:get_value(w_args, Opts, []), + State = #state{conn = Conn, + bucket = {BucketType, Bucket}, + index = Index, + r_args = R, + w_args = W}, {ok, State}. -spec persist( sumo_internal:doc(), state() ) -> sumo_store:result(sumo_internal:doc(), state()). -persist(Doc, #state{conn = Conn, bucket = Bucket} = State) -> +persist(Doc, #state{conn = Conn, bucket = Bucket, w_args = W} = State) -> {Id, NewDoc} = new_doc(Doc, State), - case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of + case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), W) of {error, Error} -> {error, Error, State}; _ -> @@ -122,9 +139,9 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> -spec find_all( sumo:schema_name(), state() ) -> sumo_store:result([sumo_internal:doc()], state()). 
-find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) -> +find_all(DocName, #state{conn = Conn, bucket = Bucket, r_args = R} = State) -> Get = fun({C, B, Kst}, Acc) -> - fetch_map_bulk(DocName, C, B, Kst) ++ Acc + fetch_map_bulk(DocName, C, B, Kst, R) ++ Acc end, case stream_keys(Conn, Bucket, Get, []) of {ok, Docs} -> {ok, Docs, State}; @@ -158,11 +175,14 @@ find_by(DocName, Conditions, Limit, Offset, - #state{conn = Conn, bucket = Bucket, index = Index} = State) -> + #state{conn = Conn, + bucket = Bucket, + index = Index, + r_args = R} = State) -> IdField = sumo_internal:id_field_name(DocName), case lists:keyfind(IdField, 1, Conditions) of {_K, Key} -> - case fetch_map(Conn, Bucket, iolist_to_binary(Key)) of + case fetch_map(Conn, Bucket, iolist_to_binary(Key), R) of {ok, RMap} -> Val = rmap_to_doc(DocName, RMap), {ok, [Val], State}; @@ -205,12 +225,12 @@ doc_id(Doc) -> sumo_internal:get_field(IdField, Doc). %% @private -new_doc(Doc, #state{conn = Conn, bucket = Bucket}) -> +new_doc(Doc, #state{conn = Conn, bucket = Bucket, w_args = W}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of undefined -> - case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc)) of + case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), W) of {ok, RiakMapId} -> RiakMapId; {error, Error} -> throw(Error); _ -> throw(unexpected) @@ -281,13 +301,13 @@ normalize_doc_fields(Src) -> [{return, binary}, global]). %% @private -fetch_map(Conn, Bucket, Key) -> - riakc_pb_socket:fetch_type(Conn, Bucket, Key). +fetch_map(Conn, Bucket, Key, Opts) -> + riakc_pb_socket:fetch_type(Conn, Bucket, Key, Opts). 
%% @private -fetch_map_bulk(DocName, Conn, Bucket, Keys) -> +fetch_map_bulk(DocName, Conn, Bucket, Keys, Opts) -> Fun = fun(K, Acc) -> - case fetch_map(Conn, Bucket, K) of + case fetch_map(Conn, Bucket, K, Opts) of {ok, M} -> [rmap_to_doc(DocName, M) | Acc]; _ -> Acc end @@ -299,8 +319,8 @@ delete_map(Conn, Bucket, Key) -> riakc_pb_socket:delete(Conn, Bucket, Key). %% @private -update_map(Conn, Bucket, Key, Map) -> - riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map)). +update_map(Conn, Bucket, Key, Map, Opts) -> + riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map), Opts). %% @private search(Conn, Index, Query, 0, 0) -> diff --git a/test/test.config b/test/test.config index 0449b71..2b79a87 100644 --- a/test/test.config +++ b/test/test.config @@ -42,10 +42,7 @@ {sumo_test_backend_riak, sumo_backend_riak, [{host, "127.0.0.1"}, - {port, 8087}, - {bucket_type, "maps"}, - {bucket, "sumo_test"}, - {index, "sumo_test_index"}] + {port, 8087}] } ] }, @@ -73,7 +70,12 @@ {sumo_test_riak, sumo_store_riak, [{storage_backend, sumo_test_backend_riak}, - {workers, 10}] + {workers, 10}, + {bucket_type, "maps"}, + {bucket, "sumo_test"}, + {index, "sumo_test_index"}, + {w_args, [{w, 2}, {pw, 0}, {dw, 0}, {returnbody, false}]}, + {r_args, [{r, 2}, {pr, 0}]}] } ] }, From a6e06b062242db4e19218af5567045b81f0a87b2 Mon Sep 17 00:00:00 2001 From: cabol Date: Wed, 11 Mar 2015 14:48:10 -0500 Subject: [PATCH 2/4] Workarounds were added to fix the problem with the connection pool in the backend. 1st workaround was to modify the 'get_connection' function in the backend to return a new connection. 2nd workaround was to add 'checkin' and 'checkout' functions to simulate a simple connection pool in LIFO mode (but it is not a real pool), but they still have some problems. 
--- src/sumo_backend_riak.erl | 78 ++++++++++++++++++-- src/sumo_store_riak.erl | 150 ++++++++++++++++++++++---------------- test/test.config | 3 +- 3 files changed, 160 insertions(+), 71 deletions(-) diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index f5d7bef..e43e62e 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -29,7 +29,7 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Public API. --export([get_connection/1]). +-export([get_connection/1, checkin_conn/2, checkout_conn/1]). %%% Exports for sumo_backend -export([start_link/2]). @@ -47,7 +47,13 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --record(state, {conn :: pid()}). +-type r_host() :: iolist() | string(). +-type r_port() :: non_neg_integer(). +-type r_opts() :: [term()]. +-type r_pool() :: [pid()]. + +-record(state, {conn_args :: {r_host(), r_port(), r_opts()}, + conn_pool = [] :: r_pool()}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -62,6 +68,14 @@ start_link(Name, Options) -> get_connection(Name) -> gen_server:call(Name, get_connection). +-spec checkin_conn(atom() | pid(), pid()) -> atom(). +checkin_conn(Name, Conn) -> + gen_server:call(Name, {checkin_conn, Conn}). + +-spec checkout_conn(atom() | pid()) -> atom(). +checkout_conn(Name) -> + gen_server:call(Name, checkout_conn). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. 
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -71,15 +85,63 @@ init(Options) -> %% Get connection parameters Host = proplists:get_value(host, Options, "127.0.0.1"), Port = proplists:get_value(port, Options, 8087), + PoolSize = proplists:get_value(poolsize, Options, 10), Opts = riak_opts(Options), - %% Place Riak connection - {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + %% Create Riak connection pool + F = fun(_E, Acc) -> + {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + [Conn | Acc] + end, + ConnPool = lists:foldl(F, [], lists:seq(1, PoolSize)), %% Initial state - {ok, #state{conn = Conn}}. - + {ok, #state{conn_args = {Host, Port, Opts}, conn_pool = ConnPool}}. + +%% @todo: These are workarounds, a real connection pool needs to be added. +%% Workaround 1: when the store calls 'get_connection', a new Riak connection +%% is returned - one to one model (between connection and store). +%% Workaround 2: a simple list (LIFO) was added to hold a set of connections +%% that are created in the 'init' function (see code above). When the store +%% needs a connection, must call 'checkout_conn' to get the Pid, and when +%% it finish, must call 'checkin_conn'. These operations have some problems, +%% for instance, the don't consider an overflow, so the amount of new +%% connection are not watched, so in case of high concurrency, exist the +%% risk of have so many connection hitting the DB and causing contention. -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. -handle_call(get_connection, _From, State = #state{conn = Conn}) -> - {reply, Conn, State}. 
+handle_call(get_connection, + _From, + State = #state{conn_args = {Host, Port, Opts}}) -> + {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + {reply, Conn, State}; +handle_call({checkin_conn, Conn}, + _From, + State = #state{conn_args = {Host, Port, Opts}, + conn_pool = ConnPool}) -> + NewConn = case is_process_alive(Conn) of + true -> + Conn; + false -> + {ok, Conn0} = riakc_pb_socket:start_link(Host, Port, Opts), + Conn0 + end, + {reply, ok, State#state{conn_pool = [NewConn | ConnPool]}}; +handle_call(checkout_conn, + _From, + State = #state{conn_args = {Host, Port, Opts}, conn_pool = []}) -> + {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), + {reply, Conn, State#state{conn_pool = [Conn]}}; +handle_call(checkout_conn, + _From, + State = #state{conn_args = {Host, Port, Opts}, + conn_pool = ConnPool}) -> + [Conn | _T] = ConnPool, + NewConn = case is_process_alive(Conn) of + true -> + Conn; + false -> + {ok, Conn0} = riakc_pb_socket:start_link(Host, Port, Opts), + Conn0 + end, + {reply, NewConn, State#state{conn_pool = (ConnPool -- [Conn])}}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Unused Callbacks diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index aee89c3..910c7a6 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -54,9 +54,9 @@ -type r_param() :: r | pr | notfound_ok. -type w_param() :: w | pw | dw | returnbody. --record(state, {conn :: pid(), +-record(state, {backend, conn :: pid(), bucket :: binary(), - index :: binary(), + index :: binary(), r_args :: [{r_param(), integer() | (true | false)}], w_args :: [{w_param(), integer() | (true | false)}]}). -type state() :: #state{}. 
@@ -78,7 +78,7 @@ init(Opts) -> Index = iolist_to_binary(proplists:get_value(index, Opts, <<"sumo_idx">>)), R = proplists:get_value(r_args, Opts, []), W = proplists:get_value(w_args, Opts, []), - State = #state{conn = Conn, + State = #state{backend = Backend, conn = Conn, bucket = {BucketType, Bucket}, index = Index, r_args = R, @@ -88,13 +88,18 @@ init(Opts) -> -spec persist( sumo_internal:doc(), state() ) -> sumo_store:result(sumo_internal:doc(), state()). -persist(Doc, #state{conn = Conn, bucket = Bucket, w_args = W} = State) -> - {Id, NewDoc} = new_doc(Doc, State), - case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), W) of - {error, Error} -> - {error, Error, State}; - _ -> - {ok, NewDoc, State} +persist(Doc, #state{backend = Backend, bucket = Bucket, w_args = W} = State) -> + Conn = sumo_backend_riak:checkout_conn(Backend), + try + {Id, NewDoc} = new_doc(Doc, Conn, State), + case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), W) of + {error, Error} -> + {error, Error, State}; + _ -> + {ok, NewDoc, State} + end + after + ok = sumo_backend_riak:checkin_conn(Backend, Conn) end. -spec delete_by( @@ -102,50 +107,66 @@ persist(Doc, #state{conn = Conn, bucket = Bucket, w_args = W} = State) -> ) -> sumo_store:result(sumo_store:affected_rows(), state()). 
delete_by(DocName, Conditions, - #state{conn = Conn, bucket = Bucket, index = Index} = State) -> - IdField = sumo_internal:id_field_name(DocName), - case lists:keyfind(IdField, 1, Conditions) of - {_K, Key} -> - case delete_map(Conn, Bucket, iolist_to_binary(Key)) of - ok -> - {ok, 1, State}; - {error, Error} -> - {error, Error, State} - end; - _ -> - Query = build_query(Conditions), - case search_docs_by(DocName, Conn, Index, Query, 0, 0) of - {ok, {Total, Res}} -> - delete_docs(Conn, Bucket, Res), - {ok, Total, State}; - {error, Error} -> - {error, Error, State} - end + #state{backend = Backend, bucket = Bucket, index = Index} = State) -> + Conn = sumo_backend_riak:checkout_conn(Backend), + try + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case delete_map(Conn, Bucket, iolist_to_binary(Key)) of + ok -> + {ok, 1, State}; + {error, Error} -> + {error, Error, State} + end; + _ -> + Query = build_query(Conditions), + case search_docs_by(DocName, Conn, Index, Query, 0, 0) of + {ok, {Total, Res}} -> + delete_docs(Conn, Bucket, Res), + {ok, Total, State}; + {error, Error} -> + {error, Error, State} + end + end + after + ok = sumo_backend_riak:checkin_conn(Backend, Conn) end. -spec delete_all( sumo:schema_name(), state() ) -> sumo_store:result(sumo_store:affected_rows(), state()). 
-delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> - Del = fun({C, B, Kst}, Acc) -> - lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst), - Acc + length(Kst) - end, - case stream_keys(Conn, Bucket, Del, 0) of - {ok, Count} -> {ok, Count, State}; - {_, Count} -> {error, Count, State} +delete_all(_DocName, #state{backend = Backend, bucket = Bucket} = State) -> + Conn = sumo_backend_riak:checkout_conn(Backend), + try + Del = fun({C, B, Kst}, Acc) -> + lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst), + Acc + length(Kst) + end, + case stream_keys(Conn, Bucket, Del, 0) of + {ok, Count} -> {ok, Count, State}; + {_, Count} -> {error, Count, State} + end + after + ok = sumo_backend_riak:checkin_conn(Backend, Conn) end. -spec find_all( sumo:schema_name(), state() ) -> sumo_store:result([sumo_internal:doc()], state()). -find_all(DocName, #state{conn = Conn, bucket = Bucket, r_args = R} = State) -> - Get = fun({C, B, Kst}, Acc) -> - fetch_map_bulk(DocName, C, B, Kst, R) ++ Acc - end, - case stream_keys(Conn, Bucket, Get, []) of - {ok, Docs} -> {ok, Docs, State}; - {_, Docs} -> {error, Docs, State} +find_all(DocName, + #state{backend = Backend, bucket = Bucket, r_args = R} = State) -> + Conn = sumo_backend_riak:checkout_conn(Backend), + try + Get = fun({C, B, Kst}, Acc) -> + fetch_map_bulk(DocName, C, B, Kst, R) ++ Acc + end, + case stream_keys(Conn, Bucket, Get, []) of + {ok, Docs} -> {ok, Docs, State}; + {_, Docs} -> {error, Docs, State} + end + after + ok = sumo_backend_riak:checkin_conn(Backend, Conn) end. 
-spec find_all( @@ -175,26 +196,31 @@ find_by(DocName, Conditions, Limit, Offset, - #state{conn = Conn, + #state{backend = Backend, bucket = Bucket, index = Index, r_args = R} = State) -> - IdField = sumo_internal:id_field_name(DocName), - case lists:keyfind(IdField, 1, Conditions) of - {_K, Key} -> - case fetch_map(Conn, Bucket, iolist_to_binary(Key), R) of - {ok, RMap} -> - Val = rmap_to_doc(DocName, RMap), - {ok, [Val], State}; - {error, Error} -> - {error, Error, State} - end; - _ -> - Query = build_query(Conditions), - case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of - {ok, {_, Res}} -> {ok, Res, State}; - {error, Error} -> {error, Error, State} - end + Conn = sumo_backend_riak:checkout_conn(Backend), + try + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case fetch_map(Conn, Bucket, iolist_to_binary(Key), R) of + {ok, RMap} -> + Val = rmap_to_doc(DocName, RMap), + {ok, [Val], State}; + {error, Error} -> + {error, Error, State} + end; + _ -> + Query = build_query(Conditions), + case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of + {ok, {_, Res}} -> {ok, Res, State}; + {error, Error} -> {error, Error, State} + end + end + after + ok = sumo_backend_riak:checkin_conn(Backend, Conn) end. -spec find_by( @@ -225,7 +251,7 @@ doc_id(Doc) -> sumo_internal:get_field(IdField, Doc). 
%% @private -new_doc(Doc, #state{conn = Conn, bucket = Bucket, w_args = W}) -> +new_doc(Doc, Conn, #state{bucket = Bucket, w_args = W}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of diff --git a/test/test.config b/test/test.config index 2b79a87..932bdfd 100644 --- a/test/test.config +++ b/test/test.config @@ -42,7 +42,8 @@ {sumo_test_backend_riak, sumo_backend_riak, [{host, "127.0.0.1"}, - {port, 8087}] + {port, 8087}, + {poolsize, 10}] } ] }, From 10ad7ce1b06f21475db7863b66f8dc8623ccb344 Mon Sep 17 00:00:00 2001 From: cabol Date: Thu, 12 Mar 2015 12:31:18 -0500 Subject: [PATCH 3/4] Change pool issue to the approach of workaround 1 - Change pool issue workaround to 1st approach - when the store invokes , a new Riak connection is returned - one to one model (between connection and store). --- src/sumo_backend_riak.erl | 78 +++------------- src/sumo_store_riak.erl | 182 ++++++++++++++++++-------------------- 2 files changed, 94 insertions(+), 166 deletions(-) diff --git a/src/sumo_backend_riak.erl b/src/sumo_backend_riak.erl index e43e62e..a320004 100644 --- a/src/sumo_backend_riak.erl +++ b/src/sumo_backend_riak.erl @@ -29,7 +29,7 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Public API. --export([get_connection/1, checkin_conn/2, checkout_conn/1]). +-export([get_connection/1]). %%% Exports for sumo_backend -export([start_link/2]). @@ -47,13 +47,9 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --type r_host() :: iolist() | string(). --type r_port() :: non_neg_integer(). --type r_opts() :: [term()]. --type r_pool() :: [pid()]. - --record(state, {conn_args :: {r_host(), r_port(), r_opts()}, - conn_pool = [] :: r_pool()}). +-record(state, {host :: string(), + port :: non_neg_integer(), + opts :: [term()]}). -type state() :: #state{}. 
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -68,14 +64,6 @@ start_link(Name, Options) -> get_connection(Name) -> gen_server:call(Name, get_connection). --spec checkin_conn(atom() | pid(), pid()) -> atom(). -checkin_conn(Name, Conn) -> - gen_server:call(Name, {checkin_conn, Conn}). - --spec checkout_conn(atom() | pid()) -> atom(). -checkout_conn(Name) -> - gen_server:call(Name, checkout_conn). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% gen_server stuff. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -85,63 +73,17 @@ init(Options) -> %% Get connection parameters Host = proplists:get_value(host, Options, "127.0.0.1"), Port = proplists:get_value(port, Options, 8087), - PoolSize = proplists:get_value(poolsize, Options, 10), Opts = riak_opts(Options), - %% Create Riak connection pool - F = fun(_E, Acc) -> - {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), - [Conn | Acc] - end, - ConnPool = lists:foldl(F, [], lists:seq(1, PoolSize)), - %% Initial state - {ok, #state{conn_args = {Host, Port, Opts}, conn_pool = ConnPool}}. - -%% @todo: These are workarounds, a real connection pool needs to be added. -%% Workaround 1: when the store calls 'get_connection', a new Riak connection -%% is returned - one to one model (between connection and store). -%% Workaround 2: a simple list (LIFO) was added to hold a set of connections -%% that are created in the 'init' function (see code above). When the store -%% needs a connection, must call 'checkout_conn' to get the Pid, and when -%% it finish, must call 'checkin_conn'. These operations have some problems, -%% for instance, the don't consider an overflow, so the amount of new -%% connection are not watched, so in case of high concurrency, exist the -%% risk of have so many connection hitting the DB and causing contention. + {ok, #state{host = Host, port = Port, opts = Opts}}. 
+ +%% @todo: implement connection pool. +%% In other cases is a built-in feature of the client. -spec handle_call(term(), term(), state()) -> {reply, term(), state()}. handle_call(get_connection, _From, - State = #state{conn_args = {Host, Port, Opts}}) -> + State = #state{host = Host, port = Port, opts = Opts}) -> {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), - {reply, Conn, State}; -handle_call({checkin_conn, Conn}, - _From, - State = #state{conn_args = {Host, Port, Opts}, - conn_pool = ConnPool}) -> - NewConn = case is_process_alive(Conn) of - true -> - Conn; - false -> - {ok, Conn0} = riakc_pb_socket:start_link(Host, Port, Opts), - Conn0 - end, - {reply, ok, State#state{conn_pool = [NewConn | ConnPool]}}; -handle_call(checkout_conn, - _From, - State = #state{conn_args = {Host, Port, Opts}, conn_pool = []}) -> - {ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts), - {reply, Conn, State#state{conn_pool = [Conn]}}; -handle_call(checkout_conn, - _From, - State = #state{conn_args = {Host, Port, Opts}, - conn_pool = ConnPool}) -> - [Conn | _T] = ConnPool, - NewConn = case is_process_alive(Conn) of - true -> - Conn; - false -> - {ok, Conn0} = riakc_pb_socket:start_link(Host, Port, Opts), - Conn0 - end, - {reply, NewConn, State#state{conn_pool = (ConnPool -- [Conn])}}. + {reply, Conn, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Unused Callbacks diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index 910c7a6..c5ef61c 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -51,14 +51,21 @@ %% Types. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Riak quorum parameters. +%% @see -type r_param() :: r | pr | notfound_ok. -type w_param() :: w | pw | dw | returnbody. 
--record(state, {backend, conn :: pid(), - bucket :: binary(), - index :: binary(), - r_args :: [{r_param(), integer() | (true | false)}], - w_args :: [{w_param(), integer() | (true | false)}]}). +%% conn: is the Pid of the gen_server that holds the connection with Riak +%% bucket: Riak bucket (per store) +%% index: Riak index to be used by Riak Search +%% read_quorum: Riak read quorum parameters. +%% write_quorum: Riak write quorum parameters. +-record(state, {conn :: pid(), + bucket :: binary(), + index :: binary(), + read_quorum :: [{r_param(), integer() | (true | false)}], + write_quorum :: [{w_param(), integer() | (true | false)}]}). -type state() :: #state{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -73,33 +80,32 @@ init(Opts) -> % which creates and initializes the storage backend. Backend = proplists:get_value(storage_backend, Opts), Conn = sumo_backend_riak:get_connection(Backend), - BucketType = iolist_to_binary(proplists:get_value(bucket_type, Opts)), - Bucket = iolist_to_binary(proplists:get_value(bucket, Opts, <<"sumo">>)), - Index = iolist_to_binary(proplists:get_value(index, Opts, <<"sumo_idx">>)), - R = proplists:get_value(r_args, Opts, []), - W = proplists:get_value(w_args, Opts, []), - State = #state{backend = Backend, conn = Conn, + BucketType = iolist_to_binary( + proplists:get_value(bucket_type, Opts, <<"maps">>)), + Bucket = iolist_to_binary( + proplists:get_value(bucket, Opts, <<"sumo">>)), + Index = iolist_to_binary( + proplists:get_value(index, Opts, <<"sumo_index">>)), + Rq = proplists:get_value(read_quorum, Opts, []), + Wq = proplists:get_value(write_quorum, Opts, []), + State = #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index, - r_args = R, - w_args = W}, + read_quorum = Rq, + write_quorum = Wq}, {ok, State}. -spec persist( sumo_internal:doc(), state() ) -> sumo_store:result(sumo_internal:doc(), state()). 
-persist(Doc, #state{backend = Backend, bucket = Bucket, w_args = W} = State) -> - Conn = sumo_backend_riak:checkout_conn(Backend), - try - {Id, NewDoc} = new_doc(Doc, Conn, State), - case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), W) of - {error, Error} -> - {error, Error, State}; - _ -> - {ok, NewDoc, State} - end - after - ok = sumo_backend_riak:checkin_conn(Backend, Conn) +persist(Doc, + #state{conn = Conn, bucket = Bucket, write_quorum = Wq} = State) -> + {Id, NewDoc} = new_doc(Doc, State), + case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), Wq) of + {error, Error} -> + {error, Error, State}; + _ -> + {ok, NewDoc, State} end. -spec delete_by( @@ -107,66 +113,51 @@ persist(Doc, #state{backend = Backend, bucket = Bucket, w_args = W} = State) -> ) -> sumo_store:result(sumo_store:affected_rows(), state()). delete_by(DocName, Conditions, - #state{backend = Backend, bucket = Bucket, index = Index} = State) -> - Conn = sumo_backend_riak:checkout_conn(Backend), - try - IdField = sumo_internal:id_field_name(DocName), - case lists:keyfind(IdField, 1, Conditions) of - {_K, Key} -> - case delete_map(Conn, Bucket, iolist_to_binary(Key)) of - ok -> - {ok, 1, State}; - {error, Error} -> - {error, Error, State} - end; - _ -> - Query = build_query(Conditions), - case search_docs_by(DocName, Conn, Index, Query, 0, 0) of - {ok, {Total, Res}} -> - delete_docs(Conn, Bucket, Res), - {ok, Total, State}; - {error, Error} -> - {error, Error, State} - end - end - after - ok = sumo_backend_riak:checkin_conn(Backend, Conn) + #state{conn = Conn, bucket = Bucket, index = Index} = State) -> + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case delete_map(Conn, Bucket, iolist_to_binary(Key)) of + ok -> + {ok, 1, State}; + {error, Error} -> + {error, Error, State} + end; + _ -> + Query = build_query(Conditions), + case search_docs_by(DocName, Conn, Index, Query, 0, 0) of + {ok, {Total, Res}} -> + delete_docs(Conn, 
Bucket, Res), + {ok, Total, State}; + {error, Error} -> + {error, Error, State} + end end. -spec delete_all( sumo:schema_name(), state() ) -> sumo_store:result(sumo_store:affected_rows(), state()). -delete_all(_DocName, #state{backend = Backend, bucket = Bucket} = State) -> - Conn = sumo_backend_riak:checkout_conn(Backend), - try - Del = fun({C, B, Kst}, Acc) -> - lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst), - Acc + length(Kst) - end, - case stream_keys(Conn, Bucket, Del, 0) of - {ok, Count} -> {ok, Count, State}; - {_, Count} -> {error, Count, State} - end - after - ok = sumo_backend_riak:checkin_conn(Backend, Conn) +delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) -> + Del = fun({C, B, Kst}, Acc) -> + lists:foreach(fun(K) -> delete_map(C, B, K) end, Kst), + Acc + length(Kst) + end, + case stream_keys(Conn, Bucket, Del, 0) of + {ok, Count} -> {ok, Count, State}; + {_, Count} -> {error, Count, State} end. -spec find_all( sumo:schema_name(), state() ) -> sumo_store:result([sumo_internal:doc()], state()). find_all(DocName, - #state{backend = Backend, bucket = Bucket, r_args = R} = State) -> - Conn = sumo_backend_riak:checkout_conn(Backend), - try - Get = fun({C, B, Kst}, Acc) -> - fetch_map_bulk(DocName, C, B, Kst, R) ++ Acc - end, - case stream_keys(Conn, Bucket, Get, []) of - {ok, Docs} -> {ok, Docs, State}; - {_, Docs} -> {error, Docs, State} - end - after - ok = sumo_backend_riak:checkin_conn(Backend, Conn) + #state{conn = Conn, bucket = Bucket, read_quorum = Rq} = State) -> + Get = fun({C, B, Kst}, Acc) -> + fetch_map_bulk(DocName, C, B, Kst, Rq) ++ Acc + end, + case stream_keys(Conn, Bucket, Get, []) of + {ok, Docs} -> {ok, Docs, State}; + {_, Docs} -> {error, Docs, State} end. 
-spec find_all( @@ -196,31 +187,26 @@ find_by(DocName, Conditions, Limit, Offset, - #state{backend = Backend, + #state{conn = Conn, bucket = Bucket, index = Index, - r_args = R} = State) -> - Conn = sumo_backend_riak:checkout_conn(Backend), - try - IdField = sumo_internal:id_field_name(DocName), - case lists:keyfind(IdField, 1, Conditions) of - {_K, Key} -> - case fetch_map(Conn, Bucket, iolist_to_binary(Key), R) of - {ok, RMap} -> - Val = rmap_to_doc(DocName, RMap), - {ok, [Val], State}; - {error, Error} -> - {error, Error, State} - end; - _ -> - Query = build_query(Conditions), - case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of - {ok, {_, Res}} -> {ok, Res, State}; - {error, Error} -> {error, Error, State} - end - end - after - ok = sumo_backend_riak:checkin_conn(Backend, Conn) + read_quorum = Rq} = State) -> + IdField = sumo_internal:id_field_name(DocName), + case lists:keyfind(IdField, 1, Conditions) of + {_K, Key} -> + case fetch_map(Conn, Bucket, iolist_to_binary(Key), Rq) of + {ok, RMap} -> + Val = rmap_to_doc(DocName, RMap), + {ok, [Val], State}; + {error, Error} -> + {error, Error, State} + end; + _ -> + Query = build_query(Conditions), + case search_docs_by(DocName, Conn, Index, Query, Limit, Offset) of + {ok, {_, Res}} -> {ok, Res, State}; + {error, Error} -> {error, Error, State} + end end. -spec find_by( @@ -251,7 +237,7 @@ doc_id(Doc) -> sumo_internal:get_field(IdField, Doc). %% @private -new_doc(Doc, Conn, #state{bucket = Bucket, w_args = W}) -> +new_doc(Doc, #state{conn = Conn, bucket = Bucket, write_quorum = W}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of From 0a7f2139eabc0adeef2e1b209f223d0504e3cbd3 Mon Sep 17 00:00:00 2001 From: cabol Date: Thu, 12 Mar 2015 12:47:57 -0500 Subject: [PATCH 4/4] Fix variable name (W -> Wq). 
--- src/sumo_store_riak.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sumo_store_riak.erl b/src/sumo_store_riak.erl index c5ef61c..501271b 100644 --- a/src/sumo_store_riak.erl +++ b/src/sumo_store_riak.erl @@ -237,12 +237,12 @@ doc_id(Doc) -> sumo_internal:get_field(IdField, Doc). %% @private -new_doc(Doc, #state{conn = Conn, bucket = Bucket, write_quorum = W}) -> +new_doc(Doc, #state{conn = Conn, bucket = Bucket, write_quorum = Wq}) -> DocName = sumo_internal:doc_name(Doc), IdField = sumo_internal:id_field_name(DocName), Id = case sumo_internal:get_field(IdField, Doc) of undefined -> - case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), W) of + case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), Wq) of {ok, RiakMapId} -> RiakMapId; {error, Error} -> throw(Error); _ -> throw(unexpected)