Skip to content

Commit

Permalink
Merge remote-tracking branch 'emqx/emqx-OTP-26.2.1' into merge-emqx-O…
Browse files Browse the repository at this point in the history
…TP-26.2.2
  • Loading branch information
qzhuyan committed Mar 6, 2024
2 parents b83df13 + 26b68b6 commit 0324419
Show file tree
Hide file tree
Showing 24 changed files with 684 additions and 49 deletions.
4 changes: 2 additions & 2 deletions erts/emulator/beam/erl_bif_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ static char erts_system_version[] = ("Erlang/OTP " ERLANG_OTP_RELEASE
" [erts-" ERLANG_VERSION "]"
#ifndef OTP_RELEASE
#ifdef ERLANG_GIT_VERSION
" [source-" ERLANG_GIT_VERSION "]"
" [emqx-" ERLANG_GIT_VERSION "]"
#else
" [source]"
" [emqx]"
#endif
#endif
#if defined(ARCH_64)
Expand Down
95 changes: 84 additions & 11 deletions lib/kernel/src/gen_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
-export([send/2, recv/2, recv/3, unrecv/2]).
-export([controlling_process/2]).
-export([fdopen/2]).
-export([ipv6_probe/0]).

-include("inet_int.hrl").
-include("file.hrl").
Expand Down Expand Up @@ -120,7 +121,8 @@
recvtclass |
recvttl |
pktoptions |
ipv6_v6only.
ipv6_v6only |
ipv6_probe.
-type connect_option() ::
{fd, Fd :: non_neg_integer()} |
inet:address_family() |
Expand All @@ -131,6 +133,7 @@
{tcp_module, module()} |
{netns, file:filename_all()} |
{bind_to_device, binary()} |
{ipv6_probe, boolean() | timeout()} |
option().
-type listen_option() ::
{fd, Fd :: non_neg_integer()} |
Expand All @@ -157,6 +160,8 @@
%% Connect a socket
%%

ipv6_probe() -> true.

-spec connect(SockAddr, Opts) -> {ok, Socket} | {error, Reason} when
SockAddr :: socket:sockaddr_in() | socket:sockaddr_in6(),
Opts :: [inet:inet_backend() | connect_option()],
Expand Down Expand Up @@ -218,21 +223,89 @@ connect(#{family := Fam} = SockAddr, Opts, Timeout)
Reason :: timeout | inet:posix().

connect(Address, Port, Opts0, Timeout) ->
case inet:gen_tcp_module(Opts0) of
%% When neither `inet` nor `inet6` is provided in Opts0,
%% and if `ipv6_probe` option is given, try to connect ipv6 first.
{TryIpv6, Ipv6T} =
case proplists:get_value(ipv6_probe, Opts0) of
true -> {true, 2000}; %% default 2 seconds
false -> {false, 0};
undefined -> {false, 0};
T -> {true, T}
end,
%% delete it to avoid interference
Opts1 = proplists:delete(ipv6_probe, Opts0),
case inet:gen_tcp_module(Opts1) of
{?MODULE, Opts} ->
Timer = inet:start_timer(Timeout),
Res = (catch connect1(Address,Port,Opts,Timer)),
_ = inet:stop_timer(Timer),
case Res of
{ok,S} -> {ok,S};
{error, einval} -> exit(badarg);
{'EXIT',Reason} -> exit(Reason);
Error -> Error
end;
connect_maybe_ipv6(Address, Port, Opts, Timeout, TryIpv6, Ipv6T);
{GenTcpMod, Opts} ->
GenTcpMod:connect(Address, Port, Opts, Timeout)
end.

connect_maybe_ipv6(Address, Port, Opts, Timeout, TryIpv6, Ipv6T) ->
case maybe_ipv6(Address, Opts, TryIpv6) of
{maybe, NewOpts} when TryIpv6 ->
try
{ok, _} = connect_0(Address, Port, NewOpts, Ipv6T)
catch
_ : _ ->
%% fallback
connect_0(Address, Port, Opts, Timeout)
end;
NewOpts ->
connect_0(Address, Port, NewOpts, Timeout)
end.

connect_0(Address, Port, Opts, Timeout) ->
Timer = inet:start_timer(Timeout),
Res = (catch connect1(Address,Port,Opts,Timer)),
_ = inet:stop_timer(Timer),
case Res of
{ok,S} -> {ok,S};
{error, einval} -> exit(badarg);
{'EXIT',Reason} -> exit(Reason);
Error -> Error
end.

maybe_ipv6({local, _}, Opts, _TryIpv6) ->
%% unapplicable to local sockets
Opts;
maybe_ipv6(Host, Opts, TryIpv6) ->
case lists:member(inet, Opts) orelse lists:member(inet6, Opts) of
true ->
Opts; %% caller has made the decision
false when is_tuple(Host) ->
%% ip tuple provided
maybe_ipv6_1(Host, Opts);
false when TryIpv6 ->
%% string host
maybe_ipv6_2(Host, Opts);
false ->
Opts
end.

maybe_ipv6_1(Ip, Opts) when tuple_size(Ip) =:= 4 -> Opts;
maybe_ipv6_1(Ip, Opts) when tuple_size(Ip) =:= 8 -> [inet6 | Opts].

maybe_ipv6_2(Host, Opts) ->
case inet:parse_address(Host) of
{ok, Ip} when is_tuple(Ip) ->
%% ip string provided, parsed into tuple
maybe_ipv6_1(Ip, Opts);
_ ->
maybe_ipv6_3(Host, Opts)
end.

maybe_ipv6_3(Host, Opts) ->
case inet:getaddr(Host, inet6) of
{ok, _} ->
%% the target has a resolvable v6 IP
%% maybe try to connect
{maybe, [inet6 | Opts]};
_ ->
%% the target has no resolvable v6 IP
Opts
end.

connect1(Address, Port, Opts0, Timer) ->
{Mod, Opts} = inet:tcp_module(Opts0, Address),
case Mod:getaddrs(Address, Timer) of
Expand Down
1 change: 1 addition & 0 deletions lib/mnesia/src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ MODULES= \
mnesia_ext_sup \
mnesia_frag \
mnesia_frag_hash \
mnesia_hook \
mnesia_index \
mnesia_kernel_sup \
mnesia_late_loader \
Expand Down
1 change: 1 addition & 0 deletions lib/mnesia/src/mnesia.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mnesia_ext_sup,
mnesia_frag,
mnesia_frag_hash,
mnesia_hook,
mnesia_index,
mnesia_kernel_sup,
mnesia_late_loader,
Expand Down
15 changes: 10 additions & 5 deletions lib/mnesia/src/mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
read_table_property/2, write_table_property/2, delete_table_property/2,
change_table_frag/2,
clear_table/1, clear_table/4,
match_delete/2,

%% Table load
dump_tables/1, wait_for_tables/2, force_load_table/1,
Expand Down Expand Up @@ -2812,21 +2813,25 @@ change_table_copy_type(T, N, S) ->

-spec clear_table(Tab::table()) -> t_result('ok').
clear_table(Tab) ->
match_delete(Tab, '_').

-spec match_delete(Tab::table(), ets:match_pattern()) -> t_result('ok').
match_delete(Tab, Pattern) ->
case get(mnesia_activity_state) of
State = {Mod, Tid, _Ts} when element(1, Tid) =/= tid ->
transaction(State, fun() -> do_clear_table(Tab) end, [], infinity, Mod, sync);
transaction(State, fun() -> do_clear_table(Tab, Pattern) end, [], infinity, Mod, sync);
undefined ->
transaction(undefined, fun() -> do_clear_table(Tab) end, [], infinity, ?DEFAULT_ACCESS, sync);
transaction(undefined, fun() -> do_clear_table(Tab, Pattern) end, [], infinity, ?DEFAULT_ACCESS, sync);
_ -> %% Not allowed for clear_table
mnesia:abort({aborted, nested_transaction})
end.

do_clear_table(Tab) ->
do_clear_table(Tab, Pattern) ->
case get(mnesia_activity_state) of
{?DEFAULT_ACCESS, Tid, Ts} ->
clear_table(Tid, Ts, Tab, '_');
clear_table(Tid, Ts, Tab, Pattern);
{Mod, Tid, Ts} ->
Mod:clear_table(Tid, Ts, Tab, '_');
Mod:clear_table(Tid, Ts, Tab, Pattern);
_ ->
abort(no_transaction)
end.
Expand Down
9 changes: 6 additions & 3 deletions lib/mnesia/src/mnesia_checkpoint.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
tm_prepare/1,
tm_retain/4,
tm_retain/5,
tm_retain/6,
tm_enter_pending/1,
tm_enter_pending/3,
tm_exit_pending/1
Expand Down Expand Up @@ -148,7 +149,6 @@ enter_still_pending([Tid | Tids], Tab) ->
enter_still_pending([], _Tab) ->
ok.


%% Looks up checkpoints for functions in mnesia_tm.
tm_retain(Tid, Tab, Key, Op) ->
case val({Tab, commit_work}) of
Expand All @@ -157,11 +157,14 @@ tm_retain(Tid, Tab, Key, Op) ->
_ ->
undefined
end.

tm_retain(Tid, Tab, Key, Op, Checkpoints) ->
tm_retain(Tid, Tab, Key, Op, Checkpoints, '_').

tm_retain(Tid, Tab, Key, Op, Checkpoints, Obj) ->
case Op of
clear_table ->
OldRecs = mnesia_lib:db_match_object(Tab, '_'),
OldRecs = mnesia_lib:db_match_object(Tab, Obj),
send_group_retain(OldRecs, Checkpoints, Tid, Tab, []),
OldRecs;
_ ->
Expand Down
8 changes: 6 additions & 2 deletions lib/mnesia/src/mnesia_dumper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,12 @@ dets_insert(Op,Tab,Key,Val, Storage0) ->
dets_updated(Tab,Key),
mnesia_lib:db_match_erase(Storage, Tab, Val);
clear_table ->
dets_cleared(Tab),
ok = mnesia_lib:db_match_erase(Storage, Tab, '_')
%% Val is a match_delete pattern
case Val of
'_' -> dets_cleared(Tab);
_ -> dets_updated(Tab, Val)
end,
ok = mnesia_lib:db_match_erase(Storage, Tab, Val)
end.

dets_updated(Tab,Key) ->
Expand Down
105 changes: 105 additions & 0 deletions lib/mnesia/src/mnesia_hook.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

-module(mnesia_hook).

-include("mnesia.hrl").

-export([
register_hook/2,
unregister_hook/1,
do_post_commit/2
]).

-define(hook(NAME), {mnesia_hook, NAME}).

-type post_commit_hook_data() ::
#{ node => node()
, ram_copies => list()
, disc_copies => list()
, disc_only_copies => list()
, ext => list()
, schema_ops => list()
}.

-type post_commit_hook() :: fun((_Tid, post_commit_hook_data()) -> ok).

-spec register_hook(post_commit, post_commit_hook()) -> ok | {error, term()}.
register_hook(post_commit, Hook) when is_function(Hook, 2) ->
persistent_term:put(?hook(post_commit), Hook);
register_hook(_, _) ->
{error, bad_type}.

-spec unregister_hook(post_commit) -> boolean() | {error, term()}.
unregister_hook(post_commit) ->
persistent_term:erase(?hook(post_commit));
unregister_hook(_) ->
{error, bad_type}.

-spec do_post_commit(_Tid, #commit{}) -> ok.
do_post_commit(Tid, Commit) ->
case persistent_term:get(?hook(post_commit), undefined) of
undefined ->
ok;
Fun ->
#commit{ node = Node
, ram_copies = Ram
, disc_copies = Disc
, disc_only_copies = DiscOnly
, ext = Ext
, schema_ops = SchemaOps
} = Commit,
CommitData = #{ node => Node
, ram_copies => Ram
, disc_copies => Disc
, disc_only_copies => DiscOnly
, ext => Ext
, schema_ops => SchemaOps
},
try Fun(Tid, CommitData)
catch EC:Err:St ->
CommitTabs = commit_tabs(Ram, Disc, DiscOnly, Ext),
mnesia_lib:dbg_out("Mnesia post_commit hook failed: ~p:~p~nStacktrace:~p~nCommit tables:~p~n",
[EC, Err, stack_without_args(St), CommitTabs])
end,
ok
end.

%% May be helpful for debugging
commit_tabs(Ram, Disc, DiscOnly, Ext) ->
Acc = tabs_from_ops(Ram, []),
Acc1 = tabs_from_ops(Disc, Acc),
Acc2 = tabs_from_ops(DiscOnly, Acc1),
lists:uniq(tabs_from_ops(Ext, Acc2)).

tabs_from_ops([{{Tab, _K}, _Val, _Op} | T], Acc) ->
tabs_from_ops(T, [Tab | Acc]);
tabs_from_ops([_ | T], Acc) ->
tabs_from_ops(T, Acc);
tabs_from_ops([], Acc) ->
Acc.

%% Args may contain sensitive data
stack_without_args([{M, F, Args, Info} | T]) when is_list(Args) ->
[{M, F, length(Args), Info} | stack_without_args(T)];
stack_without_args([StItem | T] ) ->
[StItem | stack_without_args(T)];
stack_without_args([]) ->
[].
25 changes: 23 additions & 2 deletions lib/mnesia/src/mnesia_loader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,19 @@ do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
verbose("Local table copy of ~tp has recently been deleted, ignored.~n", [Tab]),
{not_loaded, storage_unknown};
do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
[Node | Tail] = Ns,
[Node | Tail] =
case ?catch_val(copy_from_node) of
undefined -> Ns;
CPNode when is_atom(CPNode) ->
case lists:member(CPNode, Ns) of
true ->
[CPNode | Ns -- [CPNode]];
false ->
Ns
end;
_ ->
Ns
end,
case lists:member(Node,val({current, db_nodes})) of
true ->
dbg_out("Getting table ~tp (~p) from node ~p: ~tp~n",
Expand Down Expand Up @@ -917,7 +929,16 @@ get_chunk_func(Pid, Tab, {ext, Alias, Mod}, RemoteS) ->
get_chunk_func(Pid, Tab, Storage, RemoteS) ->
try
TabSize = mnesia:table_info(Tab, size),
KeysPerTransfer = calc_nokeys(Storage, Tab),
KeysPerTransfer =
case ?catch_val(send_table_batch_size) of
{'EXIT', _} ->
mnesia_lib:set(send_table_batch_size, 0),
calc_nokeys(Storage, Tab);
0 ->
calc_nokeys(Storage, Tab);
Val when is_integer(Val) ->
Val
end,
ChunkData = dets:info(Tab, bchunk_format),
UseDetsChunk =
Storage == RemoteS andalso
Expand Down
Loading

0 comments on commit 0324419

Please sign in to comment.