16 func(*args, **kwargs) |
16 func(*args, **kwargs) |
17 except zmq.ZMQError, err: |
17 except zmq.ZMQError, err: |
18 info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err) |
18 info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err) |
19 return wrapper |
19 return wrapper |
20 |
20 |
21 def collector(name, frontend, backend): |
21 def collector(name, frontend, backend, timeout): |
22 backends = set() |
22 backends = set() |
23 debug("collector %s is ready with %r backends", name, len(backends)) |
23 debug("collector %s is ready with %r backends", name, len(backends)) |
24 dropped = 0 |
24 dropped = 0 |
25 while True: |
25 while True: |
26 poller = zmq.Poller() |
26 poller = zmq.Poller() |
27 poller.register(backend, zmq.POLLIN) |
27 poller.register(backend, zmq.POLLIN) |
28 poller.register(frontend, zmq.POLLIN) |
28 poller.register(frontend, zmq.POLLIN) |
29 for socket, event in poller.poll(100): |
29 for socket, event in poller.poll(): |
30 request = socket.recv_multipart() |
30 request = socket.recv_multipart() |
31 debug("collector %s received request %r", name, request) |
31 debug("collector %s received request %r", name, request) |
32 if socket is backend: |
32 if socket is backend: |
33 if request[2] == READY: |
33 if request[2] == READY: |
34 debug("collector %s has new backend: %r", name, request[0]) |
34 debug("collector %s has new backend: %r", name, request[0]) |
36 else: |
36 else: |
37 debug("collector %s discard reply %r", name, request) |
37 debug("collector %s discard reply %r", name, request) |
38 else: |
38 else: |
39 delim = request.index("") |
39 delim = request.index("") |
40 address_stack = request[:delim+1] |
40 address_stack = request[:delim+1] |
41 timeout = request[delim+1] |
41 debug("collector %s has new work to do", name) |
42 debug("collector %s has new work to do in %s ms", name, timeout) |
|
43 recipients = backends |
42 recipients = backends |
44 backends = set() |
43 backends = set() |
45 debug("collector %s send requests to %r", name, recipients) |
44 debug("collector %s send requests to %r", name, recipients) |
46 for dest in recipients: |
45 for dest in recipients: |
47 backend.send_multipart([dest] + request[delim:]) |
46 backend.send_multipart([dest] + request[delim:]) |
48 timeout = int(timeout) |
|
49 poller = zmq.Poller() |
47 poller = zmq.Poller() |
50 poller.register(backend, zmq.POLLIN) |
48 poller.register(backend, zmq.POLLIN) |
|
49 start = time.time() |
|
50 deadline = start + timeout / 1000.0 |
51 while recipients: |
51 while recipients: |
52 start = time.time() |
52 debug("%r: collector %s wait on on %r", start, name, recipients) |
53 debug("%r: collector %s wait %r on %r", start, name, timeout, recipients) |
53 events = poller.poll(max(0,deadline-time.time())) |
54 events = poller.poll(timeout) |
|
55 if not events: |
|
56 end = time.time() |
|
57 if (end-start)*1000 < timeout: |
|
58 info("no event but timeout: %r", events) |
|
59 else: |
|
60 dropped += 1 |
|
61 debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout) |
|
62 break |
|
63 for socket, event in events: |
54 for socket, event in events: |
64 reply = socket.recv_multipart() |
55 reply = socket.recv_multipart() |
65 if reply[2] == READY: |
56 if reply[2] == READY: |
66 debug("%r is ready on %s", reply[0], name) |
57 debug("%r is ready on %s", reply[0], name) |
67 backends.add(reply[0]) |
58 backends.add(reply[0]) |
69 elif reply[0] in recipients: |
60 elif reply[0] in recipients: |
70 debug("collector %s forward reply", name) |
61 debug("collector %s forward reply", name) |
71 frontend.send_multipart(address_stack + reply[2:]) |
62 frontend.send_multipart(address_stack + reply[2:]) |
72 else: |
63 else: |
73 debug("collector %s discard reply %r", name, reply) |
64 debug("collector %s discard reply %r", name, reply) |
|
65 end = time.time() |
|
66 if recipients and end > deadline: |
|
67 info("%r: collector %s has timeout with %d recipients", end, name, len(recipients)) |
|
68 break |
74 frontend.send_multipart(address_stack + [READY]) |
69 frontend.send_multipart(address_stack + [READY]) |
75 debug("collector %s is ready with %r backends", name, len(backends)) |
70 debug("collector %s is ready with %r backends", name, len(backends)) |
76 |
71 |
77 |
72 |
78 @checkzmqerror |
73 @checkzmqerror |
79 def broker_collector(frontend_url, backend_url): |
74 def broker_collector(frontend_url, backend_url, timeout): |
80 frontend = context.socket(zmq.XREP) |
75 frontend = context.socket(zmq.XREP) |
81 frontend.setsockopt(zmq.IDENTITY, backend_url) |
76 frontend.setsockopt(zmq.IDENTITY, backend_url) |
82 backend = context.socket(zmq.XREP) |
77 backend = context.socket(zmq.XREP) |
83 info("Binding broker frontend to %s", frontend_url) |
78 info("Binding broker frontend to %s", frontend_url) |
84 frontend.bind(frontend_url) |
79 frontend.bind(frontend_url) |
85 info("Binding broker backend to %s", backend_url) |
80 info("Binding broker backend to %s", backend_url) |
86 backend.bind(backend_url) |
81 backend.bind(backend_url) |
87 collector("broker", frontend, backend) |
82 collector("broker", frontend, backend, timeout) |
88 |
83 |
89 @checkzmqerror |
84 @checkzmqerror |
90 def proxy_collector(frontend_url, backend_url): |
85 def proxy_collector(frontend_url, backend_url, timeout): |
91 frontend = context.socket(zmq.XREQ) |
86 frontend = context.socket(zmq.XREQ) |
92 frontend.setsockopt(zmq.IDENTITY, backend_url) |
87 frontend.setsockopt(zmq.IDENTITY, backend_url) |
93 backend = context.socket(zmq.XREP) |
88 backend = context.socket(zmq.XREP) |
94 info("Connecting proxy frontend to %s", frontend_url) |
89 info("Connecting proxy frontend to %s", frontend_url) |
95 frontend.connect(frontend_url) |
90 frontend.connect(frontend_url) |
96 info("Binding proxy backend to %s", backend_url) |
91 info("Binding proxy backend to %s", backend_url) |
97 # Sending presence to frontend. |
92 # Sending presence to frontend. |
98 backend.bind(backend_url) |
93 backend.bind(backend_url) |
99 frontend.send_multipart(["", READY]) |
94 frontend.send_multipart(["", READY]) |
100 collector("proxy", frontend, backend) |
95 collector("proxy", frontend, backend, timeout) |
101 |
96 |
102 def worker(socket, workload, failure_rate = 0): |
97 def worker(socket, workload, failure_rate = 0): |
103 while True: |
98 while True: |
104 debug("Worker is ready") |
99 debug("Worker is ready") |
105 socket.send_multipart(["",READY]) |
100 socket.send_multipart(["",READY]) |
106 request = socket.recv_multipart() |
101 request = socket.recv_multipart() |
107 debug("Worker receive request %r", request) |
102 debug("Worker receive request %r", request) |
108 delim = request.index("") |
103 content = request.index("") + 1 |
109 address = request[:delim+1] |
104 address = request[:content] |
110 timeout = request[delim+1] |
105 request = request[content:] |
111 request = request[delim+2:] |
|
112 assert request[0] == "REQUEST" |
106 assert request[0] == "REQUEST" |
113 if failure_rate and random.randrange(failure_rate) == 0: |
107 if failure_rate and random.randrange(failure_rate) == 0: |
114 info("worker failed") |
108 info("worker failed") |
115 return False |
109 return False |
116 time.sleep(workload * (1 + random.random())) |
110 time.sleep(workload * (1 + random.random())) |
123 socket = context.socket(zmq.XREQ) |
117 socket = context.socket(zmq.XREQ) |
124 info("Connecting worker to %s", url) |
118 info("Connecting worker to %s", url) |
125 socket.connect(url) |
119 socket.connect(url) |
126 worker(socket, workload, failure_rate) |
120 worker(socket, workload, failure_rate) |
127 |
121 |
128 def requester(socket, timeout = -1): |
122 def requester(socket): |
129 while True: |
123 while True: |
130 i = str(counter.next()) |
124 i = str(counter.next()) |
131 info("Requester send request %s", i) |
125 info("Requester send request %s", i) |
132 socket.send_multipart(["", str(timeout), "REQUEST", i]) |
126 socket.send_multipart(["", "REQUEST", i]) |
133 results = 0 |
127 results = 0 |
134 while True: |
128 while True: |
135 reply = socket.recv_multipart() |
129 reply = socket.recv_multipart() |
136 debug("requester received reply %r", reply) |
130 debug("requester received reply %r", reply) |
137 if reply == ["",READY]: |
131 if reply == ["",READY]: |
138 break |
132 break |
139 assert reply[1] == i |
133 assert reply[1] == i |
140 results += 1 |
134 results += 1 |
141 info("requester received %d results", results) |
135 info("requester received %d results", results) |
142 # time.sleep(1) |
|
143 |
136 |
144 @checkzmqerror |
137 @checkzmqerror |
145 def connect_requester(url, timeout): |
138 def connect_requester(url): |
146 socket = context.socket(zmq.XREQ) |
139 socket = context.socket(zmq.XREQ) |
147 info("Connecting requester to %s", url) |
140 info("Connecting requester to %s", url) |
148 socket.connect(url) |
141 socket.connect(url) |
149 requester(socket, timeout) |
142 requester(socket) |
150 |
143 |
151 if __name__ == "__main__": |
144 if __name__ == "__main__": |
152 logging.getLogger().setLevel(logging.INFO) |
145 logging.getLogger().setLevel(logging.INFO) |
153 feurl = "inproc://frontend" |
146 feurl = "inproc://frontend" |
154 beurl = "inproc://backend" |
147 beurl = "inproc://backend" |
|
148 workload = 2.5 |
|
149 broker_timeout = 5000 |
|
150 proxy_timeout = 5000 |
155 brokers = [] |
151 brokers = [] |
156 broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) |
152 broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout)) |
157 broker.start() |
153 broker.start() |
158 brokers.append(broker) |
154 brokers.append(broker) |
159 time.sleep(2) |
155 time.sleep(2) |
160 senders = [] |
156 senders = [] |
161 for sender in xrange(10): |
157 for sender in xrange(10): |
162 sender = threading.Thread(target = connect_requester, args = (feurl,5000)) |
158 sender = threading.Thread(target = connect_requester, args = (feurl,)) |
163 sender.start() |
159 sender.start() |
164 senders.append(sender) |
160 senders.append(sender) |
165 proxies = [] |
161 proxies = [] |
166 proxy_urls = [] |
162 proxy_urls = [] |
167 for proxy in xrange(5): |
163 for proxy in xrange(5): |
168 url = "inproc://proxy_be#%d" % (proxy,) |
164 url = "inproc://proxy_be#%d" % (proxy,) |
169 proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) |
165 proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout)) |
170 proxy.start() |
166 proxy.start() |
171 proxies.append(proxy) |
167 proxies.append(proxy) |
172 proxy_urls.append(url) |
168 proxy_urls.append(url) |
173 time.sleep(2) |
169 time.sleep(2) |
174 workers = [] |
170 workers = [] |
175 for url in proxy_urls: |
171 for url in proxy_urls: |
176 for work in xrange(5): |
172 for work in xrange(5): |
177 work = threading.Thread(target = connect_worker, args = (url, 1, 4800)) |
173 work = threading.Thread(target = connect_worker, args = (url, 3, 10)) |
178 work.start() |
174 work.start() |
179 workers.append(work) |
175 workers.append(work) |
180 time.sleep(20) |
176 time.sleep(20) |
181 info("Joining thread") |
177 info("Joining thread") |
182 context.term() |
178 context.term() |