diff --git a/database/.pylintrc b/database/.pylintrc new file mode 100644 index 0000000..63cde5b --- /dev/null +++ b/database/.pylintrc @@ -0,0 +1,647 @@ +[MAIN] + +# Analyse import fallback blocks. This can be used to support both Python 2 and +# 3 compatible code, which means that the block might have code that exists +# only in one or another interpreter, leading to false positives when analysed. +analyse-fallback-blocks=no + +# Clear in-memory caches upon conclusion of linting. Useful if running pylint +# in a server-like mode. +clear-cache-post-run=no + +# Load and enable all available extensions. Use --list-extensions to see a list +# all available extensions. +#enable-all-extensions= + +# In error mode, messages with a category besides ERROR or FATAL are +# suppressed, and no reports are done by default. Error mode is compatible with +# disabling specific errors. +#errors-only= + +# Always return a 0 (non-error) status code, even if lint errors are found. +# This is primarily useful in continuous integration scripts. +#exit-zero= + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. +extension-pkg-allow-list=pycurl + +# A comma-separated list of package or module names from where C extensions may +# be loaded. Extensions are loading into the active Python interpreter and may +# run arbitrary code. (This is an alternative name to extension-pkg-allow-list +# for backward compatibility.) +extension-pkg-whitelist= + +# Return non-zero exit code if any of these messages/categories are detected, +# even if score is above --fail-under value. Syntax same as enable. Messages +# specified are enabled, while categories only check already-enabled messages. +fail-on= + +# Specify a score threshold under which the program will exit with error. +fail-under=10 + +# Interpret the stdin as a python script, whose filename needs to be passed as +# the module_or_package argument. +#from-stdin= + +# Files or directories to be skipped. They should be base names, not paths. +ignore=CVS + +# Add files or directories matching the regular expressions patterns to the +# ignore-list. The regex matches against paths and can be in Posix or Windows +# format. Because '\\' represents the directory delimiter on Windows systems, +# it can't be used as an escape character. +ignore-paths= + +# Files or directories matching the regular expression patterns are skipped. +# The regex matches against base names, not paths. The default value ignores +# Emacs file locks +ignore-patterns=^\.# + +# List of module names for which member attributes should not be checked and +# will not be imported (useful for modules/projects where namespaces are +# manipulated during runtime and thus existing member attributes cannot be +# deduced by static analysis). It supports qualified module names, as well as +# Unix pattern matching. +ignored-modules= + +# Python code to execute, usually for sys.path manipulation such as +# pygtk.require(). +#init-hook= + +# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the +# number of processors available to use, and will cap the count on Windows to +# avoid hangs. +jobs=1 + +# Control the amount of potential inferred values when inferring a single +# object. This can help the performance when dealing with large functions or +# complex, nested conditions. 
+limit-inference-results=100 + +# List of plugins (as comma separated values of python module names) to load, +# usually to register additional checkers. +load-plugins= + +# Pickle collected data for later comparisons. +persistent=yes + +# Resolve imports to .pyi stubs if available. May reduce no-member messages and +# increase not-an-iterable messages. +prefer-stubs=no + +# Minimum Python version to use for version dependent checks. Will default to +# the version used to run pylint. +py-version=3.11 + +# Discover python modules and packages in the file system subtree. +recursive=no + +# Add paths to the list of the source roots. Supports globbing patterns. The +# source root is an absolute path or a path relative to the current working +# directory used to determine a package namespace for modules located under the +# source root. +source-roots= + +# When enabled, pylint would attempt to guess common misconfiguration and emit +# user-friendly hints instead of false-positive error messages. +suggestion-mode=yes + +# Allow loading of arbitrary C extensions. Extensions are imported into the +# active Python interpreter and may run arbitrary code. +unsafe-load-any-extension=no + +# In verbose mode, extra non-checker-related info will be displayed. +#verbose= + + +[BASIC] + +# Naming style matching correct argument names. +argument-naming-style=snake_case + +# Regular expression matching correct argument names. Overrides argument- +# naming-style. If left empty, argument names will be checked with the set +# naming style. +#argument-rgx= + +# Naming style matching correct attribute names. +attr-naming-style=snake_case + +# Regular expression matching correct attribute names. Overrides attr-naming- +# style. If left empty, attribute names will be checked with the set naming +# style. +#attr-rgx= + +# Bad variable names which should always be refused, separated by a comma. +bad-names=foo, + bar, + baz, + toto, + tutu, + tata + +# Bad variable names regexes, separated by a comma. If names match any regex, +# they will always be refused +bad-names-rgxs= + +# Naming style matching correct class attribute names. +class-attribute-naming-style=any + +# Regular expression matching correct class attribute names. Overrides class- +# attribute-naming-style. If left empty, class attribute names will be checked +# with the set naming style. +#class-attribute-rgx= + +# Naming style matching correct class constant names. +class-const-naming-style=UPPER_CASE + +# Regular expression matching correct class constant names. Overrides class- +# const-naming-style. If left empty, class constant names will be checked with +# the set naming style. +#class-const-rgx= + +# Naming style matching correct class names. +class-naming-style=PascalCase + +# Regular expression matching correct class names. Overrides class-naming- +# style. If left empty, class names will be checked with the set naming style. +#class-rgx= + +# Naming style matching correct constant names. +const-naming-style=UPPER_CASE + +# Regular expression matching correct constant names. Overrides const-naming- +# style. If left empty, constant names will be checked with the set naming +# style. +#const-rgx= + +# Minimum line length for functions/classes that require docstrings, shorter +# ones are exempt. +docstring-min-length=-1 + +# Naming style matching correct function names. +function-naming-style=snake_case + +# Regular expression matching correct function names. Overrides function- +# naming-style. 
If left empty, function names will be checked with the set +# naming style. +#function-rgx= + +# Good variable names which should always be accepted, separated by a comma. +good-names=i, + j, + k, + ex, + Run, + _ + +# Good variable names regexes, separated by a comma. If names match any regex, +# they will always be accepted +good-names-rgxs= + +# Include a hint for the correct naming format with invalid-name. +include-naming-hint=no + +# Naming style matching correct inline iteration names. +inlinevar-naming-style=any + +# Regular expression matching correct inline iteration names. Overrides +# inlinevar-naming-style. If left empty, inline iteration names will be checked +# with the set naming style. +#inlinevar-rgx= + +# Naming style matching correct method names. +method-naming-style=snake_case + +# Regular expression matching correct method names. Overrides method-naming- +# style. If left empty, method names will be checked with the set naming style. +#method-rgx= + +# Naming style matching correct module names. +module-naming-style=snake_case + +# Regular expression matching correct module names. Overrides module-naming- +# style. If left empty, module names will be checked with the set naming style. +#module-rgx= + +# Colon-delimited sets of names that determine each other's naming style when +# the name regexes allow several styles. +name-group= + +# Regular expression which should only match function or class names that do +# not require a docstring. +no-docstring-rgx=^_ + +# List of decorators that produce properties, such as abc.abstractproperty. Add +# to this list to register other decorators that produce valid properties. +# These decorators are taken in consideration only for invalid-name. +property-classes=abc.abstractproperty + +# Regular expression matching correct type alias names. If left empty, type +# alias names will be checked with the set naming style. +#typealias-rgx= + +# Regular expression matching correct type variable names. If left empty, type +# variable names will be checked with the set naming style. +#typevar-rgx= + +# Naming style matching correct variable names. +variable-naming-style=snake_case + +# Regular expression matching correct variable names. Overrides variable- +# naming-style. If left empty, variable names will be checked with the set +# naming style. +#variable-rgx= + + +[CLASSES] + +# Warn about protected attribute access inside special methods +check-protected-access-in-special-methods=no + +# List of method names used to declare (i.e. assign) instance attributes. +defining-attr-methods=__init__, + __new__, + setUp, + asyncSetUp, + __post_init__ + +# List of member names, which should be excluded from the protected access +# warning. +exclude-protected=_asdict,_fields,_replace,_source,_make,os._exit + +# List of valid names for the first argument in a class method. +valid-classmethod-first-arg=cls + +# List of valid names for the first argument in a metaclass class method. +valid-metaclass-classmethod-first-arg=mcs + + +[DESIGN] + +# List of regular expressions of class ancestor names to ignore when counting +# public methods (see R0903) +exclude-too-few-public-methods= + +# List of qualified class names to ignore when counting class parents (see +# R0901) +ignored-parents= + +# Maximum number of arguments for function / method. +max-args=5 + +# Maximum number of attributes for a class (see R0902). +max-attributes=7 + +# Maximum number of boolean expressions in an if statement (see R0916). 
+max-bool-expr=5 + +# Maximum number of branch for function / method body. +max-branches=12 + +# Maximum number of locals for function / method body. +max-locals=15 + +# Maximum number of parents for a class (see R0901). +max-parents=7 + +# Maximum number of positional arguments for function / method. +max-positional-arguments=5 + +# Maximum number of public methods for a class (see R0904). +max-public-methods=20 + +# Maximum number of return / yield for function / method body. +max-returns=6 + +# Maximum number of statements in function / method body. +max-statements=50 + +# Minimum number of public methods for a class (see R0903). +min-public-methods=2 + + +[EXCEPTIONS] + +# Exceptions that will emit a warning when caught. +overgeneral-exceptions=builtins.BaseException,builtins.Exception + + +[FORMAT] + +# Expected format of line ending, e.g. empty (any line ending), LF or CRLF. +expected-line-ending-format= + +# Regexp for a line that is allowed to be longer than the limit. +ignore-long-lines=^\s*(# )?<?https?://\S+>?$ + +# Number of spaces of indent required inside a hanging or continued line. +indent-after-paren=4 + +# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 +# tab). +indent-string=' ' + +# Maximum number of characters on a single line. +max-line-length=100 + +# Maximum number of lines in a module. +max-module-lines=1000 + +# Allow the body of a class to be on the same line as the declaration if body +# contains single statement. +single-line-class-stmt=no + +# Allow the body of an if to be on the same line as the test if there is no +# else. +single-line-if-stmt=no + + +[IMPORTS] + +# List of modules that can be imported at any level, not just the top level +# one. +allow-any-import-level= + +# Allow explicit reexports by alias from a package __init__. +allow-reexport-from-package=no + +# Allow wildcard imports from modules that define __all__. +allow-wildcard-with-all=no + +# Deprecated modules which should not be used, separated by a comma. +deprecated-modules= + +# Output a graph (.gv or any supported image format) of external dependencies +# to the given file (report RP0402 must not be disabled). +ext-import-graph= + +# Output a graph (.gv or any supported image format) of all (i.e. internal and +# external) dependencies to the given file (report RP0402 must not be +# disabled). +import-graph= + +# Output a graph (.gv or any supported image format) of internal dependencies +# to the given file (report RP0402 must not be disabled). +int-import-graph= + +# Force import order to recognize a module as part of the standard +# compatibility libraries. +known-standard-library= + +# Force import order to recognize a module as part of a third party library. +known-third-party=enchant + +# Couples of modules and preferred modules, separated by a comma. +preferred-modules= + + +[LOGGING] + +# The type of string formatting that logging methods do. `old` means using % +# formatting, `new` is for `{}` formatting. +logging-format-style=old + +# Logging modules to check that the string format arguments are in logging +# function parameter format. +logging-modules=logging + + +[MESSAGES CONTROL] + +# Only show warnings with the listed confidence levels. Leave empty to show +# all. Valid levels: HIGH, CONTROL_FLOW, INFERENCE, INFERENCE_FAILURE, +# UNDEFINED. +confidence=HIGH, + CONTROL_FLOW, + INFERENCE, + INFERENCE_FAILURE, + UNDEFINED + +# Disable the message, report, category or checker with the given id(s).
You +# can either give multiple identifiers separated by comma (,) or put this +# option multiple times (only on the command line, not in the configuration +# file where it should appear only once). You can also use "--disable=all" to +# disable everything first and then re-enable specific checks. For example, if +# you want to run only the similarities checker, you can use "--disable=all +# --enable=similarities". If you want to run only the classes checker, but have +# no Warning level messages displayed, use "--disable=all --enable=classes +# --disable=W". +disable=raw-checker-failed, + bad-inline-option, + locally-disabled, + file-ignored, + suppressed-message, + useless-suppression, + deprecated-pragma, + use-symbolic-message-instead, + use-implicit-booleaness-not-comparison-to-string, + use-implicit-booleaness-not-comparison-to-zero + +# Enable the message, report, category or checker with the given id(s). You can +# either give multiple identifier separated by comma (,) or put this option +# multiple time (only on the command line, not in the configuration file where +# it should appear only once). See also the "--disable" option for examples. +enable= + + +[METHOD_ARGS] + +# List of qualified names (i.e., library.method) which require a timeout +# parameter e.g. 'requests.api.get,requests.api.post' +timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request + + +[MISCELLANEOUS] + +# List of note tags to take in consideration, separated by a comma. +notes=FIXME, + XXX, + TODO + +# Regular expression of note tags to take in consideration. +notes-rgx= + + +[REFACTORING] + +# Maximum number of nested blocks for function / method body +max-nested-blocks=5 + +# Complete name of functions that never returns. When checking for +# inconsistent-return-statements if a never returning function is called then +# it will be considered as an explicit return statement and no message will be +# printed. +never-returning-functions=sys.exit,argparse.parse_error + +# Let 'consider-using-join' be raised when the separator to join on would be +# non-empty (resulting in expected fixes of the type: ``"- " + " - +# ".join(items)``) +suggest-join-with-non-empty-separator=yes + + +[REPORTS] + +# Python expression which should return a score less than or equal to 10. You +# have access to the variables 'fatal', 'error', 'warning', 'refactor', +# 'convention', and 'info' which contain the number of messages in each +# category, as well as 'statement' which is the total number of statements +# analyzed. This score is used by the global evaluation report (RP0004). +evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)) + +# Template used to display messages. This is a python new-style format string +# used to format the message information. See doc for all details. +msg-template= + +# Set the output format. Available formats are: text, parseable, colorized, +# json2 (improved json format), json (old json format) and msvs (visual +# studio). You can also give a reporter class, e.g. +# mypackage.mymodule.MyReporterClass. +#output-format= + +# Tells whether to display a full report or only the messages. +reports=no + +# Activate the evaluation score. 
+score=yes + + +[SIMILARITIES] + +# Comments are removed from the similarity computation +ignore-comments=yes + +# Docstrings are removed from the similarity computation +ignore-docstrings=yes + +# Imports are removed from the similarity computation +ignore-imports=yes + +# Signatures are removed from the similarity computation +ignore-signatures=yes + +# Minimum lines number of a similarity. +min-similarity-lines=4 + + +[SPELLING] + +# Limits count of emitted suggestions for spelling mistakes. +max-spelling-suggestions=4 + +# Spelling dictionary name. No available dictionaries : You need to install +# both the python package and the system dependency for enchant to work. +spelling-dict= + +# List of comma separated words that should be considered directives if they +# appear at the beginning of a comment and should not be checked. +spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy: + +# List of comma separated words that should not be checked. +spelling-ignore-words= + +# A path to a file that contains the private dictionary; one word per line. +spelling-private-dict-file= + +# Tells whether to store unknown words to the private dictionary (see the +# --spelling-private-dict-file option) instead of raising a message. +spelling-store-unknown-words=no + + +[STRING] + +# This flag controls whether inconsistent-quotes generates a warning when the +# character used as a quote delimiter is used inconsistently within a module. +check-quote-consistency=no + +# This flag controls whether the implicit-str-concat should generate a warning +# on implicit string concatenation in sequences defined over several lines. +check-str-concat-over-line-jumps=no + + +[TYPECHECK] + +# List of decorators that produce context managers, such as +# contextlib.contextmanager. Add to this list to register other decorators that +# produce valid context managers. +contextmanager-decorators=contextlib.contextmanager + +# List of members which are set dynamically and missed by pylint inference +# system, and so shouldn't trigger E1101 when accessed. Python regular +# expressions are accepted. +generated-members= + +# Tells whether to warn about missing members when the owner of the attribute +# is inferred to be None. +ignore-none=yes + +# This flag controls whether pylint should warn about no-member and similar +# checks whenever an opaque object is returned when inferring. The inference +# can return multiple potential results while evaluating a Python object, but +# some branches might not be evaluated, which results in partial inference. In +# that case, it might be useful to still emit no-member and other checks for +# the rest of the inferred objects. +ignore-on-opaque-inference=yes + +# List of symbolic message names to ignore for Mixin members. +ignored-checks-for-mixins=no-member, + not-async-context-manager, + not-context-manager, + attribute-defined-outside-init + +# List of class names for which member attributes should not be checked (useful +# for classes with dynamically set attributes). This supports the use of +# qualified names. +ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace + +# Show a hint with possible names when a member name was not found. The aspect +# of finding the hint is based on edit distance. +missing-member-hint=yes + +# The minimum edit distance a name should have in order to be considered a +# similar match for a missing member name. 
+missing-member-hint-distance=1 + +# The total number of similar names that should be taken in consideration when +# showing a hint for a missing member. +missing-member-max-choices=1 + +# Regex pattern to define which classes are considered mixins. +mixin-class-rgx=.*[Mm]ixin + +# List of decorators that change the signature of a decorated function. +signature-mutators= + + +[VARIABLES] + +# List of additional names supposed to be defined in builtins. Remember that +# you should avoid defining new builtins when possible. +additional-builtins= + +# Tells whether unused global variables should be treated as a violation. +allow-global-unused-variables=yes + +# List of names allowed to shadow builtins +allowed-redefined-builtins= + +# List of strings which can identify a callback function by name. A callback +# name must start or end with one of those strings. +callbacks=cb_, + _cb + +# A regular expression matching the name of dummy variables (i.e. expected to +# not be used). +dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_ + +# Argument names that match this expression will be ignored. +ignored-argument-names=_.*|^ignored_|^unused_ + +# Tells whether we should check for unused import in __init__ files. +init-import=no + +# List of qualified module names which can have objects that can redefine +# builtins. +redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io diff --git a/database/index_data.py b/database/data_factorie.py similarity index 83% rename from database/index_data.py rename to database/data_factorie.py index fd7157a..8e9e89d 100644 --- a/database/index_data.py +++ b/database/data_factorie.py @@ -1,6 +1,4 @@ """Index dictionary data definitions""" -import os -import sys def make_trle_page_data(): """trle.net page, represents the same data as a page on the site""" @@ -51,3 +49,16 @@ def make_trcustoms_level_data(): "cover_md5sum": "", } + +def make_zip_file(): + """trcustoms.org or TRLE zipfile""" + return { + "name": "", + "size": "", + "md5": "", + "url": "", + "release": "", + "version": "" + } + + diff --git a/database/get_leaf_cert.py b/database/get_leaf_cert.py index 5e02d09..49157f2 100644 --- a/database/get_leaf_cert.py +++ b/database/get_leaf_cert.py @@ -8,6 +8,7 @@ import ssl import socket from cryptography import x509 +from cryptography.x509 import Certificate from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization @@ -62,5 +63,71 @@ def run(url): else: sys.exit(1) certificate = get_certificate(host) + if not certificate: + sys.exit(1) + print_certificate_details(certificate) - return certificate # this is bytes data type + if not isinstance(certificate, Certificate): + sys.exit(1) + + return certificate.public_bytes(encoding=serialization.Encoding.PEM) + +''' +def validate_downloaded_key(id_number, expected_serial): + """Validate the certificate in binary form with the cryptography module""" + pem_key = get_response(f"https://crt.sh/?d={id_number}", 'application/pkix-cert') + + if not isinstance(pem_key, bytes): + logging.error("Data type error, expected bytes got %s", type(pem_key)) + sys.exit(1) + + # Load the certificate + certificate = x509.load_pem_x509_certificate(pem_key, default_backend()) + + # Extract the serial number and convert it to hex (without leading '0x') + hex_serial = f'{certificate.serial_number:x}' + + # Compare the serial numbers + if hex_serial == expected_serial: + print("The downloaded PEM key matches the expected
serial number.") + else: + logging.error("Serial mismatch! Expected: %s, but got: %s", expected_serial, hex_serial) + sys.exit(1) + + # Extract and validate the domain (Common Name) + valid_domains = ["trle.net", "trcustoms.org", "data.trcustoms.org", "staging.trcustoms.org"] + + # Check the Common Name (CN) in the certificate subject + comon_name = certificate.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value + if comon_name in valid_domains: + print(f"Valid domain found in CN: {comon_name}") + else: + logging.error("Invalid domain in CN: %s", comon_name) + sys.exit(1) + + # Extract the Subject Alternative Name (SAN) extension + try: + san_extension = certificate.extensions \ + .get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + + # Extract all DNS names listed in the SAN extension + dns_names = san_extension.value.get_values_for_type(x509.DNSName) # type: ignore + + print(f"DNS Names in SAN: {dns_names}") + + # Check if any of the DNS names match the valid domain list + valid_domains = ["trle.net", "www.trle.net", "trcustoms.org", "*.trcustoms.org", + "data.trcustoms.org", 'staging.trcustoms.org'] + + if all(domain in valid_domains for domain in dns_names): + print(f"Valid domain found in SAN: {dns_names}") + else: + print(f"Invalid domain in SAN: {dns_names}") + sys.exit(1) + + except x509.ExtensionNotFound: + print("No Subject Alternative Name (SAN) extension found in the certificate.") + + pem_data = certificate.public_bytes(encoding=serialization.Encoding.PEM) + return pem_data.decode('utf-8') +''' diff --git a/database/https.py b/database/https.py new file mode 100644 index 0000000..778912c --- /dev/null +++ b/database/https.py @@ -0,0 +1,417 @@ +"""Get a request response for https only with curl""" +import os +import sys +import time +import json +import socket +import logging +import tempfile +import hashlib +from urllib.parse import urlparse +from io import BytesIO +import pycurl +from tqdm import tqdm + +import get_leaf_cert +import data_factorie +class AcquireLock: + """ + Create a TCP socket to ensure a single instance. + + This class creates a TCP socket that binds to a specific port. If an instance + of this class is already running, it will log an error and exit. + + Example usage: + + ```python + lock = AcquireLock() + try: + # Your application logic here + pass # Replace with actual code + finally: + lock.release_lock() # Ensure the lock is released when done + ``` + + Attributes: + lock (socket.socket): The TCP socket used to enforce the single instance. + To protect the server from user error. 
+ """ + def __init__(self): + # Create a TCP socket + self.lock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + # Bind to localhost and a specific port + self.lock.bind(('127.0.0.1', 55234)) + except socket.error: + logging.error("Another instance is already running") + sys.exit(1) + + def release_lock(self): + """Release the socket and close the connection.""" + if self.lock: + self.lock.close() + self.lock = None + + def is_locked(self): + """Check if the lock is active.""" + return self.lock is not None + + +class RequestHandler: + """Handle HTTPS requests with retry and certificate handling.""" + + def __init__(self): + self.misconfigured_server = False + self.leaf_cert = None + + + def validate_url(self, url): + """Limit to used domains.""" + allowed_domains = ( + "https://www.trle.net/", + "https://trcustoms.org/", + "https://data.trcustoms.org/" + ) + + if any(url.startswith(domain) for domain in allowed_domains): + return # URL is valid + + logging.error("Invalid URL domain: %s", url) + sys.exit(1) + + + def validate_data_type(self, content_type): + """Limit to used data types.""" + valid_content_types = { + 'application/json', + 'application/pkix-cert', + 'application/zip', + 'image/jpeg', + 'image/png', + 'text/html' + } + + if content_type not in valid_content_types: + logging.error("Invalid content type: %s", content_type) + sys.exit(1) + + + def set_leaf(self, curl): + """Write the certificate to a temporary file manually""" + if self.leaf_cert is None: + raise ValueError("Leaf certificate is None and cannot be written to a file.") + + # Use 'with' to ensure the temporary file is closed properly + with tempfile.NamedTemporaryFile(delete=False) as temp_cert_file: + temp_cert_file.write(self.leaf_cert) # self.leaf_cert must be of type 'bytes' + temp_cert_file.flush() # Flush to ensure data is written to disk + temp_cert_path = temp_cert_file.name # Get the name of the temporary file + + # Set CAINFO to use the temporary certificate file + curl.setopt(pycurl.CAINFO, temp_cert_path) + return temp_cert_path + + + def get_leaf(self, url): + if not self.misconfigured_server: + self.leaf_cert = get_leaf_cert.run(url) + if not isinstance(self.leaf_cert, bytes): + sys.exit(1) + + if self.leaf_cert: + self.misconfigured_server = True + logging.info("Leaf certificate retrieved and applied.") + else: + logging.error("Failed to retrieve leaf certificate. 
Exiting.") + sys.exit(1) + + + def get_response(self, url, content_type): + """Handle all https requests""" + self.validate_url(url) + self.validate_data_type(content_type) + + if url.startswith("https://www.trle.net/") and not self.misconfigured_server: + self.get_leaf(url) + + if content_type == 'application/zip': + return DOWNLOADER.download_file(url) + + max_retries = 3 + retries = 0 + curl = None + headers = None + response_buffer = None + temp_cert_path = None + + while retries < max_retries: + try: + response_buffer = BytesIO() + headers_buffer = BytesIO() + curl = pycurl.Curl() # pylint: disable=no-member + curl.setopt(pycurl.URL, url) + curl.setopt(pycurl.WRITEDATA, response_buffer) + curl.setopt(pycurl.WRITEHEADER, headers_buffer) + + if self.misconfigured_server: + if not self.leaf_cert: + sys.exit(1) + temp_cert_path = self.set_leaf(curl) + + headers_list = [ + 'User-Agent: Wget/1.21.1 (linux-gnu)', + 'Accept: */*', + ] + curl.setopt(pycurl.HTTPHEADER, headers_list) + curl.perform() + + response_code = curl.getinfo(pycurl.RESPONSE_CODE) + + if response_code != 200: + retries += 1 + time.sleep(3) + logging.warning("Retrying... Response code: %s", response_code) + curl.close() + continue + + headers = headers_buffer.getvalue().decode('utf-8') + break + + except pycurl.error as curl_error: + #if curl_error.args[0] == 60: # SSL certificate error + logging.error("Request failed: %s", curl_error) + retries += 1 + if retries >= max_retries: + logging.error("Max retries reached. Exiting.") + sys.exit(1) + + finally: + if temp_cert_path and os.path.exists(temp_cert_path): + os.remove(temp_cert_path) + + if curl is None: + logging.error("No curl instance") + sys.exit(1) + + if headers is None: + logging.error("No headers received") + sys.exit(1) + + if response_buffer is None: + logging.error("No response received") + sys.exit(1) + + # Extract Content-Type from the headers + response_content_type = self.extract_content_type(headers) + if response_content_type == content_type: + response = self.pack_response_buffer(content_type, response_buffer) + curl.close() + return response + logging.error("Unexpected content type: %s, expected %s", \ + response_content_type, content_type) + sys.exit(1) + + + def pack_response_buffer(self, content_type, response_buffer): + """Validate and return the response based on content type""" + if content_type == 'text/html': + return response_buffer.getvalue().decode('utf-8') + if content_type == 'application/json': + return json.loads(response_buffer.getvalue().decode('utf-8')) + if content_type in ['image/jpeg', 'image/png']: + return response_buffer.getvalue() + if content_type == 'application/pkix-cert': + return response_buffer.getvalue() + logging.error("Unsupported content type: %s", content_type) + return None + + + def extract_content_type(self, headers): + """Read the header lines to look for content-type""" + + for header in headers.splitlines(): + if header.lower().startswith('content-type:'): + return header.split(':', 1)[1].split(';')[0].strip() + logging.error("Could not extract content type from header: %s", headers) + return None + +class Downloader: + def __init__(self): + self.buffer = BytesIO() + self.status = 0 + self.progress_bar = None + + def write_callback(self, data): + """Callback function for writing downloaded data.""" + self.buffer.write(data) + return len(data) + + def progress_callback(self, total_to_download, downloaded, total_to_upload, uploaded): + """Callback function for reporting download progress. 
+ + Args: + total_to_download (int): Total size of the file to download. + downloaded (int): Number of bytes downloaded so far. + total_to_upload (int): Total size of the file to upload (not used). + uploaded (int): Number of bytes uploaded (not used). + """ + _ = uploaded # Explicitly ignore 'uploaded' + _ = total_to_upload # Explicitly ignore 'total_to_upload' + if total_to_download > 0: + if self.progress_bar is None: + # Initialize the progress bar if it's not set + self.progress_bar= tqdm(total=total_to_download, + unit='B', + unit_scale=True, + unit_divisor=1024, + desc="Downloading") + self.progress_bar.update(downloaded - self.progress_bar.n) # Update the progress bar + self.progress_bar.total = total_to_download + return 0 # Returning 0 means to continue + + def download_file(self, url): + """ + Downloads a file from the specified URL and stores its contents in a buffer. + + This method utilizes the `pycurl` library to perform the download, providing + a progress bar for user feedback. It handles server misconfigurations, + follows redirects, and calculates the MD5 checksum of the downloaded file. + + Parameters: + ---------- + url : str + The URL of the file to download. Must be a valid URL. + + Raises: + ------- + SystemExit + Exits the program if the server is misconfigured and no leaf certificate is available. + + Exceptions: + ------------ + pycurl.error + Raised if an error occurs during the download process. + + Returns: + -------- + dict + Returns a dictionary containing details of the downloaded file, including: + - 'size': Size of the file in MiB (mebibytes). + - 'url': The effective URL from which the file was downloaded. + - 'name': The name of the file. + - 'md5': The MD5 checksum of the downloaded content. + + Notes: + ------ + - The progress bar is displayed using the `tqdm` library to indicate the download status. + - The method checks the HTTP response code after the download to ensure success (HTTP 200). + - Temporary files created for certificate handling are cleaned up after the download. 
+ """ + curl = pycurl.Curl() + temp_cert_path = None + zip_file = data_factorie.make_zip_file() # Initialize the zip_file dictionary + + try: + # Get file size for the progress bar + curl.setopt(pycurl.NOBODY, True) # Disable body output for this request + curl.setopt(pycurl.URL, url) + curl.setopt(pycurl.FOLLOWLOCATION, True) # Follow redirects + + if REQUEST_HANDLER.misconfigured_server: + if not REQUEST_HANDLER.leaf_cert: + sys.exit(1) + temp_cert_path = REQUEST_HANDLER.set_leaf(curl) + + curl.perform() # Header only + + # Get header info + total_size = curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) + zip_file['size'] = total_size / (1024 * 1024) # Size in MiB + zip_file['url'] = curl.getinfo(pycurl.EFFECTIVE_URL) + zip_file['name'] = os.path.basename(urlparse(zip_file['url']).path) + + # Main buffer + curl.setopt(pycurl.NOBODY, False) # Re-enable body output + curl.setopt(pycurl.WRITEFUNCTION, self.write_callback) + curl.setopt(pycurl.WRITEDATA, self.buffer) + + # Enable progress meter + self.progress_bar= tqdm(total=total_size, + unit='B', + unit_scale=True, + unit_divisor=1024, + desc="Downloading") + curl.setopt(pycurl.NOPROGRESS, False) + curl.setopt(pycurl.XFERINFOFUNCTION, self.progress_callback) + + # Perform the download + curl.perform() + + # Check for errors + http_code = curl.getinfo(pycurl.RESPONSE_CODE) + if http_code != 200: + self.status = 1 + print(f"Error: HTTP response code {http_code}") + return {} # Return an empty dict on error + + self.status = 0 + print("Downloaded successfully.") + + # Finalize MD5 checksum + md5_hash = hashlib.md5(usedforsecurity=False) + self.buffer.seek(0) # Reset buffer pointer + md5_hash.update(self.buffer.getvalue()) + zip_file['md5'] = md5_hash.hexdigest() + + except pycurl.error as e: + self.status = 1 + print(f"Error: {e}") + return {} # Return an empty dict on error + + finally: + if self.progress_bar: + self.progress_bar.close() # Close the progress bar + curl.close() + if temp_cert_path: + if os.path.exists(temp_cert_path): + os.remove(temp_cert_path) + + return zip_file # Always return the zip_file dictionary + + +REQUEST_HANDLER = RequestHandler() +ACQUIRE_LOCK = AcquireLock() +DOWNLOADER = Downloader() + +def get(url, content_type): + """ + Get server response from TRLE or Trcustom hosts + + content_type: + 'application/json' + 'application/pkix-cert' + 'application/zip' + 'image/jpeg' + 'image/png' + 'text/html' + + url must start with: + "https://www.trle.net/" + "https://trcustoms.org/" + "https://data.trcustoms.org/" + """ + return REQUEST_HANDLER.get_response(url, content_type) + + +def release_lock(): + """Release lock for this instance""" + ACQUIRE_LOCK.release_lock() + + +def is_locked(): + """Lock this instance""" + ACQUIRE_LOCK.is_locked() + + +#if __name__ == '__main__': +# print(get("https://www.trle.net/scadm/trle_dl.php?lid=3667", 'application/zip')) diff --git a/database/ideas.txt b/database/ideas.txt index f9f2628..708315b 100644 --- a/database/ideas.txt +++ b/database/ideas.txt @@ -167,6 +167,67 @@ int main() { outFile.close(); return 0; } +### Python ### +this could help downloadin it to a file instead of using memory with nested functions +not sure I have understod this, kind of. 
+ +write_callback + ├── Takes: file_obj + └── Returns: _write (which has access to file_obj due to closure) + └── Uses file_obj.write(data) within the PycURL callback context + + +import pycurl +from tqdm import tqdm + + +class Downloader: + def __init__(self): + self.progress_bar = None + + def write_callback(self, file_obj): + """Callback function to write data directly to a file.""" + def _write(data): + file_obj.write(data) + return len(data) + return _write + + def download_file(self, url, save_path): + curl = pycurl.Curl() + curl.setopt(pycurl.URL, url) + + try: + # Get file size for the progress bar + curl.setopt(pycurl.NOBODY, True) + curl.perform() + total_size = curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD) + curl.setopt(pycurl.NOBODY, False) + + # Open file for writing + with open(save_path, 'wb') as f: + # Set up the progress bar + self.progress_bar = tqdm(total=total_size, unit='B', unit_scale=True) + + # Use write callback to stream directly to file + curl.setopt(pycurl.WRITEFUNCTION, self.write_callback(f)) + curl.setopt(pycurl.FOLLOWLOCATION, True) + curl.setopt(pycurl.NOPROGRESS, False) + curl.setopt(pycurl.XFERINFOFUNCTION, self.progress_callback) # assumes a progress_callback like the one in https.py + + # Perform the download + curl.perform() + + # Retrieve the final URL after redirects + final_url = curl.getinfo(pycurl.EFFECTIVE_URL) + print(f"Final URL: {final_url}") + + # Extract the filename from the URL if present + filename = final_url.split('/')[-1] + print(f"Extracted Filename: {filename}") + + except pycurl.error as e: + print(f"Download failed: {e}") + + finally: + if self.progress_bar: + self.progress_bar.close() + curl.close() Never forget how we can test one function in python: diff --git a/database/index_query.py b/database/index_query.py index 9e5a81b..1782330 100644 --- a/database/index_query.py +++ b/database/index_query.py @@ -2,7 +2,7 @@ import sys import sqlite3 -import index_data +import data_factorie os.chdir(os.path.dirname(os.path.abspath(__file__))) @@ -373,7 +373,7 @@ def get_trle_level_local_by_id(trle_id): records = [] for record in result: - level = index_data.make_trle_level_data() + level = data_factorie.make_trle_level_data() level['trle_id'] = record[0] level['author'] = record[1] level['title'] = record[2] @@ -423,7 +423,7 @@ def get_trcustoms_level_local_by_id(trcustoms_id): """, (trcustoms_id, ), cursor ) - level = index_data.make_trcustoms_level_data() + level = data_factorie.make_trcustoms_level_data() level['trcustoms_id'] = result[0][0] level['authors'] = result[0][1].split(',') if result[0][1] else [] level['title'] = result[0][2] @@ -454,7 +454,7 @@ def get_trle_page_local(offset, sortCreatedFirst=False): if offset > rec: sys.exit(1) - page = index_data.make_trle_page_data() + page = data_factorie.make_trle_page_data() page['offset'] = offset page['records_total'] = rec @@ -482,7 +482,7 @@ def get_trle_page_local(offset, sortCreatedFirst=False): ) # Process result to format the output as needed for row in result: - level = index_data.make_trle_level_data() + level = data_factorie.make_trle_level_data() level['trle_id'] = row[0] level['author'] = row[1] level['title'] = row[2] @@ -511,7 +511,7 @@ def get_trcustoms_page_local(page_number, sortCreatedFirst=False): cursor )[0][0] - page = index_data.make_trcustoms_page_data() + page = data_factorie.make_trcustoms_page_data() total = (rec + 19) // 20 if page_number > total: sys.exit(1) @@ -550,7 +550,7 @@ ) # Process result to format the output as needed for row in result: - level = 
index_data.make_trcustoms_level_data() + level = data_factorie.make_trcustoms_level_data() level['trcustoms_id'] = row[0] level['authors'] = row[1].split(',') if row[1] else [] level['title'] = row[2] diff --git a/database/index_scrape.py b/database/index_scrape.py index f5ea849..7757675 100644 --- a/database/index_scrape.py +++ b/database/index_scrape.py @@ -3,184 +3,23 @@ import re import os import hashlib -import socket import uuid import time -import json import logging import tempfile from io import BytesIO from urllib.parse import urlparse, urlencode, parse_qs from datetime import datetime -import pycurl from bs4 import BeautifulSoup, Tag from PIL import Image -from cryptography import x509 -from cryptography.x509.oid import ExtensionOID -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -import index_data -import get_leaf_cert - -MISCONFIGURED_SERVER = False -LEAF_CERT = None +import data_factorie +import https # Set up logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s') logging.getLogger("requests").setLevel(logging.DEBUG) -def acquire_lock(): - """Create a TCP socket to ensure single instance.""" - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - # Bind to localhost and a specific port - sock.bind(('127.0.0.1', 55234)) - return sock - except socket.error: - logging.error("Another instance is already running") - sys.exit(1) - - -def release_lock(sock): - """Release the socket and close the connection.""" - sock.close() - - -def get_response(url, content_type): - """Handle all https requests""" - valid_content_types = [ - 'text/html', - 'application/json', - 'application/pkix-cert', - 'image/jpeg', - 'image/png' - ] - - if content_type not in valid_content_types: - logging.error("Invalid content type: %s", content_type) - sys.exit(1) - - max_retries = 3 - retries = 0 - curl = None - headers = None - response_buffer = None - temp_cert_path = None - - while retries < max_retries: - try: - response_buffer = BytesIO() - headers_buffer = BytesIO() - curl = pycurl.Curl() - curl.setopt(pycurl.URL, url) - curl.setopt(pycurl.WRITEDATA, response_buffer) - curl.setopt(pycurl.WRITEHEADER, headers_buffer) - - global MISCONFIGURED_SERVER - if MISCONFIGURED_SERVER: - global LEAF_CERT - if not LEAF_CERT: - sys.exit(1) - - # Write the certificate to a temporary file manually - temp_cert_file = tempfile.NamedTemporaryFile(delete=False) # `delete=False` prevents auto-deletion - temp_cert_file.write(LEAF_CERT) - temp_cert_file.flush() - temp_cert_path = temp_cert_file.name - temp_cert_file.close() # Close the file so it can be accessed by pycurl - - # Set CAINFO to use the temporary certificate file - curl.setopt(pycurl.CAINFO, temp_cert_path) - - headers_list = [ - 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', - 'Accept: */*', - ] - curl.setopt(pycurl.HTTPHEADER, headers_list) - curl.perform() - - response_code = curl.getinfo(pycurl.RESPONSE_CODE) - - if response_code != 200: - retries += 1 - time.sleep(3) - logging.warning(f"Retrying... 
Response code: {response_code}") - curl.close() - continue - - headers = headers_buffer.getvalue().decode('utf-8') - break - - except pycurl.error as curl_error: - if curl_error.args[0] == 60: # SSL certificate error - LEAF_CERT = get_leaf_cert.run(url) - - if LEAF_CERT: - try: - LEAF_CERT = LEAF_CERT.public_bytes(encoding=serialization.Encoding.PEM) - MISCONFIGURED_SERVER = True - logging.info("Leaf certificate retrieved and applied.") - except Exception as e: - logging.error("Failed to convert leaf certificate to PEM: %s", e) - sys.exit(1) - else: - logging.error("Failed to retrieve leaf certificate. Exiting.") - sys.exit(1) - continue - - logging.error("Request failed: %s", curl_error) - retries += 1 - if retries >= max_retries: - logging.error("Max retries reached. Exiting.") - sys.exit(1) - - finally: - if temp_cert_path and os.path.exists(temp_cert_path): - os.remove(temp_cert_path) # Ensure the temp cert file is deleted after the request - - if curl is None: - logging.error("No curl instance") - sys.exit(1) - - if headers is None: - logging.error("No headers received") - sys.exit(1) - - if response_buffer is None: - logging.error("No response received") - sys.exit(1) - - # Extract Content-Type from the headers - response_content_type = None - for header in headers.splitlines(): - if header.lower().startswith('content-type:'): - response_content_type = header.split(':', 1)[1].split(';')[0].strip() - break - - # Validate and return the response based on content type - if response_content_type == content_type: - if content_type == 'text/html': - return response_buffer.getvalue().decode('utf-8') - elif content_type == 'application/json': - return json.loads(response_buffer.getvalue().decode('utf-8')) - elif content_type in ['image/jpeg', 'image/png']: - return response_buffer.getvalue() - elif content_type == 'application/pkix-cert': - return response_buffer.getvalue() - else: - logging.error("Unexpected content type: %s, expected %s", response_content_type, content_type) - sys.exit(1) -def validate_pem(pem): - """Validate the certificate as a text""" - # Check if the response contains a PEM key - pem_pattern = r'-----BEGIN CERTIFICATE-----(.*?)-----END CERTIFICATE-----' - pem_match = re.search(pem_pattern, pem, re.DOTALL) - - if not pem_match: - print("PEM Key not found.") - sys.exit(1) - def trle_page_table(table): """filter out data from the TRLE level table result""" @@ -203,7 +42,7 @@ def trle_page_table(table): for row in table[1:]: cells = row.find_all('td') - level = index_data.make_trle_level_data() + level = data_factorie.make_trle_level_data() for idx, cell in enumerate(cells): if idx in field_mapping: @@ -235,8 +74,11 @@ def get_trle_page(offset, sort_created_first=False): } query_string = urlencode(params) url = f"https://www.trle.net/pFind.php?{query_string}" - soup = BeautifulSoup(get_response(url, 'text/html'), 'html.parser') - page = index_data.make_trle_page_data() + response = https.get(url, 'text/html') + if not response: + sys.exit(1) + soup = BeautifulSoup(response, 'html.parser') + page = data_factorie.make_trle_page_data() page['offset'] = offset # Find total records @@ -272,19 +114,23 @@ def get_trcustoms_page(page_number, sort_created_first=False): } query_string = urlencode(params) url = f"{host}?{query_string}" - data = get_response(url, 'application/json') + data = https.get(url, 'application/json') if not isinstance(data, dict): logging.error("Data type error, expected dict got %s", type(data)) sys.exit(1) - page = index_data.make_trcustoms_page_data() + 
page = data_factorie.make_trcustoms_page_data() page['current_page'] = data.get('current_page') page['total_pages'] = data.get('last_page') page['records_total'] = data.get('total_count') - results = data['results'] + results = data.get('results') + if not isinstance(results, list): + logging.error("Data type error, expected list got %s", type(results)) + sys.exit(1) + for item in results: - repacked_data = index_data.make_trcustoms_level_data() + repacked_data = data_factorie.make_trcustoms_level_data() for author in item['authors']: repacked_data['authors'].append(author['username']) for tag in item['tags']: @@ -312,7 +158,7 @@ def get_trle_cover(trle_id): sys.exit(1) url = f"https://www.trle.net/screens/{trle_id}.jpg" - response = get_response(url, 'image/jpeg') + response = https.get(url, 'image/jpeg') return cover_resize_to_webp(response) @@ -341,16 +187,16 @@ def get_cover_list(levels): file = level['cover'].replace(base_url, "") filename, ext = os.path.splitext(file) + ext = ext[1:] # remove dot - if ext.lower() in ('.jpg', '.jpeg', '.png'): - level_list.append(get_trcustoms_cover(filename, level['cover_md5sum'], ext[1:])) + if ext.lower() in ('jpg', 'jpeg', 'png'): + level_list.append(get_trcustoms_cover(filename, level['cover_md5sum'], ext)) else: print(f"Skipping level {level['title']}, invalid file format: {ext}") sys.exit(1) return level_list - def get_trcustoms_cover(image_uuid, md5sum, image_format): """Getting pictures from internet and displaying on the terminal""" if not is_valid_uuid(image_uuid): @@ -363,7 +209,7 @@ def get_trcustoms_cover(image_uuid, md5sum, image_format): url = f"https://data.trcustoms.org/media/level_images/{image_uuid}.{image_format}" if image_format.lower() == "jpg": image_format = "jpeg" - response = get_response(url, f"image/{image_format}") + response = https.get(url, f"image/{image_format}") # Check if the MD5 sum matches downloaded_md5sum = calculate_md5(response) @@ -377,25 +223,6 @@ return temp_image_file.name -def get_trcustoms_page_cover_list(levels): - """Fetch a list of cover images from trcustoms""" - base_url = "https://data.trcustoms.org/media/level_images/" - level_list = [] - - for level in levels: - file = level['cover'].replace(base_url, "") - - filename, ext = os.path.splitext(file) - - if ext.lower() in ('.jpg', '.jpeg', '.png'): - level_list.append(get_trcustoms_cover(filename, level['cover_md5sum'], ext[1:])) - else: - print(f"Skipping level {level['title']}, invalid file format: {ext}") - sys.exit(1) - - return level_list - - def cover_resize_to_webp(input_img): """webp is the default we use here with 320x240 max resolution""" img = Image.open(BytesIO(input_img)) @@ -435,13 +262,58 @@ def convert_to_iso(date_str): raise ValueError(f"Unsupported date format: {date_str}") +def get_key_list(html): + """Scrape keys and key status here; + we can't depend on local keys from a package manager that might be incomplete""" + + soup = BeautifulSoup(html, 'html.parser') + # Find the table containing the keys + table = soup.find_all('table')[2] # Adjust index if necessary + + # Iterate over the rows (skipping the header row) + ids = [] + for row in table.find_all('tr')[1:]: + key_column = row.find_all('td')[0] # Get the first column + key_striped = key_column.text.strip() # Extract the key text + print(f"Key: {key_striped}") + ids.append(key_striped) + + return ids + + +def trcustoms_key_list(): + """Get list of UTF-8 public keys for Trcustoms""" + key_list = 
https.get("https://crt.sh/?q=trcustoms.org&exclude=expired", 'text/html') + validated = get_key_list(key_list) + + public_key_list = [] + for key in validated: + time.sleep(5) + public_key_list.append(get_key(key)) + + return public_key_list + + +def trle_key_list(): + """Get list of UTF-8 public keys for TRLE""" + resp = https.get("https://crt.sh/?q=www.trle.net&exclude=expired", 'text/html') + key_list = get_key_list(resp) + + public_key_list = [] + for key in key_list: + time.sleep(5) + public_key_list.append(get_key(key)) + + return public_key_list + + def get_key(id_number): """Get the certificate from crt""" # Input validation if not id_number.isdigit(): print("Invalid ID number.") sys.exit(1) - html = get_response(f"https://crt.sh/?id={id_number}", 'text/html') + html = https.get(f"https://crt.sh/?id={id_number}", 'text/html') # Create a BeautifulSoup object soup = BeautifulSoup(html, 'html.parser') @@ -481,112 +353,7 @@ print("Serial Number:", serial_number) - return validate_downloaded_key(id_number, serial_number) - - -def validate_downloaded_key(id_number, expected_serial): - """Validate the certificate in binary form with the cryptography module""" - pem_key = get_response(f"https://crt.sh/?d={id_number}", 'application/pkix-cert') - - if not isinstance(pem_key, bytes): - logging.error("Data type error, expected bytes got %s", type(pem_key)) - sys.exit(1) - - # Load the certificate - certificate = x509.load_pem_x509_certificate(pem_key, default_backend()) - - # Extract the serial number and convert it to hex (without leading '0x') - hex_serial = f'{certificate.serial_number:x}' - - # Compare the serial numbers - if hex_serial == expected_serial: - print("The downloaded PEM key matches the expected serial number.") - else: - logging.error("Serial mismatch!
Expected: %s, but got: %s", expected_serial, hex_serial) - sys.exit(1) - - # Extract and validate the domain (Common Name) - valid_domains = ["trle.net", "trcustoms.org", "data.trcustoms.org", "staging.trcustoms.org"] - - # Check the Common Name (CN) in the certificate subject - comon_name = certificate.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value - if comon_name in valid_domains: - print(f"Valid domain found in CN: {comon_name}") - else: - logging.error("Invalid domain in CN: %s", comon_name) - sys.exit(1) - - # Extract the Subject Alternative Name (SAN) extension - try: - san_extension = certificate.extensions \ - .get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) - - # Extract all DNS names listed in the SAN extension - dns_names = san_extension.value.get_values_for_type(x509.DNSName) # type: ignore - - print(f"DNS Names in SAN: {dns_names}") - - # Check if any of the DNS names match the valid domain list - valid_domains = ["trle.net", "www.trle.net", "trcustoms.org", "*.trcustoms.org", - "data.trcustoms.org", 'staging.trcustoms.org'] - - if all(domain in valid_domains for domain in dns_names): - print(f"Valid domain found in SAN: {dns_names}") - else: - print(f"Invalid domain in SAN: {dns_names}") - sys.exit(1) - - except x509.ExtensionNotFound: - print("No Subject Alternative Name (SAN) extension found in the certificate.") - - pem_data = certificate.public_bytes(encoding=serialization.Encoding.PEM) - return pem_data.decode('utf-8') - - -def get_key_list(html): - """scrape keys and key status here - we cant depend on local keys from package manger that might be incomplete""" - - soup = BeautifulSoup(html, 'html.parser') - # Find the table containing the keys - table = soup.find_all('table')[2] # Adjust index if necessary - - # Iterate over the rows (skipping the header row) - ids = [] - for row in table.find_all('tr')[1:]: - key_column = row.find_all('td')[0] # Get the first column - key_striped = key_column.text.strip() # Extract the key text - print(f"Key: {key_striped}") - ids.append(key_striped) - - return ids - - -def trcustoms_key_list(): - """Get list of utf-8 public key for Trcustoms""" - key_list = get_response("https://crt.sh/?q=trcustoms.org&exclude=expired", 'text/html') - validated = get_key_list(key_list) - - public_key_list = [] - for key in validated: - time.sleep(5) - public_key_list.append(get_key(key)) - - return public_key_list - - -def trle_key_list(): - """Get list of utf-8 public key for TRLE""" - resp = get_response("https://crt.sh/?q=www.trle.net&exclude=expired", 'text/html') - key_list = get_key_list(resp) - - public_key_list = [] - for key in key_list: - time.sleep(5) - public_key_list.append(get_key(key)) - - return public_key_list - + #return validate_downloaded_key(id_number, serial_number) '''
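For reference, a hedged usage sketch of the new https.py entry points, assuming it is run from the database/ directory and that the server answers with the declared content type. The zip URL mirrors the commented-out __main__ example above; everything else is illustrative.

```python
import https

# HTML comes back as str for 'text/html'; the declared content type
# must match what the server actually returns, or https.py exits.
html = https.get("https://www.trle.net/pFind.php", 'text/html')

# 'application/zip' is routed to Downloader.download_file(), which returns
# the dict from data_factorie.make_zip_file() with name/size/md5/url filled
# in (or an empty dict on error).
info = https.get("https://www.trle.net/scadm/trle_dl.php?lid=3667", 'application/zip')
print(info['name'], info['size'], info['md5'])

# Free the single-instance TCP lock (port 55234) when done.
https.release_lock()
```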