-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgps-tracks-csv-upload.py
67 lines (50 loc) · 1.97 KB
/
gps-tracks-csv-upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import json
from time import sleep
import requests
import os
from urllib.parse import urlencode
# API key and endpoint locations. The key is read from the TOLLGURU_API_KEY
# environment variable and is None when that variable is not set.
TOLLGURU_API_KEY = os.getenv("TOLLGURU_API_KEY")
TOLLGURU_API_URL = "https://apis.tollguru.com/toll/v2"
GPS_UPLOAD_ENDPOINT = "gps-tracks-csv-upload"
GPS_DOWNLOAD_ENDPOINT = "gps-tracks-csv-download"

# Configurable parameters for the GPS upload endpoint. The "vehicle" entries
# are sent as query-string parameters by gps_tracks_csv_upload.
PARAMETERS = {"vehicle": dict(weight=3000, height=10, type="5AxlesTruck")}
# Upload CSV file - You can use the CSV format file as shown in the table below.
def gps_tracks_csv_upload(
    is_async: bool = False,
    csv_path: str = "Sample-GPS-tracks/france-GPS-Track-sample.csv",
):
    """Upload a GPS-track CSV file to the TollGuru upload endpoint.

    The vehicle settings from ``PARAMETERS["vehicle"]`` and the ``isAsync``
    flag are sent as query-string parameters; the CSV file is the request
    body.

    Args:
        is_async: Value sent as the ``isAsync`` query parameter.
        csv_path: Path of the CSV file to upload. Defaults to the sample
            file path this script has always used.

    Returns:
        The ``requests.Response`` of the POST request, unparsed.

    Raises:
        OSError: If ``csv_path`` cannot be opened.
    """
    query = urlencode(
        {
            **PARAMETERS["vehicle"],
            # urlencode() would render a Python bool as "True"/"False";
            # send the conventional lowercase form instead.
            "isAsync": "true" if is_async else "false",
        }
    )
    url = f"{TOLLGURU_API_URL}/{GPS_UPLOAD_ENDPOINT}?{query}"
    headers = {"x-api-key": TOLLGURU_API_KEY, "Content-Type": "text/csv"}
    # Binary mode: the file is handed to requests as-is, with no text decoding.
    with open(csv_path, "rb") as csv_file:
        response = requests.post(url, headers=headers, data=csv_file)
    return response
# When isAsync is set to true, use this function to fetch the result by passing it the return value of gps_tracks_csv_upload.
def gps_tracks_csv_download(
    response_from_gps_tracks_csv_upload, retry: int = 5, delay: int = 5
):
    """Poll the download endpoint until its status is no longer "ERROR".

    Args:
        response_from_gps_tracks_csv_upload: Return value of
            ``gps_tracks_csv_upload``; its body is forwarded as the body of
            the download request. Any other value is passed to ``requests``
            unchanged as ``data``.
        retry: Number of retries after the first attempt, so at most
            ``retry + 1`` requests are made.
        delay: Seconds to wait between consecutive attempts.

    Returns:
        The JSON-decoded body of the last response, or ``None`` when no
        request was made (negative ``retry``).

    Raises:
        json.JSONDecodeError: If a response body is not valid JSON.
        KeyError: If a decoded response has no ``"status"`` key.
    """
    # Loop-invariant request settings.
    url = f"{TOLLGURU_API_URL}/{GPS_DOWNLOAD_ENDPOINT}"
    headers = {"x-api-key": TOLLGURU_API_KEY, "Content-Type": "application/json"}

    # A Response object is not a documented ``data`` type for requests; send
    # the bytes it carries instead of relying on how a Response iterates.
    body = response_from_gps_tracks_csv_upload
    if isinstance(body, requests.Response):
        body = body.content

    result = None
    for retries_left in range(retry, -1, -1):
        reply = requests.post(url, headers=headers, data=body)
        result = json.loads(reply.text)
        if result["status"] != "ERROR":
            break
        # Wait only when another attempt will follow; sleeping after the
        # last attempt would just delay the return.
        if retries_left > 0:
            sleep(delay)
    return result
# Synchronous run: upload the sample file and print the upload response.
sync_upload_response = gps_tracks_csv_upload()
print(sync_upload_response)

# Asynchronous run: upload with isAsync enabled, then poll the download
# endpoint with that upload response and print the polled result.
async_upload_response = gps_tracks_csv_upload(is_async=True)
download_result = gps_tracks_csv_download(
    response_from_gps_tracks_csv_upload=async_upload_response
)
print(download_result)