Skip to content

Commit

Permalink
Track progress of calibrate using RabbitMQ (#9)
Browse files Browse the repository at this point in the history
* merging things

* working service with latest changes

* Pika status

* set pyciemms to offical repo

* working

* working

* removed print

* remove test

* removed print

* Fix pika upstream

* Fix pika usage

* Deploy in separate image

* Fix mock consumer and README

---------

Co-authored-by: Five Grant <5@fivegrant.com>
  • Loading branch information
marshHawk4 and fivegrant committed Aug 10, 2023
1 parent 71a8490 commit ca18151
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 43 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@ in order to pull the PyCIEMSS repository in as a submodule and set up the enviro
to start the containers and the API. The API url will be `http://localhost:8010` by default

## Notes

### Result Files
Every operation saves 3 files to S3
- `result.csv`
- `eval.csv`
- `visualization.json`

### RabbitMQ
Only the `calibrate` operation reports progress to RabbitMQ. This is to
the `simulation-status` queue with a payload that looks like `{"job_id": "some string", "progress": "float between 0 and 1"}`.

The Docker Compose starts rabbitmq AND a mock consumer for the messages. The
mock consumer is only helpful for testing without the full stack.


## License

Expand Down
3 changes: 3 additions & 0 deletions docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-cache

COPY service service
COPY README.md README.md

RUN poetry install

ENV REDIS_HOST redis
ENV REDIS_PORT 6379
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY poetry.lock poetry.lock
RUN poetry config virtualenvs.create false && \
poetry install --no-root --no-cache --extras worker

RUN pip install --no-cache-dir git+https://github.com/ciemss/pyciemss.git
RUN pip install --no-cache-dir git+https://github.com/fivegrant/pyciemss.git@fg/remove-pika
COPY service service
COPY README.md README.md

Expand Down
29 changes: 28 additions & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ services:
- data-api
depends_on:
- redis
- rabbitmq
volumes:
- $PWD/api:/api
- $PWD/service:/service
extra_hosts:
- "host.docker.internal:host-gateway"
redis:
Expand All @@ -41,6 +42,32 @@ services:
depends_on:
- redis
- api
- rabbitmq
networks:
- data-api
- pyciemss
rabbitmq:
container_name: rabbitmq
hostname: rabbitmq
image: 'rabbitmq:3'
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
ports:
- "5672:5672"
networks:
- pyciemss
rabbitmq-mock-consumer:
container_name: rabbitmq-mock-consumer
build:
context: ..
dockerfile: docker/Dockerfile.api
command: poetry run mockrabbitmq
env_file:
- ../.env
networks:
- pyciemss
depends_on:
- rabbitmq
5 changes: 3 additions & 2 deletions env.sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
TDS_URL=http://data-service-api:8000
REDIS_HOST=redis
REDIS_PORT=6379
TDS_URL=http://data-service-api:8000
PYCIEMSS_OUTPUT_FILEPATH=result.csv
RABBITMQ_HOST=rabbitmq.pyciemss
RABBITMQ_PORT=5672
86 changes: 51 additions & 35 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "1.6.0"
description = "PyCIEMSS simulation service to run CIEMSS simulations"
authors = ["Powell Fendley", "Five Grant"]
readme = "README.md"
packages = [{include = "api"}, {include = "workers"}]
packages = [{include = "service"}]

[tool.poetry.dependencies]
python = "^3.9"
Expand All @@ -14,13 +14,18 @@ rq = "^1.15.0"
redis = "^4.5.5"
boto3 = "^1.26.147"
uvicorn = "^0.22.0"
pika = "1.3.2"
pandas = "^2.0.0"
torch = { version = "^2.0.1", optional = true }
numpy = { version = "^1.24.0", optional = true }
filelock = "^3.12.2"

[tool.poetry.extras]
worker = ["torch", "numpy"]

[tool.poetry.scripts]
mockrabbitmq = "service.utils.rabbitmq:mock_rabbitmq_consumer"


[build-system]
requires = ["poetry-core"]
Expand Down
2 changes: 2 additions & 0 deletions service/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from settings import settings
from utils.rq_helpers import update_status_on_job_fail
from utils.tds import update_tds_status, cleanup_job_dir, fetch_dataset, fetch_model, attach_files
from utils.rabbitmq import gen_rabbitmq_hook

from pyciemss.PetriNetODE.interfaces import (
load_and_calibrate_and_sample_petri_model,
Expand Down Expand Up @@ -80,6 +81,7 @@ def calibrate_then_simulate(request, *, job_id):
petri_model_or_path=amr_path,
timepoints=timepoints,
data_path=dataset_path,
progress_hook=gen_rabbitmq_hook(job_id),
visual_options=True,
**request.extra.dict()
)
Expand Down
8 changes: 6 additions & 2 deletions service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
EnsembleCalibratePostRequest,
StatusSimulationIdGetResponse,
)
import os
import redis
import sys
from threading import Thread
import time

from utils.rq_helpers import create_job, fetch_job_status, kill_job
from utils.rabbitmq import mock_rabbitmq_consumer


logging.basicConfig()
Expand Down Expand Up @@ -79,7 +85,6 @@ def cancel_job(simulation_id: str) -> StatusSimulationIdGetResponse:
return {"status": Status.from_rq(status)}


import logging
@app.post("/simulate", response_model=JobResponse)
def simulate_model(body: SimulatePostRequest) -> JobResponse:
"""
Expand Down Expand Up @@ -118,4 +123,3 @@ def create_calibrate_ensemble(body: EnsembleCalibratePostRequest) -> JobResponse
resp = create_job("operations.ensemble_calibrate", body, "ensemble-calibrate")
response = {"simulation_id": resp["id"]}
return response

4 changes: 3 additions & 1 deletion service/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ class Settings(BaseSettings):
"""
pyciemss-service configuration
"""
TDS_URL: str = "http://data-service-api:8000"
REDIS_HOST: str = "redis"
REDIS_PORT: int = 6379
TDS_URL: str = "http://data-service-api:8000"
RABBITMQ_HOST: str = "rabbitmq.pyciemss"
RABBITMQ_PORT: int = 5672


settings = Settings()
43 changes: 43 additions & 0 deletions service/utils/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pika, sys, os
import json
import redis
import time
import logging

from settings import settings

conn_config = pika.ConnectionParameters(host=settings.RABBITMQ_HOST, port=settings.RABBITMQ_PORT)


def mock_rabbitmq_consumer():
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# TODO: Conditionally start on status of rabbitmq
time.sleep(10)
connection = pika.BlockingConnection(conn_config)
channel = connection.channel()

channel.queue_declare(queue='simulation-status')

def callback(ch, method, properties, body):
resp = json.loads(body)
logging.info("job_id:%s; progress:%s", resp['job_id'], resp['progress'])


channel.basic_consume(queue='simulation-status', on_message_callback=callback, auto_ack=True)

logging.info(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


def gen_rabbitmq_hook(job_id):
connection = pika.BlockingConnection(conn_config)
channel = connection.channel()

def hook(progress):
channel.basic_publish(
exchange='',
routing_key='simulation-status',
body=json.dumps({"job_id":job_id, "progress":progress})
)
return hook

0 comments on commit ca18151

Please sign in to comment.