From bc138c9ecbb77b57656c3d069e0d14b293c4a700 Mon Sep 17 00:00:00 2001 From: sonhv0212 Date: Tue, 24 Dec 2024 11:52:42 +0700 Subject: [PATCH] p2psim: Add visualizer --- p2p/simulations/discovery/README.md | 11 +- p2p/simulations/discovery/discovery.py | 324 +++++++++++++++++++++++++ 2 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 p2p/simulations/discovery/discovery.py diff --git a/p2p/simulations/discovery/README.md b/p2p/simulations/discovery/README.md index aa2a89281..1019cc924 100644 --- a/p2p/simulations/discovery/README.md +++ b/p2p/simulations/discovery/README.md @@ -45,4 +45,13 @@ After running the simulation, some files will be generated in the `results_dir` - `$test_name.log`: Log of all nodes in the simulation network - `stats_$test_name.csv`: Statistics of the simulation, including the number of peers, distribution of nodes in the DHT, ... - `peers_$test_name.log`: List peers of each node in the network -- `dht_$test_name.log`: List nodes in the DHT of each node \ No newline at end of file +- `dht_$test_name.log`: List nodes in the DHT of each node + +### Visualization + +To visualize the data, we can use the `discovery.py` script to plot the data. 
+Supported types:
+- `dht_peer`: Ratio between the number of peers (outbound) and the number of nodes in the DHT
+- `PeerCount`: Number of peers of each node
+- `DHTBuckets`: Size of DHT
+- More supported types can be found in the stats file
diff --git a/p2p/simulations/discovery/discovery.py b/p2p/simulations/discovery/discovery.py
new file mode 100644
index 000000000..ea33aa69f
--- /dev/null
+++ b/p2p/simulations/discovery/discovery.py
@@ -0,0 +1,333 @@
+import argparse
+import os
+import pandas as pd
+from matplotlib import pyplot as plt
+
+'''
+Check if the node_id has a valid prefix
+@param node_id: the node_id to check
+@param prefixes: the prefixes to check (None means accept every node)
+@return True if the node_id has a valid prefix
+'''
+def node_valid_with_prefixes(node_id, prefixes):
+    if prefixes is None:
+        return True
+    for prefix in prefixes:
+        if node_id.startswith(prefix):
+            return True
+    return False
+
+'''
+Visualize the stats file generated by the simulation
+'''
+class StatsPlotter:
+    '''
+    @param stats_file: the stats file generated by the simulation
+    @param result_dir: the directory to save the result
+    @param metric: the metric to plot (e.g., "PeerCount", "DHTBuckets")
+    @param P: Pth percentile to plot (e.g., 50, 90, 99; -1 means average)
+    @param time_range: the time range (from start of the simulation) to plot
+    @param prefixes: the prefixes to filter the nodes
+    '''
+    # NOTE: defaults are tuples (not lists) so no mutable state is shared across instances
+    def __init__(self, stats_file, result_dir, metric, P, time_range=(-1, -1), prefixes=("node", "bootnode", "outbound")):
+        self.stats_file = stats_file
+        self.result_dir = result_dir
+        self.prefixes = prefixes
+        self.metric = metric
+        self.P = P
+        self.time_range = time_range
+        os.makedirs(result_dir, exist_ok=True)
+
+    '''
+    Parse the stats file
+    @return: data_nodes is a dictionary where each key is a node, and its
+    data which is related to the metric, sorted by timestamps
+    '''
+    def parse_stats(self):
+        df = pd.read_csv(self.stats_file)
+        timestamps = df['timestamp'].unique()
+        timestamps.sort()
+        self.nodes = df['node'].unique()
+
+        # keep the original timestamps and filter by time range (relative to start)
+        self.start_time = timestamps[0]
+        self.timestamps = []
+        for ts in timestamps:
+            if self.time_range[0] == -1 or (self.time_range[0] <= ts - self.start_time and ts - self.start_time <= self.time_range[1]):
+                self.timestamps.append(ts)
+
+        self.data_nodes = dict()
+        df = df[df['type'] == self.metric]
+        for node in self.nodes:
+            # filter node by prefixes
+            if not node_valid_with_prefixes(node, self.prefixes):
+                continue
+
+            df_node = df[df['node'] == node]
+            if node not in self.data_nodes:
+                self.data_nodes[node] = []
+
+            for ts in self.timestamps:
+                value_str = df_node[df_node['timestamp'] == ts]['value']
+                if value_str.empty:
+                    self.data_nodes[node].append(0)
+                else:
+                    # try to convert value to int, otherwise consider it as an array and sum it
+                    value = 0
+                    try:
+                        value = int(value_str.values[0])
+                    except (ValueError, TypeError):
+                        try:
+                            value = sum([int(v) for v in value_str.values[0][1:-1].split(" ")])
+                        except (ValueError, TypeError):
+                            print("Error: ", value_str.values[0])
+                    self.data_nodes[node].append(value)
+
+        return self.data_nodes
+
+    '''
+    Group nodes by timestamp that starts rolling out their batch
+    (e.g., node-1734890389-17 -> 1734890389, outbound-1734887683-49 -> 1734887683)
+    @return: results is a dictionary where each key is a timestamp, and its
+    value is a list of nodes that rolled out at that timestamp
+    '''
+    def node_group_by_timestamp(self):
+        node_groups = dict()
+        for node in self.data_nodes:
+            ts = int(node.split("-")[1])
+            if ts not in node_groups:
+                node_groups[ts] = []
+            node_groups[ts].append(node)
+        return node_groups
+
+    '''
+    Calculate the average of the nodes in each group
+    @param node_groups: the node groups to calculate the average
+    @return: results is a dictionary where each key is name of the groups, and
+    its value is the average data of the nodes in that group in time range
+    '''
+    def calc_avg_by_group(self, node_groups):
+        results = dict()
+        for group in node_groups:
+            results[group] = []
+            nodes = node_groups[group]
+            for i, ts in enumerate(self.timestamps):
+                data = [self.data_nodes[node][i] for node in nodes]
+                results[group].append(sum(data) / len(data))
+        return results
+
+    '''
+    Calculate the Pth percentile of the nodes in each group
+    @param node_groups: the node groups to calculate the Pth percentile
+    @param P: the Pth percentile to calculate
+    @return: results is a dictionary where each key is name of the groups, and
+    its value is the Pth percentile data of the nodes in that group in time range
+    '''
+    def calc_p_by_group(self, node_groups, P, reverse=False):
+        results = dict()
+        for group in node_groups:
+            results[group] = []
+            nodes = node_groups[group]
+            for i, ts in enumerate(self.timestamps):
+                data = [self.data_nodes[node][i] for node in nodes]
+                data.sort(reverse=reverse)
+                # clamp so P=100 does not index one past the end of the list
+                idx = min(int(len(data) * P / 100), len(data) - 1)
+                results[group].append(data[idx])
+        return results
+
+    def plot(self):
+        self.parse_stats()
+        node_groups = self.node_group_by_timestamp()
+        results = self.calc_avg_by_group(node_groups) if self.P == -1 else self.calc_p_by_group(node_groups, self.P, reverse=True)
+        normalized_timestamps = [ts - self.start_time for ts in self.timestamps]
+
+        plt.figure(figsize=(10, 10))
+        sorted_groups = list(results.keys())
+        sorted_groups.sort()
+        for i, group in enumerate(sorted_groups):
+            plt.plot(normalized_timestamps, results[group], label=f"batch {i}" if i > 0 else "bootnode")
+        plt.xlabel("Time (s)")
+        plt.ylabel(self.metric)
+        # pick the title once instead of setting it twice
+        if self.P == -1:
+            plt.title(f"Average {self.metric}")
+        else:
+            plt.title(f"P{self.P} {self.metric}")
+        plt.legend()
+        plt.savefig(f"{self.result_dir}/{self.metric}_P{self.P}.png")
+        plt.close()
+
+'''
+Visualize the DHT and peers info
+'''
+class PeerDHTPlotter:
+    def __init__(self, dht_log_file, peer_log_file, result_dir, prefixes=("node", "bootnode", "outbound")):
+        self.dht_log_file = dht_log_file
+        self.peer_log_file = peer_log_file
+        self.result_dir = result_dir
+        self.prefixes = prefixes
+        os.makedirs(result_dir, exist_ok=True)
+
+    '''
+    Parse the DHT log file
+    @return: dht_data is a dictionary where each key is a node_id, and its
+    value is a set of peers in the DHT
+    '''
+    def parse_dht_log(self):
+        dht_data = dict()
+        with open(self.dht_log_file, "r") as f:
+            for line in f:
+                line = line.strip()
+                if line == '':
+                    continue
+
+                # node_id: [peer1, peer2, ...], [peer3, peer4, ...], ...
+                node_id, bucket_list = line.split(':')
+                if not node_valid_with_prefixes(node_id, self.prefixes):
+                    continue
+
+                buckets = bucket_list.strip().split(',')
+                for bucket in buckets:
+                    if bucket == '[]':
+                        continue
+                    peers = bucket.strip()[1:-1].split(' ')
+                    for peer in peers:
+                        if peer == '':
+                            continue
+                        if node_id not in dht_data:
+                            dht_data[node_id] = set()
+                        dht_data[node_id].add(peer)
+        return dht_data
+
+    '''
+    Parse the peers log file
+    @return: peer_data is a dictionary where each key is a node_id, and its
+    value is a list of peers that connected (outbound) to the node
+    '''
+    def parse_peers_log(self, only_outbound=True):
+        peer_data = dict()
+        with open(self.peer_log_file, 'r') as f:
+            for line in f:
+                line = line.strip()
+                if line == '':
+                    continue
+
+                # node_id: (peer1 inbound), (peer2 inbound), ...
+                node_id, peer_list = line.split(':')
+                if not node_valid_with_prefixes(node_id, self.prefixes):
+                    continue
+
+                for ts in peer_list.strip().split(','):
+                    if ts == '':
+                        continue
+                    peer, inbound = ts.strip()[1:-1].split(' ')
+                    if only_outbound and inbound == 'true':
+                        continue
+                    if node_id not in peer_data:
+                        peer_data[node_id] = []
+                    peer_data[node_id].append(peer)
+        return peer_data
+
+    def plot(self):
+        dht_data = self.parse_dht_log()
+        peer_data = self.parse_peers_log()
+
+        x = []
+        y = []
+
+        for node_id, peers in peer_data.items():
+            count_peer_in_dht = 0
+            # a node may have outbound peers but no DHT entries; avoid KeyError
+            node_dht = dht_data.get(node_id, set())
+            for peer in peers:
+                if peer in node_dht:
+                    count_peer_in_dht += 1
+            x.append(count_peer_in_dht)
+            y.append(len(peers))
+
+        hb = plt.hexbin(x, y, gridsize=30, cmap='Reds', mincnt=1)
+        plt.colorbar(hb, label='Frequency')
+        plt.xlabel('Number of peers in DHT')
+        plt.ylabel('Number of peers (outbound)')
+        plt.title('DHT vs. Outbound Peers')
+        plt.savefig(f"{self.result_dir}/dht_vs_outbound.png")
+        plt.close()
+
+def parse_arguments():
+    parser = argparse.ArgumentParser(description="Plotting utility for various metrics.")
+
+    parser.add_argument(
+        "--type",
+        required=True,
+        help="Type of metrics to process and plot (dht_peer, PeerCount, DHTBuckets, ... see more in stats file)."
+    )
+    parser.add_argument(
+        "--P",
+        type=int,
+        default=-1,
+        help="Percentile for statistical calculations (e.g., 50, 90, 99; -1 means average)."
+    )
+    parser.add_argument(
+        "--stats_file",
+        type=str,
+        help="Path to the stats CSV file.",
+        default=None
+    )
+    parser.add_argument(
+        "--dht_file",
+        type=str,
+        help="Path to the DHT log file.",
+        default=None
+    )
+    parser.add_argument(
+        "--peers_file",
+        type=str,
+        help="Path to the peers log file.",
+        default=None
+    )
+    parser.add_argument(
+        "--result_dir",
+        type=str,
+        default="./results",
+        help="Directory where the resulting plots will be saved."
+    )
+    parser.add_argument(
+        "--time_range",
+        type=str,
+        default="-1,-1",
+        help="Time range to plot (from start of the simulation)."
+    )
+    parser.add_argument(
+        "--prefixes",
+        type=str,
+        default="node,bootnode,outbound",
+        help="Comma-separated list of prefixes to filter nodes"
+    )
+
+    return parser.parse_args()
+
+def main():
+    args = parse_arguments()
+
+    if args.type == "dht_peer":
+        if args.dht_file is None or args.peers_file is None:
+            print("Error: DHT log file and peers log file are required for dht_peer type.")
+            return
+        plotter = PeerDHTPlotter(args.dht_file, args.peers_file, args.result_dir, args.prefixes.split(","))
+        plotter.plot()
+    else:
+        try:
+            time_range = [int(x) for x in args.time_range.split(",")]
+        except ValueError:
+            time_range = [-1, -1]
+            print("Error: Invalid time range format. Using default time")
+        if args.stats_file is None:
+            print("Error: Stats file is required for other types.")
+            return
+        plotter = StatsPlotter(args.stats_file, args.result_dir, args.type, args.P, time_range, args.prefixes.split(","))
+        plotter.plot()
+
+if __name__ == "__main__":
+    main()