crawler.py
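"""Crawler node for the distributed search service.

Generates word indices and pushes them to the master database server over
gRPC. Also serves the LeaderNotice RPC so that, when the master fails over,
subsequent writes are redirected to the backup.

Example invocation (defaults as defined in build_parser below):

    python crawler.py --master localhost:50051 --backup localhost:50063 --port 50060
"""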
from concurrent import futures
import json
import logging
import threading
import time
from argparse import ArgumentParser

import grpc

import search_pb2
import search_pb2_grpc
from data.generatedata import generate_indices
from utils import init_logger, parse_level

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
MAX_RETRIES = 3

def build_parser():
    parser = ArgumentParser()
    parser.add_argument('--master',
                        dest='master', help='Master IP address',
                        default='localhost:50051')
    parser.add_argument('--backup',
                        dest='backup',
                        default='localhost:50063',
                        help='Backup IP address')
    parser.add_argument('--port',
                        dest='port',
                        default='50060',
                        help='Port')
    choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    parser.add_argument('--logging',
                        dest='logging_level', help='Logging level',
                        choices=choices,
                        default='DEBUG')
    return parser

class Crawler(object):
    def __init__(self, master, backup, logging_level, data=None):
        # initialize logger
        self.logger = init_logger('crawler', logging_level)
        self.master = master
        self.backup = backup
        self.data = data
        # TODO: add sync between backup and crawler

    def MasterChange(self, request, context):
        # LeaderNotice RPC: the master has failed over, so all further
        # writes go to the backup address
        self.master = self.backup
        self.logger.info("Changed master ip to %s", self.master)
        return search_pb2.Acknowledgement(status=1)

    def write_to_master(self, word):
        # lazily generate indices for the word if no data was supplied
        if self.data is None:
            self.data = generate_indices('pending', word, 25, 30)
        logger = self.logger
        logger.debug("Indices to write: %s", self.data)
        logger.debug("Master is %s", self.master)
        # serialize the indices and send them to the current master
        master_channel = grpc.insecure_channel(self.master)
        master_stub = search_pb2_grpc.DatabaseWriteStub(master_channel)
        logger.info("Sending data to master")
        request = search_pb2.CommitRequest(data=json.dumps(self.data))
        # retry the write a bounded number of times before giving up
        for attempt in range(MAX_RETRIES):
            try:
                master_stub.WriteIndicesToTable(request)
                logger.info("Operation success")
                break
            except grpc.RpcError as e:
                logger.error("Write attempt %d failed: %s", attempt + 1, e)

def pushWrite(crawler):
    # interactive loop: ask the operator before each write to the master
    while True:
        query = input("Do you want to push the write (Y/N): ").strip().lower()
        if query in ('n', 'no'):
            break
        elif query in ('y', 'yes'):
            try:
                word = input("Enter word: ").strip()
                crawler.write_to_master(word)
            except Exception as e:
                crawler.logger.error("Write failed: %s", e)
                break

def run(master, backup, logging_level, port, data=None):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # register the LeaderNotice service so the crawler can be told to
    # redirect its writes when the master changes
    crawler = Crawler(master, backup, logging_level, data)
    search_pb2_grpc.add_LeaderNoticeServicer_to_server(crawler, server)
    server.add_insecure_port('[::]:' + port)
    crawler.logger.info("Starting crawler server on port %s", port)
    # run the interactive write prompt on a background thread
    try:
        threading.Thread(target=pushWrite, args=(crawler,), daemon=True).start()
    except Exception as e:
        crawler.logger.error("Cannot start new thread due to %s", e)
    server.start()
    try:
        # keep the main thread alive; gRPC serves on its own worker threads
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        crawler.logger.info("Shutting down server")
        logging.shutdown()
        server.stop(0)

def main():
    parser = build_parser()
    options = parser.parse_args()
    master = options.master
    backup = options.backup
    port = options.port
    logging_level = parse_level(options.logging_level)
    run(master, backup, logging_level, port, data=None)


if __name__ == '__main__':
    main()
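For reference, a minimal sketch of how a peer could trigger the failover path served above. The stub name LeaderNoticeStub follows gRPC codegen conventions for add_LeaderNoticeServicer_to_server, but the request message type is not visible in this file, so MasterChangeRequest is a hypothetical placeholder:

import grpc
import search_pb2
import search_pb2_grpc

# MasterChangeRequest is a hypothetical placeholder; the real message name
# lives in search.proto, which this file does not show.
channel = grpc.insecure_channel('localhost:50060')  # the crawler's --port
stub = search_pb2_grpc.LeaderNoticeStub(channel)
ack = stub.MasterChange(search_pb2.MasterChangeRequest())
print("Crawler switched to backup" if ack.status == 1 else "Unexpected status")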