equal
deleted
inserted
replaced
13 def checkzmqerror(func): |
13 def checkzmqerror(func): |
14 def wrapper(*args, **kwargs): |
14 def wrapper(*args, **kwargs): |
15 try: |
15 try: |
16 func(*args, **kwargs) |
16 func(*args, **kwargs) |
17 except zmq.ZMQError, err: |
17 except zmq.ZMQError, err: |
18 info("%r(*%r, **%r) is terminating", func, args, kwargs) |
18 info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err) |
19 return wrapper |
19 return wrapper |
20 |
20 |
21 def collector(name, frontend, backend): |
21 def collector(name, frontend, backend): |
22 backends = set() |
22 backends = set() |
23 info("collector %s is ready with %r backends", name, len(backends)) |
23 debug("collector %s is ready with %r backends", name, len(backends)) |
24 dropped = 0 |
24 dropped = 0 |
25 while True: |
25 while True: |
26 poller = zmq.Poller() |
26 poller = zmq.Poller() |
27 poller.register(backend, zmq.POLLIN) |
27 poller.register(backend, zmq.POLLIN) |
28 poller.register(frontend, zmq.POLLIN) |
28 poller.register(frontend, zmq.POLLIN) |
70 debug("collector %s forward reply", name) |
70 debug("collector %s forward reply", name) |
71 frontend.send_multipart(address_stack + reply[2:]) |
71 frontend.send_multipart(address_stack + reply[2:]) |
72 else: |
72 else: |
73 debug("collector %s discard reply %r", name, reply) |
73 debug("collector %s discard reply %r", name, reply) |
74 frontend.send_multipart(address_stack + [READY]) |
74 frontend.send_multipart(address_stack + [READY]) |
75 info("collector %s is ready with %r backends", name, len(backends)) |
75 debug("collector %s is ready with %r backends", name, len(backends)) |
76 |
76 |
77 |
77 |
78 @checkzmqerror |
78 @checkzmqerror |
79 def broker_collector(frontend_url, backend_url): |
79 def broker_collector(frontend_url, backend_url): |
80 frontend = context.socket(zmq.XREP) |
80 frontend = context.socket(zmq.XREP) |
99 frontend.send_multipart(["", READY]) |
99 frontend.send_multipart(["", READY]) |
100 collector("proxy", frontend, backend) |
100 collector("proxy", frontend, backend) |
101 |
101 |
102 def worker(socket, workload, failure_rate = 0): |
102 def worker(socket, workload, failure_rate = 0): |
103 while True: |
103 while True: |
104 info("Worker is ready") |
104 debug("Worker is ready") |
105 socket.send_multipart(["",READY]) |
105 socket.send_multipart(["",READY]) |
106 request = socket.recv_multipart() |
106 request = socket.recv_multipart() |
107 debug("Worker receive request %r", request) |
107 debug("Worker receive request %r", request) |
108 delim = request.index("") |
108 delim = request.index("") |
109 address = request[:delim+1] |
109 address = request[:delim+1] |
172 proxy_urls.append(url) |
172 proxy_urls.append(url) |
173 time.sleep(2) |
173 time.sleep(2) |
174 workers = [] |
174 workers = [] |
175 for url in proxy_urls: |
175 for url in proxy_urls: |
176 for work in xrange(5): |
176 for work in xrange(5): |
177 work = threading.Thread(target = connect_worker, args = (url, 1, 0)) |
177 work = threading.Thread(target = connect_worker, args = (url, 1, 4500)) |
178 work.start() |
178 work.start() |
179 workers.append(work) |
179 workers.append(work) |
180 time.sleep(20) |
180 time.sleep(20) |
181 info("Joining thread") |
181 info("Joining thread") |
182 context.term() |
182 context.term() |