--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/collector.py Sun May 08 19:40:30 2011 -0400
@@ -0,0 +1,123 @@
+import random
+import time
+import threading
+import zmq
+
+stop = False
+READY = "READY"
+context = zmq.Context()
+
+def collector(frontend, backend):
+ poller = zmq.Poller()
+ poller.register(backend, zmq.POLLIN)
+ backends = set()
+ while not stop:
+ frontend.send_multipart(["",READY])
+ poller.register(frontend, zmq.POLLIN)
+ for socket, event in poller.poll(100):
+ request = socket.recv_multipart()
+ if socket is backend:
+ if request[2] == READY:
+ backends.add(request[0])
+ else:
+ timeout = int(request[request.index("")+1])
+ recipients = backends
+ backends = set()
+ for dest in recipients:
+ backend.send_multipart([dest] + request)
+ poller.unregister(frontend)
+ try:
+ while not stop and recipients:
+ for socket, event in poller.poll(timeout):
+ reply = socket.recv_multipart()
+ if reply[2] == READY:
+ backends.add(reply[0])
+ recipients.discard(reply[0])
+ else:
+ frontend.send_multipart(reply[2:])
+ except ZMQError:
+ pass
+
+def broker_collector(frontend_url, backend_url):
+ frontend = context.socket(zmq.XREP)
+ backend = context.socket(zmq.XREP)
+ frontend.bind(frontend_url)
+ backend.bind(backend_url)
+ collector(frontend, backend)
+
+def proxy_collector(frontend_url, backend_url):
+ frontend = context.socket(zmq.XREP)
+ backend = context.socket(zmq.XREP)
+ frontend.connect(frontend_url)
+ backend.bind(backend_url)
+ collector(frontend, backend)
+
+def worker(socket, workload, failure_rate = 0):
+ while not stop:
+ socket.send_multipart(["",READY])
+ request = socket.recv_multipart()
+ delim = request.index("")
+ timeout = request[delim+1]
+ request = request[delim+2:]
+ assert request[0] == "REQUEST"
+ if failure_rate and random.randrange(failure_rate) == 0:
+ return
+ time.sleep(workload)
+ socket.send_multipart(request[:delim+1] + ["DONE"])
+
+def connect_worker(url, workload, failure_rate = 0):
+ while not stop:
+ socket = context.socket(zmq.XREQ)
+ socket.connect(url)
+ worker(socket, workload, failure_rate)
+
+def requester(socket, timeout = -1):
+ while not stop:
+ socket.send_multipart(["", str(timeout), "REQUEST"])
+ results = 0
+ while True:
+ reply = socket.recv_multipart()
+ if reply == ["",READY]:
+ break
+ results += 1
+ print results, "results received"
+ socket.send_multipart(["", "STOP"])
+
+def connect_requester(url, timeout):
+ socket = context.socket(zmq.XREQ)
+ socket.connect(url)
+ requester(socket, timeout)
+
+if __name__ == "__main__":
+ feurl = "inproc://frontend"
+ beurl = "inproc://backend"
+ brokers = []
+ broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+ broker.start()
+ brokers.append(broker)
+ time.sleep(1)
+ senders = []
+ for sender in xrange(2):
+ sender = threading.Thread(target = connect_requester, args = (feurl,1000))
+ sender.start()
+ senders.append(sender)
+ proxies = []
+ proxy_urls = []
+ for proxy in xrange(2):
+ url = "inproc://proxy_be#%d" % (proxy,)
+ proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+ proxy.start()
+ proxies.append(proxy)
+ proxy_urls.append(url)
+ time.sleep(1)
+ workers = []
+ for url in proxy_urls:
+ for work in xrange(5):
+ work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
+ work.start()
+ workers.append(work)
+ time.sleep(10)
+ stop = True
+ for thread in senders + brokers + proxies + workers:
+ thread.join()
+