-
Notifications
You must be signed in to change notification settings - Fork 3
/
ssl_connect_contextfactory.py
74 lines (52 loc) · 1.72 KB
/
ssl_connect_contextfactory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
Pathological case similar to connecting to many hosts, regenerating the
CertificateOptions repeatedly.
"""
from time import time
from ssl_throughput import cert
from tcp_connect import Client, CloseConnection
from twisted.internet.endpoints import SSL4ClientEndpoint, SSL4ServerEndpoint
from twisted.internet.protocol import Factory, Protocol
from benchlib import driver
class WriteOneByte(Protocol):
def connectionMade(self):
self.transport.write(b"x")
class Client(Client):
protocol = WriteOneByte
def _request(self):
factory = Factory()
factory.protocol = self.protocol
d = self._server().connect(factory)
d.addCallback(self._continue)
d.addErrback(self._stop)
def main(reactor, duration):
concurrency = 50
interface = '127.0.0.%d' % (int(time()) % 254 + 1,)
factory = Factory()
factory.protocol = CloseConnection
serverEndpoint = SSL4ServerEndpoint(
reactor, 0, cert.options(), interface=interface
)
listen = serverEndpoint.listen(factory)
def cbListening(port):
server = lambda: SSL4ClientEndpoint(
reactor,
interface,
port.getHost().port,
cert.options(),
bindAddress=(interface, 0),
)
client = Client(reactor, server)
d = client.run(concurrency, duration)
def cleanup(passthrough):
d = port.stopListening()
d.addCallback(lambda ignored: passthrough)
return d
d.addCallback(cleanup)
return d
listen.addCallback(cbListening)
return listen
if __name__ == '__main__':
import sys
import ssl_connect_contextfactory
driver(ssl_connect_contextfactory.main, sys.argv)