diff --git a/docs/python-api.rst b/docs/python-api.rst index af6cb520..74574d1c 100644 --- a/docs/python-api.rst +++ b/docs/python-api.rst @@ -1402,6 +1402,8 @@ To keep the original table around instead of dropping it, pass the ``keep_table= table.transform(types={"age": int}, keep_table="original_table") +This method raises a ``sqlite_utils.db.TransformError`` exception if the table cannot be transformed, usually because there are existing constraints or indexes that are incompatible with modifications to the columns. + .. _python_api_transform_alter_column_types: Altering column types diff --git a/sqlite_utils/db.py b/sqlite_utils/db.py index a8aa2209..557bef9e 100644 --- a/sqlite_utils/db.py +++ b/sqlite_utils/db.py @@ -156,6 +156,10 @@ Trigger = namedtuple("Trigger", ("name", "table", "sql")) +class TransformError(Exception): + pass + + ForeignKeyIndicator = Union[ str, ForeignKey, @@ -1972,6 +1976,30 @@ def transform_sql( sqls.append( "ALTER TABLE [{}] RENAME TO [{}];".format(new_table_name, self.name) ) + # Re-add existing indexes + for index in self.indexes: + if index.origin != "pk": + index_sql = self.db.execute( + """SELECT sql FROM sqlite_master WHERE type = 'index' AND name = :index_name;""", + {"index_name": index.name}, + ).fetchall()[0][0] + if index_sql is None: + raise TransformError( + f"Index '{index.name}' on table '{self.name}' does not have a " + "CREATE INDEX statement. You must manually drop this index prior to running this " + "transformation and manually recreate the new index after running this transformation." + ) + if keep_table: + sqls.append(f"DROP INDEX IF EXISTS [{index.name}];") + for col in index.columns: + if col in rename.keys() or col in drop: + raise TransformError( + f"Index '{index.name}' column '{col}' is not in updated table '{self.name}'. " + f"You must manually drop this index prior to running this transformation " + f"and manually recreate the new index after running this transformation. " + f"The original index sql statement is: `{index_sql}`. No changes have been applied to this table." + ) + sqls.append(index_sql) return sqls def extract( diff --git a/tests/test_transform.py b/tests/test_transform.py index 111236dd..7ced9d96 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -1,4 +1,4 @@ -from sqlite_utils.db import ForeignKey +from sqlite_utils.db import ForeignKey, TransformError from sqlite_utils.utils import OperationalError import pytest @@ -539,3 +539,125 @@ def test_transform_strict(fresh_db, strict): assert dogs.strict == strict or not fresh_db.supports_strict dogs.transform(not_null={"name"}) assert dogs.strict == strict or not fresh_db.supports_strict + + +@pytest.mark.parametrize( + "indexes, transform_params", + [ + ([["name"]], {"types": {"age": str}}), + ([["name"], ["age", "breed"]], {"types": {"age": str}}), + ([], {"types": {"age": str}}), + ([["name"]], {"types": {"age": str}, "keep_table": "old_dogs"}), + ], +) +def test_transform_indexes(fresh_db, indexes, transform_params): + # https://github.com/simonw/sqlite-utils/issues/633 + # New table should have same indexes as old table after transformation + dogs = fresh_db["dogs"] + dogs.insert({"id": 1, "name": "Cleo", "age": 5, "breed": "Labrador"}, pk="id") + + for index in indexes: + dogs.create_index(index) + + indexes_before_transform = dogs.indexes + + dogs.transform(**transform_params) + + assert sorted( + [ + {k: v for k, v in idx._asdict().items() if k != "seq"} + for idx in dogs.indexes + ], + key=lambda x: x["name"], + ) == sorted( + [ + {k: v for k, v in idx._asdict().items() if k != "seq"} + for idx in indexes_before_transform + ], + key=lambda x: x["name"], + ), f"Indexes before transform: {indexes_before_transform}\nIndexes after transform: {dogs.indexes}" + if "keep_table" in transform_params: + assert all( + index.origin == "pk" + for index in fresh_db[transform_params["keep_table"]].indexes + ) + + +def test_transform_retains_indexes_with_foreign_keys(fresh_db): + dogs = fresh_db["dogs"] + owners = fresh_db["owners"] + + dogs.insert({"id": 1, "name": "Cleo", "owner_id": 1}, pk="id") + owners.insert({"id": 1, "name": "Alice"}, pk="id") + + dogs.create_index(["name"]) + + indexes_before_transform = dogs.indexes + + fresh_db.add_foreign_keys([("dogs", "owner_id", "owners", "id")]) # calls transform + + assert sorted( + [ + {k: v for k, v in idx._asdict().items() if k != "seq"} + for idx in dogs.indexes + ], + key=lambda x: x["name"], + ) == sorted( + [ + {k: v for k, v in idx._asdict().items() if k != "seq"} + for idx in indexes_before_transform + ], + key=lambda x: x["name"], + ), f"Indexes before transform: {indexes_before_transform}\nIndexes after transform: {dogs.indexes}" + + +@pytest.mark.parametrize( + "transform_params", + [ + {"rename": {"age": "dog_age"}}, + {"drop": ["age"]}, + ], +) +def test_transform_with_indexes_errors(fresh_db, transform_params): + # Should error with a compound (name, age) index if age is renamed or dropped + dogs = fresh_db["dogs"] + dogs.insert({"id": 1, "name": "Cleo", "age": 5}, pk="id") + + dogs.create_index(["name", "age"]) + + with pytest.raises(TransformError) as excinfo: + dogs.transform(**transform_params) + + assert ( + "Index 'idx_dogs_name_age' column 'age' is not in updated table 'dogs'. " + "You must manually drop this index prior to running this transformation" + in str(excinfo.value) + ) + + +def test_transform_with_unique_constraint_implicit_index(fresh_db): + dogs = fresh_db["dogs"] + # Create a table with a UNIQUE constraint on 'name', which creates an implicit index + fresh_db.execute( + """ + CREATE TABLE dogs ( + id INTEGER PRIMARY KEY, + name TEXT UNIQUE, + age INTEGER + ); + """ + ) + dogs.insert({"id": 1, "name": "Cleo", "age": 5}) + + # Attempt to transform the table without modifying 'name' + with pytest.raises(TransformError) as excinfo: + dogs.transform(types={"age": str}) + + assert ( + "Index 'sqlite_autoindex_dogs_1' on table 'dogs' does not have a CREATE INDEX statement." + in str(excinfo.value) + ) + assert ( + "You must manually drop this index prior to running this transformation and manually recreate the new index after running this transformation." + in str(excinfo.value) + )