From 894d22182e2b7784cf686c9f977a49a26ef51feb Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Tue, 3 Sep 2024 20:32:20 -0700 Subject: [PATCH 1/9] enable search across `beneficiary`, `note`, `reason`, `account` in `/credits` endpoint --- offsets_db_api/routers/credits.py | 40 ++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py index b7cf5d9..54dba23 100644 --- a/offsets_db_api/routers/credits.py +++ b/offsets_db_api/routers/credits.py @@ -34,7 +34,11 @@ async def get_credits( ), search: str | None = Query( None, - description='Case insensitive search string. Currently searches on `project_id` and `name` fields only.', + description='Search string. Use "r:" prefix for regex search, "e:" for exact match, "entity:" for entity-specific search, or leave blank for case-insensitive partial match.', + ), + search_fields: list[str] = Query( + ['beneficiary', 'account', 'note', 'reason'], + description='Fields to search in', ), sort: list[str] = Query( default=['project_id'], @@ -76,19 +80,37 @@ async def get_credits( operation=operation, ) - # Handle 'search' filter separately due to its unique logic + # Handle advanced search if search: - search_pattern = f'%{search}%' - statement = statement.where( - or_( - col(Project.project_id).ilike(search_pattern), - col(Project.name).ilike(search_pattern), - ) - ) + search_conditions = [] + logger.info(f'Search string: {search}') + logger.info(f'Search fields: {search_fields}') + if search.startswith('r:'): + # Regex search + pattern = search[2:] # Remove 'r:' prefix + logger.info(f'Regex search pattern: {pattern}') + for field in search_fields: + if field in Credit.__table__.columns: + search_conditions.append(col(getattr(Credit, field)).op('~*')(pattern)) + elif field in Project.__table__.columns: + search_conditions.append(col(getattr(Project, field)).op('~*')(pattern)) + else: + # Case-insensitive partial match (default behavior) + search_pattern = f'%{search}%' + for field in search_fields: + if field in Credit.__table__.columns: + search_conditions.append(col(getattr(Credit, field)).ilike(search_pattern)) + elif field in Project.__table__.columns: + search_conditions.append(col(getattr(Project, field)).ilike(search_pattern)) + + if search_conditions: + statement = statement.where(or_(*search_conditions)) if sort: statement = apply_sorting(statement=statement, sort=sort, model=Credit, primary_key='id') + logger.info(f"SQL Query: {statement.compile(compile_kwargs={'literal_binds': True})}") + total_entries, current_page, total_pages, next_page, results = handle_pagination( statement=statement, primary_key=Credit.id, From 0db5cd64a1e43aa0c3e110af17b2033c24401bec Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Tue, 3 Sep 2024 20:47:05 -0700 Subject: [PATCH 2/9] update docstring --- offsets_db_api/routers/credits.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py index 54dba23..3979472 100644 --- a/offsets_db_api/routers/credits.py +++ b/offsets_db_api/routers/credits.py @@ -34,10 +34,10 @@ async def get_credits( ), search: str | None = Query( None, - description='Search string. Use "r:" prefix for regex search, "e:" for exact match, "entity:" for entity-specific search, or leave blank for case-insensitive partial match.', + description='Search string. 
Use "r:" prefix for regex search, or leave blank for case-insensitive partial match.', ), search_fields: list[str] = Query( - ['beneficiary', 'account', 'note', 'reason'], + default=['beneficiary', 'account', 'note', 'reason'], description='Fields to search in', ), sort: list[str] = Query( @@ -109,7 +109,7 @@ async def get_credits( if sort: statement = apply_sorting(statement=statement, sort=sort, model=Credit, primary_key='id') - logger.info(f"SQL Query: {statement.compile(compile_kwargs={'literal_binds': True})}") + logger.info(f"SQL Credits Query: {statement.compile(compile_kwargs={'literal_binds': True})}") total_entries, current_page, total_pages, next_page, results = handle_pagination( statement=statement, From acfcbee1ea7d4fc50f4f90898299ebb8cb364f6b Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Thu, 5 Sep 2024 18:18:42 -0700 Subject: [PATCH 3/9] Refactor search fields in credits endpoint to reflect retirement context --- Procfile | 1 - offsets_db_api/routers/credits.py | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) delete mode 100644 Procfile diff --git a/Procfile b/Procfile deleted file mode 100644 index bca02e7..0000000 --- a/Procfile +++ /dev/null @@ -1 +0,0 @@ -web: gunicorn -w $OFFSETS_DB_WEB_CONCURRENCY -t 120 -k uvicorn.workers.UvicornWorker offsets_db_api.main:app --config gunicorn_config.py --access-logfile '-' --error-logfile '-' diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py index 3979472..94be630 100644 --- a/offsets_db_api/routers/credits.py +++ b/offsets_db_api/routers/credits.py @@ -37,7 +37,12 @@ async def get_credits( description='Search string. Use "r:" prefix for regex search, or leave blank for case-insensitive partial match.', ), search_fields: list[str] = Query( - default=['beneficiary', 'account', 'note', 'reason'], + default=[ + 'retirement_beneficiary', + 'retirement_account', + 'retirement_note', + 'retirement_reason', + ], description='Fields to search in', ), sort: list[str] = Query( From 54367de02e04d4465a75378fb5ad471cd05fee3c Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Thu, 5 Sep 2024 18:19:14 -0700 Subject: [PATCH 4/9] Refactor gunicorn process name in fly.staging.toml --- fly.staging.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fly.staging.toml b/fly.staging.toml index 365d18c..c870936 100644 --- a/fly.staging.toml +++ b/fly.staging.toml @@ -25,10 +25,10 @@ release_command = "bash -l release.sh" [processes] - web = "gunicorn -w $OFFSETS_DB_WEB_CONCURRENCY -t 120 -k uvicorn.workers.UvicornWorker offsets_db_api.main:app --config gunicorn_config.py --access-logfile '-' --error-logfile '-'" + app = "gunicorn -w $OFFSETS_DB_WEB_CONCURRENCY -t 120 -k uvicorn.workers.UvicornWorker offsets_db_api.main:app --config gunicorn_config.py --access-logfile '-' --error-logfile '-'" [[services]] -processes = ["web"] +processes = ["app"] protocol = "tcp" internal_port = 8000 force_https = true From 1ca249aea4d4fa7bb5ae5ae73139b71e4a541001 Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Mon, 9 Sep 2024 23:48:13 -0700 Subject: [PATCH 5/9] Refactor search fields in credits endpoint to enable trigram similarity search --- ...fd_rename_credit_transaction_fields_to_.py | 25 ++++ offsets_db_api/routers/credits.py | 135 ++++++++++++++++-- 2 files changed, 148 insertions(+), 12 deletions(-) diff --git a/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py b/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py index 
b4f5db8..771c7f0 100644 --- a/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py +++ b/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py @@ -19,6 +19,8 @@ def upgrade() -> None: + # Create pg_trgm extension + op.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm') # ### commands auto generated by Alembic - please adjust! ### op.create_table( 'clip', @@ -117,10 +119,30 @@ def upgrade() -> None: op.create_index( op.f('ix_credit_transaction_date'), 'credit', ['transaction_date'], unique=False ) + + # Create GIN indexes for trigram similarity search + op.execute( + 'CREATE INDEX idx_credit_retirement_beneficiary_trgm ON credit USING gin (retirement_beneficiary gin_trgm_ops)' + ) + op.execute( + 'CREATE INDEX idx_credit_retirement_note_trgm ON credit USING gin (retirement_note gin_trgm_ops)' + ) + op.execute( + 'CREATE INDEX idx_credit_retirement_account_trgm ON credit USING gin (retirement_account gin_trgm_ops)' + ) + op.execute( + 'CREATE INDEX idx_credit_retirement_reason_trgm ON credit USING gin (retirement_reason gin_trgm_ops)' + ) # ### end Alembic commands ### def downgrade() -> None: + # Remove GIN indexes + op.execute('DROP INDEX IF EXISTS idx_credit_retirement_beneficiary_trgm') + op.execute('DROP INDEX IF EXISTS idx_credit_retirement_note_trgm') + op.execute('DROP INDEX IF EXISTS idx_credit_retirement_account_trgm') + op.execute('DROP INDEX IF EXISTS idx_credit_retirement_reason_trgm') + # ### commands auto generated by Alembic - please adjust! ### op.drop_index(op.f('ix_credit_transaction_date'), table_name='credit') op.drop_index(op.f('ix_credit_retirement_reason'), table_name='credit') @@ -134,4 +156,7 @@ def downgrade() -> None: op.drop_table('project') op.drop_table('file') op.drop_table('clip') + + # Remove pg_trgm extension + op.execute('DROP EXTENSION IF EXISTS pg_trgm') # ### end Alembic commands ### diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py index 94be630..fe1e5fb 100644 --- a/offsets_db_api/routers/credits.py +++ b/offsets_db_api/routers/credits.py @@ -1,8 +1,11 @@ import datetime +import json +import re from fastapi import APIRouter, Depends, Query, Request from fastapi_cache.decorator import cache -from sqlmodel import Session, col, or_, select +from pydantic import BaseModel +from sqlmodel import Session, col, func, or_, select from offsets_db_api.cache import CACHE_NAMESPACE from offsets_db_api.database import get_session @@ -15,6 +18,74 @@ router = APIRouter() logger = get_logger() +# Helper functions + + +def normalize_text(text: str) -> str: + return re.sub(r'[^\w\s]', '', text.lower()).strip() + + +ACRONYM_EXPANSIONS = { + 'jp': ['j p', 'j.p.'], + 'ms': ['microsoft'], + 'ibm': ['i b m', 'i.b.m.'], + # Add more acronym expansions as needed +} + + +def expand_acronyms(text: str) -> list[str]: + words = text.split() + expansions = [text] + + for i, word in enumerate(words): + if word in ACRONYM_EXPANSIONS: + for expansion in ACRONYM_EXPANSIONS[word]: + new_words = words.copy() + new_words[i] = expansion + expansions.append(' '.join(new_words)) + + return expansions + + +COMPANY_ALIASES = { + 'apple': ['apple inc', 'apple incorporated'], + 'jp morgan': ['jpmorgan', 'jp morgan chase', 'chase bank', 'j p morgan', 'j.p. 
morgan'], + 'microsoft': ['microsoft corporation', 'ms'], + # Add more aliases as needed +} + + +def get_aliases(term: str) -> list[str]: + normalized_term = normalize_text(term) + return next( + ( + [key] + aliases + for key, aliases in COMPANY_ALIASES.items() + if normalized_term in [normalize_text(a) for a in [key] + aliases] + ), + [normalized_term], + ) + + +class SearchField(BaseModel): + field: str + weight: float + + +def parse_search_fields( + search_fields_str: str = '[{"field":"retirement_beneficiary","weight":2.0},{"field":"retirement_account","weight":1.5},{"field":"retirement_note","weight":1.0},{"field":"retirement_reason","weight":1.0}]', +) -> list[SearchField]: + try: + search_fields = json.loads(search_fields_str) + return [SearchField(**field) for field in search_fields] + except json.JSONDecodeError: + raise ValueError('Invalid JSON format for search_fields') + except KeyError: + raise ValueError("Each search field must have 'field' and 'weight' keys") + + +# Main endpoint + @router.get('/', summary='List credits', response_model=PaginatedCredits) @cache(namespace=CACHE_NAMESPACE) @@ -34,16 +105,14 @@ async def get_credits( ), search: str | None = Query( None, - description='Search string. Use "r:" prefix for regex search, or leave blank for case-insensitive partial match.', + description='Search string. Use "r:" prefix for regex search, "t:" for trigram search, "w:" for weighted search, or leave blank for case-insensitive partial match.', + ), + search_fields: str = Query( + default='[{"field":"retirement_beneficiary","weight":2.0},{"field":"retirement_account","weight":1.5},{"field":"retirement_note","weight":1.0},{"field":"retirement_reason","weight":1.0}]', + description='JSON string of fields to search in with their weights', ), - search_fields: list[str] = Query( - default=[ - 'retirement_beneficiary', - 'retirement_account', - 'retirement_note', - 'retirement_reason', - ], - description='Fields to search in', + similarity_threshold: float = Query( + 0.7, ge=0.0, le=1.0, description='similarity threshold (0-1)' ), sort: list[str] = Query( default=['project_id'], @@ -90,19 +159,61 @@ async def get_credits( search_conditions = [] logger.info(f'Search string: {search}') logger.info(f'Search fields: {search_fields}') + + search_fields = parse_search_fields(search_fields) + if search.startswith('r:'): # Regex search pattern = search[2:] # Remove 'r:' prefix logger.info(f'Regex search pattern: {pattern}') - for field in search_fields: + for field_info in search_fields: + field = field_info.field if field in Credit.__table__.columns: search_conditions.append(col(getattr(Credit, field)).op('~*')(pattern)) elif field in Project.__table__.columns: search_conditions.append(col(getattr(Project, field)).op('~*')(pattern)) + elif search.startswith('t:'): + # Trigram similarity search + search_term = search[2:] # Remove 't:' prefix + logger.info(f'Trigram search term: {search_term}') + for field_info in search_fields: + field = field_info.field + if field in Credit.__table__.columns: + search_conditions.append( + func.word_similarity(func.lower(getattr(Credit, field)), search_term) + > similarity_threshold + ) + elif field in Project.__table__.columns: + search_conditions.append( + func.word_similarity(func.lower(getattr(Project, field)), search_term) + > similarity_threshold + ) + elif search.startswith('w:'): + # Weighted search with alias and acronym expansion + search_term = search[2:] # Remove 'w:' prefix + logger.info(f'Weighted search term: {search_term}') + variations = 
expand_acronyms(search_term) + variations.extend(get_aliases(search_term)) + + for variation in variations: + for field_info in search_fields: + field = field_info.field + weight = field_info.weight + if field in Credit.__table__.columns: + search_conditions.append( + func.similarity(func.lower(getattr(Credit, field)), variation) * weight + > similarity_threshold + ) + elif field in Project.__table__.columns: + search_conditions.append( + func.similarity(func.lower(getattr(Project, field)), variation) * weight + > similarity_threshold + ) else: # Case-insensitive partial match (default behavior) search_pattern = f'%{search}%' - for field in search_fields: + for field_info in search_fields: + field = field_info.field if field in Credit.__table__.columns: search_conditions.append(col(getattr(Credit, field)).ilike(search_pattern)) elif field in Project.__table__.columns: From e3f948e7f0f4698d08c6942b1a9449c2927c87f7 Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe Date: Tue, 10 Sep 2024 16:03:31 -0700 Subject: [PATCH 6/9] remove unnecessary function helpers --- offsets_db_api/routers/credits.py | 158 ++++-------------------------- 1 file changed, 19 insertions(+), 139 deletions(-) diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py index fe1e5fb..a8e751d 100644 --- a/offsets_db_api/routers/credits.py +++ b/offsets_db_api/routers/credits.py @@ -1,10 +1,7 @@ import datetime -import json -import re from fastapi import APIRouter, Depends, Query, Request from fastapi_cache.decorator import cache -from pydantic import BaseModel from sqlmodel import Session, col, func, or_, select from offsets_db_api.cache import CACHE_NAMESPACE @@ -18,74 +15,6 @@ router = APIRouter() logger = get_logger() -# Helper functions - - -def normalize_text(text: str) -> str: - return re.sub(r'[^\w\s]', '', text.lower()).strip() - - -ACRONYM_EXPANSIONS = { - 'jp': ['j p', 'j.p.'], - 'ms': ['microsoft'], - 'ibm': ['i b m', 'i.b.m.'], - # Add more acronym expansions as needed -} - - -def expand_acronyms(text: str) -> list[str]: - words = text.split() - expansions = [text] - - for i, word in enumerate(words): - if word in ACRONYM_EXPANSIONS: - for expansion in ACRONYM_EXPANSIONS[word]: - new_words = words.copy() - new_words[i] = expansion - expansions.append(' '.join(new_words)) - - return expansions - - -COMPANY_ALIASES = { - 'apple': ['apple inc', 'apple incorporated'], - 'jp morgan': ['jpmorgan', 'jp morgan chase', 'chase bank', 'j p morgan', 'j.p. 
morgan'], - 'microsoft': ['microsoft corporation', 'ms'], - # Add more aliases as needed -} - - -def get_aliases(term: str) -> list[str]: - normalized_term = normalize_text(term) - return next( - ( - [key] + aliases - for key, aliases in COMPANY_ALIASES.items() - if normalized_term in [normalize_text(a) for a in [key] + aliases] - ), - [normalized_term], - ) - - -class SearchField(BaseModel): - field: str - weight: float - - -def parse_search_fields( - search_fields_str: str = '[{"field":"retirement_beneficiary","weight":2.0},{"field":"retirement_account","weight":1.5},{"field":"retirement_note","weight":1.0},{"field":"retirement_reason","weight":1.0}]', -) -> list[SearchField]: - try: - search_fields = json.loads(search_fields_str) - return [SearchField(**field) for field in search_fields] - except json.JSONDecodeError: - raise ValueError('Invalid JSON format for search_fields') - except KeyError: - raise ValueError("Each search field must have 'field' and 'weight' keys") - - -# Main endpoint - @router.get('/', summary='List credits', response_model=PaginatedCredits) @cache(namespace=CACHE_NAMESPACE) @@ -107,12 +36,14 @@ async def get_credits( None, description='Search string. Use "r:" prefix for regex search, "t:" for trigram search, "w:" for weighted search, or leave blank for case-insensitive partial match.', ), - search_fields: str = Query( - default='[{"field":"retirement_beneficiary","weight":2.0},{"field":"retirement_account","weight":1.5},{"field":"retirement_note","weight":1.0},{"field":"retirement_reason","weight":1.0}]', - description='JSON string of fields to search in with their weights', - ), - similarity_threshold: float = Query( - 0.7, ge=0.0, le=1.0, description='similarity threshold (0-1)' + search_fields: list[str] = Query( + default=[ + 'retirement_beneficiary', + 'retirement_account', + 'retirement_note', + 'retirement_reason', + ], + description='Fields to search in', ), sort: list[str] = Query( default=['project_id'], @@ -154,70 +85,19 @@ async def get_credits( operation=operation, ) - # Handle advanced search if search: + # Default to case-insensitive partial match + search_term = f'%{search}%' search_conditions = [] - logger.info(f'Search string: {search}') - logger.info(f'Search fields: {search_fields}') - - search_fields = parse_search_fields(search_fields) - - if search.startswith('r:'): - # Regex search - pattern = search[2:] # Remove 'r:' prefix - logger.info(f'Regex search pattern: {pattern}') - for field_info in search_fields: - field = field_info.field - if field in Credit.__table__.columns: - search_conditions.append(col(getattr(Credit, field)).op('~*')(pattern)) - elif field in Project.__table__.columns: - search_conditions.append(col(getattr(Project, field)).op('~*')(pattern)) - elif search.startswith('t:'): - # Trigram similarity search - search_term = search[2:] # Remove 't:' prefix - logger.info(f'Trigram search term: {search_term}') - for field_info in search_fields: - field = field_info.field - if field in Credit.__table__.columns: - search_conditions.append( - func.word_similarity(func.lower(getattr(Credit, field)), search_term) - > similarity_threshold - ) - elif field in Project.__table__.columns: - search_conditions.append( - func.word_similarity(func.lower(getattr(Project, field)), search_term) - > similarity_threshold - ) - elif search.startswith('w:'): - # Weighted search with alias and acronym expansion - search_term = search[2:] # Remove 'w:' prefix - logger.info(f'Weighted search term: {search_term}') - variations = 
expand_acronyms(search_term) - variations.extend(get_aliases(search_term)) - - for variation in variations: - for field_info in search_fields: - field = field_info.field - weight = field_info.weight - if field in Credit.__table__.columns: - search_conditions.append( - func.similarity(func.lower(getattr(Credit, field)), variation) * weight - > similarity_threshold - ) - elif field in Project.__table__.columns: - search_conditions.append( - func.similarity(func.lower(getattr(Project, field)), variation) * weight - > similarity_threshold - ) - else: - # Case-insensitive partial match (default behavior) - search_pattern = f'%{search}%' - for field_info in search_fields: - field = field_info.field - if field in Credit.__table__.columns: - search_conditions.append(col(getattr(Credit, field)).ilike(search_pattern)) - elif field in Project.__table__.columns: - search_conditions.append(col(getattr(Project, field)).ilike(search_pattern)) + for field in search_fields: + if field in Credit.__table__.columns: + search_conditions.append( + func.lower(getattr(Credit, field)).like(func.lower(search_term)) + ) + elif field in Project.__table__.columns: + search_conditions.append( + func.lower(getattr(Project, field)).like(func.lower(search_term)) + ) if search_conditions: statement = statement.where(or_(*search_conditions)) From d4a70c1c35fda1a6422f6c7566e180e02e2889b7 Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe <13301940+andersy005@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:23:08 -0700 Subject: [PATCH 7/9] Refactor Procfile and fly.toml files (#123) --- fly.prod.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fly.prod.toml b/fly.prod.toml index afb344b..4dc77c7 100644 --- a/fly.prod.toml +++ b/fly.prod.toml @@ -23,11 +23,11 @@ PORT = "8000" release_command = "bash -l release.sh" [processes] - web = "gunicorn -w $OFFSETS_DB_WEB_CONCURRENCY -t 120 -k uvicorn.workers.UvicornWorker offsets_db_api.main:app --config gunicorn_config.py --access-logfile '-' --error-logfile '-'" + app = "gunicorn -w $OFFSETS_DB_WEB_CONCURRENCY -t 120 -k uvicorn.workers.UvicornWorker offsets_db_api.main:app --config gunicorn_config.py --access-logfile '-' --error-logfile '-'" [[services]] -processes = ["web"] +processes = ["app"] protocol = "tcp" internal_port = 8000 force_https = true From 6b2910dfddd81d310eb91772f37efe9e20615f8a Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe <13301940+andersy005@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:19:49 -0700 Subject: [PATCH 8/9] add `pg_trgm` extension and ensure the indexes are maintained during db update (#124) --- migrations/script.py.mako | 1 + .../46b4797494ca_remove_b_tree_indexes.py | 36 +++++++ ...54cad_reset_migrations_and_add_indexes.py} | 98 ++++++++++++++----- offsets_db_api/models.py | 60 +++++++++--- offsets_db_api/tasks.py | 84 +++++++++++++--- 5 files changed, 228 insertions(+), 51 deletions(-) create mode 100644 migrations/versions/46b4797494ca_remove_b_tree_indexes.py rename migrations/versions/{8b97dd22cafd_rename_credit_transaction_fields_to_.py => 94e3b0854cad_reset_migrations_and_add_indexes.py} (67%) diff --git a/migrations/script.py.mako b/migrations/script.py.mako index 3124b62..d772443 100644 --- a/migrations/script.py.mako +++ b/migrations/script.py.mako @@ -18,6 +18,7 @@ depends_on = ${repr(depends_on)} def upgrade() -> None: + op.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm') ${upgrades if upgrades else "pass"} diff --git a/migrations/versions/46b4797494ca_remove_b_tree_indexes.py 
b/migrations/versions/46b4797494ca_remove_b_tree_indexes.py new file mode 100644 index 0000000..fe93902 --- /dev/null +++ b/migrations/versions/46b4797494ca_remove_b_tree_indexes.py @@ -0,0 +1,36 @@ +"""remove b-tree indexes + +Revision ID: 46b4797494ca +Revises: 94e3b0854cad +Create Date: 2024-09-11 13:10:17.186167 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = '46b4797494ca' +down_revision = '94e3b0854cad' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm') + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('ix_credit_retirement_account', table_name='credit') + op.drop_index('ix_credit_retirement_beneficiary', table_name='credit') + op.drop_index('ix_credit_retirement_note', table_name='credit') + op.drop_index('ix_credit_retirement_reason', table_name='credit') + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_index('ix_credit_retirement_reason', 'credit', ['retirement_reason'], unique=False) + op.create_index('ix_credit_retirement_note', 'credit', ['retirement_note'], unique=False) + op.create_index( + 'ix_credit_retirement_beneficiary', 'credit', ['retirement_beneficiary'], unique=False + ) + op.create_index('ix_credit_retirement_account', 'credit', ['retirement_account'], unique=False) + # ### end Alembic commands ### diff --git a/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py b/migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py similarity index 67% rename from migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py rename to migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py index 771c7f0..c3ff3e1 100644 --- a/migrations/versions/8b97dd22cafd_rename_credit_transaction_fields_to_.py +++ b/migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py @@ -1,8 +1,8 @@ -"""Rename credit transaction fields to reflect retirement context +"""reset migrations and add indexes -Revision ID: 8b97dd22cafd +Revision ID: 94e3b0854cad Revises: -Create Date: 2024-09-05 14:30:52.228849 +Create Date: 2024-09-10 19:03:00.335882 """ @@ -12,14 +12,13 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. -revision = '8b97dd22cafd' +revision = '94e3b0854cad' down_revision = None branch_labels = None depends_on = None def upgrade() -> None: - # Create pg_trgm extension op.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm') # ### commands auto generated by Alembic - please adjust! 
### op.create_table( @@ -35,6 +34,7 @@ def upgrade() -> None: sa.Column('id', sa.Integer(), nullable=False), sa.PrimaryKeyConstraint('id'), ) + op.create_index(op.f('ix_clip_id'), 'clip', ['id'], unique=False) op.create_table( 'file', sa.Column('id', sa.Integer(), nullable=False), @@ -52,6 +52,7 @@ def upgrade() -> None: ), sa.PrimaryKeyConstraint('id'), ) + op.create_index(op.f('ix_file_id'), 'file', ['id'], unique=False) op.create_table( 'project', sa.Column('project_id', sqlmodel.sql.sqltypes.AutoString(), nullable=False), @@ -71,7 +72,21 @@ def upgrade() -> None: sa.Column('project_url', sqlmodel.sql.sqltypes.AutoString(), nullable=True), sa.PrimaryKeyConstraint('project_id'), ) + op.create_index( + 'ix_project_name_gin', + 'project', + [sa.text('lower(name) gin_trgm_ops')], + unique=False, + postgresql_using='gin', + ) op.create_index(op.f('ix_project_project_id'), 'project', ['project_id'], unique=True) + op.create_index( + 'ix_project_project_id_gin', + 'project', + [sa.text('lower(project_id) gin_trgm_ops')], + unique=False, + postgresql_using='gin', + ) op.create_table( 'clipproject', sa.Column('id', sa.Integer(), nullable=False), @@ -87,6 +102,9 @@ def upgrade() -> None: ), sa.PrimaryKeyConstraint('id'), ) + op.create_index(op.f('ix_clipproject_clip_id'), 'clipproject', ['clip_id'], unique=False) + op.create_index(op.f('ix_clipproject_id'), 'clipproject', ['id'], unique=False) + op.create_index(op.f('ix_clipproject_project_id'), 'clipproject', ['project_id'], unique=False) op.create_table( 'credit', sa.Column('quantity', sa.BigInteger(), nullable=True), @@ -109,54 +127,80 @@ def upgrade() -> None: op.create_index( op.f('ix_credit_retirement_account'), 'credit', ['retirement_account'], unique=False ) + op.create_index( + 'ix_credit_retirement_account_gin', + 'credit', + [sa.text('lower(retirement_account) gin_trgm_ops')], + unique=False, + postgresql_using='gin', + ) op.create_index( op.f('ix_credit_retirement_beneficiary'), 'credit', ['retirement_beneficiary'], unique=False ) - op.create_index(op.f('ix_credit_retirement_note'), 'credit', ['retirement_note'], unique=False) op.create_index( - op.f('ix_credit_retirement_reason'), 'credit', ['retirement_reason'], unique=False + 'ix_credit_retirement_beneficiary_gin', + 'credit', + [sa.text('lower(retirement_beneficiary) gin_trgm_ops')], + unique=False, + postgresql_using='gin', ) + op.create_index(op.f('ix_credit_retirement_note'), 'credit', ['retirement_note'], unique=False) op.create_index( - op.f('ix_credit_transaction_date'), 'credit', ['transaction_date'], unique=False + 'ix_credit_retirement_note_gin', + 'credit', + [sa.text('lower(retirement_note) gin_trgm_ops')], + unique=False, + postgresql_using='gin', ) - - # Create GIN indexes for trigram similarity search - op.execute( - 'CREATE INDEX idx_credit_retirement_beneficiary_trgm ON credit USING gin (retirement_beneficiary gin_trgm_ops)' + op.create_index( + op.f('ix_credit_retirement_reason'), 'credit', ['retirement_reason'], unique=False ) - op.execute( - 'CREATE INDEX idx_credit_retirement_note_trgm ON credit USING gin (retirement_note gin_trgm_ops)' + op.create_index( + 'ix_credit_retirement_reason_gin', + 'credit', + [sa.text('lower(retirement_reason) gin_trgm_ops')], + unique=False, + postgresql_using='gin', ) - op.execute( - 'CREATE INDEX idx_credit_retirement_account_trgm ON credit USING gin (retirement_account gin_trgm_ops)' + op.create_index( + op.f('ix_credit_transaction_date'), 'credit', ['transaction_date'], unique=False ) - op.execute( - 'CREATE INDEX 
idx_credit_retirement_reason_trgm ON credit USING gin (retirement_reason gin_trgm_ops)' + op.create_index( + 'ix_credit_transaction_type_gin', + 'credit', + [sa.text('lower(transaction_type) gin_trgm_ops')], + unique=False, + postgresql_using='gin', ) # ### end Alembic commands ### def downgrade() -> None: - # Remove GIN indexes - op.execute('DROP INDEX IF EXISTS idx_credit_retirement_beneficiary_trgm') - op.execute('DROP INDEX IF EXISTS idx_credit_retirement_note_trgm') - op.execute('DROP INDEX IF EXISTS idx_credit_retirement_account_trgm') - op.execute('DROP INDEX IF EXISTS idx_credit_retirement_reason_trgm') - # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('ix_credit_transaction_type_gin', table_name='credit', postgresql_using='gin') op.drop_index(op.f('ix_credit_transaction_date'), table_name='credit') + op.drop_index('ix_credit_retirement_reason_gin', table_name='credit', postgresql_using='gin') op.drop_index(op.f('ix_credit_retirement_reason'), table_name='credit') + op.drop_index('ix_credit_retirement_note_gin', table_name='credit', postgresql_using='gin') op.drop_index(op.f('ix_credit_retirement_note'), table_name='credit') + op.drop_index( + 'ix_credit_retirement_beneficiary_gin', table_name='credit', postgresql_using='gin' + ) op.drop_index(op.f('ix_credit_retirement_beneficiary'), table_name='credit') + op.drop_index('ix_credit_retirement_account_gin', table_name='credit', postgresql_using='gin') op.drop_index(op.f('ix_credit_retirement_account'), table_name='credit') op.drop_index(op.f('ix_credit_project_id'), table_name='credit') op.drop_table('credit') + op.drop_index(op.f('ix_clipproject_project_id'), table_name='clipproject') + op.drop_index(op.f('ix_clipproject_id'), table_name='clipproject') + op.drop_index(op.f('ix_clipproject_clip_id'), table_name='clipproject') op.drop_table('clipproject') + op.drop_index('ix_project_project_id_gin', table_name='project', postgresql_using='gin') op.drop_index(op.f('ix_project_project_id'), table_name='project') + op.drop_index('ix_project_name_gin', table_name='project', postgresql_using='gin') op.drop_table('project') + op.drop_index(op.f('ix_file_id'), table_name='file') op.drop_table('file') + op.drop_index(op.f('ix_clip_id'), table_name='clip') op.drop_table('clip') - - # Remove pg_trgm extension - op.execute('DROP EXTENSION IF EXISTS pg_trgm') # ### end Alembic commands ### diff --git a/offsets_db_api/models.py b/offsets_db_api/models.py index a41d61c..9739add 100644 --- a/offsets_db_api/models.py +++ b/offsets_db_api/models.py @@ -3,13 +3,13 @@ import pydantic from sqlalchemy.dialects import postgresql -from sqlmodel import BigInteger, Column, Field, Relationship, SQLModel, String +from sqlmodel import BigInteger, Column, Field, Index, Relationship, SQLModel, String, text from offsets_db_api.schemas import FileCategory, FileStatus, Pagination class File(SQLModel, table=True): - id: int = Field(default=None, primary_key=True) + id: int = Field(default=None, primary_key=True, index=True) url: str content_hash: str | None = Field(description='Hash of file contents') status: FileStatus = Field(default='pending', description='Status of file processing') @@ -55,6 +55,14 @@ class ProjectBase(SQLModel): class Project(ProjectBase, table=True): + __table_args__ = ( + Index( + 'ix_project_project_id_gin', + text('lower(project_id) gin_trgm_ops'), + postgresql_using='gin', + ), + Index('ix_project_name_gin', text('lower(name) gin_trgm_ops'), postgresql_using='gin'), + ) credits: list['Credit'] = Relationship( 
back_populates='project', sa_relationship_kwargs={ @@ -82,7 +90,7 @@ class ClipBase(SQLModel): class Clip(ClipBase, table=True): - id: int = Field(default=None, primary_key=True) + id: int = Field(default=None, primary_key=True, index=True) project_relationships: list['ClipProject'] = Relationship( back_populates='clip', sa_relationship_kwargs={'cascade': 'all,delete,delete-orphan'} ) @@ -99,9 +107,11 @@ class ClipwithProjects(ClipBase): class ClipProject(SQLModel, table=True): - id: int = Field(default=None, primary_key=True) - clip_id: int = Field(description='Id of clip', foreign_key='clip.id') - project_id: str = Field(description='Id of project', foreign_key='project.project_id') + id: int = Field(default=None, primary_key=True, index=True) + clip_id: int = Field(description='Id of clip', foreign_key='clip.id', index=True) + project_id: str = Field( + description='Id of project', foreign_key='project.project_id', index=True + ) clip: Clip | None = Relationship(back_populates='project_relationships') project: Project | None = Relationship(back_populates='clip_relationships') @@ -117,15 +127,41 @@ class CreditBase(SQLModel): vintage: int | None = Field(description='Vintage year of credits') transaction_date: datetime.date | None = Field(description='Date of transaction', index=True) transaction_type: str | None = Field(description='Type of transaction') - retirement_account: str | None = Field( - description='Account used for the transaction', index=True - ) - retirement_beneficiary: str | None = Field(description='Beneficiary of credits', index=True) - retirement_reason: str | None = Field(description='Reason for transaction', index=True) - retirement_note: str | None = Field(description='Note', index=True) + retirement_account: str | None = Field(description='Account used for the transaction') + retirement_beneficiary: str | None = Field(description='Beneficiary of credits') + retirement_reason: str | None = Field(description='Reason for transaction') + retirement_note: str | None = Field(description='Note') class Credit(CreditBase, table=True): + __table_args__ = ( + Index( + 'ix_credit_transaction_type_gin', + text('lower(transaction_type) gin_trgm_ops'), + postgresql_using='gin', + ), + Index( + 'ix_credit_retirement_account_gin', + text('lower(retirement_account) gin_trgm_ops'), + postgresql_using='gin', + ), + Index( + 'ix_credit_retirement_beneficiary_gin', + text('lower(retirement_beneficiary) gin_trgm_ops'), + postgresql_using='gin', + ), + Index( + 'ix_credit_retirement_reason_gin', + text('lower(retirement_reason) gin_trgm_ops'), + postgresql_using='gin', + ), + Index( + 'ix_credit_retirement_note_gin', + text('lower(retirement_note) gin_trgm_ops'), + postgresql_using='gin', + ), + ) + id: int = Field(default=None, primary_key=True) project_id: str | None = Field( description='Project id used by registry system', diff --git a/offsets_db_api/tasks.py b/offsets_db_api/tasks.py index 435a514..8ccff0a 100644 --- a/offsets_db_api/tasks.py +++ b/offsets_db_api/tasks.py @@ -3,11 +3,14 @@ import pandas as pd from offsets_db_data.models import clip_schema, credit_schema, project_schema -from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, String, text +from offsets_db_data.registry import get_registry_from_project_id +from sqlalchemy.exc import IntegrityError +from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, Session, String, col, select, text from offsets_db_api.cache import watch_dog_file +from offsets_db_api.database import get_session from 
offsets_db_api.log import get_logger -from offsets_db_api.models import File +from offsets_db_api.models import File, Project logger = get_logger() @@ -16,12 +19,66 @@ def update_file_status(file, session, status, error=None): logger.info(f'🔄 Updating file status: {file.url}') file.status = status file.error = error + file.recorded_at = datetime.datetime.now(datetime.timezone.utc) session.add(file) session.commit() session.refresh(file) logger.info(f'✅ File status updated: {file.url}') +def ensure_projects_exist(df: pd.DataFrame, session: Session) -> None: + """ + Ensure all project IDs in the dataframe exist in the database. + If not, create placeholder projects for missing IDs. + """ + logger.info('🔍 Checking for missing project IDs') + + # Get all unique project IDs from the dataframe + credit_project_ids = df['project_id'].unique() + + # Query existing project IDs + existing_project_ids = set( + session.exec( + select(Project.project_id).where(col(Project.project_id).in_(credit_project_ids)) + ).all() + ) + + # Identify missing project IDs + missing_project_ids = set(credit_project_ids) - existing_project_ids + + logger.info(f'🔍 Found {len(existing_project_ids)} existing project IDs') + logger.info(f'🔍 Found {len(missing_project_ids)} missing project IDs: {missing_project_ids}') + + # Create placeholder projects for missing IDs + urls = { + 'verra': 'https://registry.verra.org/app/projectDetail/VCS/', + 'gold-standard': 'https://registry.goldstandard.org/projects?q=gs', + 'american-carbon-registry': 'https://acr2.apx.com/mymodule/reg/prjView.asp?id1=', + 'climate-action-reserve': 'https://thereserve2.apx.com/mymodule/reg/prjView.asp?id1=', + 'art-trees': 'https://art.apx.com/mymodule/reg/prjView.asp?id1=', + } + for project_id in missing_project_ids: + registry = get_registry_from_project_id(project_id) + if url := urls.get(registry): + url = f'{url}{project_id[3:]}' + placeholder_project = Project( + project_id=project_id, + registry=registry, + category=['unknown'], + protocol=['unknown'], + project_url=url, + ) + session.add(placeholder_project) + + try: + session.commit() + logger.info(f'✅ Added {len(missing_project_ids)} missing project IDs to the database') + except IntegrityError as exc: + session.rollback() + logger.error(f'❌ Error creating placeholder projects: {exc}') + raise + + def process_dataframe(df, table_name, engine, dtype_dict=None): logger.info(f'📝 Writing DataFrame to {table_name}') logger.info(f'engine: {engine}') @@ -29,10 +86,18 @@ def process_dataframe(df, table_name, engine, dtype_dict=None): with engine.begin() as conn: if engine.dialect.has_table(conn, table_name): # Instead of dropping table (which results in data type, schema overrides), delete all rows. - conn.execute(text(f'TRUNCATE TABLE {table_name} RESTART IDENTITY')) + conn.execute(text(f'TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;')) - # write the data - df.to_sql(table_name, engine, if_exists='append', index=False, dtype=dtype_dict) + if table_name in {'credit', 'clipproject'}: + session = next(get_session()) + try: + logger.info(f'Processing data destined for {table_name} table...') + ensure_projects_exist(df, session) + except IntegrityError: + logger.error('❌ Failed to ensure projects exist. 
Continuing with data insertion.')
+
+        # write the data
+        df.to_sql(table_name, conn, if_exists='append', index=False, dtype=dtype_dict)

     logger.info(f'✅ Written 🧬 shape {df.shape} to {table_name}')


@@ -93,11 +158,6 @@ async def process_files(*, engine, session, files: list[File]):
                     'project_url': String,
                 }

-                # Execute raw SQL to drop dependent tables and the project table
-                with engine.begin() as conn:
-                    # conn.execute(text("DROP TABLE IF EXISTS clip, credit, project;"))
-                    conn.execute(text('DROP TABLE IF EXISTS project CASCADE;'))
-
                 process_dataframe(df, 'project', engine, project_dtype_dict)
                 update_file_status(file, session, 'success')

@@ -129,7 +189,7 @@ async def process_files(*, engine, session, files: list[File]):
             update_file_status(file, session, 'failure', error=str(e))

     with engine.begin() as conn:
-        conn.execute(text('DROP TABLE IF EXISTS clipproject, clip CASCADE;'))
+        conn.execute(text('TRUNCATE TABLE clipproject, clip RESTART IDENTITY CASCADE;'))

     df = pd.concat(clips_dfs).reset_index(drop=True).reset_index().rename(columns={'index': 'id'})
     df = clip_schema.validate(df)
@@ -155,6 +215,6 @@ async def process_files(*, engine, session, files: list[File]):

     # modify the watch_dog_file
     with open(watch_dog_file, 'w') as f:
-        now = datetime.datetime.utcnow()
+        now = datetime.datetime.now(datetime.timezone.utc)
         logger.info(f'✅ Updated watch_dog_file: {watch_dog_file} to {now}')
         f.write(now.strftime('%Y-%m-%d %H:%M:%S'))

From 37ad6deea98ca7b2eea2abb784c8c5261dad2834 Mon Sep 17 00:00:00 2001
From: Anderson Banihirwe
Date: Wed, 11 Sep 2024 14:04:01 -0700
Subject: [PATCH 9/9] remove `func.lower()`: since the GIN indexes already use `lower()`, we don't need to apply it again in our query

---
 offsets_db_api/routers/credits.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/offsets_db_api/routers/credits.py b/offsets_db_api/routers/credits.py
index a8e751d..599e221 100644
--- a/offsets_db_api/routers/credits.py
+++ b/offsets_db_api/routers/credits.py
@@ -2,7 +2,7 @@

 from fastapi import APIRouter, Depends, Query, Request
 from fastapi_cache.decorator import cache
-from sqlmodel import Session, col, func, or_, select
+from sqlmodel import Session, col, or_, select

 from offsets_db_api.cache import CACHE_NAMESPACE
 from offsets_db_api.database import get_session
@@ -34,7 +34,7 @@ async def get_credits(
     ),
     search: str | None = Query(
         None,
-        description='Search string. Use "r:" prefix for regex search, "t:" for trigram search, "w:" for weighted search, or leave blank for case-insensitive partial match.',
+        description='Case insensitive search string. Currently searches in fields specified in the `search_fields` parameter.',
     ),
     search_fields: list[str] = Query(
         default=[
             'retirement_beneficiary',
             'retirement_account',
             'retirement_note',
             'retirement_reason',
         ],
         description='Fields to search in',
     ),
     sort: list[str] = Query(
         default=['project_id'],
@@ -91,13 +91,9 @@ async def get_credits(
         search_conditions = []
         for field in search_fields:
             if field in Credit.__table__.columns:
-                search_conditions.append(
-                    func.lower(getattr(Credit, field)).like(func.lower(search_term))
-                )
+                search_conditions.append(getattr(Credit, field).ilike(search_term))
             elif field in Project.__table__.columns:
-                search_conditions.append(
-                    func.lower(getattr(Project, field)).like(func.lower(search_term))
-                )
+                search_conditions.append(getattr(Project, field).ilike(search_term))

         if search_conditions:
             statement = statement.where(or_(*search_conditions))
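For reference, a minimal client-side sketch of the search behavior this series converges on: a case-insensitive partial match (`ILIKE '%term%'`) across the `retirement_*` fields listed in `search_fields`. The base URL, the `X-API-KEY` header name, and the response field names below are assumptions for illustration, not taken from the patches:

    import httpx

    BASE_URL = 'https://offsets-db.example.com'  # hypothetical deployment URL
    HEADERS = {'X-API-KEY': 'your-api-key'}  # auth header name assumed; check the API's security setup

    # Case-insensitive partial match across the default retirement_* fields
    resp = httpx.get(f'{BASE_URL}/credits/', params={'search': 'microsoft'}, headers=HEADERS)
    resp.raise_for_status()
    payload = resp.json()  # assumed PaginatedCredits shape: {'pagination': {...}, 'data': [...]}
    print(payload['pagination'])

    # Narrow the search to a single field by repeating the `search_fields` query parameter
    resp = httpx.get(
        f'{BASE_URL}/credits/',
        params={'search': 'reforestation', 'search_fields': ['retirement_reason']},
        headers=HEADERS,
    )
    print(len(resp.json()['data']))

A note on the indexing choice: leading-wildcard patterns such as `%term%` cannot use b-tree indexes, which is why patch 8 drops the b-tree indexes on the retirement fields and replaces them with `gin_trgm_ops` (trigram) indexes; pg_trgm's GIN operator class is designed to service substring and `ILIKE` lookups that would otherwise force sequential scans.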