76 |
76 |
77 |
77 |
78 @checkzmqerror |
78 @checkzmqerror |
79 def broker_collector(frontend_url, backend_url): |
79 def broker_collector(frontend_url, backend_url): |
80 frontend = context.socket(zmq.XREP) |
80 frontend = context.socket(zmq.XREP) |
81 frontend.setsockopt(zmq.IDENTITY, "broker") |
81 frontend.setsockopt(zmq.IDENTITY, backend_url) |
82 backend = context.socket(zmq.XREP) |
82 backend = context.socket(zmq.XREP) |
83 info("Binding broker frontend to %s", frontend_url) |
83 info("Binding broker frontend to %s", frontend_url) |
84 frontend.bind(frontend_url) |
84 frontend.bind(frontend_url) |
85 info("Binding broker backend to %s", backend_url) |
85 info("Binding broker backend to %s", backend_url) |
86 backend.bind(backend_url) |
86 backend.bind(backend_url) |
87 collector("broker", frontend, backend) |
87 collector("broker", frontend, backend) |
88 |
88 |
89 @checkzmqerror |
89 @checkzmqerror |
90 def proxy_collector(frontend_url, backend_url): |
90 def proxy_collector(frontend_url, backend_url): |
91 frontend = context.socket(zmq.XREQ) |
91 frontend = context.socket(zmq.XREQ) |
92 frontend.setsockopt(zmq.IDENTITY, "proxy") |
92 frontend.setsockopt(zmq.IDENTITY, backend_url) |
93 backend = context.socket(zmq.XREP) |
93 backend = context.socket(zmq.XREP) |
94 info("Connecting proxy frontend to %s", frontend_url) |
94 info("Connecting proxy frontend to %s", frontend_url) |
95 frontend.connect(frontend_url) |
95 frontend.connect(frontend_url) |
96 info("Binding proxy backend to %s", backend_url) |
96 info("Binding proxy backend to %s", backend_url) |
97 # Sending presence to frontend. |
97 # Sending presence to frontend. |
156 broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) |
156 broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) |
157 broker.start() |
157 broker.start() |
158 brokers.append(broker) |
158 brokers.append(broker) |
159 time.sleep(2) |
159 time.sleep(2) |
160 senders = [] |
160 senders = [] |
161 for sender in xrange(5): |
161 for sender in xrange(10): |
162 sender = threading.Thread(target = connect_requester, args = (feurl,5000)) |
162 sender = threading.Thread(target = connect_requester, args = (feurl,5000)) |
163 sender.start() |
163 sender.start() |
164 senders.append(sender) |
164 senders.append(sender) |
165 proxies = [] |
165 proxies = [] |
166 proxy_urls = [] |
166 proxy_urls = [] |
167 for proxy in xrange(1): |
167 for proxy in xrange(5): |
168 url = "inproc://proxy_be#%d" % (proxy,) |
168 url = "inproc://proxy_be#%d" % (proxy,) |
169 proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) |
169 proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) |
170 proxy.start() |
170 proxy.start() |
171 proxies.append(proxy) |
171 proxies.append(proxy) |
172 proxy_urls.append(url) |
172 proxy_urls.append(url) |
173 time.sleep(2) |
173 time.sleep(2) |
174 workers = [] |
174 workers = [] |
175 for url in proxy_urls: |
175 for url in proxy_urls: |
176 for work in xrange(1): |
176 for work in xrange(5): |
177 work = threading.Thread(target = connect_worker, args = (url, 1, 0)) |
177 work = threading.Thread(target = connect_worker, args = (url, 1, 0)) |
178 work.start() |
178 work.start() |
179 workers.append(work) |
179 workers.append(work) |
180 time.sleep(10) |
180 time.sleep(20) |
181 info("Joining thread") |
181 info("Joining thread") |
182 context.term() |
182 context.term() |
183 for thread in senders + brokers + proxies + workers: |
183 for thread in senders + brokers + proxies + workers: |
184 thread.join() |
184 thread.join() |
185 |
185 |