From 298cadd0b7c643317d6398b710a62280c0d1125d Mon Sep 17 00:00:00 2001 From: galhilu Date: Tue, 19 Sep 2023 17:19:00 +0300 Subject: [PATCH] newRouters implementation, branched from FullFlow --- .../NerlnetApp/src/Client/clientStatem.erl | 2 +- .../src/MainServer/mainGenserver.erl | 11 ++-- .../NerlnetApp/src/Router/routerGenserver.erl | 66 +++++++++++++++++-- .../NerlnetApp/src/Router/routingHandler.erl | 5 +- src_erl/NerlnetApp/src/nerl_tools.erl | 18 ++++- src_erl/NerlnetApp/src/nerlnetApp_app.erl | 2 + 6 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src_erl/NerlnetApp/src/Client/clientStatem.erl b/src_erl/NerlnetApp/src/Client/clientStatem.erl index 328776534..e5d3db737 100644 --- a/src_erl/NerlnetApp/src/Client/clientStatem.erl +++ b/src_erl/NerlnetApp/src/Client/clientStatem.erl @@ -371,7 +371,7 @@ ack(MyName, NerlnetGraph) -> % ?LOG_INFO("~p sending ACK ~n",[MyName]), {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM,NerlnetGraph), %% send an ACK to mainserver that the client is ready - nerl_tools:http_request(RouterHost,RouterPort,"clientReady",MyName). + nerl_tools:http_request(RouterHost,RouterPort,"unicast",term_to_binary({?MAIN_SERVER_ATOM,{"clientReady",MyName}})). createWorkers(ClientName, EtsRef) -> CLIENT_WORKES_MAPS_TUPLE_IDX = 3, diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index 545acfbf3..6283adb98 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -127,7 +127,7 @@ handle_cast({clientsTraining,Body}, State = #main_genserver_state{state = castin io:format("Body:~p~n",[Body]), {noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}}; -handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients, msgCounter = MsgCounter}) -> +handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients,nerlnetGraph=NerlnetGraph, msgCounter = MsgCounter}) -> %% send router http request, to rout this message to all sensors % io:format("main server: setting all clients on training state: ~p~n",[ListOfClients]), %% io:format("Body:~p~n",[Body]), @@ -135,7 +135,10 @@ handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyN %% io:format("Splitted-(Body):~p~n",[re:split(binary_to_list(Body), ",", [{return, list}])]), %% TODO find the router that can send this request to Sources** io:format("setting clients ~p to training~n",[ListOfClients]), - _SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients], + [H|_]=ListOfClients, %temp, once fully implemented, sending should be to a connected router + {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,H,NerlnetGraph), + nerl_tools:http_request(RouterHost,RouterPort,"broadcast",term_to_binary({ListOfClients,{"clientTraining",MyName}})), + %_SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients], {noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}}; handle_cast({clientsPredict,_Body}, State = #main_genserver_state{state = casting, clients = ListOfClients,msgCounter = MsgCounter}) -> @@ -224,8 +227,8 @@ handle_cast({sourceAck,Body}, State = #main_genserver_state{sourcesWaitingList = handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter}) -> - NewWaitingList = WaitingList--[list_to_atom(binary_to_list(Body))], - % io:format("new Waiting List: ~p ~n",[NewWaitingList]), + NewWaitingList = WaitingList--[binary_to_term(Body)], + %io:format("new Waiting List: ~p ~n",[NewWaitingList]), if length(NewWaitingList) == 0 -> ack(); true-> ok end, {noreply, State#main_genserver_state{clientsWaitingList = NewWaitingList, msgCounter = MsgCounter+1}}; diff --git a/src_erl/NerlnetApp/src/Router/routerGenserver.erl b/src_erl/NerlnetApp/src/Router/routerGenserver.erl index e1ed36b1e..ea4c74ade 100644 --- a/src_erl/NerlnetApp/src/Router/routerGenserver.erl +++ b/src_erl/NerlnetApp/src/Router/routerGenserver.erl @@ -21,10 +21,9 @@ code_change/3]). -define(SERVER, ?MODULE). +-define(UNICAST_ACTION_ATOM, unicast). - --record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0}). - +-record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0,etsRef}). %%%=================================================================== %%% API %%%=================================================================== @@ -53,7 +52,11 @@ init({MyName,NerlnetGraph}) -> ?LOG_NOTICE("Router ~p is connected to: ~p~n",[MyName, [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]]), % nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]), put(nerlnetGraph, NerlnetGraph), - {ok, #router_genserver_state{msgCounter = 1, myName = MyName }}. + EtsRef = ets:new(routint_table, [set]), + EntitiesList=digraph:vertices(NerlnetGraph), + Routing_table=nerl_tools:make_routint_table(EtsRef,EntitiesList--[?API_SERVER_ATOM,MyName],MyName,NerlnetGraph), + %io:format("--------------------------router EntitiesList: ~p~n",[EntitiesList]), + {ok, #router_genserver_state{msgCounter = 1, myName = MyName, etsRef=Routing_table }}. %% @private @@ -177,6 +180,61 @@ handle_cast({getStats,_Body}, State = #router_genserver_state{myName = MyName, {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}; +handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) -> + + [{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest), + case Dest of + Name-> + %the destination is the next hop, send as regular message + {Action,Data}=Body; + _-> + %next hop isnt the destination, continue as geneal router message + Action=atom_to_list(?UNICAST_ACTION_ATOM), + Data={Dest,Body} + end, + nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)), + {noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }}; + +handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) -> + MapFunc=fun(Dest,Acc)-> + %make a map when keys are addreses to send a message to, and values are lists of destination of the message that go throu key addres + [{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest), + case maps:is_key({Name,Host,Port},Acc) of + true-> + %addres alread in, append Dest to exsisting value + NewVal=maps:get({Name,Host,Port},Acc)++[Dest]; + false-> + %addres not in yet, create new value for it + NewVal=[Dest] + end, + maps:put({Name,Host,Port},NewVal,Acc) + end, + + NextHopMap=lists:foldl(MapFunc,#{},DestList), + + SendFunc=fun({Name,Host,Port},DestEntityList)-> + %iterate on the maps keys (addreses) and forword message according to 1 of 3 cases + case length(DestEntityList) of + 1-> + [Entity]=DestEntityList, + case Entity of + Name-> + {Action,Data}=Body; + _-> + Action=atom_to_list(?UNICAST_ACTION_ATOM), + Data={Entity,Body} + end; + _-> + %multipul destinations continue as broadcast message + Action="broadcast", + Data={DestEntityList,Body} + end, + nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)) + end, + maps:foreach(SendFunc,NextHopMap), + {noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }}; + + handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) -> {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}. diff --git a/src_erl/NerlnetApp/src/Router/routingHandler.erl b/src_erl/NerlnetApp/src/Router/routingHandler.erl index 10b96bbf3..037fae670 100644 --- a/src_erl/NerlnetApp/src/Router/routingHandler.erl +++ b/src_erl/NerlnetApp/src/Router/routingHandler.erl @@ -74,7 +74,10 @@ init(Req0, State) -> % gen_server:cast(Router_genserver_Pid, {federatedWeights,Body}); %%%%%%%%%%%%%%GUI actions - getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body}) + getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body}); + + unicast ->gen_server:cast(Router_genserver_Pid, {unicast,binary_to_term(Body)}); + broadcast ->gen_server:cast(Router_genserver_Pid, {broadcast,binary_to_term(Body)}) end, Reply = io_lib:format(" ", []), diff --git a/src_erl/NerlnetApp/src/nerl_tools.erl b/src_erl/NerlnetApp/src/nerl_tools.erl index 08da78065..16ff7cf20 100644 --- a/src_erl/NerlnetApp/src/nerl_tools.erl +++ b/src_erl/NerlnetApp/src/nerl_tools.erl @@ -9,6 +9,7 @@ -export([getdeviceIP/0, port_available/1]). -export([list_to_numeric/1]). -export([calculate_size/1]). +-export([make_routint_table/4]). setup_logger(Module) -> logger:set_handler_config(default, formatter, {logger_formatter, #{}}), @@ -191,4 +192,19 @@ calculate_size(List) when is_list(List) -> %% TODO: add another timing map for NIF of each worker action -%% TODO: create create_body func for standard message passing \ No newline at end of file +%% TODO: create create_body func for standard message passing + +make_routint_table(Ets,[],_Origin,_NerlnetGraph)->Ets; + +make_routint_table(Ets,[Entity|EntitiesList],Origin,NerlnetGraph)-> + case digraph:get_short_path(NerlnetGraph,Origin,Entity) of + + false -> + ok; + + ShortPath -> + NextHop = lists:nth(2,ShortPath), + {Name,{Host,Port}} = digraph:vertex(NerlnetGraph,NextHop), + ets:insert(Ets,{Entity,{Name,Host,Port}}) + end, + make_routint_table(Ets,EntitiesList,Origin,NerlnetGraph). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/nerlnetApp_app.erl b/src_erl/NerlnetApp/src/nerlnetApp_app.erl index fd3d04aca..7e6038fb6 100644 --- a/src_erl/NerlnetApp/src/nerlnetApp_app.erl +++ b/src_erl/NerlnetApp/src/nerlnetApp_app.erl @@ -266,6 +266,8 @@ createRouters(MapOfRouters, HostName) -> {"/stopCasting",routingHandler, [stopCasting,RouterGenServerPid]}, {"/federatedWeightsVector",routingHandler, [federatedWeightsVector,RouterGenServerPid]}, {"/federatedWeights",routingHandler, [federatedWeights,RouterGenServerPid]}, + {"/unicast",routingHandler, [unicast,RouterGenServerPid]}, + {"/broadcast",routingHandler, [broadcast,RouterGenServerPid]}, %%GUI actions {"/getStats",routingHandler, [getStats,RouterGenServerPid]}