# Some rewriting and debugging.
# Known issue: the worker is accumulating past jobs.
import random
import time
import threading
import zmq
import logging
from logging import debug, info
from itertools import count
# Marker frame: workers and proxies send it to announce that they can take
# work, and a collector sends it to signal that a request is fully handled.
READY = "READY"
# ZeroMQ context from which the sockets in this file are created.
context = zmq.Context()
# Source of request numbers, shared by every requester() call.
counter = count()
def _collect_replies(name, poller, frontend, address_stack, recipients,
                     backends, timeout):
    """Relay replies for one request until every recipient is ready again.

    `recipients` and `backends` are sets of backend identities and are
    updated in place: a backend that announces READY is added to
    `backends` and removed from `recipients`.  Replies from identities
    still in `recipients` are sent to `frontend` prefixed with
    `address_stack`; anything else is logged and dropped.

    `timeout` is in milliseconds and bounds the whole collection.  A
    negative value is handed to poll() unchanged on every wait.
    """
    # One deadline for the whole request.  Handing the full timeout to every
    # poll() would restart the wait on any backend message, including READY
    # announcements and discarded replies unrelated to this request.
    # time.time() is wall-clock, so a clock adjustment shifts the deadline.
    if timeout < 0:
        deadline = None
    else:
        deadline = time.time() + timeout / 1000.0

    while recipients:
        if deadline is None:
            wait_ms = timeout
        else:
            wait_ms = max(0, int((deadline - time.time()) * 1000))
        events = poller.poll(wait_ms)
        if not events:
            info("collector %s has a timeout with %r", name, recipients)
            break
        for socket, event in events:
            reply = socket.recv_multipart()
            sender = reply[0]
            if reply[2] == READY:
                debug("%r is ready on %s", sender, name)
                backends.add(sender)
                recipients.discard(sender)
            elif sender in recipients:
                debug("collector %s forward reply", name)
                frontend.send_multipart(address_stack + reply[2:])
            else:
                debug("collector %s discard reply %r", name, reply)


def collector(name, frontend, backend):
    """Fan each frontend request out to every ready backend and relay replies.

    Messages read from `backend` carry the sender identity in frame 0 and
    the payload from frame 2 on; a payload of READY marks that identity as
    available.  A message read from `frontend` consists of address frames
    up to and including the first empty frame, then a timeout frame (in
    milliseconds, or the literal "STOP"), then the remaining frames.

    A request is sent to all backends currently marked ready.  Their
    replies are forwarded to `frontend` behind the request's address
    frames, followed by one READY frame once all of them are ready again
    or the timeout expires.  A "STOP" request is forwarded to the ready
    backends and then ends the loop.

    Args:
        name: label used in log messages.
        frontend: socket on which requests arrive and replies are sent.
        backend: socket through which backends are reached.

    Returns:
        None, once a "STOP" request has been processed.
    """
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()
    info("collector %s is ready with %r backends", name, len(backends))
    while True:
        # Re-armed on every pass because it is unregistered while a request
        # is being served.
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)

            if socket is backend:
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    # No request is in flight, so nobody is waiting for this.
                    debug("collector %s discard reply %r", name, request)
                continue

            delim = request.index("")
            address_stack = request[:delim + 1]
            timeout = request[delim + 1]
            stopping = timeout == "STOP"
            if not stopping:
                debug("collector %s has new work to do in %s ms", name, timeout)

            # Every ready backend takes part in this request; the ready set
            # starts over and is refilled as backends announce READY again.
            recipients, backends = backends, set()
            debug("collector %s send requests to %r", name, recipients)
            for dest in recipients:
                backend.send_multipart([dest] + request[delim:])
            if stopping:
                info("collector %s is terminating", name)
                return

            timeout = int(timeout)
            # New requests stay queued on the frontend while this one runs.
            poller.unregister(frontend)
            _collect_replies(name, poller, frontend, address_stack,
                             recipients, backends, timeout)
            frontend.send_multipart(address_stack + [READY])
            info("collector %s is ready with %r backends", name, len(backends))
            # Events gathered before this request was served may refer to
            # messages the reply loop has already read; poll again rather
            # than risk a blocking recv on a drained socket.
            break
def broker_collector(frontend_url, backend_url):
    """Bind two XREP sockets and run the "broker" collector on them.

    Args:
        frontend_url: endpoint the requester-facing socket binds to.
        backend_url: endpoint the backend-facing socket binds to.

    Does not return until collector() does.
    """
    client_side = context.socket(zmq.XREP)
    client_side.setsockopt(zmq.IDENTITY, "broker")
    backend_side = context.socket(zmq.XREP)

    info("Binding broker frontend to %s", frontend_url)
    client_side.bind(frontend_url)

    info("Binding broker backend to %s", backend_url)
    backend_side.bind(backend_url)

    collector(name="broker", frontend=client_side, backend=backend_side)
def proxy_collector(frontend_url, backend_url):
    """Connect upstream, bind downstream and run the "proxy" collector.

    Args:
        frontend_url: endpoint the XREQ frontend socket connects to.
        backend_url: endpoint the XREP backend socket binds to.

    Does not return until collector() does.
    """
    upstream = context.socket(zmq.XREQ)
    upstream.setsockopt(zmq.IDENTITY, "proxy")
    downstream = context.socket(zmq.XREP)

    info("Connecting proxy frontend to %s", frontend_url)
    upstream.connect(frontend_url)

    info("Binding proxy backend to %s", backend_url)
    downstream.bind(backend_url)

    # Announce presence to the frontend before serving requests.
    upstream.send_multipart(["", READY])

    collector(name="proxy", frontend=upstream, backend=downstream)
def worker(socket, workload, failure_rate=0):
    """Serve requests on `socket` until told to stop or a failure is simulated.

    Each round announces READY, waits for one message, sleeps `workload`
    seconds and answers with the request's second payload frame followed
    by "DONE", behind the address frames the request arrived with.

    Args:
        socket: connected socket used for both announcing and replying.
        workload: seconds to sleep per request.
        failure_rate: when non-zero, each request fails with a chance of
            one in `failure_rate`.

    Returns:
        True after a "STOP" message, False after a simulated failure.
    """
    while True:
        info("Worker is ready")
        socket.send_multipart(["", READY])

        frames = socket.recv_multipart()
        info("Worker receive request %r", frames)

        # Address frames run up to and including the first empty frame; the
        # frame after it is the timeout or the STOP command.
        split_at = frames.index("")
        envelope = frames[:split_at + 1]
        command = frames[split_at + 1]
        if command == "STOP":
            info("worker is terminating")
            return True

        payload = frames[split_at + 2:]
        assert payload[0] == "REQUEST"

        if failure_rate and not random.randrange(failure_rate):
            # Give up without replying.
            info("worker failed")
            return False

        time.sleep(workload)
        info("worker send reply")
        socket.send_multipart(envelope + [payload[1], "DONE"])
def connect_worker(url, workload, failure_rate=0):
    """Run worker() on a fresh XREQ socket, reconnecting after each failure.

    Args:
        url: endpoint to connect to.
        workload: passed through to worker().
        failure_rate: passed through to worker().

    Returns when worker() reports a clean stop (returns True).
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            finished = worker(socket, workload, failure_rate)
        finally:
            # Each retry opens a new socket; release the old one so failed
            # attempts do not pile up open sockets on the shared context.
            socket.close()
        if finished:
            return
# Shutdown flag read by requester() on every loop; the script entry point
# sets it to True to end the run.
stop = False
def requester(socket, timeout=-1):
    """Send numbered requests on `socket` until the module-level `stop` is set.

    Each request is ``["", timeout, "REQUEST", number]``.  Replies are read
    until a ``["", READY]`` message arrives; every other reply must carry
    the request number in frame 1 and is counted as one result.  After the
    loop ends a ``["", "STOP"]`` message is sent.

    Args:
        socket: connected socket used for requests and replies.
        timeout: value sent as the request's timeout frame.
    """
    while not stop:
        # Builtin next() instead of the Python-2-only iterator .next() method.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
    info("requester is terminating")
    socket.send_multipart(["", "STOP"])
def connect_requester(url, timeout):
    """Open an XREQ socket to `url` and run requester() on it.

    Args:
        url: endpoint to connect to.
        timeout: passed through to requester().
    """
    link = context.socket(zmq.XREQ)
    info("Connecting requester to %s", url)
    link.connect(url)
    requester(link, timeout)
if __name__ == "__main__":
    # Demo run: one broker, five requesters, one proxy and one worker, each
    # in its own thread, talking over inproc endpoints.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    broker = threading.Thread(target=broker_collector, args=(feurl, beurl))
    broker.start()
    brokers = [broker]
    # Presumably gives the broker time to bind before anything connects.
    time.sleep(2)

    senders = []
    for _ in xrange(5):
        sender = threading.Thread(target=connect_requester, args=(feurl, 5000))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for index in xrange(1):
        proxy_url = "inproc://proxy_be#%d" % (index,)
        proxy = threading.Thread(target=proxy_collector,
                                 args=(beurl, proxy_url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(proxy_url)
    # Presumably gives the proxies time to bind before workers connect.
    time.sleep(2)

    workers = []
    for proxy_url in proxy_urls:
        for _ in xrange(1):
            work = threading.Thread(target=connect_worker,
                                    args=(proxy_url, 1, 0))
            work.start()
            workers.append(work)

    # Let the system run, then ask the requesters to wind down.
    time.sleep(10)
    stop = True
    info("Joining thread")
    for thread in senders + brokers + proxies + workers:
        thread.join()