Refactor using test_utils helper for reading time series
aayush-se committed Sep 10, 2024
1 parent 94b95af commit 55f2aa6
Showing 4 changed files with 96 additions and 162 deletions.
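The common thread across all four files is that ad-hoc JSON loading is replaced by the shared convert_synthetic_ts helper. As a rough sketch of the two call shapes used in the updated tests below (the return values are read off the diff itself; nothing beyond that is implied):

from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts

# Default shape: three parallel lists (value arrays, mp_dist arrays, window sizes).
timeseries, mp_dists, window_sizes = convert_synthetic_ts(
    "tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False
)

# include_range=True additionally returns expected labels and anomalous index ranges.
expected_types, timeseries, mp_dists, window_sizes, window_starts, window_ends = (
    convert_synthetic_ts(
        "tests/seer/anomaly_detection/test_data/synthetic_series",
        as_ts_datatype=False,
        include_range=True,
    )
)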
63 changes: 21 additions & 42 deletions tests/seer/anomaly_detection/detectors/test_anomaly_detectors.py
@@ -1,5 +1,3 @@
import json
import os
import unittest
from unittest.mock import MagicMock, patch

@@ -13,6 +11,7 @@
from seer.anomaly_detection.models import MPTimeSeriesAnomalies
from seer.anomaly_detection.models.external import AnomalyDetectionConfig
from seer.anomaly_detection.models.timeseries import TimeSeries
from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts


class TestMPBatchAnomalyDetector(unittest.TestCase):
@@ -36,51 +35,31 @@ def test_compute_matrix_profile(self, mock_stump):
mock_stump.return_value = np.array([1, 2, 3, 4])
self.scorer.batch_score = MagicMock(return_value=([], []))

# TODO: Convert this file reading routine into a helper in test_util file

# Load in time series JSON files in test_data
dir = "tests/seer/anomaly_detection/detectors/test_data/synthetic_series"
for filename in os.listdir(dir):
f = os.path.join(dir, filename)

if os.path.isfile(f):
if not os.path.isfile(f):
raise Exception("Filename is not a valid file")

file_params = filename.split(".")[0].split("_")
window_size = int(file_params[2])

# Load json and convert to ts and mp_dist
with open(f) as file:

data = json.load(file)
data = data["ts"]

ts_values = np.array([point["value"] for point in data], dtype=np.float64)
ts = TimeSeries(timestamps=np.array([]), values=ts_values)
timeseries, mp_dists, window_sizes = convert_synthetic_ts(
"tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False
)

mp_dist_baseline = np.array(
[point["mp_dist"] for point in data], dtype=np.float64
)
for ts_values, mp_dist_baseline, window_size in zip(timeseries, mp_dists, window_sizes):
ts = TimeSeries(timestamps=np.array([]), values=ts_values)

self.ws_selector.optimal_window_size = MagicMock(return_value=window_size)
self.ws_selector.optimal_window_size = MagicMock(return_value=window_size)

self.mp_utils.get_mp_dist_from_mp = MagicMock(return_value=mp_dist_baseline)
self.mp_utils.get_mp_dist_from_mp = MagicMock(return_value=mp_dist_baseline)

result = self.detector._compute_matrix_profile(
ts,
self.config,
ws_selector=self.ws_selector,
mp_config=self.mp_config,
scorer=self.scorer,
mp_utils=self.mp_utils,
)
result = self.detector._compute_matrix_profile(
ts,
self.config,
ws_selector=self.ws_selector,
mp_config=self.mp_config,
scorer=self.scorer,
mp_utils=self.mp_utils,
)

self.assertIsInstance(result, MPTimeSeriesAnomalies)
self.assertIsInstance(result.flags, list)
self.assertIsInstance(result.scores, list)
self.assertIsInstance(result.matrix_profile, np.ndarray)
self.assertIsInstance(result.window_size, int)
self.assertIsInstance(result, MPTimeSeriesAnomalies)
self.assertIsInstance(result.flags, list)
self.assertIsInstance(result.scores, list)
self.assertIsInstance(result.matrix_profile, np.ndarray)
self.assertIsInstance(result.window_size, int)


class TestMPStreamAnomalyDetector(unittest.TestCase):
141 changes: 50 additions & 91 deletions tests/seer/anomaly_detection/detectors/test_mp_scorers.py
@@ -1,10 +1,7 @@
import json
import os
import unittest

import numpy as np

from seer.anomaly_detection.detectors.mp_scorers import MPIRQScorer
from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts


class TestMPScorers(unittest.TestCase):
@@ -14,104 +11,66 @@ def setUp(self):

def test_batch_score_synthetic_data(self):

def is_anomaly_detected(filename, threshold, window_size, start, end):

if not os.path.isfile(filename):
raise Exception("Filename is not a valid file")

# Load json and convert to ts and mp_dist
with open(filename) as f:

data = json.load(f)
data = data["ts"]
# TODO: sensitivity and direction are placeholders as they are not actually used in scoring yet
sensitivity = ""
direction = ""

ts = np.array([point["value"] for point in data], dtype=np.float64)
mp_dist = np.array([point["mp_dist"] for point in data], dtype=np.float64)
expected_types, timeseries, mp_dists, window_sizes, window_starts, window_ends = (
convert_synthetic_ts(
"tests/seer/anomaly_detection/test_data/synthetic_series",
as_ts_datatype=False,
include_range=True,
)
)

# TODO: sensitivity and direction are placeholders as they are not actually used in scoring yet
sensitivity = ""
direction = ""
threshold = 0.1

actual_scores, actual_flags = self.scorer.batch_score(
ts, mp_dist, sensitivity, direction, window_size
)
for expected_type, ts, mp_dist, window_size, start, end in zip(
expected_types, timeseries, mp_dists, window_sizes, window_starts, window_ends
):

# Calculate percentage of anomaly flags in given range
num_anomalies_detected = 0
for flag in actual_flags[start : end + 1]:
if flag == "anomaly_higher_confidence":
num_anomalies_detected += 1
_, actual_flags = self.scorer.batch_score(
ts, mp_dist, sensitivity, direction, window_size
)

return (
"anomaly"
if (num_anomalies_detected / (end - start + 1)) >= threshold
else "noanomaly"
)
# Calculate percentage of anomaly flags in given range
num_anomalies_detected = 0
for flag in actual_flags[start : end + 1]:
if flag == "anomaly_higher_confidence":
num_anomalies_detected += 1

actual_results = []
expected_results = []

# Check time series JSON files in test_data
dir = "tests/seer/anomaly_detection/detectors/test_data/synthetic_series"
for filename in os.listdir(dir):
f = os.path.join(dir, filename)

if os.path.isfile(f):
# filename is in format expected_type, window_size, start, end separated by '_'
file_params = filename.split(".")[0].split("_")
expected_type, window_size, start, end = (
file_params[1],
int(file_params[2]),
int(file_params[3]),
int(file_params[4]),
)
actual_results.append(is_anomaly_detected(f, 0.1, window_size, start, end))
expected_results.append(expected_type)
result = (
"anomaly"
if (num_anomalies_detected / (end - start + 1)) >= threshold
else "noanomaly"
)

self.assertListEqual(actual_results, expected_results)
self.assertEqual(result, expected_type)

def test_stream_score(self):

test_ts_mp_mulipliers = [1000, -1000, 1]
expected_flags = ["anomaly_higher_confidence", "anomaly_higher_confidence", "none"]

# Check time series JSON files in test_data
dir = "tests/seer/anomaly_detection/detectors/test_data/synthetic_series"
for filename in os.listdir(dir):
f = os.path.join(dir, filename)

if os.path.isfile(f):
if not os.path.isfile(f):
raise Exception("Filename is not a valid file")

file_params = filename.split(".")[0].split("_")
window_size = int(file_params[2])

# Load json and convert to ts and mp_dist
with open(f) as file:

data = json.load(file)
data = data["ts"]

ts_baseline = np.array([point["value"] for point in data], dtype=np.float64)
mp_dist_baseline = np.array(
[point["mp_dist"] for point in data], dtype=np.float64
)

sensitivity, direction = "", "" # TODO: Placeholders as values are not used

for i, multiplier in enumerate(test_ts_mp_mulipliers):
test_ts_val = ts_baseline[-1] * multiplier
test_mp_dist = mp_dist_baseline[-1] * abs(multiplier)

_, flag = self.scorer.stream_score(
test_ts_val,
test_mp_dist,
sensitivity,
direction,
window_size,
ts_baseline,
mp_dist_baseline,
)
timeseries, mp_dists, window_sizes = convert_synthetic_ts(
"tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False
)

for ts_baseline, mp_dist_baseline, window_size in zip(timeseries, mp_dists, window_sizes):
sensitivity, direction = "", "" # TODO: Placeholders as values are not used

for i, multiplier in enumerate(test_ts_mp_mulipliers):
test_ts_val = ts_baseline[-1] * multiplier
test_mp_dist = mp_dist_baseline[-1] * abs(multiplier)

_, flag = self.scorer.stream_score(
test_ts_val,
test_mp_dist,
sensitivity,
direction,
window_size,
ts_baseline,
mp_dist_baseline,
)

self.assertEqual(flag[0], expected_flags[i])
self.assertEqual(flag[0], expected_flags[i])
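For context, test_batch_score_synthetic_data above declares a series anomalous once at least threshold (10%) of the flags inside the labelled [start, end] window are "anomaly_higher_confidence". A toy illustration of that rule with hypothetical numbers:

flags = ["none"] * 45 + ["anomaly_higher_confidence"] * 6   # 51 flags inside [start, end]
frac = sum(f == "anomaly_higher_confidence" for f in flags) / len(flags)
result = "anomaly" if frac >= 0.1 else "noanomaly"          # 6/51 ≈ 0.118, so "anomaly"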
tests/seer/anomaly_detection/detectors/test_window_size_selectors.py
@@ -1,12 +1,7 @@
import json
import os
import unittest

import numpy as np

from seer.anomaly_detection.detectors.window_size_selectors import SuSSWindowSizeSelector

# from tests.seer.anomaly_detection.timeseries.timeseries import context
from tests.seer.anomaly_detection.test_utils import convert_synthetic_ts


class TestSuSSWindowSizeSelector(unittest.TestCase):
@@ -18,28 +13,17 @@ def test_optimal_window_size(self):

actual_windows = []

# Check time series JSON files in test_data
dir = "tests/seer/anomaly_detection/detectors/test_data/synthetic_series"
for filename in os.listdir(dir):
f = os.path.join(dir, filename)

if os.path.isfile(f):
if not os.path.isfile(f):
raise Exception("Filename is not a valid file")

# Load json and convert to ts and mp_dist
with open(f) as file:

data = json.load(file)
data = data["ts"]

ts = np.array([point["value"] for point in data], dtype=np.float64)
timeseries, _, window_sizes = convert_synthetic_ts(
"tests/seer/anomaly_detection/test_data/synthetic_series", as_ts_datatype=False
)

window = self.selector.optimal_window_size(ts)
actual_windows.append(window)
for ts, window_size in zip(timeseries, window_sizes):
window = self.selector.optimal_window_size(ts)
actual_windows.append(window)

# Check if window is within half a period
# Check if window is within n% of period
n = 0.6
period = 24 * 4

for window in actual_windows:
self.assertTrue(period / 2 <= window <= period * 1.5)
self.assertTrue(period * (1 - n) <= window <= period * (1 + n))
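As a quick sanity check on the bounds above (assuming the corrected form of the assertion, and assuming period = 24 * 4 is meant as one daily cycle at four samples per hour — an inference, not stated in the diff):

n = 0.6
period = 24 * 4                       # 96 points per presumed daily cycle
low, high = period * (1 - n), period * (1 + n)
# low = 38.4, high = 153.6 — any detected window inside this range passes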
18 changes: 15 additions & 3 deletions tests/seer/anomaly_detection/test_utils.py
@@ -7,11 +7,14 @@


# Returns timeseries and mp_distances as lists of numpy arrays from the synthetic data
def convert_synthetic_ts(directory: str, as_ts_datatype: bool):
def convert_synthetic_ts(directory: str, as_ts_datatype: bool, include_range: bool = False):

timeseries = []
mp_dists = []
window_sizes = []
window_starts = []
window_ends = []
expected_types = []

# Load in time series JSON files in test_data
for filename in os.listdir(directory):
@@ -24,8 +27,12 @@ def convert_synthetic_ts(directory: str, as_ts_datatype: bool):
raise Exception("File is not a JSON file")

file_params = filename.split(".")[0].split("_")
print(filename, file_params)
window_size = int(file_params[2])
expected_type, window_size, start, end = (
file_params[1],
int(file_params[2]),
int(file_params[3]),
int(file_params[4]),
)

# Load json and convert to ts and mp_dist
with open(f) as file:
@@ -47,5 +54,10 @@ def convert_synthetic_ts(directory: str, as_ts_datatype: bool):
timeseries.append(ts)
mp_dists.append(mp_dist)
window_sizes.append(window_size)
window_starts.append(start)
window_ends.append(end)
expected_types.append(expected_type)

if include_range:
return expected_types, timeseries, mp_dists, window_sizes, window_starts, window_ends
return timeseries, mp_dists, window_sizes
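Putting the helper's parsing logic together, the synthetic files appear to follow a naming and content convention along the lines sketched below. The concrete names and numbers are hypothetical; only the underscore-separated field positions and the "ts"/"value"/"mp_dist" keys come from the code above:

# <prefix>_<expected_type>_<window_size>_<start>_<end>.json
# e.g. a hypothetical "synthetic_anomaly_220_400_450.json" would mean
# expected_type="anomaly", window_size=220, anomalous range at indices 400..450.
example_file_contents = {
    "ts": [
        {"value": 12.3, "mp_dist": 0.8},
        {"value": 12.9, "mp_dist": 0.7},
        # ...more points; other keys may exist, but only these two are read here
    ]
}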
