diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 1cb7e17..73bb111 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -35,7 +35,7 @@ jobs:
         cache: 'pip'
     - name: Install dependencies
       run: |
-        python3 -m pip install --upgrade pip
+        python3 -m pip install --upgrade pip
         python3 -m pip install '.[test]'
     - name: Lint with flake8
       run: |
diff --git a/cats/CI_api_interface.py b/cats/CI_api_interface.py
index 2423648..a5d4076 100644
--- a/cats/CI_api_interface.py
+++ b/cats/CI_api_interface.py
@@ -85,6 +85,6 @@ def invalid_code(r: dict) -> bool:
     "carbonintensity.org.uk": APIInterface(
         get_request_url=ciuk_request_url,
         parse_response_data=ciuk_parse_response_data,
-        max_duration=2880,  # 48h
+        max_duration=2850,  # 48h - 30min so that carbon intensity is defined over the last interval
     ),
 }
diff --git a/cats/configure.py b/cats/configure.py
index 0c58395..f3bb993 100644
--- a/cats/configure.py
+++ b/cats/configure.py
@@ -13,7 +13,7 @@
 import logging
 import sys
 from collections.abc import Mapping
-from typing import Optional, Any
+from typing import Any, Optional
 
 import requests
 import yaml
@@ -24,9 +24,9 @@
 __all__ = ["get_runtime_config"]
 
 
-def get_runtime_config(args) -> tuple[APIInterface, str, int,
-                                      Optional[list[tuple[int, float]]],
-                                      Optional[float]]:
+def get_runtime_config(
+    args,
+) -> tuple[APIInterface, str, int, Optional[list[tuple[int, float]]], Optional[float]]:
     """Return the runtime cats configuration from list of command line
     arguments and content of configuration file.
diff --git a/cats/forecast.py b/cats/forecast.py
index dd3cd68..58b65a1 100644
--- a/cats/forecast.py
+++ b/cats/forecast.py
@@ -13,6 +13,9 @@ class CarbonIntensityPointEstimate:
     value: float  # the first attribute is used automatically for sorting methods
     datetime: datetime
 
+    def __repr__(self):
+        return f"{self.datetime.isoformat()}\t{self.value}"
+
 
 @dataclass(order=True)
 class CarbonIntensityAverageEstimate:
@@ -62,10 +65,10 @@ def bisect_right(data, t):
         #
         def bisect_left(data, t):
             for i, d in enumerate(data):
-                if d.datetime >= t:
-                    return i
+                if d.datetime + self.data_stepsize >= t:
+                    return i + 1
 
-        self.ndata = bisect_left(self.data, self.end) + 1
+        self.ndata = bisect_left(self.data, self.end)  # window size
 
     def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
         """Return the average of timeseries data from index over the
@@ -84,24 +87,30 @@ def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
         # intensity is interpolated between the first (index) and
         # second data point (index + 1) in the window. The ending
         # intensity value is interpolated between the last and
-        # penultimate data points in he window.
+        # penultimate data points in the window.
         window_start = self.start + index * self.data_stepsize
         window_end = self.end + index * self.data_stepsize
+
+        # lbound: carbon intensity point estimate at window start
         lbound = self.interp(
             self.data[index],
             self.data[index + 1],
             when=window_start,
         )
-        rbound = self.interp(
-            self.data[index + self.ndata - 2],
-            self.data[index + self.ndata - 1],
-            when=window_end,
-        )
+        # rbound: carbon intensity point estimate at window end
+        # Handle case when last data point exactly matches last carbon intensity,
+        # so there is no further data point to interpolate from.
+        if index + self.ndata == len(self.data):
+            rbound = self.data[-1]
+        else:
+            rbound = self.interp(
+                self.data[index + self.ndata - 1],
+                self.data[index + self.ndata],
+                when=window_end,
+            )
         # window_data <- [lbound] + [...bulk...] + [rbound] where
         # lbound and rbound are interpolated intensity values.
-        window_data = (
-            [lbound] + self.data[index + 1 : index + self.ndata - 1] + [rbound]
-        )
+        window_data = [lbound] + self.data[index + 1 : index + self.ndata] + [rbound]
         acc = [
             0.5 * (a.value + b.value) * (b.datetime - a.datetime).total_seconds()
             for a, b in zip(window_data[:-1], window_data[1:])
         ]
@@ -138,4 +147,4 @@ def __iter__(self):
             yield self.__getitem__(index)
 
     def __len__(self):
-        return len(self.data) - self.ndata
+        return len(self.data) - self.ndata + 1
diff --git a/tests/test_windowed_forecast.py b/tests/test_windowed_forecast.py
index ffbdf08..c20a703 100644
--- a/tests/test_windowed_forecast.py
+++ b/tests/test_windowed_forecast.py
@@ -4,6 +4,7 @@
 from pathlib import Path
 from zoneinfo import ZoneInfo
 
+import pytest
 from numpy.testing import assert_allclose
 
 from cats.forecast import (
@@ -23,16 +24,36 @@
     for i in range(NDATA)
 ]
 
-TEST_DATA = Path(__file__).parent / "carbon_intensity_24h.csv"
+
+@pytest.fixture(scope="session")
+def sample_data():
+    with open(Path(__file__).parent / "carbon_intensity_24h.csv", "r") as f:
+        csvfile = csv.reader(f, delimiter=",")
+        next(csvfile)  # Skip header line
+        data = [
+            CarbonIntensityPointEstimate(
+                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
+                value=float(intensity_value),
+            )
+            for datestr, _, _, intensity_value in csvfile
+        ]
+        return data
+
+
+def test_repr():
+    est = CarbonIntensityPointEstimate(
+        datetime=datetime.fromisoformat("2023-05-04T12:30+00:00"), value=12.0
+    )
+    assert repr(est) == "2023-05-04T12:30:00+00:00\t12.0"
 
 
 def test_has_right_length():
     window_size = 160  # In number of time intervals
     wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
-    # Expecting (200 - 160 - 1) (39) data points in the time
+    # Expecting (200 - 160 + 1) (41) data points in the time
     # integrated timeseries.
-    assert len(wf) == NDATA - window_size - 1
+    assert len(wf) == NDATA - window_size + 1
 
 
 def test_values():
@@ -46,7 +67,7 @@
     wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
     expected = [
         math.cos((i + window_size) * step) - math.cos(i * step)
-        for i in range(len(DATA) - window_size - 1)
+        for i in range(len(DATA) - window_size + 1)
     ]
     # average
     expected = [e / (window_size * step) for e in expected]
@@ -54,94 +75,78 @@
     assert_allclose(actual=[p.value for p in wf], desired=expected, rtol=0.01)
 
 
-def test_minimise_average():
-    with open(TEST_DATA, "r") as f:
-        csvfile = csv.reader(f, delimiter=",")
-        next(csvfile)  # Skip header line
-        data = [
-            CarbonIntensityPointEstimate(
-                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
-                value=float(intensity_value),
-            )
-            for datestr, _, _, intensity_value in csvfile
-        ]
+def test_minimise_average(sample_data):
+    window_size = 6
+    # Data points separated by 30 minutes intervals
+    duration = window_size * 30
+    result = min(WindowedForecast(sample_data, duration, start=sample_data[0].datetime))
 
-        window_size = 6
-        # Data points separated by 30 minutes intervals
-        duration = window_size * 30
-        result = min(WindowedForecast(data, duration, start=data[0].datetime))
-
-        # Intensity point estimates over best runtime period
-        v = [10, 8, 7, 7, 5, 8, 8]
-        expected = CarbonIntensityAverageEstimate(
-            start=datetime.fromisoformat("2023-05-05T12:00+00:00"),
-            end=datetime.fromisoformat("2023-05-05T15:00+00:00"),
-            value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
-        )
-        assert result == expected
+    # Intensity point estimates over best runtime period
+    v = [10, 8, 7, 7, 5, 8, 8]
+    expected = CarbonIntensityAverageEstimate(
+        start=datetime.fromisoformat("2023-05-05T12:00+00:00"),
+        end=datetime.fromisoformat("2023-05-05T15:00+00:00"),
+        value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
+    )
+    assert result == expected
 
 
-def test_minimise_average_bst():
+def test_maximum_duration(sample_data):
+    window_size = 95  # corresponds to 2850 minutes
+    # Data points separated by 30 minutes intervals
+    duration = window_size * 30
+    result = min(WindowedForecast(sample_data, duration, start=sample_data[0].datetime))
+
+    # Intensity point estimates over best runtime period
+    # In this case, the entire period is selected which is 2850 minutes long
+    v = [s.value for s in sample_data]
+    expected = CarbonIntensityAverageEstimate(
+        start=datetime.fromisoformat("2023-05-04T12:30+00:00"),
+        end=datetime.fromisoformat("2023-05-06T12:00+00:00"),
+        value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
+    )
+    assert result == expected
+
+
+def test_minimise_average_bst(sample_data):
     # We should get a start time in BST if we provide the starting time
     # in that timezone, even if the intensity estimate is in UTC. This
     # is needed as the `at` command works in local system time (and that's
     # what we put in)
-    with open(TEST_DATA, "r") as f:
-        csvfile = csv.reader(f, delimiter=",")
-        next(csvfile)  # Skip header line
-        data = [
-            CarbonIntensityPointEstimate(
-                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
-                value=float(intensity_value),
-            )
-            for datestr, _, _, intensity_value in csvfile
-        ]
+    window_size = 6
+    # Data points separated by 30 minutes intervals
+    duration = window_size * 30
+    start_time_bst = sample_data[0].datetime.replace(
+        tzinfo=timezone(timedelta(seconds=-3600))
+    )
+    result = min(WindowedForecast(sample_data, duration, start=start_time_bst))
 
-        window_size = 6
-        # Data points separated by 30 minutes intervals
-        duration = window_size * 30
-        start_time_bst = data[0].datetime.replace(
-            tzinfo=timezone(timedelta(seconds=-3600))
-        )
-        result = min(WindowedForecast(data, duration, start=start_time_bst))
-
-        # Intensity point estimates over best runtime period
-        v = [10, 8, 7, 7, 5, 8, 8]
-        expected = CarbonIntensityAverageEstimate(
-            start=datetime.fromisoformat("2023-05-05T11:00-01:00"),
-            end=datetime.fromisoformat("2023-05-05T14:00-01:00"),
-            value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
-        )
-        assert result == expected
-        assert result.start.tzinfo == expected.start.tzinfo
-        assert result.end.tzinfo == expected.end.tzinfo
+    # Intensity point estimates over best runtime period
+    v = [10, 8, 7, 7, 5, 8, 8]
+    expected = CarbonIntensityAverageEstimate(
+        start=datetime.fromisoformat("2023-05-05T11:00-01:00"),
+        end=datetime.fromisoformat("2023-05-05T14:00-01:00"),
+        value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
+    )
+    assert result == expected
+    assert result.start.tzinfo == expected.start.tzinfo
+    assert result.end.tzinfo == expected.end.tzinfo
 
 
-def test_average_intensity_now():
-    with open(TEST_DATA, "r") as f:
-        csvfile = csv.reader(f, delimiter=",")
-        next(csvfile)  # Skip header line
-        data = [
-            CarbonIntensityPointEstimate(
-                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
-                value=float(intensity_value),
-            )
-            for datestr, _, _, intensity_value in csvfile
-        ]
+def test_average_intensity_now(sample_data):
+    window_size = 11
+    # Data points separated by 30 minutes intervals
+    duration = window_size * 30
+    result = WindowedForecast(sample_data, duration, start=sample_data[0].datetime)[0]
 
-        window_size = 11
-        # Data points separated by 30 minutes intervals
-        duration = window_size * 30
-        result = WindowedForecast(data, duration, start=data[0].datetime)[0]
-
-        # Intensity point estimates over best runtime period
-        v = [p.value for p in data[: window_size + 1]]
-        expected = CarbonIntensityAverageEstimate(
-            start=data[0].datetime,
-            end=data[window_size].datetime,
-            value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
-        )
-        assert result == expected
+    # Intensity point estimates over best runtime period
+    v = [p.value for p in sample_data[: window_size + 1]]
+    expected = CarbonIntensityAverageEstimate(
+        start=sample_data[0].datetime,
+        end=sample_data[window_size].datetime,
+        value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / window_size,
+    )
+    assert result == expected
 
 
 def test_average_intensity_with_offset():
@@ -187,81 +192,59 @@
     assert result == expected
 
 
-def test_average_intensity_with_offset_long_job():
+def test_average_intensity_with_offset_long_job(sample_data):
     # Case where job start and end time are not colocated with data
     # carbon intensity data points. In this case cats interpolate the
     # intensity value at beginning and end of each potential job
    # duration window.
-    with open(TEST_DATA, "r") as f:
-        csvfile = csv.reader(f, delimiter=",")
-        next(csvfile)  # Skip header line
-        data = [
-            CarbonIntensityPointEstimate(
-                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
-                value=float(intensity_value),
-            )
-            for datestr, _, _, intensity_value in csvfile
-        ]
-
-        duration = 194  # in minutes
-        # First available data point is for 12:30 but the job
-        # starts 18 minutes later.
-        # Start time in BST
-        job_start = datetime.fromisoformat("2023-05-04T13:48+01:00")
-        result = WindowedForecast(data, duration, start=job_start)[2]
-
-        # First and last element in v are interpolated intensity value.
-        # e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
-        v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
-        data_timestep = data[1].datetime - data[0].datetime  # 30 minutes
-        expected = CarbonIntensityAverageEstimate(
-            start=job_start + 2 * data_timestep,
-            end=job_start + 2 * data_timestep + timedelta(minutes=duration),
-            value=(
-                0.5 * (v[0] + v[1]) * 12
-                + sum([0.5 * (a + b) * 30 for a, b in zip(v[1:-2], v[2:-1])])
-                + 0.5 * (v[7] + v[8]) * 2
-            )
-            / duration,
+    duration = 194  # in minutes
+    # First available data point is for 12:30 but the job
+    # starts 18 minutes later.
+    # Start time in BST
+    job_start = datetime.fromisoformat("2023-05-04T13:48+01:00")
+    result = WindowedForecast(sample_data, duration, start=job_start)[2]
+
+    # First and last element in v are interpolated intensity value.
+    # e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
+    v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
+    data_timestep = sample_data[1].datetime - sample_data[0].datetime  # 30 minutes
+    expected = CarbonIntensityAverageEstimate(
+        start=job_start + 2 * data_timestep,
+        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
+        value=(
+            0.5 * (v[0] + v[1]) * 12
+            + sum([0.5 * (a + b) * 30 for a, b in zip(v[1:-2], v[2:-1])])
+            + 0.5 * (v[7] + v[8]) * 2
         )
-        assert result == expected
-        assert result.start.tzinfo == expected.start.tzinfo
-        assert result.end.tzinfo == expected.end.tzinfo
+        / duration,
+    )
+    assert result == expected
+    assert result.start.tzinfo == expected.start.tzinfo
+    assert result.end.tzinfo == expected.end.tzinfo
 
 
-def test_average_intensity_with_offset_short_job():
+def test_average_intensity_with_offset_short_job(sample_data):
     # Case where job is short: start and end time fall between two
     # consecutive data points (first and second).
-    with open(TEST_DATA, "r") as f:
-        csvfile = csv.reader(f, delimiter=",")
-        next(csvfile)  # Skip header line
-        data = [
-            CarbonIntensityPointEstimate(
-                datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"),
-                value=float(intensity_value),
-            )
-            for datestr, _, _, intensity_value in csvfile
-        ]
-
-        duration = 6  # in minutes
-        # First available data point is for 12:30 but the job
-        # starts 6 minutes later.
-        job_start = datetime.fromisoformat("2023-05-04T12:48+00:00")
-        result = WindowedForecast(data, duration, start=job_start)[2]
-
-        # Job starts at 12:48 and ends at 12:54. For each candidate
-        # running window, both start and end times fall between two
-        # consecutive data points (e.g. 13:30 and 14:00 for the third
-        # window).
-        #
-        # First and second element in v are interpolated intensity
-        # values. e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
-        # and v[1] = v[-1] = 15 + 24min * (18 - 15) / 30min = 17.4
-        v = [16.8, 17.4]
-        data_timestep = data[1].datetime - data[0].datetime
-        expected = CarbonIntensityAverageEstimate(
-            start=job_start + 2 * data_timestep,
-            end=job_start + 2 * data_timestep + timedelta(minutes=duration),
-            value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / (len(v) - 1),
-        )
-        assert result == expected
+    duration = 6  # in minutes
+    # First available data point is for 12:30 but the job
+    # starts 6 minutes later.
+    job_start = datetime.fromisoformat("2023-05-04T12:48+00:00")
+    result = WindowedForecast(sample_data, duration, start=job_start)[2]
+
+    # Job starts at 12:48 and ends at 12:54. For each candidate
+    # running window, both start and end times fall between two
+    # consecutive data points (e.g. 13:30 and 14:00 for the third
+    # window).
+    #
+    # First and second element in v are interpolated intensity
+    # values. e.g v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
+    # and v[1] = v[-1] = 15 + 24min * (18 - 15) / 30min = 17.4
+    v = [16.8, 17.4]
+    data_timestep = sample_data[1].datetime - sample_data[0].datetime
+    expected = CarbonIntensityAverageEstimate(
+        start=job_start + 2 * data_timestep,
+        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
+        value=sum([0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]) / (len(v) - 1),
+    )
+    assert result == expected