Skip to content

Commit

Permalink
Add E2E tests (#36)
Browse files Browse the repository at this point in the history
* Init test fixture config

* Inject Redis into API

* Mock redis

* Run through simulate e2e fully [caveats]

* Capture output files [broken]

* Freeze version properly

* Add better file storage [broken]

* Fix storage

* Move file save logic to fixture

* Check if valid files

* Add both ensemble operations

* Add calibrate to E2E tests

* Warn if RabbitMQ cannot connect
  • Loading branch information
fivegrant authored Aug 17, 2023
1 parent 80f4ebd commit 5304854
Show file tree
Hide file tree
Showing 15 changed files with 628 additions and 183 deletions.
76 changes: 75 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ uvicorn = "^0.22.0"
pika = "1.3.2"
pandas = "^2.0.0"
filelock = "^3.12.2"
poethepoet = "^0.21.1"
# juliacall = { version="^0.9.14", optional = true }

[tool.poetry.scripts]
Expand All @@ -30,14 +31,14 @@ pytest = "^7.4.0"
pre-commit = "^3.3.3"
requests-mock = "^1.11.0"
mock = "^5.1.0"
poethepoet = "^0.21.1"
fakeredis = "^2.17.0"
httpx = "^0.24.1"

[tool.poe.tasks]
install-pyciemss = "pip install --no-cache git+https://github.com/ciemss/pyciemss.git@v0.0.1"

[tool.pytest.ini_options]
python_files = ["tests/tests.py"]
markers = ["operation"]
markers = ["example_dir"]
pythonpath = "service"


Expand Down
24 changes: 16 additions & 8 deletions service/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from models import (
Expand All @@ -14,7 +14,7 @@
StatusSimulationIdGetResponse,
)

from utils.rq_helpers import create_job, fetch_job_status, kill_job
from utils.rq_helpers import get_redis, create_job, fetch_job_status, kill_job

Operation = Simulate | Calibrate | EnsembleSimulate | EnsembleCalibrate

Expand Down Expand Up @@ -55,11 +55,13 @@ def get_ping():


@app.get("/status/{simulation_id}", response_model=StatusSimulationIdGetResponse)
def get_status(simulation_id: str) -> StatusSimulationIdGetResponse:
def get_status(
simulation_id: str, redis_conn=Depends(get_redis)
) -> StatusSimulationIdGetResponse:
"""
Retrieve the status of a simulation
"""
status = fetch_job_status(simulation_id)
status = fetch_job_status(simulation_id, redis_conn)
logging.info(status)
if not isinstance(status, str):
return status
Expand All @@ -70,11 +72,13 @@ def get_status(simulation_id: str) -> StatusSimulationIdGetResponse:
@app.get(
"/cancel/{simulation_id}", response_model=StatusSimulationIdGetResponse
) # NOT IN SPEC
def cancel_job(simulation_id: str) -> StatusSimulationIdGetResponse:
def cancel_job(
simulation_id: str, redis_conn=Depends(get_redis)
) -> StatusSimulationIdGetResponse:
"""
Cancel a simulation
"""
status = kill_job(simulation_id)
status = kill_job(simulation_id, redis_conn)
logging.info(status)
if not isinstance(status, str):
return status
Expand All @@ -83,7 +87,11 @@ def cancel_job(simulation_id: str) -> StatusSimulationIdGetResponse:


@app.post("/{operation}", response_model=JobResponse)
def operate(operation: str, body: Operation) -> JobResponse:
def operate(
operation: str,
body: Operation,
redis_conn=Depends(get_redis),
) -> JobResponse:
def check(otype):
if isinstance(body, otype):
return None
Expand All @@ -103,4 +111,4 @@ def check(otype):
check(EnsembleCalibrate)
case _:
raise HTTPException(status_code=404, detail="Operation not found")
return create_job(body, operation)
return create_job(body, operation, redis_conn)
17 changes: 15 additions & 2 deletions service/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations
import socket
import logging

from enum import Enum
from typing import ClassVar, Dict, List, Optional
from pydantic import BaseModel, Field, Extra


from utils.tds import fetch_dataset, fetch_model
from utils.rabbitmq import gen_rabbitmq_hook
from utils.tds import fetch_dataset, fetch_model
from settings import settings

TDS_CONFIGURATIONS = "/model_configurations/"
Expand Down Expand Up @@ -199,11 +201,22 @@ def gen_pyciemss_args(self, job_id):

dataset_path = fetch_dataset(self.dataset.dict(), TDS_URL, job_id)

# TODO: Test RabbitMQ
try:
hook = gen_rabbitmq_hook(job_id)
except socket.gaierror:
logging.warning(
"%s: Failed to connect to RabbitMQ. Unable to log progress", job_id
)

def hook(_):
return None

return {
"petri_model_or_path": amr_path,
"timepoints": timepoints,
"data_path": dataset_path,
"progress_hook": gen_rabbitmq_hook(job_id),
"progress_hook": hook,
"visual_options": True,
**self.extra.dict(),
}
Expand Down
21 changes: 10 additions & 11 deletions service/utils/rq_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import annotations

import logging
import uuid
from uuid import uuid4
import json
import requests

Expand All @@ -22,9 +22,8 @@
logging.getLogger().setLevel(logging.DEBUG)


# REDIS CONNECTION AND QUEUE OBJECTS
redis = Redis(settings.REDIS_HOST, settings.REDIS_PORT)
queue = Queue(connection=redis, default_timeout=-1)
def get_redis():
return Redis(settings.REDIS_HOST, settings.REDIS_PORT)


def update_status_on_job_fail(job, connection, etype, value, traceback):
Expand All @@ -41,9 +40,8 @@ def update_status_on_job_fail(job, connection, etype, value, traceback):
logging.exception(log_message)


def create_job(request_payload, sim_type):
random_id = str(uuid.uuid4())
job_id = f"ciemss-{random_id}"
def create_job(request_payload, sim_type, redis_conn):
job_id = f"ciemss-{uuid4()}"

post_url = TDS_URL + TDS_SIMULATIONS
payload = {
Expand All @@ -66,6 +64,7 @@ def create_job(request_payload, sim_type):
)
logging.info(response.content)

queue = Queue(connection=redis_conn, default_timeout=-1)
queue.enqueue_call(
func="execute.run",
args=[request_payload],
Expand All @@ -77,7 +76,7 @@ def create_job(request_payload, sim_type):
return {"simulation_id": job_id}


def fetch_job_status(job_id):
def fetch_job_status(job_id, redis_conn):
"""Fetch a job's results from RQ.
Args:
Expand All @@ -89,7 +88,7 @@ def fetch_job_status(job_id):
content: contains the job's results.
"""
try:
job = Job.fetch(job_id, connection=redis)
job = Job.fetch(job_id, connection=redis_conn)
# r = job.latest_result()
# string_res = r.return_value
result = job.get_status()
Expand All @@ -101,9 +100,9 @@ def fetch_job_status(job_id):
)


def kill_job(job_id):
def kill_job(job_id, redis_conn):
try:
job = Job.fetch(job_id, connection=redis)
job = Job.fetch(job_id, connection=redis_conn)
except NoSuchJobError:
return Response(
status_code=status.HTTP_404_NOT_FOUND,
Expand Down
7 changes: 7 additions & 0 deletions service/utils/tds.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ def attach_files(output: dict, tds_api, simulation_endpoint, job_id, status="com
presigned_upload_url = upload_response.json()["url"]
with open(location, "rb") as f:
upload_response = requests.put(presigned_upload_url, f)
if upload_response.status_code >= 300:
raise Exception(
(
"Failed to upload file to TDS "
f"(status: {upload_response.status_code}): {handle}"
)
)
else:
logging.error(f"{job_id} ran into error")

Expand Down
28 changes: 28 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import json
import os

import pytest


@pytest.fixture
def example_context(request):
ctx = {}
chosen = request.node.get_closest_marker("example_dir").args[0]
path_prefix = f"./tests/examples/{chosen}"
with open(f"{path_prefix}/input/request.json", "r") as file:
ctx["request"] = json.load(file)
with open(f"{path_prefix}/output/tds_simulation.json", "r") as file:
ctx["tds_simulation"] = json.load(file)

def fetch(handle, return_path=False):
io_dir = (
"input" if os.path.exists(f"{path_prefix}/input/{handle}") else "output"
)
path = f"{path_prefix}/{io_dir}/{handle}"
if return_path:
return os.path.abspath(path)
with open(path, "r") as file:
return file.read()

ctx["fetch"] = fetch
return ctx
Empty file added tests/integration/__init__.py
Empty file.
Loading

0 comments on commit 5304854

Please sign in to comment.