crawling.py
from collections import Counter
import logging

from node import Node, NodeHeap
from utils import gather_dict

log = logging.getLogger(__name__)


class SpiderCrawl:
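    """
    Crawl the network looking for a given key, using the iterative
    node-lookup procedure from the Kademlia paper.
    """
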
    def __init__(self, protocol, node, peers, ksize, alpha):
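        """
        Create a new crawler.

        Args:
            protocol: the RPC protocol instance used to contact peers
            node: a Node representing the key being searched for
            peers: known Node instances serving as the entry point into
                   the network
            ksize: the k (replication) parameter from the Kademlia paper
            alpha: the alpha (concurrency) parameter from the Kademlia paper
        """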
        self.protocol = protocol
        self.ksize = ksize
        self.alpha = alpha
        self.node = node
        self.nearest = NodeHeap(self.node, self.ksize)
        self.last_ids_crawled = []
        log.info("creating spider with peers: %s", peers)
        self.nearest.push(peers)

    async def _find(self, rpcmethod):
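        """
        Run one iteration of the iterative lookup: call `rpcmethod` on the
        `alpha` closest uncontacted peers (or on every uncontacted peer, if
        the nearest list has not changed since the last round), gather the
        responses, and hand them to `_nodes_found`.
        """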
        log.info("crawling network with nearest: %s", str(tuple(self.nearest)))
        count = self.alpha
        if self.nearest.get_ids() == self.last_ids_crawled:
            count = len(self.nearest)
        self.last_ids_crawled = self.nearest.get_ids()

        dicts = {}
        for peer in self.nearest.get_uncontacted()[:count]:
            dicts[peer.id] = rpcmethod(peer, self.node)
            self.nearest.mark_contacted(peer)
        found = await gather_dict(dicts)
        return await self._nodes_found(found)

    async def _nodes_found(self, responses):
        raise NotImplementedError


class ValueSpiderCrawl(SpiderCrawl):
    def __init__(self, protocol, node, peers, ksize, alpha):
        SpiderCrawl.__init__(self, protocol, node, peers, ksize, alpha)
        # track the single nearest node seen without the value; per the
        # Kademlia paper's caching rule, it gets the value stored on it
        self.nearest_without_value = NodeHeap(self.node, 1)

    async def find(self):
        """Find the value for this crawl's key, or None if it cannot be found."""
        return await self._find(self.protocol.call_find_value)

    async def _nodes_found(self, responses):
        """
        Handle the result of an iteration in _find: drop peers that did
        not respond, collect any values returned, and push each peer's
        reported nodes onto the nearest list to widen the search.
        """
        toremove = []
        found_values = []
        for peerid, response in responses.items():
            response = RPCFindResponse(response)
            if not response.happened():
                toremove.append(peerid)
            elif response.has_value():
                found_values.append(response.get_value())
            else:
                peer = self.nearest.get_node(peerid)
                self.nearest_without_value.push(peer)
                self.nearest.push(response.get_node_list())
        self.nearest.remove(toremove)

        if found_values:
            return await self._handle_found_values(found_values)
        if self.nearest.have_contacted_all():
            # not found!
            return None
        return await self.find()

    async def _handle_found_values(self, values):
        """
        Make sure all returned values agree (log a warning if they do
        not), then ask the nearest node that did *not* have the value
        to store it.
        """
        value_counts = Counter(values)
        if len(value_counts) != 1:
            log.warning("Got multiple values for key %i: %s",
                        self.node.long_id, str(values))
        value = value_counts.most_common(1)[0][0]

        peer = self.nearest_without_value.popleft()
        if peer:
            await self.protocol.call_store(peer, self.node.id, value)
        return value


class NodeSpiderCrawl(SpiderCrawl):
    async def find(self):
        """Find the nodes closest to this crawl's key."""
        return await self._find(self.protocol.call_find_node)

    async def _nodes_found(self, responses):
        """
        Handle the result of an iteration in _find: drop peers that did
        not respond, fold newly reported nodes into the nearest list, and
        return the closest nodes once everyone has been contacted.
        """
        toremove = []
        for peerid, response in responses.items():
            response = RPCFindResponse(response)
            if not response.happened():
                toremove.append(peerid)
            else:
                self.nearest.push(response.get_node_list())
        self.nearest.remove(toremove)

        if self.nearest.have_contacted_all():
            return list(self.nearest)
        return await self.find()


class RPCFindResponse:
    def __init__(self, response):
        """
        Wrap the raw result of an RPC find: (<response happened>, <value>),
        where <value> is {'value': v} if a value was found, and a list of
        node tuples otherwise.
        """
        self.response = response

    def happened(self):
        """Did the other host actually respond?"""
        return self.response[0]

    def has_value(self):
        return isinstance(self.response[1], dict)

    def get_value(self):
        return self.response[1]['value']

    def get_node_list(self):
        nodelist = self.response[1] or []
        return [Node(*nodeple) for nodeple in nodelist]
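

# Usage sketch (illustration only, not part of the original module): one way
# a caller might drive these crawlers, assuming `protocol` exposes the
# call_find_value/call_find_node/call_store methods referenced above and
# `bootstrap_peers` is a list of already-known Node instances.  The names
# `lookup_value`, `protocol`, `bootstrap_peers`, and `key_digest` are all
# hypothetical.
#
#     async def lookup_value(protocol, bootstrap_peers, key_digest):
#         node_to_find = Node(key_digest)
#         spider = ValueSpiderCrawl(protocol, node_to_find, bootstrap_peers,
#                                   ksize=20, alpha=3)
#         return await spider.find()  # the stored value, or None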