# all-the-things.py
import os
import re
import pdfplumber
import docx
import pandas as pd
from rich.console import Console
from rich.table import Table
from rich.theme import Theme
from rich import box
import sys
import time
import shutil
import pyfiglet
import requests
import csv
import json
import socket
import base64
from ipaddress import ip_network, ip_address, collapse_addresses
from ipwhois import IPWhois
from concurrent.futures import ThreadPoolExecutor, as_completed
# ---------------------------------------------------------------------
# Initialize rich console with a custom theme
# ---------------------------------------------------------------------
custom_theme = Theme({
"header": "bold magenta",
"info": "dim cyan",
"warning": "bold yellow",
"error": "bold red",
"success": "bold green",
})
console = Console(theme=custom_theme)
# --- Helper Functions for Displaying ASCII Art (Optional) ---
def get_terminal_size():
"""Return the size of the terminal window."""
columns, lines = shutil.get_terminal_size((80, 20))
return columns, lines
def read_ascii_art(file_path):
"""Read ASCII art from a file."""
with open(file_path, 'r') as file:
return file.readlines()
def display_ascii_art(art, columns):
"""Display ASCII art resized to fit the terminal width."""
for line in art:
print((line.strip('\n') + ' ' * columns)[:columns])
def banner():
"""Display banner ASCII art."""
file_path = 'ascii_art.txt' # Assuming you have an ASCII art file
if os.path.exists(file_path):
ascii_art = read_ascii_art(file_path)
columns, _ = get_terminal_size()
os.system('cls' if os.name == 'nt' else 'clear')
display_ascii_art(ascii_art, columns)
        time.sleep(5)  # Brief pause so the banner is visible before clearing
print("\033c", end="") # Clear the screen
def title():
wordart = pyfiglet.figlet_format("ATT", font="dos_rebel")
print(wordart)
# ---------------------------------------------------------------------
# List of keywords to search for in tables
# ---------------------------------------------------------------------
keywords = [
'TADIG Code', 'Network Name', 'Network Type', 'Technology', '2G Frequencies', '3G Frequencies',
'4G Frequencies', '5G Frequencies', 'MSISDN Number Ranges', 'Country Code', 'National Destination Code',
'SN Range Start', 'SN Range Stop', 'Mobile Country Code', 'Mobile Network Code', 'IMSI to MGT Translation',
'Does Number Portability Apply?', 'SCCP Gateway Information', 'SCCP Carrier Name', 'DPC Information',
'XUDT/XUDTS Segmentation Capabilities', 'ANSI Networks SCCP Gateway', 'Subscriber Identity Authentication',
'Authentication Performed for GSM Subscribers', 'Authentication Performed for GPRS Subscribers', 'Cipher Algorithm',
'Automatic Roaming Testing', 'AAC (Automatic Answering Circuit) Information',
'DAAC (Data Automatic Answering Circuit) Information', 'Mobile Application Part (MAP) Information',
'Application Context Names', 'Inbound Roaming', 'Outbound Roaming', 'MSC/VLR, SGSN versions',
'MAP Versions', 'MAP Functionality', 'roamingNumberEnquiry', 'subscribeDataMngt', 'CAMEL Information',
'CAMEL Phases and Versions', 'O-CSI', 'D-CSI', 'MT-SMS-CSI', 'CAP Version', 'CAPv2', 'CAPv3', 'CAPv4',
'Partial Implementations', 'CAMEL Phase 4 CSIs', 'MG-CSI', 'Network Elements Information', 'Node Types',
'HLR', 'MSC/VLR', 'SCP', 'SMSC', 'Node IDs and Addresses', 'E.164 Numbers', 'IP Addresses',
'Global Title Addresses', 'Roamware Dummy Global Titles', 'VIP Information', 'SGSN', 'GGSN VIP IP addresses',
'Packet Data Services Information', 'APN Identifiers', 'APN Operator Identifier', 'APN Credentials',
'APN DNS IP Addresses', 'GTP Versions', 'GTPv1', 'SGSN and GGSN Versions', 'Multiple PDP Context Support',
'2G/3G Data Service Profiles', 'GPRS Information', 'APN List for Testing and Troubleshooting',
'WAP APN', 'MMS APN Information', 'Pingable and Trace-routable IP Addresses', 'Autonomous System Numbers',
'ASN', 'GRX Providers', 'Inter-PLMN GSN Backbone IP Address Ranges', 'LTE Roaming Information',
'IPX Provider Names', 'Primary IP Addresses', 'Secondary IP Addresses', 'Diameter Architecture',
'EPC Realms for Roaming', 'S6a', 'S6d', 'SMS over NAS Support', 'LTE QoS Profiles', 'Miscellaneous Information',
'MSRN Structure', 'IMSI Structure', 'SMSC GT Addresses', 'Additional Global Title Addresses', 'Roamware Dummy'
]
# ---------------------------------------------------------------------
# Table Extraction
# ---------------------------------------------------------------------
def table_contains_keywords(table, keywords):
for row in table:
for cell in row:
if cell:
for keyword in keywords:
if keyword.lower() in cell.lower():
return True
return False
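# For example, table_contains_keywords([["TADIG Code", "ABCXY"]], keywords)
# returns True: matching is a case-insensitive substring test per cell.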
def get_table_keyword(table, keywords):
"""
Return the most relevant keyword found in the table.
Used to label the CSV with something meaningful.
"""
keyword_counts = {}
for row in table:
for cell in row:
if cell:
for keyword in keywords:
if keyword.lower() in cell.lower():
keyword_counts[keyword] = keyword_counts.get(keyword, 0) + 1
if keyword_counts:
sorted_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)
return sorted_keywords[0][0].replace(' ', '_') # e.g., "TADIG_Code"
return 'table'
def extract_tables_from_pdf(file_path, keywords):
tables = []
with pdfplumber.open(file_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
page_tables = page.extract_tables()
if page_tables:
for table in page_tables:
# Some tables may contain None cells
sanitized_table = []
for row in table:
sanitized_row = [str(cell) if cell else '' for cell in row]
sanitized_table.append(sanitized_row)
# Now check keywords
if table_contains_keywords(sanitized_table, keywords):
console.print(
f"Found matching table on page {page_num} in [info]{os.path.basename(file_path)}[/info]",
style="success")
tables.append(sanitized_table)
return tables
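# pdfplumber's table detection is heuristic (ruling lines and text alignment),
# so densely formatted IR.21 layouts may need tuning via the table_settings
# argument to extract_tables(); the defaults are used here.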
def extract_tables_from_docx(file_path, keywords):
doc = docx.Document(file_path)
tables = []
for table in doc.tables:
data = []
for row in table.rows:
data.append([cell.text.strip() for cell in row.cells])
if data and table_contains_keywords(data, keywords):
console.print(
f"Found matching table in [info]{os.path.basename(file_path)}[/info]",
style="success")
tables.append(data)
return tables
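# Note: doc.tables only exposes top-level tables. python-docx reaches a table
# nested inside another table's cell via that cell's .tables attribute, so
# such nested tables are skipped here.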
def save_tables(tables, output_directory, base_filename):
for i, table in enumerate(tables):
max_columns = max(len(row) for row in table)
standardized_table = []
for row in table:
# Pad or truncate each row to match max_columns
if len(row) < max_columns:
                row.extend([''] * (max_columns - len(row)))
elif len(row) > max_columns:
row = row[:max_columns]
standardized_table.append(row)
# Use the first row as the header
header = standardized_table[0]
data_rows = standardized_table[1:]
# Replace empty headers with default names
header = [f"Column_{j+1}" if not col else col for j, col in enumerate(header)]
df = pd.DataFrame(data_rows, columns=header)
# Get a keyword for the table name
table_keyword = get_table_keyword(standardized_table, keywords)
table_keyword = re.sub(r'[^\w\-_. ]', '_', table_keyword)
output_file = os.path.join(output_directory, f"{base_filename}_{table_keyword}_{i+1}.csv")
df.to_csv(output_file, index=False)
console.print(f"Saved table to [success]{output_file}[/success]")
def display_tables_from_folder(output_directory, selected_indices=None):
    # Sorted so the numbering is stable and matches lists shown elsewhere
    csv_files = sorted(f for f in os.listdir(output_directory) if f.endswith('.csv'))
if not csv_files:
console.print("No tables found in this folder.", style="warning")
return
if selected_indices is None:
selected_indices = range(1, len(csv_files) + 1)
for idx in selected_indices:
if 1 <= idx <= len(csv_files):
csv_file = csv_files[idx - 1]
df = pd.read_csv(os.path.join(output_directory, csv_file))
console.print(f"Displaying [info]{csv_file}[/info]:", style="header")
display_table_with_rich(df)
else:
console.print(f"Table number {idx} is out of range.", style="error")
def display_table_with_rich(df):
table = Table(show_header=True, header_style="bold magenta", box=box.SQUARE)
for column in df.columns:
table.add_column(str(column), overflow="fold")
for _, row in df.iterrows():
table.add_row(*[str(item) for item in row])
console.print(table)
def process_file(file_path, directory):
filename = os.path.basename(file_path)
console.print(f"Processing file: [info]{filename}[/info]", style="success")
if filename.lower().endswith(".pdf"):
tables = extract_tables_from_pdf(file_path, keywords)
elif filename.lower().endswith(".docx"):
tables = extract_tables_from_docx(file_path, keywords)
else:
console.print(f"Skipping file: [warning]{filename}[/warning] (unsupported format)")
return
if tables:
base_filename = os.path.splitext(filename)[0]
output_directory = os.path.join(directory, base_filename)
os.makedirs(output_directory, exist_ok=True)
save_tables(tables, output_directory, base_filename)
display_tables_from_folder(output_directory)
else:
console.print(f"No matching tables found in [warning]{filename}[/warning]", style="warning")
def list_processed_folders(directory):
    folders = sorted(f for f in os.listdir(directory)
                     if os.path.isdir(os.path.join(directory, f)) and not f.startswith('.'))
if not folders:
console.print("No previously ingested folders found.", style="warning")
return []
console.print("Previously ingested folders:", style="header")
for idx, folder in enumerate(folders, 1):
console.print(f"{idx}. {folder}")
return folders
def parse_user_selection(user_input, max_value):
selections = set()
parts = user_input.split(',')
for part in parts:
part = part.strip()
if '-' in part:
try:
start_str, end_str = part.split('-')
start = int(start_str)
end = int(end_str)
if start <= end:
for i in range(start, end + 1):
if 1 <= i <= max_value:
selections.add(i)
else:
console.print(f"Invalid range '{part}'.", style="error")
except ValueError:
console.print(f"Invalid range '{part}'.", style="error")
else:
try:
i = int(part)
if 1 <= i <= max_value:
selections.add(i)
else:
console.print(f"Invalid number '{part}'.", style="error")
except ValueError:
console.print(f"Invalid input '{part}'.", style="error")
return sorted(selections)
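# For example, parse_user_selection("1,3-5,8", max_value=6) returns [1, 3, 4, 5];
# 8 is rejected with an error message because it exceeds max_value.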
def parse_directory_input(directory_input):
    """Accept either a directory path or a file path dragged into the prompt."""
    directory = directory_input.strip('"\'')
    # Only strip the final path component when the input is not already a
    # directory, so a plain directory path is not truncated to its parent.
    if not os.path.isdir(directory):
        if '/' in directory:
            directory = directory.rsplit('/', 1)[0]
        elif '\\' in directory:
            directory = directory.rsplit('\\', 1)[0]
    return directory
def read_recent_directories():
recent_dirs = []
if os.path.exists('recent_dirs.txt'):
with open('recent_dirs.txt', 'r') as file:
recent_dirs = [line.strip() for line in file.readlines()]
return recent_dirs[:3]
def write_recent_directory(directory):
recent_dirs = read_recent_directories()
if directory in recent_dirs:
recent_dirs.remove(directory)
recent_dirs.insert(0, directory)
with open('recent_dirs.txt', 'w') as file:
for dir_ in recent_dirs[:3]:
file.write(dir_ + '\n')
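# A minimal sketch (not wired into the menus below): each menu repeats the same
# "recent directories" prompt, so a shared helper along these lines could
# replace the duplicated blocks. The name prompt_for_base_directory is ours.
def prompt_for_base_directory():
    """Prompt for a base directory, offering recent ones; return a path or None."""
    recent_dirs = read_recent_directories()
    if recent_dirs:
        console.print("Recent directories:", style="header")
        for idx, dir_ in enumerate(recent_dirs, 1):
            console.print(f"{idx}. {dir_}")
    console.print("Enter the base directory where ingested folders are stored:")
    raw = console.input("(Select a recent directory by number, or enter a new path): ").strip()
    if raw.isdigit():
        selection = int(raw)
        if 1 <= selection <= len(recent_dirs):
            return recent_dirs[selection - 1]
        console.print("Invalid selection.", style="error")
        return None
    directory = parse_directory_input(raw)
    if not os.path.isdir(directory):
        console.print(f"[error]Invalid directory: {directory}[/error]")
        return None
    write_recent_directory(directory)
    return directory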
def recall_tables():
"""
Menu system to let user pick a base directory (with recent shortcuts),
then pick an ingested subfolder, then display CSV tables.
"""
print("\033c", end="")
title()
recent_dirs = read_recent_directories()
if recent_dirs:
console.print("Recent directories:", style="header")
for idx, dir_ in enumerate(recent_dirs, 1):
console.print(f"{idx}. {dir_}")
console.print("Enter the base directory where ingested folders are stored:")
base_dir_input = console.input("(You can select a recent directory by number, or enter a new path): ").strip()
if base_dir_input.isdigit():
selection = int(base_dir_input)
if 1 <= selection <= len(recent_dirs):
directory = recent_dirs[selection - 1]
else:
console.print("Invalid selection.", style="error")
return
else:
directory = parse_directory_input(base_dir_input)
if os.path.isdir(directory):
write_recent_directory(directory)
folders = list_processed_folders(directory)
if not folders:
return
try:
choice = int(console.input("Select a folder number to display tables (0 to cancel): "))
except ValueError:
console.print("Invalid input.", style="error")
return
if choice == 0:
return
if 1 <= choice <= len(folders):
selected_folder = folders[choice - 1]
output_directory = os.path.join(directory, selected_folder)
        # Sorted so this numbering matches display_tables_from_folder's listing
        csv_files = sorted(f for f in os.listdir(output_directory) if f.endswith('.csv'))
if not csv_files:
console.print("No tables found in this folder.", style="warning")
return
console.print("Available tables in the folder:", style="header")
for idx, csv_file in enumerate(csv_files, 1):
console.print(f"{idx}. {csv_file}")
while True:
user_input = console.input("Enter table numbers to display (e.g., 1,2-5,8), or 0 to go back: ")
if user_input.strip() == '0':
return
selected_indices = parse_user_selection(user_input, len(csv_files))
if selected_indices:
display_tables_from_folder(output_directory, selected_indices)
more_input = console.input("Would you like to view more tables from this folder? (y/n): ").strip().lower()
if more_input != 'y':
return
else:
console.print("No valid table numbers entered. Please try again.", style="error")
else:
console.print("Invalid selection.", style="error")
else:
console.print("Invalid directory. Please try again.", style="error")
# ---------------------------------------------------------------------
# Explanation & Google Dork Functions
# ---------------------------------------------------------------------
def chunked_print(text, lines_per_page=30):
"""
Prints the given text in chunks of `lines_per_page`.
After printing each chunk, waits for user input:
- Press Enter to continue
- Type 'q' and press Enter to quit early
"""
lines = text.strip().splitlines()
idx = 0
total_lines = len(lines)
while idx < total_lines:
chunk = lines[idx : idx + lines_per_page]
# Print this chunk
console.print("\n".join(chunk), style="info")
idx += lines_per_page
if idx < total_lines:
# Prompt user if there's more to show
user_input = console.input("\nPress [Enter] to continue or 'q' to quit: ").strip().lower()
if user_input == 'q':
break
def explain_tool():
"""
Comprehensive explanation reflecting every key feature in the tool:
- Document ingestion (IR.21, IR.85, etc.)
- Table extraction and CSV saving
- IP and CIDR extraction
- ASN lookups
- BGP queries (BGPView)
- Resolved IP hostnames
- Previously ingested data recall
- OSINT/Recon display from BGP data
- Google Dorking for new Telco docs
"""
# Show a short header
console.print("\n[+] What is this tool about?", style="header")
# Build one multiline string to explain all features
explanation_text = r"""
This tool is a one-stop solution to automate Open-Source Intelligence (OSINT) and
telecom-focused data extraction from common carrier documentation such as GSMA IR.21
and IR.85 files. Whether you are performing security research, network engineering,
or investigating roaming agreements, this suite of features will streamline your process.
Here is a breakdown of the major features:
1) Google Dorking for Telco Docs (IR.21 or IR.85)
- Quickly discover publicly exposed IR.21/IR.85 PDFs using Google Dork queries.
- Download interesting documents for further ingestion.
2) Ingest & Process New Documents (PDF or DOCX)
- Automatically parse IR.21, IR.85, and similar telecom-related documents.
- Detect and extract relevant tables containing:
• Roaming details
• SCCP Gateway info
• DPC (Destination Point Code) data
• IP addresses for Diameter, GPRS, LTE
• ASN references, etc.
- Save each extracted table as CSV for easy review.
3) Recall & Display Ingested Data
- Organize extracted tables by folder for quick retrieval.
- Quickly list previously ingested documents and pick which tables to display.
- Use a Rich-based table view in the console for a clean, readable format.
4) Extract IPs & CIDRs
- Parse your CSV data for any IP addresses or CIDR ranges.
- Flatten or expand CIDRs to get all IPs if desired.
- Save them in a unique list (unique_ips.txt) for further analysis or lookups.
5) ASN Lookup & BGP Queries (via BGPView)
- Convert your extracted IPs/CIDRs into a minimal set of aggregated networks.
- Query each network to identify the associated ASN (using IPWhois).
- Optionally query BGPView for each ASN to see:
• Prefixes (IPv4 & IPv6)
• Peers, Upstreams, Downstreams
• Internet Exchange Points (IXs)
- Store this extended ASN data locally (asn_all_endpoints.json).
6) Resolve IPs to Hostnames
- Perform reverse DNS lookups on IPs to find potential hostnames.
   - Handy for OSINT when identifying the domain or service behind an IP.
7) OSINT & Recon Display
- Provide a condensed, security-oriented view of BGP results.
- Show relevant data (ASN info, contact emails, IP prefixes, peers, etc.) in Rich tables.
- Great for threat research, network mapping, or cross-referencing telecom operators.
8) Misc. Utilities
- Menu-based interface to walk through each feature step by step.
- Persistent “recent directories” for quick navigation.
- Color-coded console output powered by Rich.
In essence, this tool streamlines telecom OSINT from finding exposed IR.21/IR.85
documents online to extracting data, identifying IP addresses, performing ASN
analysis, and generating robust BGP-based intelligence. Its modular approach
lets you pick exactly what you want to do, whether it's table extraction, IP
analysis, or in-depth BGP recon.
"""
# Now print this explanation in chunks of 30 lines
chunked_print(explanation_text, lines_per_page=30)
def google_dork_telco_docs():
"""
Sub-menu to pick IR.21 or IR.85 for the Google Dork query.
"""
console.print("\n[+] Telco Document Google Dorking", style="header")
console.print("Which document type would you like to search for?")
console.print("1. IR.21")
console.print("2. IR.85")
choice = console.input("Enter your choice (1 or 2): ").strip()
if choice == '1':
query = "filetype:pdf IR.21"
console.print("\nPerforming Google Dork for IR.21 docs...", style="info")
elif choice == '2':
query = "filetype:pdf IR.85"
console.print("\nPerforming Google Dork for IR.85 docs...", style="info")
else:
console.print("[warning]Invalid choice. Returning to menu.[/warning]")
return
search_results = []
try:
from googlesearch import search
for result in search(query, num_results=10):
search_results.append(result)
except Exception as e:
console.print(f"[error]Error while performing Google search: {e}[/error]")
return
if not search_results:
console.print("[warning]No results found.[/warning]")
return
console.print("\n[+] Found the following PDF links:", style="header")
for idx, link in enumerate(search_results, 1):
console.print(f"{idx}. {link}")
while True:
try:
choice = int(console.input("\nEnter the number of the PDF you want to download (or 0 to cancel): ").strip())
if choice == 0:
return
elif 1 <= choice <= len(search_results):
selected_link = search_results[choice - 1]
download_pdf(selected_link)
return
else:
console.print("[error]Invalid selection. Please choose a valid number from the list.[/error]")
except ValueError:
console.print("[error]Invalid input. Please enter a number.[/error]")
def download_pdf(url):
save_dir = "./my_ir_docs"
os.makedirs(save_dir, exist_ok=True)
    file_name = url.split("/")[-1].split("?")[0] or "downloaded.pdf"
file_path = os.path.join(save_dir, file_name)
console.print(f"\n[+] Downloading PDF from {url}...", style="info")
try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(file_path, 'wb') as file:
            # iter_content handles transfer decoding, unlike response.raw
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
console.print(f"[success]Downloaded PDF to: {file_path}[/success]")
except Exception as e:
console.print(f"[error]Failed to download PDF: {e}[/error]")
# ---------------------------------------------------------------------
# IP & CIDR Extraction (only) + ASN & BGP (separate)
# ---------------------------------------------------------------------
BGPVIEW_BASE_URL = "https://api.bgpview.io"
def extract_ips_and_cidrs_from_row(row):
extracted_ips = []
for cell in row:
cidr_matches = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}/\d{1,2}\b", cell)
extracted_ips.extend(cidr_matches)
ip_matches = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", cell)
for ip in ip_matches:
try:
ip_address(ip)
extracted_ips.append(ip)
except ValueError:
continue
return extracted_ips
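# Note: the bare-IP pattern also matches the address part of a CIDR, so a cell
# containing "10.0.0.0/8" yields both "10.0.0.0/8" and "10.0.0.0"; the
# duplicate is harmless because expand_cidrs_to_ips deduplicates via a set.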
def expand_cidrs_to_ips(ips_and_cidrs):
expanded_ips = set()
for item in ips_and_cidrs:
try:
if '/' in item:
network = ip_network(item, strict=False)
expanded_ips.update(str(ip) for ip in network)
            else:
                ip_address(item)  # validate; malformed entries fall through to except
                expanded_ips.add(item)
        except ValueError:
            console.print(f"[warning]Invalid IP or range skipped: {item}[/warning]")
    # Sort numerically; a plain string sort would put "10.0.0.10" before "10.0.0.2"
    return sorted(expanded_ips, key=ip_address)
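# Iterating an ip_network yields every address in the block, including the
# network and broadcast addresses; use network.hosts() to exclude them. Large
# prefixes (e.g. a /8) expand to millions of strings, so expanding blindly
# can be slow and memory-hungry.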
def extract_ips_from_csv(directory):
all_ips_and_cidrs = []
csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
if not csv_files:
console.print("[warning]No CSV files found in this folder.[/warning]")
return all_ips_and_cidrs
for filename in csv_files:
file_path = os.path.join(directory, filename)
console.print(f"Processing CSV: [info]{file_path}[/info]", style="info")
with open(file_path, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
            _ = next(reader, None)  # unconditionally skip the first row (assumed header)
for row in reader:
extracted_ips = extract_ips_and_cidrs_from_row(row)
if extracted_ips:
all_ips_and_cidrs.extend(extracted_ips)
return all_ips_and_cidrs
def determine_cidrs(ips):
ip_objs = []
for ip_str in ips:
try:
ip_objs.append(ip_address(ip_str))
except ValueError:
pass
collapsed_nets = collapse_addresses(ip_objs)
return sorted(collapsed_nets, key=lambda n: (n.network_address, n.prefixlen))
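# For example, collapse_addresses() merges the adjacent addresses 192.0.2.0
# and 192.0.2.1 into 192.0.2.0/31. It requires all inputs to be the same IP
# version; mixing IPv4 and IPv6 raises TypeError (only IPv4 is extracted above).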
def query_asn_for_cidr(ip_net):
representative_ip = str(ip_net.network_address)
try:
obj = IPWhois(representative_ip)
results = obj.lookup_rdap()
return {
"CIDR": str(ip_net),
"Representative IP": representative_ip,
"ASN": results.get("asn", "N/A"),
"ASN Description": results.get("asn_description", "N/A"),
"ASN Country": results.get("asn_country_code", "N/A")
}
except Exception as e:
return {
"CIDR": str(ip_net),
"Representative IP": representative_ip,
"ASN": None,
"ASN Description": f"Error: {e}",
"ASN Country": "N/A"
}
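# A minimal sketch using the ThreadPoolExecutor/as_completed imports above
# (not wired into any menu): RDAP lookups are network-bound, so fanning
# query_asn_for_cidr out over a small thread pool speeds up large batches.
# The helper name query_asns_concurrently is ours, not part of the tool's flow.
def query_asns_concurrently(ip_nets, max_workers=8):
    """Run query_asn_for_cidr across a thread pool; result order is arbitrary."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(query_asn_for_cidr, net): net for net in ip_nets}
        for future in as_completed(futures):
            results.append(future.result())
    return results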
def query_bgpview_all(asn, sleep_time=1):
endpoints = [
f"/asn/AS{asn}",
f"/asn/AS{asn}/prefixes",
f"/asn/AS{asn}/peers",
f"/asn/AS{asn}/upstreams",
f"/asn/AS{asn}/downstreams",
f"/asn/AS{asn}/ixs",
]
results = {}
for endpoint in endpoints:
        url = BGPVIEW_BASE_URL + endpoint
        key = endpoint.lstrip('/')  # e.g. "asn/AS12345/prefixes"
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            results[key] = data.get("data", {})
        except requests.exceptions.RequestException as e:
            # Use the same key scheme as successful responses so consumers
            # (e.g. display_osint_recon) can look entries up consistently.
            results[key] = {"error": str(e)}
time.sleep(sleep_time)
return results
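# The per-request sleep above is a crude politeness throttle: BGPView is a free
# public API and may rate-limit or reject rapid bursts of requests.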
def menu_extract_ips_only():
"""
Just the IP extraction from previously ingested CSVs (no ASN or BGP).
"""
print("\033c", end="")
title()
recent_dirs = read_recent_directories()
if recent_dirs:
console.print("Recent directories:", style="header")
for idx, dir_ in enumerate(recent_dirs, 1):
console.print(f"{idx}. {dir_}")
console.print("Enter the base directory where ingested folders are stored:")
base_dir_input = console.input("(You can select a recent directory by number, or enter a new path): ").strip()
if base_dir_input.isdigit():
selection = int(base_dir_input)
if 1 <= selection <= len(recent_dirs):
directory = recent_dirs[selection - 1]
else:
console.print("Invalid selection.", style="error")
return
else:
directory = parse_directory_input(base_dir_input)
if not os.path.isdir(directory):
console.print(f"[error]Invalid directory: {directory}[/error]")
return
write_recent_directory(directory)
folders = list_processed_folders(directory)
if not folders:
return
try:
choice = int(console.input("Select a folder number for IP extraction (0 to cancel): "))
except ValueError:
console.print("Invalid input.", style="error")
return
if choice == 0:
return
if 1 <= choice <= len(folders):
selected_folder = folders[choice - 1]
folder_path = os.path.join(directory, selected_folder)
console.print(f"\n[info]Extracting IPs/CIDRs from CSVs in {folder_path}[/info]")
all_ips_and_cidrs = extract_ips_from_csv(folder_path)
if not all_ips_and_cidrs:
console.print("[warning]No IPs or CIDRs found.[/warning]")
return
console.print("\nExpanding CIDRs into individual IPs...", style="info")
unique_ips = expand_cidrs_to_ips(all_ips_and_cidrs)
output_file_txt = os.path.join(folder_path, "unique_ips.txt")
with open(output_file_txt, "w") as outfile:
outfile.write("\n".join(unique_ips))
console.print(f"[success]Unique IPs saved to: {output_file_txt}[/success]")
else:
console.print("Invalid selection.", style="error")
def menu_bgp_lookup_for_ips():
"""
Separate function for ASN & BGP lookups from previously extracted IPs.
"""
print("\033c", end="")
title()
recent_dirs = read_recent_directories()
if recent_dirs:
console.print("Recent directories:", style="header")
for idx, dir_ in enumerate(recent_dirs, 1):
console.print(f"{idx}. {dir_}")
console.print("Enter the base directory where ingested folders are stored:")
base_dir_input = console.input("(You can select a recent directory by number, or enter a new path): ").strip()
if base_dir_input.isdigit():
selection = int(base_dir_input)
if 1 <= selection <= len(recent_dirs):
directory = recent_dirs[selection - 1]
else:
console.print("Invalid selection.", style="error")
return
else:
directory = parse_directory_input(base_dir_input)
if not os.path.isdir(directory):
console.print(f"[error]Invalid directory: {directory}[/error]")
return
write_recent_directory(directory)
# Step: pick a subfolder
folders = list_processed_folders(directory)
if not folders:
return
try:
choice = int(console.input("Select a folder to do ASN & BGP lookups (0 to cancel): "))
except ValueError:
console.print("Invalid input.", style="error")
return
if choice == 0:
return
if 1 <= choice <= len(folders):
selected_folder = folders[choice - 1]
folder_path = os.path.join(directory, selected_folder)
# 1) Load the previously extracted IPs
unique_ips_file = os.path.join(folder_path, "unique_ips.txt")
if not os.path.isfile(unique_ips_file):
console.print(f"[warning]No unique_ips.txt found in {folder_path}. Please extract IPs first.[/warning]")
return
with open(unique_ips_file, "r") as uf:
unique_ips = [line.strip() for line in uf if line.strip()]
if not unique_ips:
console.print("[warning]The unique_ips.txt file is empty.[/warning]")
return
# 2) Collapsing IPs -> minimal set of CIDRs
console.print("\nCollapsing IPs into minimal set of CIDRs...", style="info")
collapsed_nets = determine_cidrs(unique_ips)
# 3) Query ASN for each
asn_results = []
console.print("\nQuerying ASN info for each collapsed CIDR...", style="info")
for net in collapsed_nets:
asn_info = query_asn_for_cidr(net)
asn_results.append(asn_info)
console.print(
f"CIDR: {asn_info['CIDR']}, ASN: {asn_info['ASN']}, Desc: {asn_info['ASN Description']}"
)
output_file_json = os.path.join(folder_path, "asn_results.json")
with open(output_file_json, "w") as outfile:
json.dump(asn_results, outfile, indent=4)
console.print(f"[success]ASN results saved to: {output_file_json}[/success]")
# 4) Ask user if they want BGPView data
do_bgpview = console.input("\nWould you like to fetch all BGPView data for each ASN? (yes/no): ").strip().lower()
if do_bgpview in ["yes", "y"]:
asn_set = set()
for entry in asn_results:
asn_val = entry.get("ASN")
if asn_val and asn_val.lower() != "none":
asn_val = str(asn_val).replace("AS", "")
asn_set.add(asn_val)
console.print("\nQuerying BGPView for all sub-endpoints...\n", style="info")
bgp_extended_data = {}
for asn in sorted(asn_set):
console.print(f"Pulling all BGPView data for ASN {asn} ...", style="info")
bgp_data = query_bgpview_all(asn, sleep_time=1)
bgp_extended_data[asn] = bgp_data
all_endpoints_json = os.path.join(folder_path, "asn_all_endpoints.json")
with open(all_endpoints_json, "w") as af:
json.dump(bgp_extended_data, af, indent=4)
console.print(f"[success]Detailed BGPView info saved to: {all_endpoints_json}[/success]")
else:
console.print("[info]Skipping BGPView data retrieval.[/info]")
else:
console.print("Invalid selection.", style="error")
def menu_resolve_ips():
"""
Reads unique_ips.txt from a previously ingested folder,
tries to resolve each IP to a hostname via socket,
displays the results, and saves them to a file.
"""
print("\033c", end="")
title()
recent_dirs = read_recent_directories()
if recent_dirs:
console.print("Recent directories:", style="header")
for idx, dir_ in enumerate(recent_dirs, 1):
console.print(f"{idx}. {dir_}")
console.print("Enter the base directory where ingested folders are stored:")
base_dir_input = console.input("(You can select a recent directory by number, or enter a new path): ").strip()
if base_dir_input.isdigit():
selection = int(base_dir_input)
if 1 <= selection <= len(recent_dirs):
directory = recent_dirs[selection - 1]
else:
console.print("Invalid selection.", style="error")
return
else:
directory = parse_directory_input(base_dir_input)
if not os.path.isdir(directory):
console.print(f"[error]Invalid directory: {directory}[/error]")
return
write_recent_directory(directory)
# Let user pick subfolder
folders = list_processed_folders(directory)
if not folders:
return
try:
choice = int(console.input("Select a folder number for IP resolution (0 to cancel): "))
except ValueError:
console.print("Invalid input.", style="error")
return
if choice == 0:
return
if 1 <= choice <= len(folders):
selected_folder = folders[choice - 1]
folder_path = os.path.join(directory, selected_folder)
# unique_ips.txt must exist
unique_ips_file = os.path.join(folder_path, "unique_ips.txt")
if not os.path.isfile(unique_ips_file):
console.print(f"[warning]No unique_ips.txt found in {folder_path}. Please extract IPs first.[/warning]")
return
with open(unique_ips_file, "r") as uf:
ips = [line.strip() for line in uf if line.strip()]
if not ips:
console.print("[warning]No IPs found in unique_ips.txt.[/warning]")
return
# Resolve each IP to hostname
resolved_data = []
console.print(f"\nResolving {len(ips)} IPs...", style="info")
for ip_str in ips:
hostname = None
try:
hostname = socket.gethostbyaddr(ip_str)[0]
except socket.herror:
hostname = "No reverse DNS entry"
except Exception as e:
hostname = f"Error: {e}"
resolved_data.append({"IP": ip_str, "Hostname": hostname})
console.print(f"IP: {ip_str} -> {hostname}")
# Save to file
output_file = os.path.join(folder_path, "resolved_ips.txt")
with open(output_file, "w") as f:
for entry in resolved_data:
f.write(f"{entry['IP']},{entry['Hostname']}\n")
console.print(f"[success]Resolved IPs saved to: {output_file}[/success]")
else:
console.print("Invalid selection.", style="error")
# ---------------------------------------------------------------------
# NEW: Display BGPView All Endpoints Menu
# ---------------------------------------------------------------------
def menu_display_bgp_endpoints():
"""
    Loads asn_all_endpoints.json from a previously ingested folder (where BGP
    data was saved) and displays it in the console, either in full or for a
    single chosen ASN. This is a simple raw display for demonstration.
"""
print("\033c", end="")
title()
recent_dirs = read_recent_directories()
if recent_dirs:
console.print("Recent directories:", style="header")
for idx, dir_ in enumerate(recent_dirs, 1):
console.print(f"{idx}. {dir_}")
console.print("Enter the base directory where ingested folders are stored:")
base_dir_input = console.input("(You can select a recent directory by number, or enter a new path): ").strip()
if base_dir_input.isdigit():
selection = int(base_dir_input)
if 1 <= selection <= len(recent_dirs):
directory = recent_dirs[selection - 1]
else:
console.print("Invalid selection.", style="error")
return
else:
directory = parse_directory_input(base_dir_input)
if not os.path.isdir(directory):
console.print(f"[error]Invalid directory: {directory}[/error]")
return
write_recent_directory(directory)
# Let user pick subfolder
folders = list_processed_folders(directory)
if not folders:
return
try:
choice = int(console.input("Select a folder number to display BGP endpoints (0 to cancel): "))
except ValueError:
console.print("Invalid input.", style="error")
return
if choice == 0:
return
if 1 <= choice <= len(folders):
selected_folder = folders[choice - 1]
folder_path = os.path.join(directory, selected_folder)
# asn_all_endpoints.json must exist
bgp_file = os.path.join(folder_path, "asn_all_endpoints.json")
if not os.path.isfile(bgp_file):
console.print(f"[warning]No asn_all_endpoints.json found in {folder_path}. Please run BGP lookups first.[/warning]")
return
with open(bgp_file, "r") as bf:
bgp_data = json.load(bf)
# Display raw or partial
console.print("\n[info]Found BGP data for the following ASNs:[/info]", style="header")
for asn in bgp_data.keys():
console.print(f" ASN: {asn}")
console.print("\nEnter an ASN to see detailed data, or 'all' to display everything, or 0 to cancel.")
user_asn = console.input("ASN choice: ").strip().lower()
if user_asn == '0':
return
if user_asn == 'all':
console.print(json.dumps(bgp_data, indent=4), style="info")
else:
# If the user typed '12345' or 'AS12345'
user_asn_str = user_asn.replace("as", "").strip()
if user_asn_str in bgp_data:
console.print(json.dumps(bgp_data[user_asn_str], indent=4), style="info")
else:
console.print(f"[warning]ASN {user_asn_str} not found in asn_all_endpoints.json[/warning]")
else:
console.print("Invalid selection.", style="error")
def display_osint_recon(bgp_data):
"""
Displays interesting OSINT data from the BGP queries in a concise manner.
Loops through each ASN in the JSON, gathers:
- Basic ASN info
- RIR/whois info
- IPv4/IPv6 prefixes
- Peers, Upstreams, Downstreams
- IXs, etc.
Also handles the case where asn/AS####/ixs might be a dict or a list.
"""
for asn_str, endpoints_dict in bgp_data.items():
console.rule(title=f"[bold magenta]ASN {asn_str}[/bold magenta]", style="header")
# base_key: e.g. "asn/AS197465" or "asn/AS55083"
base_key = f"asn/AS{asn_str}"
# -------------------------
# 1) Basic ASN Info
# -------------------------
if base_key in endpoints_dict:
_display_asn_basic_info(endpoints_dict[base_key])
else:
console.print(f"[warning]No base ASN info found for ASN {asn_str}[/warning]")
# -------------------------
# 2) Prefixes
# -------------------------
prefixes_key = base_key + "/prefixes"
if prefixes_key in endpoints_dict:
_display_asn_prefixes(endpoints_dict[prefixes_key])
else:
console.print(f"[warning]No prefix info found for ASN {asn_str}[/warning]")