From dc3efa497c3505ed83f002c3fd6892bd6523d17f Mon Sep 17 00:00:00 2001 From: Anderson Banihirwe <13301940+andersy005@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:16:22 -0800 Subject: [PATCH] Add project type to the `/projects/` and `/projects/{project_id}` endpoints (#131) * Update offsets-db-data requirement to version 2024.11 * Add project type support and enhance file processing logging * Update test data URLs for November 2024 and add project types dataset * [skip-ci] Retrigger CI * Add project types to database update function * Update cache expiration to 2 hours and add project type filter to project queries --- .../46b4797494ca_remove_b_tree_indexes.py | 36 ----------------- ...exes.py => 8738137f943c_reset_database.py} | 40 +++++++++---------- offsets_db_api/main.py | 4 +- offsets_db_api/models.py | 20 +++++++++- offsets_db_api/routers/files.py | 2 + offsets_db_api/routers/projects.py | 37 +++++++++++++---- offsets_db_api/schemas.py | 1 + offsets_db_api/tasks.py | 15 ++++++- tests/conftest.py | 12 ++++-- update_database.py | 1 + 10 files changed, 94 insertions(+), 74 deletions(-) delete mode 100644 migrations/versions/46b4797494ca_remove_b_tree_indexes.py rename migrations/versions/{94e3b0854cad_reset_migrations_and_add_indexes.py => 8738137f943c_reset_database.py} (88%) diff --git a/migrations/versions/46b4797494ca_remove_b_tree_indexes.py b/migrations/versions/46b4797494ca_remove_b_tree_indexes.py deleted file mode 100644 index fe93902..0000000 --- a/migrations/versions/46b4797494ca_remove_b_tree_indexes.py +++ /dev/null @@ -1,36 +0,0 @@ -"""remove b-tree indexes - -Revision ID: 46b4797494ca -Revises: 94e3b0854cad -Create Date: 2024-09-11 13:10:17.186167 - -""" - -from alembic import op - -# revision identifiers, used by Alembic. 
-revision = '46b4797494ca' -down_revision = '94e3b0854cad' -branch_labels = None -depends_on = None - - -def upgrade() -> None: - op.execute('CREATE EXTENSION IF NOT EXISTS pg_trgm') - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index('ix_credit_retirement_account', table_name='credit') - op.drop_index('ix_credit_retirement_beneficiary', table_name='credit') - op.drop_index('ix_credit_retirement_note', table_name='credit') - op.drop_index('ix_credit_retirement_reason', table_name='credit') - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.create_index('ix_credit_retirement_reason', 'credit', ['retirement_reason'], unique=False) - op.create_index('ix_credit_retirement_note', 'credit', ['retirement_note'], unique=False) - op.create_index( - 'ix_credit_retirement_beneficiary', 'credit', ['retirement_beneficiary'], unique=False - ) - op.create_index('ix_credit_retirement_account', 'credit', ['retirement_account'], unique=False) - # ### end Alembic commands ### diff --git a/migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py b/migrations/versions/8738137f943c_reset_database.py similarity index 88% rename from migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py rename to migrations/versions/8738137f943c_reset_database.py index c3ff3e1..9799d57 100644 --- a/migrations/versions/94e3b0854cad_reset_migrations_and_add_indexes.py +++ b/migrations/versions/8738137f943c_reset_database.py @@ -1,8 +1,8 @@ -"""reset migrations and add indexes +"""reset database -Revision ID: 94e3b0854cad +Revision ID: 8738137f943c Revises: -Create Date: 2024-09-10 19:03:00.335882 +Create Date: 2024-11-21 16:40:14.822948 """ @@ -12,7 +12,7 @@ from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision = '94e3b0854cad' +revision = '8738137f943c' down_revision = None branch_labels = None depends_on = None @@ -47,8 +47,8 @@ def upgrade() -> None: sa.Column('recorded_at', sa.DateTime(), nullable=False), sa.Column( 'category', - sa.Enum('projects', 'credits', 'clips', 'unknown', name='filecategory'), - nullable=False, + sa.Enum('projects', 'credits', 'clips', 'projecttypes', 'unknown', name='filecategory'), + nullable=True, ), sa.PrimaryKeyConstraint('id'), ) @@ -124,9 +124,6 @@ def upgrade() -> None: sa.PrimaryKeyConstraint('id'), ) op.create_index(op.f('ix_credit_project_id'), 'credit', ['project_id'], unique=False) - op.create_index( - op.f('ix_credit_retirement_account'), 'credit', ['retirement_account'], unique=False - ) op.create_index( 'ix_credit_retirement_account_gin', 'credit', @@ -134,9 +131,6 @@ def upgrade() -> None: unique=False, postgresql_using='gin', ) - op.create_index( - op.f('ix_credit_retirement_beneficiary'), 'credit', ['retirement_beneficiary'], unique=False - ) op.create_index( 'ix_credit_retirement_beneficiary_gin', 'credit', @@ -144,7 +138,6 @@ def upgrade() -> None: unique=False, postgresql_using='gin', ) - op.create_index(op.f('ix_credit_retirement_note'), 'credit', ['retirement_note'], unique=False) op.create_index( 'ix_credit_retirement_note_gin', 'credit', @@ -152,9 +145,6 @@ def upgrade() -> None: unique=False, postgresql_using='gin', ) - op.create_index( - op.f('ix_credit_retirement_reason'), 'credit', ['retirement_reason'], unique=False - ) op.create_index( 'ix_credit_retirement_reason_gin', 'credit', @@ -172,23 +162,33 @@ def upgrade() -> None: unique=False, postgresql_using='gin', ) + op.create_table( + 'projecttype', + sa.Column('project_id', sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column('project_type', sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column('source', sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.ForeignKeyConstraint( + ['project_id'], + ['project.project_id'], + ), 
+ sa.PrimaryKeyConstraint('project_id'), + ) + op.create_index(op.f('ix_projecttype_project_id'), 'projecttype', ['project_id'], unique=False) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_projecttype_project_id'), table_name='projecttype') + op.drop_table('projecttype') op.drop_index('ix_credit_transaction_type_gin', table_name='credit', postgresql_using='gin') op.drop_index(op.f('ix_credit_transaction_date'), table_name='credit') op.drop_index('ix_credit_retirement_reason_gin', table_name='credit', postgresql_using='gin') - op.drop_index(op.f('ix_credit_retirement_reason'), table_name='credit') op.drop_index('ix_credit_retirement_note_gin', table_name='credit', postgresql_using='gin') - op.drop_index(op.f('ix_credit_retirement_note'), table_name='credit') op.drop_index( 'ix_credit_retirement_beneficiary_gin', table_name='credit', postgresql_using='gin' ) - op.drop_index(op.f('ix_credit_retirement_beneficiary'), table_name='credit') op.drop_index('ix_credit_retirement_account_gin', table_name='credit', postgresql_using='gin') - op.drop_index(op.f('ix_credit_retirement_account'), table_name='credit') op.drop_index(op.f('ix_credit_project_id'), table_name='credit') op.drop_table('credit') op.drop_index(op.f('ix_clipproject_project_id'), table_name='clipproject') diff --git a/offsets_db_api/main.py b/offsets_db_api/main.py index 0b9bcb3..45611b1 100644 --- a/offsets_db_api/main.py +++ b/offsets_db_api/main.py @@ -39,7 +39,7 @@ async def lifespan_event(app: FastAPI): # set up cache logger.info('🔥 Setting up cache...') - expiration = int(60 * 60 * 24) # 24 hours + expiration = int(60 * 60 * 2) # 2 hours cache_status_header = 'X-OffsetsDB-Cache' FastAPICache.init( InMemoryBackend(), @@ -59,7 +59,7 @@ async def lifespan_event(app: FastAPI): yield logger.info('Application shutdown...') - logger.info('Clearing cache...') + logger.info('🧹 Clearing cache...') 
FastAPICache.reset() observer.stop() observer.join() diff --git a/offsets_db_api/models.py b/offsets_db_api/models.py index 9739add..965183a 100644 --- a/offsets_db_api/models.py +++ b/offsets_db_api/models.py @@ -3,7 +3,7 @@ import pydantic from sqlalchemy.dialects import postgresql -from sqlmodel import BigInteger, Column, Field, Index, Relationship, SQLModel, String, text +from sqlmodel import BigInteger, Column, Enum, Field, Index, Relationship, SQLModel, String, text from offsets_db_api.schemas import FileCategory, FileStatus, Pagination @@ -17,7 +17,9 @@ class File(SQLModel, table=True): recorded_at: datetime.datetime = Field( default_factory=datetime.datetime.utcnow, description='Date file was recorded in database' ) - category: FileCategory = Field(description='Category of file', default='unknown') + category: FileCategory = Field( + description='Category of file', sa_column=Column(Enum(FileCategory)) + ) class ProjectBase(SQLModel): @@ -72,6 +74,7 @@ class Project(ProjectBase, table=True): clip_relationships: list['ClipProject'] = Relationship( back_populates='project', sa_relationship_kwargs={'cascade': 'all,delete,delete-orphan'} ) + project_type: 'ProjectType' = Relationship(back_populates='project') class ClipBase(SQLModel): @@ -120,6 +123,7 @@ class ProjectWithClips(ProjectBase): clips: list[Clip] | None = Field( default=None, description='List of clips associated with project' ) + project_type: str | None = Field(description='Type of project', default=None) class CreditBase(SQLModel): @@ -252,3 +256,15 @@ class PaginatedClips(pydantic.BaseModel): class PaginatedFiles(pydantic.BaseModel): pagination: Pagination data: list[File] | list[dict[str, typing.Any]] + + +class ProjectType(SQLModel, table=True): + project_id: str = Field( + description='Project id used by registry system', + foreign_key='project.project_id', + primary_key=True, + index=True, + ) + project_type: str | None = Field(description='Type of project', default=None) + source: str | None 
= Field(description='Source of project type', default=None) + project: Project | None = Relationship(back_populates='project_type') diff --git a/offsets_db_api/routers/files.py b/offsets_db_api/routers/files.py index b6ea613..4f495d0 100644 --- a/offsets_db_api/routers/files.py +++ b/offsets_db_api/routers/files.py @@ -48,6 +48,8 @@ async def submit_file( settings = get_settings() engine = get_engine(database_url=settings.database_url) + logger.info(f'Processing files: {file_objs}') + background_tasks.add_task(process_files, engine=engine, session=session, files=file_objs) return file_objs diff --git a/offsets_db_api/routers/projects.py b/offsets_db_api/routers/projects.py index f259b13..cc49bc9 100644 --- a/offsets_db_api/routers/projects.py +++ b/offsets_db_api/routers/projects.py @@ -9,7 +9,14 @@ from offsets_db_api.cache import CACHE_NAMESPACE from offsets_db_api.database import get_session from offsets_db_api.log import get_logger -from offsets_db_api.models import Clip, ClipProject, PaginatedProjects, Project, ProjectWithClips +from offsets_db_api.models import ( + Clip, + ClipProject, + PaginatedProjects, + Project, + ProjectType, + ProjectWithClips, +) from offsets_db_api.schemas import Pagination, Registries from offsets_db_api.security import check_api_key from offsets_db_api.sql_helpers import apply_filters, apply_sorting, handle_pagination @@ -28,6 +35,7 @@ async def get_projects( country: list[str] | None = Query(None, description='Country name'), protocol: list[str] | None = Query(None, description='Protocol name'), category: list[str] | None = Query(None, description='Category name'), + project_type: list[str] | None = Query(None, description='Project type'), is_compliance: bool | None = Query(None, description='Whether project is an ARB project'), listed_at_from: datetime.datetime | datetime.date | None = Query( default=None, description='Format: YYYY-MM-DD' @@ -68,9 +76,13 @@ async def get_projects( ('issued', issued_max, '<=', Project), 
('retired', retired_min, '>=', Project), ('retired', retired_max, '<=', Project), + ('project_type', project_type, 'ilike', ProjectType), ] - statement = select(Project) + # Modified to include ProjectType in the initial query + statement = select(Project, ProjectType.project_type).outerjoin( + ProjectType, Project.project_id == ProjectType.project_id + ) for attribute, values, operation, model in filters: statement = apply_filters( @@ -105,7 +117,7 @@ async def get_projects( ) # Get the list of project IDs from the results - project_ids = [project.project_id for project in results] + project_ids = [project.project_id for project, _ in results] # Subquery to get clips related to the project IDs clip_statement = ( @@ -120,10 +132,11 @@ async def get_projects( for project_id, clip in clip_results: project_to_clips[project_id].append(clip) - # Transform the dictionary into a list of projects with clips + # Transform the dictionary into a list of projects with clips and project_type projects_with_clips = [] - for project in results: + for project, project_type in results: project_data = project.model_dump() + project_data['project_type'] = project_type project_data['clips'] = [ clip.model_dump() for clip in project_to_clips.get(project.project_id, []) ] @@ -156,14 +169,21 @@ async def get_project( logger.info(f'Getting project: {request.url}') # main query to get the project details - statement = select(Project).where(Project.project_id == project_id) - project = session.exec(statement).one_or_none() + statement = ( + select(Project, ProjectType.project_type) + .outerjoin(ProjectType, Project.project_id == ProjectType.project_id) + .where(Project.project_id == project_id) + ) - if not project: + result = session.exec(statement).one_or_none() + + if not result: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f'project {project_id} not found' ) + project, project_type = result + # Subquery to get the related clips clip_statement = ( select(Clip) @@ 
-174,5 +194,6 @@ async def get_project( # Construct the response data project_data = project.model_dump() + project_data['project_type'] = project_type project_data['clips'] = [clip.model_dump() for clip in clip_projects_subquery] return project_data diff --git a/offsets_db_api/schemas.py b/offsets_db_api/schemas.py index 19edb95..532ef39 100644 --- a/offsets_db_api/schemas.py +++ b/offsets_db_api/schemas.py @@ -25,6 +25,7 @@ class FileCategory(str, enum.Enum): projects = 'projects' credits = 'credits' clips = 'clips' + projecttypes = 'projecttypes' unknown = 'unknown' diff --git a/offsets_db_api/tasks.py b/offsets_db_api/tasks.py index 8ccff0a..06b96e8 100644 --- a/offsets_db_api/tasks.py +++ b/offsets_db_api/tasks.py @@ -2,7 +2,7 @@ import traceback import pandas as pd -from offsets_db_data.models import clip_schema, credit_schema, project_schema +from offsets_db_data.models import clip_schema, credit_schema, project_schema, project_types_schema from offsets_db_data.registry import get_registry_from_project_id from sqlalchemy.exc import IntegrityError from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, Session, String, col, select, text @@ -88,7 +88,7 @@ def process_dataframe(df, table_name, engine, dtype_dict=None): # Instead of dropping table (which results in data type, schema overrides), delete all rows. 
conn.execute(text(f'TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;')) - if table_name in {'credit', 'clipproject'}: + if table_name in {'credit', 'clipproject', 'projecttype'}: session = next(get_session()) try: logger.info(f'Processing data destined for {table_name} table...') @@ -160,6 +160,17 @@ async def process_files(*, engine, session, files: list[File]): process_dataframe(df, 'project', engine, project_dtype_dict) update_file_status(file, session, 'success') + elif file.category == 'projecttypes': + logger.info(f'📚 Loading project type file: {file.url}') + data = pd.read_parquet(file.url, engine='fastparquet') + df = project_types_schema.validate(data) + project_type_dtype_dict = { + 'project_id': String, + 'project_type': String, + 'source': String, + } + process_dataframe(df, 'projecttype', engine, project_type_dtype_dict) + update_file_status(file, session, 'success') else: logger.info(f'❓ Unknown file category: {file.category}. Skipping file {file.url}') diff --git a/tests/conftest.py b/tests/conftest.py index 6b080f8..ecaacef 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,21 +50,25 @@ def wait_for_file_processing(test_app: TestClient, file_ids: list[str], timeout: def setup_post(test_app: TestClient): payload: list[dict[str, str]] = [ { - 'url': 's3://carbonplan-offsets-db/final/2024-09-05/credits-augmented.parquet', + 'url': 's3://carbonplan-offsets-db/final/2024-11-19/credits-augmented.parquet', 'category': 'credits', }, { - 'url': 's3://carbonplan-offsets-db/final/2024-09-05/projects-augmented.parquet', + 'url': 's3://carbonplan-offsets-db/final/2024-11-19/projects-augmented.parquet', 'category': 'projects', }, { - 'url': 's3://carbonplan-offsets-db/final/2024-09-05/curated-clips.parquet', + 'url': 's3://carbonplan-offsets-db/final/2024-11-19/curated-clips.parquet', 'category': 'clips', }, { - 'url': 's3://carbonplan-offsets-db/final/2024-09-03/weekly-summary-clips.parquet', + 'url': 
's3://carbonplan-offsets-db/final/2024-11-19/weekly-summary-clips.parquet', 'category': 'clips', }, + { + 'url': 's3://carbonplan-offsets-db/final/2024-11-19/project-types.parquet', + 'category': 'projecttypes', + }, ] headers = { diff --git a/update_database.py b/update_database.py index c78ced4..66745c0 100644 --- a/update_database.py +++ b/update_database.py @@ -24,6 +24,7 @@ def get_latest(*, bucket: str): ('credits', 'credits-augmented', today, yesterday), ('projects', 'projects-augmented', today, yesterday), ('clips', 'curated-clips', today, yesterday), + ('projecttypes', 'project-types', today, yesterday), ] data = []