Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/pika #9

Merged
merged 14 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@ in order to pull the PyCIEMSS repository in as a submodule and set up the enviro
to start the containers and the API. The API url will be `http://localhost:8010` by default

## Notes

### Result Files
Every operation saves 3 files to S3
- `result.csv`
- `eval.csv`
- `visualization.json`

### RabbitMQ
Only the `calibrate` operation reports progress to RabbitMQ. This is to
the `simulation-status` queue with a payload that looks like `{"job_id": "some string", "progress": "float between 0 and 1"}`.

The Docker Compose starts rabbitmq AND a mock consumer for the messages. The
mock consumer is only helpful for testing without the full stack.


## License

Expand Down
3 changes: 3 additions & 0 deletions docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-cache

COPY service service
COPY README.md README.md

RUN poetry install

ENV REDIS_HOST redis
ENV REDIS_PORT 6379
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY poetry.lock poetry.lock
RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-cache --extras worker

RUN pip install --no-cache-dir git+https://github.com/ciemss/pyciemss.git
RUN pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@fg/remove-pika
COPY service service
COPY README.md README.md

Expand Down
29 changes: 28 additions & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ services:
- data-api
depends_on:
- redis
- rabbitmq
volumes:
- $PWD/api:/api
- $PWD/service:/service
extra_hosts:
- "host.docker.internal:host-gateway"
redis:
Expand All @@ -41,6 +42,32 @@ services:
depends_on:
- redis
- api
- rabbitmq
networks:
- data-api
- pyciemss
rabbitmq:
container_name: rabbitmq
hostname: rabbitmq
image: 'rabbitmq:3'
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
ports:
- "5672:5672"
networks:
- pyciemss
rabbitmq-mock-consumer:
container_name: rabbitmq-mock-consumer
build:
context: ..
dockerfile: docker/Dockerfile.api
command: poetry run mockrabbitmq
env_file:
- ../.env
networks:
- pyciemss
depends_on:
- rabbitmq
5 changes: 3 additions & 2 deletions env.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
TDS_URL=http://data-service-api:8000
REDIS_HOST=redis
REDIS_PORT=6379
TDS_URL=http://data-service-api:8000
PYCIEMSS_OUTPUT_FILEPATH=result.csv
RABBITMQ_HOST=rabbitmq.pyciemss
RABBITMQ_PORT=5672
86 changes: 51 additions & 35 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "1.6.0"
description = "PyCIEMSS simulation service to run CIEMSS simulations"
authors = ["Powell Fendley", "Five Grant"]
readme = "README.md"
packages = [{include = "api"}, {include = "workers"}]
packages = [{include = "service"}]

[tool.poetry.dependencies]
python = "^3.9"
Expand All @@ -14,13 +14,18 @@ rq = "^1.15.0"
redis = "^4.5.5"
boto3 = "^1.26.147"
uvicorn = "^0.22.0"
pika = "1.3.2"
pandas = "^2.0.0"
torch = { version = "^2.0.1", optional = true }
numpy = { version = "^1.24.0", optional = true }
filelock = "^3.12.2"

[tool.poetry.extras]
worker = ["torch", "numpy"]

[tool.poetry.scripts]
mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer"


[build-system]
requires = ["poetry-core"]
Expand Down
2 changes: 2 additions & 0 deletions service/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from settings import settings
from utils.rq_helpers import update_status_on_job_fail
from utils.tds import update_tds_status, cleanup_job_dir, fetch_dataset, fetch_model, attach_files
from utils.rabbitmq import gen_rabbitmq_hook

from pyciemss.PetriNetODE.interfaces import (
load_and_calibrate_and_sample_petri_model,
Expand Down Expand Up @@ -80,6 +81,7 @@ def calibrate_then_simulate(request, *, job_id):
petri_model_or_path=amr_path,
timepoints=timepoints,
data_path=dataset_path,
progress_hook=gen_rabbitmq_hook(job_id),
visual_options=True,
**request.extra.dict()
)
Expand Down
8 changes: 6 additions & 2 deletions service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
EnsembleCalibratePostRequest,
StatusSimulationIdGetResponse,
)
import os
import redis
import sys
from threading import Thread
import time

from utils.rq_helpers import create_job, fetch_job_status, kill_job
from utils.rabbitmq import mock_rabbitmq_consumer


logging.basicConfig()
Expand Down Expand Up @@ -79,7 +85,6 @@ def cancel_job(simulation_id: str) -> StatusSimulationIdGetResponse:
return {"status": Status.from_rq(status)}


import logging
@app.post("/simulate", response_model=JobResponse)
def simulate_model(body: SimulatePostRequest) -> JobResponse:
"""
Expand Down Expand Up @@ -118,4 +123,3 @@ def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse
resp = create_job("operations.ensemble_calibrate", body, "ensemble-calibrate")
response = {"simulation_id": resp["id"]}
return response

4 changes: 3 additions & 1 deletion service/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ class Settings(BaseSettings):
"""
pyciemss-service configuration
"""
TDS_URL: str = "http://data-service-api:8000"
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
TDS_URL: str = "http://data-service-api:8000"
RABBITMQ_HOST: str = "rabbitmq.pyciemss"
RABBITMQ_PORT: int = 5672


settings = Settings()
43 changes: 43 additions & 0 deletions service/utils/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pika, sys, os
import json
import redis
import time
import logging

from settings import settings

conn_config = pika.ConnectionParameters(host=settings.RABBITMQ_HOST, port=settings.RABBITMQ_PORT)


def mock_rabbitmq_consumer():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# TODO: Conditionally start on status of rabbitmq
time.sleep(10)
connection = pika.BlockingConnection(conn_config)
channel = connection.channel()

channel.queue_declare(queue='simulation-status')

def callback(ch, method, properties, body):
resp = json.loads(body)
logging.info("job_id:%s; progress:%s", resp['job_id'], resp['progress'])


channel.basic_consume(queue='simulation-status', on_message_callback=callback, auto_ack=True)

logging.info(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


def gen_rabbitmq_hook(job_id):
connection = pika.BlockingConnection(conn_config)
channel = connection.channel()

def hook(progress):
channel.basic_publish(
exchange='',
routing_key='simulation-status',
body=json.dumps({"job_id":job_id, "progress":progress})
)
return hook