From 8906f57740aaf3e9ea77af3de0b061a64a2f5d3b Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Fri, 8 Nov 2024 11:16:57 -0800 Subject: [PATCH] Hack to support reuse of memory command, closes #643 --- docs/plugins.rst | 25 +++++++++++++++++++++++++ sqlite_utils/cli.py | 15 ++++++++++++++- sqlite_utils/db.py | 5 +++++ tests/test_cli_memory.py | 22 +++++++++++++++++++++- 4 files changed, 65 insertions(+), 2 deletions(-) diff --git a/docs/plugins.rst b/docs/plugins.rst index 2135a9b9..b9e4362d 100644 --- a/docs/plugins.rst +++ b/docs/plugins.rst @@ -115,6 +115,31 @@ Example implementation: "Say hello world" click.echo("Hello world!") +New commands implemented by plugins can invoke existing commands using the `context.invoke `__ mechanism. + +As a special niche feature, if your plugin needs to import some files and then act against an in-memory database containing those files you can forward to the :ref:`sqlite-utils memory command ` and then access a named in-memory database called ``sqlite_utils_memory`` like this: + +.. code-block:: python + + from contextlib import redirect_stdout + import io + + @cli.command() + @click.pass_context + @click.argument( + "paths", + type=click.Path(file_okay=True, dir_okay=False, allow_dash=True), + required=False, + nargs=-1, + ) + def show_schema_for_files(ctx, paths): + from sqlite_utils.cli import memory + with redirect_stdout(io.StringIO()): + ctx.invoke(memory, paths=paths, sql="select 1") + db = sqlite_utils.Database(memory_name="sqlite_utils_memory") + # Now do something with that database + click.echo(db.schema) + .. _plugins_hooks_prepare_connection: prepare_connection(conn) diff --git a/sqlite_utils/cli.py b/sqlite_utils/cli.py index b7de4191..0e4efbfc 100644 --- a/sqlite_utils/cli.py +++ b/sqlite_utils/cli.py @@ -1815,6 +1815,10 @@ def query( ) +# So named memory db "sqlite_utils_memory" isn't dropped when it goes out of scope +_sqlite_utils_memory_db = [] + + @cli.command() @click.argument( "paths", @@ -1921,7 +1925,14 @@ def memory( \b sqlite-utils memory animals.csv --schema """ - db = sqlite_utils.Database(memory=True) + if getattr(sys, "_sqlite_utils_memory_test", False) or not getattr( + sys, "_called_from_test", False + ): + db = sqlite_utils.Database(memory_name="sqlite_utils_memory") + _sqlite_utils_memory_db.append(db) + else: + db = sqlite_utils.Database(memory=True) + # If --dump or --save or --analyze used but no paths detected, assume SQL query is a path: if (dump or save or schema or analyze) and not paths: paths = [sql] @@ -1954,6 +1965,7 @@ def memory( rows = tracker.wrap(rows) if flatten: rows = (_flatten(row) for row in rows) + db[file_table].insert_all(rows, alter=True) if tracker is not None: db[file_table].transform(types=tracker.types) @@ -1964,6 +1976,7 @@ def memory( for view_name in view_names: if not db[view_name].exists(): db.create_view(view_name, "select * from [{}]".format(file_table)) + if fp: fp.close() diff --git a/sqlite_utils/db.py b/sqlite_utils/db.py index 6b3de87d..a8aa2209 100644 --- a/sqlite_utils/db.py +++ b/sqlite_utils/db.py @@ -325,6 +325,8 @@ def __init__( execute_plugins: bool = True, strict: bool = False, ): + self.memory_name = None + self.memory = False assert (filename_or_conn is not None and (not memory and not memory_name)) or ( filename_or_conn is None and (memory or memory_name) ), "Either specify a filename_or_conn or pass memory=True" @@ -335,8 +337,11 @@ def __init__( uri=True, check_same_thread=False, ) + self.memory = True + self.memory_name = memory_name elif memory or filename_or_conn == ":memory:": self.conn = sqlite3.connect(":memory:") + self.memory = True elif isinstance(filename_or_conn, (str, pathlib.Path)): if recreate and os.path.exists(filename_or_conn): try: diff --git a/tests/test_cli_memory.py b/tests/test_cli_memory.py index f69de224..cd26eab0 100644 --- a/tests/test_cli_memory.py +++ b/tests/test_cli_memory.py @@ -1,6 +1,6 @@ import json - import pytest +import sys from click.testing import CliRunner from sqlite_utils import Database, cli @@ -305,3 +305,23 @@ def test_memory_functions(): ) assert result.exit_code == 0 assert result.output.strip() == '[{"hello()": "Hello"}]' + + +@pytest.mark.parametrize("enabled", (False, True)) +def test_memory_named_database_hack(enabled): + # https://github.com/simonw/sqlite-utils/issues/643 + sys._sqlite_utils_memory_test = enabled + try: + result = CliRunner().invoke( + cli.cli, + ["memory", "-", "--analyze"], + input="id,name\n1,Cleo\n2,Bants", + ) + assert result.exit_code == 0 + db = Database(memory_name="sqlite_utils_memory") + if enabled: + assert db.table_names() == ["stdin"] + else: + assert db.table_names() == [] + finally: + sys._sqlite_utils_memory_test = False