# Add collector.
import random
import time
import threading
import zmq
# Shutdown flag: every loop in this module runs "while not stop", and the
# __main__ script flips it to True when the demo is over.
stop = False
# Marker frame: workers and collectors send it at the top of each loop, and
# the requester treats a ["", READY] message as the end of a batch of replies.
READY = "READY"
# ZeroMQ context from which every socket in this module is created.
context = zmq.Context()
def collector(frontend, backend):
    """Fan each frontend request out to all idle backend peers and relay replies.

    Runs until the module-level ``stop`` flag is set.  A backend message
    whose third frame is ``READY`` registers the sender (frame 0) as idle.
    When a request arrives on ``frontend`` it is sent to every idle peer,
    after which only ``backend`` is polled: each non-``READY`` reply is
    forwarded to ``frontend`` without its first two frames, and a peer is
    considered finished once it reports ``READY`` again.

    Args:
        frontend: zmq socket on which requests arrive and replies are
            returned.  The frame following the first empty frame of a
            request is parsed as an integer and used as the poll timeout.
        backend: zmq socket facing the worker side; frame 0 of each
            incoming message is used as the peer's routing identity.
    """
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()  # identities of peers that are currently idle
    while not stop:
        # Announce availability upstream and (re-)enable the frontend, which
        # is unregistered while a request is being collected.
        frontend.send_multipart(["", READY])
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            if socket is backend:
                if request[2] == READY:
                    backends.add(request[0])
                continue

            # Request from the frontend: the timeout follows the delimiter.
            timeout = int(request[request.index("") + 1])
            # Hand the request to every idle peer and start a fresh idle set.
            recipients = backends
            backends = set()
            for dest in recipients:
                backend.send_multipart([dest] + request)
            # Accept no new requests until this one has been collected.
            poller.unregister(frontend)
            try:
                # NOTE(review): an expired poll simply loops again, so
                # `timeout` bounds each poll rather than the total wait for
                # the recipients -- confirm this is intended.
                while not stop and recipients:
                    for peer, _event in poller.poll(timeout):
                        reply = peer.recv_multipart()
                        if reply[2] == READY:
                            backends.add(reply[0])
                            recipients.discard(reply[0])
                        else:
                            frontend.send_multipart(reply[2:])
            except zmq.ZMQError:
                # Best effort: abandon this collection round on a socket
                # error.  (The original named a bare, undefined `ZMQError`,
                # which turned any failure here into a NameError.)
                pass
def broker_collector(frontend_url, backend_url):
    """Run a collector that binds both its frontend and its backend.

    Args:
        frontend_url: endpoint the frontend XREP socket binds to.
        backend_url: endpoint the backend XREP socket binds to.

    Returns when ``collector`` returns; both sockets are closed on the way
    out, including when binding or the collector raises.
    """
    frontend = context.socket(zmq.XREP)
    backend = context.socket(zmq.XREP)
    try:
        frontend.bind(frontend_url)
        backend.bind(backend_url)
        collector(frontend, backend)
    finally:
        # Release the sockets instead of leaking them when the thread ends.
        backend.close()
        frontend.close()
def proxy_collector(frontend_url, backend_url):
    """Run a collector whose frontend connects out and whose backend binds.

    Args:
        frontend_url: endpoint the frontend XREP socket connects to.
        backend_url: endpoint the backend XREP socket binds to.

    Returns when ``collector`` returns; both sockets are closed on the way
    out, including when connect/bind or the collector raises.
    """
    frontend = context.socket(zmq.XREP)
    backend = context.socket(zmq.XREP)
    try:
        frontend.connect(frontend_url)
        backend.bind(backend_url)
        collector(frontend, backend)
    finally:
        # Release the sockets instead of leaking them when the thread ends.
        backend.close()
        frontend.close()
def worker(socket, workload, failure_rate=0):
    """Serve requests on ``socket`` until ``stop`` is set or a failure is simulated.

    Each round announces ``["", READY]``, waits for one request, sleeps for
    ``workload`` seconds and answers with the request's routing envelope
    followed by ``"DONE"``.

    Args:
        socket: connected zmq socket to serve on.
        workload: seconds to sleep per request (simulated work).
        failure_rate: when non-zero, each request has a 1-in-``failure_rate``
            chance of making the worker return without replying.

    Raises:
        ValueError: if the request contains no empty delimiter frame.
        AssertionError: if the first payload frame is not ``"REQUEST"``.
    """
    while not stop:
        socket.send_multipart(["", READY])
        message = socket.recv_multipart()
        delim = message.index("")
        # Keep the routing frames (up to and including the empty delimiter)
        # before slicing; the original sliced them out of the already
        # truncated payload, which produced a bogus reply envelope.
        envelope = message[:delim + 1]
        # The frame right after the delimiter is the timeout, which the
        # worker does not use; the payload starts after it.
        payload = message[delim + 2:]
        assert payload[0] == "REQUEST"
        if failure_rate and random.randrange(failure_rate) == 0:
            # Simulated crash: vanish without sending a reply.
            return
        time.sleep(workload)
        socket.send_multipart(envelope + ["DONE"])
def connect_worker(url, workload, failure_rate=0):
    """Keep a worker running against ``url``, restarting it after each exit.

    A fresh XREQ socket is created for every worker incarnation and closed
    when that incarnation returns, so simulated failures do not accumulate
    open sockets.

    Args:
        url: endpoint to connect the worker socket to.
        workload: seconds of simulated work per request, passed to ``worker``.
        failure_rate: simulated failure odds, passed to ``worker``.
    """
    while not stop:
        socket = context.socket(zmq.XREQ)
        try:
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            # The original leaked one socket per restart.
            socket.close()
def requester(socket, timeout=-1):
    """Send requests in a loop and report how many replies each one got.

    For every request, replies are counted until a message equal to
    ``["", READY]`` arrives; the count is then printed.  Once ``stop`` is
    set, a final ``["", "STOP"]`` message is sent.

    Args:
        socket: connected zmq socket to send requests on.
        timeout: value sent (as a string) in the request's timeout frame.
    """
    while not stop:
        socket.send_multipart(["", str(timeout), "REQUEST"])
        results = 0
        while True:
            reply = socket.recv_multipart()
            if reply == ["", READY]:
                break
            results += 1
        # Single-argument call form prints the same text as the former
        # print statement and is valid syntax on both Python 2 and 3.
        print("%d results received" % results)
    socket.send_multipart(["", "STOP"])
def connect_requester(url, timeout):
    """Connect an XREQ socket to ``url`` and run ``requester`` on it.

    Args:
        url: endpoint to connect to.
        timeout: timeout value forwarded to ``requester``.

    The socket is closed when ``requester`` returns or raises.
    """
    socket = context.socket(zmq.XREQ)
    try:
        socket.connect(url)
        requester(socket, timeout)
    finally:
        # Release the socket instead of leaking it when the thread ends.
        socket.close()
if __name__ == "__main__":
    # Demo: two requesters -> one broker -> two proxies -> five workers per
    # proxy, all on inproc endpoints; runs for ten seconds, then stops.
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    def _launch(target, *args):
        """Start ``target(*args)`` on a new thread and return that thread."""
        thread = threading.Thread(target=target, args=args)
        thread.start()
        return thread

    brokers = [_launch(broker_collector, feurl, beurl)]
    # Pause before starting peers (presumably so the broker has bound its
    # endpoints first).
    time.sleep(1)

    senders = []
    for _ in range(2):
        senders.append(_launch(connect_requester, feurl, 1000))

    proxy_urls = ["inproc://proxy_be#%d" % (index,) for index in range(2)]
    proxies = []
    for url in proxy_urls:
        proxies.append(_launch(proxy_collector, beurl, url))
    # Same pause again before the workers are started against the proxies.
    time.sleep(1)

    workers = []
    for url in proxy_urls:
        for _ in range(5):
            # 0.1 s of simulated work per request, 1-in-100 failure odds.
            workers.append(_launch(connect_worker, url, 0.1, 100))

    # Let the system run, then ask every loop to finish and wait for it.
    time.sleep(10)
    stop = True
    for thread in senders + brokers + proxies + workers:
        thread.join()