forked from brandonmcfadd/cta-reliability
-
Notifications
You must be signed in to change notification settings - Fork 0
/
export_single_day_arrivals.py
132 lines (110 loc) · 5.64 KB
/
export_single_day_arrivals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""export data for previous days arrivals by Brandon McFadden"""
import os
import json
from csv import DictWriter
from datetime import datetime, timedelta
from time import sleep
import pandas as pd
from dotenv import load_dotenv # Used to Load Env Var
import requests # Used for API Calls
# Pull settings from the .env file / process environment.
load_dotenv()

# Credentials used for the Microsoft token request.
microsoft_username = os.environ.get('MICROSOFT_USERNAME')
microsoft_password = os.environ.get('MICROSOFT_PASSWORD')
microsoft_client_id = os.environ.get('MICROSOFT_CLIENT_ID')
microsoft_client_secret = os.environ.get('MICROSOFT_CLIENT_SECRET')

# Prefix that the CSV output paths are built from.
main_file_path_csv = os.environ.get('FILE_PATH_CSV')

# Power BI dataset identifiers.
cta_dataset_id = os.environ.get('CTA_DATASET_ID')
metra_dataset_id = os.environ.get('METRA_DATASET_ID')

# Column order of the exported train-arrivals CSV.
train_arrivals_csv_headers = [
    'Station_ID',
    'Stop_ID',
    'Station_Name',
    'Destination',
    'Route',
    'Run_Number',
    'Prediction_Time',
    'Arrival_Time',
    'Headway',
    'Time_Of_Week',
    'Time_Of_Day',
    'Consistent_Interval',
]
def get_date(date_type, delay):
    """Return the current or delay-shifted local date as a formatted string.

    Args:
        date_type: Which representation to produce. "short" (YYYYMMDD) and
            "hour" (HH) describe the current moment and ignore ``delay``;
            "file" (YYYY-MM-DD), "year" (YYYY), "month" (MM) and "day" (DD)
            describe the moment ``delay`` days before now.
        delay: Number of days to subtract from now for the shifted types.

    Returns:
        The formatted date string.

    Raises:
        ValueError: If ``date_type`` is not one of the supported names.
    """
    # date_type -> (strftime format, whether ``delay`` is applied)
    formats = {
        "short": ("%Y%m%d", False),
        "hour": ("%H", False),
        "file": ("%Y-%m-%d", True),
        "year": ("%Y", True),
        "month": ("%m", True),
        "day": ("%d", True),
    }
    try:
        date_format, uses_delay = formats[date_type]
    except KeyError:
        # Fail with a clear message instead of an unbound-local error.
        raise ValueError(
            f"unknown date_type {date_type!r}; expected one of {sorted(formats)}"
        ) from None

    moment = datetime.now()
    if uses_delay:
        moment -= timedelta(days=delay)
    return moment.strftime(date_format)
def get_token():
    """Request a Microsoft access token for the Power BI REST API.

    Posts a password-grant request built from the module-level credentials
    (username, password, client id, client secret).

    Returns:
        The value of ``access_token`` from the JSON response, or None when
        the response has no such field (the response is printed in that
        case so the failure reason is visible).
    """
    url = "https://login.microsoftonline.com/common/oauth2/token"
    # A dict body lets requests form-encode every value, so credentials that
    # contain characters such as "&", "+" or "=" reach the server intact.
    payload = {
        'grant_type': 'password',
        'username': microsoft_username,
        'password': microsoft_password,
        'client_id': microsoft_client_id,
        'client_secret': microsoft_client_secret,
        'resource': 'https://analysis.windows.net/powerbi/api',
    }
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        # NOTE(review): this fixed cookie looks like a leftover from a
        # generated request snippet and is presumably not required --
        # confirm before removing.
        'Cookie': 'fpc=Ar43AgyVlG9Iq2cDNhyCm7TZQxmvAQAAAC18YtsOAAAAM7ttQwEAAADze2LbDgAAAA; stsservicecookie=estsfd; x-ms-gateway-slice=estsfd'
    }
    # The timeout keeps an unresponsive login endpoint from hanging the
    # script indefinitely.
    response = requests.post(
        url, headers=headers, data=payload, timeout=30).json()
    token = response.get('access_token')
    if token is None:
        print("error in:", response)
    return token
def get_report_data(dataset, delay):
    """Query a Power BI dataset for the train arrivals of a single day.

    Runs a DAX ``EVALUATE FILTER`` over ``train_arrivals`` for the date that
    lies ``delay`` days before now, authenticating with the module-level
    ``bearer_token``.

    Args:
        dataset: Power BI dataset id inserted into the executeQueries URL.
        delay: Number of days before today of the date to fetch.

    Returns:
        The ``rows`` value of the first table of the first result, or None
        when the response does not have that shape (the response is printed
        in that case).
    """
    url = f"https://api.powerbi.com/v1.0/myorg/datasets/{dataset}/executeQueries"
    # Resolve the target date once so year, month and day always belong to
    # the same calendar day, even when the call happens around midnight.
    target = datetime.now() - timedelta(days=delay)
    year = target.strftime("%Y")
    month = target.strftime("%m")
    day = target.strftime("%d")
    payload = json.dumps({
        "queries": [
            {
                "query": f"EVALUATE FILTER(train_arrivals,train_arrivals[Arrival_Time.Date]=DATE({year},{month},{day}))"
            }
        ],
        "serializerSettings": {
            "includeNulls": True
        },
        "impersonatedUserName": "brandonmcfadden@brandonmcfadden.onmicrosoft.com"
    })
    headers = {
        'Authorization': f'Bearer {bearer_token}',
        'Content-Type': 'application/json'
    }
    # The timeout keeps a stalled query from blocking the script forever.
    response = requests.post(url, headers=headers, data=payload, timeout=120)
    response_json = json.loads(response.text)
    try:
        return response_json['results'][0].get('tables')[0].get('rows')
    except (KeyError, IndexError, TypeError, AttributeError):
        # Only shape mismatches are handled here (for example an error
        # payload without "results"); interrupts and unrelated failures
        # propagate to the caller.
        print("error in:", response_json)
        return None
def make_clean_train_file(delay):
    """Create (or truncate) the CTA CSV for the delayed date with headers only.

    The file is ``<main_file_path_csv>cta/<YYYY-MM-DD>.csv`` where the date
    is ``delay`` days before now; any previous content is discarded.
    """
    file_name = get_date("file", delay) + ".csv"
    csv_file_path = main_file_path_csv + "cta/" + file_name
    print(f"Creating Empty File and Adding Headers: {csv_file_path}")
    with open(csv_file_path, 'w+', newline='', encoding='utf8') as handle:
        DictWriter(handle, fieldnames=train_arrivals_csv_headers).writeheader()
def _cta_row(item):
    """Map one Power BI result row onto the CSV column names."""
    # The prediction timestamp is assembled from two columns: the first 10
    # characters of the ".Date" value plus everything from index 10 onward
    # of the ".Time" value.
    prediction_time = (item["train_arrivals[Prediction_Time.Date]"][0:10]
                       + item["train_arrivals[Prediction_Time.Time]"][10:])
    return {
        'Station_ID': item["train_arrivals[Station_ID]"],
        'Stop_ID': item["train_arrivals[Stop_ID]"],
        'Station_Name': item["train_arrivals[Station_Name]"],
        'Destination': item["train_arrivals[Destination]"],
        'Route': item["train_arrivals[Route]"],
        'Run_Number': item["train_arrivals[Run_Number]"],
        'Prediction_Time': prediction_time,
        'Arrival_Time': item["train_arrivals[Arrival_Time_Combined]"],
        'Headway': item["train_arrivals[Headway]"],
        'Time_Of_Week': item["train_arrivals[Time of Week]"],
        'Time_Of_Day': item["train_arrivals[Time Of Day]"],
        'Consistent_Interval': item["train_arrivals[Headway Consistency]"],
    }


def parse_response_cta(data, delay):
    """Write Power BI arrival rows to the day's CTA CSV, then sort the file.

    All rows are converted before the output file is touched, so missing or
    malformed data never truncates an existing export.

    Args:
        data: Iterable of row mappings keyed by ``train_arrivals[<column>]``,
            as returned by get_report_data.
        delay: Number of days before today that the data belongs to; selects
            the output file name.

    Raises:
        ValueError: If ``data`` is None (the fetch produced no rows value).
        KeyError: If a row lacks one of the expected columns.
    """
    if data is None:
        raise ValueError("no arrival data to export")
    rows = [_cta_row(item) for item in data]

    make_clean_train_file(delay)
    # Resolve the path once and append every row through a single file
    # handle instead of reopening the file per row.
    csv_file_path = main_file_path_csv + "cta/" + get_date("file", delay) + ".csv"
    with open(csv_file_path, 'a', newline='', encoding='utf8') as csvfile:
        writer_object = DictWriter(
            csvfile, fieldnames=train_arrivals_csv_headers)
        writer_object.writerows(rows)

    # Rewrite the file ordered by route, stop and arrival time.
    data_frame = pd.read_csv(csv_file_path)
    sorted_data_frame = data_frame.sort_values(
        by=["Route", "Stop_ID", "Arrival_Time"], ascending=True)
    sorted_data_frame.to_csv(csv_file_path, index=False)
# Fetched once at start-up; get_report_data reads this module-level name.
bearer_token = get_token()

# Export the day 2 days before today first, then the day 1 day before today.
remaining = 2

while remaining > 0:
    try:
        parse_response_cta(get_report_data(cta_dataset_id, remaining), remaining)
    except Exception as err:  # pylint: disable=broad-except
        # Best effort: report why this day failed and move on to the next
        # one. Catching Exception (not a bare except) keeps Ctrl-C and
        # SystemExit able to stop the script.
        print("Failed to grab #", remaining, "-", repr(err))
    remaining -= 1
    print("total cta remaining:", remaining)
    # Brief pause between consecutive API calls.
    sleep(1)