From 633220fe8399a0adfd53e23e02d697202b306acc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Sta=C5=9Bczak?= Date: Thu, 14 Dec 2023 23:17:26 +0100 Subject: [PATCH] Fix #38 --- onetimepass/db/models.py | 10 +++++ onetimepass/otp.py | 83 +++++++++++++++++++++++++--------------- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/onetimepass/db/models.py b/onetimepass/db/models.py index ec618e7..56b883b 100644 --- a/onetimepass/db/models.py +++ b/onetimepass/db/models.py @@ -1,5 +1,7 @@ from __future__ import annotations +import base64 +import binascii import datetime import typing @@ -76,6 +78,14 @@ class AliasSchema(BaseModel): label: str | None issuer: str | None + @validator("secret") + def valid_base32_secret(cls, v): + try: + base64.b32decode(v) + except binascii.Error as e: + raise ValueError(f"invalid Base32 value; {e}") + return v + @validator("params") def valid_params_for_otp_type(cls, v, values): otp_type = values["otp_type"] diff --git a/onetimepass/otp.py b/onetimepass/otp.py index 053c269..c4bc201 100644 --- a/onetimepass/otp.py +++ b/onetimepass/otp.py @@ -1,3 +1,4 @@ +import base64 import binascii import datetime import functools @@ -99,6 +100,17 @@ def handle_conflicting_options(options: Dict[str, bool]): raise ClickUsageError(f"conflicting options: {options_list}") +def validation_error_to_str(error: pydantic.ValidationError) -> str: + error_messages: list[str] = [str(i.exc) for i in error.args[0]] + + if len(error_messages) == 1: + return error_messages[0] + + error_messages.insert(0, "") + bullet_list = "\n- ".join(error_messages) + return bullet_list + + @click.group(context_settings={"help_option_names": ["-h", "--help"]}) @click.option("color", "-c/-C", "--color/--no-color", default=True, show_default=True) @click.option("quiet", "-q", "--quiet", is_flag=True) @@ -145,7 +157,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool if wait is not None: remaining_seconds = algorithm.get_seconds_remaining( algorithm.TOTPParameters( - secret=alias_data.secret.encode(), + secret=base64.b32decode(alias_data.secret), digits_count=alias_data.digits_count, hash_algorithm=alias_data.hash_algorithm, time_step_seconds=alias_data.params.time_step_seconds, @@ -156,7 +168,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool time.sleep(remaining_seconds) # Reinitialize parameters to get valid result params = algorithm.TOTPParameters( - secret=alias_data.secret.encode(), + secret=base64.b32decode(alias_data.secret), digits_count=alias_data.digits_count, hash_algorithm=alias_data.hash_algorithm, time_step_seconds=alias_data.params.time_step_seconds, @@ -174,7 +186,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool elif alias_data.otp_type == OTPType.HOTP: alias_data.params.counter += 1 params = algorithm.HOTPParameters( - secret=alias_data.secret.encode(), + secret=base64.b32decode(alias_data.secret), digits_count=alias_data.digits_count, hash_algorithm=alias_data.hash_algorithm, counter=alias_data.params.counter, @@ -336,7 +348,7 @@ def add_uri(ctx: click.Context, alias: str): try: uri_parsed = Uri.parse(input_uri) except ParsingError as e: - logger.error(e) + logger.debug(e) raise ClickUsageError("invalid URI") otp_type = OTPType(uri_parsed.type) @@ -351,15 +363,18 @@ def add_uri(ctx: click.Context, alias: str): else: raise UnhandledOTPTypeException(otp_type) - alias_data = AliasSchema( - otp_type=otp_type, - label=str(uri_parsed.label), - issuer=uri_parsed.parameters.issuer or uri_parsed.label.issuer, - secret=uri_parsed.parameters.secret, - digits_count=uri_parsed.parameters.digits, - hash_algorithm=uri_parsed.parameters.algorithm, - params=params, - ) + try: + alias_data = AliasSchema( + otp_type=otp_type, + label=str(uri_parsed.label), + issuer=uri_parsed.parameters.issuer or uri_parsed.label.issuer, + secret=uri_parsed.parameters.secret, + digits_count=uri_parsed.parameters.digits, + hash_algorithm=uri_parsed.parameters.algorithm, + params=params, + ) + except pydantic.ValidationError as e: + raise ClickUsageError(validation_error_to_str(e)) data.add_alias(alias, alias_data) db.write(data) @@ -426,15 +441,18 @@ def add_hotp( if alias in data.otp: raise ClickUsageError(f"Alias {alias} exists. Consider renaming it") - alias_data = AliasSchema( - otp_type=OTPType.HOTP, - label=label, - issuer=issuer, - secret=input_secret, - digits_count=digits_count, - hash_algorithm=OTPAlgorithm(algorithm), - params=HOTPParams(counter=counter), - ) + try: + alias_data = AliasSchema( + otp_type=OTPType.HOTP, + label=label, + issuer=issuer, + secret=input_secret, + digits_count=digits_count, + hash_algorithm=OTPAlgorithm(algorithm), + params=HOTPParams(counter=counter), + ) + except pydantic.ValidationError as e: + raise ClickUsageError(validation_error_to_str(e)) data.add_alias(alias, alias_data) db.write(data) @@ -483,15 +501,18 @@ def add_totp( if alias in data.otp: raise ClickUsageError(f"Alias {alias} exists. Consider renaming it") - alias_data = AliasSchema( - otp_type=OTPType.TOTP, - label=label, - issuer=issuer, - secret=input_secret, - digits_count=digits_count, - hash_algorithm=OTPAlgorithm(algorithm), - params=TOTPParams(initial_time=initial_time, time_step_seconds=period), - ) + try: + alias_data = AliasSchema( + otp_type=OTPType.TOTP, + label=label, + issuer=issuer, + secret=input_secret, + digits_count=digits_count, + hash_algorithm=OTPAlgorithm(algorithm), + params=TOTPParams(initial_time=initial_time, time_step_seconds=period), + ) + except pydantic.ValidationError as e: + raise ClickUsageError(validation_error_to_str(e)) data.add_alias(alias, alias_data) db.write(data)