Skip to content

Commit

Permalink
Fixed TDAmeritrade#772 Add Ray Support (TDAmeritrade#986)
Browse files Browse the repository at this point in the history
* Initial commit for adding ray support

* Fixed missing import

* Fixed typo

* Refactored coverage reporting

* Force codecov to use coverage.py report instead of generating its own

* Specify coverage.xml in PWD

* Added verbose flag

* Minor change

* Added new codecov token

* Added temporary break in test coverage

* Minor change

* Changed name of coverage file

* Expand coverage to all tests, removed break

* Removed comments in workflow

* Removed codecov patch/project status

* Minor change

* Added ability to specify coverage.xml file

* Split xml report and displaying report

* Updated docstrings to include `ray` example

* Fixed flake8 problem

* Added ray docstring examples

* Fixed typo

* Reverted missing docstring

* Minor change

* Reverted docstring

* Minor changes

* Added ray.shutdown()

* Changed how `step` is calculated to be more precise

* Fixed missing parentheses
  • Loading branch information
seanlaw authored Jul 6, 2024
1 parent 5892869 commit fb6b8b8
Show file tree
Hide file tree
Showing 17 changed files with 1,200 additions and 67 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ jobs:
- name: Run Coverage Tests
run: ./test.sh coverage
shell: bash
- name: Check Coverage Report
run: coverage report -m --fail-under=100 --skip-covered --omit=docstring.py,min.py,stumpy/cache.py
- name: Generate Coverage Report
run: ./test.sh report coverage.stumpy.xml
shell: bash
- name: Upload Coverage Tests Results
uses: codecov/codecov-action@v4
with:
file: ./coverage.stumpy.xml
verbose: true
token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ LOG*
PID
.coverage*
coverage.xml
coverage.stumpy.xml
dask-worker-space
stumpy.egg-info
build
Expand All @@ -20,4 +21,4 @@ docs/_build
.mypy_cache
.directory
test.py
*.nbconvert.ipynb
*.nbconvert.ipynb
4 changes: 4 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Disable codecov's own project/patch status checks on PRs;
# coverage is enforced separately in CI via coverage.py's report step
coverage:
  status:
    project: off
    patch: off
15 changes: 15 additions & 0 deletions conda.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ if [[ $# -gt 0 ]]; then
if [ $1 == "min" ]; then
install_mode="min"
echo "Installing minimum dependencies with install_mode=\"min\""
elif [[ $1 == "ray" ]]; then
install_mode="ray"
echo "Installing ray dependencies with install_mode=\"ray\""
elif [[ $1 == "numba" ]] && [[ "${arch_name}" != "arm64" ]]; then
install_mode="numba"
echo "Installing numba release candidate dependencies with install_mode=\"numba\""
Expand Down Expand Up @@ -57,6 +60,14 @@ generate_numba_environment_yaml()
grep -Ev "numba|python" environment.yml > environment.numba.yml
}

generate_ray_environment_yaml()
{
    # Build "environment.ray.yml" from "environment.yml" by:
    #   1. capping the Python version at the max version supported by the
    #      latest `ray` release on PyPI (queried via ./ray_python_version.py)
    #   2. appending a pip section that installs ray>=2.23.0
    #      (ray is pip-installed because it is not available via conda here)
    echo "Generating \"environment.ray.yml\" File"
    ray_python=`./ray_python_version.py`
    sed "/ - python/ s/$/,<=$ray_python/" environment.yml | cat - <(echo $' - pip\n - pip:\n - ray>=2.23.0') > environment.ray.yml
}

fix_libopenblas()
{
if [ ! -f $CONDA_PREFIX/lib/libopenblas.dylib ]; then
Expand All @@ -71,6 +82,7 @@ clean_up()
echo "Cleaning Up"
rm -rf "environment.min.yml"
rm -rf "environment.numba.yml"
rm -rf "environment.ray.yml"
}

###########
Expand All @@ -92,6 +104,9 @@ fi
if [[ $install_mode == "min" ]]; then
generate_min_environment_yaml
mamba env update --name $conda_env --file environment.min.yml || conda env update --name $conda_env --file environment.min.yml
elif [[ $install_mode == "ray" ]]; then
generate_ray_environment_yaml
mamba env update --name $conda_env --file environment.ray.yml || conda env update --name $conda_env --file environment.ray.yml
elif [[ $install_mode == "numba" ]]; then
echo ""
echo "Installing python=$python_version"
Expand Down
File renamed without changes.
17 changes: 17 additions & 0 deletions ray_python_version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env python
"""
Print the maximum Python version supported by the latest `ray` release.

Queries the PyPI JSON API for `ray` and scans its trove classifiers
(e.g. "Programming Language :: Python :: 3.11") for version numbers.
"""

import requests
from packaging.version import InvalidVersion, Version

classifiers = (
    requests.get("https://pypi.org/pypi/ray/json").json().get("info").get("classifiers")
)

versions = []
for c in classifiers:
    x = c.split()
    if "Python" in x:
        # Skip classifiers whose trailing token is not a parsable version,
        # e.g. "Programming Language :: Python :: 3 :: Only" would otherwise
        # crash `Version("Only")` below with an InvalidVersion error
        try:
            Version(x[-1])
        except InvalidVersion:
            continue
        versions.append(x[-1])

versions.sort(key=Version)
# The largest advertised Python version is the last element after sorting
print(versions[-1])
12 changes: 5 additions & 7 deletions stumpy/aamp_stimp.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,15 @@ def __init__(

class aamp_stimped(_aamp_stimp):
"""
Compute the Pan Matrix Profile with a distributed dask cluster
Compute the Pan Matrix Profile with a `dask`/`ray` cluster
This is based on the SKIMP algorithm.
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask`/`ray` documentation.
T : numpy.ndarray
The time series or sequence for which to compute the pan matrix profile
Expand Down Expand Up @@ -556,9 +555,8 @@ def __init__(
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this
library. Please refer to the `dask`/`ray` documentation.
T : numpy.ndarray
The time series or sequence for which to compute the pan matrix profile
Expand Down
163 changes: 154 additions & 9 deletions stumpy/aamped.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _dask_aamped(
):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile with a
distributed dask cluster
`dask` cluster
This is a highly distributed implementation around the Numba JIT-compiled
parallelized `_aamp` function which computes the non-normalized matrix profile
Expand All @@ -33,17 +33,15 @@ def _dask_aamped(
Parameters
----------
dask_client : client
A Dask Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask Distributed
documentation.
A `dask` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask` documentation.
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded. Default is
`None` which corresponds to a self-join.
subsequence in T_A, its nearest neighbor in T_B will be recorded.
m : int
Window size
Expand Down Expand Up @@ -159,9 +157,157 @@ def _dask_aamped(
return out


def _ray_aamped(
    ray_client,
    T_A,
    T_B,
    m,
    T_A_subseq_isfinite,
    T_B_subseq_isfinite,
    p,
    diags,
    ignore_trivial,
    k,
):
    """
    Compute the non-normalized (i.e., without z-normalization) matrix profile with a
    `ray` cluster

    This is a highly distributed implementation around the Numba JIT-compiled
    parallelized `_aamp` function which computes the non-normalized matrix profile
    according to AAMP.

    Parameters
    ----------
    ray_client : client
        A `ray` client. Setting up a cluster is beyond the scope of this library.
        Please refer to the `ray` documentation.

    T_A : numpy.ndarray
        The time series or sequence for which to compute the matrix profile

    T_B : numpy.ndarray
        The time series or sequence that will be used to annotate T_A. For every
        subsequence in T_A, its nearest neighbor in T_B will be recorded.

    m : int
        Window size

    T_A_subseq_isfinite : numpy.ndarray
        A boolean array that indicates whether a subsequence in `T_A` contains a
        `np.nan`/`np.inf` value (False)

    T_B_subseq_isfinite : numpy.ndarray
        A boolean array that indicates whether a subsequence in `T_B` contains a
        `np.nan`/`np.inf` value (False)

    p : float
        The p-norm to apply for computing the Minkowski distance. Minkowski distance is
        typically used with `p` being 1 or 2, which correspond to the Manhattan distance
        and the Euclidean distance, respectively.

    diags : numpy.ndarray
        The diagonal indices

    ignore_trivial : bool, default True
        Set to `True` if this is a self-join. Otherwise, for AB-join, set this
        to `False`. Default is `True`.

    k : int, default 1
        The number of top `k` smallest distances used to construct the matrix profile.
        Note that this will increase the total computational time and memory usage
        when k > 1. If you have access to a GPU device, then you may be able to
        leverage `gpu_stump` for better performance and scalability.

    Returns
    -------
    out : numpy.ndarray
        When k = 1 (default), the first column consists of the matrix profile,
        the second column consists of the matrix profile indices, the third column
        consists of the left matrix profile indices, and the fourth column consists
        of the right matrix profile indices. However, when k > 1, the output array
        will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
        consists of the top-k matrix profile, the next set of k columns
        (i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
        indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
        equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
        matrix profile indices and the top-1 right matrix profile indices, respectively.
    """
    # Validate that a usable ray client/module was supplied
    core.check_ray(ray_client)

    n_A = T_A.shape[0]
    n_B = T_B.shape[0]
    l = n_A - m + 1  # Number of subsequences in T_A (matrix profile length)

    nworkers = core.get_ray_nworkers(ray_client)

    # Partition the diagonals across workers so that each worker receives
    # roughly the same number of pairwise distance computations (diagonals
    # have unequal lengths, so we balance by distance count, not by count
    # of diagonals)
    ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
    diags_ranges = core._get_array_ranges(ndist_counts, nworkers, False)
    # Shift the zero-based ranges so they index into the absolute diagonals
    diags_ranges += diags[0]

    # Scatter data to Ray cluster
    T_A_ref = ray_client.put(T_A)
    T_B_ref = ray_client.put(T_B)
    T_A_subseq_isfinite_ref = ray_client.put(T_A_subseq_isfinite)
    T_B_subseq_isfinite_ref = ray_client.put(T_B_subseq_isfinite)

    # Place each worker's chunk of diagonal indices into the object store
    diags_refs = []
    for i in range(nworkers):
        diags_ref = ray_client.put(
            np.arange(diags_ranges[i, 0], diags_ranges[i, 1], dtype=np.int64)
        )
        diags_refs.append(diags_ref)

    # Wrap the JIT-compiled `_aamp` kernel as a remote Ray task
    ray_aamp_func = ray_client.remote(core.deco_ray_tor(_aamp))

    # Launch one task per worker; each computes a partial matrix profile
    # restricted to its assigned diagonals
    refs = []
    for i in range(nworkers):
        refs.append(
            ray_aamp_func.remote(
                T_A_ref,
                T_B_ref,
                m,
                T_A_subseq_isfinite_ref,
                T_B_subseq_isfinite_ref,
                p,
                diags_refs[i],
                ignore_trivial,
                k,
            )
        )

    # Block until all partial results are available
    results = ray_client.get(refs)
    # Must make a mutable copy from Ray's object store (ndarrays are immutable)
    profile, profile_L, profile_R, indices, indices_L, indices_R = [
        arr.copy() for arr in results[0]
    ]

    # Merge the remaining workers' partial results into the running profile
    for i in range(1, nworkers):
        P, PL, PR, I, IL, IR = results[i]  # Read-only variables
        # Update top-k matrix profile and matrix profile indices
        core._merge_topk_PI(profile, P, indices, I)

        # Update top-1 left matrix profile and matrix profile index
        mask = PL < profile_L
        profile_L[mask] = PL[mask]
        indices_L[mask] = IL[mask]

        # Update top-1 right matrix profile and matrix profile index
        mask = PR < profile_R
        profile_R[mask] = PR[mask]
        indices_R[mask] = IR[mask]

    # Assemble the output: dtype=object since it mixes float distances with
    # integer indices in a single 2-D array
    out = np.empty((l, 2 * k + 2), dtype=object)
    out[:, :k] = profile
    out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))

    return out


def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile
with a `dask`/`ray` cluster
This is a highly distributed implementation around the Numba JIT-compiled
parallelized `_aamp` function which computes the non-normalized matrix profile
Expand All @@ -170,9 +316,8 @@ def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask`/`ray` documentation.
T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
Expand Down
Loading

0 comments on commit fb6b8b8

Please sign in to comment.