Skip to content

Commit

Permalink
terarium dataservice migration changes (#158)
Browse files Browse the repository at this point in the history
Co-authored-by: David Gauldie <dgauldie@uncharted.software>
Co-authored-by: dvince <dvince@uncharted.software>
  • Loading branch information
3 people authored Feb 7, 2024
1 parent bbd9cc9 commit 50823b8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 14 deletions.
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.15.0"

[deps]
AMQPClient = "79c8b4cd-a41a-55fa-907c-fab5288e1383"
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
Catlab = "134e5e36-593f-5add-ad60-77f754baafbe"
Clang_jll = "0ee61d77-7f21-5576-8119-9fcc46b10100"
Expand Down
55 changes: 41 additions & 14 deletions src/SimulationService.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import PrecompileTools: @recompile_invalidations, @compile_workload

@recompile_invalidations begin
import AMQPClient
import Base64
import CSV
import DataFrames: DataFrame, names, rename!
import Dates: Dates, DateTime, now, UTC
Expand Down Expand Up @@ -52,6 +53,8 @@ const PORT = Ref{Int}()
# Terrarium Data Service (TDS)
const ENABLE_TDS = Ref{Bool}()
const TDS_URL = Ref{String}()
const TDS_USER = Ref{String}()
const TDS_PASSWORD = Ref{String}()
const TDS_RETRIES = Ref{Int}()
# RabbitMQ (Note: assumes running on localhost)
const RABBITMQ_ENABLED = Ref{Bool}()
Expand All @@ -64,6 +67,8 @@ const RABBITMQ_SSL = Ref{Bool}()

const queue_dict = Dict{String, String}()

const basic_auth_header = Ref{Pair{String, String}}()

function __init__()
if Threads.nthreads() == 1
@warn "SimulationService.jl expects `Threads.nthreads() > 1`. Use e.g. `julia --threads=auto`."
Expand All @@ -76,7 +81,9 @@ function __init__()
HOST[] = get(ENV, "SIMSERVICE_HOST", "0.0.0.0")
PORT[] = parse(Int, get(ENV, "SIMSERVICE_PORT", "8080"))
ENABLE_TDS[] = get(ENV, "SIMSERVICE_ENABLE_TDS", "true") == "true"
TDS_URL[] = get(ENV, "SIMSERVICE_TDS_URL", "http://localhost:8001")
TDS_URL[] = get(ENV, "SIMSERVICE_TDS_URL", "http://localhost:3000")
TDS_USER[] = get(ENV, "SIMSERVICE_TDS_USER", "user")
TDS_PASSWORD[] = get(ENV, "SIMSERVICE_TDS_PASSWORD", "password")
TDS_RETRIES[] = parse(Int, get(ENV, "SIMSERVICE_TDS_RETRIES", "10"))
RABBITMQ_ENABLED[] = get(ENV, "SIMSERVICE_RABBITMQ_ENABLED", "false") == "true" && ENABLE_TDS[]
RABBITMQ_LOGIN[] = get(ENV, "SIMSERVICE_RABBITMQ_LOGIN", "guest")
Expand All @@ -102,13 +109,19 @@ function __init__()
AMQPClient.queue_declare(rabbitmq_channel[], RABBITMQ_ROUTE[];)
end

encoded_credentials = Base64.base64encode("$(TDS_USER[]):$(TDS_PASSWORD[])")
basic_auth_header[] = "Authorization" => "Basic $encoded_credentials"



v = Pkg.Types.read_project("Project.toml").version
@info "__init__ SimulationService with Version = $v"
end

#-----------------------------------------------------------------------------# start!
function start!(; host=HOST[], port=PORT[], kw...)
@info "starting server on $host:$port. nthreads=$(Threads.nthreads())"

ENABLE_TDS[] || @warn "TDS is disabled. Some features will not work."
stop!() # Stop server if it's already running
server_url[] = "http://$host:$port"
Expand Down Expand Up @@ -143,9 +156,11 @@ function stop!()
end

#-----------------------------------------------------------------------------# utils
json_header = ["Content-Type" => "application/json"]
json_content_header = "Content-Type" => "application/json"
snake_case_header = "X-Enable-Snake-Case" => ""

get_json(url::String) = JSON3.read(HTTP.get(url, [json_content_header]).body)

get_json(url::String) = JSON3.read(HTTP.get(url, json_header).body)

timestamp() = Dates.format(now(), "yyyy-mm-ddTHH:MM:SS")

Expand Down Expand Up @@ -222,7 +237,7 @@ end
#-----------------------------------------------------------------------------# OperationRequest
Base.@kwdef mutable struct OperationRequest
obj::JSON3.Object = JSON3.Object() # untouched JSON from request sent by HMI
id::String = "sciml-$(UUIDs.uuid4())" # matches DataServiceModel :id
id::String = "$(UUIDs.uuid4())" # matches DataServiceModel :id
route::String = "unknown" # :simulate, :calibrate, etc.
model::Union{Nothing, JSON3.Object} = nothing # ASKEM Model Representation (AMR)
models::Union{Nothing, Vector{JSON3.Object}} = nothing # Multiple models (in AMR)
Expand All @@ -248,7 +263,7 @@ function OperationRequest(req::HTTP.Request, route::String)

# Use custom MQ if specified
params = HTTP.queryparams(req)
if haskey(params, "queue")
if haskey(params, "queue")
queue_name = params["queue"]
queue_dict[o.id] = queue_name
AMQPClient.queue_declare(rabbitmq_channel[], queue_name; passive=true)
Expand Down Expand Up @@ -324,7 +339,7 @@ Base.@kwdef mutable struct DataServiceModel
engine::String = "sciml" # (ignore) TDS supports multiple engine. We are the `sciml` engine.
type::String = "" # :calibration, :calibration_simulation, :ensemble, :simulation
execution_payload::JSON3.Object = JSON3.Object() # untouched JSON from request sent by HMI
workflow_id::String = "IGNORED" # (ignore)
workflow_id::Union{Nothing, String} = nothing # (ignore)
# Optional
name::Union{Nothing, String} = nothing # (ignore)
description::Union{Nothing, String} = nothing # (ignore)
Expand Down Expand Up @@ -412,22 +427,27 @@ function DataServiceModel(id::String)
@info "DataServiceModel($(repr(id)))"
check = (_, e) -> e isa HTTP.Exceptions.StatusError && ex.status == 404
delays = fill(1, TDS_RETRIES[])
res = retry(() -> HTTP.get("$(TDS_URL[])/simulations/$id"); delays, check)()

res = retry(() -> HTTP.get("$(TDS_URL[])/simulations/$id", [basic_auth_header[], snake_case_header, json_content_header]); delays, check)()
return JSON3.read(res.body, DataServiceModel)
end

function get_model(id::String)
@assert ENABLE_TDS[]
@info "get_model($(repr(id)))"
get_json("$(TDS_URL[])/model_configurations/$id").configuration

tds_url = "$(TDS_URL[])/model-configurations/$id"

JSON3.read(HTTP.get(tds_url, [basic_auth_header[], json_content_header, snake_case_header]).body).configuration
end

function get_dataset(obj::JSON3.Object)
@assert ENABLE_TDS[]
@info "get_dataset with obj = $(JSON3.write(obj))"

tds_url = "$(TDS_URL[])/datasets/$(obj.id)/download-url?filename=$(obj.filename)"
s3_url = get_json(tds_url).url

s3_url = JSON3.read(HTTP.get(tds_url, [basic_auth_header[], json_content_header, snake_case_header]).body).url
df = CSV.read(download(s3_url), DataFrame)

for (k,v) in get(obj, :mappings, Dict())
Expand Down Expand Up @@ -458,7 +478,10 @@ function create(o::OperationRequest)
@warn "TDS disabled - `create` $o: $body"
return body
end
HTTP.post("$(TDS_URL[])/simulations/", json_header; body)


new_id = JSON3.read(HTTP.post("$(TDS_URL[])/simulations", [basic_auth_header[], json_content_header, snake_case_header]; body).body).id
o.id = new_id
end

# update the DataServiceModel in TDS: PUT /simulations/{id}
Expand All @@ -475,7 +498,9 @@ function update(o::OperationRequest; kw...)
setproperty!(m, k, v) :
setproperty!(m, k, Base.nonnothingtype(fieldtype(DataServiceModel, k))(v))
end
HTTP.put("$(TDS_URL[])/simulations/$(o.id)", json_header; body=JSON3.write(m))


HTTP.put("$(TDS_URL[])/simulations/$(o.id)", [basic_auth_header[], json_content_header, snake_case_header]; body=JSON3.write(m))
end

function complete(o::OperationRequest)
Expand All @@ -497,15 +522,17 @@ function complete(o::OperationRequest)
# everything else as JSON file
body = JSON3.write(o.result)
filename = "result.json"
header = json_header
header = json_content_header
end
if !ENABLE_TDS[]
@warn "TDS disabled - `complete` $o: summary(body) = $(summary(body))"
return body
end

tds_url = "$(TDS_URL[])/simulations/$(o.id)/upload-url?filename=$filename"
s3_url = get_json(tds_url).url

s3_url = JSON3.read(HTTP.get(tds_url, [basic_auth_header[], json_content_header, snake_case_header]).body).url

HTTP.put(s3_url, header; body=body)
update(o; status = "complete", completed_time = timestamp(), result_files = [filename])

Expand Down Expand Up @@ -563,7 +590,7 @@ get(ENV, "SIMSERVICE_PRECOMPILE", "true") == "true" && include("precompile.jl")

#-----------------------------------------------------------------------------# PackageCompilers.jl entry
function julia_main()::Cint
start!();
start!();
while true sleep(10000) end
return 0
end
Expand Down

0 comments on commit 50823b8

Please sign in to comment.