From 6c5a23361569bdfe5d2fc530ed5ced85f0473130 Mon Sep 17 00:00:00 2001 From: Shamim Date: Sat, 19 Oct 2024 01:09:30 +0600 Subject: [PATCH] Added global DNS server to check further --- dns_security_analysis_tool.py | 166 ++++++++++++++++++++++++++++++---- 1 file changed, 149 insertions(+), 17 deletions(-) diff --git a/dns_security_analysis_tool.py b/dns_security_analysis_tool.py index aa223fb..829e3bc 100644 --- a/dns_security_analysis_tool.py +++ b/dns_security_analysis_tool.py @@ -1,3 +1,6 @@ +import time +import sys +import os import dns.resolver import dns.reversename import dns.query @@ -21,7 +24,9 @@ class DNSQueryTool: def __init__(self, dns_server): # Initialize the resolver with the specified DNS server self.resolver = dns.resolver.Resolver() - self.resolver.nameservers = [dns_server] +# self.resolver.nameservers = [dns_server] + self.default_nameserver = dns_server + def query_all_records(self, domain): # Query multiple DNS record types for a given domain @@ -42,22 +47,23 @@ def query_all_records(self, domain): results = {k: v for k, v in results.items() if v and v != ''} return results - def query_record(self, domain, record_type): - # Perform a DNS query for a specific record type + def query_record(self, domain, record_type, nameserver=None): + # Perform a DNS query for a specific record type using the specified nameserver + resolver = dns.resolver.Resolver() + resolver.nameservers = [nameserver] if nameserver else [self.default_nameserver] try: - answers = self.resolver.resolve(domain, record_type) + answers = resolver.resolve(domain, record_type) return "; ".join([answer.to_text() for answer in answers]) except dns.resolver.NXDOMAIN: if record_type == 'A' or record_type == 'AAAA': try: - self.resolver.resolve(domain, 'CNAME') + resolver.resolve(domain, 'CNAME') return f"Misconfigured" except: - return f"DNE" # Does Not Exist + return f"DNE" # Does Not Exist except Exception as e: logging.error(f"Error querying {record_type} record for {domain}: {e}") return "" - def check_reverse_dns(self, ip_addresses): # Perform reverse DNS lookups for given IP addresses/got from the A record ptr_records = [] @@ -119,10 +125,73 @@ def analyze_records(self, records): anomalies['MX'] = "Suspicious MX record found" # Additional anomaly checks can be added here return "; ".join([f"{k}: {v}" for k, v in anomalies.items()]) + # New Method: Query DNS from Multiple Regions + def query_from_multiple_regions(self, domain, record_type="A"): + dns_servers = { + "Google DNS (US)": "8.8.8.8", + "Cloudflare DNS (Global)": "1.1.1.1", + "Quad9 DNS (EU)": "9.9.9.9", + "OpenDNS (US)": "208.67.222.222" + } + results = {} + for location, nameserver in dns_servers.items(): + result = self.query_record(domain, record_type, nameserver) + results[location] = result + return results + + # New Method: Check DNSSEC from Multiple Regions + def check_dnssec_global(self, domain): + # Check DNSSEC status from different geographic locations + dns_servers = { + "Google DNS (US)": "8.8.8.8", + "Cloudflare DNS (Global)": "1.1.1.1", + "Quad9 DNS (EU)": "9.9.9.9", + "OpenDNS (US)": "208.67.222.222" + } + dnssec_results = {} + for location, nameserver in dns_servers.items(): + self.resolver.nameservers = [nameserver] + dnssec_status = self.check_dnssec(domain, '') + dnssec_results[location] = dnssec_status + return dnssec_results + + # New Method: Measure DNS Query Latency + def latency_check(self, domain): + dns_servers = { + "Google DNS (US)": "8.8.8.8", + "Cloudflare DNS (Global)": "1.1.1.1", + "Quad9 DNS (EU)": "9.9.9.9", + "OpenDNS (US)": "208.67.222.222" + } + latencies = {} + for location, nameserver in dns_servers.items(): + start_time = time.time() + self.query_record(domain, "A", nameserver) + latency = time.time() - start_time + latencies[location] = latency + return latencies def generate_report(all_data, output_format, output_filename): - # Generate a report in the specified format (CSV, HTML, JSON) + # Create a DataFrame from the cleaned and processed data df = pd.DataFrame(all_data) + + # Enhanced report formatting + # Ensuring that missing fields are clearly marked as 'Not Available' + df.fillna("Not Available", inplace=True) + + # Renaming columns for better readability (optional, depending on need) + df.rename(columns={ + 'A': 'A Record', + 'AAAA': 'AAAA Record', + 'MX': 'MX Record', + 'PTR': 'PTR Record', + 'DNSSEC': 'DNSSEC Status', + 'Global DNS Results': 'Global DNS Records', + 'Latencies': 'DNS Latency', + 'DNSSEC Global': 'DNSSEC Global Status' + }, inplace=True) + + # Generate a report in the specified format (CSV, HTML, JSON) if output_format == 'csv': df.to_csv(output_filename, index=False) elif output_format == 'html': @@ -131,35 +200,96 @@ def generate_report(all_data, output_format, output_filename): with open(output_filename, 'w') as file: file.write(json.dumps(all_data, indent=4)) -def process_domains(domains, dns_server, output_format, output_filename): +def enhance_report(df): + # Initialize a list to store enhanced rows + enhanced_rows = [] + + # Iterate over each row to break down the Global DNS Results and Latencies + for _, row in df.iterrows(): + # Ensure the data is in string format before using eval (check if eval is necessary) + global_dns = row.get('Global DNS Results', {}) + latencies = row.get('Latencies', {}) + dnssec_global = row.get('DNSSEC Global', {}) + + # Convert to dictionaries if they are in string format (using eval cautiously) + try: + if isinstance(global_dns, str): + global_dns = eval(global_dns) + if isinstance(latencies, str): + latencies = eval(latencies) + if isinstance(dnssec_global, str): + dnssec_global = eval(dnssec_global) + except Exception as e: + logging.error(f"Error processing row: {e}") + continue + + # Create a new row with the necessary enhancements + enhanced_row = { + 'Domain': row.get('domain', 'Unknown Domain'), + 'A Record': row.get('A', 'Not Available'), + 'AAAA Record': row.get('AAAA', 'Not Available'), + 'MX Record': row.get('MX', 'Not Available'), + 'PTR Record': row.get('PTR', 'Not Available'), + 'DNSSEC Status': row.get('DNSSEC', 'Not Available'), + } + # Add global DNS results and latencies for each DNS provider + for provider in ['Google DNS (US)', 'Cloudflare DNS (Global)', 'Quad9 DNS (EU)', 'OpenDNS (US)']: + enhanced_row[f'{provider} A Record'] = global_dns.get(provider, 'Not Available') + enhanced_row[f'{provider} Latency'] = f"{latencies.get(provider, 'N/A')} seconds" + enhanced_row[f'{provider} DNSSEC'] = dnssec_global.get(provider, 'Not Verified') + + # Append the enhanced row to the list + enhanced_rows.append(enhanced_row) + + # Create a new DataFrame from the enhanced rows + enhanced_df = pd.DataFrame(enhanced_rows) + return enhanced_df + + +def process_domains(domains, dns_server, output_format, output_filename, global_analysis=False): pattern = r'^((?!-)[A-Za-z0-9-]{1,63}(? 0: logging.warning(f"Skipped {skipped_domains_count} invalid domain rows") - generate_report(all_results, output_format, output_filename) + # Convert the results to a DataFrame + df = pd.DataFrame(all_results) + + # Enhance the report before generating the final output + enhanced_df = enhance_report(df) + + # Generate the final output + generate_report(enhanced_df, output_format, output_filename) + def main(): parser = argparse.ArgumentParser(description='Comprehensive DNS Security Analysis Tool') @@ -167,13 +297,15 @@ def main(): parser.add_argument('--dns-server', default='8.8.8.8', help='DNS server to use for queries') parser.add_argument('--format', default='json', choices=['json', 'csv', 'html'], help='Format of the security report') parser.add_argument('--output', '-o', default='dns_security_report', help='Output file name for the DNS security report') + parser.add_argument('--global-analysis', action='store_true', help='Enable global CDN and Anycast DNS analysis') args = parser.parse_args() with open(args.domains_file, 'r') as file: domains = [line.strip() for line in file] output_filename = f"{args.output}.{args.format}" - process_domains(domains, args.dns_server, args.format, output_filename) + process_domains(domains, args.dns_server, args.format, output_filename, global_analysis=args.global_analysis) if __name__ == "__main__": main() +