--- a/collector.py Sun May 08 19:40:30 2011 -0400
+++ b/collector.py Sun May 08 22:14:10 2011 -0400
@@ -2,122 +2,175 @@
import time
import threading
import zmq
+import logging
+from logging import debug, info
+from itertools import count
-stop = False
READY = "READY"
context = zmq.Context()
+counter = count()
-def collector(frontend, backend):
- poller = zmq.Poller()
- poller.register(backend, zmq.POLLIN)
- backends = set()
- while not stop:
- frontend.send_multipart(["",READY])
- poller.register(frontend, zmq.POLLIN)
- for socket, event in poller.poll(100):
- request = socket.recv_multipart()
- if socket is backend:
- if request[2] == READY:
- backends.add(request[0])
- else:
- timeout = int(request[request.index("")+1])
- recipients = backends
- backends = set()
- for dest in recipients:
- backend.send_multipart([dest] + request)
- poller.unregister(frontend)
- try:
- while not stop and recipients:
- for socket, event in poller.poll(timeout):
- reply = socket.recv_multipart()
- if reply[2] == READY:
- backends.add(reply[0])
- recipients.discard(reply[0])
- else:
- frontend.send_multipart(reply[2:])
- except ZMQError:
- pass
+def collector(name, frontend, backend):
+ poller = zmq.Poller()
+ poller.register(backend, zmq.POLLIN)
+ backends = set()
+ info("collector %s is ready with %r backends", name, len(backends))
+ while True:
+ poller.register(frontend, zmq.POLLIN)
+ for socket, event in poller.poll(100):
+ request = socket.recv_multipart()
+ debug("collector %s received request %r", name, request)
+ if socket is backend:
+ if request[2] == READY:
+ debug("collector %s has new backend: %r", name, request[0])
+ backends.add(request[0])
+ else:
+ debug("collector %s discard reply %r", name, request)
+ else:
+ delim = request.index("")
+ address_stack = request[:delim+1]
+ timeout = request[delim+1]
+ if timeout != "STOP":
+ debug("collector %s has new work to do in %s ms", name, timeout)
+ recipients = backends
+ backends = set()
+ debug("collector %s send requests to %r", name, recipients)
+ for dest in recipients:
+ backend.send_multipart([dest] + request[delim:])
+ if timeout == "STOP":
+ info("collector %s is terminating", name)
+ return
+ timeout = int(timeout)
+ poller.unregister(frontend)
+ while recipients:
+ events = poller.poll(timeout)
+ if not events:
+ info("collector %s has a timeout with %r", name, recipients)
+ break
+ for socket, event in events:
+ reply = socket.recv_multipart()
+ if reply[2] == READY:
+ debug("%r is ready on %s", reply[0], name)
+ backends.add(reply[0])
+ recipients.discard(reply[0])
+ elif reply[0] in recipients:
+ debug("collector %s forward reply", name)
+ frontend.send_multipart(address_stack + reply[2:])
+ else:
+ debug("collector %s discard reply %r", name, reply)
+ frontend.send_multipart(address_stack + [READY])
+ info("collector %s is ready with %r backends", name, len(backends))
+
def broker_collector(frontend_url, backend_url):
- frontend = context.socket(zmq.XREP)
- backend = context.socket(zmq.XREP)
- frontend.bind(frontend_url)
- backend.bind(backend_url)
- collector(frontend, backend)
-
+ frontend = context.socket(zmq.XREP)
+ frontend.setsockopt(zmq.IDENTITY, "broker")
+ backend = context.socket(zmq.XREP)
+ info("Binding broker frontend to %s", frontend_url)
+ frontend.bind(frontend_url)
+ info("Binding broker backend to %s", backend_url)
+ backend.bind(backend_url)
+ collector("broker", frontend, backend)
+
def proxy_collector(frontend_url, backend_url):
- frontend = context.socket(zmq.XREP)
- backend = context.socket(zmq.XREP)
- frontend.connect(frontend_url)
- backend.bind(backend_url)
- collector(frontend, backend)
-
+ frontend = context.socket(zmq.XREQ)
+ frontend.setsockopt(zmq.IDENTITY, "proxy")
+ backend = context.socket(zmq.XREP)
+ info("Connecting proxy frontend to %s", frontend_url)
+ frontend.connect(frontend_url)
+ info("Binding proxy backend to %s", backend_url)
+ # Sending presence to frontend.
+ backend.bind(backend_url)
+ frontend.send_multipart(["", READY])
+ collector("proxy", frontend, backend)
+
def worker(socket, workload, failure_rate = 0):
- while not stop:
- socket.send_multipart(["",READY])
- request = socket.recv_multipart()
- delim = request.index("")
- timeout = request[delim+1]
- request = request[delim+2:]
- assert request[0] == "REQUEST"
- if failure_rate and random.randrange(failure_rate) == 0:
- return
- time.sleep(workload)
- socket.send_multipart(request[:delim+1] + ["DONE"])
+ while True:
+ info("Worker is ready")
+ socket.send_multipart(["",READY])
+ request = socket.recv_multipart()
+ info("Worker receive request %r", request)
+ delim = request.index("")
+ address = request[:delim+1]
+ timeout = request[delim+1]
+ if timeout == "STOP":
+ info("worker is terminating")
+ return True
+ request = request[delim+2:]
+ assert request[0] == "REQUEST"
+ if failure_rate and random.randrange(failure_rate) == 0:
+ info("worker failed")
+ return False
+ time.sleep(workload)
+ info("worker send reply")
+ socket.send_multipart(address + [request[1], "DONE"])
def connect_worker(url, workload, failure_rate = 0):
- while not stop:
- socket = context.socket(zmq.XREQ)
- socket.connect(url)
- worker(socket, workload, failure_rate)
+ while True:
+ socket = context.socket(zmq.XREQ)
+ info("Connecting worker to %s", url)
+ socket.connect(url)
+ if worker(socket, workload, failure_rate):
+ return
+
+stop = False
def requester(socket, timeout = -1):
- while not stop:
- socket.send_multipart(["", str(timeout), "REQUEST"])
- results = 0
- while True:
- reply = socket.recv_multipart()
- if reply == ["",READY]:
- break
- results += 1
- print results, "results received"
- socket.send_multipart(["", "STOP"])
+ while not stop:
+ i = str(counter.next())
+ info("Requester send request %s", i)
+ socket.send_multipart(["", str(timeout), "REQUEST", i])
+ results = 0
+ while True:
+ reply = socket.recv_multipart()
+ debug("requester received reply %r", reply)
+ if reply == ["",READY]:
+ break
+ assert reply[1] == i
+ results += 1
+ info("requester received %d results", results)
+ # time.sleep(1)
+ info("requester is terminating")
+ socket.send_multipart(["", "STOP"])
def connect_requester(url, timeout):
- socket = context.socket(zmq.XREQ)
- socket.connect(url)
- requester(socket, timeout)
+ socket = context.socket(zmq.XREQ)
+ info("Connecting requester to %s", url)
+ socket.connect(url)
+ requester(socket, timeout)
if __name__ == "__main__":
- feurl = "inproc://frontend"
- beurl = "inproc://backend"
- brokers = []
- broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
- broker.start()
- brokers.append(broker)
- time.sleep(1)
- senders = []
- for sender in xrange(2):
- sender = threading.Thread(target = connect_requester, args = (feurl,1000))
- sender.start()
- senders.append(sender)
- proxies = []
- proxy_urls = []
- for proxy in xrange(2):
- url = "inproc://proxy_be#%d" % (proxy,)
- proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
- proxy.start()
- proxies.append(proxy)
- proxy_urls.append(url)
- time.sleep(1)
- workers = []
- for url in proxy_urls:
- for work in xrange(5):
- work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
- work.start()
- workers.append(work)
- time.sleep(10)
- stop = True
- for thread in senders + brokers + proxies + workers:
- thread.join()
-
+ logging.getLogger().setLevel(logging.INFO)
+ feurl = "inproc://frontend"
+ beurl = "inproc://backend"
+ brokers = []
+ broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+ broker.start()
+ brokers.append(broker)
+ time.sleep(2)
+ senders = []
+ for sender in xrange(5):
+ sender = threading.Thread(target = connect_requester, args = (feurl,5000))
+ sender.start()
+ senders.append(sender)
+ proxies = []
+ proxy_urls = []
+ for proxy in xrange(1):
+ url = "inproc://proxy_be#%d" % (proxy,)
+ proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+ proxy.start()
+ proxies.append(proxy)
+ proxy_urls.append(url)
+ time.sleep(2)
+ workers = []
+ for url in proxy_urls:
+ for work in xrange(1):
+ work = threading.Thread(target = connect_worker, args = (url, 1, 0))
+ work.start()
+ workers.append(work)
+ time.sleep(10)
+ stop = True
+ info("Joining thread")
+ for thread in senders + brokers + proxies + workers:
+ thread.join()
+