Commit

updates
AlexPatrie committed Aug 15, 2024
1 parent 419e4de commit 5724e4f
Showing 6 changed files with 86 additions and 64 deletions.
46 changes: 23 additions & 23 deletions compose_api/main.py
@@ -26,7 +26,7 @@

 # -- constraints -- #

-APP_TITLE = "bio-composer"
+APP_TITLE = "bio-compose"
 APP_VERSION = "1.0.0"

 # TODO: update this
@@ -105,7 +105,7 @@ def stop_mongo_client() -> DbClientResponse:

 # -- endpoint logic -- #

-@app.get("/")
+@app.get("/", summary="Ping the API root.")
 def root():
     return {'bio-composer-message': 'Hello from the BioComposer!'}

@@ -117,31 +117,30 @@ def root():
     # response_model=PendingSmoldynJob,
     name="Run a smoldyn simulation",
     operation_id="run-smoldyn",
-    tags=["Execute Simulations"],
-    summary="Run a smoldyn simulation")
+    tags=["Simulation Execution"],
+    summary="Run a smoldyn simulation.")
 async def run_smoldyn(
         uploaded_file: UploadFile = File(..., description="Smoldyn Configuration File"),
         duration: int = Query(default=None, description="Simulation Duration"),
         dt: float = Query(default=None, description="Interval of step with which simulation runs"),
         # initial_molecule_state: List = Body(default=None, description="Mapping of species names to initial molecule conditions including counts and location.")
 ):
     try:
-        job_id = "execute-simulations-smoldyn" + str(uuid.uuid4())
+        job_id = "simulation-execution-smoldyn" + str(uuid.uuid4())
         _time = db_connector.timestamp()
         uploaded_file_location = await write_uploaded_file(job_id=job_id, uploaded_file=uploaded_file, bucket_name=BUCKET_NAME, extension='.txt')

-        job_doc = {
-            'job_id': job_id,
-            'timestamp': _time,
-            'status': JobStatus.PENDING,
-            'path': uploaded_file_location,
-            'duration': duration,
-            'dt': dt,
-            # 'initial_molecule_state': initial_molecule_state
-        }
-        pending_job = await db_connector.write(collection_name=DatabaseCollections.PENDING_JOBS, **job_doc)
+        pending_job = await db_connector.insert_job_async(
+            collection_name=DatabaseCollections.PENDING_JOBS.value,
+            job_id=job_id,
+            timestamp=_time,
+            status=JobStatus.PENDING.value,
+            path=uploaded_file_location,
+            duration=duration,
+            dt=dt
+        )

-        return job_doc
+        return pending_job
     except Exception as e:
         raise HTTPException(status_code=500, detail=str(e))

@@ -151,8 +150,8 @@ async def run_smoldyn(
     # response_model=PendingUtcJob,
     name="Run an ODE Uniform Time Course simulation",
     operation_id="run-utc",
-    tags=["Execute Simulations"],
-    summary="Run a Uniform Time Course simulation")
+    tags=["Simulation Execution"],
+    summary="Run a Uniform Time Course simulation.")
 async def run_utc(
         uploaded_file: UploadFile = File(..., description="SBML File"),
         start: int = Query(..., description="Starting time for utc"),
@@ -161,7 +160,7 @@ async def run_utc(
         simulator: str = Query(..., description="Simulator to use (one of: amici, copasi, tellurium, vcell)"),
 ):
     try:
-        job_id = "execute-simulations-utc" + str(uuid.uuid4())
+        job_id = "simulation-execution-utc" + str(uuid.uuid4())
         _time = db_connector.timestamp()
         uploaded_file_location = await write_uploaded_file(job_id=job_id, uploaded_file=uploaded_file, bucket_name=BUCKET_NAME, extension='.xml')

@@ -357,25 +356,26 @@ async def fetch_results(job_id: str):
     for collection in DatabaseCollections:
         coll_name = collection.value
         job = await db_connector.read(collection_name=coll_name, job_id=job_id)
+        # kob = {'results': {'results_file': 'uploads/simulation-execution-smoldynee94fcbe-82f2-43ca-9b3b-2caf24a647dc/modelout.txt'}, '_id': '', 'status': 'COMPLETED'}

         # job exists
         if not isinstance(job, type(None)):
+            job.pop('_id')

             # case: job is completed
             if job['status'] == "COMPLETED":
-                job.pop('_id')

                 # check for a downloadable file in results
-                # job_data = job['results']  # ['results']
-                job_data = job
-                print(job.keys())
+                job_data = job['results']

                 # case: output is a file
                 if "results_file" in job_data.keys():
                     remote_fp = job_data['results_file']
                     temp_dest = mkdtemp()
                     local_fp = download_file_from_bucket(source_blob_path=remote_fp, out_dir=temp_dest, bucket_name=BUCKET_NAME)

-                    return FileResponse(path=local_fp)
+                    return FileResponse(path=local_fp, media_type="application/octet-stream", filename=local_fp.split("/")[-1])
                 # case output is data
                 else:
                     return OutputData(content=job)
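For orientation, here is a minimal client-side sketch of submitting a job to the renamed endpoint. The base URL and the /run-smoldyn route are assumptions inferred from operation_id="run-smoldyn"; the actual route string lies outside this hunk.

    import requests

    BASE_URL = "http://localhost:8000"  # assumed local deployment; adjust as needed

    # Submit a Smoldyn configuration file; duration and dt are optional query params
    with open("model.txt", "rb") as fh:
        resp = requests.post(
            f"{BASE_URL}/run-smoldyn",
            files={"uploaded_file": ("model.txt", fh, "text/plain")},
            params={"duration": 10, "dt": 0.1},
        )

    job = resp.json()
    print(job["job_id"])  # e.g. "simulation-execution-smoldyn<uuid>"

The response body is the pending-job document returned by insert_job_async, so a client can poll fetch_results with this job_id.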
19 changes: 12 additions & 7 deletions compose_api/shared.py
@@ -225,7 +225,7 @@ async def read(self, collection_name: DatabaseCollections | str, **kwargs):
         """
         coll_name = self._parse_enum_input(collection_name)
         coll = self.get_collection(coll_name)
-        result = coll.find_one(kwargs)
+        result = coll.find_one(kwargs.copy())
         return result

     async def write(self, collection_name: DatabaseCollections | str, **kwargs):
@@ -234,14 +234,10 @@ async def write(self, collection_name: DatabaseCollections | str, **kwargs):
             collection_name: str: collection name in mongodb
             **kwargs: mongo db `insert_one` query defining the document where the key is as in the key of the document.
         """
-        coll_name = self._parse_enum_input(collection_name)
-
-        for k in kwargs.keys():
-            v = kwargs[k]
-            kwargs[k] = self._parse_enum_input(v)
+        coll_name = collection_name

         coll = self.get_collection(coll_name)
-        result = coll.insert_one(kwargs)
+        result = coll.insert_one(kwargs.copy())
         return kwargs

     def get_collection(self, collection_name: str) -> Collection:
@@ -250,6 +246,15 @@ def get_collection(self, collection_name: str) -> Collection:
         except:
             return None

+    async def insert_job_async(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+        return self.insert_job(collection_name, **kwargs)
+
+    def insert_job(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+        coll = self.get_collection(collection_name)
+        job_doc = kwargs.copy()
+        coll.insert_one(job_doc)
+        return kwargs
+
     async def update_job_status(self, collection_name: str, job_id: str, status: str | JobStatus):
         job_status = self._parse_enum_input(status)
         return self.db[collection_name].update_one({'job_id': job_id, }, {'$set': {'status': job_status}})
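The recurring kwargs.copy() guards against a PyMongo behavior that is easy to trip over: insert_one mutates the document it is handed, injecting an _id ObjectId that FastAPI's default JSON encoder cannot serialize. A minimal sketch of the behavior, assuming a local MongoDB instance (the URI, database, and collection names are hypothetical):

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # hypothetical URI
    coll = client["bio_compose_demo"]["jobs"]          # hypothetical db/collection

    doc = {"job_id": "demo-job", "status": "PENDING"}

    coll.insert_one(doc.copy())  # the throwaway copy absorbs the injected '_id'
    assert "_id" not in doc      # caller's dict stays clean and serializable

    coll.insert_one(doc)         # without the copy...
    assert "_id" in doc          # ...PyMongo has mutated 'doc' in place

The matching copy on find_one appears to be purely defensive; query filters are not mutated by reads.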
4 changes: 2 additions & 2 deletions compose_worker/main.py
@@ -13,7 +13,7 @@

 # sleep params
 DELAY_TIMER = 20
-MAX_RETRIES = 20
+MAX_RETRIES = 30

 # creds params
 MONGO_URI = os.getenv("MONGO_URI")
@@ -40,7 +40,7 @@ async def main(max_retries=MAX_RETRIES):
         await asyncio.sleep(10)  # TODO: adjust this for client polling as needed

         await supervisor.check_jobs()
-        await asyncio.sleep(3)
+        await asyncio.sleep(5)
         n_retries += 1


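Back-of-envelope timing: with MAX_RETRIES raised to 30 and the sleeps shown (10 s before each check, 5 s after), one loop iteration takes roughly 15 s plus whatever check_jobs() itself spends, so a worker process now runs on the order of 30 × 15 s ≈ 7.5 minutes before exiting, assuming no other delays in the loop.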
13 changes: 11 additions & 2 deletions compose_worker/shared.py
@@ -180,7 +180,7 @@ async def read(self, collection_name: DatabaseCollections | str, **kwargs):
         """
         coll_name = self._parse_enum_input(collection_name)
         coll = self.get_collection(coll_name)
-        result = coll.find_one(kwargs)
+        result = coll.find_one(kwargs.copy())
         return result

     async def write(self, collection_name: DatabaseCollections | str, **kwargs):
@@ -196,7 +196,7 @@ async def write(self, collection_name: DatabaseCollections | str, **kwargs):
             kwargs[k] = self._parse_enum_input(v)

         coll = self.get_collection(coll_name)
-        result = coll.insert_one(kwargs)
+        result = coll.insert_one(kwargs.copy())
         return kwargs

     def get_collection(self, collection_name: str) -> Collection:
@@ -205,6 +205,15 @@ def get_collection(self, collection_name: str) -> Collection:
         except:
             return None

+    async def insert_job_async(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+        return self.insert_job(collection_name, **kwargs)
+
+    def insert_job(self, collection_name: str, **kwargs) -> Dict[str, Any]:
+        coll = self.get_collection(collection_name)
+        job_doc = kwargs
+        coll.insert_one(job_doc)
+        return job_doc
+
     async def update_job_status(self, collection_name: str, job_id: str, status: str | JobStatus):
         job_status = self._parse_enum_input(status)
         return self.db[collection_name].update_one({'job_id': job_id, }, {'$set': {'status': job_status}})
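Note a small asymmetry with the compose_api copy of this connector: this insert_job passes kwargs straight to insert_one and returns the same dict, so the returned job_doc carries the _id that PyMongo injects. That appears harmless here, since the worker never serializes these documents back to HTTP clients, but it is a difference worth knowing if the two implementations are ever unified.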
64 changes: 36 additions & 28 deletions compose_worker/supervisor.py
@@ -20,7 +20,7 @@


 class Supervisor:
-    def __init__(self, db_connector: MongoDbConnector, queue_timer: int = 20, preferred_queue_index: int = 0):
+    def __init__(self, db_connector: MongoDbConnector, queue_timer: int = 10, preferred_queue_index: int = 0):
         self.db_connector = db_connector
         self.queue_timer = queue_timer
         self.preferred_queue_index = preferred_queue_index
@@ -40,40 +40,48 @@ async def check_jobs(self) -> int:
         # 6. Sleep for a larger period of time
         # 7. At the end of check_jobs, run self.job_queue = self.db_connector.pending_jobs() (refresh)

-        for i, pending_job in enumerate(self.job_queue):
-            # get job id
-            job_id = pending_job.get('job_id')
-            source = pending_job.get('path')
+        async def check():
+            for i, pending_job in enumerate(self.job_queue):
+                # get job id
+                job_id = pending_job.get('job_id')
+                source = pending_job.get('path')

-            # check if job id exists in dbconn.completed
-            is_completed = self.job_exists(job_id=job_id, collection_name="completed_jobs")
-            worker = None
+                # check if job id exists in dbconn.completed
+                is_completed = self.job_exists(job_id=job_id, collection_name="completed_jobs")
+                worker = None

-            # case: job is not complete, otherwise do nothing
-            if not is_completed:
-                # check: run simulations
-                if job_id.startswith('execute-simulations'):
-                    worker = SimulationRunWorker(job=pending_job)
-                # check: verifications
-                elif job_id.startswith('verification'):
-                    # otherwise: create new worker with job
-                    worker = VerificationWorker(job=pending_job)
+                # case: job is not complete, otherwise do nothing
+                if not is_completed:
+                    # check: run simulations
+                    if job_id.startswith('simulation-execution'):
+                        worker = SimulationRunWorker(job=pending_job)
+                    # check: verifications
+                    elif job_id.startswith('verification'):
+                        # otherwise: create new worker with job
+                        worker = VerificationWorker(job=pending_job)

-                # change job status for client poll
-                # await self.db_connector.update_job_status(collection_name="pending_jobs", job_id=job_id, status=JobStatus.RUNNING)
+                    # change job status for client poll
+                    # await self.db_connector.update_job_status(collection_name="pending_jobs", job_id=job_id, status=JobStatus.RUNNING)

-                # change job status for client by inserting a new in progress job
-                await self.db_connector.write(collection_name=DatabaseCollections.IN_PROGRESS_JOBS, job_id=job_id, timestamp=self.db_connector.timestamp(), status=JobStatus.IN_PROGRESS)
+                    # change job status for client by inserting a new in progress job
+                    await self.db_connector.write(collection_name=DatabaseCollections.IN_PROGRESS_JOBS.value, job_id=job_id, timestamp=self.db_connector.timestamp(), status=JobStatus.IN_PROGRESS.value)

-                # when worker completes, dismiss worker (if in parallel)
-                await worker.run()
+                    # when worker completes, dismiss worker (if in parallel)
+                    await worker.run()

-                # create new completed job using the worker's job_result TODO: refactor output nesting
-                result_data = worker.job_result
-                await self.db_connector.write(collection_name=DatabaseCollections.COMPLETED_JOBS, job_id=job_id, results=result_data, source=source.split('/')[-1], status=JobStatus.COMPLETED)
+                    # create new completed job using the worker's job_result TODO: refactor output nesting
+                    result_data = worker.job_result
+                    await self.db_connector.write(collection_name=DatabaseCollections.COMPLETED_JOBS.value, job_id=job_id, results=result_data, source=source.split('/')[-1], status=JobStatus.COMPLETED.value)

-        # scan is complete, now refresh jobs
-        self.job_queue = self.db_connector.pending_jobs()
+        for _ in range(self.queue_timer):
+            # perform check
+            await check()

+            # rest
+            await sleep(2)

+            # refresh jobs
+            self.job_queue = self.db_connector.pending_jobs()

         return 0
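Two coupled changes land in this restructuring: the scan body moves into a local check() coroutine that check_jobs() now runs queue_timer times (ten passes roughly 2 s apart, given the new default of 10, with the pending-jobs queue refreshed between passes), and the routing prefix changes from 'execute-simulations' to 'simulation-execution', matching the renamed job_id values written by compose_api/main.py; had only one side been renamed, pending simulation jobs would never match a worker. As the if/elif chain grows with new job types, a table-driven dispatch is one possible refactor; a sketch, using the worker classes above as stand-ins:

    # Hypothetical alternative to the startswith if/elif chain above
    PREFIX_TO_WORKER = {
        "simulation-execution": SimulationRunWorker,
        "verification": VerificationWorker,
    }

    def make_worker(pending_job: dict):
        job_id = pending_job["job_id"]
        for prefix, worker_cls in PREFIX_TO_WORKER.items():
            if job_id.startswith(prefix):
                return worker_cls(job=pending_job)
        return None  # unknown job type: leave the job pending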
4 changes: 2 additions & 2 deletions compose_worker/workers.py
@@ -69,8 +69,8 @@ async def run_smoldyn(self, local_fp: str):
         format_smoldyn_configuration(filename=local_fp)

         # get job params
-        duration = self.job_params['duration']
-        dt = self.job_params['dt']
+        duration = self.job_params.get('duration')
+        dt = self.job_params.get('dt')
         initial_species_state = self.job_params.get('initial_molecule_state')  # not yet implemented

         # execute simularium, pointing to a filepath that is returned by the run smoldyn call
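The switch from bracket indexing to .get() means a job document that omits duration or dt (both are optional Query(default=None) parameters in compose_api) now yields None here instead of raising KeyError; presumably the Smoldyn runner then falls back to the values defined in the configuration file itself.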
