Skip to content

Commit

Permalink
Add project type to the /projects/ and /projects/{project_id} en…
Browse files Browse the repository at this point in the history
…dpoints (#131)

* Update offsets-db-data requirement to version 2024.11

* Add project type support and enhance file processing logging

* Update test data URLs for November 2024 and add project types dataset

* [skip-ci] Retrigger CI

* Add project types to database update function

* Update cache expiration to 2 hours and add project type filter to project queries
  • Loading branch information
andersy005 authored Nov 22, 2024
1 parent dc707cc commit dc3efa4
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 74 deletions.
36 changes: 0 additions & 36 deletions migrations/versions/46b4797494ca_remove_b_tree_indexes.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""reset migrations and add indexes
"""reset database
Revision ID: 94e3b0854cad
Revision ID: 8738137f943c
Revises:
Create Date: 2024-09-10 19:03:00.335882
Create Date: 2024-11-21 16:40:14.822948
"""

Expand All @@ -12,7 +12,7 @@
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '94e3b0854cad'
revision = '8738137f943c'
down_revision = None
branch_labels = None
depends_on = None
Expand Down Expand Up @@ -47,8 +47,8 @@ def upgrade() -> None:
sa.Column('recorded_at', sa.DateTime(), nullable=False),
sa.Column(
'category',
sa.Enum('projects', 'credits', 'clips', 'unknown', name='filecategory'),
nullable=False,
sa.Enum('projects', 'credits', 'clips', 'projecttypes', 'unknown', name='filecategory'),
nullable=True,
),
sa.PrimaryKeyConstraint('id'),
)
Expand Down Expand Up @@ -124,37 +124,27 @@ def upgrade() -> None:
sa.PrimaryKeyConstraint('id'),
)
op.create_index(op.f('ix_credit_project_id'), 'credit', ['project_id'], unique=False)
op.create_index(
op.f('ix_credit_retirement_account'), 'credit', ['retirement_account'], unique=False
)
op.create_index(
'ix_credit_retirement_account_gin',
'credit',
[sa.text('lower(retirement_account) gin_trgm_ops')],
unique=False,
postgresql_using='gin',
)
op.create_index(
op.f('ix_credit_retirement_beneficiary'), 'credit', ['retirement_beneficiary'], unique=False
)
op.create_index(
'ix_credit_retirement_beneficiary_gin',
'credit',
[sa.text('lower(retirement_beneficiary) gin_trgm_ops')],
unique=False,
postgresql_using='gin',
)
op.create_index(op.f('ix_credit_retirement_note'), 'credit', ['retirement_note'], unique=False)
op.create_index(
'ix_credit_retirement_note_gin',
'credit',
[sa.text('lower(retirement_note) gin_trgm_ops')],
unique=False,
postgresql_using='gin',
)
op.create_index(
op.f('ix_credit_retirement_reason'), 'credit', ['retirement_reason'], unique=False
)
op.create_index(
'ix_credit_retirement_reason_gin',
'credit',
Expand All @@ -172,23 +162,33 @@ def upgrade() -> None:
unique=False,
postgresql_using='gin',
)
op.create_table(
'projecttype',
sa.Column('project_id', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column('project_type', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column('source', sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.ForeignKeyConstraint(
['project_id'],
['project.project_id'],
),
sa.PrimaryKeyConstraint('project_id'),
)
op.create_index(op.f('ix_projecttype_project_id'), 'projecttype', ['project_id'], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_projecttype_project_id'), table_name='projecttype')
op.drop_table('projecttype')
op.drop_index('ix_credit_transaction_type_gin', table_name='credit', postgresql_using='gin')
op.drop_index(op.f('ix_credit_transaction_date'), table_name='credit')
op.drop_index('ix_credit_retirement_reason_gin', table_name='credit', postgresql_using='gin')
op.drop_index(op.f('ix_credit_retirement_reason'), table_name='credit')
op.drop_index('ix_credit_retirement_note_gin', table_name='credit', postgresql_using='gin')
op.drop_index(op.f('ix_credit_retirement_note'), table_name='credit')
op.drop_index(
'ix_credit_retirement_beneficiary_gin', table_name='credit', postgresql_using='gin'
)
op.drop_index(op.f('ix_credit_retirement_beneficiary'), table_name='credit')
op.drop_index('ix_credit_retirement_account_gin', table_name='credit', postgresql_using='gin')
op.drop_index(op.f('ix_credit_retirement_account'), table_name='credit')
op.drop_index(op.f('ix_credit_project_id'), table_name='credit')
op.drop_table('credit')
op.drop_index(op.f('ix_clipproject_project_id'), table_name='clipproject')
Expand Down
4 changes: 2 additions & 2 deletions offsets_db_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def lifespan_event(app: FastAPI):

# set up cache
logger.info('🔥 Setting up cache...')
expiration = int(60 * 60 * 24) # 24 hours
expiration = int(60 * 60 * 2) # 2 hours
cache_status_header = 'X-OffsetsDB-Cache'
FastAPICache.init(
InMemoryBackend(),
Expand All @@ -59,7 +59,7 @@ async def lifespan_event(app: FastAPI):
yield

logger.info('Application shutdown...')
logger.info('Clearing cache...')
logger.info('🧹 Clearing cache...')
FastAPICache.reset()
observer.stop()
observer.join()
Expand Down
20 changes: 18 additions & 2 deletions offsets_db_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pydantic
from sqlalchemy.dialects import postgresql
from sqlmodel import BigInteger, Column, Field, Index, Relationship, SQLModel, String, text
from sqlmodel import BigInteger, Column, Enum, Field, Index, Relationship, SQLModel, String, text

from offsets_db_api.schemas import FileCategory, FileStatus, Pagination

Expand All @@ -17,7 +17,9 @@ class File(SQLModel, table=True):
recorded_at: datetime.datetime = Field(
default_factory=datetime.datetime.utcnow, description='Date file was recorded in database'
)
category: FileCategory = Field(description='Category of file', default='unknown')
category: FileCategory = Field(
description='Category of file', sa_column=Column(Enum(FileCategory))
)


class ProjectBase(SQLModel):
Expand Down Expand Up @@ -72,6 +74,7 @@ class Project(ProjectBase, table=True):
clip_relationships: list['ClipProject'] = Relationship(
back_populates='project', sa_relationship_kwargs={'cascade': 'all,delete,delete-orphan'}
)
project_type: 'ProjectType' = Relationship(back_populates='project')


class ClipBase(SQLModel):
Expand Down Expand Up @@ -120,6 +123,7 @@ class ProjectWithClips(ProjectBase):
clips: list[Clip] | None = Field(
default=None, description='List of clips associated with project'
)
project_type: str | None = Field(description='Type of project', default=None)


class CreditBase(SQLModel):
Expand Down Expand Up @@ -252,3 +256,15 @@ class PaginatedClips(pydantic.BaseModel):
class PaginatedFiles(pydantic.BaseModel):
    """Paginated response envelope for ``File`` listings returned by the files endpoints."""

    # Page metadata (current page, size, totals) — see the Pagination schema.
    pagination: Pagination
    # Result rows: either full File models or plain dicts — presumably the dict
    # form is used when a projection/subset of columns is returned; TODO confirm.
    data: list[File] | list[dict[str, typing.Any]]


class ProjectType(SQLModel, table=True):
    """Database table mapping a project to its project type.

    One row per project: ``project_id`` is both the primary key and a foreign
    key to ``project.project_id``, and the ``project`` relationship is the
    back-reference paired with ``Project.project_type``.
    """

    # Registry-assigned project id; PK, FK to project.project_id, and indexed.
    project_id: str = Field(
        description='Project id used by registry system',
        foreign_key='project.project_id',
        primary_key=True,
        index=True,
    )
    # Free-form type label for the project; nullable.
    project_type: str | None = Field(description='Type of project', default=None)
    # Provenance of the project_type value — allowed values not visible here; TODO confirm.
    source: str | None = Field(description='Source of project type', default=None)
    # ORM back-reference to the owning Project (pairs with back_populates='project_type').
    project: Project | None = Relationship(back_populates='project_type')
2 changes: 2 additions & 0 deletions offsets_db_api/routers/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ async def submit_file(
settings = get_settings()
engine = get_engine(database_url=settings.database_url)

logger.info(f'Processing files: {file_objs}')

background_tasks.add_task(process_files, engine=engine, session=session, files=file_objs)
return file_objs

Expand Down
37 changes: 29 additions & 8 deletions offsets_db_api/routers/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
from offsets_db_api.cache import CACHE_NAMESPACE
from offsets_db_api.database import get_session
from offsets_db_api.log import get_logger
from offsets_db_api.models import Clip, ClipProject, PaginatedProjects, Project, ProjectWithClips
from offsets_db_api.models import (
Clip,
ClipProject,
PaginatedProjects,
Project,
ProjectType,
ProjectWithClips,
)
from offsets_db_api.schemas import Pagination, Registries
from offsets_db_api.security import check_api_key
from offsets_db_api.sql_helpers import apply_filters, apply_sorting, handle_pagination
Expand All @@ -28,6 +35,7 @@ async def get_projects(
country: list[str] | None = Query(None, description='Country name'),
protocol: list[str] | None = Query(None, description='Protocol name'),
category: list[str] | None = Query(None, description='Category name'),
project_type: list[str] | None = Query(None, description='Project type'),
is_compliance: bool | None = Query(None, description='Whether project is an ARB project'),
listed_at_from: datetime.datetime | datetime.date | None = Query(
default=None, description='Format: YYYY-MM-DD'
Expand Down Expand Up @@ -68,9 +76,13 @@ async def get_projects(
('issued', issued_max, '<=', Project),
('retired', retired_min, '>=', Project),
('retired', retired_max, '<=', Project),
('project_type', project_type, 'ilike', ProjectType),
]

statement = select(Project)
# Modified to include ProjectType in the initial query
statement = select(Project, ProjectType.project_type).outerjoin(
ProjectType, Project.project_id == ProjectType.project_id
)

for attribute, values, operation, model in filters:
statement = apply_filters(
Expand Down Expand Up @@ -105,7 +117,7 @@ async def get_projects(
)

# Get the list of project IDs from the results
project_ids = [project.project_id for project in results]
project_ids = [project.project_id for project, _ in results]

# Subquery to get clips related to the project IDs
clip_statement = (
Expand All @@ -120,10 +132,11 @@ async def get_projects(
for project_id, clip in clip_results:
project_to_clips[project_id].append(clip)

# Transform the dictionary into a list of projects with clips
# Transform the dictionary into a list of projects with clips and project_type
projects_with_clips = []
for project in results:
for project, project_type in results:
project_data = project.model_dump()
project_data['project_type'] = project_type
project_data['clips'] = [
clip.model_dump() for clip in project_to_clips.get(project.project_id, [])
]
Expand Down Expand Up @@ -156,14 +169,21 @@ async def get_project(
logger.info(f'Getting project: {request.url}')

# main query to get the project details
statement = select(Project).where(Project.project_id == project_id)
project = session.exec(statement).one_or_none()
statement = (
select(Project, ProjectType.project_type)
.outerjoin(ProjectType, Project.project_id == ProjectType.project_id)
.where(Project.project_id == project_id)
)

if not project:
result = session.exec(statement).one_or_none()

if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=f'project {project_id} not found'
)

project, project_type = result

# Subquery to get the related clips
clip_statement = (
select(Clip)
Expand All @@ -174,5 +194,6 @@ async def get_project(

# Construct the response data
project_data = project.model_dump()
project_data['project_type'] = project_type
project_data['clips'] = [clip.model_dump() for clip in clip_projects_subquery]
return project_data
1 change: 1 addition & 0 deletions offsets_db_api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class FileCategory(str, enum.Enum):
projects = 'projects'
credits = 'credits'
clips = 'clips'
projecttypes = 'projecttypes'
unknown = 'unknown'


Expand Down
15 changes: 13 additions & 2 deletions offsets_db_api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import traceback

import pandas as pd
from offsets_db_data.models import clip_schema, credit_schema, project_schema
from offsets_db_data.models import clip_schema, credit_schema, project_schema, project_types_schema
from offsets_db_data.registry import get_registry_from_project_id
from sqlalchemy.exc import IntegrityError
from sqlmodel import ARRAY, BigInteger, Boolean, Date, DateTime, Session, String, col, select, text
Expand Down Expand Up @@ -88,7 +88,7 @@ def process_dataframe(df, table_name, engine, dtype_dict=None):
# Instead of dropping table (which results in data type, schema overrides), delete all rows.
conn.execute(text(f'TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;'))

if table_name in {'credit', 'clipproject'}:
if table_name in {'credit', 'clipproject', 'projecttype'}:
session = next(get_session())
try:
logger.info(f'Processing data destined for {table_name} table...')
Expand Down Expand Up @@ -160,6 +160,17 @@ async def process_files(*, engine, session, files: list[File]):

process_dataframe(df, 'project', engine, project_dtype_dict)
update_file_status(file, session, 'success')
elif file.category == 'projecttypes':
logger.info(f'📚 Loading project type file: {file.url}')
data = pd.read_parquet(file.url, engine='fastparquet')
df = project_types_schema.validate(data)
project_type_dtype_dict = {
'project_id': String,
'project_type': String,
'source': String,
}
process_dataframe(df, 'projecttype', engine, project_type_dtype_dict)
update_file_status(file, session, 'success')

else:
logger.info(f'❓ Unknown file category: {file.category}. Skipping file {file.url}')
Expand Down
12 changes: 8 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,25 @@ def wait_for_file_processing(test_app: TestClient, file_ids: list[str], timeout:
def setup_post(test_app: TestClient):
payload: list[dict[str, str]] = [
{
'url': 's3://carbonplan-offsets-db/final/2024-09-05/credits-augmented.parquet',
'url': 's3://carbonplan-offsets-db/final/2024-11-19/credits-augmented.parquet',
'category': 'credits',
},
{
'url': 's3://carbonplan-offsets-db/final/2024-09-05/projects-augmented.parquet',
'url': 's3://carbonplan-offsets-db/final/2024-11-19/projects-augmented.parquet',
'category': 'projects',
},
{
'url': 's3://carbonplan-offsets-db/final/2024-09-05/curated-clips.parquet',
'url': 's3://carbonplan-offsets-db/final/2024-11-19/curated-clips.parquet',
'category': 'clips',
},
{
'url': 's3://carbonplan-offsets-db/final/2024-09-03/weekly-summary-clips.parquet',
'url': 's3://carbonplan-offsets-db/final/2024-11-19/weekly-summary-clips.parquet',
'category': 'clips',
},
{
'url': 's3://carbonplan-offsets-db/final/2024-11-19/project-types.parquet',
'category': 'projecttypes',
},
]

headers = {
Expand Down
1 change: 1 addition & 0 deletions update_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def get_latest(*, bucket: str):
('credits', 'credits-augmented', today, yesterday),
('projects', 'projects-augmented', today, yesterday),
('clips', 'curated-clips', today, yesterday),
('projecttypes', 'project-types', today, yesterday),
]

data = []
Expand Down

0 comments on commit dc3efa4

Please sign in to comment.