# HG changeset patch # User Fabien Ninoles # Date 1304907250 14400 # Node ID 48d514cc3309c8ffd58b6576df53fc8f257bc8f3 # Parent 57d81f2bf26f8f635a9ad1e47c1e17554203e88f Some rewrite and debugging. Issue: worker is accumulating past job. diff -r 57d81f2bf26f -r 48d514cc3309 collector.py --- a/collector.py Sun May 08 19:40:30 2011 -0400 +++ b/collector.py Sun May 08 22:14:10 2011 -0400 @@ -2,122 +2,175 @@ import time import threading import zmq +import logging +from logging import debug, info +from itertools import count -stop = False READY = "READY" context = zmq.Context() +counter = count() -def collector(frontend, backend): - poller = zmq.Poller() - poller.register(backend, zmq.POLLIN) - backends = set() - while not stop: - frontend.send_multipart(["",READY]) - poller.register(frontend, zmq.POLLIN) - for socket, event in poller.poll(100): - request = socket.recv_multipart() - if socket is backend: - if request[2] == READY: - backends.add(request[0]) - else: - timeout = int(request[request.index("")+1]) - recipients = backends - backends = set() - for dest in recipients: - backend.send_multipart([dest] + request) - poller.unregister(frontend) - try: - while not stop and recipients: - for socket, event in poller.poll(timeout): - reply = socket.recv_multipart() - if reply[2] == READY: - backends.add(reply[0]) - recipients.discard(reply[0]) - else: - frontend.send_multipart(reply[2:]) - except ZMQError: - pass +def collector(name, frontend, backend): + poller = zmq.Poller() + poller.register(backend, zmq.POLLIN) + backends = set() + info("collector %s is ready with %r backends", name, len(backends)) + while True: + poller.register(frontend, zmq.POLLIN) + for socket, event in poller.poll(100): + request = socket.recv_multipart() + debug("collector %s received request %r", name, request) + if socket is backend: + if request[2] == READY: + debug("collector %s has new backend: %r", name, request[0]) + backends.add(request[0]) + else: + debug("collector %s discard reply %r", name, request) + else: + delim = request.index("") + address_stack = request[:delim+1] + timeout = request[delim+1] + if timeout != "STOP": + debug("collector %s has new work to do in %s ms", name, timeout) + recipients = backends + backends = set() + debug("collector %s send requests to %r", name, recipients) + for dest in recipients: + backend.send_multipart([dest] + request[delim:]) + if timeout == "STOP": + info("collector %s is terminating", name) + return + timeout = int(timeout) + poller.unregister(frontend) + while recipients: + events = poller.poll(timeout) + if not events: + info("collector %s has a timeout with %r", name, recipients) + break + for socket, event in events: + reply = socket.recv_multipart() + if reply[2] == READY: + debug("%r is ready on %s", reply[0], name) + backends.add(reply[0]) + recipients.discard(reply[0]) + elif reply[0] in recipients: + debug("collector %s forward reply", name) + frontend.send_multipart(address_stack + reply[2:]) + else: + debug("collector %s discard reply %r", name, reply) + frontend.send_multipart(address_stack + [READY]) + info("collector %s is ready with %r backends", name, len(backends)) + def broker_collector(frontend_url, backend_url): - frontend = context.socket(zmq.XREP) - backend = context.socket(zmq.XREP) - frontend.bind(frontend_url) - backend.bind(backend_url) - collector(frontend, backend) - + frontend = context.socket(zmq.XREP) + frontend.setsockopt(zmq.IDENTITY, "broker") + backend = context.socket(zmq.XREP) + info("Binding broker frontend to %s", frontend_url) + frontend.bind(frontend_url) + info("Binding broker backend to %s", backend_url) + backend.bind(backend_url) + collector("broker", frontend, backend) + def proxy_collector(frontend_url, backend_url): - frontend = context.socket(zmq.XREP) - backend = context.socket(zmq.XREP) - frontend.connect(frontend_url) - backend.bind(backend_url) - collector(frontend, backend) - + frontend = context.socket(zmq.XREQ) + frontend.setsockopt(zmq.IDENTITY, "proxy") + backend = context.socket(zmq.XREP) + info("Connecting proxy frontend to %s", frontend_url) + frontend.connect(frontend_url) + info("Binding proxy backend to %s", backend_url) + # Sending presence to frontend. + backend.bind(backend_url) + frontend.send_multipart(["", READY]) + collector("proxy", frontend, backend) + def worker(socket, workload, failure_rate = 0): - while not stop: - socket.send_multipart(["",READY]) - request = socket.recv_multipart() - delim = request.index("") - timeout = request[delim+1] - request = request[delim+2:] - assert request[0] == "REQUEST" - if failure_rate and random.randrange(failure_rate) == 0: - return - time.sleep(workload) - socket.send_multipart(request[:delim+1] + ["DONE"]) + while True: + info("Worker is ready") + socket.send_multipart(["",READY]) + request = socket.recv_multipart() + info("Worker receive request %r", request) + delim = request.index("") + address = request[:delim+1] + timeout = request[delim+1] + if timeout == "STOP": + info("worker is terminating") + return True + request = request[delim+2:] + assert request[0] == "REQUEST" + if failure_rate and random.randrange(failure_rate) == 0: + info("worker failed") + return False + time.sleep(workload) + info("worker send reply") + socket.send_multipart(address + [request[1], "DONE"]) def connect_worker(url, workload, failure_rate = 0): - while not stop: - socket = context.socket(zmq.XREQ) - socket.connect(url) - worker(socket, workload, failure_rate) + while True: + socket = context.socket(zmq.XREQ) + info("Connecting worker to %s", url) + socket.connect(url) + if worker(socket, workload, failure_rate): + return + +stop = False def requester(socket, timeout = -1): - while not stop: - socket.send_multipart(["", str(timeout), "REQUEST"]) - results = 0 - while True: - reply = socket.recv_multipart() - if reply == ["",READY]: - break - results += 1 - print results, "results received" - socket.send_multipart(["", "STOP"]) + while not stop: + i = str(counter.next()) + info("Requester send request %s", i) + socket.send_multipart(["", str(timeout), "REQUEST", i]) + results = 0 + while True: + reply = socket.recv_multipart() + debug("requester received reply %r", reply) + if reply == ["",READY]: + break + assert reply[1] == i + results += 1 + info("requester received %d results", results) + # time.sleep(1) + info("requester is terminating") + socket.send_multipart(["", "STOP"]) def connect_requester(url, timeout): - socket = context.socket(zmq.XREQ) - socket.connect(url) - requester(socket, timeout) + socket = context.socket(zmq.XREQ) + info("Connecting requester to %s", url) + socket.connect(url) + requester(socket, timeout) if __name__ == "__main__": - feurl = "inproc://frontend" - beurl = "inproc://backend" - brokers = [] - broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) - broker.start() - brokers.append(broker) - time.sleep(1) - senders = [] - for sender in xrange(2): - sender = threading.Thread(target = connect_requester, args = (feurl,1000)) - sender.start() - senders.append(sender) - proxies = [] - proxy_urls = [] - for proxy in xrange(2): - url = "inproc://proxy_be#%d" % (proxy,) - proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) - proxy.start() - proxies.append(proxy) - proxy_urls.append(url) - time.sleep(1) - workers = [] - for url in proxy_urls: - for work in xrange(5): - work = threading.Thread(target = connect_worker, args = (url, 0.1, 100)) - work.start() - workers.append(work) - time.sleep(10) - stop = True - for thread in senders + brokers + proxies + workers: - thread.join() - + logging.getLogger().setLevel(logging.INFO) + feurl = "inproc://frontend" + beurl = "inproc://backend" + brokers = [] + broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) + broker.start() + brokers.append(broker) + time.sleep(2) + senders = [] + for sender in xrange(5): + sender = threading.Thread(target = connect_requester, args = (feurl,5000)) + sender.start() + senders.append(sender) + proxies = [] + proxy_urls = [] + for proxy in xrange(1): + url = "inproc://proxy_be#%d" % (proxy,) + proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) + proxy.start() + proxies.append(proxy) + proxy_urls.append(url) + time.sleep(2) + workers = [] + for url in proxy_urls: + for work in xrange(1): + work = threading.Thread(target = connect_worker, args = (url, 1, 0)) + work.start() + workers.append(work) + time.sleep(10) + stop = True + info("Joining thread") + for thread in senders + brokers + proxies + workers: + thread.join() +