From efd979c94e9223d61a43e452d12aa21ad1b78fdd Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 13:12:26 -0500
Subject: [PATCH 1/9] combine_parsed_consensus_results: split into parts

---
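Notes (review aid, not original commit text): process() now uses an identity
combiner, so it returns the parsed consensuses as-is, and the old
combine_parsed_consensus_results() is split into relays_from_consensuses()
and network_stats_from_consensuses(). Because parse_consensus() now converts
'pub_dt' to a unix timestamp (second hunk below), the min()/max() calls here
compare plain floats. A minimal sketch of that shape, with hypothetical
timestamps:

    # Not tornettools code; each dict stands in for one parsed consensus.
    consensuses = [
        {'type': 'consensus', 'pub_dt': 1656460800.0},
        {'type': 'consensus', 'pub_dt': 1656464400.0},
    ]
    min_unix_time = min([c['pub_dt'] for c in consensuses])
    max_unix_time = max([c['pub_dt'] for c in consensuses])
    assert (min_unix_time, max_unix_time) == (1656460800.0, 1656464400.0)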
 tornettools/stage.py | 83 +++++++++++++++++++++++---------------------
 1 file changed, 43 insertions(+), 40 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index 186cd12..b389722 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -111,7 +111,13 @@ def stage_relays(args):
 
     consensus_paths = get_file_list(args.consensus_path)
     logging.info("Processing {} consensus files from {}...".format(len(consensus_paths), args.consensus_path))
-    relays, min_unix_time, max_unix_time, network_stats = process(num_processes, consensus_paths, parse_consensus, combine_parsed_consensus_results)
+    consensuses = process(num_processes, consensus_paths, parse_consensus, lambda x: x)
+    relays = relays_from_consensuses(consensuses)
+    network_stats = network_stats_from_consensuses(consensuses)
+    min_unix_time = min([c['pub_dt'] for c in consensuses])
+    max_unix_time = max([c['pub_dt'] for c in consensuses])
+    timestr = get_time_suffix(min_unix_time, max_unix_time)
+    logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
 
     servdesc_paths = get_file_list(args.server_descriptor_path)
     logging.info("Processing {} server descriptor files from {}...".format(len(servdesc_paths), args.server_descriptor_path))
@@ -266,9 +272,13 @@ def parse_consensus(path):
             continue
         weights[position_type] /= weights["total"]
 
+    # valid_after is for V3 descriptors, V2 use net_status.published
+    pub_dt = net_status.valid_after.replace(tzinfo=timezone.utc).timestamp()
+    assert(pub_dt is not None)
+
     result = {
         'type': 'consensus',
-        'pub_dt': net_status.valid_after, # valid_after is for V3 descriptors, V2 use net_status.published
+        'pub_dt': pub_dt,
         'relays': relays,
         'weights': weights,
         'counts': counts,
@@ -276,51 +286,47 @@ def parse_consensus(path):
 
     return result
 
-def combine_parsed_consensus_results(results):
+def relays_from_consensuses(consensuses):
     relays = {}
-    network_stats = {}
-    min_unix_time, max_unix_time = None, None
-    counts_t, counts_eg, counts_e, counts_g, counts_m = [], [], [], [], []
-    weights_t, weights_eg, weights_e, weights_g, weights_m = [], [], [], [], []
 
-    for result in results:
-        if result is None:
-            continue
+    for consensus in consensuses:
+        assert(consensus is not None)
+        assert(consensus['type'] == 'consensus')
+        for fingerprint in consensus['relays']:
+            relays.setdefault(fingerprint, Relay(fingerprint, consensus['relays'][fingerprint]['address']))
 
-        if result['type'] != 'consensus':
-            continue
+            r = relays[fingerprint]
 
-        if result['pub_dt'] is not None:
-            unix_time = result['pub_dt'].replace(tzinfo=timezone.utc).timestamp()
-            if min_unix_time is None or unix_time < min_unix_time:
-                min_unix_time = unix_time
-            if max_unix_time is None or unix_time > max_unix_time:
-                max_unix_time = unix_time
+            r.weights.append(consensus['relays'][fingerprint]['weight'])
 
-        weights_t.append(result['weights']['total'])
-        weights_eg.append(result['weights']['exitguard'])
-        weights_g.append(result['weights']['guard'])
-        weights_e.append(result['weights']['exit'])
-        weights_m.append(result['weights']['middle'])
+            if consensus['relays'][fingerprint]['is_exit']:
+                r.num_exit += 1
+            if consensus['relays'][fingerprint]['is_guard']:
+                r.num_guard += 1
 
-        counts_t.append(result['counts']['total'])
-        counts_eg.append(result['counts']['exitguard'])
-        counts_g.append(result['counts']['guard'])
-        counts_e.append(result['counts']['exit'])
-        counts_m.append(result['counts']['middle'])
+    return relays
 
-        for fingerprint in result['relays']:
-            relays.setdefault(fingerprint, Relay(fingerprint, result['relays'][fingerprint]['address']))
+def network_stats_from_consensuses(consensuses):
+    network_stats = {}
 
-            r = relays[fingerprint]
+    counts_t, counts_eg, counts_e, counts_g, counts_m = [], [], [], [], []
+    weights_t, weights_eg, weights_e, weights_g, weights_m = [], [], [], [], []
 
-            r.weights.append(result['relays'][fingerprint]['weight'])
+    for consensus in consensuses:
+        assert(consensus is not None)
+        assert(consensus['type'] == 'consensus')
 
-            if result['relays'][fingerprint]['is_exit']:
-                r.num_exit += 1
-            if result['relays'][fingerprint]['is_guard']:
-                r.num_guard += 1
+        weights_t.append(consensus['weights']['total'])
+        weights_eg.append(consensus['weights']['exitguard'])
+        weights_g.append(consensus['weights']['guard'])
+        weights_e.append(consensus['weights']['exit'])
+        weights_m.append(consensus['weights']['middle'])
+
+        counts_t.append(consensus['counts']['total'])
+        counts_eg.append(consensus['counts']['exitguard'])
+        counts_g.append(consensus['counts']['guard'])
+        counts_e.append(consensus['counts']['exit'])
+        counts_m.append(consensus['counts']['middle'])
 
     network_stats = {
         # the counts are whole numbers
@@ -337,10 +343,7 @@ def combine_parsed_consensus_results(results):
         'med_weight_total': 1.0, # for completeness
     }
 
-    timestr = get_time_suffix(min_unix_time, max_unix_time)
-    logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
-
-    return relays, min_unix_time, max_unix_time, network_stats
+    return network_stats
 
 # this func is run by helper processes in process pool
 def parse_serverdesc(args):

From c83e70a7168de5e390f82da6f0827ad2579577f0 Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 13:44:07 -0500
Subject: [PATCH 2/9] combine_parsed_serverdesc_results -> bandwidths_from_serverdescs

---
 tornettools/stage.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index b389722..a8a4ba7 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -122,7 +122,8 @@ def stage_relays(args):
     servdesc_paths = get_file_list(args.server_descriptor_path)
     logging.info("Processing {} server descriptor files from {}...".format(len(servdesc_paths), args.server_descriptor_path))
     sdesc_args = [[p, min_unix_time, max_unix_time] for p in servdesc_paths]
-    bandwidths = process(num_processes, sdesc_args, parse_serverdesc, combine_parsed_serverdesc_results)
+    serverdescs = process(num_processes, sdesc_args, parse_serverdesc, lambda x: x)
+    bandwidths = bandwidths_from_serverdescs(serverdescs)
 
     found_bandwidths = 0
     for fingerprint in relays:
@@ -383,23 +384,23 @@ def parse_serverdesc(args):
 
     return result
 
-def combine_parsed_serverdesc_results(results):
+def bandwidths_from_serverdescs(serverdescs):
     bandwidths = {}
 
-    for result in results:
-        if result is None:
+    for sd in serverdescs:
+        if sd is None:
             continue
 
-        if result['type'] != 'serverdesc':
+        if sd['type'] != 'serverdesc':
             continue
 
-        bandwidths.setdefault(result['fprint'], Bandwidths(result['fprint']))
+        bandwidths.setdefault(sd['fprint'], Bandwidths(sd['fprint']))
 
-        b = bandwidths[result['fprint']]
+        b = bandwidths[sd['fprint']]
 
-        b.max_obs_bw = max(b.max_obs_bw, result['bw_obs'])
-        b.bw_rates.append(result['bw_rate'])
-        b.bw_bursts.append(result['bw_burst'])
+        b.max_obs_bw = max(b.max_obs_bw, sd['bw_obs'])
+        b.bw_rates.append(sd['bw_rate'])
+        b.bw_bursts.append(sd['bw_burst'])
 
     return bandwidths

From 77610642c6e822f29bc0fe884baca9952c80873a Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 17:44:21 -0500
Subject: [PATCH 3/9] do clustering

---
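Notes (review aid, not original commit text): relays are grouped by a cluster
key of (/16-masked IP, normalized family string); each cluster is then folded
into one representative keyed by its lowest fingerprint, with weights summed
and guard/exit flags OR'd across members. A small sketch of the key idea,
using made-up documentation addresses:

    # Not tornettools code; mirrors get_cluster_key() from this patch.
    from ipaddress import IPv4Address

    def cluster_key(family, address):
        masked_ip = int(IPv4Address(address)) & 0xffff0000  # keep the /16
        return (masked_ip, family)

    # Two relays in the same /16 with the same family string share a key:
    assert cluster_key("['AAAA', 'BBBB']", '198.51.100.7') == \
           cluster_key("['AAAA', 'BBBB']", '198.51.77.9')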
 tornettools/stage.py | 201 +++++++++++++++++++++++++++++++------------
 1 file changed, 145 insertions(+), 56 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index a8a4ba7..d50c7ae 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -7,6 +7,7 @@
 from tornettools.util import dump_json_data
 from tornettools.util_geoip import GeoIP
 
+from ipaddress import IPv4Address
 from multiprocessing import Pool, cpu_count
 from statistics import median
 from datetime import datetime, timezone
@@ -18,8 +19,8 @@
 
 # this is parsed from the consensus files
 class Relay():
-    def __init__(self, fingerprint, address):
-        self.fingerprint = fingerprint
+    def __init__(self, fingerprints, address):
+        self.fingerprints = fingerprints
         self.address = address
         # the length of this list indicates the number of consensuses the relay appeared in
         self.weights = []
@@ -28,12 +29,11 @@ def __init__(self, fingerprint, address):
         # a count of the number of consensuses in which the relay had the guard flag
         self.num_guard = 0
         # bandwidth information parsed from server descriptor files
-        self.bandwidths = Bandwidths(fingerprint)
+        self.bandwidths = Bandwidths()
 
 # this is parsed from the server descriptor files
 class Bandwidths():
-    def __init__(self, fingerprint):
-        self.fingerprint = fingerprint
+    def __init__(self):
         self.max_obs_bw = 0
         self.bw_rates = []
         self.bw_bursts = []
@@ -112,25 +112,45 @@ def stage_relays(args):
     consensus_paths = get_file_list(args.consensus_path)
     logging.info("Processing {} consensus files from {}...".format(len(consensus_paths), args.consensus_path))
     consensuses = process(num_processes, consensus_paths, parse_consensus, lambda x: x)
-    relays = relays_from_consensuses(consensuses)
-    network_stats = network_stats_from_consensuses(consensuses)
     min_unix_time = min([c['pub_dt'] for c in consensuses])
     max_unix_time = max([c['pub_dt'] for c in consensuses])
-    timestr = get_time_suffix(min_unix_time, max_unix_time)
-    logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
 
     servdesc_paths = get_file_list(args.server_descriptor_path)
     logging.info("Processing {} server descriptor files from {}...".format(len(servdesc_paths), args.server_descriptor_path))
     sdesc_args = [[p, min_unix_time, max_unix_time] for p in servdesc_paths]
     serverdescs = process(num_processes, sdesc_args, parse_serverdesc, lambda x: x)
+
+    families = families_from_serverdescs(serverdescs)
+
+    cluster_consensuses(families, consensuses)
+
+    relays = relays_from_consensuses(consensuses)
+    network_stats = network_stats_from_consensuses(consensuses)
+    #timestr = get_time_suffix(min_unix_time, max_unix_time)
+    #logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
+
     bandwidths = bandwidths_from_serverdescs(serverdescs)
 
     found_bandwidths = 0
-    for fingerprint in relays:
-        if fingerprint in bandwidths:
-            # overwrite empty bandwidth with parsed bandwidth info
-            relays[fingerprint].bandwidths = bandwidths[fingerprint]
-            found_bandwidths += 1
+    for relay in relays.values():
+        for fingerprint in relay.fingerprints:
+            cluster_bandwidths = []
+            bandwidth = bandwidths.get(fingerprint)
+            if bandwidth is not None:
+                cluster_bandwidths.append({
+                    'bandwidth_capacity': int(bandwidth.max_obs_bw),
+                    'bandwidth_rate': int(median(bandwidth.bw_rates)) if len(bandwidth.bw_rates) > 0 else 0,
+                    'bandwidth_burst': int(median(bandwidth.bw_bursts)) if len(bandwidth.bw_bursts) > 0 else 0,
+                })
+            if len(cluster_bandwidths) > 0:
+                relay.bandwidth_capacity = max(b['bandwidth_capacity'] for b in cluster_bandwidths)
+                relay.bandwidth_rate = max(b['bandwidth_rate'] for b in cluster_bandwidths)
+                relay.bandwidth_burst = max(b['bandwidth_burst'] for b in cluster_bandwidths)
+                found_bandwidths += 1
+            else:
+                relay.bandwidth_capacity = 0
+                relay.bandwidth_rate = 0
+                relay.bandwidth_burst = 0
 
     logging.info("We found bandwidth information for {} of {} relays".format(found_bandwidths, len(relays)))
     # for (k, v) in sorted(relays.items(), key=lambda kv: kv[1].bandwidths.max_obs_bw):
@@ -151,15 +171,15 @@ def stage_relays(args):
         r = relays[fingerprint]
 
         output['relays'][fingerprint] = {
-            'fingerprint': r.fingerprint,
+            'fingerprints': r.fingerprints,
             'address': r.address,
             'running_frequency': float(len(r.weights)) / float(len(consensus_paths)), # frac consensuses in which relay appeared
             'guard_frequency': float(r.num_guard) / float(len(r.weights)), # when running, frac consensuses with exit flag
             'exit_frequency': float(r.num_exit) / float(len(r.weights)), # when running, frac consensuses with guard flag
             'weight': float(median(r.weights)) if len(r.weights) > 0 else 0.0,
-            'bandwidth_capacity': int(r.bandwidths.max_obs_bw),
-            'bandwidth_rate': int(median(r.bandwidths.bw_rates)) if len(r.bandwidths.bw_rates) > 0 else 0,
-            'bandwidth_burst': int(median(r.bandwidths.bw_bursts)) if len(r.bandwidths.bw_bursts) > 0 else 0,
+            'bandwidth_capacity': r.bandwidth_capacity,
+            'bandwidth_rate': r.bandwidth_rate,
+            'bandwidth_burst': r.bandwidth_burst,
         }
 
         if geo is not None:
@@ -225,8 +245,6 @@ def parse_consensus(path):
     net_status = next(parse_file(path, document_handler='DOCUMENT', validate=False))
 
     relays = {}
-    weights = {"total": 0, "exit": 0, "guard": 0, "exitguard": 0, "middle": 0}
-    counts = {"total": 0, "exit": 0, "guard": 0, "exitguard": 0, "middle": 0}
 
     for (fingerprint, router_entry) in net_status.routers.items():
         if Flag.BADEXIT in router_entry.flags or Flag.RUNNING not in router_entry.flags or Flag.VALID not in router_entry.flags:
@@ -247,32 +265,6 @@ def parse_consensus(path):
         else:
             relays[fingerprint]['is_exit'] = False
 
-        # fill in the weights
-        bw_weight = float(router_entry.bandwidth)
-
-        weights["total"] += bw_weight
-        counts["total"] += 1
-        if relays[fingerprint]['is_guard'] and relays[fingerprint]['is_exit']:
-            weights["exitguard"] += bw_weight
-            counts["exitguard"] += 1
-        elif relays[fingerprint]['is_guard']:
-            weights["guard"] += bw_weight
-            counts["guard"] += 1
-        elif relays[fingerprint]['is_exit']:
-            weights["exit"] += bw_weight
-            counts["exit"] += 1
-        else:
-            weights["middle"] += bw_weight
-            counts["middle"] += 1
-
-    # weights are normalized on a per-consensus basis
-    for fingerprint in relays:
-        relays[fingerprint]['weight'] /= weights["total"]
-    for position_type in weights:
-        if position_type == "total":
-            continue
-        weights[position_type] /= weights["total"]
-
     # valid_after is for V3 descriptors, V2 use net_status.published
     pub_dt = net_status.valid_after.replace(tzinfo=timezone.utc).timestamp()
     assert(pub_dt is not None)
@@ -281,28 +273,98 @@
         'type': 'consensus',
         'pub_dt': pub_dt,
         'relays': relays,
-        'weights': weights,
-        'counts': counts,
     }
 
     return result
 
+def get_cluster_key(families, fingerprint, address):
+    masked_ip = int(IPv4Address(address)) & 0xffff0000
+    # family will be missing if we didn't have a descriptor
+    # for the given relay.
+    family = families.get(fingerprint) or f'<{fingerprint}>'
+    return (masked_ip, family)
+
+def cluster_consensuses(families, consensuses):
+    for c in consensuses:
+        clustered_relays = {}
+        for fingerprint, relay in c['relays'].items():
+            clustered_relays.setdefault(get_cluster_key(families, fingerprint, relay['address']), []).append((fingerprint, relay))
+        new_relays = {}
+        for pairs in clustered_relays.values():
+            fingerprints = [p[0] for p in pairs]
+            fingerprints.sort()
+
+            relays = [p[1] for p in pairs]
+            new_relays[sorted(fingerprints)[0]] = {
+                'address': sorted([r['address'] for r in relays])[0],
+                'weight': sum([r['weight'] for r in relays]),
+                'is_guard': any([r['is_guard'] for r in relays]),
+                'is_exit': any([r['is_exit'] for r in relays]),
+                'fingerprints': fingerprints,
+            }
+        logging.info("Clustered {} relays into {} relays".format(len(c['relays']), len(new_relays)))
+        c['relays'] = new_relays
+
+        weights = {
+            'total': 0,
+            'exitguard': 0,
+            'guard': 0,
+            'exit': 0,
+            'middle': 0,
+        }
+        counts = {
+            'total': 0,
+            'exitguard': 0,
+            'guard': 0,
+            'exit': 0,
+            'middle': 0,
+        }
+
+        for r in new_relays.values():
+            bw_weight = r['weight']
+            weights["total"] += bw_weight
+            counts["total"] += 1
+            if r['is_guard'] and r['is_exit']:
+                weights["exitguard"] += bw_weight
+                counts["exitguard"] += 1
+            elif r['is_guard']:
+                weights["guard"] += bw_weight
+                counts["guard"] += 1
+            elif r['is_exit']:
+                weights["exit"] += bw_weight
+                counts["exit"] += 1
+            else:
+                weights["middle"] += bw_weight
+                counts["middle"] += 1
+
+        # weights are normalized on a per-consensus basis
+        for r in new_relays.values():
+            r['weight'] /= weights["total"]
+        for position_type in weights:
+            if position_type == "total":
+                continue
+            weights[position_type] /= weights["total"]
+
+        c['weights'] = weights
+        c['counts'] = counts
+
+def add_bandwidths(consensuses, serverdescs):
+    pass
+
 def relays_from_consensuses(consensuses):
     relays = {}
 
     for consensus in consensuses:
         assert(consensus is not None)
         assert(consensus['type'] == 'consensus')
-        for fingerprint in consensus['relays']:
-            relays.setdefault(fingerprint, Relay(fingerprint, consensus['relays'][fingerprint]['address']))
-
-            r = relays[fingerprint]
+        for fingerprint, consensus_relay in consensus['relays'].items():
+            r = relays.setdefault(fingerprint, Relay(consensus_relay['fingerprints'], consensus_relay['address']))
 
-            r.weights.append(consensus['relays'][fingerprint]['weight'])
+            r.weights.append(consensus_relay['weight'])
 
-            if consensus['relays'][fingerprint]['is_exit']:
+            if consensus_relay['is_exit']:
                 r.num_exit += 1
-            if consensus['relays'][fingerprint]['is_guard']:
+            if consensus_relay['is_guard']:
                 r.num_guard += 1
 
     return relays
@@ -371,8 +433,15 @@ def parse_serverdesc(args):
     if bst_bw is not None and bst_bw < advertised_bw:
         advertised_bw = bst_bw
 
+    family = relay.family
+    assert(family is not None)
+    # Ensure own fingerprint is in family
+    family.add(f'${relay.fingerprint}')
+    assert(f'${relay.fingerprint}' in family)
+
     result = {
         'type': 'serverdesc',
+        'family': family,
         'pub_dt': relay.published,
         'fprint': relay.fingerprint,
         'address': relay.address,
@@ -384,6 +453,26 @@ def parse_serverdesc(args):
 
     return result
 
+def families_from_serverdescs(serverdescs):
+    family_sets = {}
+
+    for sd in serverdescs:
+        if sd is None:
+            continue
+
+        if sd['type'] != 'serverdesc':
+            continue
+
+        # Each relay's family is the union of all the families it has published.
+        family_sets.setdefault(sd['fprint'], set()).update(sd['family'])
+
+    # Convert to normalized string
+    families = {}
+    for fp, family_set in family_sets.items():
+        families[fp] = str(sorted(list(family_set)))
+
+    return families
+
 def bandwidths_from_serverdescs(serverdescs):
     bandwidths = {}
 
@@ -394,7 +483,7 @@ def bandwidths_from_serverdescs(serverdescs):
         if sd['type'] != 'serverdesc':
             continue
 
-        bandwidths.setdefault(sd['fprint'], Bandwidths(sd['fprint']))
+        bandwidths.setdefault(sd['fprint'], Bandwidths())
 
         b = bandwidths[sd['fprint']]
 

From c232e3881d4b744f0a2bc1de3ca3b2cc6bd78dda Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 18:20:16 -0500
Subject: [PATCH 4/9] Make family calculations a bit more robust

---
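Notes (review aid, not original commit text): the two passes added below first
drop family declarations that are not mutual, then take the transitive closure
so that indirectly-connected relays land in one family. A toy illustration
with single-letter stand-ins for fingerprints:

    # Not tornettools code; same shape as the two loops added in this patch.
    family_sets = {'A': {'A', 'B', 'X'}, 'B': {'A', 'B', 'C'}, 'C': {'B', 'C'}}

    # Pass 1: keep only mutual declarations. 'X' never published a family
    # listing 'A', so A's claim on X is dropped.
    for fp, family in family_sets.items():
        mutuals = {o for o in family if fp in family_sets.get(o, set())}
        family.clear()
        family.update(mutuals)

    # Pass 2: transitive closure; A, B, and C merge into one family.
    for fp, family in family_sets.items():
        closure, todo = set(), {fp}
        while todo:
            o = todo.pop()
            if o not in closure:
                closure.add(o)
                todo.update(family_sets[o])
        family.clear()
        family.update(closure)

    assert all(family_sets[fp] == {'A', 'B', 'C'} for fp in 'ABC')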
 tornettools/stage.py | 37 ++++++++++++++++++++++++++++++++++---
 1 file changed, 34 insertions(+), 3 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index d50c7ae..60528af 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -433,11 +433,13 @@ def parse_serverdesc(args):
     if bst_bw is not None and bst_bw < advertised_bw:
         advertised_bw = bst_bw
 
-    family = relay.family
+    # Convert fingerprints in family to match fingerprints we use everywhere else.
+    # i.e. remove the $ prefix and ensure upper-case.
+    family = set([fp[1:].upper() for fp in relay.family])
+
     assert(family is not None)
     # Ensure own fingerprint is in family
-    family.add(f'${relay.fingerprint}')
-    assert(f'${relay.fingerprint}' in family)
+    family.add(relay.fingerprint)
 
     result = {
         'type': 'serverdesc',
@@ -466,6 +468,35 @@ def families_from_serverdescs(serverdescs):
         # Each relay's family is the union of all the families it has published.
         family_sets.setdefault(sd['fprint'], set()).update(sd['family'])
 
+    # Remove non-mutuals
+    for fp, family in family_sets.items():
+        mutuals = set()
+        for other_fp in family:
+            other_family = family_sets.get(other_fp)
+            if other_family is not None and fp in other_family:
+                mutuals.add(other_fp)
+        # Mutate `family` to contain only mutuals; don't reassign since we're iterating through the dict.
+        if len(family) != len(mutuals):
+            logging.info(f"XXX Dropping non-mutuals shrunk family from {len(family)} to {len(mutuals)}")
+        family.clear()
+        family.update(mutuals)
+
+    # Add transitives
+    for fp, family in family_sets.items():
+        transitives = set()
+        to_process = family_sets[fp].copy()
+        while len(to_process) > 0:
+            other_fp = to_process.pop()
+            if other_fp in transitives:
+                # Already processed
+                continue
+            transitives.add(other_fp)
+            to_process.update(family_sets[other_fp])
+        if len(family) != len(transitives):
+            logging.info(f"XXX Adding transitives grew family from {len(family)} to {len(transitives)}")
+        family.clear()
+        family.update(transitives)
+
     # Convert to normalized string
     families = {}
     for fp, family_set in family_sets.items():

From b2a5b41652035739b5d671245e345ad5ade5d211 Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 19:03:07 -0500
Subject: [PATCH 5/9] Incorporate country in cluster keys

---
 tornettools/stage.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index 60528af..2cf5697 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -122,7 +122,11 @@ def stage_relays(args):
 
     families = families_from_serverdescs(serverdescs)
 
-    cluster_consensuses(families, consensuses)
+    geo = None
+    if args.geoip_path is not None:
+        geo = GeoIP(args.geoip_path)
+
+    cluster_consensuses(families, geo, consensuses)
 
     relays = relays_from_consensuses(consensuses)
     network_stats = network_stats_from_consensuses(consensuses)
@@ -156,10 +160,6 @@ def stage_relays(args):
     # for (k, v) in sorted(relays.items(), key=lambda kv: kv[1].bandwidths.max_obs_bw):
     #     logging.info("fp={} capacity={}".format(k, v.bandwidths.max_obs_bw))
 
-    geo = None
-    if args.geoip_path is not None:
-        geo = GeoIP(args.geoip_path)
-
     output = {
         'min_unix_time': min_unix_time,
         'max_unix_time': max_unix_time,
@@ -277,18 +277,21 @@ def parse_consensus(path):
 
     return result
 
-def get_cluster_key(families, fingerprint, address):
+def get_cluster_key(families, geo, fingerprint, address):
     masked_ip = int(IPv4Address(address)) & 0xffff0000
+    # We're using fairly wide IP address ranges; but we can separate
+    # hosts that we know to be in different countries.
+    country = geo.ip_to_country_code(address) if geo else None
     # family will be missing if we didn't have a descriptor
    # for the given relay.
     family = families.get(fingerprint) or f'<{fingerprint}>'
-    return (masked_ip, family)
+    return (masked_ip, country, family)
 
-def cluster_consensuses(families, consensuses):
+def cluster_consensuses(families, geo, consensuses):
     for c in consensuses:
         clustered_relays = {}
         for fingerprint, relay in c['relays'].items():
-            clustered_relays.setdefault(get_cluster_key(families, fingerprint, relay['address']), []).append((fingerprint, relay))
+            clustered_relays.setdefault(get_cluster_key(families, geo, fingerprint, relay['address']), []).append((fingerprint, relay))
         new_relays = {}
         for pairs in clustered_relays.values():
             fingerprints = [p[0] for p in pairs]

From 8778984808b74ee283a371bb93da0cceab33c307 Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Wed, 29 Jun 2022 19:23:51 -0500
Subject: [PATCH 6/9] record nicknames for debugging

---
 tornettools/stage.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index 2cf5697..3e776b6 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -172,6 +172,7 @@ def stage_relays(args):
 
         output['relays'][fingerprint] = {
             'fingerprints': r.fingerprints,
+            'nicknames': r.nicknames,
             'address': r.address,
             'running_frequency': float(len(r.weights)) / float(len(consensus_paths)), # frac consensuses in which relay appeared
             'guard_frequency': float(r.num_guard) / float(len(r.weights)), # when running, frac consensuses with exit flag
@@ -254,6 +255,7 @@ def parse_consensus(path):
 
         relays[fingerprint]['address'] = router_entry.address
         relays[fingerprint]['weight'] = router_entry.bandwidth
+        relays[fingerprint]['nickname'] = router_entry.nickname
 
         if Flag.GUARD in router_entry.flags and Flag.FAST in router_entry.flags and Flag.STABLE in router_entry.flags:
             relays[fingerprint]['is_guard'] = True
@@ -304,6 +306,7 @@ def cluster_consensuses(families, geo, consensuses):
                 'is_guard': any([r['is_guard'] for r in relays]),
                 'is_exit': any([r['is_exit'] for r in relays]),
                 'fingerprints': fingerprints,
+                'nicknames': [r['nickname'] for r in relays],
             }
         logging.info("Clustered {} relays into {} relays".format(len(c['relays']), len(new_relays)))
         c['relays'] = new_relays
@@ -362,6 +365,7 @@ def relays_from_consensuses(consensuses):
         assert(consensus['type'] == 'consensus')
         for fingerprint, consensus_relay in consensus['relays'].items():
             r = relays.setdefault(fingerprint, Relay(consensus_relay['fingerprints'], consensus_relay['address']))
+            r.nicknames = consensus_relay['nicknames']
 
             r.weights.append(consensus_relay['weight'])
 

From 03a0d5fb7a9e1d24ea84ce0b6326a4ed1a3dfa49 Mon Sep 17 00:00:00 2001
From: Jim Newsome
Date: Tue, 21 Feb 2023 10:35:50 -0600
Subject: [PATCH 7/9] cluster /24's instead of /16's

---
 tornettools/stage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index 3e776b6..e11fe3c 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -280,7 +280,7 @@ def parse_consensus(path):
     return result
 
 def get_cluster_key(families, geo, fingerprint, address):
-    masked_ip = int(IPv4Address(address)) & 0xffff0000
+    masked_ip = int(IPv4Address(address)) & 0xffffff00
     # We're using fairly wide IP address ranges; but we can separate
     # hosts that we know to be in different countries.
     country = geo.ip_to_country_code(address) if geo else None

From bf515575a27a2dffeb4feebe92fecf1ccbfea6dd Mon Sep 17 00:00:00 2001
From: Rob Jansen
Date: Wed, 8 Mar 2023 11:58:09 -0500
Subject: [PATCH 8/9] Change clustering criteria from /24 to /32 (ie exact IP match)

---
 tornettools/stage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index e11fe3c..74dd720 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -280,7 +280,7 @@ def parse_consensus(path):
     return result
 
 def get_cluster_key(families, geo, fingerprint, address):
-    masked_ip = int(IPv4Address(address)) & 0xffffff00
+    masked_ip = int(IPv4Address(address)) & 0xffffffff
     # We're using fairly wide IP address ranges; but we can separate
     # hosts that we know to be in different countries.
     country = geo.ip_to_country_code(address) if geo else None

From 27b85a57223c95209c24b778a3d6d4bdeb9ebf49 Mon Sep 17 00:00:00 2001
From: Rob Jansen
Date: Wed, 8 Mar 2023 11:59:03 -0500
Subject: [PATCH 9/9] Fix bug where relays were not being clustered as intended

---
 tornettools/stage.py | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/tornettools/stage.py b/tornettools/stage.py
index 74dd720..298cdb1 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -136,9 +136,11 @@ def stage_relays(args):
     bandwidths = bandwidths_from_serverdescs(serverdescs)
 
     found_bandwidths = 0
+    # each 'relay' may actually be many relays clustered into one
    for relay in relays.values():
+        # first we want to collect all bandwidth info we have from everyone in the cluster
+        cluster_bandwidths = []
         for fingerprint in relay.fingerprints:
-            cluster_bandwidths = []
             bandwidth = bandwidths.get(fingerprint)
             if bandwidth is not None:
                 cluster_bandwidths.append({
@@ -146,15 +148,16 @@ def stage_relays(args):
                     'bandwidth_rate': int(median(bandwidth.bw_rates)) if len(bandwidth.bw_rates) > 0 else 0,
                     'bandwidth_burst': int(median(bandwidth.bw_bursts)) if len(bandwidth.bw_bursts) > 0 else 0,
                 })
-            if len(cluster_bandwidths) > 0:
-                relay.bandwidth_capacity = max(b['bandwidth_capacity'] for b in cluster_bandwidths)
-                relay.bandwidth_rate = max(b['bandwidth_rate'] for b in cluster_bandwidths)
-                relay.bandwidth_burst = max(b['bandwidth_burst'] for b in cluster_bandwidths)
-                found_bandwidths += 1
-            else:
-                relay.bandwidth_capacity = 0
-                relay.bandwidth_rate = 0
-                relay.bandwidth_burst = 0
+        # now flatten the bandwidth info for the cluster down into a single bandwidth for the cluster representative
+        if len(cluster_bandwidths) > 0:
+            relay.bandwidth_capacity = max(b['bandwidth_capacity'] for b in cluster_bandwidths)
+            relay.bandwidth_rate = max(b['bandwidth_rate'] for b in cluster_bandwidths)
+            relay.bandwidth_burst = max(b['bandwidth_burst'] for b in cluster_bandwidths)
+            found_bandwidths += 1
+        else:
+            relay.bandwidth_capacity = 0
+            relay.bandwidth_rate = 0
+            relay.bandwidth_burst = 0
 
     logging.info("We found bandwidth information for {} of {} relays".format(found_bandwidths, len(relays)))
     # for (k, v) in sorted(relays.items(), key=lambda kv: kv[1].bandwidths.max_obs_bw):
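
Note (review aid, not original commit text): before this patch,
cluster_bandwidths was re-initialized inside the per-fingerprint loop and the
flattening block ran per fingerprint, so a cluster representative kept only
whatever bandwidth came from the last fingerprint iterated, and
found_bandwidths could be incremented once per cluster member. Hoisting the
accumulator and dedenting the flatten step makes max() range over the whole
cluster, as in this toy contrast:

    # Not tornettools code; toy numbers only.
    values = {'fp1': 10, 'fp2': 3}

    acc = []                 # fixed shape: one accumulator per cluster
    for fp in values:
        acc.append(values[fp])
    assert max(acc) == 10    # sees every member

    for fp in values:
        acc = [values[fp]]   # buggy shape: reset on every fingerprint
    assert max(acc) == 3     # only the last member survives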