Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API-SERVER] New DB inherritance approach #280

Merged
merged 99 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
96f823f
[API-SERVER] New DB inherritance approach
leondavi Jan 16, 2024
959df0a
[DB] Add DB implementation
leondavi Jan 17, 2024
d3fe059
[APIServer] WIP DB Concept
leondavi Jan 17, 2024
acbaf7a
[API Server] Pass tests
leondavi Jan 17, 2024
9624c7e
Comments on formats
GuyPerets106 Jan 23, 2024
9f2398a
[ApiServerDB] WIP decodeHttpMainServer
ohad123 Jan 23, 2024
4f004fc
[ApiServerDB] Changed worker seperators in stats
GuyPerets106 Jan 24, 2024
5c19127
[Api Server] change NerlComDB functions
ohad123 Jan 24, 2024
05854e6
[ApiServerDB] Fixed stats request issues
GuyPerets106 Jan 24, 2024
1a7568b
[ApiServerDB] WIP NerlcomDB - stats from experiment
ohad123 Jan 25, 2024
9a6fcfb
[ApiServerDB] Changes to Routing IMPORTANT
GuyPerets106 Jan 25, 2024
b3ad95b
Merge branch 'ApiServerDB' of github.com:leondavi/NErlNet into ApiSer…
GuyPerets106 Jan 25, 2024
a873c97
[ApiServerDB] Changed Encoded String Format
GuyPerets106 Jan 25, 2024
799ea7b
[ApiServerDB] fix decoderHttpMainServer
ohad123 Jan 25, 2024
eb08392
[ApiServer] add comments to decoder
ohad123 Jan 25, 2024
2ecec82
[ApiServerDB] Doubled client name bug fixed
GuyPerets106 Jan 25, 2024
d6be794
[ApiServerDB] WIP fix NerlComDb
ohad123 Jan 25, 2024
e4d5b30
[ApiServerDB] fix comment - use defintions for separators
ohad123 Jan 26, 2024
c4f6712
[ApiServerDB] Removed prints
GuyPerets106 Jan 30, 2024
0271514
Added Example Notebook
GuyPerets106 Jan 30, 2024
a2b684f
[ApiServerDB] CommStats changes , Tasks in comments
GuyPerets106 Jan 31, 2024
984777c
[ApiServerDB] WIP
NoaShapira8 Feb 2, 2024
18b3e27
[ApiServerDB] Parsing trainRes completed
GuyPerets106 Feb 5, 2024
dd71d09
[ApiServer] WIP rebuild of experiment flow
ohad123 Feb 8, 2024
0903058
Merge branch 'master' of https://github.com/leondavi/NErlNet into Api…
ohad123 Feb 8, 2024
adcec42
[Api Server] add new json for experiment flow
ohad123 Feb 11, 2024
1174e8f
[ApiServerDB] WIP experiment flow
ohad123 Feb 11, 2024
ab220ef
[ApiServer]WIP
NoaShapira8 Feb 16, 2024
5ee34a6
[ApiServer] WIP finish experiment json parser
ohad123 Feb 16, 2024
09b53d0
[ApiServer] WIP fix parse_experiment_flow_json
ohad123 Feb 16, 2024
36b56a0
[ApiServerDB] WIP
ohad123 Feb 16, 2024
2252b78
[ApiServerDB] WIP
leondavi Feb 16, 2024
f3e7553
[ApiServerDB] WIP: run_current_experiment_phase
NoaShapira8 Feb 16, 2024
e14489c
[ApiServerDB] WIP
leondavi Feb 16, 2024
a7a4278
[API_SERVER] Add terminate action and new send jsons action
leondavi Feb 17, 2024
fb898a4
[ApiServerDB] Ack Updates
GuyPerets106 Feb 17, 2024
6571a9f
[ApiServer] WIP
ohad123 Feb 18, 2024
00033fa
[ApiServerDB] WIP
ohad123 Feb 18, 2024
6bce283
[ApiServerDB]WIP
ohad123 Feb 18, 2024
ef0c6b5
[ApiServerDB] WIP
ohad123 Feb 18, 2024
29a063e
[ApiServerDB] JsonReceivedAck
GuyPerets106 Feb 18, 2024
9e0f78b
[ApiServerDB] WIP
ohad123 Feb 18, 2024
f0b8e55
[ApiServerDB] WIP
ohad123 Feb 20, 2024
389da10
[ApiServerDB] WIP
ohad123 Feb 22, 2024
5b4c966
[ApiServerDB] WIP
ohad123 Feb 22, 2024
b68b109
[ApiServerDB] Updated predictRes Erlang Side
GuyPerets106 Feb 22, 2024
1fbb24f
[ApiServerDB] WIP
ohad123 Feb 22, 2024
ad46793
[APiServerDB] WIP
ohad123 Feb 22, 2024
af29fa2
[ApiServerDB] WIP
ohad123 Feb 22, 2024
dcf90c5
[ApiServerDB] WIP
ohad123 Feb 22, 2024
10eec93
[ApiServerDB] Fixed predictRes erlang-side
GuyPerets106 Feb 23, 2024
2ca8950
[ApiServerDB] WIP
ohad123 Feb 24, 2024
567b4d1
[ApiServerDB] WIP
ohad123 Feb 24, 2024
767823e
[ApiServerDB] Added Batch Timestamp
GuyPerets106 Feb 24, 2024
e3daca7
[ApiServerDB] Removed Deprecated "NumOfSamples"
GuyPerets106 Feb 24, 2024
dbc6b11
[ApiServerDB] Added Batch Timestamp to train phase
GuyPerets106 Feb 24, 2024
36d03b7
[ApiServerDB] WIP
ohad123 Feb 24, 2024
428c85c
[ApiServerDB] WIP
ohad123 Feb 24, 2024
2f8a120
[ApiServerDB] Added tensor format to trainRes
GuyPerets106 Feb 24, 2024
bcf6611
Merge branch 'ApiServerDB' of github.com:leondavi/NErlNet into ApiSer…
GuyPerets106 Feb 24, 2024
1d2d545
[ApiServerDB] Removed print
GuyPerets106 Feb 24, 2024
3575408
[ApiServerDB[WIP
ohad123 Feb 24, 2024
6256e0f
[ApiServerDB] WIP
ohad123 Feb 24, 2024
7ea6274
[ApiServerDB] WIP
ohad123 Feb 25, 2024
dc92323
[ApiServerDB] WIP
ohad123 Feb 25, 2024
5f96b99
[ApiServerDB]WIP
ohad123 Feb 25, 2024
81864af
[ApiServerDB] WIP
NoaShapira8 Feb 27, 2024
8eab3ed
[ApiServerDB] Build data frame for loss by ts
NoaShapira8 Feb 28, 2024
0010b96
[ApiServerDB] add func get_mean_loss_list
NoaShapira8 Feb 28, 2024
7dc342e
[ApiServerDB] WIP
NoaShapira8 Feb 29, 2024
652913c
[ApiServerDB] WIP
NoaShapira8 Feb 29, 2024
876e7d9
[ApiServerDB] WIP
ohad123 Feb 29, 2024
1ac898e
[ApiServerDB] WIP
ohad123 Feb 29, 2024
ebca014
[ApiServerDB]WIP
ohad123 Mar 2, 2024
cde6f76
[ApiServerDB] Nerlnet Restart Erlang Side
GuyPerets106 Mar 3, 2024
d415fce
Merge branch 'ApiServerDB' of github.com:leondavi/NErlNet into ApiSer…
GuyPerets106 Mar 3, 2024
0900145
[ApiServerDB] add confusion matrix
ohad123 Mar 4, 2024
a1a1982
Merge branch 'ApiServerDB' of github.com:leondavi/NErlNet into ApiSer…
ohad123 Mar 4, 2024
85477f4
[ApiServerDB] WIP
ohad123 Mar 4, 2024
b7724c1
[ApiServerDB] NerlnetGraph visualization in apiServer
GuyPerets106 Mar 5, 2024
9786692
[ApiServerDB] Nerlnet Graph visualization completed
GuyPerets106 Mar 5, 2024
7b6ecc3
[ApiServerDB] Finish get_confusion_matrices
NoaShapira8 Mar 6, 2024
3d8d32b
[ApiServerDB] finish get_model_performence_stats
NoaShapira8 Mar 6, 2024
a6aa35f
[ApiServerDB] new experiment test
NoaShapira8 Mar 6, 2024
487cf22
Merge branch 'master' of github.com:leondavi/NErlNet into ApiServerDB
leondavi Mar 7, 2024
7c6a4a2
tmp
leondavi Mar 7, 2024
d0d6ac5
[CI] New ApiServer DP interface
leondavi Mar 7, 2024
88bbcee
[CI] Issue with experiment
leondavi Mar 7, 2024
8747dde
[ApiServerDB] fix get_min_loss to be OrderDict
NoaShapira8 Mar 7, 2024
1a268ce
[ApiServerDB] fix assert issue
NoaShapira8 Mar 7, 2024
423a045
[CI] Issue with assert 1K batches train and predict
leondavi Mar 7, 2024
8460d29
[ApiServerDB] Fix assert issue
NoaShapira8 Mar 10, 2024
4d6d78f
[CI] Baseline generate
leondavi Mar 10, 2024
e87eb91
[ApiServerDB] get communication stats
NoaShapira8 Mar 10, 2024
3780f20
[CI] Issue with confusion matrix test
leondavi Mar 10, 2024
7c354ba
Merge branch 'ApiServerDB' of github.com:leondavi/NErlNet into ApiSer…
leondavi Mar 10, 2024
57cfdb7
Add comm stats to test - TODO missed batches
leondavi Mar 10, 2024
8f56b3e
[ApiServerDB] WIP stats
NoaShapira8 Mar 10, 2024
41ece72
[ApiServerDB] flow_test
NoaShapira8 Mar 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src_erl/NerlnetApp/src/Client/clientStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh
clientWorkersFunctions:create_workers(MyName , EtsRef , ShaToModelArgsMap , EtsStats),
%% send pre_idle signal to workers
WorkersNames = clientWorkersFunctions:get_workers_names(EtsRef),
[gen_statem:cast(clientWorkersFunctions:get_worker_pid(EtsRef , WorkerName), {pre_idle}) || WorkerName <- clientWorkersFunctions:get_workers_names(EtsRef)],
[gen_statem:cast(clientWorkersFunctions:get_worker_pid(EtsRef , WorkerName), {pre_idle}) || WorkerName <- WorkersNames],

% update dictionary
WorkersEts = ets:lookup_element(EtsRef , workers_ets , ?DATA_IDX),
Expand Down Expand Up @@ -133,11 +133,11 @@ waitforWorkers(cast, In = {NewState}, State = #client_statem_state{myName = _MyN
cast_message_to_workers(EtsRef, {NewState}), %% This function increments the number of sent messages in stats ets
{next_state, waitforWorkers, State#client_statem_state{nextState = NewState, waitforWorkers = Workers}};

waitforWorkers(cast, EventContent, State = #client_statem_state{myName = _MyName}) ->
waitforWorkers(cast, EventContent, State = #client_statem_state{myName = MyName}) ->
ClientStatsEts = get(client_stats_ets),
stats:increment_messages_received(ClientStatsEts),
stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(EventContent)),
?LOG_WARNING("client ~p waitforWorkers ignored!!!: ~p ~n",[myName, EventContent]),
?LOG_WARNING("client ~p waitforWorkers ignored!!!: ~p ~n",[MyName, EventContent]),
{next_state, waitforWorkers, State}.


Expand Down Expand Up @@ -167,17 +167,19 @@ idle(cast, _In = {statistics}, State = #client_statem_state{ myName = MyName, et
EtsStats = get(ets_stats),
ClientStatsEts = get(client_stats_ets),
ClientStatsEncStr = stats:encode_ets_to_http_bin_str(ClientStatsEts),
ClientStatsToSend = atom_to_list(MyName) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ ClientStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR,
%ClientStatsToSend = atom_to_list(MyName) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ ClientStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR,
stats:increment_messages_received(ClientStatsEts),
ListStatsEts = ets:tab2list(EtsStats) -- [{MyName , ClientStatsEts}],
ListStatsEts = ets:tab2list(EtsStats) -- [{MyName , ClientStatsEts}],
WorkersStatsEncStr = create_encoded_stats_str(ListStatsEts),
StatsBody = {MyName , ClientStatsToSend ++ WorkersStatsEncStr},
DataToSend = ClientStatsEncStr ++ WorkersStatsEncStr,
io:format("DataToSend: ~p~n",[DataToSend]),
StatsBody = {MyName , DataToSend},
{RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX),
nerl_tools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatsBody),
stats:increment_messages_sent(ClientStatsEts),
{next_state, idle, State};

idle(cast, In = {training}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) ->
idle(cast, In = {training}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) ->
ClientStatsEts = get(client_stats_ets),
stats:increment_messages_received(ClientStatsEts),
stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), MessageToCast = {training},
Expand Down Expand Up @@ -408,7 +410,8 @@ cast_message_to_workers(EtsRef, Msg) ->

create_encoded_stats_str(ListStatsEts) ->
Func = fun({WorkerName , StatsEts}) ->
WorkerEncStatsStr = stats:encode_ets_to_http_bin_str(StatsEts),
WorkerName ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ WorkerEncStatsStr ++ ?API_SERVER_ENTITY_SEPERATOR
WorkerEncStatsStr = stats:encode_workers_ets_to_http_bin_str(StatsEts),
%% w1&bytes_sent:6.0:float#bad_messages:0:int....|
atom_to_list(WorkerName) ++ ?WORKER_SEPERATOR ++ WorkerEncStatsStr
end,
lists:flatten(lists:map(Func , ListStatsEts)).
32 changes: 18 additions & 14 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) ->
EtsStats = ets:new(stats , [set]),
put(etsStats, EtsStats), %% All entities including mainServer ets tables statistics
Entities = [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--[?API_SERVER_ATOM]],
generate_stats_ets_tables(Entities),
ets:insert(MainServerEts , {entities_names_list , Entities -- [?MAIN_SERVER_ATOM]}),
EntitiesNames = [Name || {Name, _CommTuple} <- Entities],
generate_stats_ets_tables(EntitiesNames),
ets:insert(MainServerEts , {entities_names_list , EntitiesNames -- [?MAIN_SERVER_ATOM]}),
ets:insert(MainServerEts , {batch_size , BatchSize}),
ets:insert(MainServerEts , {workers_map , WorkersMap}),
ets:insert(MainServerEts , {clients_names_list , ClientsNames}),
Expand Down Expand Up @@ -153,11 +154,12 @@ handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName}) -
true ->
%% TODO - Guy here you should get the the encoded statistics from entities and decode it use it the function you should implement
%% statistics arrived from Entity
{From, StatsEtsEncStr} = Body,
EntityName = binary_to_atom(From), %TODO Guy
set_entity_stats_ets_str(EntityName, StatsEtsEncStr),
{From, StatsEtsEncStr} = binary_to_term(Body),
%% EntityName = binary_to_atom(From), %TODO Guy / NO NEED
set_entity_stats_ets_str(From, StatsEtsEncStr),
% TODO increase counter_received_stats ets by 1
ets:update_counter(StatsEts, counter_received_stats, 1),
ets:update_counter(get(main_server_ets), counter_received_stats, 1),
stats:increment_messages_received(StatsEts),
% [From|[NewCounter]] = re:split(binary_to_list(Body), ":", [{return, list}]),

% NewStatisticsMap = maps:put(From,NewCounter,StatisticsMap),
Expand All @@ -166,17 +168,17 @@ handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName}) -
ReceivedCounterStatsValue = ets:lookup_element(get(main_server_ets), counter_received_stats, ?DATA_IDX),
EntitiesNamesList = ets:lookup_element(get(main_server_ets), entities_names_list, ?DATA_IDX),
TotalNumOfEntities = length(EntitiesNamesList), % without MainServer!

if ReceivedCounterStatsValue == TotalNumOfEntities -> %% got stats from all entities
Func = fun(Entity) ->
EntityStatsEncStr = get_entity_stats_ets_str(EntityName),
Entity ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ EntityStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR
EntityStatsEncStr = get_entity_stats_ets_str(Entity),
atom_to_list(Entity) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ EntityStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR
end,
MainServerEncStatsEts = stats:encode_ets_to_http_bin_str(get_entity_stats_ets_str(?MAIN_SERVER_ATOM)),
MainServerStr = atom_to_list(?MAIN_SERVER_ATOM) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ MainServerEncStatsEts ++ ?API_SERVER_ENTITY_SEPERATOR,
StatsToSend = lists:flatten([Func(Entity) || Entity <- EntitiesNamesList] ++ MainServerStr), % add main server to the list
{RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX),
ActionStr = atom_to_list(statistics),
io:format("STATS TO SEND TO API SERVER: ~p~n",[StatsToSend]),
nerl_tools:http_router_request(RouterHost,RouterPort, [?API_SERVER_ATOM], ActionStr, list_to_binary(StatsToSend)), % update the source with its data
ets:update_element(StatsEts, counter_received_stats, {?STATS_KEYVAL_VAL_IDX, 0});

Expand All @@ -194,6 +196,7 @@ handle_cast({sourceDone,Body}, State = #main_genserver_state{myName = MyName, so
UpdatedSourcesCastingList = SourcesCastingList--[SourceName],
case UpdatedSourcesCastingList of
[] -> % the list is empty - all sources were done casting their batches
?LOG_NOTICE("[MAIN-Server] All sources finished casting"),
PhaseAtom = clientIdle,
update_clients_phase(PhaseAtom, MyName),
ListOfClients = ets:lookup_element(get(main_server_ets), clients_names_list, ?DATA_IDX),
Expand Down Expand Up @@ -349,20 +352,21 @@ update_clients_phase(PhaseAtom, MessageBody) when is_atom(PhaseAtom) ->
% Sends requests for statisics from all entities excludes main server
statistics_requests_to_entities() ->
ListOfEntities = ets:lookup_element(get(main_server_ets), entities_names_list, ?DATA_IDX),
% remove mainServer from the list
%DestinationsList = lists:map(fun({Entity , CommTuple}) when Entity =/= ?MAIN_SERVER_ATOM -> {Entity, CommTuple} end, ListOfEntities),
{RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX),
ActionStr = atom_to_list(statistics),
DestinationsList = ListOfEntities,
MessageBody = "", % there is no need for body in statistics request
nerl_tools:http_router_request(RouterHost, RouterPort, DestinationsList, ActionStr, MessageBody).
nerl_tools:http_router_request(RouterHost, RouterPort, ListOfEntities, ActionStr, MessageBody).

generate_stats_ets_tables(VerticesList) ->
generate_stats_ets_tables(EntitiesNamesList) ->
MainServerEtsStats = get(etsStats),
Func =
fun({Name, {_Host, _Port, _DeviceName}}) ->
fun(Name) ->
EntityStatsEts = stats:generate_stats_ets(),
ets:insert(MainServerEtsStats, {Name, EntityStatsEts})
end,
lists:foreach(Func, VerticesList).
lists:foreach(Func, EntitiesNamesList).

get_entity_stats_ets(EntityName) ->
MainServerEtsStats = get(etsStats),
Expand Down
19 changes: 11 additions & 8 deletions src_erl/NerlnetApp/src/Router/routerGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ init({MyName , _Policy , NerlnetGraph}) -> %% TODO : Add policy to router
{ok, #router_genserver_state{msgCounter = 1, myName = MyName, etsRef=RoutingTableEtsRef}}.

handle_cast({statistics , _Body} , State=#router_genserver_state{etsRef = Routing_table}) ->

io:format("~p Got statistics request ~n" , [get(myName)]),
RouterStatsEts = get(router_stats_ets),
stats:increment_messages_received(RouterStatsEts),

MyName = get(myName),
StatsEtsStr = stats:encode_ets_to_http_bin_str(RouterStatsEts),
StatisticsBody = {term_to_binary(MyName) , list_to_binary(StatsEtsStr)}, % old data
[{_Dest,{_Name , RouterHost , RouterPort}}] = ets:lookup(Routing_table , ?MAIN_SERVER_ATOM),
nerl_tools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatisticsBody),
StatisticsBody = {MyName , StatsEtsStr},
[{_Dest,{_Name , MyRouterAddress , MyRouterPort}}] = ets:lookup(Routing_table , MyName), % Router is always owned by itself
io:format("~p is sending statistics~n" , [get(myName)]),
nerl_tools:http_router_request(MyRouterAddress, MyRouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatisticsBody),
stats:increment_messages_sent(RouterStatsEts),
{noreply , State};

Expand Down Expand Up @@ -91,18 +92,20 @@ handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter =
stats:increment_messages_sent(RouterStatsEts),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }};

handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{etsRef=Routing_table }) ->
handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{etsRef=Routing_table , myName = _MyName}) ->
io:format("@ROUTER BOROADCAST , Body: ~p~n" , [Body]),
RouterStatsEts = get(router_stats_ets),
%%Destinations = [Dest || Dest <- DestList, Dest =/= get(myName)],
stats:increment_messages_received(RouterStatsEts),
MapFunc=fun(Dest,Acc)->
%make a map when keys are addreses to send a message to, and values are lists of destination of the message that go throu key addres
[{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest),
[{Dest,{Name,Host,Port}}] = ets:lookup(Routing_table,Dest),
case maps:is_key({Name,Host,Port},Acc) of
true->
%addres alread in, append Dest to exsisting value
%address already in, append Dest to existing value
NewVal=maps:get({Name,Host,Port},Acc)++[Dest];
false->
%addres not in yet, create new value for it
%address not in yet, create new value for it
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GuyPerets106
Guy please add documentation here for both cases that there is a single returned value from routing table:

  1. Single entity's destination remained from broadcast list for the next hop.
  2. entity belongs to current router.

NewVal=[Dest]
end,
maps:put({Name,Host,Port},NewVal,Acc)
Expand Down
4 changes: 2 additions & 2 deletions src_erl/NerlnetApp/src/Source/sourceStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ idle(cast, {startCasting,Body}, State = #source_statem_state{batchesList = Batch
{next_state, castingData, State#source_statem_state{transmitter_pid = TransmitterPID}};

idle(cast, {startCasting}, State) ->
?LOG_ERROR("Should not get start casting in idle - start casting is only after setting the phase type"),
EtsRef = get(source_ets),
% io:format("im not suppose to be here"),
StatsEtsRef = get(source_stats_ets),
stats:increment_messages_received(StatsEtsRef),

Expand All @@ -183,7 +183,7 @@ idle(cast, {statistics}, State) ->

MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX),
StatsEtsStr = stats:encode_ets_to_http_bin_str(StatsEtsRef),
StatisticsBody = {term_to_binary(MyName) , list_to_binary(StatsEtsStr)}, % old data
StatisticsBody = {MyName , StatsEtsStr},
{RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX),
nerl_tools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatisticsBody),
stats:increment_messages_sent(StatsEtsRef),
Expand Down
21 changes: 17 additions & 4 deletions src_erl/NerlnetApp/src/Stats/stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-export([get_bytes_sent/1, increment_bytes_sent/2]).
-export([get_bad_messages/1, increment_bad_messages/1]).
-export([get_value/2, increment_by_value/3]).
-export([encode_ets_to_http_bin_str/1 , decode_http_bin_str_to_ets/1]).
-export([encode_ets_to_http_bin_str/1 , decode_http_bin_str_to_ets/1 , encode_workers_ets_to_http_bin_str/1]).
-export([update_workers_ets/4, increment_workers_ets/4 , generate_workers_stats_ets/0]).

get_numeric_type(Value) ->
Expand All @@ -28,14 +28,27 @@ encode_ets_to_http_bin_str(StatsEts) ->
KeyStr = lists:flatten(io_lib:format("~p" , [Key])),
ValueStr = lists:flatten(io_lib:format("~p" , [Value])),
TypeStr = lists:flatten(io_lib:format("~p" , [Type])),
%% StatsKey:StatsValue:StatsType#StatsKey:StatsValue:StatsType#...
KeyStr ++ ?SEPERATOR_WITHIN_TRIPLET ++ ValueStr ++ ?SEPERATOR_WITHIN_TRIPLET ++ TypeStr ++ ?SEPERATOR_TRIPLETS
end,
lists:flatten(lists:map(Func , StatsList)).

encode_workers_ets_to_http_bin_str(StatsEts) ->
%% Takes value from ets and converts it to "<EntitiyName1>SEPERATOR<Value1 <EntityName2>SEPERATOR<Value2>" string.
StatsList = ets:tab2list(StatsEts),
Func = fun({Key , Value}) ->
Type = get_numeric_type(Value),
KeyStr = lists:flatten(io_lib:format("~p" , [Key])),
ValueStr = lists:flatten(io_lib:format("~p" , [Value])),
TypeStr = lists:flatten(io_lib:format("~p" , [Type])),
%% StatsKey:StatsValue:StatsType#StatsKey:StatsValue:StatsType#...
KeyStr ++ ?WORKER_SEPERATOR_WITHIN_TRIPLET ++ ValueStr ++ ?WORKER_SEPERATOR_WITHIN_TRIPLET ++ TypeStr ++ ?WORKER_SEPERATOR_TRIPLETS
end,
lists:flatten(lists:map(Func , StatsList)).

decode_http_bin_str_to_ets(EncodedStr) ->
ReturnedEts = ets:new(ets_to_merge , [set]),
KeyValTypeTokens = string:tokens(EncodedStr , ?SEPERATOR_TRIPLETS),
io:format("KeyValTypeTokens: ~p~n" , [KeyValTypeTokens]),
Func = fun(Triplet) ->
[Key , ValueStr , Type] = string:tokens(Triplet , ?SEPERATOR_WITHIN_TRIPLET),
TypeAtom = list_to_atom(Type),
Expand All @@ -50,7 +63,7 @@ decode_http_bin_str_to_ets(EncodedStr) ->
ReturnedEts.


generate_stats_ets() ->
generate_stats_ets() -> %% clients , routers , mainserver...
StatsEts = ets:new(stats_ets , [set]),
ets:insert(StatsEts, {messages_received , 0}),
ets:insert(StatsEts, {messages_sent , 0}),
Expand All @@ -63,7 +76,7 @@ generate_stats_ets() ->
ets:insert(StatsEts, {batches_sent , 0}), % related with source
StatsEts.

generate_workers_stats_ets() ->
generate_workers_stats_ets() -> %% workers..
WorkersStatsEts = ets:new(workers_ets , [set, public]),
ets:insert(WorkersStatsEts, {bytes_received , 0}),
ets:insert(WorkersStatsEts, {bytes_sent , 0}),
Expand Down
6 changes: 5 additions & 1 deletion src_erl/NerlnetApp/src/Stats/stats.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@
-define(SEPERATOR_WITHIN_TRIPLET , ":").

-define(API_SERVER_ENTITY_SEPERATOR , "|").
-define(API_SERVER_WITHIN_ENTITY_SEPERATOR , "&").
-define(API_SERVER_WITHIN_ENTITY_SEPERATOR , "&").

-define(WORKER_SEPERATOR , "^").
-define(WORKER_SEPERATOR_TRIPLETS , "@").
-define(WORKER_SEPERATOR_WITHIN_TRIPLET , "$").
23 changes: 14 additions & 9 deletions src_erl/NerlnetApp/src/nerl_tools.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ read_all_data(Req0, Got) ->

deleteOldJson(FilePath) ->
try file:delete(FilePath)
catch {error, E} -> io:format("couldn't delete file ~p, beacuse ~p~n",[FilePath, E])
catch {error, E} -> ?LOG_ERROR("couldn't delete file ~p, beacuse ~p~n",[FilePath, E])
end.

% get this host ip
Expand Down Expand Up @@ -208,13 +208,18 @@ calculate_size(List) when is_list(List) ->

%% TODO: create create_body func for standard message passing

make_routing_table(Ets,EntitiesList,Origin,NerlnetGraph)->
%% Entities List must not contain ThisRouter
%% NextHop is the next router for all entities that are not connected with ThisRouter
%% Otherwise , NextHop is an entity of ThisRouter
make_routing_table(Ets,EntitiesList,ThisRouter,NerlnetGraph)->
GenerateTablesFunc = fun(Entity) ->
case digraph:get_short_path(NerlnetGraph,Origin,Entity) of
false -> ok;
ShortPath -> NextHop = lists:nth(2,ShortPath),
{Name , {Host , Port , _DeviceName}} = digraph:vertex(NerlnetGraph,NextHop),
ets:insert(Ets,{Entity,{Name,Host,Port}})
end % case end
case digraph:get_short_path(NerlnetGraph,ThisRouter,Entity) of
false -> ok;
ShortPath -> NextHop = lists:nth(2,ShortPath),
{Name , {Host , Port , _DeviceName}} = digraph:vertex(NerlnetGraph,NextHop),
ets:insert(Ets,{Entity,{Name,Host,Port}})
end % case end
end, % fun end
lists:foreach(GenerateTablesFunc, EntitiesList).
lists:foreach(GenerateTablesFunc, EntitiesList),
{ThisRouter , {RouterHost , RouterPort , _DeviceName}} = digraph:vertex(NerlnetGraph , ThisRouter),
ets:insert(Ets , {ThisRouter , {ThisRouter , RouterHost , RouterPort}}).
Loading
Loading