Skip to content

Commit

Permalink
feat: update final ETL database to meet the requirements
Browse files Browse the repository at this point in the history
closes #23
  • Loading branch information
rafoolin committed Jan 10, 2024
1 parent 7cf70c5 commit 381aac6
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 25 deletions.
2 changes: 1 addition & 1 deletion project/pipeline/etl/extract/geo_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ def geo_data_extractor(compressed=True):
zip_helper = GZipFileHelper()
url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/GEO/?compressed={compressed}&format=TSV&lang=en"
tsv_file_path = zip_helper.download_and_extract_url_file(url)
return pd.read_csv(tsv_file_path, sep="\t", header=0)
return pd.read_csv(tsv_file_path, sep="\t", header=0, names=['abbr', 'geo_full_name'])
20 changes: 20 additions & 0 deletions project/pipeline/etl/extract/motor_energy_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pandas as pd
from pipeline_utils.zip_helper import GZipFileHelper


def motor_energy_data_extractor(compressed=True):
    """
    Extract the motor-energy (MOT_NRG) codelist from the Eurostat data source.

    The codelist maps each motor-energy abbreviation to its human-readable
    meaning.

    Parameters:
    - compressed (bool, optional): Whether the file is requested in its
      compressed form; the data provider serves both variants. When True
      (default), the downloaded file is assumed to be compressed and is
      GZip-decompressed. When False, it is assumed to be uncompressed.

    Returns:
    - DataFrame: The extracted data, with columns 'abbr' and
      'motor_energy_full_name'.
    """
    source_url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/MOT_NRG/?compressed={compressed}&format=TSV&lang=en"
    local_tsv_path = GZipFileHelper().download_and_extract_url_file(source_url)
    return pd.read_csv(
        local_tsv_path,
        sep="\t",
        header=0,
        names=["abbr", "motor_energy_full_name"],
    )
20 changes: 16 additions & 4 deletions project/pipeline/etl/transform/road_eqr_carpda_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,27 @@ def road_eqr_carpda_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame:
# Filter and drop rows that its frequency(freq) is not A|a.
# This means we only consider annual frequencies!
if "freq" in data_frame.columns:
frame_filter = data_frame["freq"].str.contains(r"[A|a]") == False
frame_filter = data_frame["freq"].str.contains("A", case=False) == False
data_frame = data_frame[~frame_filter]
# Now that rows are filtered, we drop the column
data_frame = data_frame.drop(["freq"], axis=1)
# Filter those rows with a "NR" value for "unit". NR means number
if "unit" in data_frame.columns:
frame_filter = data_frame["unit"].str.contains("NR", case=False) == False
data_frame = data_frame[~frame_filter]
# Now that rows are filtered, we drop the column
data_frame = data_frame.drop(["unit"], axis=1)
data_frame = data_frame.dropna()
# Convert [OBS_VALUE] to contain [int] values
if "OBS_VALUE" in data_frame.columns:
data_frame["OBS_VALUE"] = data_frame["OBS_VALUE"].astype(int)
data_frame = data_frame.rename({"OBS_VALUE": "n_passengers"}, axis=1)
if "unit" in data_frame.columns:
data_frame = data_frame.rename({"unit": "passengers_unit"}, axis=1)
data_frame = data_frame.rename({"OBS_VALUE": "n_passenger_cars"}, axis=1)

if "TIME_PERIOD" in data_frame.columns:
# Convert [TIME_PERIOD] to [datetime] values
data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str)
data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y')
# Reset indexes after changing and dropping rows
data_frame = data_frame.reset_index(drop=True)
print(data_frame)
return data_frame
17 changes: 12 additions & 5 deletions project/pipeline/etl/transform/sdg_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ def sdg_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame:
# Dropping some columns we do not need
to_drop = ["DATAFLOW", "LAST UPDATE", "OBS_FLAG"]
to_drop_filter = data_frame.filter(to_drop)
data_frame = data_frame.drop(to_drop_filter, axis=1)
data_frame.drop(to_drop_filter, axis=1, inplace=True)
# Filter and drop rows that its frequency(freq) is not A|a.
# This means we only consider annual frequencies!
if "freq" in data_frame.columns:
frame_filter = data_frame["freq"].str.contains(r"[A|a]") == False
frame_filter = data_frame["freq"].str.contains("A", case=False) == False
data_frame = data_frame[~frame_filter]
# Now that rows are filtered, we drop the column
data_frame = data_frame.drop(["freq"], axis=1)
data_frame = data_frame.dropna()
data_frame.dropna(inplace=True)
if "OBS_VALUE" in data_frame.columns:
# Convert [OBS_VALUE] to contains [int] values
# Convert [OBS_VALUE] to [int] values
data_frame["OBS_VALUE"] = data_frame["OBS_VALUE"].astype(int)
data_frame = data_frame.rename({"OBS_VALUE": "emitted_co2"}, axis=1)
data_frame.rename({"OBS_VALUE": "emitted_co2"}, axis=1, inplace=True)

if "TIME_PERIOD" in data_frame.columns:
# Convert [TIME_PERIOD] to [datetime] values
data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str)
data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y')
# Reset indexes after changing and dropping rows
data_frame = data_frame.reset_index(drop=True)
return data_frame
8 changes: 8 additions & 0 deletions project/pipeline/etl/transform/tran_r_vehst_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ def tran_r_vehst_data_transformer(data_frame: pd.DataFrame) -> pd.DataFrame:
data_frame = data_frame.rename({"OBS_VALUE": "n_vehicles"}, axis=1)
if "unit" in data_frame.columns:
data_frame = data_frame.rename({"unit": "vehicles_unit"}, axis=1)
# Reset indexes after changing and dropping rows
data_frame = data_frame.reset_index(drop=True)
if "TIME_PERIOD" in data_frame.columns:
# Convert [TIME_PERIOD] to [datetime] values
data_frame["TIME_PERIOD"] = data_frame["TIME_PERIOD"].astype(str)
data_frame["TIME_PERIOD"] = pd.to_datetime(data_frame["TIME_PERIOD"], format='%Y')
# Reset indexes after changing and dropping rows
data_frame = data_frame.reset_index(drop=True)
return data_frame
5 changes: 5 additions & 0 deletions project/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import etl.extract.sdg_extractor as sdg_e
import etl.extract.geo_extractor as geo_e
import etl.extract.motor_energy_extractor as motor_e
import etl.extract.unit_extractor as unit_e
import etl.extract.tran_r_vehst_extractor as tran_e
import etl.extract.road_eqr_carpda_extractor as road_e
Expand All @@ -21,6 +22,10 @@ def __extract(self):
self.sdg = sdg_e.sdg_data_extractor()
print(colored("Extracting SDG data source Finished!", "green"))

print(colored("Extracting motor energy data source...", "green"))
self.motor_nrg = motor_e.motor_energy_data_extractor()
print(colored("Extracting motor energy data source Finished!", "green"))

print(colored("Extracting GEO data source...", "green"))
self.geo = geo_e.geo_data_extractor()
print(colored("Extracting GEO data source Finished!", "green"))
Expand Down
2 changes: 1 addition & 1 deletion project/pipeline/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ termcolor==2.3.0
attrs==22.2.0
greenlet==2.0.2
iniconfig==2.0.0
numpy==1.24.2
numpy==1.26.2
packaging==23.0
pandas==1.5.3
pluggy==1.0.0
Expand Down
18 changes: 13 additions & 5 deletions project/project-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Title
<!-- Give your project a short title. -->
Correlation analysis between newly registered cars and Greenhouse gas emissions in the European Union.
Correlation analysis between newly registered cars and greenhouse gas emissions in Europe.

## Main Question

Expand Down Expand Up @@ -59,7 +59,7 @@ These data sources are abbreviations used in the other data sources.

## WIKI

Here we is the link to wiki for the projects or exercises if they need more explanation.
Here is the link to the wiki for the projects or exercises if they need more explanation.

* [Week-03][l1]
* [Week-04][l2]
Expand All @@ -68,32 +68,40 @@ Here we is the link to wiki for the projects or exercises if they need more expl

<!-- List of work packages ordered sequentially, each pointing to an issue with more details. -->

### Week 02
### Project Work 02

* [Project needs at least two open data source][i2]
* [Project can cover next exercises][i3]
* [Write the project plan][i4]

### Week 03
### Project Work 03

* [Create pipeline for project][i5]
* [Update project-plan.md for Project Work 03][i6]

### Week 04
### Project Work 04

* [Write test for pipeline][i7]

### Project Work 05

* [Create a workflow using GitHub actions][i8]

### Project Work 06

* [Final report][i9]

* [Update final ETL database to meet the requirements][i10]

[i2]: https://github.com/rafoolin/made-template/issues/2
[i3]: https://github.com/rafoolin/made-template/issues/3
[i4]: https://github.com/rafoolin/made-template/issues/4
[i5]: https://github.com/rafoolin/made-template/issues/13
[i6]: https://github.com/rafoolin/made-template/issues/12
[i7]: https://github.com/rafoolin/made-template/issues/18
[i8]: https://github.com/rafoolin/made-template/issues/20
[i9]: https://github.com/rafoolin/made-template/issues/22
[i10]: https://github.com/rafoolin/made-template/issues/23

## References and footnotes

Expand Down
4 changes: 2 additions & 2 deletions project/tests/etl/extract/geo_extractor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ class TestGeoDataExtractor(unittest.TestCase):
@patch("pandas.read_csv")
def test_geo_data_extractor(self, mock_read_csv, mock_zip_helper):
mock_zip_helper.return_value = "geo_file.tsv"
mock_csv_data = pd.DataFrame({"C1": [1, 2], "C2": [3, 4]})
mock_csv_data = pd.DataFrame({"abbr": [1, 2], "geo_full_name": [3, 4]})
mock_read_csv.return_value = mock_csv_data
# Call the geo_data_extractor with compressed=True
result = geo_data_extractor(compressed=False)
path = "https://ec.europa.eu/eurostat/api/dissemination/sdmx/2.1/codelist/ESTAT/GEO/?compressed=False&format=TSV&lang=en"
mock_zip_helper.assert_called_once_with(path)
mock_read_csv.assert_called_once_with("geo_file.tsv", sep="\t", header=0)
mock_read_csv.assert_called_once_with("geo_file.tsv", sep="\t", header=0, names=['abbr', 'geo_full_name'])
self.assertTrue(result.equals(mock_csv_data))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ def test_road_eqr_carpda_data_transformer(self) -> None:
"LAST UPDATE": [1, 2, 3],
"OBS_VALUE": ["1", "2", "3"],
"freq": ["a", "a", "b"],
"unit": ["A", "B", "C"],
"unit": ["NR", "NR", "C"],
}
)
expected_result = pd.DataFrame(
{
"n_passengers": [1, 2],
"passengers_unit": ["A", "B"],
"n_passenger_cars": [1, 2],
}
)
result = road_eqr_carpda_data_transformer(mock_csv_data)
Expand Down
6 changes: 2 additions & 4 deletions project/tests/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,10 @@ def setUp(self) -> None:
{
"geo": ["AT", "AT", "AT", "AT", "AT", "AT"],
"vehicles_unit": ["NR", "NR", "NR", "NR", "NR", "NR"],
"passengers_unit": ["NR", "NR", "NR", "NR", "NR", "NR"],
"n_vehicles": [7146, 7535, 7627, 7867, 6583, 7146],
"n_passengers": [4935, 5703, 4114, 1285, 2074, 2389],
"n_passenger_cars": [4935, 5703, 4114, 1285, 2074, 2389],
"emitted_co2": [165, 164, 163, 161, 162, 163],
"mot_nrg": ["ALT", "ALT", "ALT", "ALT", "ALT", "ALT"],
"time_period": [2001, 2002, 2003, 2004, 2005, 2006],
}
)
self.db_name = "pipeline"
Expand Down Expand Up @@ -150,7 +148,7 @@ def test_data_after_run_pipeline(
conn = sqlite3.connect(self.db_path)
query = f"SELECT * FROM {self.table_name}"
result = pd.read_sql_query(query, conn)
pd.testing.assert_frame_equal(result, self.expected_result, check_like=True)
pd.testing.assert_frame_equal(result.drop('time_period', axis=1), self.expected_result, check_like=True, check_dtype=False)
conn.commit()
conn.close()

Expand Down

0 comments on commit 381aac6

Please sign in to comment.