Skip to content

Commit

Permalink
Merge pull request #70 from CoronaWhy/airflow
Browse files Browse the repository at this point in the history
Update load_data_forecast.py
  • Loading branch information
isaacmg authored Jul 13, 2020
2 parents d9aa13f + 8da6aa0 commit 7205827
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
31 changes: 27 additions & 4 deletions airflow/load_data_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,34 @@ def data_to_GCS(csv_name: str, folder_name: str,
data = load_data()
df = pd.DataFrame(data=data)
df.to_csv('corona_data.csv', index=False)

hook.upload(bucket_name,
object='{}/{}.csv'.format(folder_name, csv_name),
filename='corona_data.csv',
columns_to_consider_for_uniqueness=['country', 'region', 'sub_region']
unique_column_name='full_county'
minimum_datapoints_threshold=60
"""
Function to split data-frame based on state or county.
"""
unique_df_list = []
for col in columns_to_consider_for_uniqueness:
df[col] = df[col].fillna('').apply(lambda x: x.replace(" ", "_"))
df[unique_column_name] = df[columns_to_consider_for_uniqueness[0]].str.cat(df[columns_to_consider_for_uniqueness[1:]], sep="_>
for i, g in df.groupby('full_county'):
df_code = g.copy()
ts_count = len(df_code)
if ts_count > minimum_datapoints_threshold:
df_code.reset_index(drop = True).loc[:, ~df.columns.str.contains('^Unnamed')].to_csv('{}.csv'.format(i), index = Fals>
hook.upload(bucket_name,
object='{}/{}.csv'.format(folder_name, i),
filename='{}.csv'.format(i),
mime_type='text/csv')
""" Function for full data pull
df.to_csv('corona_data.csv', index=False)
hook.upload(bucket_name,
object='{}/{}.csv'.format(folder_name, csv_name),
filename='corona_data.csv',
mime_type='text/csv')
"""



default_args = {
'owner': 'airflow',
Expand Down
13 changes: 9 additions & 4 deletions corona_ts/data_utils/data_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ def fetch_time_series() -> pd.DataFrame:
Returns:
pd.DataFrame: raw timeseries data at county/sub-region level
"""
""" Old function
if not time_series_path.exists():
logger.info("Time series not present locally, downloading...")
url = "https://coronadatascraper.com/timeseries.csv"
urllib.request.urlretrieve(url, time_series_path)
"""
time_series_path = DATA_DIR / "timeseries.csv"
if not time_series_path.exists():
logger.info("Time series not present locally, downloading...")
url = "https://coronadatascraper.com/timeseries.csv"
urllib.request.urlretrieve(url, time_series_path)
logger.info("Pulling current time series data, downloading...")
url = "https://coronadatascraper.com/timeseries.csv"
urllib.request.urlretrieve(url, time_series_path)

time_series_df = pd.read_csv(time_series_path)
return time_series_df
Expand Down

0 comments on commit 7205827

Please sign in to comment.