collector.py
changeset 3 197572da88ea
parent 2 2744eb2a589b
child 4 4b5a51cb5fc7
equal deleted inserted replaced
2:2744eb2a589b 3:197572da88ea
    76 
    76 
    77 
    77 
    78 @checkzmqerror
    78 @checkzmqerror
    79 def broker_collector(frontend_url, backend_url):
    79 def broker_collector(frontend_url, backend_url):
    80     frontend = context.socket(zmq.XREP)
    80     frontend = context.socket(zmq.XREP)
    81     frontend.setsockopt(zmq.IDENTITY, "broker")
    81     frontend.setsockopt(zmq.IDENTITY, backend_url)
    82     backend = context.socket(zmq.XREP)
    82     backend = context.socket(zmq.XREP)
    83     info("Binding broker frontend to %s", frontend_url)
    83     info("Binding broker frontend to %s", frontend_url)
    84     frontend.bind(frontend_url)
    84     frontend.bind(frontend_url)
    85     info("Binding broker backend to %s", backend_url)
    85     info("Binding broker backend to %s", backend_url)
    86     backend.bind(backend_url)
    86     backend.bind(backend_url)
    87     collector("broker", frontend, backend)
    87     collector("broker", frontend, backend)
    88 
    88 
    89 @checkzmqerror
    89 @checkzmqerror
    90 def proxy_collector(frontend_url, backend_url):
    90 def proxy_collector(frontend_url, backend_url):
    91     frontend = context.socket(zmq.XREQ)
    91     frontend = context.socket(zmq.XREQ)
    92     frontend.setsockopt(zmq.IDENTITY, "proxy")
    92     frontend.setsockopt(zmq.IDENTITY, backend_url)
    93     backend = context.socket(zmq.XREP)
    93     backend = context.socket(zmq.XREP)
    94     info("Connecting proxy frontend to %s", frontend_url)
    94     info("Connecting proxy frontend to %s", frontend_url)
    95     frontend.connect(frontend_url)
    95     frontend.connect(frontend_url)
    96     info("Binding proxy backend to %s", backend_url)
    96     info("Binding proxy backend to %s", backend_url)
    97     # Sending presence to frontend.
    97     # Sending presence to frontend.
   156     broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
   156     broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
   157     broker.start()
   157     broker.start()
   158     brokers.append(broker)
   158     brokers.append(broker)
   159     time.sleep(2)
   159     time.sleep(2)
   160     senders = []
   160     senders = []
   161     for sender in xrange(5):
   161     for sender in xrange(10):
   162         sender = threading.Thread(target = connect_requester, args = (feurl,5000))
   162         sender = threading.Thread(target = connect_requester, args = (feurl,5000))
   163         sender.start()
   163         sender.start()
   164         senders.append(sender)
   164         senders.append(sender)
   165     proxies = []
   165     proxies = []
   166     proxy_urls = []
   166     proxy_urls = []
   167     for proxy in xrange(1):
   167     for proxy in xrange(5):
   168         url = "inproc://proxy_be#%d" % (proxy,)
   168         url = "inproc://proxy_be#%d" % (proxy,)
   169         proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
   169         proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
   170         proxy.start()
   170         proxy.start()
   171         proxies.append(proxy)
   171         proxies.append(proxy)
   172         proxy_urls.append(url)
   172         proxy_urls.append(url)
   173     time.sleep(2)
   173     time.sleep(2)
   174     workers = []
   174     workers = []
   175     for url in proxy_urls:
   175     for url in proxy_urls:
   176         for work in xrange(1):
   176         for work in xrange(5):
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   178             work.start()
   178             work.start()
   179             workers.append(work)
   179             workers.append(work)
   180     time.sleep(10)
   180     time.sleep(20)
   181     info("Joining thread")
   181     info("Joining thread")
   182     context.term()
   182     context.term()
   183     for thread in senders + brokers + proxies + workers:
   183     for thread in senders + brokers + proxies + workers:
   184         thread.join()
   184         thread.join()
   185 
   185