From 1bd121e2f74d015df2b834b24c53790ac7d6e885 Mon Sep 17 00:00:00 2001
From: Tillman Elser
Date: Tue, 16 Jul 2024 16:48:39 -0700
Subject: [PATCH] add hash partitioning

---
Review notes (text in this section is ignored by `git am`):

* upgrade() used to recreate ix_grouping_records_stacktrace_embedding_hnsw
  while grouping_records_old still owned an index with that name. Index
  names are unique per schema in PostgreSQL, so the old table is now
  dropped first.
* LIKE ... INCLUDING ALL clones the existing HNSW index under a generated
  name. The clone is now dropped before the data copy so the table does not
  end up with two HNSW indexes (helper: _drop_hnsw_indexes).
* The partition count is defined once instead of being repeated in the
  Python loop and in the SQL text.
* NOTE(review): INCLUDING ALL also clones PRIMARY KEY / UNIQUE constraints,
  which PostgreSQL only accepts on a partitioned table when they include
  the partition key (project_id). Confirm against the table definition.
* NOTE(review): rows written to grouping_records while the copy runs are
  not carried over; confirm writers are paused or take a table lock.
* The post-image blob id on the "index" line predates this revision.

 .../versions/d87a6410efe4_migration.py        | 146 ++++++++++++------
 1 file changed, 98 insertions(+), 48 deletions(-)

diff --git a/src/migrations/versions/d87a6410efe4_migration.py b/src/migrations/versions/d87a6410efe4_migration.py
index 51058a808..860444f79 100644
--- a/src/migrations/versions/d87a6410efe4_migration.py
+++ b/src/migrations/versions/d87a6410efe4_migration.py
@@ -16,54 +16,104 @@
 
 
 def upgrade():
-    with op.get_context().autocommit_block():
-        op.execute(
-            "DROP INDEX CONCURRENTLY IF EXISTS ix_grouping_records_stacktrace_embedding_hnsw_new"
-        )
-
-        op.execute(
-            """
-            CREATE INDEX CONCURRENTLY IF NOT EXISTS
-            ix_grouping_records_stacktrace_embedding_hnsw_new
-            ON grouping_records USING hnsw (stacktrace_embedding vector_cosine_ops)
-            WITH (m = 16, ef_construction = 200)
-            """
-        )
-
-        op.execute(
-            "DROP INDEX CONCURRENTLY IF EXISTS ix_grouping_records_stacktrace_embedding_hnsw"
-        )
-
-        op.execute(
-            """
-            ALTER INDEX ix_grouping_records_stacktrace_embedding_hnsw_new
-            RENAME TO ix_grouping_records_stacktrace_embedding_hnsw
-            """
-        )
+    """Rebuild grouping_records as a table hash-partitioned on project_id.
+
+    Rows are copied into a partitioned clone that then takes over the
+    original name, and the HNSW index is rebuilt with ef_construction = 200.
+    """
+    partition_count = 100
+
+    # NOTE(review): INCLUDING ALL also clones PRIMARY KEY / UNIQUE constraints;
+    # PostgreSQL rejects those on a partitioned table unless they include
+    # project_id. Confirm against the grouping_records definition.
+    op.execute(
+        """
+        CREATE TABLE grouping_records_tmp (LIKE grouping_records INCLUDING ALL)
+        PARTITION BY HASH (project_id);
+        """
+    )
+
+    # INCLUDING ALL cloned the HNSW index as well. Drop the clone before any
+    # partition exists so the bulk copy does not maintain it and the table
+    # does not end up with two HNSW indexes.
+    _drop_hnsw_indexes("grouping_records_tmp")
+
+    for remainder in range(partition_count):
+        op.execute(
+            f"""
+            CREATE TABLE grouping_records_tmp_{remainder}
+            PARTITION OF grouping_records_tmp
+            FOR VALUES WITH (modulus {partition_count}, remainder {remainder});
+            """
+        )
+
+    op.execute("INSERT INTO grouping_records_tmp SELECT * FROM grouping_records;")
+
+    op.execute("ALTER TABLE grouping_records RENAME TO grouping_records_old;")
+    op.execute("ALTER TABLE grouping_records_tmp RENAME TO grouping_records;")
+
+    # grouping_records_old still owns ix_grouping_records_stacktrace_embedding_hnsw
+    # and index names are unique per schema, so drop it before recreating the
+    # index under that name.
+    op.execute("DROP TABLE grouping_records_old;")
+
+    op.create_index(
+        "ix_grouping_records_stacktrace_embedding_hnsw",
+        "grouping_records",
+        ["stacktrace_embedding"],
+        unique=False,
+        postgresql_using="hnsw",
+        postgresql_with={"m": 16, "ef_construction": 200},
+        postgresql_ops={"stacktrace_embedding": "vector_cosine_ops"},
+    )
 
 
 def downgrade():
-    with op.get_context().autocommit_block():
-        op.execute(
-            "DROP INDEX CONCURRENTLY IF EXISTS ix_grouping_records_stacktrace_embedding_hnsw_old"
-        )
-
-        op.execute(
-            """
-            CREATE INDEX CONCURRENTLY IF NOT EXISTS
-            ix_grouping_records_stacktrace_embedding_hnsw_old
-            ON grouping_records USING hnsw (stacktrace_embedding vector_cosine_ops)
-            WITH (m = 16, ef_construction = 64)
-            """
-        )
-
-        op.execute(
-            "DROP INDEX CONCURRENTLY IF EXISTS ix_grouping_records_stacktrace_embedding_hnsw"
-        )
-
-        op.execute(
-            """
-            ALTER INDEX ix_grouping_records_stacktrace_embedding_hnsw_old
-            RENAME TO ix_grouping_records_stacktrace_embedding_hnsw
-            """
-        )
+    """Rebuild grouping_records as an unpartitioned table (ef_construction = 64)."""
+    op.execute("CREATE TABLE grouping_records_tmp (LIKE grouping_records INCLUDING ALL);")
+
+    # The cloned HNSW index is replaced below by one with the old parameters.
+    _drop_hnsw_indexes("grouping_records_tmp")
+
+    op.execute("INSERT INTO grouping_records_tmp SELECT * FROM grouping_records;")
+
+    op.execute("DROP TABLE grouping_records;")
+    op.execute("ALTER TABLE grouping_records_tmp RENAME TO grouping_records;")
+
+    op.create_index(
+        "ix_grouping_records_stacktrace_embedding_hnsw",
+        "grouping_records",
+        ["stacktrace_embedding"],
+        unique=False,
+        postgresql_using="hnsw",
+        postgresql_with={"m": 16, "ef_construction": 64},
+        postgresql_ops={"stacktrace_embedding": "vector_cosine_ops"},
+    )
+
+
+def _drop_hnsw_indexes(table_name):
+    """Drop every HNSW index currently defined on *table_name*.
+
+    CREATE TABLE ... (LIKE ... INCLUDING ALL) clones indexes under generated
+    names, so they are looked up in pg_indexes instead of being hard-coded.
+    """
+    op.execute(
+        f"""
+        DO $$
+        DECLARE
+            idx record;
+        BEGIN
+            FOR idx IN
+                SELECT schemaname, indexname
+                FROM pg_indexes
+                WHERE schemaname = current_schema()
+                  AND tablename = '{table_name}'
+                  AND position('USING hnsw' IN indexdef) > 0
+            LOOP
+                EXECUTE 'DROP INDEX '
+                    || quote_ident(idx.schemaname) || '.' || quote_ident(idx.indexname);
+            END LOOP;
+        END
+        $$;
+        """
+    )