Skip to content

Commit

Permalink
Merge pull request #357 from City-of-Turku/develop
Browse files Browse the repository at this point in the history
Production update
  • Loading branch information
juuso-j committed Jun 4, 2024
2 parents fbb49bf + 0e269f4 commit 4f9bb61
Show file tree
Hide file tree
Showing 50 changed files with 1,111 additions and 320 deletions.
146 changes: 89 additions & 57 deletions eco_counter/management/commands/import_counter_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,72 +387,100 @@ def save_telraam_data(start_time):
)


def handle_initial_import(initial_import_counters):
delete_tables(csv_data_sources=initial_import_counters)
for counter in initial_import_counters:
ImportState.objects.filter(csv_data_source=counter).delete()
ImportState.objects.create(csv_data_source=counter)
logger.info(f"Retrieving stations for {counter}.")
# As Telraam counters are dynamic, create after CSV data is processed
if counter == TELRAAM_COUNTER:
Station.objects.filter(csv_data_source=counter).delete()
def handle_initial_import(counter):
    """Reset all stored data for *counter* and begin a fresh import.

    Drops the counter's observation tables, replaces any previous
    ImportState row with a brand-new one and (except for Telraam)
    re-imports the counter's stations.

    Returns the newly created ImportState instance.
    """
    logger.info(f"Deleting tables for: {counter}")
    delete_tables(csv_data_sources=[counter])
    # Delete stale state first so exactly one ImportState row exists
    # per csv_data_source after the create below.
    ImportState.objects.filter(csv_data_source=counter).delete()
    import_state = ImportState.objects.create(csv_data_source=counter)
    logger.info(f"Retrieving stations for {counter}.")
    # As Telraam counters are dynamic, create after CSV data is processed
    if counter == TELRAAM_COUNTER:
        Station.objects.filter(csv_data_source=counter).delete()
    else:
        save_stations(counter)
    return import_state


def get_start_time(counter, import_state):
    """Return the timezone-aware datetime from which importing (re)starts.

    If *import_state* records a year and month from a previous run, resume
    from the first day of that month; otherwise fall back to the counter's
    configured start year (and, for Telraam, its configured start month).
    """
    year = import_state.current_year_number
    month = import_state.current_month_number
    if year and month:
        # Resume where the previous import left off.
        raw = f"{year}-{month}-1T00:00"
    else:
        # No prior state: start from the counter's first known data.
        if counter == TELRAAM_COUNTER:
            first_month = TELRAAM_COUNTER_START_MONTH
        else:
            first_month = "01"
        raw = f"{COUNTER_START_YEARS[counter]}-{first_month}-01"

    # The timeformat for the input data is : 2020-03-01T00:00
    # Convert starting time to input datas timeformat
    parsed = dateutil.parser.parse(raw)
    return TIMEZONE.localize(parsed)


def get_csv_data(counter, import_state, start_time, verbose=True):
    """Fetch the CSV observations for *counter* and trim them to start at
    *start_time*.

    Telraam counters are handled elsewhere due to their dynamic nature,
    so only LAM, Eco and Traffic counters are dispatched here.
    """
    if counter == COUNTERS.LAM_COUNTER:
        csv_data = get_lam_counter_csv(start_time.date())
    elif counter == COUNTERS.ECO_COUNTER:
        csv_data = get_eco_counter_csv()
    elif counter == COUNTERS.TRAFFIC_COUNTER:
        # Fall back to the configured start year when no state exists yet.
        start_year = import_state.current_year_number or TRAFFIC_COUNTER_START_YEAR
        csv_data = get_traffic_counter_csv(start_year=start_year)

    # Locate the row matching start_time in the data's own time format.
    wanted = start_time.strftime("%Y-%m-%dT%H:%M")
    start_index = csv_data.index[csv_data[INDEX_COLUMN_NAME] == wanted].values[0]
    if verbose:
        # As LAM data is fetched with a timespan, no index data is available, instead display start_time.
        if counter == LAM_COUNTER:
            logger.info(f"Starting saving observations at time:{start_time}")
        else:
            logger.info(f"Starting saving observations at index:{start_index}")

    return csv_data[start_index:]


def import_data(counters):
def import_data(counters, initial_import=False, force=False):
for counter in counters:
logger.info(f"Importing/counting data for {counter}...")
import_state = ImportState.objects.filter(csv_data_source=counter).first()

# Before deleting state and data, check that data is available.
if not force and import_state and initial_import:
start_time = get_start_time(counter, import_state)
csv_data = get_csv_data(counter, import_state, start_time, verbose=False)
if len(csv_data) == 0:
logger.info(
"No data to retrieve, skipping initial import. Use --force to discard."
)
continue

if initial_import:
handle_initial_import(counter)
import_state = ImportState.objects.filter(csv_data_source=counter).first()

if not import_state:
logger.error(
"ImportState instance not found, try importing with the '--init' argument."
)
break
if import_state.current_year_number and import_state.current_month_number:
start_time = "{year}-{month}-1T00:00".format(
year=import_state.current_year_number,
month=import_state.current_month_number,
)
else:
start_month = (
TELRAAM_COUNTER_START_MONTH if counter == TELRAAM_COUNTER else "01"
)
start_time = f"{COUNTER_START_YEARS[counter]}-{start_month}-01"

start_time = dateutil.parser.parse(start_time)
start_time = TIMEZONE.localize(start_time)
# The timeformat for the input data is : 2020-03-01T00:00
# Convert starting time to input datas timeformat
start_time_string = start_time.strftime("%Y-%m-%dT%H:%M")
match counter:
# case COUNTERS.TELRAAM_COUNTER:
# Telraam counters are handled differently due to their dynamic nature
case COUNTERS.LAM_COUNTER:
csv_data = get_lam_counter_csv(start_time.date())
case COUNTERS.ECO_COUNTER:
csv_data = get_eco_counter_csv()
case COUNTERS.TRAFFIC_COUNTER:
if import_state.current_year_number:
start_year = import_state.current_year_number
else:
start_year = TRAFFIC_COUNTER_START_YEAR
csv_data = get_traffic_counter_csv(start_year=start_year)

start_time = get_start_time(counter, import_state)

if counter == TELRAAM_COUNTER:
save_telraam_data(start_time)
else:
start_index = csv_data.index[
csv_data[INDEX_COLUMN_NAME] == start_time_string
].values[0]
# As LAM data is fetched with a timespan, no index data is available, instead
# show time.
if counter == LAM_COUNTER:
logger.info(f"Starting saving observations at time:{start_time}")
else:
logger.info(f"Starting saving observations at index:{start_index}")

csv_data = csv_data[start_index:]
csv_data = get_csv_data(counter, import_state, start_time)
save_observations(
csv_data,
start_time,
Expand All @@ -464,7 +492,6 @@ def import_data(counters):


def add_additional_data_to_stations(csv_data_source):

logger.info(f"Updating {csv_data_source} stations informations...")
for station in Station.objects.filter(csv_data_source=csv_data_source):
station.data_from_date = get_data_from_date(station)
Expand Down Expand Up @@ -500,20 +527,25 @@ def add_arguments(self, parser):
default=False,
help=f"Import specific counter(s) data, choices are: {COUNTER_CHOICES_STR}.",
)
parser.add_argument(
"--force",
action="store_true",
help="Force the initial import and discard data check",
)

def handle(self, *args, **options):
initial_import_counters = None
start_time = None
initial_import = False
force = options.get("force", False)
if options["initial_import"]:
if len(options["initial_import"]) == 0:
raise CommandError(
f"Specify the counter(s), choices are: {COUNTER_CHOICES_STR}."
)
else:
initial_import_counters = options["initial_import"]
check_counters_argument(initial_import_counters)
logger.info(f"Deleting tables for: {initial_import_counters}")
handle_initial_import(initial_import_counters)
initial_import = True

if options["test_counter"]:
logger.info("Testing eco_counter importer.")
Expand All @@ -536,7 +568,7 @@ def handle(self, *args, **options):
if not initial_import_counters:
# run with counters argument
counters = options["counters"]
check_counters_argument(counters)
else:
counters = initial_import_counters
import_data(counters)
check_counters_argument(counters)
import_data(counters, initial_import, force)
5 changes: 5 additions & 0 deletions eco_counter/management/commands/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ def get_traffic_counter_csv(start_year=2015):

def get_lam_dataframe(csv_url):
    """Download LAM counter data from *csv_url* and parse it into a DataFrame.

    The payload is expected to be UTF-8 encoded, semicolon-delimited CSV.
    Raises AssertionError when the HTTP status is not 200.
    """
    response = requests.get(csv_url, headers=LAM_STATION_USER_HEADER)
    # NOTE(review): `assert` is stripped when Python runs with -O; if the
    # importer is ever run optimized, replace with an explicit raise.
    assert (
        response.status_code == 200
    ), "Fetching LAM data from {} , status code {}".format(
        # Fix: report the URL that was actually fetched, not the unrelated
        # ECO_COUNTER_STATIONS_URL setting, so failures are diagnosable.
        csv_url,
        response.status_code,
    )
    string_data = response.content
    csv_data = pd.read_csv(io.StringIO(string_data.decode("utf-8")), delimiter=";")
    return csv_data
Expand Down
5 changes: 5 additions & 0 deletions eco_counter/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ def initial_import_counter_data(args, name="initial_import_counter_data"):
management.call_command("import_counter_data", "--init", args)


@shared_task_email
def force_initial_import_counter_data(args, name="force_initial_import_counter_data"):
    """Task wrapper: run the import_counter_data management command with both
    --force and --init, skipping the data-availability check before the
    initial import. *args* selects the counter(s) to import.
    """
    management.call_command("import_counter_data", "--force", "--init", args)


@shared_task_email
def delete_counter_data(args, name="delete_counter_data"):
    """Task wrapper: delete stored data for the counter(s) given in *args*
    via the delete_counter_data management command.
    """
    management.call_command("delete_counter_data", "--counters", args)
Expand Down
1 change: 1 addition & 0 deletions eco_counter/tests/test_import_counter_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
The main purpose of these tests are to verify that the importer
imports and calculates the data correctly.
"""

import calendar
from datetime import datetime, timedelta
from io import StringIO
Expand Down
Loading

0 comments on commit 4f9bb61

Please sign in to comment.