--- a/collector.py Sun May 08 22:14:10 2011 -0400
+++ b/collector.py Sun May 08 22:41:11 2011 -0400
@@ -10,12 +10,21 @@
context = zmq.Context()
counter = count()
+def checkzmqerror(func):
+ def wrapper(*args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except zmq.ZMQError, err:
+ info("%r(*%r, **%r) is terminating", func, args, kwargs)
+ return wrapper
+
def collector(name, frontend, backend):
- poller = zmq.Poller()
- poller.register(backend, zmq.POLLIN)
backends = set()
info("collector %s is ready with %r backends", name, len(backends))
+ dropped = 0
while True:
+ poller = zmq.Poller()
+ poller.register(backend, zmq.POLLIN)
poller.register(frontend, zmq.POLLIN)
for socket, event in poller.poll(100):
request = socket.recv_multipart()
@@ -30,23 +39,27 @@
delim = request.index("")
address_stack = request[:delim+1]
timeout = request[delim+1]
- if timeout != "STOP":
- debug("collector %s has new work to do in %s ms", name, timeout)
+ debug("collector %s has new work to do in %s ms", name, timeout)
recipients = backends
backends = set()
debug("collector %s send requests to %r", name, recipients)
for dest in recipients:
backend.send_multipart([dest] + request[delim:])
- if timeout == "STOP":
- info("collector %s is terminating", name)
- return
timeout = int(timeout)
- poller.unregister(frontend)
+ poller = zmq.Poller()
+ poller.register(backend, zmq.POLLIN)
while recipients:
+ start = time.time()
+ debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
events = poller.poll(timeout)
if not events:
- info("collector %s has a timeout with %r", name, recipients)
- break
+ end = time.time()
+ if (end-start)*1000 < timeout:
+ info("no event but timeout: %r", events)
+ else:
+ dropped += 1
+ debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
+ break
for socket, event in events:
reply = socket.recv_multipart()
if reply[2] == READY:
@@ -62,6 +75,7 @@
info("collector %s is ready with %r backends", name, len(backends))
+@checkzmqerror
def broker_collector(frontend_url, backend_url):
frontend = context.socket(zmq.XREP)
frontend.setsockopt(zmq.IDENTITY, "broker")
@@ -72,6 +86,7 @@
backend.bind(backend_url)
collector("broker", frontend, backend)
+@checkzmqerror
def proxy_collector(frontend_url, backend_url):
frontend = context.socket(zmq.XREQ)
frontend.setsockopt(zmq.IDENTITY, "proxy")
@@ -89,34 +104,29 @@
info("Worker is ready")
socket.send_multipart(["",READY])
request = socket.recv_multipart()
- info("Worker receive request %r", request)
+ debug("Worker receive request %r", request)
delim = request.index("")
address = request[:delim+1]
timeout = request[delim+1]
- if timeout == "STOP":
- info("worker is terminating")
- return True
request = request[delim+2:]
assert request[0] == "REQUEST"
if failure_rate and random.randrange(failure_rate) == 0:
info("worker failed")
return False
time.sleep(workload)
- info("worker send reply")
+ debug("worker send reply")
socket.send_multipart(address + [request[1], "DONE"])
+@checkzmqerror
def connect_worker(url, workload, failure_rate = 0):
while True:
socket = context.socket(zmq.XREQ)
info("Connecting worker to %s", url)
socket.connect(url)
- if worker(socket, workload, failure_rate):
- return
-
-stop = False
+ worker(socket, workload, failure_rate)
def requester(socket, timeout = -1):
- while not stop:
+ while True:
i = str(counter.next())
info("Requester send request %s", i)
socket.send_multipart(["", str(timeout), "REQUEST", i])
@@ -130,9 +140,8 @@
results += 1
info("requester received %d results", results)
# time.sleep(1)
- info("requester is terminating")
- socket.send_multipart(["", "STOP"])
+@checkzmqerror
def connect_requester(url, timeout):
socket = context.socket(zmq.XREQ)
info("Connecting requester to %s", url)
@@ -169,8 +178,8 @@
work.start()
workers.append(work)
time.sleep(10)
- stop = True
info("Joining thread")
+ context.term()
for thread in senders + brokers + proxies + workers:
thread.join()