diff --git a/backend/src/protein.py b/backend/src/protein.py index 4dec184d..0fc3690f 100644 --- a/backend/src/protein.py +++ b/backend/src/protein.py @@ -2,21 +2,11 @@ import os from base64 import b64decode from io import StringIO - from Bio.PDB import PDBParser from Bio.SeqUtils import molecular_weight, seq1 - from .db import Database -def decode_base64(b64_header_and_data: str): - """Converts a base64 string to bytes""" - # only decode after the header (data:application/octet-stream;base64,) - end_of_header = b64_header_and_data.index(",") - b64_data_only = b64_header_and_data[end_of_header:] - return b64decode(b64_data_only).decode("utf-8") - - class PDB: def __init__(self, file_contents, name=""): self.name = name @@ -30,10 +20,6 @@ def __init__(self, file_contents, name=""): except Exception as e: raise e # raise to the user who calls this PDB class - @property - def pdb_file_name(self): - return f"{os.path.join('src/data/pdbAlphaFold', self.name)}.pdb" - @property def num_amino_acids(self) -> int: return len(self.amino_acids()) @@ -49,54 +35,64 @@ def amino_acids(self, one_letter_code=True): ] -class Protein: - @staticmethod - def parse_pdb(name: str, file_contents: str, encoding="str"): - if encoding == "str": - return PDB(file_contents, name) - elif encoding == "b64": - return PDB(decode_base64(file_contents), name) - else: - raise ValueError(f"Invalid encoding: {encoding}") - - @staticmethod - def name_taken(name: str): - """Checks if a protein name already exists in the database - Returns: True if exists | False if not exists - """ - with Database() as db: - try: - entry_sql = db.execute_return( - """SELECT name FROM proteins - WHERE name = %s""", - [name], - ) - - # if we got a result back - return entry_sql is not None and len(entry_sql) != 0 - - except Exception: - return False - - @staticmethod - def save(pdb: PDB): - log.warn(pdb.pdb_file_name) +def decode_base64(b64_header_and_data: str): + """Converts a base64 string to bytes""" + # only decode after the header (data:application/octet-stream;base64,) + end_of_header = b64_header_and_data.index(",") + b64_data_only = b64_header_and_data[end_of_header:] + return b64decode(b64_data_only).decode("utf-8") + + +def pdb_file_name(name: str): + return f"{os.path.join('src/data/pdbAlphaFold', name)}.pdb" + + +def parse_protein_pdb(name: str, file_contents: str, encoding="str"): + if encoding == "str": + return PDB(file_contents, name) + elif encoding == "b64": + return PDB(decode_base64(file_contents), name) + else: + raise ValueError(f"Invalid encoding: {encoding}") + + +def protein_name_taken(name: str): + """Checks if a protein name already exists in the database + Returns: True if exists | False if not exists + """ + with Database() as db: try: - with open(pdb.pdb_file_name, "w") as f: - f.write(pdb.file_contents) + entry_sql = db.execute_return( + """SELECT name FROM proteins + WHERE name = %s""", + [name], + ) + + # if we got a result back + return entry_sql is not None and len(entry_sql) != 0 + except Exception: - log.warn("could not save") - raise Exception("Could not save pdb file") - - with Database() as db: - try: - db.execute( - """INSERT INTO proteins (name, length, mass) VALUES (%s, %s, %s);""", - [ - pdb.name, - pdb.num_amino_acids, - pdb.mass_daltons, - ], - ) - except Exception as e: - raise e + return False + + +def save_protein(pdb: PDB): + path = pdb_file_name(pdb.name) + try: + with open(path, "w") as f: + f.write(pdb.file_contents) + except Exception: + log.warn("could not save") + raise Exception("Could not save pdb file") + + with Database() as db: + try: + db.execute( + """INSERT INTO proteins (name, length, mass) VALUES (%s, %s, %s);""", + [ + pdb.name, + pdb.num_amino_acids, + pdb.mass_daltons, + ], + ) + except Exception as e: + raise e diff --git a/backend/src/server.py b/backend/src/server.py index d028e3a6..dc22c467 100644 --- a/backend/src/server.py +++ b/backend/src/server.py @@ -1,10 +1,10 @@ -from .setup import init_fastapi_app, disable_cors -from .api_types import ProteinEntry, UploadBody, UploadError -from .db import Database, str_to_bytea, bytea_to_str -from .protein import Protein import logging as log +import os from fastapi.staticfiles import StaticFiles - +from .api_types import ProteinEntry, UploadBody, UploadError +from .db import Database, bytea_to_str, str_to_bytea +from .protein import parse_protein_pdb, pdb_file_name, protein_name_taken +from .setup import disable_cors, init_fastapi_app app = init_fastapi_app() disable_cors(app, origins=["http://0.0.0.0:5173", "http://localhost:5173"]) @@ -77,6 +77,8 @@ def delete_protein_entry(protein_name: str): WHERE name = %s""", [protein_name], ) + # delete the file from the data/ folder + os.remove(pdb_file_name(protein_name)) except Exception as e: log.error(e) @@ -87,19 +89,19 @@ def upload_protein_entry(body: UploadBody): body.name = body.name.replace(" ", "_") # check that the name is not already taken in the DB - if Protein.name_taken(body.name): + if protein_name_taken(body.name): return UploadError.NAME_NOT_UNIQUE # if name is unique, save the pdb file and add the entry to the database try: # TODO: consider somehow sending the file as a stream instead of a b64 string or send as regular string - pdb = Protein.parse_pdb(body.name, body.pdb_file_base64, encoding="b64") + pdb = parse_protein_pdb(body.name, body.pdb_file_base64, encoding="b64") except Exception: return UploadError.PARSE_ERROR try: # write to file to data/ folder - with open(pdb.pdb_file_name, "w") as f: + with open(pdb_file_name(pdb.name), "w") as f: f.write(pdb.file_contents) # save to db