diff --git a/co2calculator/calculate.py b/co2calculator/calculate.py index 89bc89b..82d660d 100644 --- a/co2calculator/calculate.py +++ b/co2calculator/calculate.py @@ -1,13 +1,8 @@ #!/usr/bin/env python # coding: utf-8 """Functions to calculate co2 emissions""" - import warnings -from pathlib import Path from typing import Tuple - -import pandas as pd - from ._types import Kilogram, Kilometer from .constants import ( KWH_TO_TJ, @@ -22,8 +17,14 @@ HeatingFuel, Unit, TransportationMode, + RangeCategory ) -from .distances import create_distance_request, get_distance, range_categories +from .distances import create_distance_request, get_distance, DistanceRequest, StructuredLocation, range_categories +from .parameters import CarEmissionParameters, BusEmissionParameters, TrainEmissionParameters, PlaneEmissionParameters, \ + MotorbikeEmissionParameters, TramEmissionParameters +from .reader import Reader +from .enums import * + script_path = str(Path(__file__).parent) emission_factor_df = pd.read_csv(f"{script_path}/../data/emission_factors.csv") @@ -35,6 +36,8 @@ f"{script_path}/../data/conversion_factors_heating.csv" ) +reader = Reader() + def calc_co2_car( distance: Kilometer, @@ -59,29 +62,36 @@ def calc_co2_car( :return: Total emissions of trip in co2 equivalents :rtype: Kilogram """ - - transport_mode = TransportationMode.CAR + # NOTE: Tests fail for 'cng' as `fuel_type` (IndexError) + transport_mode = TransportationMode.Car # Set default values + # params = {} if passengers is None: - passengers = 1 - warnings.warn( - f"Number of car passengers was not provided. Using default value: '{passengers}'" - ) - if size is None: - size = Size.AVERAGE - warnings.warn(f"Size of car was not provided. Using default value: '{size}'") - if fuel_type is None: - fuel_type = CarBusFuel.AVERAGE - warnings.warn( - f"Car fuel type was not provided. Using default value: '{fuel_type}'" - ) + passengers = 1 + #warnings.warn( + # f"Number of car passengers was not provided. Using default value: '{passengers}'" + #) + # if size is not None: + # params["size"] = size + # #warnings.warn(f"Size of car was not provided. Using default value: '{size}'") + # if fuel_type is not None: + # params["fuel_type"] = fuel_type + # #fuel_type = CarBusFuel.AVERAGE + # #warnings.warn( + # # f"Car fuel type was not provided. Using default value: '{fuel_type}'" + # #) + + params = locals() + params = {k: v for k, v in params.items() if v is not None} + params = CarEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) + #co2e = get_emission_factor( + # "transport", transport_mode, size=size, fuel_type=fuel_type + #) # Get the co2 factor, calculate and return - co2e = get_emission_factor( - "transport", transport_mode, size=size, fuel_type=fuel_type - ) - emissions = distance * co2e / passengers + emissions = distance * co2e / int(passengers) return emissions @@ -99,16 +109,19 @@ def calc_co2_motorbike(distance: Kilometer = None, size: str = None) -> Kilogram :rtype: Kilogram """ - transport_mode = TransportationMode.MOTORBIKE + transport_mode = TransportationMode.Motorbike # Set default values - if size is None: - size = Size.AVERAGE - warnings.warn( - f"Size of motorbike was not provided. Using default value: '{size}'" - ) - - co2e = get_emission_factor("transport", transport_mode, size=size) + #if size is None: + # size = Size.AVERAGE + #warnings.warn( + # f"Size of motorbike was not provided. Using default value: '{size}'" + #) + params = locals() + params = {k: v for k, v in params.items() if v is not None} + params = MotorbikeEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) + #co2e = get_emission_factor("transport", transport_mode, size=size) emissions = distance * co2e return emissions @@ -118,8 +131,8 @@ def calc_co2_bus( distance: Kilometer, size: str = None, fuel_type: str = None, - occupancy: int = None, - vehicle_range: str = None, + occupancy: float = None, + range: str = None, ) -> Kilogram: """ Function to compute the emissions of a bus trip. @@ -127,53 +140,19 @@ def calc_co2_bus( :param size: size class of the bus; ["medium", "large", "average"] :param fuel_type: type of fuel the bus is using; ["diesel", "cng", "hydrogen"] :param occupancy: number of people on the bus [20, 50, 80, 100] - :param vehicle_range: range/haul of the vehicle ["local", "long-distance"] + :param range: range/haul of the vehicle ["local", "long-distance"] :type distance: Kilometer :type size: str :type fuel_type: str :type occupancy: int - :type vehicle_range: str + :type range: str :return: Total emissions of trip in co2 equivalents :rtype: Kilogram """ - - transport_mode = TransportationMode.BUS - - # Set default values - if size is None: - size = Size.AVERAGE - warnings.warn(f"Size of bus was not provided. Using default value: '{size}'") - if fuel_type is None: - fuel_type = CarBusFuel.DIESEL - warnings.warn( - f"Bus fuel type was not provided. Using default value: '{fuel_type}'" - ) - elif fuel_type in [CarBusFuel.CNG, CarBusFuel.HYDROGEN]: - occupancy = -99 - size = Size.AVERAGE - elif fuel_type not in [CarBusFuel.DIESEL, CarBusFuel.CNG, CarBusFuel.HYDROGEN]: - warnings.warn( - f"Bus fuel type {fuel_type} not available. Using default value: 'diesel'" - ) - fuel_type = "diesel" - if occupancy is None: - occupancy = 50 - warnings.warn(f"Occupancy was not provided. Using default value: '{occupancy}'") - if vehicle_range is None: - vehicle_range = BusTrainRange.LONG_DISTANCE - warnings.warn( - f"Intended range of trip was not provided. Using default value: '{vehicle_range}'" - ) - - # Get co2 factor, calculate and return - co2e = get_emission_factor( - "transport", - transport_mode, - size=size, - fuel_type=fuel_type, - occupancy=occupancy, - range_cat=vehicle_range, - ) + params = locals() + params = {k: v for k, v in params.items() if v is not None} + params = BusEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) emissions = distance * co2e return emissions @@ -182,49 +161,54 @@ def calc_co2_bus( def calc_co2_train( distance: Kilometer, fuel_type: str = None, - vehicle_range: str = None, + range: str = None, ) -> Kilogram: """ Function to compute the emissions of a train trip. :param distance: Distance travelled by train; :param fuel_type: type of fuel the train is using; ["diesel", "electric", "average"] - :param vehicle_range: range/haul of the vehicle ["local", "long-distance"] + :param range: range/haul of the vehicle ["local", "long-distance"] :type distance: Kilometer :type fuel_type: float - :type vehicle_range: str + :type range: str :return: Total emissions of trip in co2 equivalents :rtype: Kilogram """ - - transport_mode = TransportationMode.TRAIN - size = Size.AVERAGE - # Set default values - if fuel_type is None: - fuel_type = TrainFuel.AVERAGE - warnings.warn( - f"Train fuel type was not provided. Using default value: '{fuel_type}'" - ) - if vehicle_range is None: - vehicle_range = BusTrainRange.LONG_DISTANCE - warnings.warn( - f"Intended range of trip was not provided. Using default value: '{vehicle_range}'" - ) - - # Get the co2 factor, calculate and return - co2e = get_emission_factor( - "transport", - transport_mode, - fuel_type=fuel_type, - range_cat=vehicle_range, - size=size, - ) + #if fuel_type is None: + params = locals() + params = {k: v for k, v in params.items() if v is not None} + params = TrainEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) emissions = distance * co2e + return emissions +def calc_co2_tram( + distance: Kilometer, + fuel_type: str = None, + size: str = None +) -> Kilogram: + """ + Function to compute the emissions of a train trip. + :param distance: Distance travelled by train; + :param fuel_type: type of fuel the train is using; ["diesel", "electric", "average"] + :param range: range/haul of the vehicle ["local", "long-distance"] + :type distance: Kilometer + :type fuel_type: float + :type range: str + :return: Total emissions of trip in co2 equivalents + :rtype: Kilogram + """ + # Set default values + params = locals() + params = {k: v for k, v in params.items() if v is not None} + params = TramEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) + emissions = distance * co2e return emissions -def calc_co2_plane(distance: Kilometer, seating_class: str = None) -> Kilogram: +def calc_co2_plane(distance: Kilometer, seating: str = None) -> Kilogram: """ Function to compute emissions of a plane trip :param distance: Distance of plane flight @@ -237,46 +221,22 @@ def calc_co2_plane(distance: Kilometer, seating_class: str = None) -> Kilogram: :return: Total emissions of flight in co2 equivalents :rtype: Kilogram """ - - transport_mode = TransportationMode.PLANE - fuel_type = "kerosine" - - # Set defaults - if seating_class is None: - seating_class = FlightClass.AVERAGE - warnings.warn( - f"Seating class was not provided. Using default value: '{seating_class}'" - ) + params = locals() + params = {k: v for k, v in params.items() if v is not None} # Retrieve whether distance is below or above 1500 km if distance <= 1500: - flight_range = FlightRange.SHORT_HAUL - elif distance > 1500: - flight_range = FlightRange.LONG_HAUL - # NOTE: Should be checked before geocoding and haversine calculation - seating_choices = [item for item in FlightClass] - - if seating_class not in seating_choices: - raise ValueError( - f"No emission factor available for the specified seating class '{seating_class}'.\n" - f"Please use one of the following: {seating_choices}" - ) - - # Get emission factor - co2e = get_emission_factor( - "transport", - transport_mode, - range_cat=flight_range, - seating_class=seating_class, - fuel_type=fuel_type, - ) - # multiply emission factor with distance + params["range"] = PlaneRange.Short_haul + else: + params["range"] = PlaneRange.Long_haul + params = PlaneEmissionParameters(**params) + co2e = reader.get_emission_factor(params.dict()) emissions = distance * co2e return emissions -def calc_co2_ferry(distance: Kilometer, seating_class: str = None) -> Kilogram: +def calc_co2_ferry(distance: Kilometer, seating: str = None) -> Kilogram: """ Function to compute emissions of a ferry trip :param distance: Distance of ferry trip @@ -287,16 +247,16 @@ def calc_co2_ferry(distance: Kilometer, seating_class: str = None) -> Kilogram: :rtype: Kilogram """ - transport_mode = TransportationMode.FERRY + transport_mode = TransportationMode.Ferry - if seating_class is None: - seating_class = FerryClass.AVERAGE - warnings.warn( - f"Seating class was not provided. Using default value: '{seating_class}'" - ) + if seating is None: + seating = FerryClass.AVERAGE + #warnings.warn( + # f"Seating class was not provided. Using default value: '{seating_class}'" + #) # get emission factor - co2e = get_emission_factor("transport", transport_mode, seating_class=seating_class) + co2e = get_emission_factor("transport", transport_mode, seating_class=seating) # multiply emission factor with distance emissions = distance * co2e @@ -386,7 +346,7 @@ def calc_co2_businesstrip( distance: Kilometer = None, size: str = None, fuel_type: str = None, - occupancy: int = None, + occupancy: float = None, seating: str = None, passengers: int = None, roundtrip: bool = False, @@ -431,10 +391,15 @@ def calc_co2_businesstrip( # - If stop-based, calculate distance first, then continue only distance-based if not distance: - request = create_distance_request(start, destination, transportation_mode) + request = DistanceRequest( + transportation_mode=transportation_mode, + start=StructuredLocation(**start), + destination=StructuredLocation(**destination), + ) + #request = create_distance_request(start, destination, transportation_mode) distance = get_distance(request) - if transportation_mode == TransportationMode.CAR: + if transportation_mode == TransportationMode.Car: emissions = calc_co2_car( distance=distance, passengers=passengers, @@ -442,27 +407,27 @@ def calc_co2_businesstrip( fuel_type=fuel_type, ) - elif transportation_mode == TransportationMode.BUS: + elif transportation_mode == TransportationMode.Bus: emissions = calc_co2_bus( distance=distance, size=size, fuel_type=fuel_type, occupancy=occupancy, - vehicle_range="long-distance", + range=TrainRange.Long_distance, ) - elif transportation_mode == TransportationMode.TRAIN: + elif transportation_mode == TransportationMode.Train: emissions = calc_co2_train( distance=distance, fuel_type=fuel_type, - vehicle_range="long-distance", + range=TrainRange.Long_distance, ) - elif transportation_mode == TransportationMode.PLANE: - emissions = calc_co2_plane(distance, seating_class=seating) + elif transportation_mode == TransportationMode.Plane: + emissions = calc_co2_plane(distance, seating=seating) - elif transportation_mode == TransportationMode.FERRY: - emissions = calc_co2_ferry(distance, seating_class=seating) + elif transportation_mode == TransportationMode.Ferry: + emissions = calc_co2_ferry(distance, seating=seating) else: raise ValueError( @@ -508,14 +473,14 @@ def get_emission_factor( co2e = emission_factor_df[ (emission_factor_df["category"] == category) & (emission_factor_df["subcategory"] == mode) - & (emission_factor_df["size_class"] == size) + & (emission_factor_df["size"] == size) & (emission_factor_df["fuel_type"] == fuel_type) & (emission_factor_df["occupancy"] == occupancy) & (emission_factor_df["range"] == range_cat) & (emission_factor_df["seating"] == seating_class) ]["co2e"].values[0] except IndexError: - if mode == TransportationMode.PLANE: + if mode == TransportationMode.Plane: default_seating = FlightClass.AVERAGE warnings.warn( f"Seating class '{seating_class}' not available for {range_cat} flights. Switching to " @@ -524,7 +489,7 @@ def get_emission_factor( co2e = emission_factor_df[ (emission_factor_df["category"] == category) & (emission_factor_df["subcategory"] == mode) - & (emission_factor_df["size_class"] == size) + & (emission_factor_df["size"] == size) & (emission_factor_df["fuel_type"] == fuel_type) & (emission_factor_df["occupancy"] == occupancy) & (emission_factor_df["range"] == range_cat) @@ -544,7 +509,7 @@ def get_emission_factor( co2e = emission_factor_df[ (emission_factor_df["category"] == category) & (emission_factor_df["subcategory"] == mode) - & (emission_factor_df["size_class"] == default_size) + & (emission_factor_df["size"] == default_size) & (emission_factor_df["fuel_type"] == fuel_type) & (emission_factor_df["occupancy"] == occupancy) & (emission_factor_df["range"] == range_cat) @@ -608,7 +573,7 @@ def calc_co2_commuting( """ # get weekly co2e for respective mode of transport - if transportation_mode == TransportationMode.CAR: + if transportation_mode == TransportationMode.Car: weekly_co2e = calc_co2_car( passengers=passengers, size=size, @@ -616,35 +581,31 @@ def calc_co2_commuting( distance=weekly_distance, ) - elif transportation_mode == TransportationMode.MOTORBIKE: + elif transportation_mode == TransportationMode.Motorbike: weekly_co2e = calc_co2_motorbike(size=size, distance=weekly_distance) - elif transportation_mode == TransportationMode.BUS: + elif transportation_mode == TransportationMode.Bus: weekly_co2e = calc_co2_bus( size=size, fuel_type=fuel_type, occupancy=occupancy, - vehicle_range="local", + range="local", distance=weekly_distance, ) - elif transportation_mode == TransportationMode.TRAIN: + elif transportation_mode == TransportationMode.Train: weekly_co2e = calc_co2_train( - fuel_type=fuel_type, vehicle_range="local", distance=weekly_distance + fuel_type=fuel_type, range="local", distance=weekly_distance ) - elif transportation_mode in [ - TransportationMode.PEDELEC, - TransportationMode.BICYCLE, + TransportationMode.Pedelec, + TransportationMode.Bicycle, ]: co2e = get_emission_factor("transport", transportation_mode) weekly_co2e = co2e * weekly_distance - elif transportation_mode == TransportationMode.TRAM: - fuel_type = CarBusFuel.ELECTRIC - size = Size.AVERAGE - co2e = get_emission_factor( - "transport", transportation_mode, fuel_type=fuel_type, size=size + elif transportation_mode == TransportationMode.Tram: + weekly_co2e = calc_co2_tram( + fuel_type=fuel_type, size= size, distance=weekly_distance ) - weekly_co2e = co2e * weekly_distance else: raise ValueError( f"Transportation mode {transportation_mode} not found in database" diff --git a/co2calculator/constants.py b/co2calculator/constants.py index 6ec5090..e24a5bf 100644 --- a/co2calculator/constants.py +++ b/co2calculator/constants.py @@ -53,6 +53,18 @@ class CarBusFuel(str, enum.Enum): HYDROGEN = "hydrogen" +@enum.unique +class BusFuel(str, enum.Enum): + """Enum for bus fuel types""" + + ELECTRIC = "electric" + DIESEL = "diesel" + AVERAGE = "average" + HYDROGEN = "hydrogen" + CNG = "cng" + + + @enum.unique class Size(str, enum.Enum): """Enum for car sizes""" diff --git a/co2calculator/distances.py b/co2calculator/distances.py index 785d66b..427b1ff 100644 --- a/co2calculator/distances.py +++ b/co2calculator/distances.py @@ -11,16 +11,18 @@ import numpy as np import openrouteservice import pandas as pd +import pydantic from dotenv import load_dotenv from openrouteservice.directions import directions from openrouteservice.geocode import pelias_search, pelias_structured from pydantic import BaseModel, ValidationError, Extra, confloat from thefuzz import fuzz from thefuzz import process +from iso3166 import countries from ._types import Kilometer +from .enums import TransportationMode from .constants import ( - TransportationMode, CountryCode2, CountryCode3, CountryName, @@ -39,7 +41,7 @@ script_path = str(Path(__file__).parent) -class StructuredLocation(BaseModel, extra=Extra.forbid): +class StructuredLocation(BaseModel, extra=Extra.ignore): address: Optional[str] locality: str country: Union[CountryCode2, CountryCode3, CountryName] @@ -50,9 +52,19 @@ class StructuredLocation(BaseModel, extra=Extra.forbid): neighbourhood: Optional[str] -class TrainStation(BaseModel): +class TrainStation(BaseModel, extra=Extra.ignore): station_name: str - country: CountryCode2 + country_code: CountryCode2 = None + country: CountryName = None + + @pydantic.root_validator(pre=False) + def get_country_code(cls, values): + if values["country_code"] is None: + try: + values["country_code"] = countries.get(values["country"]).alpha2 + except KeyError as e: + raise ValueError(f"Invalid country: {values['country']}.") + return values class Airport(BaseModel): @@ -305,7 +317,7 @@ def geocoding_train_stations(loc_dict): # remove stations with no coordinates stations_df.dropna(subset=["latitude", "longitude"], inplace=True) countries_eu = stations_df["country"].unique() - country_code = station.country + country_code = station.country_code if country_code not in countries_eu: warnings.warn( "The provided country is not within Europe. " @@ -493,10 +505,10 @@ def create_distance_request( try: if transportation_mode in [ - TransportationMode.CAR, - TransportationMode.MOTORBIKE, - TransportationMode.BUS, - TransportationMode.FERRY, + TransportationMode.Car, + TransportationMode.Motorbike, + TransportationMode.Bus, + TransportationMode.Ferry, ]: return DistanceRequest( transportation_mode=transportation_mode, @@ -504,14 +516,14 @@ def create_distance_request( destination=StructuredLocation(**destination), ) - if transportation_mode in [TransportationMode.TRAIN]: + if transportation_mode in [TransportationMode.Train]: return DistanceRequest( transportation_mode=transportation_mode, start=TrainStation(**start), destination=TrainStation(**destination), ) - if transportation_mode in [TransportationMode.PLANE]: + if transportation_mode in [TransportationMode.Plane]: return DistanceRequest( transportation_mode=transportation_mode, start=Airport(iata_code=start), @@ -519,8 +531,8 @@ def create_distance_request( ) except ValidationError as e: - raise InvalidSpatialInput(e) - + #raise InvalidSpatialInput(e) + raise InvalidSpatialInput(f"unknown transportation_mode: '{transportation_mode}'") raise InvalidSpatialInput(f"unknown transportation_mode: '{transportation_mode}'") @@ -536,17 +548,17 @@ def get_distance(request: DistanceRequest) -> Kilometer: """ detour_map = { - TransportationMode.CAR: False, - TransportationMode.MOTORBIKE: False, - TransportationMode.BUS: True, - TransportationMode.TRAIN: True, - TransportationMode.PLANE: True, - TransportationMode.FERRY: False, + TransportationMode.Car: False, + TransportationMode.Motorbike: False, + TransportationMode.Bus: True, + TransportationMode.Train: True, + TransportationMode.Plane: True, + TransportationMode.Ferry: True, } if request.transportation_mode in [ - TransportationMode.CAR, - TransportationMode.MOTORBIKE, + TransportationMode.Car, + TransportationMode.Motorbike, ]: coords = [] for loc in [request.start, request.destination]: @@ -554,7 +566,7 @@ def get_distance(request: DistanceRequest) -> Kilometer: coords.append(loc_coords) return get_route(coords, "driving-car") - if request.transportation_mode == TransportationMode.BUS: + if request.transportation_mode == TransportationMode.Bus: # Same as car (StructuredLocation) # TODO: Validate with BaseModel # TODO: Question: Why are we not calculating the bus trip like `driving-car` routes? @@ -570,7 +582,7 @@ def get_distance(request: DistanceRequest) -> Kilometer: ) return _apply_detour(distance, request.transportation_mode) - if request.transportation_mode == TransportationMode.TRAIN: + if request.transportation_mode == TransportationMode.Train: distance = 0 coords = [] @@ -591,17 +603,22 @@ def get_distance(request: DistanceRequest) -> Kilometer: ) return _apply_detour(distance, request.transportation_mode) - if request.transportation_mode == TransportationMode.PLANE: + if request.transportation_mode == TransportationMode.Plane: # Stops are IATA code of airports # TODO: Validate stops with BaseModel _, geom_start, _ = geocoding_airport(request.start.iata_code) _, geom_dest, _ = geocoding_airport(request.destination.iata_code) + #_, _, geom_start, _ = geocoding_structured(request.start.dict()) + #_, _, geom_dest, _ = geocoding_structured(request.destination.dict()) distance = haversine(geom_start[1], geom_start[0], geom_dest[1], geom_dest[0]) return _apply_detour(distance, request.transportation_mode) - if request.transportation_mode == TransportationMode.FERRY: + if request.transportation_mode == TransportationMode.Ferry: + # todo: Do we have a way of checking if there even exists a ferry connection between the given cities (or if the + # cities even have a port? + _, _, geom_start, _ = geocoding_structured(request.start.dict()) _, _, geom_dest, _ = geocoding_structured(request.destination.dict()) diff --git a/co2calculator/enums.py b/co2calculator/enums.py new file mode 100644 index 0000000..cfe0632 --- /dev/null +++ b/co2calculator/enums.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Enums holding parameter choices derived from data set""" + +__author__ = "Christina Ludwig, GIScience Research Group, Heidelberg University" +__email__ = "christina.ludwig@uni-heidelberg.de" + +from enum import Enum +import pandas as pd +from pathlib import Path +import numpy as np + +script_path = str(Path(__file__).parent) + + +class EnumCreator: + + def __init__(self): + self.df = pd.read_csv(f"{script_path}/../data/emission_factors.csv") + + def create_enum(self, mode, column, name): + df = self.df[self.df["subcategory"] == mode] + if df[column].dtype == np.float64: + data = df[column].map(lambda x: "c_{:.0f}".format(x)).astype("str") + return Enum(name, {x: float(x[2:]) for x in data}, type=str) + else: + data = df[column].fillna("nan").unique() + return Enum(name, {x.capitalize().replace("-", "_"): x for x in data}, type=str) + + def create_enum_transport(self, column, name): + data = self.df[column].fillna("nan").astype("str").unique() + return Enum(name, {x.capitalize(): x for x in data}, type=str) + + +enum_creator = EnumCreator() +TransportationMode = enum_creator.create_enum_transport("subcategory", "TransportationMode") + +# Fuel types ------------------------------------------------------ + +TrainFuelType = enum_creator.create_enum(TransportationMode.Train, + "fuel_type", + "TrainFuelType") +CarFuelType = enum_creator.create_enum(TransportationMode.Car, + "fuel_type", + "CarFuelType") +BusFuelType = enum_creator.create_enum(TransportationMode.Bus, + "fuel_type", + "BusFuelType") +TramFuelType = enum_creator.create_enum(TransportationMode.Tram, + "fuel_type", "TramFuelType") +PlaneFuelType = enum_creator.create_enum(TransportationMode.Plane, + "fuel_type", + "PlaneFuelType") +FerryFuelType = enum_creator.create_enum(TransportationMode.Ferry, + "fuel_type", + "FerryFuelType") +BicycleFuelType = enum_creator.create_enum(TransportationMode.Bicycle, + "fuel_type", + "BicycleFuelType") +PedelecFuelType = enum_creator.create_enum(TransportationMode.Pedelec, + "fuel_type", + "PedelecFuelType") + +# Size +TrainSize = enum_creator.create_enum(TransportationMode.Train, + "size", + "TrainSize") +CarSize = enum_creator.create_enum(TransportationMode.Car, + "size", + "CarSize") +BusSize = enum_creator.create_enum(TransportationMode.Bus, + "size", + "BusSize") +MotorbikeSize = enum_creator.create_enum(TransportationMode.Motorbike, + "size", + "MotorbikeSize") +TramSize = enum_creator.create_enum(TransportationMode.Tram, + "size", + "TramSize") + +# Vehicle range +TrainRange = enum_creator.create_enum(TransportationMode.Train, + "range", + "TrainRange") +BusRange = enum_creator.create_enum(TransportationMode.Bus, + "range", + "BusRange") + +PlaneRange = enum_creator.create_enum(TransportationMode.Plane, + "range", + "PlaneRange") + +# Seating class +PlaneSeatingClass = enum_creator.create_enum(TransportationMode.Plane, + "seating", + "PlaneSeatingClass") +FerrySeatingClass = enum_creator.create_enum(TransportationMode.Ferry, + "seating", + "FerrySeatingClass") + +# Occupancy +TrainOccupancy = enum_creator.create_enum(TransportationMode.Train, + "occupancy", + "TrainOccupancy") +BusOccupancy = enum_creator.create_enum( TransportationMode.Bus, + "occupancy", + "BusOccupancy") \ No newline at end of file diff --git a/co2calculator/exceptions.py b/co2calculator/exceptions.py new file mode 100644 index 0000000..a81e995 --- /dev/null +++ b/co2calculator/exceptions.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""__description__""" + +__author__ = "Christina Ludwig, GIScience Research Group, Heidelberg University" +__email__ = "christina.ludwig@uni-heidelberg.de" + + +class ConversionFactorNotFound(Exception): + + def __init__(self, message): + self.message = message + diff --git a/co2calculator/parameters.py b/co2calculator/parameters.py new file mode 100644 index 0000000..6178ce7 --- /dev/null +++ b/co2calculator/parameters.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""__description__""" + +from pydantic import BaseModel, validator, root_validator +from .enums import * +from typing import Union + + +class TrainEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Train + fuel_type: Union[TrainFuelType, str] = TrainFuelType.Average + range: Union[TrainRange, str] = TrainRange.Long_distance + size: Union[TrainSize, str] = TrainSize.Average + + @validator("fuel_type", allow_reuse=True) + def check_fueltype(cls, v): + v = v.lower() if isinstance(v, str) else v + return TrainFuelType(v) + + @validator("range", allow_reuse=True) + def check_range(cls, v): + v = v.lower() if isinstance(v, str) else v + return TrainRange(v) + + @validator("size", allow_reuse=True) + def check_size(cls, v): + v = v.lower() if isinstance(v, str) else v + return TrainSize(v) + + + +class TramEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Tram + fuel_type: Union[TramFuelType, str] = TramFuelType.Electric + size: Union[TramSize, str] = TramSize.Average + + @validator("fuel_type", allow_reuse=True) + def check_fueltype(cls, v): + v = v.lower() if isinstance(v, str) else v + return TramFuelType(v) + + @validator("size", allow_reuse=True) + def check_size(cls, v): + v = v.lower() if isinstance(v, str) else v + return TramSize(v) + + +class CarEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Car + fuel_type: Union[CarFuelType, str] = CarFuelType.Average + size: Union[CarSize, str] = CarSize.Average + + @validator("fuel_type", allow_reuse=True) + def check_fueltype(cls, v): + v = v.lower() if isinstance(v, str) else v + return CarFuelType(v) + + @validator("size", allow_reuse=True) + def check_size(cls, v, values): + v = v.lower() if isinstance(v, str) else v + return CarSize(v) + + +class PlaneEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Plane + seating: Union[PlaneSeatingClass, str] = PlaneSeatingClass.Average + range: Union[PlaneRange, str] + + @validator("range") + def check_range(cls, v): + v = v.lower() if isinstance(v, str) else v + return PlaneRange(v) + + @validator("seating") + def check_seating(cls, v): + v = v.lower() if isinstance(v, str) else v + return PlaneSeatingClass(v) + + +class BusEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Bus + fuel_type: Union[BusFuelType, str] = BusFuelType.Diesel + size: Union[BusSize, str] = BusSize.Average + occupancy: Union[BusOccupancy, str] = None + range: Union[BusRange, str] = BusRange.Long_distance + + @validator("fuel_type", allow_reuse=True) + def check_fueltype(cls, v): + if isinstance(v, str): + return BusFuelType(v.lower()) + else: + return v + + @validator("size", allow_reuse=True) + def check_size(cls, v): + v = v.lower() if isinstance(v, str) else v + return BusSize(v) + + @validator("range", allow_reuse=True) + def check_range(cls, v): + v = v.lower() if isinstance(v, str) else v + return BusRange(v) + + @validator("occupancy", allow_reuse=True) + def check_occupancy(cls, v): + v = v.lower() if isinstance(v, str) else v + return BusOccupancy(v) + + @root_validator(pre=False) + def check_occupancy(cls, values): + if values['occupancy'] is None: + if values['fuel_type'] in [BusFuelType.Cng, BusFuelType.Hydrogen]: + values['occupancy'] = None + else: + values['occupancy'] = BusOccupancy.c_50 + else: + values['occupancy'] = BusOccupancy(values['occupancy']) + return values + + +class MotorbikeEmissionParameters(BaseModel): + + subcategory: TransportationMode = TransportationMode.Motorbike + size: Union[MotorbikeSize, str] = MotorbikeSize.Average + + @validator("size", allow_reuse=True) + def check_size(cls, v): + v = v.lower() if isinstance(v, str) else v + return MotorbikeSize(v) + + + +# class EmissionParameters(BaseModel): +# +# subcategory: Union[TransportationMode, str] +# fuel_type: Union[CarFuelType, BusFuelType, PlaneFuelType, TrainFuelType, str] = None +# size: Union[CarSize, BusSize, TrainSize, str] = None +# occupancy: Union[BusOccupancy, TrainOccupancy, str] = None +# range: Union[BusRange, TrainRange, str] = None +# seating_class: Union[PlaneSeatingClass, FerrySeatingClass, str] = None +# +# @validator("fuel_type", allow_reuse=True) +# def check_fueltype(cls, v, values): +# v = v.lower() if isinstance(v, str) else v +# if v is "average": +# if "Average" not in TrainFuelType.__members__.keys(): +# return eval(f"{t}FuelType('nan')") +# else: +# raise FactorNotFound("Fuel type needs to be provided. No average value found.") +# else: +# return eval(f"{t}FuelType('{v}')") +# +# @validator("size", allow_reuse=True, pre=True) +# def check_size(cls, v, values): +# t = values["subcategory"].lower().capitalize() +# v = v.lower() if isinstance(v, str) else v +# return eval(f"{t}Size('{v}')") +# +# @validator("range", allow_reuse=True) +# def check_range(cls, v, values): +# t = values["subcategory"].lower().capitalize() +# v = v.lower() if isinstance(v, str) else v +# return eval(f"{t}Range('{v}')") +# +# @validator("seating_class", allow_reuse=True) +# def check_seating_class(cls, v, values): +# t = values["subcategory"].lower().capitalize() +# v = v.lower() if isinstance(v, str) else v +# return eval(f"{t}SeatingClass('{v}')") +# +# @validator("occupancy", allow_reuse=True) +# def check_occupancy(cls, v, values): +# t = values["subcategory"].lower().capitalize() +# v = v.lower() if isinstance(v, str) else v +# return eval(f"{t}Occupancy('{v}')") +# +# + diff --git a/co2calculator/reader.py b/co2calculator/reader.py new file mode 100644 index 0000000..bb16d66 --- /dev/null +++ b/co2calculator/reader.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Reader class to retrieve co2 factors from database""" +import warnings +from pathlib import Path +import pandas as pd + +from .exceptions import ConversionFactorNotFound + +script_path = str(Path(__file__).parent) + + +class Reader: + + def __init__(self): + """Init""" + self.emission_factors = pd.read_csv(f"{script_path}/../data/emission_factors.csv") + self.conversion_factors = pd.read_csv( + f"{script_path}/../data/conversion_factors_heating.csv" + ) + self.detour_factors = pd.read_csv(f"{script_path}/../data/detour.csv") + + def get_emission_factor(self, parameters: dict): + """ + Returns factors from the database + :param parameters: + :type parameters: + :return: + :rtype: + """ + selected_factors = self.emission_factors + for k, v in parameters.items(): + if v is None: + continue + selected_factors_new = selected_factors[selected_factors[k].astype("str") == str(v.value)] + selected_factors = selected_factors_new + if selected_factors_new.empty: + raise ConversionFactorNotFound( + f"No suitable conversion factor found in database. Please adapt your query.") + + if len(selected_factors) > 1: + raise ConversionFactorNotFound(f"{len(selected_factors)} co2 conversion factors found. Please provide more specific selection criteria.") + else: + return selected_factors["co2e"].values[0] + + + diff --git a/data/emission_factors.csv b/data/emission_factors.csv index af460d2..6568f15 100644 --- a/data/emission_factors.csv +++ b/data/emission_factors.csv @@ -1,4 +1,4 @@ -,category,subcategory,source,model,name,unit,size_class,occupancy,capacity,range,fuel_type,co2e_unit,co2e,seating,comment +,category,subcategory,source,model,name,unit,size,occupancy,capacity,range,fuel_type,co2e_unit,co2e,seating,comment 0,transport,bus,Öko-Institut,gemis,Bus-Linie-BZ-DE-2020-Basis,P.km,average,"",,local,hydrogen,kg/P.km,0.0251,, 1,transport,bus,Öko-Institut-adapted,gemis,Bus-Linie-BZ-DE-2020-Basis,P.km,average,"",,long-distance,hydrogen,kg/P.km,0.0251,, 2,transport,bus,Öko-Institut,gemis,Bus-Linie-CNG-DE-2020-Basis,P.km,average,"",,local,cng,kg/P.km,0.0617,, diff --git a/tests/unit/test_calculate.py b/tests/unit/test_calculate.py index 478594e..16dace5 100644 --- a/tests/unit/test_calculate.py +++ b/tests/unit/test_calculate.py @@ -9,6 +9,7 @@ import co2calculator.calculate as candidate from co2calculator.constants import RangeCategory +from co2calculator.exceptions import ConversionFactorNotFound @pytest.mark.parametrize( @@ -51,6 +52,62 @@ def test_calc_co2_car( assert round(actual_emissions, 2) == expected_emissions +@pytest.mark.parametrize( + "start_address,start_city,start_country,destination_address,destination_city,destination_country,transportation_mode,passengers,size,fuel_type,expected_emissions", + [ + pytest.param(None, "Heidelberg", "Germany", None, "Berlin", "Germany", "car", None, None, None, 137.88, id="car"), + pytest.param("Augsburg Hochzoll", "Augsburg", "Germany", "Berlin Gesundbrunnen", "Berlin", "Germany", "train", None, None, None, 19.54, id="train"), + #pytest.param(10, 1, "small", None, 1.79, id="size: 'small'"), + #pytest.param(10, 1, "medium", None, 2.09, id="size: 'medium'"), + #pytest.param(10, 1, "large", None, 2.74, id="size: 'large'"), + #pytest.param(10, 1, "average", None, 2.15, id="size: 'average'"), + #pytest.param(10, 1, None, "diesel", 2.01, id="fuel_type: 'diesel'"), + ##pytest.param(10, 1, None, "gasoline", 2.24, id="fuel_type: 'gasoline'"), + # pytest.param(10, 1, None, "cng", 31.82, id="fuel_type: 'cng'"), + #pytest.param(10, 1, None, "electric", 0.57, id="fuel_type: 'electric'"), + #pytest.param(10, 1, None, "hybrid", 1.16, id="fuel_type: 'hybrid'"), + #pytest.param( + # 10, 1, None, "plug-in_hybrid", 0.97, id="fuel_type: 'plug-in_hybrid'" + #), + #pytest.param(10, 1, None, "average", 2.15, id="fuel_type: 'average'"), + ], +) +def test_calc_co2_busincesstrip( + start_address: str, + start_city: str, + start_country: str, + destination_address: str, + destination_city: str, + destination_country: str, + transportation_mode: str, + passengers: Optional[int], + size: Optional[str], + fuel_type: Optional[str], + expected_emissions: float, +): + """Test: Calculate car-trip emissions based on given distance. + Expect: Returns emissions and distance. + """ + start = {"station_name": start_address, + "address": start_address, + "locality": start_city, + "country": start_country} + destination = {"station_name": destination_address, + "address": destination_address, + "locality": destination_city, + "country": destination_country} + actual_emissions, _, _, _ = candidate.calc_co2_businesstrip( + transportation_mode=transportation_mode, + start=start, + destination=destination, + passengers=passengers, + size=size, + fuel_type=fuel_type, + ) + + assert round(actual_emissions, 2) == pytest.approx(expected_emissions, 0.1) + + @pytest.mark.parametrize( "distance,size,expected_emissions", [ @@ -69,24 +126,24 @@ def test_calc_co2_motorbike( """ actual_emissions = candidate.calc_co2_motorbike(distance=distance, size=size) - assert round(actual_emissions, 2) == expected_emissions + assert round(actual_emissions, 2) == pytest.approx(expected_emissions, 0.1) @pytest.mark.parametrize( - "distance,size,fuel_type,occupancy,vehicle_range,expected_emissions", + "distance,size,fuel_type,occupancy,range,expected_emissions", [ pytest.param(549, None, None, None, None, 21.63, id="defaults"), pytest.param( - 549, "large", "diesel", 80, "long-distance", 12.3, id="optional arguments" + 549, "large", "diesel", 80.0, "long-distance", 12.3, id="optional arguments" ), pytest.param(10, "medium", None, None, None, 0.42, id="size: 'medium'"), pytest.param(10, "large", None, None, None, 0.33, id="size: 'large'"), pytest.param(10, "average", None, None, None, 0.39, id="size: 'average'"), - pytest.param(10, None, None, 20, None, 0.92, id="occupancy: 20"), - pytest.param(10, None, None, 50, None, 0.39, id="occupancy: 50"), - pytest.param(10, None, None, 80, None, 0.26, id="occupancy: 80"), - pytest.param(10, None, None, 100, None, 0.22, id="occupancy: 100"), - pytest.param(10, None, None, None, "local", 0.39, id="vehicle_range: 'local'"), + pytest.param(10, None, None, 20.0, None, 0.92, id="occupancy: 20"), + pytest.param(10, None, None, 50.0, None, 0.39, id="occupancy: 50"), + pytest.param(10, None, None, 80.0, None, 0.26, id="occupancy: 80"), + pytest.param(10, None, None, 100.0, None, 0.22, id="occupancy: 100"), + pytest.param(10, None, None, None, "local", 0.39, id="range: 'local'"), pytest.param( 10, None, @@ -94,20 +151,20 @@ def test_calc_co2_motorbike( None, "long-distance", 0.39, - id="vehicle_range: 'long-distance'", + id="range: 'long-distance'", ), pytest.param( 10, - "small", + None, "diesel", None, "long-distance", 0.39, - id="size: 'small', fuel_type: `diesel`, vehicle_range: 'long-distance'", + id="size: 'small', fuel_type: `diesel`, range: 'long-distance'", ), pytest.param( 10, - "medium", + None, "cng", None, "long-distance", @@ -116,7 +173,7 @@ def test_calc_co2_motorbike( ), pytest.param( 10, - "small", + None, "hydrogen", None, "local", @@ -130,7 +187,7 @@ def test_calc_co2_bus( size: Optional[str], fuel_type: Optional[str], occupancy: Optional[int], - vehicle_range: Optional[str], + range: Optional[str], expected_emissions: float, ): """Test: Calculate bus-trip emissions based on given distance. @@ -143,14 +200,14 @@ def test_calc_co2_bus( size=size, fuel_type=fuel_type, occupancy=occupancy, - vehicle_range=vehicle_range, + range=range, ) assert round(actual_emissions, 2) == expected_emissions @pytest.mark.parametrize( - "distance,fuel_type,vehicle_range,expected_emissions", + "distance,fuel_type,range,expected_emissions", [ pytest.param(1162, None, None, 38.23, id="defaults"), pytest.param( @@ -159,16 +216,16 @@ def test_calc_co2_bus( pytest.param(10, "electric", None, 0.32, id="fuel_type: 'electric'"), pytest.param(10, "diesel", None, 0.7, id="fuel_type: 'diesel'"), pytest.param(10, "average", None, 0.33, id="fuel_type: 'average'"), - pytest.param(10, None, "local", 0.6, id="vehicle_range: 'local'"), + pytest.param(10, None, "local", 0.6, id="range: 'local'"), pytest.param( - 10, None, "long-distance", 0.33, id="vehicle_range: 'long-distance'" + 10, None, "long-distance", 0.33, id="range: 'long-distance'" ), ], ) def test_calc_co2_train( distance: float, fuel_type: Optional[str], - vehicle_range: Optional[str], + range: Optional[str], expected_emissions: float, ): """Test: Calculate train-trip emissions based on given distance. @@ -176,7 +233,7 @@ def test_calc_co2_train( """ actual_emissions = candidate.calc_co2_train( - distance=distance, fuel_type=fuel_type, vehicle_range=vehicle_range + distance=distance, fuel_type=fuel_type, range=range ) assert round(actual_emissions, 2) == expected_emissions @@ -200,7 +257,7 @@ def test_calc_co2_plane( """ actual_emissions = candidate.calc_co2_plane( - distance=distance, seating_class=seating_class + distance=distance, seating=seating_class ) assert round(actual_emissions, 2) == expected_emissions @@ -212,7 +269,7 @@ def test_calc_co2_plane__failed() -> None: """ with pytest.raises(ValueError): - candidate.calc_co2_plane(distance=5000, seating_class="NON-EXISTENT") + candidate.calc_co2_plane(distance=5000, seating="NON-EXISTENT") def test_calc_co2_plane__invalid_distance_seating_combo() -> None: @@ -221,10 +278,8 @@ def test_calc_co2_plane__invalid_distance_seating_combo() -> None: """ # Check if raises warning (premium economy class is not available for short-haul flights) - with pytest.warns( - UserWarning, match=r"Seating class '\w+' not available for short-haul flights" - ): - candidate.calc_co2_plane(distance=400, seating_class="premium_economy_class") + with pytest.raises(ConversionFactorNotFound): + candidate.calc_co2_plane(distance=400, seating="premium_economy_class") @pytest.mark.parametrize( @@ -242,7 +297,7 @@ def test_calc_ferry(seating_class: Optional[str], expected_emissions: float) -> """ actual_emissions = candidate.calc_co2_ferry( - distance=100, seating_class=seating_class + distance=100, seating=seating_class ) assert round(actual_emissions, 2) == expected_emissions @@ -394,3 +449,4 @@ def test_calc_co2_businesstrip( ) patched_method.assert_called_once() + diff --git a/tests/unit/test_parameters.py b/tests/unit/test_parameters.py new file mode 100644 index 0000000..ea2f5e3 --- /dev/null +++ b/tests/unit/test_parameters.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""__description__""" + +__author__ = "Christina Ludwig, GIScience Research Group, Heidelberg University" +__email__ = "christina.ludwig@uni-heidelberg.de" + +import pytest + +from co2calculator import Reader, PlaneRange, TrainFuelType +from co2calculator.parameters import CarEmissionParameters, TrainEmissionParameters, BusEmissionParameters, \ + PlaneEmissionParameters + + +def test_default_car_parameters(): + par = CarEmissionParameters() + reader = Reader() + expected = 0.215 + + actual = reader.get_emission_factor(par.dict()) + assert actual == pytest.approx(expected, 0.3) + + +def test_default_train_parameters(): + par = TrainEmissionParameters() + reader = Reader() + expected = 0.0329 + + actual = reader.get_emission_factor(par.dict()) + assert actual == pytest.approx(expected, 0.3) + + +def test_default_train_fuel_parameters(): + par = TrainEmissionParameters(fuel_type=TrainFuelType.Electric) + reader = Reader() + expected = 0.0329 + + actual = reader.get_emission_factor(par.dict()) + assert actual == pytest.approx(expected, 0.3) + + +def test_default_bus_parameters(): + par = BusEmissionParameters() + reader = Reader() + expected = 0.0394 + + actual = reader.get_emission_factor(par.dict()) + assert actual == pytest.approx(expected, 0.3) + + +def test_default_plane_parameters(): + par = PlaneEmissionParameters(range=PlaneRange.Long_haul) + reader = Reader() + expected = 0.19085 + + actual = reader.get_emission_factor(par.dict()) + assert actual == pytest.approx(expected, 0.3) \ No newline at end of file diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py new file mode 100644 index 0000000..e5a8dce --- /dev/null +++ b/tests/unit/test_reader.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""__description__""" +import pytest + +from co2calculator.exceptions import ConversionFactorNotFound +from co2calculator.parameters import * +from co2calculator.enums import * +from co2calculator.reader import Reader + + +def test_read_emission_factors_ConversionFactorNotFound(): + + reader = Reader() + parameters = {"seating": PlaneSeatingClass.Premium_economy_class, + "range": PlaneRange.Short_haul} + param = PlaneEmissionParameters(**parameters) + + with pytest.raises(ConversionFactorNotFound): + reader.get_emission_factor(param.dict()) + + +def test_read_emission_factors(): + + reader = Reader() + parameters = { + "fuel_type": CarFuelType.Electric, + "size": CarSize.Average + } + expected = 0.05728 + + param = CarEmissionParameters(**parameters) + factor = reader.get_emission_factor(param.dict()) + assert factor == pytest.approx(expected, 0.3) +