--- a/collector.py Sun May 08 23:03:00 2011 -0400
+++ b/collector.py Tue May 10 00:21:11 2011 -0400
@@ -18,7 +18,7 @@
info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
return wrapper
-def collector(name, frontend, backend):
+def collector(name, frontend, backend, timeout):
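+ # timeout: window for collecting backend replies, in milliseconds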
backends = set()
debug("collector %s is ready with %r backends", name, len(backends))
dropped = 0
@@ -26,7 +26,7 @@
poller = zmq.Poller()
poller.register(backend, zmq.POLLIN)
poller.register(frontend, zmq.POLLIN)
- for socket, event in poller.poll(100):
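+ # with no timeout argument, poll() blocks until the frontend or a backend has data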
+ for socket, event in poller.poll():
request = socket.recv_multipart()
debug("collector %s received request %r", name, request)
if socket is backend:
@@ -38,28 +38,19 @@
else:
delim = request.index("")
address_stack = request[:delim+1]
- timeout = request[delim+1]
- debug("collector %s has new work to do in %s ms", name, timeout)
+ debug("collector %s has new work to do", name)
recipients = backends
backends = set()
debug("collector %s send requests to %r", name, recipients)
for dest in recipients:
backend.send_multipart([dest] + request[delim:])
- timeout = int(timeout)
poller = zmq.Poller()
poller.register(backend, zmq.POLLIN)
+ start = time.time()
+ deadline = start + timeout / 1000.0
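+ # absolute cut-off (epoch seconds) by which every recipient must have replied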
while recipients:
- start = time.time()
- debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
- events = poller.poll(timeout)
- if not events:
- end = time.time()
- if (end-start)*1000 < timeout:
- info("no event but timeout: %r", events)
- else:
- dropped += 1
- debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
- break
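+ # wait only for whatever time remains before the deadline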
+ debug("%r: collector %s wait on on %r", start, name, recipients)
+ events = poller.poll(max(0,deadline-time.time()))
for socket, event in events:
reply = socket.recv_multipart()
if reply[2] == READY:
@@ -71,12 +62,16 @@
frontend.send_multipart(address_stack + reply[2:])
else:
debug("collector %s discard reply %r", name, reply)
+ end = time.time()
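+ # give up on any recipients that have not replied by the deadline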
+ if recipients and end > deadline:
+ info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
+ break
frontend.send_multipart(address_stack + [READY])
debug("collector %s is ready with %r backends", name, len(backends))
@checkzmqerror
-def broker_collector(frontend_url, backend_url):
+def broker_collector(frontend_url, backend_url, timeout):
frontend = context.socket(zmq.XREP)
frontend.setsockopt(zmq.IDENTITY, backend_url)
backend = context.socket(zmq.XREP)
@@ -84,10 +79,10 @@
frontend.bind(frontend_url)
info("Binding broker backend to %s", backend_url)
backend.bind(backend_url)
- collector("broker", frontend, backend)
+ collector("broker", frontend, backend, timeout)
@checkzmqerror
-def proxy_collector(frontend_url, backend_url):
+def proxy_collector(frontend_url, backend_url, timeout):
frontend = context.socket(zmq.XREQ)
frontend.setsockopt(zmq.IDENTITY, backend_url)
backend = context.socket(zmq.XREP)
@@ -97,7 +92,7 @@
# Sending presence to frontend.
backend.bind(backend_url)
frontend.send_multipart(["", READY])
- collector("proxy", frontend, backend)
+ collector("proxy", frontend, backend, timeout)
def worker(socket, workload, failure_rate = 0):
while True:
@@ -105,10 +100,9 @@
socket.send_multipart(["",READY])
request = socket.recv_multipart()
debug("Worker receive request %r", request)
- delim = request.index("")
- address = request[:delim+1]
- timeout = request[delim+1]
- request = request[delim+2:]
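+ # frames up to and including the empty delimiter are the reply envelope; the rest is the payload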
+ content = request.index("") + 1
+ address = request[:content]
+ request = request[content:]
assert request[0] == "REQUEST"
if failure_rate and random.randrange(failure_rate) == 0:
info("worker failed")
@@ -125,11 +119,11 @@
socket.connect(url)
worker(socket, workload, failure_rate)
-def requester(socket, timeout = -1):
+def requester(socket):
while True:
i = str(counter.next())
info("Requester send request %s", i)
- socket.send_multipart(["", str(timeout), "REQUEST", i])
+ socket.send_multipart(["", "REQUEST", i])
results = 0
while True:
reply = socket.recv_multipart()
@@ -139,34 +133,36 @@
assert reply[1] == i
results += 1
info("requester received %d results", results)
- # time.sleep(1)
@checkzmqerror
-def connect_requester(url, timeout):
+def connect_requester(url):
socket = context.socket(zmq.XREQ)
info("Connecting requester to %s", url)
socket.connect(url)
- requester(socket, timeout)
+ requester(socket)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
feurl = "inproc://frontend"
beurl = "inproc://backend"
+ workload = 2.5
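+ # collector timeouts are expressed in milliseconds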
+ broker_timeout = 5000
+ proxy_timeout = 5000
brokers = []
- broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+ broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout))
broker.start()
brokers.append(broker)
time.sleep(2)
senders = []
for sender in xrange(10):
- sender = threading.Thread(target = connect_requester, args = (feurl,5000))
+ sender = threading.Thread(target = connect_requester, args = (feurl,))
sender.start()
senders.append(sender)
proxies = []
proxy_urls = []
for proxy in xrange(5):
url = "inproc://proxy_be#%d" % (proxy,)
- proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+ proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout))
proxy.start()
proxies.append(proxy)
proxy_urls.append(url)
@@ -174,7 +170,7 @@
workers = []
for url in proxy_urls:
for work in xrange(5):
- work = threading.Thread(target = connect_worker, args = (url, 1, 4800))
+ work = threading.Thread(target = connect_worker, args = (url, 3, 10))
work.start()
workers.append(work)
time.sleep(20)