Skip to content

Commit

Permalink
Merge pull request #204 from Pluimvee/Adding-comments-and-reschuffle-…
Browse files Browse the repository at this point in the history
…code-without-functional-changes

Adding comments and reschuffle code without functional changes
  • Loading branch information
Roeland54 authored Oct 10, 2024
2 parents 0a4c9d3 + 2569b33 commit e604e67
Showing 1 changed file with 101 additions and 75 deletions.
176 changes: 101 additions & 75 deletions custom_components/entsoe/coordinator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from datetime import timedelta

import homeassistant.helpers.config_validation as cv
from homeassistant.core import HomeAssistant
Expand All @@ -18,6 +18,9 @@
MIN_HOURS = 20


# This class contains actually two main tasks
# 1. ENTSO: Refresh data from ENTSO on interval basis triggered by HASS every 60 minutes
# 2. ANALYSIS: Implement some analysis on this data, like min(), max(), avg(), perc(). Updated analysis is triggered by an explicit call from a sensor
class EntsoeCoordinator(DataUpdateCoordinator):
"""Get the latest data and update the states."""

Expand Down Expand Up @@ -62,7 +65,7 @@ def __init__(
update_interval=timedelta(minutes=60),
)

# calculate the price using the given template
# ENTSO: recalculate the price using the given template
def calc_price(self, value, fake_dt=None, no_template=False) -> float:
"""Calculate price based on the users settings."""
# Used to inject the current hour.
Expand Down Expand Up @@ -90,12 +93,13 @@ def inner(*args, **kwargs):

return price

# ENTSO: recalculate the price for each price
def parse_hourprices(self, hourprices):
for hour, price in hourprices.items():
hourprices[hour] = self.calc_price(value=price, fake_dt=hour)
return hourprices

# Called by HA every refresh interval (60 minutes)
# ENTSO: Triggered by HA to refresh the data (interval = 60 minutes)
async def _async_update_data(self) -> dict:
"""Get the latest data from ENTSO-e"""
self.logger.debug("ENTSO-e DataUpdateCoordinator data update")
Expand All @@ -104,7 +108,7 @@ async def _async_update_data(self) -> dict:
now = dt.now()
self.today = now.replace(hour=0, minute=0, second=0, microsecond=0)
if self.check_update_needed(now) is False:
self.logger.debug(f"Skipping api fetch. All data is already available")
self.logger.debug("Skipping api fetch. All data is already available")
return self.data

yesterday = self.today - timedelta(days=1)
Expand All @@ -125,6 +129,7 @@ async def _async_update_data(self) -> dict:
self.filtered_hourprices = self._filter_calculated_hourprices(parsed_data)
return parsed_data

# ENTSO: check if we need to refresh the data. If we have None, or less than 20hrs left for today, or less than 20hrs tomorrow and its after 11
def check_update_needed(self, now):
if self.data is None:
return True
Expand All @@ -134,6 +139,7 @@ def check_update_needed(self, now):
return True
return False

# ENTSO: new prices using an async job
async def fetch_prices(self, start_date, end_date):
try:
# run api_update in async job
Expand Down Expand Up @@ -161,49 +167,93 @@ async def fetch_prices(self, start_date, end_date):
f"Warning the integration doesn't have any up to date local data this means that entities won't get updated but access remains to restorable entities: {exc}."
)

# ENTSO: the async fetch job itself
def api_update(self, start_date, end_date, api_key):
client = EntsoeClient(api_key=api_key)
return client.query_day_ahead_prices(
country_code=self.area, start=start_date, end=end_date
)

async def get_energy_prices(self, start_date, end_date):
# check if we have the data already
if (
len(self.get_data(start_date)) > MIN_HOURS
and len(self.get_data(end_date)) > MIN_HOURS
):
self.logger.debug(f"return prices from coordinator cache.")
return {
k: v
for k, v in self.data.items()
if k.date() >= start_date.date() and k.date() <= end_date.date()
}
return self.parse_hourprices(await self.fetch_prices(start_date, end_date))
# ENTSO: Return the data for the given date
def get_data(self, date):
return {k: v for k, v in self.data.items() if k.date() == date.date()}

# ENTSO: Return the data for today
def get_data_today(self):
return self.get_data(self.today)

# ENTSO: Return the data for tomorrow
def get_data_tomorrow(self):
return self.get_data(self.today + timedelta(days=1))

# ENTSO: Return the data for yesterday
def get_data_yesterday(self):
return self.get_data(self.today - timedelta(days=1))

# SENSOR: Do we have data available for today
def today_data_available(self):
return len(self.get_data_today()) > MIN_HOURS

# this method is called by each sensor, each complete hour, and ensures the date and filtered hourprices are in line with the current time
# SENSOR: Get the current price
def get_current_hourprice(self) -> int:
return self.data[dt.now().replace(minute=0, second=0, microsecond=0)]

# SENSOR: Get the next hour price
def get_next_hourprice(self) -> int:
return self.data[
dt.now().replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
]

# SENSOR: Get timestamped prices of today as attribute for Average Sensor
def get_prices_today(self):
return self.get_timestamped_prices(self.get_data_today())

# SENSOR: Get timestamped prices of tomorrow as attribute for Average Sensor
def get_prices_tomorrow(self):
return self.get_timestamped_prices(self.get_data_tomorrow())

# SENSOR: Get timestamped prices of today & tomorrow or yesterday & today as attribute for Average Sensor
def get_prices(self):
if len(self.data) > 48:
return self.get_timestamped_prices(
{hour: price for hour, price in self.data.items() if hour >= self.today}
)
return self.get_timestamped_prices(
{
hour: price
for hour, price in self.data.items()
if hour >= self.today - timedelta(days=1)
}
)

# SENSOR: Timestamp the prices
def get_timestamped_prices(self, hourprices):
list = []
for hour, price in hourprices.items():
str_hour = str(hour)
list.append({"time": str_hour, "price": price})
return list

# --------------------------------------------------------------------------------------------------------------------------------
# ANALYSIS: this method is called by each sensor, each complete hour, and ensures the date and filtered hourprices are in line with the current time
# we could still optimize as not every calculator mode needs hourly updates
def sync_calculator(self):
now = dt.now()
if (
self.calculator_last_sync is None
or self.calculator_last_sync.hour != now.hour
):
self.logger.debug(
f"The calculator needs to be synced with the current time"
)
self.logger.debug("The calculator needs to be synced with the current time")
if self.today.date() != now.date():
self.logger.debug(
f"new day detected: update today and filtered hourprices"
"new day detected: update today and filtered hourprices"
)
self.today = now.replace(hour=0, minute=0, second=0, microsecond=0)
self.filtered_hourprices = self._filter_calculated_hourprices(self.data)

self.calculator_last_sync = now

# ANALYSIS: filter the hourprices on which to apply the calculations based on the calculation_mode
def _filter_calculated_hourprices(self, data):
# rotation = calculations made upon 24hrs today
if self.calculation_mode == CALCULATION_MODE["rotation"]:
Expand All @@ -227,77 +277,53 @@ def _filter_calculated_hourprices(self, data):
if hour >= self.today - timedelta(days=1)
}

def get_prices_today(self):
return self.get_timestamped_prices(self.get_data_today())

def get_prices_tomorrow(self):
return self.get_timestamped_prices(self.get_data_tomorrow())

def get_prices(self):
if len(self.data) > 48:
return self.get_timestamped_prices(
{hour: price for hour, price in self.data.items() if hour >= self.today}
)
return self.get_timestamped_prices(
{
hour: price
for hour, price in self.data.items()
if hour >= self.today - timedelta(days=1)
}
)

def get_data(self, date):
return {k: v for k, v in self.data.items() if k.date() == date.date()}

def get_data_today(self):
return {k: v for k, v in self.data.items() if k.date() == self.today.date()}

def get_data_tomorrow(self):
return {
k: v
for k, v in self.data.items()
if k.date() == self.today.date() + timedelta(days=1)
}

def get_next_hourprice(self) -> int:
return self.data[
dt.now().replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
]

def get_current_hourprice(self) -> int:
return self.data[dt.now().replace(minute=0, second=0, microsecond=0)]

def get_avg_price(self):
return round(
sum(self.filtered_hourprices.values())
/ len(self.filtered_hourprices.values()),
5,
)

# ANALYSIS: Get max price in filtered period
def get_max_price(self):
return max(self.filtered_hourprices.values())

# ANALYSIS: Get min price in filtered period
def get_min_price(self):
return min(self.filtered_hourprices.values())

# ANALYSIS: Get timestamp of max price in filtered period
def get_max_time(self):
return max(self.filtered_hourprices, key=self.filtered_hourprices.get)

# ANALYSIS: Get timestamp of min price in filtered period
def get_min_time(self):
return min(self.filtered_hourprices, key=self.filtered_hourprices.get)

# ANALYSIS: Get avg price in filtered period
def get_avg_price(self):
return round(
sum(self.filtered_hourprices.values())
/ len(self.filtered_hourprices.values()),
5,
)

# ANALYSIS: Get percentage of current price relative to maximum of filtered period
def get_percentage_of_max(self):
return round(self.get_current_hourprice() / self.get_max_price() * 100, 1)

# ANALYSIS: Get percentage of current price relative to spread (max-min) of filtered period
def get_percentage_of_range(self):
min = self.get_min_price()
spread = self.get_max_price() - min
current = self.get_current_hourprice() - min
return round(current / spread * 100, 1)

def get_timestamped_prices(self, hourprices):
list = []
for hour, price in hourprices.items():
str_hour = str(hour)
list.append({"time": str_hour, "price": price})
return list
# --------------------------------------------------------------------------------------------------------------------------------
# SERVICES: returns data from the coordinator cache, or directly from ENTSO when not availble
async def get_energy_prices(self, start_date, end_date):
# check if we have the data already
if (
len(self.get_data(start_date)) > MIN_HOURS
and len(self.get_data(end_date)) > MIN_HOURS
):
self.logger.debug("return prices from coordinator cache.")
return {
k: v
for k, v in self.data.items()
if k.date() >= start_date.date() and k.date() <= end_date.date()
}
return self.parse_hourprices(await self.fetch_prices(start_date, end_date))

0 comments on commit e604e67

Please sign in to comment.