Skip to content

Commit

Permalink
feat: add interface to expose HTTP ports on batch jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenpi committed May 24, 2024
1 parent ad5d0fb commit 4d7ce32
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 10 deletions.
22 changes: 22 additions & 0 deletions docs/src/reference/job-submission.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,28 @@ JuliaHub.submit_job(
)
```

### [Opening ports on batch jobs](@id jobs-batch-expose-port)

If supported and enabled for the given product and user, JuliaHub allows

This exposes

This can be used to

(requiring )

The value will also be set in the `PORT` environment variable, available in the job.

Requires the product to support exposing ports and the user to have access to the feature.

!!! note "Valid port values"

???

!!! warn "Pricing"

Jobs that expose ports may be priced differently per hour than batch jobs that do not.

## [Default Applications](@id jobs-default-apps)

!!! compat "Experimental feature"
Expand Down
1 change: 1 addition & 0 deletions src/JuliaHub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ include("node.jl")
include("jobsubmission.jl")
include("PackageBundler/PackageBundler.jl")
include("jobs/jobs.jl")
include("jobs/request.jl")
include("jobs/logging.jl")
include("jobs/logging-kafka.jl")
include("jobs/logging-legacy.jl")
Expand Down
7 changes: 7 additions & 0 deletions src/authentication.jl
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,10 @@ function reauthenticate!(
auth._tokenpath = new_auth._tokenpath
return auth
end

# This can be interpolated into the docstrings of functions that take the
# auth::Authentication = __auth__() keyword argument.
const _DOCS_authentication_kwarg = """
* `auth :: Authentication`: optional authentication object (see
[the authentication section](@ref authentication) for more information)
"""
34 changes: 33 additions & 1 deletion src/batchimages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Base.@kwdef struct BatchImage
_cpu_image_key::Union{String, Nothing}
_gpu_image_key::Union{String, Nothing}
_is_product_default::Bool
_interactive_product_name::Union{String, Nothing}
end

function Base.show(io::IO, image::BatchImage)
Expand All @@ -33,6 +34,10 @@ function Base.show(io::IO, ::MIME"text/plain", image::BatchImage)
print(io, '\n', " image: ", image.image)
isnothing(image._cpu_image_key) || print(io, "\n CPU image: ", image._cpu_image_key)
isnothing(image._gpu_image_key) || print(io, "\n GPU image: ", image._gpu_image_key)
if !isnothing(image._interactive_product_name)
print(io, "\n Features:")
print(io, "\n - Expose Port: ✓")
end
end

# This value is used in BatchImages objects when running against older JuliaHub
Expand Down Expand Up @@ -283,21 +288,48 @@ function _is_batch_app(app::DefaultApp)
compute_type in ("batch", "singlenode-batch") && (input_type == "userinput")
end

function _is_interactive_batch_app(app::DefaultApp)
# Like _is_batch_app, this should return false for JuliaHub <= 6.1
compute_type = get(app._json, "compute_type_name", nothing)
input_type = get(app._json, "input_type_name", nothing)
compute_type in ("distributed-interactive",) && (input_type == "userinput")
end

function _batchimages_62(auth::Authentication)
image_groups = _product_image_groups(auth)
batchapps = filter(_is_batch_app, _apps_default(auth))
batchapps, interactiveapps = let apps = _apps_default(auth)
filter(_is_batch_app, apps), filter(_is_interactive_batch_app, apps)
end
batchimages = map(batchapps) do app
product_name = app._json["product_name"]
image_group = app._json["image_group"]
images = get(image_groups, image_group, [])
if isempty(images)
@warn "Invalid image_group '$image_group' for '$product_name'" app
end
matching_interactive_app = filter(interactiveapps) do app
get(app._json, "image_group", nothing) == image_group
end
interactive_product_name = if length(matching_interactive_app) > 1
# If there are multiple interactive products configured for a batch product
# we issue a warning and disable the 'interactive' compute for it (i.e. the user
# won't be able to start jobs that require a port to be exposed until the configuration
# issue is resolved).
@warn "Multiple matching interactive apps for $(app)" image_group matches = matching_interactive_app
nothing
elseif isempty(matching_interactive_app)
# If we can't find a matching 'distributed-interactive' product, we disable the
# ability for the user to expose a port with this image.
nothing
else
only(matching_interactive_app)._json["product_name"]
end
map(images) do (display_name, imagekey)
BatchImage(;
product=product_name, image=display_name,
_cpu_image_key=imagekey.cpu, _gpu_image_key=imagekey.gpu,
_is_product_default=imagekey.isdefault,
_interactive_product_name = interactive_product_name,
)
end
end
Expand Down
92 changes: 92 additions & 0 deletions src/jobs/request.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
function request(
job::Job,
method::AbstractString,
uripath::AbstractString,
[body];
[auth::Authentication],
[extra_headers],
kwargs...
) -> HTTP.Response
Performs a HTTP against the HTTP server exposed by the job with the authentication
token of the authenticated user. The function is a thin wrapper around the `HTTP.request`
function, constructing the correct URL and setting the authentication headers.
Arguments:
* `job::Job`: JuliaHub job (from [`JuliaHub.job`](@ref))
* `method::AbstractString`: HTTP method (gets directly passed to HTTP.jl)
* `uripath::AbstractString`: the path and query portion of the URL, which gets
appended to the scheme and hostname port of the URL. Must start with a `/`.
* `body`: gets passed as the `body` argument to HTTP.jl
Keyword arguments:
$(_DOCS_authentication_kwarg)
* `extra_headers`: an iterable of extra HTTP headers, that gets concatenated
with the list of necessary authentication headers and passed on to `HTTP.request`.
* Additional keyword arguments must be valid HTTP.jl keyword arguments and will
get directly passed to the `HTTP.request` function.
!!! note
See the [manual section on exposing ports](@ref jobs-batch-expose-port) and
the `expose` argument to [`submit_job`](@ref).
"""
function request(
job::Job,
method::AbstractString,
uripath::AbstractString,
body::Any = UInt8[];
auth::Authentication=__auth__(),
extra_headers::Vector{Any} = [],
kwargs...
)
proxyhost = _job_proxy_host(job)
if isnothing(proxyhost)
throw(ArgumentError("Job '$(job.id)' does not expose a HTTP endpoint."))
end
if !startswith(uripath, "/")
throw(ArgumentError("'uripath' must start with a /, got: '$uripath'"))
end
HTTP.request(
method,
string("https://", proxyhost, uripath),
[_authheaders(auth)..., extra_headers...],
body;
kwargs...
)
end

function _job_proxy_host(job::Job)
proxy_link = get(job._json, "proxy_link", "")
if isempty(proxy_link)
return nothing
end
uri = try
uri = URIs.URI(proxy_link)
checks = (
uri.scheme == "https",
!isempty(uri.host),
isempty(uri.path) || uri.path == "/",
isempty(uri.query),
isempty(uri.fragment),
)
@show checks
all(checks) ? uri : nothing
catch e
isa(e, ParseError) || rethrow()
nothing
end
if isnothing(uri)
throw(JuliaHubError("Invalid proxy_link value for job: $(job.id)\n proxy_link=$(proxy_link)"))
return nothing
end
return uri.host
end
62 changes: 53 additions & 9 deletions src/jobsubmission.jl
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ function _upload_appbundle(appbundle_tar_path::AbstractString; auth::Authenticat
Mocking.@mock _restput_mockable(
upload_url,
["Content-Length" => filesize(appbundle_tar_path)],
input
input,
)
end
# The response body of a successful upload is empty
Expand Down Expand Up @@ -955,6 +955,7 @@ struct WorkloadConfig
env::Dict{String, String}
project::Union{UUIDs.UUID, Nothing}
timelimit::Union{Dates.Hour, Unlimited}
exposed_port::Union{Int, Nothing}
# internal, undocumented, may be removed an any point, not part of the public API:
_image_sha256::Union{String, Nothing}

Expand All @@ -964,6 +965,7 @@ struct WorkloadConfig
env=(),
project::Union{UUIDs.UUID, Nothing}=nothing,
timelimit::Limit=_DEFAULT_WorkloadConfig_timelimit,
expose::Union{Integer, Nothing}=nothing,
# internal, undocumented, may be removed an any point, not part of the public API:
_image_sha256::Union{AbstractString, Nothing}=nothing,
)
Expand All @@ -974,18 +976,29 @@ struct WorkloadConfig
),
)
end
if !isnothing(expose) && !_is_valid_port_range(expose)
Base.throw(
ArgumentError(
"Invalid port value for expose: '$(expose)', must be >= 1 & <= 1 65535"
),
)
end
new(
app,
compute,
alias,
Dict(string(k) => v for (k, v) in pairs(env)),
project,
@_timelimit(timelimit),
expose,
_image_sha256,
)
end
end

# TODO: need to define the valid port range
_is_valid_port_range(port::Integer) = 1 <= port <= 65535

_is_gpu_job(workload::WorkloadConfig) = workload.compute.node.hasGPU

function Base.show(io::IO, ::MIME"text/plain", jc::WorkloadConfig)
Expand Down Expand Up @@ -1020,8 +1033,8 @@ end
elastic::Bool = false,
process_per_node::Bool = true,
# Runtime configuration keyword arguments
[alias::AbstractString], [env], [project::Union{UUID, AbstractString}],
timelimit::Limit = Hour(1),
[alias::AbstractString], [env], [expose::Integer],
[project::Union{UUID, AbstractString}], timelimit::Limit = Hour(1),
# General keyword arguments
dryrun::Bool = false,
[auth :: Authentication]
Expand Down Expand Up @@ -1057,10 +1070,12 @@ of the job.
with. If a string is passed, it must parse as a valid UUID. Passing `nothing` is equivalent to omitting the
argument.
* `expose :: Union{Integer, Nothing}`: if set to an integer in the valid port range, that port will be exposed
over HTTPS, allowing for (authenticated) HTTP calls. [See the relevant manual section for more information.](@ref jobs-batch-expose-port)
**General arguments.**
* `auth :: Authentication`: optional authentication object (see [the authentication section](@ref authentication)
for more information)
$(_DOCS_authentication_kwarg)
* `dryrun :: Bool`: if set to true, `submit_job` does not actually submit the job, but instead
returns a [`WorkloadConfig`](@ref) object, which can be used to inspect the configuration
Expand Down Expand Up @@ -1116,6 +1131,7 @@ function submit_job(
env=(),
project::Union{UUIDs.UUID, AbstractString, Nothing}=nothing,
timelimit::Limit=_DEFAULT_WorkloadConfig_timelimit,
expose::Union{Integer, Nothing}=nothing,
# internal, undocumented, may be removed an any point, not part of the public API:
_image_sha256::Union{AbstractString, Nothing}=nothing,
# General submit_job arguments
Expand All @@ -1135,8 +1151,8 @@ function submit_job(
project
end
submit_job(
WorkloadConfig(app, compute; alias, env, project, timelimit, _image_sha256);
kwargs...
WorkloadConfig(app, compute; alias, env, project, timelimit, expose, _image_sha256);
kwargs...,
)
end

Expand Down Expand Up @@ -1231,6 +1247,34 @@ function _job_submit_args(
else
(;)
end
# Note: this set of arguments will also set product_name which must override the value
# in `image_args`, achieved by splatting it later in the named tuple constructor below.
exposed_port_args = if !isnothing(workload.exposed_port)
product_name = if isnothing(batch.image)
# If the image was not specified for the job submissions, we assume that the
# corresponding interactive product is called 'standard-interactive' and that it
# is available to the user (we can not verify that at this point anymore though).
"standard-interactive"
elseif isnothing(batch.image._interactive_product_name)
throw(
InvalidRequestError(
"Product '$(batch.image.product_name)' does not support exposing a port."
),
)
else
batch.image._interactive_product_name
end
(;
product_name,
appArgs=Dict(
"authentication" => true,
"authorization" => "me",
"port" => workload.exposed_port,
),
)
else
(;)
end
sysimage_args = if batch.sysimage
sysimage_manifest_sha = _sysimage_manifest_sha(batch.environment)
if isnothing(sysimage_manifest_sha)
Expand All @@ -1242,7 +1286,7 @@ function _job_submit_args(
end
return (;
_job_submit_args(auth, workload, batch, batch.environment, _JobSubmission1; kwargs...)...,
image_args..., sysimage_args...,
image_args..., exposed_port_args..., sysimage_args...,
)
end

Expand Down Expand Up @@ -1290,7 +1334,7 @@ function _job_submit_args(
registry_name=packagejob.registry,
args=merge(
Dict("jobname" => packagejob.name, "jr_uuid" => packagejob.jr_uuid),
packagejob.args
packagejob.args,
),
# Just in case, we want to omit sysimage_build altogether when it is not requested.
sysimage_build=packagejob.sysimage ? true : nothing,
Expand Down

0 comments on commit 4d7ce32

Please sign in to comment.