--- a/collector.py Sun May 08 22:41:11 2011 -0400
+++ b/collector.py Sun May 08 22:47:41 2011 -0400
@@ -78,7 +78,7 @@
@checkzmqerror
def broker_collector(frontend_url, backend_url):
frontend = context.socket(zmq.XREP)
- frontend.setsockopt(zmq.IDENTITY, "broker")
+ frontend.setsockopt(zmq.IDENTITY, backend_url)
backend = context.socket(zmq.XREP)
info("Binding broker frontend to %s", frontend_url)
frontend.bind(frontend_url)
@@ -89,7 +89,7 @@
@checkzmqerror
def proxy_collector(frontend_url, backend_url):
frontend = context.socket(zmq.XREQ)
- frontend.setsockopt(zmq.IDENTITY, "proxy")
+ frontend.setsockopt(zmq.IDENTITY, backend_url)
backend = context.socket(zmq.XREP)
info("Connecting proxy frontend to %s", frontend_url)
frontend.connect(frontend_url)
@@ -158,13 +158,13 @@
brokers.append(broker)
time.sleep(2)
senders = []
- for sender in xrange(5):
+ for sender in xrange(10):
sender = threading.Thread(target = connect_requester, args = (feurl,5000))
sender.start()
senders.append(sender)
proxies = []
proxy_urls = []
- for proxy in xrange(1):
+ for proxy in xrange(5):
url = "inproc://proxy_be#%d" % (proxy,)
proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
proxy.start()
@@ -173,11 +173,11 @@
time.sleep(2)
workers = []
for url in proxy_urls:
- for work in xrange(1):
+ for work in xrange(5):
work = threading.Thread(target = connect_worker, args = (url, 1, 0))
work.start()
workers.append(work)
- time.sleep(10)
+ time.sleep(20)
info("Joining thread")
context.term()
for thread in senders + brokers + proxies + workers: