Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
marshHawk4 committed Jul 13, 2023
1 parent 8b3f77f commit ec4bdb0
Show file tree
Hide file tree
Showing 6 changed files with 472 additions and 405 deletions.
3 changes: 2 additions & 1 deletion api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Config:
extra = ExtraEnum.allow

num_samples: int = Field(
..., description="number of samples for a CIEMSS simulation", example=100
100, description="number of samples for a CIEMSS simulation", example=100
)
# start_state: Optional[dict[str,float]]
# pseudocount: float = Field(
Expand Down Expand Up @@ -139,3 +139,4 @@ class JobResponse(BaseModel):

class StatusSimulationIdGetResponse(BaseModel):
status: Optional[Status] = None
progress: Optional[float]
24 changes: 12 additions & 12 deletions rabbit/pika_service.py → api/pika_service.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import pika, sys, os
import json
import redis
import time
import logging
PIKA_HOST = os.getenv("PIKA_HOST","pika.pyciemss")
REDIS = os.getenv("REDIS_HOST", "redis")
r = redis.Redis(host=REDIS, port=6379, decode_responses=True)

## This is for testing locally
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
def pika_service():
connection = pika.BlockingConnection(pika.ConnectionParameters(host=PIKA_HOST))
channel = connection.channel()

channel.queue_declare(queue='terarium')

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
print(json.loads(body))
resp = json.loads(body)
r.set(resp.get('job_id'), resp.get('progress'))
print('after')


channel.basic_consume(queue='terarium', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
17 changes: 14 additions & 3 deletions api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@
SimulatePostRequest,
StatusSimulationIdGetResponse,
)
from pika_service import pika_service
import os
import redis
import sys
from threading import Thread
import time

REDIS = os.getenv("REDIS_HOST")
r = redis.Redis(host=REDIS, port=6379, decode_responses=True)

def build_api(*args) -> FastAPI:

Expand Down Expand Up @@ -46,11 +54,12 @@ def get_status(simulation_id: str) -> StatusSimulationIdGetResponse:
from utils import fetch_job_status

status = fetch_job_status(simulation_id)
print(status)
progress = r.get(simulation_id)

if not isinstance(status, str):
return status

return {"status": Status.from_rq(status)}
return {"status": Status.from_rq(status), "progress":progress}

import logging
@app.post("/simulate", response_model=JobResponse)
Expand Down Expand Up @@ -90,7 +99,6 @@ def calibrate_model(body: CalibratePostRequest) -> JobResponse:
from utils import create_job

# Parse request body
print(body)
engine = str(body.engine).lower()
model_config_id = body.model_config_id
dataset = body.dataset
Expand Down Expand Up @@ -128,3 +136,6 @@ def create_ensemble(body: EnsemblePostRequest) -> JobResponse:
)


time.sleep(10)
thread = Thread(target = pika_service)
thread.start()
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
- data-api # TODO Remove in production
depends_on:
- redis
- pika
volumes:
- $PWD/api:/api
extra_hosts:
Expand Down
Loading

0 comments on commit ec4bdb0

Please sign in to comment.