-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
143 lines (126 loc) · 6.13 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from flask import Flask, render_template, request, redirect, url_for
from datetime import datetime, timedelta
from scraperModule import StudyRoomBooker
from minheap import taskObj, taskMinHeap
import webbrowser
import logging, signal, os
import csv
# Flask application object; the route decorators further down register against it.
app = Flask(__name__)
class StudyRoomScheduler:
    """Collect study-room reservation requests and either book or defer them.

    Requests dated seven or more days ahead are stored in a ``taskMinHeap``;
    nearer requests are booked immediately through ``StudyRoomBooker``.
    Deferred tasks are persisted to ``data.txt`` on shutdown and reloaded by
    ``process_file`` on start-up.
    """

    def __init__(self):
        self.username = ""
        # Placeholder default; the password is never written to data.txt.
        self.password = "password"
        self.ucfID = ""
        # Tasks dated at least seven days ahead, waiting to be booked.
        self.taskHeap = taskMinHeap()
        # Rows restored from data.txt that were handed to schedule_Task.
        self.taskQueue = []

    def write_to_csv(self):
        """Write the account identifiers and every deferred task to ``data.txt``.

        The heap is emptied while it is written out, so this is intended to
        run right before the process exits. Errors are logged, not raised.
        """
        try:
            with open("data.txt", 'w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([f'{self.username}', f'{self.ucfID}'])
                writer.writerow(['Jobs:'])
                writer.writerow(['date', 'start_time', 'duration', 'reservationType', 'room_option', 'room_number', 'min_capacity'])
                while not self.taskHeap.isEmpty():
                    writer.writerow(self.taskHeap.extractMin().get_all_variables())
        except Exception as e:
            logging.error(f"Failed to dump data to txt: {e}")

    def save_data_on_shutdown(self):
        """Persist deferred tasks; called by the shutdown paths."""
        self.write_to_csv()

    def schedule_Task(self, username, password, ucfID, date, start_time, duration, reservationType, room_option, room_number, min_capacity):
        """Defer or immediately book one reservation request.

        Args:
            username, password, ucfID: credentials forwarded to ``book_room``.
            date: reservation date as a ``YYYY-MM-DD`` string.
            start_time: forwarded unchanged to the booker / task object.
            duration: value convertible with ``int()``.
            reservationType: ``"group"`` triggers a ``rand_room`` lookup that
                replaces ``room_number``.
            room_option, room_number, min_capacity: room selection inputs.

        Returns:
            True when the task was deferred to the heap or booked without an
            exception; False when the input could not be parsed or booking
            raised.
        """
        # Reject malformed input up front instead of letting it escape as an
        # unhandled exception in the request handler.
        try:
            input_date = datetime.strptime(date, '%Y-%m-%d')
            duration = int(duration)
        except (TypeError, ValueError) as e:
            logging.error("Invalid task input (date=%r, duration=%r): %s", date, duration, e)
            return False

        if input_date - datetime.now() >= timedelta(days=7):
            # NOTE(review): presumably the booking site only accepts
            # reservations within a week, hence the deferral -- confirm.
            self.taskHeap.insert(
                taskObj(date, start_time, duration, reservationType, room_option, room_number, min_capacity)
            )
            return True

        mainBrowser = None
        try:
            if reservationType == "group":
                room_number = StudyRoomBooker.rand_room(room_option, min_capacity)
            mainBrowser = StudyRoomBooker()
            mainBrowser.book_room(username, password, ucfID, date, room_number, start_time, duration)
        except Exception as e:
            # Log the error
            logging.exception("An error occurred while scheduling the task: %s", str(e))
            return False
        finally:
            # Always release the browser, even when booking failed.
            if mainBrowser is not None:
                try:
                    mainBrowser.close()
                except Exception as e:
                    logging.exception("Failed to close the booking browser: %s", str(e))
        return True

    # Only to be called within enqueue task
    def valid_task(self):
        """Read the booking form from the current Flask request and schedule it.

        Must run inside a request context because it reads ``request.form``.

        Returns:
            A redirect to the completion page when scheduling succeeded,
            otherwise the rendered ``tester.html`` echoing the submitted values.
        """
        self.username = request.form.get('input1')
        self.password = request.form.get('input2')
        start_time = request.form.get('input4')
        duration = request.form.get('input5')
        reservationType = request.form.get('input6')
        room_number = request.form.get('input7')
        room_option = request.form.get('room-option')
        min_capacity = request.form.get('input8')
        self.ucfID = request.form.get('input9')
        date = request.form.get('input3')
        # TODO: show a loading screen while the booking below is in progress.
        if self.schedule_Task(self.username, self.password, self.ucfID, date, start_time, duration, reservationType, room_option, room_number, min_capacity):
            return redirect(url_for('completion_screen'))
        return render_template('tester.html', username=self.username, password=self.password, start_time=start_time,
                               duration=duration, reservation_type=reservationType, min_capacity=min_capacity,
                               date=date, room_option=room_option, room_number=room_number, ucfID=self.ucfID)

    def process_file(self):
        """Reload ``data.txt`` and re-dispatch the tasks it contains.

        The file layout matches ``write_to_csv``: an account row, a ``Jobs:``
        marker, a column-header row, then one seven-field row per task.
        Tasks seven or more days out go back on the heap, tasks in the near
        future are queued and booked now, and tasks in the past are logged
        and dropped. Errors are logged, never raised.
        """
        try:
            with open('data.txt', 'r', newline='') as file:
                reader = csv.reader(file)
                self.username, self.ucfID = next(reader)
                next(reader)  # skip the 'Jobs:' marker row
                next(reader)  # skip the column-header row
                tasks = [row for row in reader if len(row) == 7]
        except FileNotFoundError:
            # Nothing was saved yet (e.g. first run); not an error.
            logging.info("No saved data file found; nothing to restore")
            return
        except Exception as e:
            logging.error("An error occurred while processing the file: %s", str(e))
            return

        for task in tasks:
            date, start_time, duration, reservationType, room_option, room_number, min_capacity = task
            # Handle each row on its own so one bad row cannot discard the rest.
            try:
                date_difference = datetime.strptime(date, '%Y-%m-%d') - datetime.now()
                if date_difference >= timedelta(days=7):
                    # int() keeps the stored duration consistent with schedule_Task.
                    newTask = taskObj(date, start_time, int(duration), reservationType, room_option, room_number, min_capacity)
                    self.taskHeap.insert(newTask)
                elif date_difference > timedelta(0):  # still in the future
                    self.taskQueue.append(task)
                    self.schedule_Task(self.username, self.password, self.ucfID, date, start_time, duration, reservationType, room_option, room_number, min_capacity)
                else:
                    logging.info(f"Previously Scheduled Task: {task} is invalid")
            except Exception as e:
                logging.error("An error occurred while processing the file: %s", str(e))
# Single module-level scheduler instance used by the signal handler and the routes below.
scheduler = StudyRoomScheduler()
def keyboard_interrupt_handler(signal, frame):
    """SIGINT handler: save pending tasks, then terminate the process.

    Both parameters are supplied by the signal machinery and are not used.
    """
    logging.info('Keyboard Interrupt (Ctrl + C) detected. Exiting now, this may take a minute...')
    scheduler.save_data_on_shutdown()
    # Hard exit with status 0 once the data has been written.
    exit_status = 0
    os._exit(exit_status)
#ROUTES
@app.route('/task_enqueue', methods=['POST'])
def enqueue_task():
    """Handle the booking form POST by delegating to the shared scheduler."""
    response = scheduler.valid_task()
    return response
@app.route('/completion')
def completion_screen():
    """Serve the completion page."""
    page = render_template('completion.html')
    return page
@app.route('/')
def index():
    """Render the landing page, passing it the scheduler's stored credentials."""
    context = {
        'username': scheduler.username,
        'password': scheduler.password,
        'ucfID': scheduler.ucfID,
    }
    return render_template('index.html', **context)
@app.route('/shutdown', methods=['POST'])
def shutdown_server():
    """Persist pending tasks and stop the server process.

    The route only accepts POST, so no method check is needed. The process
    exit is deferred briefly so that the confirmation text returned here can
    actually be delivered; exiting inline would kill the process before the
    response is sent.

    Returns:
        A short confirmation string for the client.
    """
    import threading  # local import: only this handler needs it

    scheduler.save_data_on_shutdown()
    logging.info("Server Shutdown")
    # Exit from a timer thread shortly after this handler has returned.
    threading.Timer(0.5, os._exit, args=(0,)).start()
    return "Server shutting down..."
if __name__ == '__main__':
    # Save pending tasks when the user presses Ctrl+C.
    signal.signal(signal.SIGINT, keyboard_interrupt_handler)

    # Application log goes to app.log, truncated on every start.
    logging.basicConfig(
        filename='app.log',
        filemode='w',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    # Give the werkzeug logger an additional stream handler.
    logging.getLogger('werkzeug').addHandler(logging.StreamHandler())

    # Restore tasks saved by a previous run before serving requests.
    scheduler.process_file()

    host, port = '127.0.0.1', 5000
    webbrowser.open(f'http://{host}:{port}/')
    app.run(host=host, port=port, debug=False, use_reloader=False)