Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #38 #65

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions onetimepass/db/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import base64
import binascii
import datetime
import typing

Expand Down Expand Up @@ -76,6 +78,14 @@ class AliasSchema(BaseModel):
label: str | None
issuer: str | None

@validator("secret")
def valid_base32_secret(cls, v):
try:
base64.b32decode(v)
except binascii.Error as e:
raise ValueError(f"invalid Base32 value; {e}")
return v

@validator("params")
def valid_params_for_otp_type(cls, v, values):
otp_type = values["otp_type"]
Expand Down
83 changes: 52 additions & 31 deletions onetimepass/otp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import binascii
import datetime
import functools
Expand Down Expand Up @@ -99,6 +100,17 @@ def handle_conflicting_options(options: Dict[str, bool]):
raise ClickUsageError(f"conflicting options: {options_list}")


def validation_error_to_str(error: pydantic.ValidationError) -> str:
error_messages: list[str] = [str(i.exc) for i in error.args[0]]

if len(error_messages) == 1:
return error_messages[0]

error_messages.insert(0, "")
bullet_list = "\n- ".join(error_messages)
return bullet_list


@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option("color", "-c/-C", "--color/--no-color", default=True, show_default=True)
@click.option("quiet", "-q", "--quiet", is_flag=True)
Expand Down Expand Up @@ -145,7 +157,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool
if wait is not None:
remaining_seconds = algorithm.get_seconds_remaining(
algorithm.TOTPParameters(
secret=alias_data.secret.encode(),
secret=base64.b32decode(alias_data.secret),
digits_count=alias_data.digits_count,
hash_algorithm=alias_data.hash_algorithm,
time_step_seconds=alias_data.params.time_step_seconds,
Expand All @@ -156,7 +168,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool
time.sleep(remaining_seconds)
# Reinitialize parameters to get valid result
params = algorithm.TOTPParameters(
secret=alias_data.secret.encode(),
secret=base64.b32decode(alias_data.secret),
digits_count=alias_data.digits_count,
hash_algorithm=alias_data.hash_algorithm,
time_step_seconds=alias_data.params.time_step_seconds,
Expand All @@ -174,7 +186,7 @@ def show(ctx: click.Context, alias: str, wait: int | None, minimum_verbose: bool
elif alias_data.otp_type == OTPType.HOTP:
alias_data.params.counter += 1
params = algorithm.HOTPParameters(
secret=alias_data.secret.encode(),
secret=base64.b32decode(alias_data.secret),
digits_count=alias_data.digits_count,
hash_algorithm=alias_data.hash_algorithm,
counter=alias_data.params.counter,
Expand Down Expand Up @@ -336,7 +348,7 @@ def add_uri(ctx: click.Context, alias: str):
try:
uri_parsed = Uri.parse(input_uri)
except ParsingError as e:
logger.error(e)
logger.debug(e)
raise ClickUsageError("invalid URI")

otp_type = OTPType(uri_parsed.type)
Expand All @@ -351,15 +363,18 @@ def add_uri(ctx: click.Context, alias: str):
else:
raise UnhandledOTPTypeException(otp_type)

alias_data = AliasSchema(
otp_type=otp_type,
label=str(uri_parsed.label),
issuer=uri_parsed.parameters.issuer or uri_parsed.label.issuer,
secret=uri_parsed.parameters.secret,
digits_count=uri_parsed.parameters.digits,
hash_algorithm=uri_parsed.parameters.algorithm,
params=params,
)
try:
alias_data = AliasSchema(
otp_type=otp_type,
label=str(uri_parsed.label),
issuer=uri_parsed.parameters.issuer or uri_parsed.label.issuer,
secret=uri_parsed.parameters.secret,
digits_count=uri_parsed.parameters.digits,
hash_algorithm=uri_parsed.parameters.algorithm,
params=params,
)
except pydantic.ValidationError as e:
raise ClickUsageError(validation_error_to_str(e))

data.add_alias(alias, alias_data)
db.write(data)
Expand Down Expand Up @@ -426,15 +441,18 @@ def add_hotp(
if alias in data.otp:
raise ClickUsageError(f"Alias {alias} exists. Consider renaming it")

alias_data = AliasSchema(
otp_type=OTPType.HOTP,
label=label,
issuer=issuer,
secret=input_secret,
digits_count=digits_count,
hash_algorithm=OTPAlgorithm(algorithm),
params=HOTPParams(counter=counter),
)
try:
alias_data = AliasSchema(
otp_type=OTPType.HOTP,
label=label,
issuer=issuer,
secret=input_secret,
digits_count=digits_count,
hash_algorithm=OTPAlgorithm(algorithm),
params=HOTPParams(counter=counter),
)
except pydantic.ValidationError as e:
raise ClickUsageError(validation_error_to_str(e))

data.add_alias(alias, alias_data)
db.write(data)
Expand Down Expand Up @@ -483,15 +501,18 @@ def add_totp(
if alias in data.otp:
raise ClickUsageError(f"Alias {alias} exists. Consider renaming it")

alias_data = AliasSchema(
otp_type=OTPType.TOTP,
label=label,
issuer=issuer,
secret=input_secret,
digits_count=digits_count,
hash_algorithm=OTPAlgorithm(algorithm),
params=TOTPParams(initial_time=initial_time, time_step_seconds=period),
)
try:
alias_data = AliasSchema(
otp_type=OTPType.TOTP,
label=label,
issuer=issuer,
secret=input_secret,
digits_count=digits_count,
hash_algorithm=OTPAlgorithm(algorithm),
params=TOTPParams(initial_time=initial_time, time_step_seconds=period),
)
except pydantic.ValidationError as e:
raise ClickUsageError(validation_error_to_str(e))

data.add_alias(alias, alias_data)
db.write(data)
Expand Down