0
|
1 |
import random
|
|
2 |
import time
|
|
3 |
import threading
|
|
4 |
import zmq
|
|
5 |
|
|
6 |
# Cooperative shutdown flag: the loops in this module test it between
# iterations, and the main script sets it to True to ask them to finish.
stop = False

# Payload frame a peer sends to announce that it is idle and can take work.
READY = "READY"

# Single ZeroMQ context from which every socket in this module is created.
context = zmq.Context()
|
|
9 |
|
|
10 |
def collector(frontend, backend):
    """Fan each frontend request out to all idle backends and relay replies.

    Runs until the module-level ``stop`` flag is set. Every pass announces
    READY on ``frontend`` and then polls both sockets for up to 100 ms.

    * A backend message whose third frame is READY registers its first
      frame (the sender's identity) as an idle backend.
    * A frontend message is sent to every idle backend, prefixed with that
      backend's identity frame. Replies are then gathered from ``backend``
      only, until every recipient has reported READY again, a poll times
      out, or ``stop`` is set. Non-READY replies are forwarded on
      ``frontend`` with their first two frames stripped.

    Args:
        frontend: socket on which requests arrive and results are returned.
        backend: socket through which the idle peers are reached.
    """
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()
    while not stop:
        frontend.send_multipart(["", READY])
        poller.register(frontend, zmq.POLLIN)
        for socket, _ in poller.poll(100):
            request = socket.recv_multipart()
            if socket is backend:
                if request[2] == READY:
                    backends.add(request[0])
            else:
                # The frame right after the first empty delimiter is the
                # reply timeout, handed to poll() (milliseconds in pyzmq).
                timeout = int(request[request.index("") + 1])
                # Everyone idle right now receives the request; the idle
                # set is rebuilt as recipients report READY again.
                recipients = backends
                backends = set()
                for dest in recipients:
                    backend.send_multipart([dest] + request)
                # Only backend traffic matters while collecting replies.
                poller.unregister(frontend)
                try:
                    while not stop and recipients:
                        events = poller.poll(timeout)
                        if not events:
                            # poll() signals a timeout by returning no
                            # events; give up on the silent recipients
                            # instead of polling for them forever.
                            break
                        for source, _ in events:
                            reply = source.recv_multipart()
                            if reply[2] == READY:
                                backends.add(reply[0])
                                recipients.discard(reply[0])
                            else:
                                frontend.send_multipart(reply[2:])
                except zmq.ZMQError:
                    # Best effort: a socket error abandons this round only.
                    pass
                # The remaining events of the outer poll are stale now (the
                # inner loop may already have read those messages), so
                # start a fresh pass rather than risk a blocking recv.
                break
|
|
40 |
|
|
41 |
def broker_collector(frontend_url, backend_url):
    """Run a collector whose two XREP sockets are both bound locally.

    Args:
        frontend_url: endpoint the frontend socket binds to.
        backend_url: endpoint the backend socket binds to.
    """
    front = context.socket(zmq.XREP)
    back = context.socket(zmq.XREP)
    for sock, url in ((front, frontend_url), (back, backend_url)):
        sock.bind(url)
    collector(front, back)
|
|
47 |
|
|
48 |
def proxy_collector(frontend_url, backend_url):
    """Run a collector that connects upstream and binds downstream.

    Args:
        frontend_url: endpoint the frontend socket connects to.
        backend_url: endpoint the backend socket binds to.
    """
    upstream = context.socket(zmq.XREP)
    downstream = context.socket(zmq.XREP)
    upstream.connect(frontend_url)
    downstream.bind(backend_url)
    collector(frontend=upstream, backend=downstream)
|
|
54 |
|
|
55 |
def worker(socket, workload, failure_rate=0):
    """Serve requests on ``socket`` until ``stop`` is set or a failure is simulated.

    Each pass sends READY, blocks for one request, sleeps ``workload``
    seconds and answers with the request's envelope followed by "DONE".

    Args:
        socket: connected socket to exchange multipart messages on.
        workload: seconds to sleep per request.
        failure_rate: when non-zero, each request has a 1-in-``failure_rate``
            chance of making the worker return without replying.

    Raises:
        ValueError: if a received message has no empty delimiter frame.
        AssertionError: if the first payload frame is not "REQUEST".
    """
    while not stop:
        socket.send_multipart(["", READY])
        message = socket.recv_multipart()
        delim = message.index("")
        # Keep the routing frames (up to and including the delimiter) before
        # the payload is split off; slicing the payload with ``delim`` would
        # not yield the envelope.
        envelope = message[:delim + 1]
        # Frame delim+1 is the requester's timeout, which the worker ignores.
        request = message[delim + 2:]
        assert request[0] == "REQUEST"
        if failure_rate and random.randrange(failure_rate) == 0:
            # Simulated crash: vanish without answering.
            return
        time.sleep(workload)
        socket.send_multipart(envelope + ["DONE"])
|
|
67 |
|
|
68 |
def connect_worker(url, workload, failure_rate=0):
    """Keep a worker running against ``url`` until ``stop`` is set.

    Whenever ``worker`` returns, a new XREQ socket is created and connected
    while ``stop`` is still unset.

    Args:
        url: endpoint the worker socket connects to.
        workload: seconds of simulated work per request, passed to ``worker``.
        failure_rate: simulated failure odds, passed to ``worker``.
    """
    while not stop:
        socket = context.socket(zmq.XREQ)
        try:
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            # Every restart creates a new socket, so release the old one
            # instead of leaving one more open socket per simulated failure.
            socket.close()
|
|
73 |
|
|
74 |
def requester(socket, timeout=-1):
    """Issue requests on ``socket`` and report how many results each one got.

    Until ``stop`` is set, sends ``["", str(timeout), "REQUEST"]`` and counts
    every message received before a ``["", READY]`` message, then prints the
    count. Sends ``["", "STOP"]`` once the loop ends.

    Args:
        socket: connected socket to exchange multipart messages on.
        timeout: value sent as the request's timeout frame.
    """
    while not stop:
        socket.send_multipart(["", str(timeout), "REQUEST"])
        results = 0
        # Everything that arrives before the READY marker is one result.
        while socket.recv_multipart() != ["", READY]:
            results += 1
        # Parenthesised single-argument form: prints the same text under
        # the Python 2 print statement and the Python 3 print function.
        print("%d results received" % results)
    socket.send_multipart(["", "STOP"])
|
|
85 |
|
|
86 |
def connect_requester(url, timeout):
    """Connect a fresh XREQ socket to ``url`` and run ``requester`` on it.

    Args:
        url: endpoint the socket connects to.
        timeout: timeout value handed on to ``requester``.
    """
    client = context.socket(zmq.XREQ)
    client.connect(url)
    requester(client, timeout=timeout)
|
|
90 |
|
|
91 |
if __name__ == "__main__":
    # Demo topology: requesters -> broker -> proxies -> workers, all threads
    # of this process talking over inproc endpoints.
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    brokers = []
    broker = threading.Thread(target=broker_collector, args=(feurl, beurl))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before peers connect.
    time.sleep(1)

    senders = []
    for _ in range(2):
        sender = threading.Thread(target=connect_requester, args=(feurl, 1000))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for index in range(2):
        url = "inproc://proxy_be#%d" % (index,)
        # Each proxy connects to the broker's backend and binds its own
        # backend endpoint for workers.
        proxy = threading.Thread(target=proxy_collector, args=(beurl, url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    time.sleep(1)

    workers = []
    for url in proxy_urls:
        for _ in range(5):
            # 0.1 s of work per request, 1-in-100 simulated failure odds.
            work = threading.Thread(target=connect_worker, args=(url, 0.1, 100))
            work.start()
            workers.append(work)

    # Let the system run for a while, then request a cooperative shutdown.
    time.sleep(10)
    stop = True
    # NOTE(review): loops only test ``stop`` between blocking
    # recv_multipart() calls, so a thread waiting for a message will not
    # notice the flag until one arrives; join() may wait accordingly.
    for thread in senders + brokers + proxies + workers:
        thread.join()
|
|
123 |
|