-
Notifications
You must be signed in to change notification settings - Fork 0
/
cowin_helper.py
183 lines (155 loc) · 7.81 KB
/
cowin_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import requests
from pprint import pprint
import json
from datetime import date
from dotenv import load_dotenv
import os
import logging
load_dotenv()
useragent = os.getenv("USERAGENT")
headers = {"Accept-Language":"hi_IN","User-Agent":useragent}
class CowinWorker:
def __init__(self):
logging.basicConfig(
filename="cowin.log",
filemode= "w",
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S"
)
def get_state_list(self):
"""To get the state list of India
each state has state_id and state_name in it
"""
try:
url = "https://cdn-api.co-vin.in/api/v2/admin/location/states"
response = requests.request("GET", url, headers=headers)
except Exception as e:
logging.exception(str(e))
if response.status_code==200:
state_list = json.loads(response.text)["states"]
return state_list
else:
msg = {"status code": response.status_code,
"message":"Currently State API is not responding, try after sometime or check if headers are present while sending request",
"response text": response.text}
logging.exception(msg)
raise Exception(msg)
def get_district_list(self,state_name):
"""Returns district list
Args:
state_name (string): [description]
Returns:
[type]: [description]
"""
states = self.get_state_list()
for state in states:
if state["state_name"]==state_name:
id = state["state_id"]
try:
url = f"https://cdn-api.co-vin.in/api/v2/admin/location/districts/{id}"
response = requests.request("GET", url, headers=headers)
except Exception as e:
logging.exception(str(e))
if response.status_code==200:
district_list = json.loads(response.text)["districts"]
return district_list
else:
msg = {"status code": response.status_code,
"message":"Currently districts API is not responding, try after sometime or check if headers are present while sending request",
"response text": response.text}
logging.exception(msg)
def get_slot_by_district(self,state_name,district_name,age):
"""Gives us the slots by district name
Args:
state_name (string): Name of the state to which the district belongs
district_name (string): Name of the district
age (integer): min age for which you want to check the slot 18 or 45
Returns:
dict: returns customized message
"""
today = date.today().strftime("%d-%m-%Y")
available_centers= []
districts = self.get_district_list(state_name)
for district in districts:
if district["district_name"]==district_name:
id = district["district_id"]
try:
url = f"https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByDistrict?district_id={id}&date={today}"
response = requests.request("GET", url, headers=headers)
except Exception as e:
logging.exception(str(e))
if response.status_code==200:
centers = json.loads(response.text)["centers"]
for center in centers:
if len(center["sessions"])== 1:
if center["sessions"][0]["available_capacity"]>0 and center["sessions"][0]["min_age_limit"] == age:
available_centers.append(center)
msg = {"status code":response.status_code,
"message":"Hurry up! slots available in your area",
"details":available_centers}
logging.info(msg)
return msg
else:
msg = {"message":"No slots available","center_name":center["name"]}
logging.info(msg)
else:
sessions_list = center["sessions"]
for session in sessions_list:
if session["available_capacity"]>0 and session["min_age_limit"]==age:
available_centers.append(center)
msg = {"status code":response.status_code,
"message":"Hurry up! slots available in your area",
"details":available_centers}
logging.info(msg)
return msg
else:
msg = {"message":"No slots available","center_name":center["name"]}
logging.info(msg)
else:
msg = {"status code": response.status_code,
"message":"Currently slot by district API is not responding, try after sometime or check if headers are present while sending request",
"response text": response.text}
logging.exception(msg)
def get_slot_by_pin(self,pin,age):
"""Gives us slots by PIN code
Args:
pin (integer): [description]
age (integer): min age for which you want to check the slot 18 or 45
Returns:
dict: returns customized message
"""
today = date.today().strftime("%d-%m-%Y")
available_centers= []
try:
url = f"https://cdn-api.co-vin.in/api/v2/appointment/sessions/public/calendarByPin?pincode={pin}&date={today}"
response = requests.request("GET", url, headers=headers)
except Exception as e:
logging.exception(str(e))
if response.status_code==200:
centers = json.loads(response.text)["centers"]
for center in centers:
if len(center["sessions"])== 1:
if center["sessions"][0]["available_capacity"]>0 and center["sessions"][0]["min_age_limit"] == age:
available_centers.append(center)
msg = {"status code":response.status_code,
"message":"Hurry up! slots available in your area",
"details":available_centers}
logging.info(msg)
return msg
else:
sessions_list = center["sessions"]
for session in sessions_list:
if session["available_capacity"]>0 and session["min_age_limit"]==age:
available_centers.append(center)
msg = {"status code":response.status_code,
"message":"Hurry up! slots available in your area",
"details":available_centers}
logging.info(msg)
return msg
else:
msg = {"status code": response.status_code,
"message":"Currently slot by PIN API is not responding, try after sometime or check if headers are present while sending request",
"response text": response.text}
return msg
logging.exception(msg)