diff --git a/project/pipeline/etl/extract/geo_extractor.py b/project/pipeline/etl/extract/geo_extractor.py index bfb9723f47..56a23007eb 100644 --- a/project/pipeline/etl/extract/geo_extractor.py +++ b/project/pipeline/etl/extract/geo_extractor.py @@ -17,4 +17,4 @@ def geo_data_extractor(compressed=True): zip_helper = GZipFileHelper() url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/GEO/?compressed={compressed}&format=TSV&lang=en" tsv_file_path = zip_helper.download_and_extract_url_file(url) - return pd.read_csv(tsv_file_path, sep="\t", header=0) + return pd.read_csv(tsv_file_path, sep="\t", header=0, names=['abbr', 'geo_full_name']) diff --git a/project/pipeline/etl/extract/motor_energy_extractor.py b/project/pipeline/etl/extract/motor_energy_extractor.py new file mode 100644 index 0000000000..46c77e4c32 --- /dev/null +++ b/project/pipeline/etl/extract/motor_energy_extractor.py @@ -0,0 +1,20 @@ +import pandas as pd +from pipeline_utils.zip_helper import GZipFileHelper + + +def motor_energy_data_extractor(compressed=True): + """ + Extracts motor energy data from the data source URL. This is an abbr mapper to its meaning. + + Parameters: + - compressed (bool, optional): Flag indicating whether the file is compressed. Both are possible from the data provider. + When True (default), the function assumes the file is compressed and uses GZip decompression. + When False, the file is assumed to be uncompressed. + + Returns: + - DataFrame: A DataFrame containing the extracted data. 
+ """ + zip_helper = GZipFileHelper() + url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/MOT_NRG/?compressed={compressed}&format=TSV&lang=en" + tsv_file_path = zip_helper.download_and_extract_url_file(url) + return pd.read_csv(tsv_file_path, sep="\t", header=0, names=['abbr', 'motor_energy_full_name']) diff --git a/project/pipeline/etl/transform/road_eqr_carpda_transformer.py b/project/pipeline/etl/transform/road_eqr_carpda_transformer.py index 01015126a5..50d35ddff6 100644 --- a/project/pipeline/etl/transform/road_eqr_carpda_transformer.py +++ b/project/pipeline/etl/transform/road_eqr_carpda_transformer.py @@ -17,15 +17,27 @@ def road_eqr_carpda_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame: # Filter and drop rows that its frequency(freq) is not A|a. # This means we only consider annual frequencies! if "freq" in data_frame.columns: - frame_filter = data_frame["freq"].str.contains(r"[A|a]") == False + frame_filter = data_frame["freq"].str.contains("A", case=False) == False data_frame = data_frame[~frame_filter] # Now that rows are filtered, we drop the column data_frame = data_frame.drop(["freq"], axis=1) + # Filter those rows with a "NR" value for "unit". 
NR means number + if "unit" in data_frame.columns: + frame_filter = data_frame["unit"].str.contains("NR", case=False) == False + data_frame = data_frame[~frame_filter] + # Now that rows are filtered, we drop the column + data_frame = data_frame.drop(["unit"], axis=1) data_frame = data_frame.dropna() # Convert [OBS_VALUE] to contain [int] values if "OBS_VALUE" in data_frame.columns: data_frame["OBS_VALUE"] = data_frame["OBS_VALUE"].astype(int) - data_frame = data_frame.rename({"OBS_VALUE": "n_passengers"}, axis=1) - if "unit" in data_frame.columns: - data_frame = data_frame.rename({"unit": "passengers_unit"}, axis=1) + data_frame = data_frame.rename({"OBS_VALUE": "n_passenger_cars"}, axis=1) + + if "TIME_PERIOD" in data_frame.columns: + # Convert [TIME_PERIOD] to [datetime] values + data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str) + data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y') + # Reset indexes after changing and dropping rows + data_frame = data_frame.reset_index(drop=True) return data_frame diff --git a/project/pipeline/etl/transform/sdg_transformer.py b/project/pipeline/etl/transform/sdg_transformer.py index 3f1cd59af1..ef3a855870 100644 --- a/project/pipeline/etl/transform/sdg_transformer.py +++ b/project/pipeline/etl/transform/sdg_transformer.py @@ -13,17 +13,24 @@ def sdg_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame: # Dropping some columns we do not need to_drop = ["DATAFLOW", "LAST UPDATE", "OBS_FLAG"] to_drop_filter = data_frame.filter(to_drop) - data_frame = data_frame.drop(to_drop_filter, axis=1) + data_frame.drop(to_drop_filter, axis=1, inplace=True) # Filter and drop rows that its frequency(freq) is not A|a. # This means we only consider annual frequencies! 
if "freq" in data_frame.columns: - frame_filter = data_frame["freq"].str.contains(r"[A|a]") == False + frame_filter = data_frame["freq"].str.contains("A", case=False) == False data_frame = data_frame[~frame_filter] # Now that rows are filtered, we drop the column data_frame = data_frame.drop(["freq"], axis=1) - data_frame = data_frame.dropna() + data_frame.dropna(inplace=True) if "OBS_VALUE" in data_frame.columns: - # Convert [OBS_VALUE] to contains [int] values + # Convert [OBS_VALUE] to [int] values data_frame["OBS_VALUE"] = data_frame["OBS_VALUE"].astype(int) - data_frame = data_frame.rename({"OBS_VALUE": "emitted_co2"}, axis=1) + data_frame.rename({"OBS_VALUE": "emitted_co2"}, axis=1, inplace=True) + + if "TIME_PERIOD" in data_frame.columns: + # Convert [TIME_PERIOD] to [datetime] values + data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str) + data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y') + # Reset indexes after changing and dropping rows + data_frame = data_frame.reset_index(drop=True) return data_frame diff --git a/project/pipeline/etl/transform/tran_r_vehst_transformer.py b/project/pipeline/etl/transform/tran_r_vehst_transformer.py index db80737bef..a7a9901741 100644 --- a/project/pipeline/etl/transform/tran_r_vehst_transformer.py +++ b/project/pipeline/etl/transform/tran_r_vehst_transformer.py @@ -34,4 +34,12 @@ def tran_r_vehst_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame: data_frame = data_frame.rename({"OBS_VALUE": "n_vehicles"}, axis=1) if "unit" in data_frame.columns: data_frame = data_frame.rename({"unit": "vehicles_unit"}, axis=1) + # Reset indexes after changing and dropping rows + data_frame = data_frame.reset_index(drop=True) + if "TIME_PERIOD" in data_frame.columns: + # Convert [TIME_PERIOD] to [datetime] values + data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str) + data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y') + # Reset indexes 
after changing and dropping rows + data_frame = data_frame.reset_index(drop=True) return data_frame diff --git a/project/pipeline/pipeline.py b/project/pipeline/pipeline.py index 260fc962c8..fbbe2a1c01 100644 --- a/project/pipeline/pipeline.py +++ b/project/pipeline/pipeline.py @@ -3,6 +3,7 @@ import etl.extract.sdg_extractor as sdg_e import etl.extract.geo_extractor as geo_e +import etl.extract.motor_energy_extractor as motor_e import etl.extract.unit_extractor as unit_e import etl.extract.tran_r_vehst_extractor as tran_e import etl.extract.road_eqr_carpda_extractor as road_e @@ -21,6 +22,10 @@ def __extract(self): self.sdg = sdg_e.sdg_data_extractor() print(colored("Extracting SDG data source Finished!", "green")) + print(colored("Extracting motor energy data source...", "green")) + self.motor_nrg = motor_e.motor_energy_data_extractor() + print(colored("Extracting motor energy data source Finished!", "green")) + print(colored("Extracting GEO data source...", "green")) self.geo = geo_e.geo_data_extractor() print(colored("Extracting GEO data source Finished!", "green")) diff --git a/project/pipeline/requirements.txt b/project/pipeline/requirements.txt index 36d1d1fa3e..43230674d9 100644 --- a/project/pipeline/requirements.txt +++ b/project/pipeline/requirements.txt @@ -4,7 +4,7 @@ termcolor==2.3.0 attrs==22.2.0 greenlet==2.0.2 iniconfig==2.0.0 -numpy==1.24.2 +numpy==1.26.2 packaging==23.0 pandas==1.5.3 pluggy==1.0.0 diff --git a/project/project-plan.md b/project/project-plan.md index 8bdf66310f..941925a5e7 100644 --- a/project/project-plan.md +++ b/project/project-plan.md @@ -2,7 +2,7 @@ ## Title -Correlation analysis between newly registered cars and Greenhouse gas emissions in the European Union. +Correlation analysis between newly registered cars and Greenhouse gas emissions in Europe. ## Main Question @@ -59,7 +59,7 @@ These data sources are abbreviation used in the other data sources. 
## WIKI -Here we is the link to wiki for the projects or exercises if they need more explanation. +Here is the link to wiki for the projects or exercises if they need more explanation. * [Week-03][l1] * [Week-04][l2] @@ -68,18 +68,18 @@ Here we is the link to wiki for the projects or exercises if they need more expl -### Week 02 +### Project Work 02 * [Project needs at least two open data source][i2] * [Project can cover next exercises][i3] * [Write the project plan][i4] -### Week 03 +### Project Work 03 * [Create pipeline for project][i5] * [Update project-plan.md for Project Work 03][i6] -### Week 04 +### Project Work 04 * [Write test for pipeline][i7] @@ -87,6 +87,12 @@ Here we is the link to wiki for the projects or exercises if they need more expl * [Create a workflow using GitHub actions][i8] +### Project Work 06 + +* [Final report][i9] + +* [Update final ETL database to meet the requirements][i10] + [i2]: https://github.com/rafoolin/made-template/issues/2 [i3]: https://github.com/rafoolin/made-template/issues/3 [i4]: https://github.com/rafoolin/made-template/issues/4 @@ -94,6 +100,8 @@ Here we is the link to wiki for the projects or exercises if they need more expl [i6]: https://github.com/rafoolin/made-template/issues/12 [i7]: https://github.com/rafoolin/made-template/issues/18 [i8]: https://github.com/rafoolin/made-template/issues/20 +[i9]: https://github.com/rafoolin/made-template/issues/22 +[i10]: https://github.com/rafoolin/made-template/issues/23 ## References and footnotes diff --git a/project/tests/etl/extract/geo_extractor_test.py b/project/tests/etl/extract/geo_extractor_test.py index 95f5a42815..55bc7ce0d1 100644 --- a/project/tests/etl/extract/geo_extractor_test.py +++ b/project/tests/etl/extract/geo_extractor_test.py @@ -11,13 +11,13 @@ class TestGeoDataExtractor(unittest.TestCase): @patch("pandas.read_csv") def test_geo_data_extractor(self, mock_read_csv, mock_zip_helper): mock_zip_helper.return_value = "geo_file.tsv" - mock_csv_data = 
pd.DataFrame({"C1": [1, 2], "C2": [3, 4]}) + mock_csv_data = pd.DataFrame({"abbr": [1, 2], "geo_full_name": [3, 4]}) mock_read_csv.return_value = mock_csv_data # Call the geo_data_extractor with compressed=True result = geo_data_extractor(compressed=False) path = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/GEO/?compressed=False&format=TSV&lang=en" mock_zip_helper.assert_called_once_with(path) - mock_read_csv.assert_called_once_with("geo_file.tsv", sep="\t", header=0) + mock_read_csv.assert_called_once_with("geo_file.tsv", sep="\t", header=0, names=['abbr', 'geo_full_name']) self.assertTrue(result.equals(mock_csv_data)) diff --git a/project/tests/etl/transform/road_eqr_carpda_transformer_test.py b/project/tests/etl/transform/road_eqr_carpda_transformer_test.py index 01dc3ce596..6640bea098 100644 --- a/project/tests/etl/transform/road_eqr_carpda_transformer_test.py +++ b/project/tests/etl/transform/road_eqr_carpda_transformer_test.py @@ -12,13 +12,12 @@ def test_road_eqr_carpda_data_transformer(self) -> None: "LAST UPDATE": [1, 2, 3], "OBS_VALUE": ["1", "2", "3"], "freq": ["a", "a", "b"], - "unit": ["A", "B", "C"], + "unit": ["NR", "NR", "C"], } ) expected_result = pd.DataFrame( { - "n_passengers": [1, 2], - "passengers_unit": ["A", "B"], + "n_passenger_cars": [1, 2], } ) result = road_eqr_carpda_data_transformer(mock_csv_data) diff --git a/project/tests/pipeline_test.py b/project/tests/pipeline_test.py index c277c7b1dc..7021a9533b 100644 --- a/project/tests/pipeline_test.py +++ b/project/tests/pipeline_test.py @@ -111,12 +111,10 @@ def setUp(self) -> None: { "geo": ["AT", "AT", "AT", "AT", "AT", "AT"], "vehicles_unit": ["NR", "NR", "NR", "NR", "NR", "NR"], - "passengers_unit": ["NR", "NR", "NR", "NR", "NR", "NR"], "n_vehicles": [7146, 7535, 7627, 7867, 6583, 7146], - "n_passengers": [4935, 5703, 4114, 1285, 2074, 2389], + "n_passenger_cars": [4935, 5703, 4114, 1285, 2074, 2389], "emitted_co2": [165, 164, 163, 161, 162, 163], "mot_nrg": 
["ALT", "ALT", "ALT", "ALT", "ALT", "ALT"], - "time_period": [2001, 2002, 2003, 2004, 2005, 2006], } ) self.db_name = "pipeline" @@ -150,7 +148,7 @@ def test_data_after_run_pipeline( conn = sqlite3.connect(self.db_path) query = f"SELECT * FROM {self.table_name}" result = pd.read_sql_query(query, conn) - pd.testing.assert_frame_equal(result, self.expected_result, check_like=True) + pd.testing.assert_frame_equal(result.drop('time_period', axis=1), self.expected_result, check_like=True, check_dtype=False) conn.commit() conn.close()