-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'feature-branch-timeseries-metrics' into issue-638-seque…
…nce-similarity
- Loading branch information
Showing
4 changed files
with
292 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
"""InterRowMSAS module.""" | ||
|
||
import warnings | ||
|
||
import numpy as np | ||
import pandas as pd | ||
|
||
from sdmetrics.goal import Goal | ||
from sdmetrics.single_column.statistical.kscomplement import KSComplement | ||
|
||
|
||
class InterRowMSAS: | ||
"""Inter-Row Multi-Sequence Aggregate Similarity (MSAS) metric. | ||
Attributes: | ||
name (str): | ||
Name to use when reports about this metric are printed. | ||
goal (sdmetrics.goal.Goal): | ||
The goal of this metric. | ||
min_value (Union[float, tuple[float]]): | ||
Minimum value or values that this metric can take. | ||
max_value (Union[float, tuple[float]]): | ||
Maximum value or values that this metric can take. | ||
""" | ||
|
||
name = 'Inter-Row Multi-Sequence Aggregate Similarity' | ||
goal = Goal.MAXIMIZE | ||
min_value = 0.0 | ||
max_value = 1.0 | ||
|
||
@staticmethod | ||
def compute(real_data, synthetic_data, n_rows_diff=1, apply_log=False): | ||
"""Compute this metric. | ||
This metric compares the inter-row differences of sequences in the real data | ||
vs. the synthetic data. | ||
It works as follows: | ||
- Calculate the difference between row r and row r+x for each row in the real data | ||
- Take the average over each sequence to form a distribution D_r | ||
- Do the same for the synthetic data to form a new distribution D_s | ||
- Apply the KSComplement metric to compare the similarities of (D_r, D_s) | ||
- Return this score | ||
Args: | ||
real_data (tuple[pd.Series, pd.Series]): | ||
A tuple of 2 pandas.Series objects. The first represents the sequence key | ||
of the real data and the second represents a continuous column of data. | ||
synthetic_data (tuple[pd.Series, pd.Series]): | ||
A tuple of 2 pandas.Series objects. The first represents the sequence key | ||
of the synthetic data and the second represents a continuous column of data. | ||
n_rows_diff (int): | ||
An integer representing the number of rows to consider when taking the difference. | ||
apply_log (bool): | ||
Whether to apply a natural log before taking the difference. | ||
Returns: | ||
float: | ||
The similarity score between the real and synthetic data distributions. | ||
""" | ||
for data in [real_data, synthetic_data]: | ||
if ( | ||
not isinstance(data, tuple) | ||
or len(data) != 2 | ||
or (not (isinstance(data[0], pd.Series) and isinstance(data[1], pd.Series))) | ||
): | ||
raise ValueError('The data must be a tuple of two pandas series.') | ||
|
||
if not isinstance(n_rows_diff, int) or n_rows_diff < 1: | ||
raise ValueError("'n_rows_diff' must be an integer greater than zero.") | ||
|
||
if not isinstance(apply_log, bool): | ||
raise ValueError("'apply_log' must be a boolean.") | ||
|
||
real_keys, real_values = real_data | ||
synthetic_keys, synthetic_values = synthetic_data | ||
|
||
if apply_log: | ||
real_values = np.log(real_values) | ||
synthetic_values = np.log(synthetic_values) | ||
|
||
def calculate_differences(keys, values, n_rows_diff, data_name): | ||
group_sizes = values.groupby(keys).size() | ||
num_invalid_groups = group_sizes[group_sizes <= n_rows_diff].count() | ||
if num_invalid_groups > 0: | ||
warnings.warn( | ||
f"n_rows_diff '{n_rows_diff}' is greater than the " | ||
f'size of {num_invalid_groups} sequence keys in {data_name}.' | ||
) | ||
|
||
differences = values.groupby(keys).apply( | ||
lambda group: np.mean( | ||
group.to_numpy()[n_rows_diff:] - group.to_numpy()[:-n_rows_diff] | ||
) | ||
if len(group) > n_rows_diff | ||
else np.nan | ||
) | ||
|
||
return pd.Series(differences) | ||
|
||
real_diff = calculate_differences(real_keys, real_values, n_rows_diff, 'real_data') | ||
synthetic_diff = calculate_differences( | ||
synthetic_keys, synthetic_values, n_rows_diff, 'synthetic_data' | ||
) | ||
|
||
return KSComplement.compute(real_diff, synthetic_diff) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
import pandas as pd | ||
import pytest | ||
|
||
from sdmetrics.timeseries.inter_row_msas import InterRowMSAS | ||
|
||
|
||
class TestInterRowMSAS: | ||
def test_compute(self): | ||
"""Test it runs.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4, 5, 6]) | ||
synthetic_keys = pd.Series(['id3', 'id3', 'id3', 'id4', 'id4', 'id4']) | ||
synthetic_values = pd.Series([1, 10, 3, 7, 5, 1]) | ||
|
||
# Run | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values) | ||
) | ||
|
||
# Assert | ||
assert score == 0.5 | ||
|
||
def test_compute_identical_sequences(self): | ||
"""Test it returns 1 when real and synthetic data are identical.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4, 5, 6]) | ||
synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
synthetic_values = pd.Series([1, 2, 3, 4, 5, 6]) | ||
|
||
# Run | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values) | ||
) | ||
|
||
# Assert | ||
assert score == 1 | ||
|
||
def test_compute_different_sequences(self): | ||
"""Test it for distinct distributions.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4, 5, 6]) | ||
synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
synthetic_values = pd.Series([1, 3, 5, 2, 4, 6]) | ||
|
||
# Run | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), synthetic_data=(synthetic_keys, synthetic_values) | ||
) | ||
|
||
# Assert | ||
assert score == 0 | ||
|
||
def test_compute_with_log(self): | ||
"""Test it with logarithmic transformation.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 4, 8, 16, 32]) | ||
synthetic_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
synthetic_values = pd.Series([1, 2, 4, 8, 16, 32]) | ||
|
||
# Run | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
apply_log=True, | ||
) | ||
|
||
# Assert | ||
assert score == 1 | ||
|
||
def test_compute_different_n_rows_diff(self): | ||
"""Test it with different n_rows_diff.""" | ||
# Setup | ||
real_keys = pd.Series(['id1'] * 10 + ['id2'] * 10) | ||
real_values = pd.Series(list(range(10)) + list(range(10))) | ||
synthetic_keys = pd.Series(['id1'] * 10 + ['id2'] * 10) | ||
synthetic_values = pd.Series(list(range(10)) + list(range(10))) | ||
|
||
# Run | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
n_rows_diff=3, | ||
) | ||
|
||
# Assert | ||
assert score == 1 | ||
|
||
def test_compute_invalid_real_data(self): | ||
"""Test that it raises ValueError when real_data is invalid.""" | ||
# Setup | ||
real_data = [[1, 2, 3], [4, 5, 6]] # Not a tuple of pandas Series | ||
synthetic_keys = pd.Series(['id1', 'id1', 'id2', 'id2']) | ||
synthetic_values = pd.Series([1, 2, 3, 4]) | ||
|
||
# Run and Assert | ||
with pytest.raises(ValueError, match='The data must be a tuple of two pandas series.'): | ||
InterRowMSAS.compute( | ||
real_data=real_data, | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
n_rows_diff=1, | ||
apply_log=False, | ||
) | ||
|
||
def test_compute_invalid_synthetic_data(self): | ||
"""Test that it raises ValueError when synthetic_data is invalid.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4]) | ||
synthetic_data = [[1, 2, 3], [4, 5, 6]] # Not a tuple of pandas Series | ||
|
||
# Run and Assert | ||
with pytest.raises(ValueError, match='The data must be a tuple of two pandas series.'): | ||
InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=synthetic_data, | ||
n_rows_diff=1, | ||
apply_log=False, | ||
) | ||
|
||
def test_compute_invalid_n_rows_diff(self): | ||
"""Test that it raises ValueError when n_rows_diff is invalid.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4]) | ||
synthetic_keys = pd.Series(['id3', 'id3', 'id4', 'id4']) | ||
synthetic_values = pd.Series([1, 2, 3, 4]) | ||
|
||
# Run and Assert | ||
with pytest.raises(ValueError, match="'n_rows_diff' must be an integer greater than zero."): | ||
InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
n_rows_diff=0, | ||
apply_log=False, | ||
) | ||
|
||
def test_compute_invalid_apply_log(self): | ||
"""Test that it raises ValueError when apply_log is invalid.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4]) | ||
synthetic_keys = pd.Series(['id1', 'id1', 'id2', 'id2']) | ||
synthetic_values = pd.Series([1, 2, 3, 4]) | ||
|
||
# Run and Assert | ||
with pytest.raises(ValueError, match="'apply_log' must be a boolean."): | ||
InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
n_rows_diff=1, | ||
apply_log='True', # Should be a boolean, not a string | ||
) | ||
|
||
def test_compute_warning(self): | ||
"""Test a warning is raised when n_rows_diff is greater than sequence values size.""" | ||
# Setup | ||
real_keys = pd.Series(['id1', 'id1', 'id1', 'id2', 'id2', 'id2']) | ||
real_values = pd.Series([1, 2, 3, 4, 5, 6]) | ||
synthetic_keys = pd.Series(['id3', 'id3', 'id3', 'id4', 'id4', 'id4']) | ||
synthetic_values = pd.Series([1, 10, 3, 7, 5, 1]) | ||
|
||
# Run and Assert | ||
warn_msg = "n_rows_diff '10' is greater than the size of 2 sequence keys in real_data." | ||
with pytest.warns(UserWarning, match=warn_msg): | ||
score = InterRowMSAS.compute( | ||
real_data=(real_keys, real_values), | ||
synthetic_data=(synthetic_keys, synthetic_values), | ||
n_rows_diff=10, | ||
) | ||
|
||
# Assert | ||
assert pd.isna(score) |