Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New routers full flow #257

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src_erl/NerlnetApp/src/Client/clientStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ ack(MyName, NerlnetGraph) ->
% ?LOG_INFO("~p sending ACK ~n",[MyName]),
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM,NerlnetGraph),
%% send an ACK to mainserver that the client is ready
nerl_tools:http_request(RouterHost,RouterPort,"clientReady",MyName).
nerl_tools:http_request(RouterHost,RouterPort,"unicast",term_to_binary({?MAIN_SERVER_ATOM,{"clientReady",MyName}})).

createWorkers(ClientName, EtsRef) ->
CLIENT_WORKES_MAPS_TUPLE_IDX = 3,
Expand Down
11 changes: 7 additions & 4 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,18 @@ handle_cast({clientsTraining,Body}, State = #main_genserver_state{state = castin
io:format("Body:~p~n",[Body]),
{noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}};

handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients, msgCounter = MsgCounter}) ->
handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients,nerlnetGraph=NerlnetGraph, msgCounter = MsgCounter}) ->
%% send router http request, to rout this message to all sensors
% io:format("main server: setting all clients on training state: ~p~n",[ListOfClients]),
%% io:format("Body:~p~n",[Body]),
%% io:format("binary_to_list(Body):~p~n",[binary_to_list(Body)]),
%% io:format("Splitted-(Body):~p~n",[re:split(binary_to_list(Body), ",", [{return, list}])]),
%% TODO find the router that can send this request to Sources**
io:format("setting clients ~p to training~n",[ListOfClients]),
_SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients],
[H|_]=ListOfClients, %temp, once fully implemented, sending should be to a connected router
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,H,NerlnetGraph),
nerl_tools:http_request(RouterHost,RouterPort,"broadcast",term_to_binary({ListOfClients,{"clientTraining",MyName}})),
%_SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients],
{noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}};

handle_cast({clientsPredict,_Body}, State = #main_genserver_state{state = casting, clients = ListOfClients,msgCounter = MsgCounter}) ->
Expand Down Expand Up @@ -224,8 +227,8 @@ handle_cast({sourceAck,Body}, State = #main_genserver_state{sourcesWaitingList =


handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter}) ->
NewWaitingList = WaitingList--[list_to_atom(binary_to_list(Body))],
% io:format("new Waiting List: ~p ~n",[NewWaitingList]),
NewWaitingList = WaitingList--[binary_to_term(Body)],
%io:format("new Waiting List: ~p ~n",[NewWaitingList]),
if length(NewWaitingList) == 0 -> ack();
true-> ok end,
{noreply, State#main_genserver_state{clientsWaitingList = NewWaitingList, msgCounter = MsgCounter+1}};
Expand Down
66 changes: 62 additions & 4 deletions src_erl/NerlnetApp/src/Router/routerGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
code_change/3]).

-define(SERVER, ?MODULE).
-define(UNICAST_ACTION_ATOM, unicast).


-record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0}).

-record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0,etsRef}).
%%%===================================================================
%%% API
%%%===================================================================
Expand Down Expand Up @@ -53,7 +52,11 @@ init({MyName,NerlnetGraph}) ->
?LOG_NOTICE("Router ~p is connected to: ~p~n",[MyName, [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]]),
% nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]),
put(nerlnetGraph, NerlnetGraph),
{ok, #router_genserver_state{msgCounter = 1, myName = MyName }}.
RoutingTableEtsRef = ets:new(routing_table, [set]),
EntitiesList=digraph:vertices(NerlnetGraph),
nerl_tools:make_routing_table(RoutingTableEtsRef,EntitiesList--[?API_SERVER_ATOM,MyName],MyName,NerlnetGraph),
%io:format("--------------------------router EntitiesList: ~p~n",[EntitiesList]),
{ok, #router_genserver_state{msgCounter = 1, myName = MyName, etsRef=RoutingTableEtsRef}}.


%% @private
Expand Down Expand Up @@ -177,6 +180,61 @@ handle_cast({getStats,_Body}, State = #router_genserver_state{myName = MyName,
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}};


handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) ->

[{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest),
case Dest of
Name->
%the destination is the next hop, send as regular message
{Action,Data}=Body;
_->
%next hop isnt the destination, continue as geneal router message
Action=atom_to_list(?UNICAST_ACTION_ATOM),
Data={Dest,Body}
end,
nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }};

handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) ->
MapFunc=fun(Dest,Acc)->
%make a map when keys are addreses to send a message to, and values are lists of destination of the message that go throu key addres
[{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest),
case maps:is_key({Name,Host,Port},Acc) of
true->
%addres alread in, append Dest to exsisting value
NewVal=maps:get({Name,Host,Port},Acc)++[Dest];
false->
%addres not in yet, create new value for it
NewVal=[Dest]
end,
maps:put({Name,Host,Port},NewVal,Acc)
end,

NextHopMap=lists:foldl(MapFunc,#{},DestList),

SendFunc=fun({Name,Host,Port},DestEntityList)->
%iterate on the maps keys (addreses) and forword message according to 1 of 3 cases
case length(DestEntityList) of
1->
[Entity]=DestEntityList,
case Entity of
Name->
{Action,Data}=Body;
_->
Action=atom_to_list(?UNICAST_ACTION_ATOM),
Data={Entity,Body}
end;
_->
%multipul destinations continue as broadcast message
Action="broadcast",
Data={DestEntityList,Body}
end,
nerl_tools:http_request(Host, Port,Action, term_to_binary(Data))
end,
maps:foreach(SendFunc,NextHopMap),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }};


handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) ->
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}.

Expand Down
5 changes: 4 additions & 1 deletion src_erl/NerlnetApp/src/Router/routingHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ init(Req0, State) ->
% gen_server:cast(Router_genserver_Pid, {federatedWeights,Body});

%%%%%%%%%%%%%%GUI actions
getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body})
getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body});

unicast ->gen_server:cast(Router_genserver_Pid, {unicast,binary_to_term(Body)});
broadcast ->gen_server:cast(Router_genserver_Pid, {broadcast,binary_to_term(Body)})

end,
Reply = io_lib:format(" ", []),
Expand Down
14 changes: 13 additions & 1 deletion src_erl/NerlnetApp/src/nerl_tools.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-export([getdeviceIP/0, port_available/1]).
-export([list_to_numeric/1]).
-export([calculate_size/1]).
-export([make_routing_table/4]).

setup_logger(Module) ->
logger:set_handler_config(default, formatter, {logger_formatter, #{}}),
Expand Down Expand Up @@ -191,4 +192,15 @@ calculate_size(List) when is_list(List) ->

%% TODO: add another timing map for NIF of each worker action

%% TODO: create create_body func for standard message passing
%% TODO: create create_body func for standard message passing

make_routing_table(Ets,EntitiesList,Origin,NerlnetGraph)->
GenerateTablesFunc = fun(Entity) ->
case digraph:get_short_path(NerlnetGraph,Origin,Entity) of
false -> ok;
ShortPath -> NextHop = lists:nth(2,ShortPath),
{Name,{Host,Port}} = digraph:vertex(NerlnetGraph,NextHop),
ets:insert(Ets,{Entity,{Name,Host,Port}})
end % case end
end, % fun end
lists:foreach(GenerateTablesFunc, EntitiesList).
31 changes: 24 additions & 7 deletions src_erl/NerlnetApp/src/nerlnetApp_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ waitForInit() ->
end.

createNerlnetInitiator(HostName) ->
Port = ?NERLNET_INIT_PORT,
PortAvailable = nerl_tools:port_available(Port),
DefaultPort = ?NERLNET_INIT_PORT,
PortAvailable = nerl_tools:port_available(DefaultPort),
if
PortAvailable ->
NerlnetInitiatorDispatch = cowboy_router:compile([
Expand All @@ -124,10 +124,10 @@ createNerlnetInitiator(HostName) ->
]),
%% cowboy:start_clear(Name, TransOpts, ProtoOpts) - an http_listener
%% An ok tuple is returned on success. It contains the pid of the top-level supervisor for the listener.
init_cowboy_start_clear(nerlnetInitiator, {HostName,Port},NerlnetInitiatorDispatch);
true -> ?LOG_NOTICE("Nerlnet uses port ~p and it has to be unused before running Nerlnet server!", [Port]),
?LOG_NOTICE("Find the process that uses port ~p using the command: lsof -i:~p",[Port, Port]),
?LOG_ERROR("Port ~p is being used - can not start (definition NERLNET_INIT_PORT in nerl_tools.hrl)", [Port])
init_cowboy_start_clear(nerlnetInitiator, {HostName,DefaultPort},NerlnetInitiatorDispatch);
true -> ?LOG_NOTICE("Nerlnet uses port ~p and it has to be unused before running Nerlnet server!", [DefaultPort]),
?LOG_NOTICE("Find the process that uses port ~p using the command: lsof -i:~p",[DefaultPort, DefaultPort]),
?LOG_ERROR("Port ~p is being used - can not start (definition NERLNET_INIT_PORT in nerl_tools.hrl)", [DefaultPort])
end.


Expand Down Expand Up @@ -168,6 +168,16 @@ parseJsonAndStartNerlnet(HostName) ->
createMainServer(HostOfMainServer,BatchSize,HostName).

%% internal functions
port_validator(Port, EntityName) ->
PortAvailable = nerl_tools:port_available(Port),
if PortAvailable ->
ok;
true -> ?LOG_ERROR("Nerlnet entity: ~p uses port ~p and it must be free", [EntityName, Port]),
?LOG_ERROR("You can take the following steps:",[]),
?LOG_ERROR("1. Change port in DC file to free port",[]),
?LOG_ERROR("2. Find the process that uses port ~p using the command: lsof -i:~p and terminate it (Risky approach)",[Port, Port]),
erlang:error("Port ~p is being used - cannot start")
end.

createClientsAndWorkers() ->
ClientsAndWorkers = ets:lookup_element(nerlnet_data, hostClients, ?DATA_IDX), % Each element is {Name,{Port,ClientWorkers,ClientWorkersMaps}}
Expand All @@ -177,7 +187,8 @@ createClientsAndWorkers() ->
NerlnetGraph = ets:lookup_element(nerlnet_data, communicationGraph, ?DATA_IDX),

Func =
fun({Client,{Port,_ClientWorkers,_ClientWorkersMaps, WorkerToClientMap}}) ->
fun({Client,{Port,_ClientWorkers,_ClientWorkersMaps, WorkerToClientMap}}) ->
port_validator(Port, Client),
ClientStatemArgs = {Client, NerlnetGraph, WorkerToClientMap},
ClientStatemPid = clientStatem:start_link(ClientStatemArgs),
%%Nerl Client
Expand Down Expand Up @@ -209,6 +220,7 @@ createSources(BatchSize, DefaultFrequency, HostName) ->
Func =
fun(SourceName,{SourcePort,SourceMethod, CustomFrequency}) ->
% SourceStatemArgs = {SourceName, WorkersMap, NerlnetGraph, SourceMethod, BatchSize}, %%TODO make this a list of Sources
port_validator(SourcePort, SourceName),
SourceStatemArgs =
case CustomFrequency of
none -> {SourceName, WorkersMap, NerlnetGraph, SourceMethod, BatchSize, DefaultFrequency};
Expand Down Expand Up @@ -243,6 +255,7 @@ createRouters(MapOfRouters, HostName) ->
%% all http requests will be handled by Cowboy which updates router_genserver if necessary.
%% connectivity map will be as follow:
%% name_atom of machine => {Host,Port} OR an atom router_name, indicating there is no direct http connection, and should pass request via router_name
port_validator(Port, RouterName),
RouterGenServerArgs= {RouterName, NerlnetGraph}, %%TODO make this a list of Routers
RouterGenServerPid = routerGenserver:start_link(RouterGenServerArgs),

Expand All @@ -266,6 +279,8 @@ createRouters(MapOfRouters, HostName) ->
{"/stopCasting",routingHandler, [stopCasting,RouterGenServerPid]},
{"/federatedWeightsVector",routingHandler, [federatedWeightsVector,RouterGenServerPid]},
{"/federatedWeights",routingHandler, [federatedWeights,RouterGenServerPid]},
{"/unicast",routingHandler, [unicast,RouterGenServerPid]},
{"/broadcast",routingHandler, [broadcast,RouterGenServerPid]},

%%GUI actions
{"/getStats",routingHandler, [getStats,RouterGenServerPid]}
Expand All @@ -281,6 +296,8 @@ createMainServer(false,_BatchSize,_HostName) -> none;
createMainServer(true,BatchSize,HostName) ->
Name = mainServer,
{Port, _Args} = ets:lookup_element(nerlnet_data, mainServer, ?DATA_IDX),
port_validator(Port, Name),

Clients = ets:lookup_element(nerlnet_data, clients, ?DATA_IDX), % format of maps: {ClientName => {Workers, Port}, ClientsMap}
ClientsNames = maps:keys(Clients),
WorkersMap = ets:lookup_element(nerlnet_data, workers, ?DATA_IDX),
Expand Down