Skip to content

Commit

Permalink
newRouters implementation, branched from FullFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
galhilu authored and leondavi committed Sep 22, 2023
1 parent dde9b45 commit 298cadd
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src_erl/NerlnetApp/src/Client/clientStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ ack(MyName, NerlnetGraph) ->
% ?LOG_INFO("~p sending ACK ~n",[MyName]),
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM,NerlnetGraph),
%% send an ACK to mainserver that the client is ready
nerl_tools:http_request(RouterHost,RouterPort,"clientReady",MyName).
nerl_tools:http_request(RouterHost,RouterPort,"unicast",term_to_binary({?MAIN_SERVER_ATOM,{"clientReady",MyName}})).

createWorkers(ClientName, EtsRef) ->
CLIENT_WORKES_MAPS_TUPLE_IDX = 3,
Expand Down
11 changes: 7 additions & 4 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,18 @@ handle_cast({clientsTraining,Body}, State = #main_genserver_state{state = castin
io:format("Body:~p~n",[Body]),
{noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}};

handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients, msgCounter = MsgCounter}) ->
handle_cast({clientsTraining, _Body}, State = #main_genserver_state{myName = MyName, clients = ListOfClients,nerlnetGraph=NerlnetGraph, msgCounter = MsgCounter}) ->
%% send router http request, to rout this message to all sensors
% io:format("main server: setting all clients on training state: ~p~n",[ListOfClients]),
%% io:format("Body:~p~n",[Body]),
%% io:format("binary_to_list(Body):~p~n",[binary_to_list(Body)]),
%% io:format("Splitted-(Body):~p~n",[re:split(binary_to_list(Body), ",", [{return, list}])]),
%% TODO find the router that can send this request to Sources**
io:format("setting clients ~p to training~n",[ListOfClients]),
_SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients],
[H|_]=ListOfClients, %temp, once fully implemented, sending should be to a connected router
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,H,NerlnetGraph),
nerl_tools:http_request(RouterHost,RouterPort,"broadcast",term_to_binary({ListOfClients,{"clientTraining",MyName}})),
%_SendAction = [{setClientState(clientTraining,ClientName,MyName)}|| ClientName <- ListOfClients],
{noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}};

handle_cast({clientsPredict,_Body}, State = #main_genserver_state{state = casting, clients = ListOfClients,msgCounter = MsgCounter}) ->
Expand Down Expand Up @@ -224,8 +227,8 @@ handle_cast({sourceAck,Body}, State = #main_genserver_state{sourcesWaitingList =


handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter}) ->
NewWaitingList = WaitingList--[list_to_atom(binary_to_list(Body))],
% io:format("new Waiting List: ~p ~n",[NewWaitingList]),
NewWaitingList = WaitingList--[binary_to_term(Body)],
%io:format("new Waiting List: ~p ~n",[NewWaitingList]),
if length(NewWaitingList) == 0 -> ack();
true-> ok end,
{noreply, State#main_genserver_state{clientsWaitingList = NewWaitingList, msgCounter = MsgCounter+1}};
Expand Down
66 changes: 62 additions & 4 deletions src_erl/NerlnetApp/src/Router/routerGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
code_change/3]).

-define(SERVER, ?MODULE).
-define(UNICAST_ACTION_ATOM, unicast).


-record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0}).

-record(router_genserver_state, {myName, nerlnetGraph,msgCounter = 0,etsRef}).
%%%===================================================================
%%% API
%%%===================================================================
Expand Down Expand Up @@ -53,7 +52,11 @@ init({MyName,NerlnetGraph}) ->
?LOG_NOTICE("Router ~p is connected to: ~p~n",[MyName, [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]]),
% nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]),
put(nerlnetGraph, NerlnetGraph),
{ok, #router_genserver_state{msgCounter = 1, myName = MyName }}.
EtsRef = ets:new(routint_table, [set]),
EntitiesList=digraph:vertices(NerlnetGraph),
Routing_table=nerl_tools:make_routint_table(EtsRef,EntitiesList--[?API_SERVER_ATOM,MyName],MyName,NerlnetGraph),
%io:format("--------------------------router EntitiesList: ~p~n",[EntitiesList]),
{ok, #router_genserver_state{msgCounter = 1, myName = MyName, etsRef=Routing_table }}.


%% @private
Expand Down Expand Up @@ -177,6 +180,61 @@ handle_cast({getStats,_Body}, State = #router_genserver_state{myName = MyName,
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}};


handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) ->

[{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest),
case Dest of
Name->
%the destination is the next hop, send as regular message
{Action,Data}=Body;
_->
%next hop isnt the destination, continue as geneal router message
Action=atom_to_list(?UNICAST_ACTION_ATOM),
Data={Dest,Body}
end,
nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }};

handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) ->
MapFunc=fun(Dest,Acc)->
%make a map when keys are addreses to send a message to, and values are lists of destination of the message that go throu key addres
[{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest),
case maps:is_key({Name,Host,Port},Acc) of
true->
%addres alread in, append Dest to exsisting value
NewVal=maps:get({Name,Host,Port},Acc)++[Dest];
false->
%addres not in yet, create new value for it
NewVal=[Dest]
end,
maps:put({Name,Host,Port},NewVal,Acc)
end,

NextHopMap=lists:foldl(MapFunc,#{},DestList),

SendFunc=fun({Name,Host,Port},DestEntityList)->
%iterate on the maps keys (addreses) and forword message according to 1 of 3 cases
case length(DestEntityList) of
1->
[Entity]=DestEntityList,
case Entity of
Name->
{Action,Data}=Body;
_->
Action=atom_to_list(?UNICAST_ACTION_ATOM),
Data={Entity,Body}
end;
_->
%multipul destinations continue as broadcast message
Action="broadcast",
Data={DestEntityList,Body}
end,
nerl_tools:http_request(Host, Port,Action, term_to_binary(Data))
end,
maps:foreach(SendFunc,NextHopMap),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }};


handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) ->
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}.

Expand Down
5 changes: 4 additions & 1 deletion src_erl/NerlnetApp/src/Router/routingHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ init(Req0, State) ->
% gen_server:cast(Router_genserver_Pid, {federatedWeights,Body});

%%%%%%%%%%%%%%GUI actions
getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body})
getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body});

unicast ->gen_server:cast(Router_genserver_Pid, {unicast,binary_to_term(Body)});
broadcast ->gen_server:cast(Router_genserver_Pid, {broadcast,binary_to_term(Body)})

end,
Reply = io_lib:format(" ", []),
Expand Down
18 changes: 17 additions & 1 deletion src_erl/NerlnetApp/src/nerl_tools.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-export([getdeviceIP/0, port_available/1]).
-export([list_to_numeric/1]).
-export([calculate_size/1]).
-export([make_routint_table/4]).

setup_logger(Module) ->
logger:set_handler_config(default, formatter, {logger_formatter, #{}}),
Expand Down Expand Up @@ -191,4 +192,19 @@ calculate_size(List) when is_list(List) ->

%% TODO: add another timing map for NIF of each worker action

%% TODO: create create_body func for standard message passing
%% TODO: create create_body func for standard message passing

make_routint_table(Ets,[],_Origin,_NerlnetGraph)->Ets;

make_routint_table(Ets,[Entity|EntitiesList],Origin,NerlnetGraph)->
case digraph:get_short_path(NerlnetGraph,Origin,Entity) of

false ->
ok;

ShortPath ->
NextHop = lists:nth(2,ShortPath),
{Name,{Host,Port}} = digraph:vertex(NerlnetGraph,NextHop),
ets:insert(Ets,{Entity,{Name,Host,Port}})
end,
make_routint_table(Ets,EntitiesList,Origin,NerlnetGraph).
2 changes: 2 additions & 0 deletions src_erl/NerlnetApp/src/nerlnetApp_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ createRouters(MapOfRouters, HostName) ->
{"/stopCasting",routingHandler, [stopCasting,RouterGenServerPid]},
{"/federatedWeightsVector",routingHandler, [federatedWeightsVector,RouterGenServerPid]},
{"/federatedWeights",routingHandler, [federatedWeights,RouterGenServerPid]},
{"/unicast",routingHandler, [unicast,RouterGenServerPid]},
{"/broadcast",routingHandler, [broadcast,RouterGenServerPid]},

%%GUI actions
{"/getStats",routingHandler, [getStats,RouterGenServerPid]}
Expand Down

0 comments on commit 298cadd

Please sign in to comment.