Skip to content

Commit

Permalink
Merge pull request #7 from shamimrezasohag/global-cdn-anycast-analysis
Browse files Browse the repository at this point in the history
Added global DNS server to check further
  • Loading branch information
shamimrezasohag authored Oct 18, 2024
2 parents 832b705 + 6c5a233 commit 9eb76f6
Showing 1 changed file with 149 additions and 17 deletions.
166 changes: 149 additions & 17 deletions dns_security_analysis_tool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import sys
import os
import dns.resolver
import dns.reversename
import dns.query
Expand All @@ -21,7 +24,9 @@ class DNSQueryTool:
def __init__(self, dns_server):
# Initialize the resolver with the specified DNS server
self.resolver = dns.resolver.Resolver()
self.resolver.nameservers = [dns_server]
# self.resolver.nameservers = [dns_server]
self.default_nameserver = dns_server


def query_all_records(self, domain):
# Query multiple DNS record types for a given domain
Expand All @@ -42,22 +47,23 @@ def query_all_records(self, domain):
results = {k: v for k, v in results.items() if v and v != ''}
return results

def query_record(self, domain, record_type):
# Perform a DNS query for a specific record type
def query_record(self, domain, record_type, nameserver=None):
# Perform a DNS query for a specific record type using the specified nameserver
resolver = dns.resolver.Resolver()
resolver.nameservers = [nameserver] if nameserver else [self.default_nameserver]
try:
answers = self.resolver.resolve(domain, record_type)
answers = resolver.resolve(domain, record_type)
return "; ".join([answer.to_text() for answer in answers])
except dns.resolver.NXDOMAIN:
if record_type == 'A' or record_type == 'AAAA':
try:
self.resolver.resolve(domain, 'CNAME')
resolver.resolve(domain, 'CNAME')
return f"Misconfigured"
except:
return f"DNE" # Does Not Exist
return f"DNE" # Does Not Exist
except Exception as e:
logging.error(f"Error querying {record_type} record for {domain}: {e}")
return ""

def check_reverse_dns(self, ip_addresses):
# Perform reverse DNS lookups for given IP addresses/got from the A record
ptr_records = []
Expand Down Expand Up @@ -119,10 +125,73 @@ def analyze_records(self, records):
anomalies['MX'] = "Suspicious MX record found"
# Additional anomaly checks can be added here
return "; ".join([f"{k}: {v}" for k, v in anomalies.items()])
# New Method: Query DNS from Multiple Regions
def query_from_multiple_regions(self, domain, record_type="A"):
dns_servers = {
"Google DNS (US)": "8.8.8.8",
"Cloudflare DNS (Global)": "1.1.1.1",
"Quad9 DNS (EU)": "9.9.9.9",
"OpenDNS (US)": "208.67.222.222"
}
results = {}
for location, nameserver in dns_servers.items():
result = self.query_record(domain, record_type, nameserver)
results[location] = result
return results

# New Method: Check DNSSEC from Multiple Regions
def check_dnssec_global(self, domain):
# Check DNSSEC status from different geographic locations
dns_servers = {
"Google DNS (US)": "8.8.8.8",
"Cloudflare DNS (Global)": "1.1.1.1",
"Quad9 DNS (EU)": "9.9.9.9",
"OpenDNS (US)": "208.67.222.222"
}
dnssec_results = {}
for location, nameserver in dns_servers.items():
self.resolver.nameservers = [nameserver]
dnssec_status = self.check_dnssec(domain, '')
dnssec_results[location] = dnssec_status
return dnssec_results

# New Method: Measure DNS Query Latency
def latency_check(self, domain):
dns_servers = {
"Google DNS (US)": "8.8.8.8",
"Cloudflare DNS (Global)": "1.1.1.1",
"Quad9 DNS (EU)": "9.9.9.9",
"OpenDNS (US)": "208.67.222.222"
}
latencies = {}
for location, nameserver in dns_servers.items():
start_time = time.time()
self.query_record(domain, "A", nameserver)
latency = time.time() - start_time
latencies[location] = latency
return latencies

def generate_report(all_data, output_format, output_filename):
# Generate a report in the specified format (CSV, HTML, JSON)
# Create a DataFrame from the cleaned and processed data
df = pd.DataFrame(all_data)

# Enhanced report formatting
# Ensuring that missing fields are clearly marked as 'Not Available'
df.fillna("Not Available", inplace=True)

# Renaming columns for better readability (optional, depending on need)
df.rename(columns={
'A': 'A Record',
'AAAA': 'AAAA Record',
'MX': 'MX Record',
'PTR': 'PTR Record',
'DNSSEC': 'DNSSEC Status',
'Global DNS Results': 'Global DNS Records',
'Latencies': 'DNS Latency',
'DNSSEC Global': 'DNSSEC Global Status'
}, inplace=True)

# Generate a report in the specified format (CSV, HTML, JSON)
if output_format == 'csv':
df.to_csv(output_filename, index=False)
elif output_format == 'html':
Expand All @@ -131,49 +200,112 @@ def generate_report(all_data, output_format, output_filename):
with open(output_filename, 'w') as file:
file.write(json.dumps(all_data, indent=4))

def process_domains(domains, dns_server, output_format, output_filename):
def enhance_report(df):
# Initialize a list to store enhanced rows
enhanced_rows = []

# Iterate over each row to break down the Global DNS Results and Latencies
for _, row in df.iterrows():
# Ensure the data is in string format before using eval (check if eval is necessary)
global_dns = row.get('Global DNS Results', {})
latencies = row.get('Latencies', {})
dnssec_global = row.get('DNSSEC Global', {})

# Convert to dictionaries if they are in string format (using eval cautiously)
try:
if isinstance(global_dns, str):
global_dns = eval(global_dns)
if isinstance(latencies, str):
latencies = eval(latencies)
if isinstance(dnssec_global, str):
dnssec_global = eval(dnssec_global)
except Exception as e:
logging.error(f"Error processing row: {e}")
continue

# Create a new row with the necessary enhancements
enhanced_row = {
'Domain': row.get('domain', 'Unknown Domain'),
'A Record': row.get('A', 'Not Available'),
'AAAA Record': row.get('AAAA', 'Not Available'),
'MX Record': row.get('MX', 'Not Available'),
'PTR Record': row.get('PTR', 'Not Available'),
'DNSSEC Status': row.get('DNSSEC', 'Not Available'),
}

# Add global DNS results and latencies for each DNS provider
for provider in ['Google DNS (US)', 'Cloudflare DNS (Global)', 'Quad9 DNS (EU)', 'OpenDNS (US)']:
enhanced_row[f'{provider} A Record'] = global_dns.get(provider, 'Not Available')
enhanced_row[f'{provider} Latency'] = f"{latencies.get(provider, 'N/A')} seconds"
enhanced_row[f'{provider} DNSSEC'] = dnssec_global.get(provider, 'Not Verified')

# Append the enhanced row to the list
enhanced_rows.append(enhanced_row)

# Create a new DataFrame from the enhanced rows
enhanced_df = pd.DataFrame(enhanced_rows)
return enhanced_df


def process_domains(domains, dns_server, output_format, output_filename, global_analysis=False):
pattern = r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,6}$'
# ... inside your process_domains function ...

valid_domains = [domain for domain in domains if re.match(pattern, domain) is not None and domain.strip()]

# Process a list of domains for DNS security analysis
# Initialize the tool with the specified DNS server
tool = DNSQueryTool(dns_server)
all_results = []
max_threads = 10 # You can adjust the number of threads

# Filter out empty domains and prepare for progress tracking
skipped_domains_count = len(domains) - len(valid_domains)

with tqdm(total=len(valid_domains), desc="Analyzing Domains") as pbar:
with ThreadPoolExecutor(max_workers=max_threads) as executor:
# Create a future for each domain in the filtered list
futures = {executor.submit(tool.query_all_records, domain): domain for domain in valid_domains}

for future in as_completed(futures):
dns_results = future.result()

# Run global analysis if the flag is set
if global_analysis:
global_dns_results = tool.query_from_multiple_regions(dns_results['domain'])
latencies = tool.latency_check(dns_results['domain'])
dnssec_global_results = tool.check_dnssec_global(dns_results['domain'])

dns_results['Global DNS Results'] = global_dns_results
dns_results['Latencies'] = latencies
dns_results['DNSSEC Global'] = dnssec_global_results

all_results.append(dns_results)
pbar.update(1)

if skipped_domains_count > 0:
logging.warning(f"Skipped {skipped_domains_count} invalid domain rows")

generate_report(all_results, output_format, output_filename)
# Convert the results to a DataFrame
df = pd.DataFrame(all_results)

# Enhance the report before generating the final output
enhanced_df = enhance_report(df)

# Generate the final output
generate_report(enhanced_df, output_format, output_filename)


def main():
parser = argparse.ArgumentParser(description='Comprehensive DNS Security Analysis Tool')
parser.add_argument('--domains-file', required=True, help='Input file for bulk domain security analysis')
parser.add_argument('--dns-server', default='8.8.8.8', help='DNS server to use for queries')
parser.add_argument('--format', default='json', choices=['json', 'csv', 'html'], help='Format of the security report')
parser.add_argument('--output', '-o', default='dns_security_report', help='Output file name for the DNS security report')
parser.add_argument('--global-analysis', action='store_true', help='Enable global CDN and Anycast DNS analysis')
args = parser.parse_args()

with open(args.domains_file, 'r') as file:
domains = [line.strip() for line in file]

output_filename = f"{args.output}.{args.format}"
process_domains(domains, args.dns_server, args.format, output_filename)
process_domains(domains, args.dns_server, args.format, output_filename, global_analysis=args.global_analysis)

if __name__ == "__main__":
main()

0 comments on commit 9eb76f6

Please sign in to comment.