diff --git a/tornettools/stage.py b/tornettools/stage.py
index 186cd12..298cdb1 100644
--- a/tornettools/stage.py
+++ b/tornettools/stage.py
@@ -7,6 +7,7 @@
 from tornettools.util import dump_json_data
 from tornettools.util_geoip import GeoIP
+from ipaddress import IPv4Address
 from multiprocessing import Pool, cpu_count
 from statistics import median
 from datetime import datetime, timezone
 
@@ -18,8 +19,8 @@
 
 # this is parsed from the consensus files
 class Relay():
-    def __init__(self, fingerprint, address):
-        self.fingerprint = fingerprint
+    def __init__(self, fingerprints, address):
+        self.fingerprints = fingerprints
         self.address = address
         # the length of this list indicates the number of consensuses the relay appeared in
         self.weights = []
@@ -28,12 +29,11 @@ def __init__(self, fingerprint, address):
         # a count of the number of consensuses in which the relay had the guard flag
         self.num_guard = 0
         # bandwidth information parsed from server descriptor files
-        self.bandwidths = Bandwidths(fingerprint)
+        self.bandwidths = Bandwidths()
 
 # this is parsed from the server descriptor files
 class Bandwidths():
-    def __init__(self, fingerprint):
-        self.fingerprint = fingerprint
+    def __init__(self):
         self.max_obs_bw = 0
         self.bw_rates = []
         self.bw_bursts = []
@@ -111,28 +111,58 @@ def stage_relays(args):
     consensus_paths = get_file_list(args.consensus_path)
     logging.info("Processing {} consensus files from {}...".format(len(consensus_paths), args.consensus_path))
-    relays, min_unix_time, max_unix_time, network_stats = process(num_processes, consensus_paths, parse_consensus, combine_parsed_consensus_results)
+    consensuses = process(num_processes, consensus_paths, parse_consensus, lambda x: x)
+    min_unix_time = min([c['pub_dt'] for c in consensuses])
+    max_unix_time = max([c['pub_dt'] for c in consensuses])
 
     servdesc_paths = get_file_list(args.server_descriptor_path)
     logging.info("Processing {} server descriptor files from {}...".format(len(servdesc_paths), args.server_descriptor_path))
     sdesc_args = [[p, min_unix_time, max_unix_time] for p in servdesc_paths]
-    bandwidths = process(num_processes, sdesc_args, parse_serverdesc, combine_parsed_serverdesc_results)
+    serverdescs = process(num_processes, sdesc_args, parse_serverdesc, lambda x: x)
+
+    families = families_from_serverdescs(serverdescs)
+
+    geo = None
+    if args.geoip_path is not None:
+        geo = GeoIP(args.geoip_path)
+
+    cluster_consensuses(families, geo, consensuses)
+
+    relays = relays_from_consensuses(consensuses)
+    network_stats = network_stats_from_consensuses(consensuses)
+    #timestr = get_time_suffix(min_unix_time, max_unix_time)
+    #logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
+
+    bandwidths = bandwidths_from_serverdescs(serverdescs)
 
     found_bandwidths = 0
-    for fingerprint in relays:
-        if fingerprint in bandwidths:
-            # overwrite empty bandwidth with parsed bandwidth info
-            relays[fingerprint].bandwidths = bandwidths[fingerprint]
+    # each 'relay' may actually be many relays clustered into one
+    for relay in relays.values():
+        # first we want to collect all bandwidth info we have from everyone in the cluster
+        cluster_bandwidths = []
+        for fingerprint in relay.fingerprints:
+            bandwidth = bandwidths.get(fingerprint)
+            if bandwidth is not None:
+                cluster_bandwidths.append({
+                    'bandwidth_capacity': int(bandwidth.max_obs_bw),
+                    'bandwidth_rate': int(median(bandwidth.bw_rates)) if len(bandwidth.bw_rates) > 0 else 0,
+                    'bandwidth_burst': int(median(bandwidth.bw_bursts)) if len(bandwidth.bw_bursts) > 0 else 0,
+                })
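+        # A hypothetical illustration of the flattening below: a cluster whose
+        # members report capacities [5000000, 8000000] and rates [1000000,
+        # 2000000] yields one representative with capacity 8000000 and rate
+        # 2000000, i.e. the max of each field across the cluster.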
+        # now flatten the bandwidth info for the cluster down into a single bandwidth for the cluster representative
+        if len(cluster_bandwidths) > 0:
+            relay.bandwidth_capacity = max(b['bandwidth_capacity'] for b in cluster_bandwidths)
+            relay.bandwidth_rate = max(b['bandwidth_rate'] for b in cluster_bandwidths)
+            relay.bandwidth_burst = max(b['bandwidth_burst'] for b in cluster_bandwidths)
             found_bandwidths += 1
+        else:
+            relay.bandwidth_capacity = 0
+            relay.bandwidth_rate = 0
+            relay.bandwidth_burst = 0
 
     logging.info("We found bandwidth information for {} of {} relays".format(found_bandwidths, len(relays)))
     # for (k, v) in sorted(relays.items(), key=lambda kv: kv[1].bandwidths.max_obs_bw):
     #     logging.info("fp={} capacity={}".format(k, v.bandwidths.max_obs_bw))
 
-    geo = None
-    if args.geoip_path is not None:
-        geo = GeoIP(args.geoip_path)
-
     output = {
         'min_unix_time': min_unix_time,
         'max_unix_time': max_unix_time,
@@ -144,15 +174,16 @@ def stage_relays(args):
         r = relays[fingerprint]
 
         output['relays'][fingerprint] = {
-            'fingerprint': r.fingerprint,
+            'fingerprints': r.fingerprints,
+            'nicknames': r.nicknames,
             'address': r.address,
             'running_frequency': float(len(r.weights)) / float(len(consensus_paths)), # frac consensuses in which relay appeared
             'guard_frequency': float(r.num_guard) / float(len(r.weights)), # when running, frac consensuses with guard flag
             'exit_frequency': float(r.num_exit) / float(len(r.weights)), # when running, frac consensuses with exit flag
             'weight': float(median(r.weights)) if len(r.weights) > 0 else 0.0,
-            'bandwidth_capacity': int(r.bandwidths.max_obs_bw),
-            'bandwidth_rate': int(median(r.bandwidths.bw_rates)) if len(r.bandwidths.bw_rates) > 0 else 0,
-            'bandwidth_burst': int(median(r.bandwidths.bw_bursts)) if len(r.bandwidths.bw_bursts) > 0 else 0,
+            'bandwidth_capacity': r.bandwidth_capacity,
+            'bandwidth_rate': r.bandwidth_rate,
+            'bandwidth_burst': r.bandwidth_burst,
         }
 
         if geo is not None:
@@ -218,8 +249,6 @@ def parse_consensus(path):
     net_status = next(parse_file(path, document_handler='DOCUMENT', validate=False))
 
     relays = {}
-    weights = {"total": 0, "exit": 0, "guard": 0, "exitguard": 0, "middle": 0}
-    counts = {"total": 0, "exit": 0, "guard": 0, "exitguard": 0, "middle": 0}
 
     for (fingerprint, router_entry) in net_status.routers.items():
         if Flag.BADEXIT in router_entry.flags or Flag.RUNNING not in router_entry.flags or Flag.VALID not in router_entry.flags:
@@ -229,6 +258,7 @@
 
         relays[fingerprint]['address'] = router_entry.address
         relays[fingerprint]['weight'] = router_entry.bandwidth
+        relays[fingerprint]['nickname'] = router_entry.nickname
 
         if Flag.GUARD in router_entry.flags and Flag.FAST in router_entry.flags and Flag.STABLE in router_entry.flags:
             relays[fingerprint]['is_guard'] = True
@@ -240,87 +270,136 @@
         else:
             relays[fingerprint]['is_exit'] = False
 
-        # fill in the weights
-        bw_weight = float(router_entry.bandwidth)
-
-        weights["total"] += bw_weight
-        counts["total"] += 1
-        if relays[fingerprint]['is_guard'] and relays[fingerprint]['is_exit']:
-            weights["exitguard"] += bw_weight
-            counts["exitguard"] += 1
-        elif relays[fingerprint]['is_guard']:
-            weights["guard"] += bw_weight
-            counts["guard"] += 1
-        elif relays[fingerprint]['is_exit']:
-            weights["exit"] += bw_weight
-            counts["exit"] += 1
-        else:
-            weights["middle"] += bw_weight
-            counts["middle"] += 1
-
-    # weights are normalized on a per-consensus basis
-    for fingerprint in relays:
-        relays[fingerprint]['weight'] /= weights["total"]
-    for position_type in weights:
-        if position_type == "total":
-            continue
-        weights[position_type] /= weights["total"]
+    # valid_after is for V3 descriptors, V2 use net_status.published
+    pub_dt = net_status.valid_after.replace(tzinfo=timezone.utc).timestamp()
+    assert(pub_dt is not None)
 
     result = {
         'type': 'consensus',
-        'pub_dt': net_status.valid_after, # valid_after is for V3 descriptors, V2 use net_status.published
+        'pub_dt': pub_dt,
         'relays': relays,
-        'weights': weights,
-        'counts': counts,
     }
 
     return result
 
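+# A sketch of the dict parse_consensus now returns (hypothetical values):
+#   {'type': 'consensus', 'pub_dt': 1577836800.0,
+#    'relays': {'AAA...': {'address': '1.2.3.4', 'weight': 2000,
+#               'nickname': 'relay1', 'is_guard': True, 'is_exit': False}}}
+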
-def combine_parsed_consensus_results(results):
-    relays = {}
-    network_stats = {}
-    min_unix_time, max_unix_time = None, None
+def get_cluster_key(families, geo, fingerprint, address):
+    masked_ip = int(IPv4Address(address)) & 0xffffffff
+    # We're using fairly wide IP address ranges; but we can separate
+    # hosts that we know to be in different countries.
+    country = geo.ip_to_country_code(address) if geo else None
+    # family will be missing if we didn't have a descriptor
+    # for the given relay.
+    family = families.get(fingerprint) or f'<{fingerprint}>'
+    return (masked_ip, country, family)
+
+def cluster_consensuses(families, geo, consensuses):
+    for c in consensuses:
+        clustered_relays = {}
+        for fingerprint, relay in c['relays'].items():
+            clustered_relays.setdefault(get_cluster_key(families, geo, fingerprint, relay['address']), []).append((fingerprint, relay))
+        new_relays = {}
+        for pairs in clustered_relays.values():
+            fingerprints = [p[0] for p in pairs]
+            fingerprints.sort()
+
+            relays = [p[1] for p in pairs]
+            new_relays[fingerprints[0]] = {
+                'address': sorted([r['address'] for r in relays])[0],
+                'weight': sum([r['weight'] for r in relays]),
+                'is_guard': any([r['is_guard'] for r in relays]),
+                'is_exit': any([r['is_exit'] for r in relays]),
+                'fingerprints': fingerprints,
+                'nicknames': [r['nickname'] for r in relays],
+            }
+        logging.info("Clustered {} relays into {} relays".format(len(c['relays']), len(new_relays)))
+        c['relays'] = new_relays
+
+        weights = {
+            'total': 0,
+            'exitguard': 0,
+            'guard': 0,
+            'exit': 0,
+            'middle': 0,
+        }
+        counts = {
+            'total': 0,
+            'exitguard': 0,
+            'guard': 0,
+            'exit': 0,
+            'middle': 0,
+        }
 
-    counts_t, counts_eg, counts_e, counts_g, counts_m = [], [], [], [], []
-    weights_t, weights_eg, weights_e, weights_g, weights_m = [], [], [], [], []
+        for r in new_relays.values():
+            bw_weight = r['weight']
+            weights["total"] += bw_weight
+            counts["total"] += 1
+            if r['is_guard'] and r['is_exit']:
+                weights["exitguard"] += bw_weight
+                counts["exitguard"] += 1
+            elif r['is_guard']:
+                weights["guard"] += bw_weight
+                counts["guard"] += 1
+            elif r['is_exit']:
+                weights["exit"] += bw_weight
+                counts["exit"] += 1
+            else:
+                weights["middle"] += bw_weight
+                counts["middle"] += 1
+
+        # weights are normalized on a per-consensus basis
+        for r in new_relays.values():
+            r['weight'] /= weights["total"]
+        for position_type in weights:
+            if position_type == "total":
+                continue
+            weights[position_type] /= weights["total"]
 
-    for result in results:
-        if result is None:
-            continue
+        c['weights'] = weights
+        c['counts'] = counts
 
-        if result['type'] != 'consensus':
-            continue
+def add_bandwidths(consensuses, serverdescs):
+    pass
 
-        if result['pub_dt'] is not None:
-            unix_time = result['pub_dt'].replace(tzinfo=timezone.utc).timestamp()
-            if min_unix_time is None or unix_time < min_unix_time:
-                min_unix_time = unix_time
-            if max_unix_time is None or unix_time > max_unix_time:
-                max_unix_time = unix_time
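+# A hypothetical illustration of the clustering above: two relays that run on
+# the same address and mutually declare each other as family share the same
+# (masked_ip, country, family) cluster key, so cluster_consensuses folds them
+# into one entry keyed by the lexically-first fingerprint, with their
+# consensus weights summed before the per-consensus normalization.
+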
+def relays_from_consensuses(consensuses):
+    relays = {}
 
-        weights_t.append(result['weights']['total'])
-        weights_eg.append(result['weights']['exitguard'])
-        weights_g.append(result['weights']['guard'])
-        weights_e.append(result['weights']['exit'])
-        weights_m.append(result['weights']['middle'])
+    for consensus in consensuses:
+        assert(consensus is not None)
+        assert(consensus['type'] == 'consensus')
+        for fingerprint, consensus_relay in consensus['relays'].items():
+            r = relays.setdefault(fingerprint, Relay(consensus_relay['fingerprints'], consensus_relay['address']))
+            r.nicknames = consensus_relay['nicknames']
 
-        counts_t.append(result['counts']['total'])
-        counts_eg.append(result['counts']['exitguard'])
-        counts_g.append(result['counts']['guard'])
-        counts_e.append(result['counts']['exit'])
-        counts_m.append(result['counts']['middle'])
+            r.weights.append(consensus_relay['weight'])
 
-        for fingerprint in result['relays']:
-            relays.setdefault(fingerprint, Relay(fingerprint, result['relays'][fingerprint]['address']))
+            if consensus_relay['is_exit']:
+                r.num_exit += 1
+            if consensus_relay['is_guard']:
+                r.num_guard += 1
 
-            r = relays[fingerprint]
+    return relays
 
-            r.weights.append(result['relays'][fingerprint]['weight'])
+def network_stats_from_consensuses(consensuses):
+    network_stats = {}
 
-            if result['relays'][fingerprint]['is_exit']:
-                r.num_exit += 1
-            if result['relays'][fingerprint]['is_guard']:
-                r.num_guard += 1
+    counts_t, counts_eg, counts_e, counts_g, counts_m = [], [], [], [], []
+    weights_t, weights_eg, weights_e, weights_g, weights_m = [], [], [], [], []
+
+    for consensus in consensuses:
+        assert(consensus is not None)
+        assert(consensus['type'] == 'consensus')
+
+        weights_t.append(consensus['weights']['total'])
+        weights_eg.append(consensus['weights']['exitguard'])
+        weights_g.append(consensus['weights']['guard'])
+        weights_e.append(consensus['weights']['exit'])
+        weights_m.append(consensus['weights']['middle'])
+
+        counts_t.append(consensus['counts']['total'])
+        counts_eg.append(consensus['counts']['exitguard'])
+        counts_g.append(consensus['counts']['guard'])
+        counts_e.append(consensus['counts']['exit'])
+        counts_m.append(consensus['counts']['middle'])
 
     network_stats = {
         # the counts are whole numbers
@@ -337,10 +416,7 @@ def combine_parsed_consensus_results(results):
         'med_weight_total': 1.0, # for completeness
     }
 
-    timestr = get_time_suffix(min_unix_time, max_unix_time)
-    logging.info("Found {} total unique relays during {} with a median network size of {} relays".format(len(relays), timestr, network_stats['med_count_total']))
-
-    return relays, min_unix_time, max_unix_time, network_stats
+    return network_stats
 
 # this func is run by helper processes in process pool
 def parse_serverdesc(args):
@@ -367,8 +443,17 @@
     if bst_bw is not None and bst_bw < advertised_bw:
         advertised_bw = bst_bw
 
+    # Convert fingerprints in family to match fingerprints we use everywhere else.
+    # i.e. remove the $ prefix and ensure upper-case.
+    family = set([fp[1:].upper() for fp in relay.family])
+
+    assert(family is not None)
+    # Ensure own fingerprint is in family
+    family.add(relay.fingerprint)
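+    # e.g. a declared family of ['$aaa...', '$bbb...'] (hypothetical entries)
+    # becomes {'AAA...', 'BBB...'} plus this relay's own fingerprint.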
+
     result = {
         'type': 'serverdesc',
+        'family': family,
         'pub_dt': relay.published,
         'fprint': relay.fingerprint,
         'address': relay.address,
@@ -380,23 +465,72 @@
 
     return result
 
-def combine_parsed_serverdesc_results(results):
+def families_from_serverdescs(serverdescs):
+    family_sets = {}
+
+    for sd in serverdescs:
+        if sd is None:
+            continue
+
+        if sd['type'] != 'serverdesc':
+            continue
+
+        # Each relay's family is the union of all the families it has published.
+        family_sets.setdefault(sd['fprint'], set()).update(sd['family'])
+
+    # Remove non-mutuals
+    for fp, family in family_sets.items():
+        mutuals = set()
+        for other_fp in family:
+            other_family = family_sets.get(other_fp)
+            if other_family is not None and fp in other_family:
+                mutuals.add(other_fp)
+        # Mutate `family` to contain only mutuals; don't reassign since we're iterating through the dict.
+        if len(family) != len(mutuals):
+            logging.info(f"XXX Dropping non-mutuals shrunk family from {len(family)} to {len(mutuals)}")
+        family.clear()
+        family.update(mutuals)
+
+    # Add transitives
+    for fp, family in family_sets.items():
+        transitives = set()
+        to_process = family_sets[fp].copy()
+        while len(to_process) > 0:
+            other_fp = to_process.pop()
+            if other_fp in transitives:
+                # Already processed
+                continue
+            transitives.add(other_fp)
+            to_process.update(family_sets[other_fp])
+        if len(family) != len(transitives):
+            logging.info(f"XXX Adding transitives grew family from {len(family)} to {len(transitives)}")
+        family.clear()
+        family.update(transitives)
+
+    # Convert to normalized string
+    families = {}
+    for fp, family_set in family_sets.items():
+        families[fp] = str(sorted(list(family_set)))
+
+    return families
+
+def bandwidths_from_serverdescs(serverdescs):
     bandwidths = {}
 
-    for result in results:
-        if result is None:
+    for sd in serverdescs:
+        if sd is None:
             continue
 
-        if result['type'] != 'serverdesc':
+        if sd['type'] != 'serverdesc':
             continue
 
-        bandwidths.setdefault(result['fprint'], Bandwidths(result['fprint']))
+        bandwidths.setdefault(sd['fprint'], Bandwidths())
 
-        b = bandwidths[result['fprint']]
+        b = bandwidths[sd['fprint']]
 
-        b.max_obs_bw = max(b.max_obs_bw, result['bw_obs'])
-        b.bw_rates.append(result['bw_rate'])
-        b.bw_bursts.append(result['bw_burst'])
+        b.max_obs_bw = max(b.max_obs_bw, sd['bw_obs'])
+        b.bw_rates.append(sd['bw_rate'])
+        b.bw_bursts.append(sd['bw_burst'])
 
     return bandwidths
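A minimal standalone sketch of the family logic in families_from_serverdescs
above (mutual filtering, then transitive closure), using hypothetical
fingerprints 'A', 'B', 'C' in place of real hex digests:

    # Each relay's published family, already unioned across its descriptors.
    family_sets = {
        'A': {'A', 'B', 'C'},  # A claims B and C
        'B': {'B', 'A'},       # B claims A back, but not C
        'C': {'C'},            # C claims nobody
    }

    # Remove non-mutuals: C never claimed A, so A's set shrinks to {A, B}.
    for fp, family in family_sets.items():
        mutuals = {other for other in family if fp in family_sets.get(other, set())}
        family.clear()
        family.update(mutuals)

    # Add transitives: a no-op here, but if B had another mutual member D,
    # A would pick up D through B.
    for fp, family in family_sets.items():
        transitives, to_process = set(), set(family)
        while to_process:
            other = to_process.pop()
            if other not in transitives:
                transitives.add(other)
                to_process.update(family_sets[other])
        family.clear()
        family.update(transitives)

    assert family_sets['A'] == {'A', 'B'}
    assert family_sets['B'] == {'A', 'B'}
    assert family_sets['C'] == {'C'}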