Add randomness to workload.
import functools
import logging
import random
import threading
import time
from itertools import count
from logging import debug, info

import zmq
# Payload frame a peer sends to signal that it is idle and can accept work.
READY = "READY"
# Single ZeroMQ context from which every socket in this file is created.
context = zmq.Context()
# Shared, monotonically increasing source of request ids for requesters.
counter = count()
def checkzmqerror(func):
    """Decorate `func` so that a ``zmq.ZMQError`` ends it with a log line.

    Any other exception propagates unchanged.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper that returns whatever `func` returns, or ``None`` when
        `func` raised ``zmq.ZMQError`` (the error is logged at INFO level).
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Propagate the result; the original wrapper silently dropped it.
            return func(*args, **kwargs)
        except zmq.ZMQError as err:
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
            return None
    return wrapper
def collector(name, frontend, backend):
    """Fan each frontend request out to all ready backends and relay replies.

    A backend announces itself with a message whose third frame is ``READY``;
    its first frame is remembered as its address.  When a request arrives on
    `frontend` it is sent to every remembered backend, and each reply from
    one of those backends is forwarded to `frontend`.  Collection stops when
    all of them have sent ``READY`` again or the request's timeout (the frame
    following the empty delimiter, in milliseconds) has elapsed.  A final
    ``READY`` message is then sent to `frontend`.

    Args:
        name: Label used in log messages.
        frontend: Socket on which requests arrive and replies are sent back.
        backend: Socket on which backends announce themselves and reply.

    This function does not return; it runs until a socket call raises.
    """
    backends = set()
    debug("collector %s is ready with %r backends", name, len(backends))
    dropped = 0

    # Neither poller ever changes, so build them once instead of per pass.
    idle_poller = zmq.Poller()
    idle_poller.register(backend, zmq.POLLIN)
    idle_poller.register(frontend, zmq.POLLIN)
    reply_poller = zmq.Poller()
    reply_poller.register(backend, zmq.POLLIN)

    while True:
        ready = dict(idle_poller.poll(100))

        # Handle the backend before the frontend: serving a request drains
        # the backend socket itself, so a backend event examined afterwards
        # could be stale and make recv_multipart() block.
        if backend in ready:
            message = backend.recv_multipart()
            debug("collector %s received request %r", name, message)
            if len(message) > 2 and message[2] == READY:
                debug("collector %s has new backend: %r", name, message[0])
                backends.add(message[0])
            else:
                debug("collector %s discard reply %r", name, message)

        if frontend not in ready:
            continue

        request = frontend.recv_multipart()
        debug("collector %s received request %r", name, request)
        try:
            delim = request.index("")
            timeout = int(request[delim + 1])
        except (ValueError, IndexError):
            # No delimiter or no usable timeout: drop the request instead of
            # letting the exception kill the collector thread.
            info("collector %s discard malformed request %r", name, request)
            continue
        address_stack = request[:delim + 1]
        debug("collector %s has new work to do in %s ms", name, timeout)

        # Every currently ready backend gets the request; backends re-enter
        # the ready set only by sending READY again.
        recipients = backends
        backends = set()
        debug("collector %s send requests to %r", name, recipients)
        for dest in recipients:
            backend.send_multipart([dest] + request[delim:])

        # One deadline for the whole request, so the wait is bounded by
        # `timeout` overall rather than restarting after every reply.
        # A negative timeout means "wait indefinitely".
        deadline = None if timeout < 0 else time.time() + timeout / 1000.0
        while recipients:
            start = time.time()
            if deadline is None:
                wait = -1
            else:
                wait = max(0, int(round((deadline - start) * 1000)))
            debug("%r: collector %s wait %r on %r", start, name, wait, recipients)
            events = reply_poller.poll(wait)
            if not events:
                end = time.time()
                if wait > 0 and (end - start) * 1000 < wait:
                    # poll() came back early with nothing to read; keep
                    # waiting for whatever is left of the budget.
                    info("no event but timeout: %r", events)
                    continue
                dropped += 1
                debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
                break
            for sock, _event in events:
                reply = sock.recv_multipart()
                if len(reply) < 3:
                    debug("collector %s discard reply %r", name, reply)
                elif reply[2] == READY:
                    debug("%r is ready on %s", reply[0], name)
                    backends.add(reply[0])
                    recipients.discard(reply[0])
                elif reply[0] in recipients:
                    debug("collector %s forward reply", name)
                    frontend.send_multipart(address_stack + reply[2:])
                else:
                    debug("collector %s discard reply %r", name, reply)

        # Tell the requesting side that no more replies will follow.
        frontend.send_multipart(address_stack + [READY])
        debug("collector %s is ready with %r backends", name, len(backends))
@checkzmqerror
def broker_collector(frontend_url, backend_url):
    """Bind a broker's two XREP sockets and run `collector` on them.

    Args:
        frontend_url: Endpoint the frontend socket binds to.
        backend_url: Endpoint the backend socket binds to; also used as the
            frontend socket's identity.
    """
    frontend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Binding broker frontend to %s", frontend_url)
            frontend.bind(frontend_url)
            info("Binding broker backend to %s", backend_url)
            backend.bind(backend_url)
            collector("broker", frontend, backend)
        finally:
            # Close explicitly instead of relying on garbage collection.
            backend.close()
    finally:
        frontend.close()
@checkzmqerror
def proxy_collector(frontend_url, backend_url):
    """Connect a proxy upstream, bind its backend and run `collector`.

    Args:
        frontend_url: Endpoint the XREQ frontend socket connects to.
        backend_url: Endpoint the XREP backend socket binds to; also used as
            the frontend socket's identity.
    """
    frontend = context.socket(zmq.XREQ)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Connecting proxy frontend to %s", frontend_url)
            frontend.connect(frontend_url)
            info("Binding proxy backend to %s", backend_url)
            backend.bind(backend_url)
            # Sending presence to frontend.
            frontend.send_multipart(["", READY])
            collector("proxy", frontend, backend)
        finally:
            # Close explicitly instead of relying on garbage collection.
            backend.close()
    finally:
        frontend.close()
def worker(socket, workload, failure_rate=0):
    """Serve requests on `socket` until a simulated failure occurs.

    Each round sends ``["", READY]``, waits for a request, sleeps between
    ``workload`` and ``2 * workload`` seconds, then replies with the request
    id followed by ``"DONE"``.

    Args:
        socket: Socket used to announce readiness, receive requests and reply.
        workload: Base processing time in seconds.
        failure_rate: When non-zero, each request fails with probability
            ``1 / failure_rate``; a failing worker returns without replying.

    Returns:
        ``False`` when a simulated failure occurs; otherwise it loops forever.

    Raises:
        ValueError: If the request has no empty delimiter frame or is not of
            the form ``"REQUEST", <id>`` after the timeout frame.
    """
    while True:
        debug("Worker is ready")
        socket.send_multipart(["", READY])
        request = socket.recv_multipart()
        debug("Worker receive request %r", request)
        delim = request.index("")
        address = request[:delim + 1]
        # Frame delim+1 holds the request timeout, which the worker ignores.
        payload = request[delim + 2:]
        # Explicit check instead of `assert`, which is stripped under -O.
        if len(payload) < 2 or payload[0] != "REQUEST":
            raise ValueError("unexpected request %r" % (request,))
        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False
        time.sleep(workload * (1 + random.random()))
        debug("worker send reply")
        socket.send_multipart(address + [payload[1], "DONE"])
@checkzmqerror
def connect_worker(url, workload, failure_rate=0):
    """Keep a worker running against `url`, reconnecting after each failure.

    Args:
        url: Endpoint the worker's XREQ socket connects to.
        workload: Passed through to `worker`.
        failure_rate: Passed through to `worker`.
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            # Release the socket before reconnecting instead of leaving the
            # old one to the garbage collector.
            socket.close()
def requester(socket, timeout=-1):
    """Send numbered requests on `socket` forever, counting replies to each.

    For every request the function reads replies until it receives
    ``["", READY]``, then logs how many results arrived and sends the next
    request.

    Args:
        socket: Socket used to send requests and receive replies.
        timeout: Value sent (as a string) in the request's timeout frame.

    Raises:
        ValueError: If a reply does not carry the id of the current request.
    """
    while True:
        # next() works on Python 2.6+ and 3, unlike the .next() method.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            # Explicit check instead of `assert`, which is stripped under -O.
            if len(reply) < 2 or reply[1] != i:
                raise ValueError("unexpected reply %r to request %s" % (reply, i))
            results += 1
        info("requester received %d results", results)
@checkzmqerror
def connect_requester(url, timeout):
    """Connect an XREQ socket to `url` and run `requester` on it.

    Args:
        url: Endpoint to connect to.
        timeout: Passed through to `requester`.
    """
    socket = context.socket(zmq.XREQ)
    try:
        info("Connecting requester to %s", url)
        socket.connect(url)
        requester(socket, timeout)
    finally:
        # Close explicitly instead of relying on garbage collection.
        socket.close()
if __name__ == "__main__":
    # Demo topology: 1 broker, 10 requesters, 5 proxies and 5 workers per
    # proxy, all running as threads over inproc endpoints.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    brokers = []
    broker = threading.Thread(target=broker_collector, args=(feurl, beurl))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before peers connect.
    time.sleep(2)

    senders = []
    for _ in range(10):
        sender = threading.Thread(target=connect_requester, args=(feurl, 5000))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for index in range(5):
        url = "inproc://proxy_be#%d" % (index,)
        proxy = threading.Thread(target=proxy_collector, args=(beurl, url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    # Presumably gives the proxies time to bind before workers connect.
    time.sleep(2)

    workers = []
    for url in proxy_urls:
        for _ in range(5):
            work = threading.Thread(target=connect_worker, args=(url, 1, 4800))
            work.start()
            workers.append(work)

    # Let the system run for a while, then terminate the context and wait
    # for every thread to finish.
    time.sleep(20)
    info("Joining thread")
    context.term()
    for thread in senders + brokers + proxies + workers:
        thread.join()