From 699e1c772f4932cf73ae87d48d138d4177d4c8fc Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 4 May 2024 18:51:50 +0530 Subject: [PATCH 1/5] implemented ReplContext class --- click_repl/__init__.py | 1 + click_repl/_completer.py | 14 +-- click_repl/_ctx_stack.py | 28 +++++ click_repl/_repl.py | 99 ++++++++++-------- click_repl/core.py | 216 +++++++++++++++++++++++++++++++++++++++ click_repl/globals_.py | 42 ++++++++ 6 files changed, 350 insertions(+), 50 deletions(-) create mode 100644 click_repl/_ctx_stack.py create mode 100644 click_repl/core.py create mode 100644 click_repl/globals_.py diff --git a/click_repl/__init__.py b/click_repl/__init__.py index df3cea8..34f20da 100644 --- a/click_repl/__init__.py +++ b/click_repl/__init__.py @@ -1,4 +1,5 @@ from ._completer import ClickCompleter as ClickCompleter # noqa: F401 +from .core import pass_context as pass_context # noqa: F401 from ._repl import register_repl as register_repl # noqa: F401 from ._repl import repl as repl # noqa: F401 from .exceptions import CommandLineParserError as CommandLineParserError # noqa: F401 diff --git a/click_repl/_completer.py b/click_repl/_completer.py index 67b8bda..1f64fa0 100644 --- a/click_repl/_completer.py +++ b/click_repl/_completer.py @@ -206,16 +206,16 @@ def _get_completion_for_cmd_args( current_args = args[param.nargs * -1 :] # Show only unused opts - already_present = any([ - opt in previous_args for opt in opts - ]) + already_present = any([opt in previous_args for opt in opts]) hide = self.show_only_unused and already_present and not param.multiple # Show only shortest opt - if (self.shortest_only - and not incomplete # just typed a space - # not selecting a value for a longer version of this option - and args[-1] not in opts): + if ( + self.shortest_only + and not incomplete # just typed a space + # not selecting a value for a longer version of this option + and args[-1] not in opts + ): opts = [min(opts, key=len)] for option in opts: diff --git a/click_repl/_ctx_stack.py b/click_repl/_ctx_stack.py new file mode 100644 index 0000000..1046ccd --- /dev/null +++ b/click_repl/_ctx_stack.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .core import ReplContext + + +# To store the ReplContext objects generated throughout the Runtime. +_context_stack: list[ReplContext] = [] + + +def _push_context(ctx: ReplContext) -> None: + """ + Pushes a new REPL context onto the current stack. + + Parameters + ---------- + ctx + The :class:`~click_repl.core.ReplContext` object that should be + added to the REPL context stack. + """ + _context_stack.append(ctx) + + +def _pop_context() -> None: + """Removes the top-level REPL context from the stack.""" + _context_stack.pop() diff --git a/click_repl/_repl.py b/click_repl/_repl.py index 5693f52..743049b 100644 --- a/click_repl/_repl.py +++ b/click_repl/_repl.py @@ -2,18 +2,22 @@ import click import sys -from prompt_toolkit import PromptSession from prompt_toolkit.history import InMemoryHistory from ._completer import ClickCompleter from .exceptions import ClickExit # type: ignore[attr-defined] from .exceptions import CommandLineParserError, ExitReplException, InvalidGroupFormat from .utils import _execute_internal_and_sys_cmds +from .core import ReplContext +from .globals_ import get_current_repl_ctx __all__ = ["bootstrap_prompt", "register_repl", "repl"] +ISATTY = sys.stdin.isatty() + + def bootstrap_prompt( group, prompt_kwargs, @@ -90,58 +94,67 @@ def repl( original_command = available_commands.pop(repl_command_name, None) - if isatty: - prompt_kwargs = bootstrap_prompt(group, prompt_kwargs, group_ctx) - session = PromptSession(**prompt_kwargs) + repl_ctx = ReplContext( + group_ctx, + bootstrap_prompt(group, prompt_kwargs, group_ctx), + get_current_repl_ctx(silent=True), + ) - def get_command(): - return session.prompt() + if ISATTY: + # If stdin is a TTY, prompt the user for input using PromptSession. + def get_command() -> str: + return repl_ctx.session.prompt() # type: ignore else: - get_command = sys.stdin.readline - - while True: - try: - command = get_command() - except KeyboardInterrupt: - continue - except EOFError: - break - - if not command: - if isatty: + # If stdin is not a TTY, read input from stdin directly. + def get_command() -> str: + inp = sys.stdin.readline().strip() + repl_ctx._history.append(inp) + return inp + + with repl_ctx: + while True: + try: + command = get_command() + except KeyboardInterrupt: continue - else: + except EOFError: break - try: - args = _execute_internal_and_sys_cmds( - command, allow_internal_commands, allow_system_commands - ) - if args is None: - continue + if not command: + if isatty: + continue + else: + break - except CommandLineParserError: - continue + try: + args = _execute_internal_and_sys_cmds( + command, allow_internal_commands, allow_system_commands + ) + if args is None: + continue - except ExitReplException: - break + except CommandLineParserError: + continue + + except ExitReplException: + break - try: - # The group command will dispatch based on args. - old_protected_args = group_ctx.protected_args try: - group_ctx.protected_args = args - group.invoke(group_ctx) - finally: - group_ctx.protected_args = old_protected_args - except click.ClickException as e: - e.show() - except (ClickExit, SystemExit): - pass - - except ExitReplException: - break + # The group command will dispatch based on args. + old_protected_args = group_ctx.protected_args + try: + group_ctx.protected_args = args + group.invoke(group_ctx) + finally: + group_ctx.protected_args = old_protected_args + except click.ClickException as e: + e.show() + except (ClickExit, SystemExit): + pass + + except ExitReplException: + break if original_command is not None: available_commands[repl_command_name] = original_command diff --git a/click_repl/core.py b/click_repl/core.py new file mode 100644 index 0000000..aef5e0e --- /dev/null +++ b/click_repl/core.py @@ -0,0 +1,216 @@ +""" +Core functionalities for managing context of the click_repl app. +""" + +from __future__ import annotations + +import sys +from functools import wraps +from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, TypeVar + +from click import Context +from prompt_toolkit import PromptSession +from typing_extensions import Concatenate, Final, ParamSpec, TypeAlias, TypedDict + +from ._ctx_stack import _pop_context, _push_context +from .globals_ import get_current_repl_ctx + +if TYPE_CHECKING: + from prompt_toolkit.formatted_text import AnyFormattedText + + +P = ParamSpec("P") +R = TypeVar("R") +F = TypeVar("F", bound=Callable[..., Any]) + + +__all__ = ["ReplContext", "pass_context"] + +ISATTY = sys.stdin.isatty() + +_PromptSession: TypeAlias = PromptSession[Dict[str, Any]] + + +class ReplContextInfoDict(TypedDict): + group_ctx: Context + prompt_kwargs: dict[str, Any] + session: _PromptSession | None + parent: ReplContext | None + _history: list[str] + + +class ReplContext: + """ + Context object for the REPL sessions. + + This class tracks the depth of nested REPLs, ensuring seamless navigation + between different levels. It facilitates nested REPL scenarios, allowing + multiple levels of interactive REPL sessions. + + Each REPL's properties are stored inside this context class, allowing them to + be accessed and shared with their parent REPL. + + All the settings for each REPL session persist until the session is terminated. + + Parameters + ---------- + group_ctx + The click context object that belong to the CLI/parent Group. + + prompt_kwargs + Extra keyword arguments for + :class:`~prompt_toolkit.shortcuts.PromptSession` class. + + parent + REPL Context object of the parent REPL session, if exists. Otherwise, :obj:`None`. + """ + + __slots__ = ( + "group_ctx", + "prompt_kwargs", + "parent", + "session", + "_history", + ) + + def __init__( + self, + group_ctx: Context, + prompt_kwargs: dict[str, Any] = {}, + parent: ReplContext | None = None, + ) -> None: + """ + Initializes the `ReplContext` class. + """ + session: _PromptSession | None + + if ISATTY: + session = PromptSession(**prompt_kwargs) + + else: + session = None + + self.group_ctx: Final[Context] = group_ctx + """The click context object that belong to the CLI/parent Group.""" + + self.session = session + """Object that's responsible for managing and executing the REPL.""" + + self._history: list[str] = [] + """ + History of past executed commands. + + Used only when :func:`~sys.stdin.isatty` is :obj:`False`. + """ + + self.prompt_kwargs = prompt_kwargs + """ + Extra keyword arguments for + :class:`~prompt_toolkit.shortcuts.PromptSession` class. + """ + + self.parent: Final[ReplContext | None] = parent + """ + REPL Context object of the parent REPL session, if exists. + Otherwise, :obj:`None`. + """ + + def __enter__(self) -> ReplContext: + _push_context(self) + return self + + def __exit__(self, *_: Any) -> None: + _pop_context() + + @property + def prompt(self) -> AnyFormattedText: + """ + The prompt text of the REPL. + + Returns + ------- + prompt_toolkit.formatted_text.AnyFormattedText + The prompt object if :func:`~sys.stdin.isatty` is :obj:`True`, + else :obj:`None`. + """ + if ISATTY and self.session is not None: + return self.session.message + return None + + @prompt.setter + def prompt(self, value: AnyFormattedText) -> None: + if ISATTY and self.session is not None: + self.session.message = value + + def to_info_dict(self) -> ReplContextInfoDict: + """ + Provides a dictionary with minimal info about the current REPL. + + Returns + ------- + ReplContextInfoDict + A dictionary that has the instance variables and their values. + """ + + res: ReplContextInfoDict = { + "group_ctx": self.group_ctx, + "prompt_kwargs": self.prompt_kwargs, + "session": self.session, + "parent": self.parent, + "_history": self._history, + } + + return res + + def session_reset(self) -> None: + """ + Resets values of :class:`~prompt_toolkit.session.PromptSession` to + the provided :attr:`~.prompt_kwargs`, discarding any changes done to the + :class:`~prompt_toolkit.session.PromptSession` object. + """ + + if ISATTY and self.session is not None: + self.session = PromptSession(**self.prompt_kwargs) + + def history(self) -> Generator[str, None, None]: + """ + Generates the history of past executed commands. + + Yields + ------ + str + The executed command string from the history, + in chronological order from most recent to oldest. + """ + + if ISATTY and self.session is not None: + yield from self.session.history.load_history_strings() + + else: + yield from reversed(self._history) + + +def pass_context( + func: Callable[Concatenate[ReplContext | None, P], R], +) -> Callable[P, R]: + """ + Decorator that marks a callback function to receive the current + REPL context object as its first argument. + + Parameters + ---------- + func + The callback function to pass context as its first parameter. + + Returns + ------- + Callable[P,R] + The decorated callback function that receives the current REPL + context object as its first argument. + """ + + @wraps(func) + def decorator(*args: P.args, **kwargs: P.kwargs) -> R: + return func(get_current_repl_ctx(), *args, **kwargs) + + return decorator diff --git a/click_repl/globals_.py b/click_repl/globals_.py new file mode 100644 index 0000000..92508cf --- /dev/null +++ b/click_repl/globals_.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, NoReturn + +from ._ctx_stack import _context_stack + +if TYPE_CHECKING: + from .core import ReplContext + + +def get_current_repl_ctx(silent: bool = False) -> ReplContext | NoReturn | None: + """ + Retrieves the current click-repl context. + + This function provides a way to access the context from anywhere + in the code. This function serves as a more implicit alternative to the + :func:`~click.core.pass_context` decorator. + + Parameters + ---------- + silent + If set to :obj:`True`, the function returns :obj:`None` if no context + is available. The default behavior is to raise a :exc:`~RuntimeError`. + + Returns + ------- + :class:`~click_repl.core.ReplContext` | None + REPL context object if available, or :obj:`None` if ``silent`` is :obj:`True`. + + Raises + ------ + RuntimeError + If there's no context object in the stack and ``silent`` is :obj:`False`. + """ + + try: + return _context_stack[-1] + except IndexError: + if not silent: + raise RuntimeError("There is no active click-repl context.") + + return None From 357f19b173603a4c8ce4c4edc429758118da887f Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 4 May 2024 18:52:13 +0530 Subject: [PATCH 2/5] updated dependencies --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index 7cba7f5..b8b3a2d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,7 @@ packages= install_requires = click>=7.0 prompt_toolkit>=3.0.36 + typing-extensions>=4.7.0 python_requires = >=3.6 zip_safe = no From 7096ed6d54c7a778289f35b67f32714a8e8a954d Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 4 May 2024 18:52:27 +0530 Subject: [PATCH 3/5] Added test cases for ReplContext --- tests/test_repl_ctx.py | 50 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tests/test_repl_ctx.py diff --git a/tests/test_repl_ctx.py b/tests/test_repl_ctx.py new file mode 100644 index 0000000..f4fb238 --- /dev/null +++ b/tests/test_repl_ctx.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import click +import pytest + +import click_repl +from tests import mock_stdin + + +@click.group(invoke_without_command=True) +@click.pass_context +def cli(ctx): + if ctx.invoked_subcommand is None: + click_repl.repl(ctx) + + +@cli.command() +def hello(): + print("Hello!") + + +@cli.command() +@click_repl.pass_context +def history_test(repl_ctx): + print(list(repl_ctx.history())) + + +def test_repl_ctx_history(capsys): + with mock_stdin("hello\nhistory-test\n"): + with pytest.raises(SystemExit): + cli(args=[], prog_name="test_repl_ctx_history") + + assert ( + capsys.readouterr().out.replace("\r\n", "\n") + == "Hello!\n['history-test', 'hello']\n" + ) + + +@cli.command() +@click_repl.pass_context +def prompt_test(repl_ctx): + print(repl_ctx.prompt) + + +def test_repl_ctx_prompt(capsys): + with mock_stdin("prompt-test\n"): + with pytest.raises(SystemExit): + cli(args=[], prog_name="test_repl_ctx_history") + + assert capsys.readouterr().out.replace("\r\n", "\n") == "None\n" From 4f81e6adb7dc576b51dc2af66890750cc579c84b Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 4 May 2024 19:14:52 +0530 Subject: [PATCH 4/5] Moved ISATTY value to 'globals_' for global access --- click_repl/_repl.py | 9 ++------- click_repl/core.py | 3 +-- click_repl/globals_.py | 4 ++++ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/click_repl/_repl.py b/click_repl/_repl.py index 743049b..0445182 100644 --- a/click_repl/_repl.py +++ b/click_repl/_repl.py @@ -9,15 +9,12 @@ from .exceptions import CommandLineParserError, ExitReplException, InvalidGroupFormat from .utils import _execute_internal_and_sys_cmds from .core import ReplContext -from .globals_ import get_current_repl_ctx +from .globals_ import ISATTY, get_current_repl_ctx __all__ = ["bootstrap_prompt", "register_repl", "repl"] -ISATTY = sys.stdin.isatty() - - def bootstrap_prompt( group, prompt_kwargs, @@ -77,8 +74,6 @@ def repl( f"an optional argument '{param.name}' in REPL mode" ) - isatty = sys.stdin.isatty() - # Delete the REPL command from those available, as we don't want to allow # nesting REPLs (note: pass `None` to `pop` as we don't want to error if # REPL command already not present for some reason). @@ -122,7 +117,7 @@ def get_command() -> str: break if not command: - if isatty: + if ISATTY: continue else: break diff --git a/click_repl/core.py b/click_repl/core.py index aef5e0e..50ad81c 100644 --- a/click_repl/core.py +++ b/click_repl/core.py @@ -13,7 +13,7 @@ from typing_extensions import Concatenate, Final, ParamSpec, TypeAlias, TypedDict from ._ctx_stack import _pop_context, _push_context -from .globals_ import get_current_repl_ctx +from .globals_ import ISATTY, get_current_repl_ctx if TYPE_CHECKING: from prompt_toolkit.formatted_text import AnyFormattedText @@ -26,7 +26,6 @@ __all__ = ["ReplContext", "pass_context"] -ISATTY = sys.stdin.isatty() _PromptSession: TypeAlias = PromptSession[Dict[str, Any]] diff --git a/click_repl/globals_.py b/click_repl/globals_.py index 92508cf..6a73652 100644 --- a/click_repl/globals_.py +++ b/click_repl/globals_.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from typing import TYPE_CHECKING, NoReturn from ._ctx_stack import _context_stack @@ -8,6 +9,9 @@ from .core import ReplContext +ISATTY = sys.stdin.isatty() + + def get_current_repl_ctx(silent: bool = False) -> ReplContext | NoReturn | None: """ Retrieves the current click-repl context. From a4f3ab5a1a7894239a49029e2981366853fd2771 Mon Sep 17 00:00:00 2001 From: Ghost Ops <72981180+GhostOps77@users.noreply.github.com> Date: Sat, 4 May 2024 19:16:52 +0530 Subject: [PATCH 5/5] Fixed flake8 errors --- click_repl/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/click_repl/core.py b/click_repl/core.py index 50ad81c..79d30fa 100644 --- a/click_repl/core.py +++ b/click_repl/core.py @@ -4,7 +4,6 @@ from __future__ import annotations -import sys from functools import wraps from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, TypeVar