Merge pull request #17 from esl/otp27-docs
Convert docs to OTP27 style, with backwards compatibility
JanuszJakubiec authored Sep 17, 2024
2 parents 4102b01 + ddb3f34 commit 67e3de7
Showing 5 changed files with 135 additions and 89 deletions.
5 changes: 4 additions & 1 deletion rebar.config
@@ -1,4 +1,7 @@
{erl_opts, []}.
{erl_opts,
[warn_missing_doc, warn_missing_spec, warn_unused_import,
warn_export_vars, verbose, report, debug_info
]}.
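% The warning flags above make the compiler warn about exported functions
% that lack specs or (via `warn_missing_doc`, available from OTP 27) `-doc`
% attributes, so documentation coverage regressions surface at build time.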

{deps, [
{telemetry, "1.3.0"}
201 changes: 121 additions & 80 deletions src/segmented_cache.erl
@@ -1,146 +1,187 @@
%%%-------------------------------------------------------------------
%% @doc `segmented_cache' is a key/value cache library implemented with rotating segments.
%%
%% For more information, see the README, and the function documentation.
%% @end
%%%-------------------------------------------------------------------
-module(segmented_cache).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
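%% On OTP 27 and later, `?MODULEDOC(Str)` and `?DOC(Str)` expand to the new
%% `-moduledoc` and `-doc` attributes; on older releases they expand to a
%% harmless `-compile([])` attribute, so the doc strings are simply dropped
%% and the module still compiles.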

?MODULEDOC("""
`segmented_cache` is a key/value cache library implemented with rotating segments.
For more information, see the README, and the function documentation.
""").

%% API
-export([start/1, start/2]).
-export([start_link/1, start_link/2]).
-export([is_member/2, get_entry/2, put_entry/3, merge_entry/3, delete_entry/2, delete_pattern/2]).

?DOC("Telemetry metadata with cache hit information.").
-type hit() :: #{name => name(), hit => boolean()}.

?DOC("Telemetry metadata with deletion error information.").
-type delete_error(Key) :: #{name => atom(),
value => Key,
delete_type => entry | pattern,
class => throw | error | exit,
reason => term()}.

?DOC("`m:pg` scope for cache coordination across distribution.").
-type scope() :: atom().
?DOC("Cache unique name.").
-type name() :: atom().
?DOC("Strategy for cache eviction.").
-type strategy() :: fifo | lru.
?DOC("Dynamic type of _keys_ from cache clients.").
-type key() :: term().
?DOC("Dynamic type of _values_ from cache clients.").
-type value() :: term().
?DOC("Merging function to use for resolving conflicts").
-type merger_fun(Value) :: fun((Value, Value) -> Value).
?DOC("Configuration values for the cache.").
-type opts() :: #{scope => scope(),
strategy => strategy(),
segment_num => non_neg_integer(),
ttl => timeout() | {erlang:time_unit(), non_neg_integer()},
merger_fun => merger_fun(term())}.

-export_type([scope/0, name/0, key/0, value/0, strategy/0, merger_fun/1, opts/0]).
-export_type([scope/0, name/0, key/0, value/0, hit/0, delete_error/1,
strategy/0, merger_fun/1, opts/0]).

%%====================================================================
%% API
%%====================================================================

%% @see start_link/2
?DOC("See `start_link/2` for more details").
-spec start(name()) -> gen_server:start_ret().
start(Name) when is_atom(Name) ->
start(Name, #{}).

%% @see start_link/2
?DOC("See `start_link/2` for more details").
-spec start(name(), opts()) -> gen_server:start_ret().
start(Name, Opts) when is_atom(Name), is_map(Opts) ->
segmented_cache_server:start(Name, Opts).

%% @see start_link/2
?DOC("See `start_link/2` for more details").
-spec start_link(name()) -> gen_server:start_ret().
start_link(Name) when is_atom(Name) ->
start_link(Name, #{}).

%% @doc Start and link a cache entity in the local node
%%
%% `Name' must be an atom. Then the cache will be identified by the pair `{segmented_cache, Name}',
%% and an entry in persistent_term will be created and the worker will join a pg group of
%% the same name.
%% `Opts' is a map containing the configuration.
%% `scope' is a `pg' scope. Defaults to `pg'.
%% `strategy' can be `fifo' or `lru'. Default is `fifo'.
%% `segment_num' is the number of segments for the cache. Default is `3'.
%% `ttl' is the time to live, in minutes, of _each_ segment. Default is `480', i.e., 8 hours.
%% `merger_fun' is a function that, given a conflict, takes in order the old and new values and
%% applies a merging strategy. See the `merger_fun(term())' type.
?DOC("""
Start and link a cache entity in the local node.
`Name` must be an atom. The cache will then be identified by the pair `{segmented_cache, Name}`,
an entry in `persistent_term` will be created, and the worker will join a `pg` group of
the same name.
`Opts` is a map containing the configuration.
- `scope` is a `pg` scope. Defaults to `pg`.
- `strategy` can be `fifo` or `lru`. Default is `fifo`.
- `segment_num` is the number of segments for the cache. Default is `3`.
- `ttl` is the time to live, in minutes, of _each_ segment. Default is `480`, i.e., 8 hours.
- `merger_fun` is a function that, given a conflict,
takes in order the old and new values and applies a merging strategy.
See the `t:merger_fun/1` type.
""").
-spec start_link(name(), opts()) -> gen_server:start_ret().
start_link(Name, Opts) when is_atom(Name), is_map(Opts) ->
segmented_cache_server:start_link(Name, Opts).
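%% A minimal usage sketch (the cache name and option values below are
%% illustrative, not defaults):
%%
%%   {ok, _Pid} = segmented_cache:start_link(my_cache,
%%                                           #{strategy => lru,
%%                                             segment_num => 4,
%%                                             ttl => 60,
%%                                             merger_fun => fun maps:merge/2}),
%%   true = segmented_cache:put_entry(my_cache, user_1, #{name => <<"Ann">>}).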

%% @doc Check if Key is cached
%%
%% Raises telemetry span
%% name: [segmented_cache, Name, request, _]
%% start metadata: #{name => atom()}
%% stop metadata: #{name => atom(), hit => boolean()}
?DOC("""
Check if `Key` is cached.
Emits a telemetry span:
- name: `[segmented_cache, Name, request, _]`
- start metadata: `#{name => atom()}`
- stop metadata: `t:hit/0`
""").
-spec is_member(name(), key()) -> boolean().
is_member(Name, Key) when is_atom(Name) ->
Span = segmented_cache_helpers:is_member_span(Name, Key),
telemetry:span([segmented_cache, Name, request], #{name => Name, type => is_member}, Span).
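%% A sketch of observing these spans with `telemetry:attach/4` (the handler
%% id and cache name are hypothetical):
%%
%%   ok = telemetry:attach(
%%          <<"log-cache-requests">>,
%%          [segmented_cache, my_cache, request, stop],
%%          fun(_Event, Measurements, #{hit := Hit}, _Config) ->
%%                  logger:info("cache hit=~p measurements=~p", [Hit, Measurements])
%%          end,
%%          undefined).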

%% @doc Get the entry for Key in cache
%%
%% Raises telemetry span
%% name: [segmented_cache, Name, request, _]
%% start metadata: #{name => atom()}
%% stop metadata: #{name => atom(), hit => boolean()}
?DOC("""
Get the entry for `Key` in the cache.
Emits a telemetry span:
- name: `[segmented_cache, Name, request, _]`
- start metadata: `#{name => atom()}`
- stop metadata: `t:hit/0`
""").
-spec get_entry(name(), key()) -> value() | not_found.
get_entry(Name, Key) when is_atom(Name) ->
Span = segmented_cache_helpers:get_entry_span(Name, Key),
telemetry:span([segmented_cache, Name, request], #{name => Name, type => get_entry}, Span).
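%% A typical read-through pattern built on `get_entry/2` (`load_from_db/1`
%% is a hypothetical loader):
%%
%%   fetch(Key) ->
%%       case segmented_cache:get_entry(my_cache, Key) of
%%           not_found ->
%%               Fresh = load_from_db(Key),
%%               segmented_cache:put_entry(my_cache, Key, Fresh),
%%               Fresh;
%%           Cached ->
%%               Cached
%%       end.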

%% @doc Add an entry to the first table in the segments.
%%
%% Possible race conditions:
%% <li> Two writers: another process might attempt to put a record at the same time. In this case,
%%      both writers will attempt `ets:insert_new', resulting in only one of them succeeding.
%%      The one that fails will retry a `compare_and_swap' up to three times, attempting to merge the
%%      values and ensuring no data is lost.</li>
%% <li> One worker and the cleaner: there's a chance that by the time we insert in the ets table,
%% this table is not the first anymore because the cleaner has taken action and pushed it
%% behind.</li>
%% <li> Two writers and the cleaner: a mix of the previous cases; two writers can
%%      attempt to put a record at the same time and, exactly in-between, the cleaner rotates the
%%      tables, resulting in the first writer inserting in the table that immediately becomes the
%%      second, and the second writer inserting in the table newly promoted to first, shadowing the
%%      previous write.</li>
%%
%% To handle the data race with the cleaner, after a successful insert, we re-check the index,
%% and if it has changed, we restart the whole operation again: we can be sure that no more
%% rotations will be triggered in a while, so the second round will be final.
%%
%% Strategy considerations: under a fifo strategy, no other writes can happen, but under a lru
%% strategy, many other workers might attempt to move a record forward. In this case, the
%% forwarding movement doesn't modify the record, and therefore the `compare_and_swap'
%% operation should succeed at once; then, once the record is in the front, all other workers
%% shouldn't be attempting to move it.
?DOC("""
Add an entry to the first table in the segments.
### Possible race conditions:
- Two writers: another process might attempt to put a record at the same time. In this case,
both writers will attempt `ets:insert_new`, resulting in only one of them succeeding.
The one that fails will retry a `compare_and_swap` up to three times, attempting to merge the
values and ensuring no data is lost.
- One worker and the cleaner: there's a chance that by the time we insert in the ets table,
this table is not the first anymore because the cleaner has taken action and pushed it
behind.
- Two writers and the cleaner: a mix of the previous cases; two writers can
attempt to put a record at the same time and, exactly in-between, the cleaner rotates the
tables, resulting in the first writer inserting in the table that immediately becomes the
second, and the second writer inserting in the table newly promoted to first, shadowing the
previous write.
To handle the data race with the cleaner, after a successful insert,
we re-check the index, and if it has changed, we restart the whole operation again:
we can be sure that no more rotations will be triggered in a while,
so the second round will be final.
### Strategy considerations:
Under a fifo strategy, no other writes can happen, but under a lru strategy,
many other workers might attempt to move a record forward. In this case,
the forwarding movement doesn't modify the record, and therefore the `compare_and_swap`
operation should succeed at once; then, once the record is in the front,
all other workers shouldn't be attempting to move it.
""").
-spec put_entry(name(), key(), value()) -> boolean().
put_entry(Name, Key, Value) when is_atom(Name) ->
segmented_cache_helpers:put_entry_front(Name, Key, Value).
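%% The retry described above follows the classic optimistic compare-and-swap
%% loop. A simplified sketch of that shape (not the actual helper, which
%% lives in `segmented_cache_helpers`):
%%
%%   compare_and_swap(0, _Tab, _Key, _Value, _MergerFun) ->
%%       false;
%%   compare_and_swap(Attempts, Tab, Key, Value, MergerFun) ->
%%       case ets:lookup(Tab, Key) of
%%           [{Key, Old}] ->
%%               New = MergerFun(Old, Value),
%%               %% select_replace/2 swaps the record only if it still
%%               %% matches what we just read: an atomic compare-and-swap.
%%               case ets:select_replace(
%%                      Tab, [{{Key, Old}, [], [{const, {Key, New}}]}]) of
%%                   1 -> true;
%%                   0 -> compare_and_swap(Attempts - 1, Tab, Key, Value, MergerFun)
%%               end;
%%           [] ->
%%               ets:insert_new(Tab, {Key, Value})
%%       end.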

%% @doc Merge a new entry into an existing one, or add it at the front if none is found.
%%
%% Race condition considerations:
%% <li> Two writers: `compare_and_swap' will ensure they both succeed sequentially.</li>
%% <li> Any writers and the cleaner: under fifo, the writer modifies the record in place
%%      and doesn't need to be concerned with rotation. Under lru, the same considerations
%%      as for `put_entry_front' apply.</li>
?DOC("""
Merge a new entry into an existing one, or add it at the front if none is found.
Race condition considerations:
- Two writers: `compare_and_swap` will ensure they both succeed sequentially.
- Any writers and the cleaner: under fifo, the writer modifies the record in place
and doesn't need to be concerned with rotation. Under lru, the same considerations
as for `put_entry_front` apply.
""").
-spec merge_entry(name(), key(), value()) -> boolean().
merge_entry(Name, Key, Value) when is_atom(Name) ->
segmented_cache_helpers:merge_entry(Name, Key, Value).
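%% For example, assuming a cache started with
%% `merger_fun => fun(Old, New) -> lists:usort(Old ++ New) end`, concurrent
%% merges accumulate list values without losing either write:
%%
%%   true = segmented_cache:merge_entry(my_cache, visitors, [alice]),
%%   true = segmented_cache:merge_entry(my_cache, visitors, [bob]),
%%   [alice, bob] = segmented_cache:get_entry(my_cache, visitors).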

%% @doc Delete an entry in all ets segments
%%
%% Might raise a telemetry error if the request fails:
%% name: [segmented_cache, Name, delete_error]
%% measurements: #{}
%% metadata: #{name => atom(), delete_type => entry, value => Key,
%% class => throw | error | exit, reason => term()}
?DOC("""
Delete an entry in all ets segments.
Might emit a telemetry error event if the request fails:
- name: `[segmented_cache, Name, delete_error]`
- measurements: `#{}`
- metadata: `t:delete_error/1`
""").
-spec delete_entry(name(), key()) -> true.
delete_entry(Name, Key) when is_atom(Name) ->
segmented_cache_server:request_delete_entry(Name, Key),
segmented_cache_helpers:delete_entry(Name, Key).
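%% The error event can be observed like any other telemetry event; a
%% hypothetical handler:
%%
%%   ok = telemetry:attach(
%%          <<"log-cache-delete-errors">>,
%%          [segmented_cache, my_cache, delete_error],
%%          fun(_Event, _Measurements, Metadata, _Config) ->
%%                  logger:warning("cache delete failed: ~p", [Metadata])
%%          end,
%%          undefined).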

%% @doc Delete a pattern in all ets segments
%%
%% Might raise a telemetry error if the request fails:
%% name: [segmented_cache, Name, delete_error]
%% measurements: #{}
%% metadata: #{name => atom(), delete_type => pattern, value => Pattern,
%% class => throw | error | exit, reason => term()}
?DOC("""
Delete a pattern in all ets segments.
Might emit a telemetry error event if the request fails:
- name: `[segmented_cache, Name, delete_error]`
- measurements: `#{}`
- metadata: `t:delete_error/1`
""").
-spec delete_pattern(name(), ets:match_pattern()) -> true.
delete_pattern(Name, Pattern) when is_atom(Name) ->
segmented_cache_server:request_delete_pattern(Name, Pattern),
2 changes: 1 addition & 1 deletion src/segmented_cache_callbacks.erl
@@ -1,5 +1,5 @@
%% @private
-module(segmented_cache_callbacks).
-moduledoc false.

-export([is_member_ets_fun/2, get_entry_ets_fun/2,
delete_entry_fun/2, delete_pattern_fun/2,
3 changes: 2 additions & 1 deletion src/segmented_cache_helpers.erl
@@ -1,5 +1,5 @@
%% @private
-module(segmented_cache_helpers).
-moduledoc false.

-define(APP_KEY, segmented_cache).

@@ -238,6 +238,7 @@ compare_and_swap(Attempts, EtsSegment, Key, Value, MergerFun) ->
%% Note that we must first empty the last table, and then rotate the index. If it was done
%% in the opposite order, there's a chance a worker can insert an entry at the front just
%% before the table is purged.
-spec purge_last_segment_and_rotate(segmented_cache:name()) -> non_neg_integer().
purge_last_segment_and_rotate(Name) ->
SegmentRecord = get_cache_config(Name),
Index = atomics:get(SegmentRecord#segmented_cache.index, 1),
13 changes: 7 additions & 6 deletions src/segmented_cache_server.erl
@@ -1,5 +1,5 @@
%% @private
-module(segmented_cache_server).
-moduledoc false.

-behaviour(gen_server).

@@ -10,11 +10,11 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-type request_content() :: term().

-record(cache_state, {scope :: segmented_cache:scope(),
name :: segmented_cache:name(),
ttl :: timeout(),
timer_ref :: undefined | reference()}).
-type state() :: #cache_state{}.

%%====================================================================
%% API
@@ -40,7 +40,7 @@ request_delete_pattern(Name, Pattern) ->
%% gen_server callbacks
%%====================================================================

-spec init({segmented_cache:name(), segmented_cache:opts()}) -> {ok, #cache_state{}}.
-spec init({segmented_cache:name(), segmented_cache:opts()}) -> {ok, state()}.
init({Name, Opts}) ->
#{scope := Scope, ttl := TTL} = segmented_cache_helpers:init_cache_config(Name, Opts),
pg:join(Scope, Name, self()),
@@ -52,11 +52,11 @@ init({Name, Opts}) ->
{ok, #cache_state{scope = Scope, name = Name, ttl = TTL, timer_ref = TimerRef}}
end.

-spec handle_call(any(), gen_server:from(), #cache_state{}) -> {reply, ok, #cache_state{}}.
-spec handle_call(any(), gen_server:from(), state()) -> {reply, ok, state()}.
handle_call(_Msg, _From, State) ->
{reply, ok, State}.

-spec handle_cast(term(), #cache_state{}) -> {noreply, #cache_state{}}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({delete_entry, Key}, #cache_state{name = Name} = State) ->
segmented_cache_helpers:delete_entry(Name, Key),
{noreply, State};
@@ -66,7 +66,7 @@ handle_cast({delete_pattern, Pattern}, #cache_state{name = Name} = State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

-spec handle_info(any(), #cache_state{}) -> {noreply, #cache_state{}}.
-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info(purge, #cache_state{name = Name, ttl = TTL} = State) ->
segmented_cache_helpers:purge_last_segment_and_rotate(Name),
case TTL of
@@ -76,6 +76,7 @@ handle_info(purge, #cache_state{name = Name, ttl = TTL} = State) ->
handle_info(_Msg, State) ->
{noreply, State}.

-spec terminate(normal | shutdown | {shutdown, term()} | term(), state()) -> term().
terminate(_Reason, #cache_state{name = Name, timer_ref = TimerRef}) ->
segmented_cache_helpers:erase_cache_config(Name),
case TimerRef of