Skip to content

Commit

Permalink
feat: refactor into one module
Browse files Browse the repository at this point in the history
  • Loading branch information
xnought committed Nov 26, 2023
1 parent 824d982 commit 2c2b925
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 71 deletions.
122 changes: 59 additions & 63 deletions backend/src/protein.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,11 @@
import os
from base64 import b64decode
from io import StringIO

from Bio.PDB import PDBParser
from Bio.SeqUtils import molecular_weight, seq1

from .db import Database


def decode_base64(b64_header_and_data: str):
    """Decode the base64 payload of a data-URL style string into text.

    The input is expected to look like
    ``data:application/octet-stream;base64,<payload>``. Everything up to and
    including the first comma is treated as the header and dropped; the rest
    is base64-decoded and then decoded as UTF-8.

    Returns: the decoded payload as a ``str`` (not ``bytes``).

    Raises:
        ValueError: if the input contains no comma, if the payload is not
            valid base64 (``binascii.Error``), or if the decoded bytes are
            not valid UTF-8 (``UnicodeDecodeError``).
    """
    end_of_header = b64_header_and_data.index(",")
    # Start one past the comma. Slicing at the comma itself would pass the
    # separator to b64decode, which only works because non-alphabet
    # characters are discarded when validate=False (the default).
    b64_data_only = b64_header_and_data[end_of_header + 1 :]
    return b64decode(b64_data_only).decode("utf-8")


class PDB:
def __init__(self, file_contents, name=""):
self.name = name
Expand All @@ -30,10 +20,6 @@ def __init__(self, file_contents, name=""):
except Exception as e:
raise e # raise to the user who calls this PDB class

@property
def pdb_file_name(self):
    """Path of this structure's ``.pdb`` file under ``src/data/pdbAlphaFold``."""
    stem = os.path.join("src/data/pdbAlphaFold", self.name)
    return stem + ".pdb"

@property
def num_amino_acids(self) -> int:
    """Number of entries returned by ``amino_acids()`` called with its defaults."""
    residues = self.amino_acids()
    return len(residues)
Expand All @@ -49,54 +35,64 @@ def amino_acids(self, one_letter_code=True):
]


class Protein:
@staticmethod
def parse_pdb(name: str, file_contents: str, encoding="str"):
    """Build a ``PDB`` from file contents given as plain text or base64.

    ``encoding`` selects how ``file_contents`` is interpreted: ``"str"``
    passes it through unchanged, ``"b64"`` runs it through
    ``decode_base64`` first. Any other value raises ``ValueError``.
    """
    # Reject unknown encodings up front so the happy path reads linearly.
    if encoding not in ("str", "b64"):
        raise ValueError(f"Invalid encoding: {encoding}")

    text = decode_base64(file_contents) if encoding == "b64" else file_contents
    return PDB(text, name)

@staticmethod
def name_taken(name: str):
    """Check whether a protein name already exists in the database.

    Returns: True if a row with this name exists | False if not.

    A failed lookup is also reported as False (best effort), but the
    failure is now logged so that a database problem is not mistaken for
    "name is free" without leaving any trace.
    """
    with Database() as db:
        try:
            entry_sql = db.execute_return(
                """SELECT name FROM proteins
                WHERE name = %s""",
                [name],
            )

            # if we got a result back
            return entry_sql is not None and len(entry_sql) != 0

        except Exception:
            # Keep the original best-effort answer, but record why the
            # lookup failed instead of swallowing the error silently.
            log.exception("could not check whether protein name %r is taken", name)
            return False

@staticmethod
def save(pdb: PDB):
log.warn(pdb.pdb_file_name)
def decode_base64(b64_header_and_data: str):
    """Decode the base64 payload of a data-URL style string into text.

    The input is expected to look like
    ``data:application/octet-stream;base64,<payload>``. Everything up to and
    including the first comma is treated as the header and dropped; the rest
    is base64-decoded and then decoded as UTF-8.

    Returns: the decoded payload as a ``str`` (not ``bytes``).

    Raises:
        ValueError: if the input contains no comma, if the payload is not
            valid base64 (``binascii.Error``), or if the decoded bytes are
            not valid UTF-8 (``UnicodeDecodeError``).
    """
    end_of_header = b64_header_and_data.index(",")
    # Start one past the comma. Slicing at the comma itself would pass the
    # separator to b64decode, which only works because non-alphabet
    # characters are discarded when validate=False (the default).
    b64_data_only = b64_header_and_data[end_of_header + 1 :]
    return b64decode(b64_data_only).decode("utf-8")


def pdb_file_name(name: str):
    """Return the path of the ``.pdb`` file for ``name`` under ``src/data/pdbAlphaFold``.

    The path is relative, so it resolves against the process's current
    working directory.
    """
    # NOTE(review): ``name`` is joined as-is. os.path.join discards the base
    # directory when given an absolute component, and ".." segments are kept,
    # so callers passing user-supplied names should validate them first.
    stem = os.path.join("src/data/pdbAlphaFold", name)
    return stem + ".pdb"


def parse_protein_pdb(name: str, file_contents: str, encoding="str"):
    """Build a ``PDB`` from file contents given as plain text or base64.

    ``encoding`` selects how ``file_contents`` is interpreted: ``"str"``
    passes it through unchanged, ``"b64"`` runs it through
    ``decode_base64`` first. Any other value raises ``ValueError``.
    """
    # Reject unknown encodings up front so the happy path reads linearly.
    if encoding not in ("str", "b64"):
        raise ValueError(f"Invalid encoding: {encoding}")

    text = decode_base64(file_contents) if encoding == "b64" else file_contents
    return PDB(text, name)


def protein_name_taken(name: str):
"""Checks if a protein name already exists in the database
Returns: True if exists | False if not exists
"""
with Database() as db:
try:
with open(pdb.pdb_file_name, "w") as f:
f.write(pdb.file_contents)
entry_sql = db.execute_return(
"""SELECT name FROM proteins
WHERE name = %s""",
[name],
)

# if we got a result back
return entry_sql is not None and len(entry_sql) != 0

except Exception:
log.warn("could not save")
raise Exception("Could not save pdb file")

with Database() as db:
try:
db.execute(
"""INSERT INTO proteins (name, length, mass) VALUES (%s, %s, %s);""",
[
pdb.name,
pdb.num_amino_acids,
pdb.mass_daltons,
],
)
except Exception as e:
raise e
return False


def save_protein(pdb: PDB):
    """Write a parsed PDB to disk and insert its row into ``proteins``.

    The file is written first, to ``pdb_file_name(pdb.name)``; then a single
    row (name, length, mass) is inserted through ``Database``.

    Raises:
        Exception: with the message "Could not save pdb file" when the file
            cannot be written; the underlying error is attached as
            ``__cause__``.
        Any exception raised by the database insert propagates unchanged.
    """
    path = pdb_file_name(pdb.name)
    try:
        with open(path, "w") as f:
            f.write(pdb.file_contents)
    except Exception as err:
        # ``warning`` replaces the deprecated ``warn`` alias.
        log.warning("could not save")
        # Chain the original error so the real reason (missing directory,
        # permissions, ...) is still visible in the traceback.
        raise Exception("Could not save pdb file") from err

    # NOTE(review): if the insert below fails, the file written above is
    # left on disk without a matching database row.
    with Database() as db:
        # No try/except here: the original only re-raised the same
        # exception, so letting it propagate is equivalent.
        db.execute(
            """INSERT INTO proteins (name, length, mass) VALUES (%s, %s, %s);""",
            [
                pdb.name,
                pdb.num_amino_acids,
                pdb.mass_daltons,
            ],
        )
18 changes: 10 additions & 8 deletions backend/src/server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .setup import init_fastapi_app, disable_cors
from .api_types import ProteinEntry, UploadBody, UploadError
from .db import Database, str_to_bytea, bytea_to_str
from .protein import Protein
import logging as log
import os
from fastapi.staticfiles import StaticFiles

from .api_types import ProteinEntry, UploadBody, UploadError
from .db import Database, bytea_to_str, str_to_bytea
from .protein import parse_protein_pdb, pdb_file_name, protein_name_taken
from .setup import disable_cors, init_fastapi_app

app = init_fastapi_app()
disable_cors(app, origins=["http://0.0.0.0:5173", "http://localhost:5173"])
Expand Down Expand Up @@ -77,6 +77,8 @@ def delete_protein_entry(protein_name: str):
WHERE name = %s""",
[protein_name],
)
# delete the file from the data/ folder
os.remove(pdb_file_name(protein_name))
except Exception as e:
log.error(e)

Expand All @@ -87,19 +89,19 @@ def upload_protein_entry(body: UploadBody):
body.name = body.name.replace(" ", "_")

# check that the name is not already taken in the DB
if Protein.name_taken(body.name):
if protein_name_taken(body.name):
return UploadError.NAME_NOT_UNIQUE

# if name is unique, save the pdb file and add the entry to the database
try:
# TODO: consider somehow sending the file as a stream instead of a b64 string or send as regular string
pdb = Protein.parse_pdb(body.name, body.pdb_file_base64, encoding="b64")
pdb = parse_protein_pdb(body.name, body.pdb_file_base64, encoding="b64")
except Exception:
return UploadError.PARSE_ERROR

try:
# write to file to data/ folder
with open(pdb.pdb_file_name, "w") as f:
with open(pdb_file_name(pdb.name), "w") as f:
f.write(pdb.file_contents)

# save to db
Expand Down

0 comments on commit 2c2b925

Please sign in to comment.