diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index feeb729fd..9518dd399 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -17,6 +17,9 @@ jobs: ADDITIONAL_INSTALLED_APPS: smbackend_turku,ptv PTV_ID_OFFSET: 10000000 LAM_COUNTER_API_BASE_URL: https://tie.digitraffic.fi/api/tms/v1/history + ECO_COUNTER_STATIONS_URL: https://dev.turku.fi/datasets/ecocounter/liikennelaskimet.geojson + ECO_COUNTER_OBSERVATIONS_URL: https://data.turku.fi/cjtv3brqr7gectdv7rfttc/counters-15min.csv + TRAFFIC_COUNTER_OBSERVATIONS_BASE_URL: https://data.turku.fi/2yxpk2imqi2mzxpa6e6knq/ steps: - uses: actions/checkout@v3 diff --git a/eco_counter/README.md b/eco_counter/README.md index 32bdcad68..c87f88554 100644 --- a/eco_counter/README.md +++ b/eco_counter/README.md @@ -34,6 +34,13 @@ e.g. ./manage.py import_counter_data --counters EC TC Counter names are: EC (Eco Counter), TC (Traffic Counter), LC (Lam Counter) and TR (Telraam Counter). Note, Traffic Counter data is updated once a week and Lam Counter data once a day. +## Deleting data +To delete data, use the delete_counter_data management command. +For example, to delete all Lam Counter data, run: +``` +./manage.py delete_counter_data --counters LC +``` + ### Importing Telraam raw data In order to import Telraam data into the database the raw data has to be imported. The raw data is imported with the _import_telraam_to_csv_ management command. 
The importer should be set to run once an hour (see: https://github.com/City-of-Turku/smbackend/wiki/Celery-Tasks#telraam-to-csv-eco_countertasksimport_telraam_to_csv ) diff --git a/eco_counter/api/serializers.py b/eco_counter/api/serializers.py index ea095df18..685c594dd 100644 --- a/eco_counter/api/serializers.py +++ b/eco_counter/api/serializers.py @@ -1,3 +1,5 @@ +from datetime import date, timedelta + from django.db.models import Q from rest_framework import serializers @@ -28,6 +30,7 @@ "value_bp", "value_bt", ] +Q_EXP = Q(value_at__gt=0) | Q(value_pt__gt=0) | Q(value_jt__gt=0) | Q(value_bt__gt=0) class StationSerializer(serializers.ModelSerializer): @@ -36,7 +39,9 @@ class StationSerializer(serializers.ModelSerializer): lon = serializers.SerializerMethodField() lat = serializers.SerializerMethodField() sensor_types = serializers.SerializerMethodField() - data_from_year = serializers.SerializerMethodField() + data_until_date = serializers.SerializerMethodField() + data_from_date = serializers.SerializerMethodField() + is_active = serializers.SerializerMethodField() class Meta: model = Station @@ -54,7 +59,9 @@ class Meta: "lon", "lat", "sensor_types", - "data_from_year", + "data_until_date", + "data_from_date", + "is_active", ] def get_y(self, obj): @@ -82,17 +89,35 @@ def get_sensor_types(self, obj): result.append(type) return result - def get_data_from_year(self, obj): - q_exp = ( - Q(value_at__gt=0) - | Q(value_pt__gt=0) - | Q(value_jt__gt=0) - | Q(value_bt__gt=0) - ) - qs = YearData.objects.filter(q_exp, station=obj).order_by("year__year_number") - if qs.count() > 0: - return qs[0].year.year_number - else: + def get_is_active(self, obj): + num_days = [1, 7, 30, 365] + res = {} + for days in num_days: + from_date = date.today() - timedelta(days=days - 1) + day_qs = Day.objects.filter(station=obj, date__gte=from_date) + day_data_qs = DayData.objects.filter(day__in=day_qs) + if day_data_qs.filter(Q_EXP).count() > 0: + res[days] = True + else: + res[days] = 
False + return res + + def get_data_until_date(self, obj): + try: + return ( + DayData.objects.filter(Q_EXP, station=obj).latest("day__date").day.date + ) + except DayData.DoesNotExist: + return None + + def get_data_from_date(self, obj): + try: + return ( + DayData.objects.filter(Q_EXP, station=obj) + .earliest("day__date") + .day.date + ) + except DayData.DoesNotExist: return None diff --git a/eco_counter/constants.py b/eco_counter/constants.py index 94d4ab1a8..c41829e77 100644 --- a/eco_counter/constants.py +++ b/eco_counter/constants.py @@ -30,6 +30,10 @@ COUNTERS.LAM_COUNTER = LAM_COUNTER COUNTERS.TELRAAM_COUNTER = TELRAAM_COUNTER +COUNTERS_LIST = [ECO_COUNTER, TRAFFIC_COUNTER, LAM_COUNTER, TELRAAM_COUNTER] +COUNTER_CHOICES_STR = ( + f"{ECO_COUNTER}, {TRAFFIC_COUNTER}, {TELRAAM_COUNTER} and {LAM_COUNTER}" +) CSV_DATA_SOURCES = ( (TRAFFIC_COUNTER, "TrafficCounter"), (ECO_COUNTER, "EcoCounter"), @@ -118,12 +122,14 @@ TELRAAM_COUNTER_CSV_FILE_PATH = f"{settings.MEDIA_ROOT}/telraam_data/" TELRAAM_COUNTER_CSV_FILE = ( - TELRAAM_COUNTER_CSV_FILE_PATH + "telraam_data_{id}_{day}_{month}_{year}.csv" + TELRAAM_COUNTER_CSV_FILE_PATH + "telraam_data_{mac}_{day}_{month}_{year}.csv" ) +TELRAAM_STATION_350457790598039 = 350457790598039 +TELRAAM_STATION_350457790600975 = 350457790600975 TELRAAM_COUNTER_CAMERAS = { # Mac id: Direction flag (True=rgt prefix will be keskustaan päin) - 350457790598039: False, # Kristiinanankatu, Joelle katsottaessa vasemmalle - 350457790600975: True, # Kristiinanankatu, Joelle katsottaessa oikealle + TELRAAM_STATION_350457790598039: False, # Kristiinanankatu, Joelle katsottaessa vasemmalle + TELRAAM_STATION_350457790600975: True, # Kristiinanankatu, Joelle katsottaessa oikealle } # For 429 (too many request) TELRAAM need a retry strategy retry_strategy = Retry( @@ -136,3 +142,23 @@ TELRAAM_HTTP = requests.Session() TELRAAM_HTTP.mount("https://", adapter) TELRAAM_HTTP.mount("http://", adapter) + + +# Telraam stations initial geometries in WKT 
format +# These coordinates are used if CSV files do not include any geometries. +TELRAAM_STATIONS_INITIAL_WKT_GEOMETRIES = { + TELRAAM_STATION_350457790598039: { + "location": "POINT (239628.47846388057 6710757.471557152)", + "geometry": "MULTILINESTRING ((239565.80107971327 6710861.8209667895, 239572.58901459936 6710850.524818219," + " 239574.73294378238 6710846.950531884, 239628.47846388057 6710757.471557152," + " 239630.0339247121 6710754.923177836, 239635.52748551324 6710745.732077925))", + }, + TELRAAM_STATION_350457790600975: { + "location": "POINT (239523.2288977413 6710932.715108742)", + "geometry": "MULTILINESTRING ((239490.42663459244 6710989.092283992, 239493.45037993207 6710983.7110835295," + " 239495.88642941663 6710979.3668986475, 239517.9904128411 6710941.530425406," + " 239520.0691194288 6710937.971973339, 239523.2288977413 6710932.715108742," + " 239529.37000273907 6710922.482472116, 239558.08254550528 6710874.681753734," + " 239559.97438753376 6710871.516775628, 239565.80107971327 6710861.8209667895))", + }, +} diff --git a/eco_counter/management/commands/delete_all_counter_data.py b/eco_counter/management/commands/delete_all_counter_data.py deleted file mode 100644 index 9633df83c..000000000 --- a/eco_counter/management/commands/delete_all_counter_data.py +++ /dev/null @@ -1,17 +0,0 @@ -import logging - -from django import db -from django.core.management.base import BaseCommand - -from eco_counter.models import ImportState, Station - -logger = logging.getLogger("eco_counter") - - -class Command(BaseCommand): - @db.transaction.atomic - def handle(self, *args, **options): - logger.info("Deleting all counter data...") - logger.info(f"{Station.objects.all().delete()}") - logger.info(f"{ImportState.objects.all().delete()}") - logger.info("Deleted all counter data.") diff --git a/eco_counter/management/commands/delete_counter_data.py b/eco_counter/management/commands/delete_counter_data.py new file mode 100644 index 000000000..e145e3de7 --- /dev/null 
+++ b/eco_counter/management/commands/delete_counter_data.py @@ -0,0 +1,36 @@ +import logging + +from django import db +from django.core.management.base import BaseCommand + +from eco_counter.constants import COUNTER_CHOICES_STR +from eco_counter.management.commands.utils import check_counters_argument +from eco_counter.models import ImportState, Station + +logger = logging.getLogger("eco_counter") + + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + "--counters", + type=str, + nargs="+", + default=False, + help=f"Delete given counter data, choices are: {COUNTER_CHOICES_STR}.", + ) + + @db.transaction.atomic + def handle(self, *args, **options): + counters = options.get("counters", None) + check_counters_argument(counters) + if counters: + for counter in counters: + logger.info(f"Deleting counter data for {counter}") + logger.info( + f"{Station.objects.filter(csv_data_source=counter).delete()}" + ) + logger.info( + f"{ImportState.objects.filter(csv_data_source=counter).delete()}" + ) + logger.info("Deleted counter data.") diff --git a/eco_counter/management/commands/import_counter_data.py b/eco_counter/management/commands/import_counter_data.py index 0a607e682..01fbcc203 100644 --- a/eco_counter/management/commands/import_counter_data.py +++ b/eco_counter/management/commands/import_counter_data.py @@ -16,6 +16,7 @@ from django.core.management.base import BaseCommand, CommandError from eco_counter.constants import ( + COUNTER_CHOICES_STR, COUNTER_START_YEARS, COUNTERS, ECO_COUNTER, @@ -41,10 +42,12 @@ ) from .utils import ( + check_counters_argument, gen_eco_counter_test_csv, get_eco_counter_csv, get_lam_counter_csv, - get_telraam_counter_csv, + get_or_create_telraam_station, + get_telraam_data_frames, get_test_dataframe, get_traffic_counter_csv, save_stations, @@ -63,295 +66,400 @@ # on a six lane road during the sample time (15min), 15min*60s*6lanes. # If the value is greater than the threshold value the value is set to 0. 
ERRORNEOUS_VALUE_THRESHOLD = 5400 +TIMEZONE = pytz.timezone("Europe/Helsinki") +""" +Movement types: +(A)uto, car +(P)yörä, bicycle +(J)alankulkija, pedestrian +(B)ussi, bus +Direction types: +(K)eskustaan päin, towards the center +(P)oispäin keskustasta, away from the center +So, for example, a column with the prefix "ap" contains data for cars moving away from the center. +The naming convention is derived from the eco-counter source data that was the +original data source. +""" +STATION_TYPES = [ + ("ak", "ap", "at"), + ("pk", "pp", "pt"), + ("jk", "jp", "jt"), + ("bk", "bp", "bt"), +] + +TYPE_DIRS = ["AK", "AP", "JK", "JP", "BK", "BP", "PK", "PP"] +ALL_TYPE_DIRS = TYPE_DIRS + ["AT", "JT", "BT", "PT"] + + +def delete_tables( + csv_data_sources=[ECO_COUNTER, TRAFFIC_COUNTER, LAM_COUNTER, TELRAAM_COUNTER], +): + for csv_data_source in csv_data_sources: + for station in Station.objects.filter(csv_data_source=csv_data_source): + Year.objects.filter(station=station).delete() + ImportState.objects.filter(csv_data_source=csv_data_source).delete() + + +def save_hour_data_values(hour_data, values): + for td in ALL_TYPE_DIRS: + setattr(hour_data, f"values_{td.lower()}", values[td]) + hour_data.save() + + +def save_values(values, dst_obj): + for station_types in STATION_TYPES: + setattr(dst_obj, f"value_{station_types[0]}", values[station_types[0]]) + setattr(dst_obj, f"value_{station_types[1]}", values[station_types[1]]) + setattr( + dst_obj, + f"value_{station_types[2]}", + values[station_types[0]] + values[station_types[1]], + ) + dst_obj.save() -class Command(BaseCommand): - help = "Imports traffic counter data in the Turku region." 
- COUNTERS = [ECO_COUNTER, TRAFFIC_COUNTER, LAM_COUNTER, TELRAAM_COUNTER] - COUNTER_CHOICES_STR = ( - f"{ECO_COUNTER}, {TRAFFIC_COUNTER}, {TELRAAM_COUNTER} and {LAM_COUNTER}" - ) - TIMEZONE = pytz.timezone("Europe/Helsinki") - """ - Movement types: - (A)uto, car - (P)yörä, bicycle - (J)alankulkija, pedestrian - (B)ussi, bus - Direction types: - (K)eskustaan päin, towards the center - (P)poispäin keskustasta, away from the center - So for the example column with prefix "ap" contains data for cars moving away from the center. - The naming convention is derived from the eco-counter source data that was the - original data source. +def add_values(values, dst_obj): """ - STATION_TYPES = [ - ("ak", "ap", "at"), - ("pk", "pp", "pt"), - ("jk", "jp", "jt"), - ("bk", "bp", "bt"), - ] - - TYPE_DIRS = ["AK", "AP", "JK", "JP", "BK", "BP", "PK", "PP"] - ALL_TYPE_DIRS = TYPE_DIRS + ["AT", "JT", "BT", "PT"] - type_dirs_lower = [TD.lower() for TD in TYPE_DIRS] - - def delete_tables( - self, - csv_data_sources=[ECO_COUNTER, TRAFFIC_COUNTER, LAM_COUNTER, TELRAAM_COUNTER], - ): - for csv_data_source in csv_data_sources: - for station in Station.objects.filter(csv_data_source=csv_data_source): - Year.objects.filter(station=station).delete() - ImportState.objects.filter(csv_data_source=csv_data_source).delete() - - def save_values(self, values, dst_obj): - for station_types in self.STATION_TYPES: - setattr(dst_obj, f"value_{station_types[0]}", values[station_types[0]]) - setattr(dst_obj, f"value_{station_types[1]}", values[station_types[1]]) - setattr( - dst_obj, - f"value_{station_types[2]}", - values[station_types[0]] + values[station_types[1]], - ) - dst_obj.save() - - def add_values(self, values, dst_obj): - """ - Populate values for all movement types and directions for a station. 
- """ - for station_types in self.STATION_TYPES: - key = f"value_{station_types[0]}" - k_val = getattr(dst_obj, key, 0) + values[station_types[0]] - setattr(dst_obj, key, k_val) - key = f"value_{station_types[1]}" - p_val = getattr(dst_obj, key, 0) + values[station_types[1]] - setattr(dst_obj, key, p_val) - key = f"value_{station_types[2]}" - t_val = ( - getattr(dst_obj, key, 0) - + values[station_types[0]] - + values[station_types[1]] - ) - setattr(dst_obj, key, t_val) - dst_obj.save() - - def get_values(self, sum_series, station_name): - """ - Returns a dict containing the aggregated sum value for every movement type and direction. - """ - values = {} - for type_dir in self.TYPE_DIRS: - key = f"{station_name} {type_dir}" - values[type_dir.lower()] = sum_series.get(key, 0) - return values - - def save_years(self, df, stations): - logger.info("Saving years...") - years = df.groupby(df.index.year) - for index, row in years: - logger.info(f"Saving year {index}") - sum_series = row.sum() - for station in stations: - year, _ = Year.objects.get_or_create(station=station, year_number=index) - values = self.get_values(sum_series, station.name) - year_data, _ = YearData.objects.get_or_create( - year=year, station=station - ) - self.save_values(values, year_data) - - def save_months(self, df, stations): - logger.info("Saving months...") - months = df.groupby([df.index.year, df.index.month]) - for index, row in months: - year_number, month_number = index - logger.info(f"Saving month {month_number} of year {year_number}") - sum_series = row.sum() - for station in stations: - year, _ = Year.objects.get_or_create( - station=station, year_number=year_number - ) - month, _ = Month.objects.get_or_create( - station=station, year=year, month_number=month_number - ) - values = self.get_values(sum_series, station.name) - month_data, _ = MonthData.objects.get_or_create( - year=year, month=month, station=station - ) - self.save_values(values, month_data) + Populate values for all 
movement types and directions for a station. + """ + for station_types in STATION_TYPES: + key = f"value_{station_types[0]}" + k_val = getattr(dst_obj, key, 0) + values[station_types[0]] + setattr(dst_obj, key, k_val) + key = f"value_{station_types[1]}" + p_val = getattr(dst_obj, key, 0) + values[station_types[1]] + setattr(dst_obj, key, p_val) + key = f"value_{station_types[2]}" + t_val = ( + getattr(dst_obj, key, 0) + + values[station_types[0]] + + values[station_types[1]] + ) + setattr(dst_obj, key, t_val) + dst_obj.save() + - def save_current_year(self, stations, year_number, end_month_number): - logger.info(f"Saving current year {year_number}") +def get_values(sum_series, station_name): + """ + Returns a dict containing the aggregated sum value for every movement type and direction. + """ + values = {} + for type_dir in TYPE_DIRS: + key = f"{station_name} {type_dir}" + values[type_dir.lower()] = sum_series.get(key, 0) + return values + + +def save_years(df, stations): + logger.info("Saving years...") + years = df.groupby(df.index.year) + for index, row in years: + logger.info(f"Saving year {index}") + sum_series = row.sum() + for station in stations: + year, _ = Year.objects.get_or_create(station=station, year_number=index) + values = get_values(sum_series, station.name) + year_data, _ = YearData.objects.get_or_create(year=year, station=station) + save_values(values, year_data) + + +def save_months(df, stations): + logger.info("Saving months...") + months = df.groupby([df.index.year, df.index.month]) + for index, row in months: + year_number, month_number = index + logger.info(f"Saving month {month_number} of year {year_number}") + sum_series = row.sum() for station in stations: year, _ = Year.objects.get_or_create( station=station, year_number=year_number ) - year_data, _ = YearData.objects.get_or_create(station=station, year=year) - for station_types in self.STATION_TYPES: - setattr(year_data, f"value_{station_types[0]}", 0) - setattr(year_data, 
f"value_{station_types[1]}", 0) - setattr(year_data, f"value_{station_types[2]}", 0) - for month_number in range(1, end_month_number + 1): - month, _ = Month.objects.get_or_create( - station=station, year=year, month_number=month_number - ) - month_data, _ = MonthData.objects.get_or_create( - station=station, month=month, year=year - ) - for station_types in self.STATION_TYPES: - for i in range(3): - key = f"value_{station_types[i]}" - m_val = getattr(month_data, key, 0) - y_val = getattr(year_data, key, 0) - setattr(year_data, key, m_val + y_val) - year_data.save() - - def save_weeks(self, df, stations): - logger.info("Saving weeks...") - weeks = df.groupby([df.index.year, df.index.isocalendar().week]) - for index, row in weeks: - year_number, week_number = index - logger.info(f"Saving week number {week_number} of year {year_number}") - sum_series = row.sum() - for station in stations: - year = Year.objects.get(station=station, year_number=year_number) - week, _ = Week.objects.get_or_create( - station=station, - week_number=week_number, - years__year_number=year_number, - ) - if week.years.count() == 0: - week.years.add(year) + month, _ = Month.objects.get_or_create( + station=station, year=year, month_number=month_number + ) + values = get_values(sum_series, station.name) + month_data, _ = MonthData.objects.get_or_create( + year=year, month=month, station=station + ) + save_values(values, month_data) + + +def save_current_year(stations, year_number, end_month_number): + logger.info(f"Saving current year {year_number}") + for station in stations: + year, _ = Year.objects.get_or_create(station=station, year_number=year_number) + year_data, _ = YearData.objects.get_or_create(station=station, year=year) + for station_types in STATION_TYPES: + setattr(year_data, f"value_{station_types[0]}", 0) + setattr(year_data, f"value_{station_types[1]}", 0) + setattr(year_data, f"value_{station_types[2]}", 0) + for month_number in range(1, end_month_number + 1): + month, _ = 
Month.objects.get_or_create( + station=station, year=year, month_number=month_number + ) + month_data, _ = MonthData.objects.get_or_create( + station=station, month=month, year=year + ) + for station_types in STATION_TYPES: + for i in range(3): + key = f"value_{station_types[i]}" + m_val = getattr(month_data, key, 0) + y_val = getattr(year_data, key, 0) + setattr(year_data, key, m_val + y_val) + year_data.save() + + +def save_weeks(df, stations): + logger.info("Saving weeks...") + weeks = df.groupby([df.index.year, df.index.isocalendar().week]) + for index, row in weeks: + year_number, week_number = index + logger.info(f"Saving week number {week_number} of year {year_number}") + sum_series = row.sum() + for station in stations: + year = Year.objects.get(station=station, year_number=year_number) + week, _ = Week.objects.get_or_create( + station=station, + week_number=week_number, + years__year_number=year_number, + ) + if week.years.count() == 0: + week.years.add(year) - values = self.get_values(sum_series, station.name) - week_data, _ = WeekData.objects.get_or_create( - station=station, week=week - ) - self.save_values(values, week_data) + values = get_values(sum_series, station.name) + week_data, _ = WeekData.objects.get_or_create(station=station, week=week) + save_values(values, week_data) - def save_days(self, df, stations): - logger.info("Saving days...") - days = df.groupby( - [df.index.year, df.index.month, df.index.isocalendar().week, df.index.day] - ) - prev_week_number = None - for index, row in days: - year_number, month_number, week_number, day_number = index - date = datetime(year_number, month_number, day_number) +def save_days(df, stations): + logger.info("Saving days...") + days = df.groupby( + [df.index.year, df.index.month, df.index.isocalendar().week, df.index.day] + ) + prev_week_number = None + for index, row in days: + year_number, month_number, week_number, day_number = index + + date = datetime(year_number, month_number, day_number) + 
sum_series = row.sum() + for station in stations: + year = Year.objects.get(station=station, year_number=year_number) + month = Month.objects.get( + station=station, year=year, month_number=month_number + ) + week = Week.objects.get( + station=station, years=year, week_number=week_number + ) + day, _ = Day.objects.get_or_create( + station=station, + date=date, + weekday_number=date.weekday(), + year=year, + month=month, + week=week, + ) + values = get_values(sum_series, station.name) + day_data, _ = DayData.objects.get_or_create(station=station, day=day) + save_values(values, day_data) + if not prev_week_number or prev_week_number != week_number: + prev_week_number = week_number + logger.info(f"Saved days for week {week_number} of year {year_number}") + + +def save_hours(df, stations): + logger.info("Saving hours...") + hours = df.groupby([df.index.year, df.index.month, df.index.day, df.index.hour]) + for i_station, station in enumerate(stations): + prev_day_number = None + prev_month_number = None + values = {k: [] for k in ALL_TYPE_DIRS} + for index, row in hours: sum_series = row.sum() - for station in stations: - year = Year.objects.get(station=station, year_number=year_number) - month = Month.objects.get( - station=station, year=year, month_number=month_number - ) - week = Week.objects.get( - station=station, years=year, week_number=week_number - ) - day, _ = Day.objects.get_or_create( + year_number, month_number, day_number, _ = index + if not prev_day_number: + prev_day_number = day_number + if not prev_month_number: + prev_month_number = month_number + + if day_number != prev_day_number or month_number != prev_month_number: + """ + If day or month changed. Save the hours for the day and clear the values dict. 
+ """ + if month_number != prev_month_number: + prev_day_number = day_number + day = Day.objects.get( + date=datetime(year_number, month_number, prev_day_number), station=station, - date=date, - weekday_number=date.weekday(), - year=year, - month=month, - week=week, ) - values = self.get_values(sum_series, station.name) - day_data, _ = DayData.objects.get_or_create(station=station, day=day) - self.save_values(values, day_data) - if not prev_week_number or prev_week_number != week_number: - prev_week_number = week_number - logger.info(f"Saved days for week {week_number} of year {year_number}") - - def save_hours(self, df, stations): - logger.info("Saving hours...") - hours = df.groupby([df.index.year, df.index.month, df.index.day, df.index.hour]) - for i_station, station in enumerate(stations): - prev_day_number = None - prev_month_number = None - values = {k: [] for k in self.ALL_TYPE_DIRS} - for index, row in hours: - sum_series = row.sum() - year_number, month_number, day_number, _ = index - if not prev_day_number: - prev_day_number = day_number - if not prev_month_number: - prev_month_number = month_number - - if day_number != prev_day_number or month_number != prev_month_number: - """ - If day or month changed. Save the hours for the day and clear the values dict. 
- """ - if month_number != prev_month_number: - prev_day_number = day_number - day = Day.objects.get( - date=datetime(year_number, month_number, prev_day_number), - station=station, + hour_data, _ = HourData.objects.get_or_create(station=station, day=day) + save_hour_data_values(hour_data, values) + values = {k: [] for k in ALL_TYPE_DIRS} + # output logger only when last station is saved + if i_station == len(stations) - 1: + logger.info( + f"Saved hour data for day {prev_day_number}, month {prev_month_number} year {year_number}" ) - hour_data, _ = HourData.objects.get_or_create( - station=station, day=day - ) - for td in self.ALL_TYPE_DIRS: - setattr(hour_data, f"values_{td.lower()}", values[td]) - hour_data.save() - values = {k: [] for k in self.ALL_TYPE_DIRS} - # output logger only when last station is saved - if i_station == len(stations) - 1: - logger.info( - f"Saved hour data for day {prev_day_number}, month {prev_month_number} year {year_number}" - ) - prev_day_number = day_number - prev_month_number = month_number - else: - # Add data to values dict for an hour - for station_types in self.STATION_TYPES: - for i in range(3): - if i < 2: - dir_key = f"{station.name} {station_types[i].upper()}" - val = sum_series.get(dir_key, 0) - else: - k_key = f"{station.name} {station_types[0].upper()}" - p_key = f"{station.name} {station_types[1].upper()}" - val = sum_series.get(p_key, 0) + sum_series.get( - k_key, 0 - ) - values_key = station_types[i].upper() - values[values_key].append(val) - - def save_observations(self, csv_data, start_time, csv_data_source=ECO_COUNTER): - import_state = ImportState.objects.get(csv_data_source=csv_data_source) - # Populate stations list, this is used to set/lookup station relations. 
+ prev_day_number = day_number + prev_month_number = month_number + # Add data to values dict for an hour + for station_types in STATION_TYPES: + for i in range(len(station_types)): + if i < 2: + dir_key = f"{station.name} {station_types[i].upper()}" + val = sum_series.get(dir_key, 0) + else: + k_key = f"{station.name} {station_types[0].upper()}" + p_key = f"{station.name} {station_types[1].upper()}" + val = sum_series.get(p_key, 0) + sum_series.get(k_key, 0) + values_key = station_types[i].upper() + values[values_key].append(val) + + # Save hour datas for the last day in data frame + day, _ = Day.objects.get_or_create( + date=datetime(year_number, month_number, day_number), + station=station, + ) + hour_data, _ = HourData.objects.get_or_create(station=station, day=day) + save_hour_data_values(hour_data, values) + + +def save_observations(csv_data, start_time, csv_data_source=ECO_COUNTER, station=None): + import_state = ImportState.objects.get(csv_data_source=csv_data_source) + # Populate stations list, this is used to set/lookup station relations. + if not station: stations = [ station for station in Station.objects.filter(csv_data_source=csv_data_source) ] - df = csv_data - df["Date"] = pd.to_datetime(df["startTime"], format="%Y-%m-%dT%H:%M") - df = df.drop("startTime", axis=1) - df = df.set_index("Date") - # Fill missing cells with the value 0 - df = df.fillna(0) - # Set negative numbers to 0 - df = df.clip(lower=0) - # Set values higher than ERRORNEOUS_VALUES_THRESHOLD to 0 - df[df > ERRORNEOUS_VALUE_THRESHOLD] = 0 - if not import_state.current_year_number: - # In initial import populate all years. 
- self.save_years(df, stations) - self.save_months(df, stations) - if import_state.current_year_number: - end_month_number = df.index[-1].month - self.save_current_year(stations, start_time.year, end_month_number) - - self.save_weeks(df, stations) - self.save_days(df, stations) - self.save_hours(df, stations) - end_date = df.index[-1] - import_state.current_year_number = end_date.year - import_state.current_month_number = end_date.month - import_state.save() - logger.info(f"Imported observations until:{str(end_date)}") + else: + stations = [station] + df = csv_data + df["Date"] = pd.to_datetime(df["startTime"], format="%Y-%m-%dT%H:%M") + df = df.drop("startTime", axis=1) + df = df.set_index("Date") + # Fill missing cells with the value 0 + df = df.fillna(0) + # Set negative numbers to 0 + df = df.clip(lower=0) + # Set values higher than ERRORNEOUS_VALUES_THRESHOLD to 0 + df[df > ERRORNEOUS_VALUE_THRESHOLD] = 0 + if not import_state.current_year_number: + # In initial import populate all years. 
+ save_years(df, stations) + save_months(df, stations) + if import_state.current_year_number: + end_month_number = df.index[-1].month + save_current_year(stations, start_time.year, end_month_number) + + save_weeks(df, stations) + save_days(df, stations) + save_hours(df, stations) + end_date = df.index[-1] + import_state.current_year_number = end_date.year + import_state.current_month_number = end_date.month + import_state.current_day_number = end_date.day + import_state.save() + logger.info(f"Imported observations until:{str(end_date)}") + + +def save_telraam_data(start_time): + data_frames = get_telraam_data_frames(start_time.date()) + for item in data_frames.items(): + if len(item) == 0: + logger.error("Found Telraam dataframe without data") + break + station = get_or_create_telraam_station(item[0]) + logger.info(f"Saving Telraam station {station.name}") + # Save dataframes for the camera(station) + for csv_data in item[1]: + start_time = csv_data.iloc[0][0].to_pydatetime() + save_observations( + csv_data, + start_time, + csv_data_source=TELRAAM_COUNTER, + station=station, + ) + + +def handle_initial_import(initial_import_counters): + delete_tables(csv_data_sources=initial_import_counters) + for counter in initial_import_counters: + ImportState.objects.filter(csv_data_source=counter).delete() + ImportState.objects.create(csv_data_source=counter) + logger.info(f"Retrieving stations for {counter}.") + # As Telraam counters are dynamic, create after CSV data is processed + if counter == TELRAAM_COUNTER: + Station.objects.filter(csv_data_source=counter).delete() + else: + save_stations(counter) + + +def import_data(counters): + for counter in counters: + logger.info(f"Importing/counting data for {counter}...") + import_state = ImportState.objects.filter(csv_data_source=counter).first() + if not import_state: + logger.error( + "ImportState instance not found, try importing with the '--init' argument." 
+ ) + break + if import_state.current_year_number and import_state.current_month_number: + start_time = "{year}-{month}-1T00:00".format( + year=import_state.current_year_number, + month=import_state.current_month_number, + ) + else: + start_month = ( + TELRAAM_COUNTER_START_MONTH if counter == TELRAAM_COUNTER else "01" + ) + start_time = f"{COUNTER_START_YEARS[counter]}-{start_month}-01" + + start_time = dateutil.parser.parse(start_time) + start_time = TIMEZONE.localize(start_time) + # The timeformat for the input data is : 2020-03-01T00:00 + # Convert starting time to input datas timeformat + start_time_string = start_time.strftime("%Y-%m-%dT%H:%M") + match counter: + # case COUNTERS.TELRAAM_COUNTER: + # Telraam counters are handled differently due to their dynamic nature + case COUNTERS.LAM_COUNTER: + csv_data = get_lam_counter_csv(start_time.date()) + case COUNTERS.ECO_COUNTER: + csv_data = get_eco_counter_csv() + case COUNTERS.TRAFFIC_COUNTER: + if import_state.current_year_number: + start_year = import_state.current_year_number + else: + start_year = TRAFFIC_COUNTER_START_YEAR + csv_data = get_traffic_counter_csv(start_year=start_year) + + if counter == TELRAAM_COUNTER: + save_telraam_data(start_time) + else: + start_index = csv_data.index[ + csv_data[INDEX_COLUMN_NAME] == start_time_string + ].values[0] + # As LAM data is fetched with a timespan, no index data is available, instead + # show time. + if counter == LAM_COUNTER: + logger.info(f"Starting saving observations at time:{start_time}") + else: + logger.info(f"Starting saving observations at index:{start_index}") + + csv_data = csv_data[start_index:] + save_observations( + csv_data, + start_time, + csv_data_source=counter, + ) + # Try to free some memory + del csv_data + gc.collect() + + +class Command(BaseCommand): + help = "Imports traffic counter data in the Turku region." 
def add_arguments(self, parser): parser.add_argument( @@ -360,7 +468,7 @@ def add_arguments(self, parser): nargs="+", default=False, help=f"For given counters in arguments deletes all tables before importing, imports stations and\ - starts importing from row 0. The counter arguments are: {self.COUNTER_CHOICES_STR}", + starts importing from row 0. The counter arguments are: {COUNTER_CHOICES_STR}", ) parser.add_argument( "--test-counter", @@ -374,118 +482,45 @@ def add_arguments(self, parser): type=str, nargs="+", default=False, - help=f"Import specific counter(s) data, choices are: {self.COUNTER_CHOICES_STR}.", + help=f"Import specific counter(s) data, choices are: {COUNTER_CHOICES_STR}.", ) - def check_counters_argument(self, counters): - for counter in counters: - if counter not in self.COUNTERS: - raise CommandError( - f"Invalid counter type, valid types are: {self.COUNTER_CHOICES_STR}." - ) - def handle(self, *args, **options): initial_import_counters = None start_time = None if options["initial_import"]: if len(options["initial_import"]) == 0: raise CommandError( - f"Specify the counter(s), choices are: {self.COUNTER_CHOICES_STR}." + f"Specify the counter(s), choices are: {COUNTER_CHOICES_STR}." 
) else: initial_import_counters = options["initial_import"] - self.check_counters_argument(initial_import_counters) + check_counters_argument(initial_import_counters) logger.info(f"Deleting tables for: {initial_import_counters}") - self.delete_tables(csv_data_sources=initial_import_counters) - for counter in initial_import_counters: - ImportState.objects.filter(csv_data_source=counter).delete() - import_state = ImportState.objects.create( - csv_data_source=counter, - ) - logger.info(f"Retrieving stations for {counter}.") - save_stations(counter) + handle_initial_import(initial_import_counters) if options["test_counter"]: logger.info("Testing eco_counter importer.") counter = options["test_counter"][0] start_time = options["test_counter"][1] end_time = options["test_counter"][2] - import_state, _ = ImportState.objects.get_or_create(csv_data_source=counter) + ImportState.objects.get_or_create(csv_data_source=counter) test_dataframe = get_test_dataframe(counter) csv_data = gen_eco_counter_test_csv( test_dataframe.keys(), start_time, end_time ) - self.save_observations( + save_observations( csv_data, start_time, csv_data_source=counter, ) + # Import if counters arg or initial import. 
if options["counters"] or initial_import_counters: if not initial_import_counters: # run with counters argument counters = options["counters"] - self.check_counters_argument(counters) + check_counters_argument(counters) else: counters = initial_import_counters - - for counter in counters: - logger.info(f"Importing/counting data for {counter}...") - import_state = ImportState.objects.filter( - csv_data_source=counter - ).first() - - if ( - import_state.current_year_number - and import_state.current_month_number - ): - start_time = "{year}-{month}-1T00:00".format( - year=import_state.current_year_number, - month=import_state.current_month_number, - ) - else: - start_month = ( - TELRAAM_COUNTER_START_MONTH - if counter == TELRAAM_COUNTER - else "01" - ) - start_time = f"{COUNTER_START_YEARS[counter]}-{start_month}-01" - - start_time = dateutil.parser.parse(start_time) - start_time = self.TIMEZONE.localize(start_time) - # The timeformat for the input data is : 2020-03-01T00:00 - # Convert starting time to input datas timeformat - start_time_string = start_time.strftime("%Y-%m-%dT%H:%M") - # start_index = None - match counter: - case COUNTERS.TELRAAM_COUNTER: - csv_data = get_telraam_counter_csv(start_time.date()) - case COUNTERS.LAM_COUNTER: - csv_data = get_lam_counter_csv(start_time.date()) - case COUNTERS.ECO_COUNTER: - csv_data = get_eco_counter_csv() - case COUNTERS.TRAFFIC_COUNTER: - if import_state.current_year_number: - start_year = import_state.current_year_number - else: - start_year = TRAFFIC_COUNTER_START_YEAR - csv_data = get_traffic_counter_csv(start_year=start_year) - start_index = csv_data.index[ - csv_data[INDEX_COLUMN_NAME] == start_time_string - ].values[0] - # As LAM data is fetched with a timespan, no index data is available, instead - # show time. 
- if counter == LAM_COUNTER: - logger.info(f"Starting saving observations at time:{start_time}") - else: - logger.info(f"Starting saving observations at index:{start_index}") - - csv_data = csv_data[start_index:] - self.save_observations( - csv_data, - start_time, - csv_data_source=counter, - ) - # Try to Free memory - del csv_data - gc.collect() + import_data(counters) diff --git a/eco_counter/management/commands/import_telraam_to_csv.py b/eco_counter/management/commands/import_telraam_to_csv.py index a6143a79a..729e04c41 100644 --- a/eco_counter/management/commands/import_telraam_to_csv.py +++ b/eco_counter/management/commands/import_telraam_to_csv.py @@ -28,7 +28,10 @@ TELRAAM_CSV, TELRAAM_HTTP, ) -from eco_counter.management.commands.utils import get_telraam_cameras +from eco_counter.management.commands.utils import ( + get_telraam_camera_location_and_geometry, + get_telraam_cameras, +) from eco_counter.models import ImportState TOKEN = settings.TELRAAM_TOKEN @@ -257,7 +260,7 @@ def save_dataframe(from_date: date = True) -> datetime: df = df.astype(int) csv_file = TELRAAM_COUNTER_CSV_FILE.format( - id=camera["mac"], + mac=camera["mac"], day=start_date.day, month=start_date.month, year=start_date.year, @@ -267,7 +270,15 @@ def save_dataframe(from_date: date = True) -> datetime: if os.path.exists(csv_file): os.remove(csv_file) if not os.path.exists(csv_file) or can_overwrite_csv_file: - df.to_csv(csv_file) + location, geometry = get_telraam_camera_location_and_geometry( + camera["segment_id"] + ) + # Write to WKT of the location, as the cameras position can change + with open(csv_file, "w") as file: + file.write(f"# {location.wkt} \n") + file.write(f"# {geometry.wkt} \n") + + df.to_csv(csv_file, mode="a") start_date += timedelta(days=1) start_date -= timedelta(days=1) diff --git a/eco_counter/management/commands/utils.py b/eco_counter/management/commands/utils.py index 695eefb81..90363b66b 100644 --- a/eco_counter/management/commands/utils.py +++ 
b/eco_counter/management/commands/utils.py @@ -8,9 +8,12 @@ from django.conf import settings from django.contrib.gis.gdal import DataSource from django.contrib.gis.geos import GEOSGeometry, LineString, MultiLineString, Point +from django.core.management.base import CommandError from eco_counter.constants import ( + COUNTER_CHOICES_STR, COUNTERS, + COUNTERS_LIST, ECO_COUNTER, INDEX_COLUMN_NAME, LAM_COUNTER, @@ -24,14 +27,14 @@ TELRAAM_COUNTER_CAMERAS, TELRAAM_COUNTER_CAMERAS_URL, TELRAAM_COUNTER_CSV_FILE, - TELRAAM_CSV, TELRAAM_HTTP, + TELRAAM_STATIONS_INITIAL_WKT_GEOMETRIES, TRAFFIC_COUNTER, TRAFFIC_COUNTER_CSV_URLS, TRAFFIC_COUNTER_METADATA_GEOJSON, ) -from eco_counter.models import ImportState, Station -from eco_counter.tests.test_import_counter_data import TEST_COLUMN_NAMES +from eco_counter.models import Station +from eco_counter.tests.constants import TEST_COLUMN_NAMES from mobility_data.importers.utils import get_root_dir logger = logging.getLogger("eco_counter") @@ -67,47 +70,22 @@ def __init__(self, feature): self.location = geom -class TelraamCounterStation: - # The Telraam API return the coordinates in EPSGS 31370 - SOURCE_SRID = 4326 - TARGET_SRID = settings.DEFAULT_SRID - - def get_location_and_geometry(self, id): - url = TELRAAM_COUNTER_CAMERA_SEGMENTS_URL.format(id=id) - headers = { - "X-Api-Key": settings.TELRAAM_TOKEN, - } - response = TELRAAM_HTTP.get(url, headers=headers) - assert ( - response.status_code == 200 - ), "Could not fetch segment for camera {id}".format(id=id) - json_data = response.json() - coords = json_data["features"][0]["geometry"]["coordinates"] - lss = [] - for coord in coords: - ls = LineString(coord, srid=self.SOURCE_SRID) - lss.append(ls) - geometry = MultiLineString(lss, srid=self.SOURCE_SRID) - geometry.transform(self.TARGET_SRID) - mid_line = round(len(coords) / 2) - mid_point = round(len(coords[mid_line]) / 2) - location = Point(coords[mid_line][mid_point], srid=self.SOURCE_SRID) - location.transform(self.TARGET_SRID) - 
return location, geometry +# class TelraamCounterStation: +# # The Telraam API return the coordinates in EPSGS 31370 +# SOURCE_SRID = 4326 +# TARGET_SRID = settings.DEFAULT_SRID - def __init__(self, feature): - self.name = feature["mac"] - self.name_sv = feature["mac"] - self.name_en = feature["mac"] - self.location, self.geometry = self.get_location_and_geometry( - feature["segment_id"] - ) - self.station_id = feature["mac"] +# def __init__(self, feature): +# self.name = feature["mac"] +# self.name_sv = feature["mac"] +# self.name_en = feature["mac"] +# self.location, self.geometry = get_telraam_camera_location_and_geometry( +# feature["segment_id"], self.SOURCE_SRID, self.TARGET_SRID +# ) +# self.station_id = feature["mac"] -class ObservationStation( - LAMStation, EcoCounterStation, TrafficCounterStation, TelraamCounterStation -): +class ObservationStation(LAMStation, EcoCounterStation, TrafficCounterStation): def __init__(self, csv_data_source, feature): self.csv_data_source = csv_data_source self.name = None @@ -117,8 +95,8 @@ def __init__(self, csv_data_source, feature): self.geometry = None self.station_id = None match csv_data_source: - case COUNTERS.TELRAAM_COUNTER: - TelraamCounterStation.__init__(self, feature) + # case COUNTERS.TELRAAM_COUNTER: + # TelraamCounterStation.__init__(self, feature) case COUNTERS.LAM_COUNTER: LAMStation.__init__(self, feature) case COUNTERS.ECO_COUNTER: @@ -127,6 +105,21 @@ def __init__(self, csv_data_source, feature): TrafficCounterStation.__init__(self, feature) +class TelraamStation: + def __init__(self, mac, location, geometry): + self.mac = mac + self.location = location + self.geometry = geometry + + +def check_counters_argument(counters): + for counter in counters: + if counter not in COUNTERS_LIST: + raise CommandError( + f"Invalid counter type, valid types are: {COUNTER_CHOICES_STR}." 
+ ) + + def get_traffic_counter_metadata_data_layer(): meta_file = f"{get_root_dir()}/eco_counter/data/{TRAFFIC_COUNTER_METADATA_GEOJSON}" return DataSource(meta_file)[0] @@ -407,56 +400,169 @@ def get_telraam_counter_stations(): return stations -def get_telraam_counter_csv(from_date): - df = pd.DataFrame() - try: - import_state = ImportState.objects.get(csv_data_source=TELRAAM_CSV) - except ImportState.DoesNotExist: - return None - end_date = date( - import_state.current_year_number, - import_state.current_month_number, - import_state.current_day_number, +def get_telraam_camera_location_and_geometry(id, source_srid=4326, target_srid=3067): + url = TELRAAM_COUNTER_CAMERA_SEGMENTS_URL.format(id=id) + headers = { + "X-Api-Key": settings.TELRAAM_TOKEN, + } + response = TELRAAM_HTTP.get(url, headers=headers) + assert ( + response.status_code == 200 + ), "Could not fetch segment for camera {id}".format(id=id) + json_data = response.json() + if len(json_data["features"]) == 0: + logger.error(f"No data for Telraam camera with segment_id: {id}") + return None, None + + coords = json_data["features"][0]["geometry"]["coordinates"] + lss = [] + for coord in coords: + ls = LineString(coord, srid=source_srid) + lss.append(ls) + geometry = MultiLineString(lss, srid=source_srid) + geometry.transform(target_srid) + mid_line = round(len(coords) / 2) + mid_point = round(len(coords[mid_line]) / 2) + location = Point(coords[mid_line][mid_point], srid=source_srid) + location.transform(target_srid) + return location, geometry + + +def get_telraam_dataframe(mac, day, month, year): + csv_file = TELRAAM_COUNTER_CSV_FILE.format( + mac=mac, + day=day, + month=month, + year=year, + ) + comment_lines = [] + skiprows = 0 + # The location and geometry is stored as comments to the csv file + with open(csv_file, "r") as file: + for line in file: + if line.startswith("#"): + comment_lines.append(line) + skiprows += 1 + else: + break + return ( + pd.read_csv(csv_file, index_col=False, 
skiprows=skiprows), + csv_file, + comment_lines, ) + + +def parse_telraam_comment_lines(comment_lines): + location = None + geometry = None + comment_lines = [c.replace("# ", "") for c in comment_lines] + if len(comment_lines) > 0: + location = GEOSGeometry(comment_lines[0]) + if len(comment_lines) > 1: + geometry = GEOSGeometry(comment_lines[1]) + return location, geometry + + +def get_telraam_data_frames(from_date): + """ + For every camera create a dataframe for each location the camera has been placed. + """ + end_date = date.today() + data_frames = {} for camera in get_telraam_cameras(): df_cam = pd.DataFrame() start_date = from_date - + current_station = None + prev_comment_lines = [] while start_date <= end_date: - csv_file = TELRAAM_COUNTER_CSV_FILE.format( - id=camera["mac"], - day=start_date.day, - month=start_date.month, - year=start_date.year, - ) try: - df_tmp = pd.read_csv(csv_file, index_col=False) + df_tmp, csv_file, comment_lines = get_telraam_dataframe( + camera["mac"], start_date.day, start_date.month, start_date.year + ) except FileNotFoundError: logger.warning( f"File {csv_file} not found, skipping day{str(start_date)} for camera {camera}" ) else: + if not comment_lines and not current_station: + # Set the initial station, e.i, no coordinates defined in CSV source data + current_station = TelraamStation( + mac=camera["mac"], + location=GEOSGeometry( + TELRAAM_STATIONS_INITIAL_WKT_GEOMETRIES[camera["mac"]][ + "location" + ] + ), + geometry=GEOSGeometry( + TELRAAM_STATIONS_INITIAL_WKT_GEOMETRIES[camera["mac"]][ + "geometry" + ] + ), + ) + data_frames[current_station] = [] + elif comment_lines and not current_station: + location, geometry = parse_telraam_comment_lines(comment_lines) + current_station = TelraamStation( + mac=camera["mac"], location=location, geometry=geometry + ) + data_frames[current_station] = [] + + if prev_comment_lines != comment_lines: + location, geometry = parse_telraam_comment_lines(comment_lines) + # CSV files might 
contain the initial coordinates, to avoid creating duplicated check coordinates + if ( + location.wkt != current_station.location.wkt + and geometry.wkt != current_station.geometry.wkt + ): + df_cam[INDEX_COLUMN_NAME] = pd.to_datetime( + df_cam[INDEX_COLUMN_NAME], + format=TELRAAM_COUNTER_API_TIME_FORMAT, + ) + data_frames[current_station].append(df_cam) + current_station = TelraamStation( + mac=camera["mac"], location=location, geometry=geometry + ) + df_cam = pd.DataFrame() + data_frames[current_station] = [] df_cam = pd.concat([df_cam, df_tmp]) + finally: + prev_comment_lines = comment_lines start_date += timedelta(days=1) + if not df_cam.empty: + df_cam[INDEX_COLUMN_NAME] = pd.to_datetime( + df_cam[INDEX_COLUMN_NAME], format=TELRAAM_COUNTER_API_TIME_FORMAT + ) + data_frames[current_station].append(df_cam) - if df.empty: - df = df_cam - else: - df = pd.merge(df, df_cam, on=INDEX_COLUMN_NAME) + return data_frames - df[INDEX_COLUMN_NAME] = pd.to_datetime( - df[INDEX_COLUMN_NAME], format=TELRAAM_COUNTER_API_TIME_FORMAT - ) - return df + +def get_or_create_telraam_station(station): + name = str(station.mac) + filter = { + "csv_data_source": TELRAAM_COUNTER, + "name": name, + "name_sv": name, + "name_en": name, + "location": station.location, + "geometry": station.geometry, + "station_id": station.mac, + } + station_qs = Station.objects.filter(**filter) + if not station_qs.exists(): + obj = Station.objects.create(**filter) + else: + obj = station_qs.first() + return obj def save_stations(csv_data_source): stations = [] num_created = 0 match csv_data_source: - case COUNTERS.TELRAAM_COUNTER: - stations = get_telraam_counter_stations() + # case COUNTERS.TELRAAM_COUNTER: + # Telraam station are handled differently as they are dynamic case COUNTERS.LAM_COUNTER: stations = get_lam_counter_stations() case COUNTERS.ECO_COUNTER: @@ -503,14 +609,14 @@ def get_test_dataframe(counter): def gen_eco_counter_test_csv( - columns, start_time, end_time, time_stamp_column="startTime" + 
columns, start_time, end_time, time_stamp_column="startTime", freq="15min" ): """ Generates test data for a given timespan, - for every row (15min) the value 1 is set. + for every row ('freq') the value 1 is set. """ df = pd.DataFrame() - timestamps = pd.date_range(start=start_time, end=end_time, freq="15min") + timestamps = pd.date_range(start=start_time, end=end_time, freq=freq) for col in columns: vals = [1 for i in range(len(timestamps))] df.insert(0, col, vals) diff --git a/eco_counter/tasks.py b/eco_counter/tasks.py index 9ddefd6f4..af525b84a 100644 --- a/eco_counter/tasks.py +++ b/eco_counter/tasks.py @@ -14,8 +14,8 @@ def initial_import_counter_data(args, name="initial_import_counter_data"): @shared_task_email -def delete_all_counter_data(name="delete_all_counter_data"): - management.call_command("delete_all_counter_data") +def delete_counter_data(args, name="delete_counter_data"): + management.call_command("delete_counter_data", "--counters", args) @shared_task_email diff --git a/eco_counter/tests/conftest.py b/eco_counter/tests/conftest.py index e5af61b79..ed3fb16ef 100644 --- a/eco_counter/tests/conftest.py +++ b/eco_counter/tests/conftest.py @@ -1,4 +1,4 @@ -from datetime import timedelta +from datetime import date, timedelta import dateutil.parser import pytest @@ -133,58 +133,8 @@ def hour_data(stations, days): station=stations[0], day=days[0], ) - hour_data.values_ak = [ - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 13, - 14, - 15, - 16, - 17, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - ] - hour_data.values_ap = [ - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9, - 10, - 11, - 12, - 13, - 14, - 15, - 16, - 17, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - ] + hour_data.values_ak = [v for v in range(1, 25)] + hour_data.values_ap = [v for v in range(1, 25)] hour_data.save() return hour_data @@ -197,6 +147,7 @@ def day_datas(stations, days): day_data = DayData.objects.create(station=stations[0], day=days[i]) day_data.value_ak = 5 + i 
day_data.value_ap = 6 + i + day_data.value_at = day_data.value_ak + day_data.value_ap day_data.save() day_datas.append(day_data) return day_datas @@ -240,3 +191,63 @@ def year_datas(stations, years): year_data.save() year_datas.append(year_data) return year_datas + + +@pytest.mark.django_db +@pytest.fixture +def is_active_fixtures(): + station0 = Station.objects.create( + id=0, + name="Station with 0 day of data", + location="POINT(0 0)", + csv_data_source=LAM_COUNTER, + ) + station1 = Station.objects.create( + id=1, + name="Station with 1 day of data", + location="POINT(0 0)", + csv_data_source=LAM_COUNTER, + ) + station7 = Station.objects.create( + id=7, + name="Station with 7 days of data", + location="POINT(0 0)", + csv_data_source=LAM_COUNTER, + ) + station30 = Station.objects.create( + id=30, + name="Station with 30 days of data", + location="POINT(0 0)", + csv_data_source=LAM_COUNTER, + ) + start_date = date.today() + current_date = start_date + days_counter = 0 + day_counts = [0, 1, 7, 30] + stations = [station0, station1, station7, station30] + while current_date >= start_date - timedelta(days=32): + for i, station in enumerate(stations): + days = day_counts[i] + day = Day.objects.create(station=station, date=current_date) + day_data = DayData.objects.create(station=station, day=day) + if i > 0: + start_day = day_counts[i - 1] + else: + start_day = 10000 + + if days > days_counter & days_counter >= start_day: + day_data.value_at = 1 + day_data.value_pt = 1 + day_data.value_jt = 1 + day_data.value_bt = 1 + day_data.save() + else: + day_data.value_at = 0 + day_data.value_pt = 0 + day_data.value_jt = 0 + day_data.value_bt = 0 + day_data.save() + + current_date -= timedelta(days=1) + days_counter += 1 + return Station.objects.all(), Day.objects.all(), DayData.objects.all() diff --git a/eco_counter/tests/constants.py b/eco_counter/tests/constants.py index 2935a3cc3..e3b9b3b14 100644 --- a/eco_counter/tests/constants.py +++ b/eco_counter/tests/constants.py @@ 
-1,3 +1,5 @@ +from eco_counter.constants import ECO_COUNTER, LAM_COUNTER, TRAFFIC_COUNTER + TEST_EC_STATION_NAME = "Auransilta" TEST_TC_STATION_NAME = "Myllysilta" TEST_LC_STATION_NAME = "Tie 8 Raisio" @@ -37,3 +39,9 @@ "Tie 8 Raisio BP", "Tie 8 Raisio BK", ] + +TEST_COLUMN_NAMES = { + ECO_COUNTER: ECO_COUNTER_TEST_COLUMN_NAMES, + TRAFFIC_COUNTER: TRAFFIC_COUNTER_TEST_COLUMN_NAMES, + LAM_COUNTER: LAM_COUNTER_TEST_COLUMN_NAMES, +} diff --git a/eco_counter/tests/test_api.py b/eco_counter/tests/test_api.py index 9e7fc2ce7..237d21886 100644 --- a/eco_counter/tests/test_api.py +++ b/eco_counter/tests/test_api.py @@ -3,12 +3,47 @@ import pytest from rest_framework.reverse import reverse -from .conftest import TEST_TIMESTAMP from .constants import TEST_EC_STATION_NAME @pytest.mark.django_db -def test__hour_data(api_client, hour_data): +def test_is_active(api_client, is_active_fixtures): + url = reverse("eco_counter:stations-detail", args=[0]) + response = api_client.get(url) + assert response.status_code == 200 + is_active = response.json()["is_active"] + assert is_active["1"] is False + assert is_active["7"] is False + assert is_active["30"] is False + assert is_active["365"] is False + url = reverse("eco_counter:stations-detail", args=[1]) + response = api_client.get(url) + assert response.status_code == 200 + is_active = response.json()["is_active"] + assert is_active["1"] is True + assert is_active["7"] is True + assert is_active["30"] is True + assert is_active["365"] is True + url = reverse("eco_counter:stations-detail", args=[7]) + response = api_client.get(url) + assert response.status_code == 200 + is_active = response.json()["is_active"] + assert is_active["1"] is False + assert is_active["7"] is True + assert is_active["30"] is True + assert is_active["365"] is True + url = reverse("eco_counter:stations-detail", args=[30]) + response = api_client.get(url) + assert response.status_code == 200 + is_active = response.json()["is_active"] + assert is_active["1"] is 
False + assert is_active["7"] is False + assert is_active["30"] is True + assert is_active["365"] is True + + +@pytest.mark.django_db +def test_hour_data(api_client, hour_data): url = reverse("eco_counter:hour_data-list") response = api_client.get(url) assert response.status_code == 200 @@ -18,7 +53,7 @@ def test__hour_data(api_client, hour_data): @pytest.mark.django_db -def test__day_data( +def test_day_data( api_client, day_datas, ): @@ -38,7 +73,7 @@ def test__day_data( @pytest.mark.django_db -def test__get_day_data(api_client, day_datas, station_id, test_timestamp): +def test_get_day_data(api_client, day_datas, station_id, test_timestamp): url = reverse( "eco_counter:day_data-get-day-data" ) + "?station_id={}&date={}".format(station_id, test_timestamp + timedelta(days=3)) @@ -50,7 +85,7 @@ def test__get_day_data(api_client, day_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__get_day_datas(api_client, day_datas, station_id, test_timestamp): +def test_get_day_datas(api_client, day_datas, station_id, test_timestamp): url = reverse( "eco_counter:day_data-get-day-datas" ) + "?station_id={}&start_date={}&end_date={}".format( @@ -59,7 +94,6 @@ def test__get_day_datas(api_client, day_datas, station_id, test_timestamp): response = api_client.get(url) assert response.status_code == 200 res_json = response.json() - for i in range(4): assert res_json[i]["value_ak"] == day_datas[i].value_ak assert res_json[i]["value_ap"] == day_datas[i].value_ap @@ -67,7 +101,7 @@ def test__get_day_datas(api_client, day_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__week_data(api_client, week_datas): +def test_week_data(api_client, week_datas): url = reverse("eco_counter:week_data-list") response = api_client.get(url) assert response.status_code == 200 @@ -84,7 +118,7 @@ def test__week_data(api_client, week_datas): @pytest.mark.django_db -def test__get_week_data(api_client, week_datas, station_id, test_timestamp): +def 
test_get_week_data(api_client, week_datas, station_id, test_timestamp): url = reverse( "eco_counter:week_data-get-week-data" ) + "?station_id={}&week_number={}&year_number={}".format( @@ -97,7 +131,7 @@ def test__get_week_data(api_client, week_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__get_week_datas(api_client, week_datas, station_id, test_timestamp): +def test_get_week_datas(api_client, week_datas, station_id, test_timestamp): end_week_number = test_timestamp + timedelta(weeks=4) url = reverse( "eco_counter:week_data-get-week-datas" @@ -117,7 +151,7 @@ def test__get_week_datas(api_client, week_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__month_data(api_client, month_datas): +def test_month_data(api_client, month_datas): url = reverse("eco_counter:month_data-list") response = api_client.get(url) assert response.status_code == 200 @@ -135,7 +169,7 @@ def test__month_data(api_client, month_datas): @pytest.mark.django_db -def test__get_month_data(api_client, month_datas, station_id, test_timestamp): +def test_get_month_data(api_client, month_datas, station_id, test_timestamp): url = reverse( "eco_counter:month_data-get-month-data" ) + "?station_id={}&month_number={}&year_number={}".format( @@ -149,7 +183,7 @@ def test__get_month_data(api_client, month_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__get_year_datas(api_client, year_datas, station_id, test_timestamp): +def test_get_year_datas(api_client, year_datas, station_id, test_timestamp): end_year_number = test_timestamp.replace(year=test_timestamp.year + 1).year url = reverse( "eco_counter:year_data-get-year-datas" @@ -178,7 +212,7 @@ def test__get_year_datas(api_client, year_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__get_month_datas(api_client, month_datas, station_id, test_timestamp): +def test_get_month_datas(api_client, month_datas, station_id, test_timestamp): url = reverse( 
"eco_counter:month_data-get-month-datas" ) + "?station_id={}&start_month_number={}&end_month_number={}&year_number={}".format( @@ -194,7 +228,7 @@ def test__get_month_datas(api_client, month_datas, station_id, test_timestamp): @pytest.mark.django_db -def test__year_data(api_client, year_datas): +def test_year_data(api_client, year_datas): url = reverse("eco_counter:year_data-list") response = api_client.get(url) assert response.status_code == 200 @@ -207,7 +241,7 @@ def test__year_data(api_client, year_datas): @pytest.mark.django_db -def test__days(api_client, days, test_timestamp): +def test_days(api_client, days, test_timestamp): url = reverse("eco_counter:days-list") response = api_client.get(url) assert response.status_code == 200 @@ -222,7 +256,7 @@ def test__days(api_client, days, test_timestamp): @pytest.mark.django_db -def test__weeks(api_client, weeks, test_timestamp): +def test_weeks(api_client, weeks, test_timestamp): url = reverse("eco_counter:weeks-list") response = api_client.get(url) assert response.status_code == 200 @@ -236,7 +270,7 @@ def test__weeks(api_client, weeks, test_timestamp): @pytest.mark.django_db -def test__months(api_client, months, test_timestamp): +def test_months(api_client, months, test_timestamp): url = reverse("eco_counter:months-list") response = api_client.get(url) assert response.status_code == 200 @@ -252,7 +286,7 @@ def test__months(api_client, months, test_timestamp): @pytest.mark.django_db -def test__months_multiple_years(api_client, years, test_timestamp): +def test_months_multiple_years(api_client, years, test_timestamp): url = reverse("eco_counter:years-list") response = api_client.get(url) assert response.status_code == 200 @@ -266,13 +300,14 @@ def test__months_multiple_years(api_client, years, test_timestamp): @pytest.mark.django_db -def test__station(api_client, stations, year_datas): +def test_station(api_client, stations, year_datas, day_datas): url = reverse("eco_counter:stations-list") response = 
api_client.get(url) assert response.status_code == 200 assert response.json()["results"][0]["name"] == TEST_EC_STATION_NAME assert response.json()["results"][0]["sensor_types"] == ["at"] - assert response.json()["results"][0]["data_from_year"] == TEST_TIMESTAMP.year + assert response.json()["results"][0]["data_until_date"] == "2020-01-07" + assert response.json()["results"][0]["data_from_date"] == "2020-01-01" # Test retrieving station by data type url = reverse("eco_counter:stations-list") + "?data_type=a" response = api_client.get(url) diff --git a/eco_counter/tests/test_import_counter_data.py b/eco_counter/tests/test_import_counter_data.py index 0a1856e80..b3c69d02f 100644 --- a/eco_counter/tests/test_import_counter_data.py +++ b/eco_counter/tests/test_import_counter_data.py @@ -7,13 +7,20 @@ imports and calculates the data correctly. """ import calendar +from datetime import timedelta from io import StringIO +from unittest.mock import patch import dateutil.parser import pytest from django.core.management import call_command -from eco_counter.constants import ECO_COUNTER, LAM_COUNTER, TRAFFIC_COUNTER +from eco_counter.constants import ( + ECO_COUNTER, + LAM_COUNTER, + TELRAAM_COUNTER, + TRAFFIC_COUNTER, +) from eco_counter.models import ( Day, DayData, @@ -27,21 +34,9 @@ Year, YearData, ) +from eco_counter.tests.utils import get_telraam_data_frames_test_fixture -from .constants import ( - ECO_COUNTER_TEST_COLUMN_NAMES, - LAM_COUNTER_TEST_COLUMN_NAMES, - TEST_EC_STATION_NAME, - TEST_LC_STATION_NAME, - TEST_TC_STATION_NAME, - TRAFFIC_COUNTER_TEST_COLUMN_NAMES, -) - -TEST_COLUMN_NAMES = { - ECO_COUNTER: ECO_COUNTER_TEST_COLUMN_NAMES, - TRAFFIC_COUNTER: TRAFFIC_COUNTER_TEST_COLUMN_NAMES, - LAM_COUNTER: LAM_COUNTER_TEST_COLUMN_NAMES, -} +from .constants import TEST_EC_STATION_NAME, TEST_LC_STATION_NAME, TEST_TC_STATION_NAME def import_command(*args, **kwargs): @@ -56,6 +51,114 @@ def import_command(*args, **kwargs): return out.getvalue() +@pytest.mark.django_db 
+@patch("eco_counter.management.commands.utils.get_telraam_data_frames")
+def test_import_telraam(get_telraam_data_frames_mock):
+    from eco_counter.management.commands.import_counter_data import import_data
+
+    start_time = dateutil.parser.parse("2023-09-01T00:00")
+    ImportState.objects.create(
+        current_year_number=start_time.year,
+        current_month_number=start_time.month,
+        current_day_number=start_time.day,
+        csv_data_source=TELRAAM_COUNTER,
+    )
+    num_days_per_location = 2
+    get_telraam_data_frames_mock.return_value = get_telraam_data_frames_test_fixture(
+        start_time,
+        num_cameras=2,
+        num_locations=3,
+        num_days_per_location=num_days_per_location,
+    )
+    import_data([TELRAAM_COUNTER])
+    stations_qs = Station.objects.all()
+    # num_cameras * num_locations, as a station is created for every camera/location pair
+    assert stations_qs.count() == 6
+    assert DayData.objects.count() == 12
+    assert HourData.objects.count() == 12
+    assert stations_qs.first().location.wkt == "POINT (2032 2032)"
+    assert Year.objects.count() == stations_qs.count()
+    import_state_qs = ImportState.objects.filter(csv_data_source=TELRAAM_COUNTER)
+    assert import_state_qs.count() == 1
+    import_state = import_state_qs.first()
+    assert import_state.current_year_number == 2023
+    assert import_state.current_month_number == 9
+    assert import_state.current_day_number == 2
+    # 6 stations * 2 days = 12
+    assert Day.objects.count() == stations_qs.count() * num_days_per_location
+    # Test that duplicates are not created
+    get_telraam_data_frames_mock.return_value = get_telraam_data_frames_test_fixture(
+        start_time,
+        num_cameras=2,
+        num_locations=3,
+        num_days_per_location=num_days_per_location,
+    )
+    import_data([TELRAAM_COUNTER])
+    assert stations_qs.count() == 6
+    assert Year.objects.count() == stations_qs.count()
+    assert Day.objects.count() == stations_qs.count() * num_days_per_location
+    assert DayData.objects.count() == 12
+    assert HourData.objects.count() == 12
+    # Test new locations, adds 2 stations
+    new_start_time = start_time + timedelta(days=2)
+    get_telraam_data_frames_mock.return_value = get_telraam_data_frames_test_fixture(
+        new_start_time,
+        num_cameras=2,
+        num_locations=1,
+        num_days_per_location=num_days_per_location,
+    )
+    import_data([TELRAAM_COUNTER])
+    stations_qs = Station.objects.all()
+    assert stations_qs.count() == 8
+    assert Year.objects.count() == stations_qs.count()
+    assert Day.objects.count() == 16
+    # Test adding camera
+    get_telraam_data_frames_mock.return_value = get_telraam_data_frames_test_fixture(
+        new_start_time,
+        num_cameras=3,
+        num_locations=1,
+        num_days_per_location=num_days_per_location,
+    )
+    import_data([TELRAAM_COUNTER])
+    stations_qs = Station.objects.all()
+    assert stations_qs.count() == 9
+    assert Year.objects.count() == stations_qs.count()
+    # Test data related to first station
+    station = Station.objects.filter(station_id="0").first()
+    year_data = YearData.objects.get(station=station)
+    assert year_data.value_ak == 24 * num_days_per_location
+    assert year_data.value_ap == 24 * num_days_per_location
+    assert year_data.value_at == 24 * num_days_per_location * 2
+    assert year_data.value_pk == 24 * num_days_per_location
+    assert year_data.value_pp == 24 * num_days_per_location
+    assert year_data.value_pt == 24 * num_days_per_location * 2
+    assert MonthData.objects.count() == stations_qs.count() * Year.objects.count()
+    assert Month.objects.count() == stations_qs.count() * Year.objects.count()
+    assert (
+        MonthData.objects.get(station=station, month=Month.objects.first()).value_at
+        == 24 * num_days_per_location * 2
+    )
+    # 1.9.2023 is a Friday: 9 stations have data for 1.-3.9. (ISO week 35) and
+    # 3 stations have data for 4.9. (ISO week 36), i.e. 9 + 3 = 12 weeks
+    assert WeekData.objects.count() == 12
+    assert Week.objects.count() == 12
+    # 6 stations * num_days_per_location + 3 stations * num_days_per_location
+    assert DayData.objects.count() == 6 * 2 + 3 * 2
+    assert Day.objects.count() == 6 * 2 + 3 * 2
+
+    # Three locations for two cameras
+    assert Day.objects.filter(date__day=1).count() == 6
+    # One location for three cameras
+    assert Day.objects.filter(date__day=4).count() == 3
+    assert DayData.objects.first().value_at == 48
+    assert DayData.objects.first().value_ap == 24
+    assert DayData.objects.first().value_ak == 24
+    assert HourData.objects.count() == 18
+    for hour_data in HourData.objects.all():
+        assert hour_data.values_ak == [1] * 24
+        assert hour_data.values_at == [2] * 24
+
+
 @pytest.mark.test_import_counter_data
 @pytest.mark.django_db
 def test_import_eco_counter_data(stations):
diff --git a/eco_counter/tests/utils.py b/eco_counter/tests/utils.py
new file mode 100644
index 000000000..55946dd38
--- /dev/null
+++ b/eco_counter/tests/utils.py
@@ -0,0 +1,61 @@
+from datetime import timedelta
+
+import pandas as pd
+from django.contrib.gis.geos import GEOSGeometry
+
+from eco_counter.management.commands.utils import (
+    gen_eco_counter_test_csv,
+    TelraamStation,
+)
+
+
+def get_telraam_data_frames_test_fixture(
+    from_date,
+    num_cameras=1,
+    num_locations=2,
+    num_days_per_location=2,
+):
+    """
+    Build fake Telraam data frames for the import tests.
+
+    For every camera/location pair a TelraamStation is created and mapped to a
+    list holding one data frame with 'num_days_per_location' consecutive days of
+    generated test data, starting from 'from_date'.
+
+    Raises ValueError if any of the count arguments is less than 1.
+    """
+
+    def get_location_and_geometry(i):
+        location = GEOSGeometry(f"POINT({i} {i})")
+        geometry = GEOSGeometry(
+            f"MULTILINESTRING (({i} {i}, 1 1), (1 1, 2 2), (2 2, 3 3))"
+        )
+        return location, geometry
+
+    if num_locations <= 0 or num_cameras <= 0 or num_days_per_location <= 0:
+        raise ValueError(
+            "'num_locations', 'num_cameras' and 'num_days_per_location' must be greater than 0."
+        )
+
+    column_types = ["AK", "AP", "PK", "PP"]
+    data_frames = {}
+    for c_c in range(num_cameras):
+        for l_c in range(num_locations):
+            # The coordinate is derived from the camera and location counters and
+            # from 'from_date', so a different start date yields other coordinates.
+            index = c_c + l_c + from_date.year + from_date.month * from_date.day
+            location, geometry = get_location_and_geometry(index)
+            station = TelraamStation(c_c, location, geometry)
+            data_frames[station] = []
+            columns = [f"{c_c} {t}" for t in column_types]
+            df = pd.DataFrame()
+            # Generate 'num_days_per_location' days of data for every location
+            start_date = from_date
+            for _ in range(num_days_per_location):
+                csv_data = gen_eco_counter_test_csv(
+                    columns, start_date, start_date + timedelta(hours=23), freq="1h"
+                )
+                start_date += timedelta(days=1)
+                df = pd.concat([df, csv_data])
+            data_frames[station].append(df)
+    return data_frames