collector.py
changeset 1 48d514cc3309
parent 0 57d81f2bf26f
child 2 2744eb2a589b
equal deleted inserted replaced
0:57d81f2bf26f 1:48d514cc3309
     1 import random
     1 import random
     2 import time
     2 import time
     3 import threading
     3 import threading
     4 import zmq
     4 import zmq
       
     5 import logging
       
     6 from logging import debug, info
       
     7 from itertools import count
       
     8 
       
     9 READY = "READY"
       
    10 context = zmq.Context()
       
    11 counter = count()
       
    12 
       
    13 def collector(name, frontend, backend):
       
    14     poller = zmq.Poller()
       
    15     poller.register(backend, zmq.POLLIN)
       
    16     backends = set()
       
    17     info("collector %s is ready with %r backends", name, len(backends))
       
    18     while True:
       
    19         poller.register(frontend, zmq.POLLIN)
       
    20         for socket, event in poller.poll(100):
       
    21             request = socket.recv_multipart()
       
    22             debug("collector %s received request %r", name, request)
       
    23             if socket is backend:
       
    24                 if request[2] == READY:
       
    25                     debug("collector %s has new backend: %r", name, request[0])
       
    26                     backends.add(request[0])
       
    27                 else:
       
    28                     debug("collector %s discard reply %r", name, request) 
       
    29             else:
       
    30                 delim = request.index("")
       
    31                 address_stack = request[:delim+1]
       
    32                 timeout = request[delim+1]
       
    33                 if timeout != "STOP":
       
    34                     debug("collector %s has new work to do in %s ms", name, timeout)
       
    35                 recipients = backends
       
    36                 backends = set()
       
    37                 debug("collector %s send requests to %r", name, recipients)
       
    38                 for dest in recipients:
       
    39                     backend.send_multipart([dest] + request[delim:])
       
    40                 if timeout == "STOP":
       
    41                     info("collector %s is terminating", name)
       
    42                     return
       
    43                 timeout = int(timeout)
       
    44                 poller.unregister(frontend)
       
    45                 while recipients:
       
    46                     events = poller.poll(timeout)
       
    47                     if not events:
       
    48                         info("collector %s has a timeout with %r", name, recipients)
       
    49                         break
       
    50                     for socket, event in events:
       
    51                         reply = socket.recv_multipart()
       
    52                         if reply[2] == READY:
       
    53                             debug("%r is ready on %s", reply[0], name)
       
    54                             backends.add(reply[0])
       
    55                             recipients.discard(reply[0])
       
    56                         elif reply[0] in recipients:
       
    57                             debug("collector %s forward reply", name)
       
    58                             frontend.send_multipart(address_stack + reply[2:])
       
    59                         else:
       
    60                             debug("collector %s discard reply %r", name, reply)
       
    61                 frontend.send_multipart(address_stack + [READY])
       
    62                 info("collector %s is ready with %r backends", name, len(backends))
       
    63 
       
    64 
       
    65 def broker_collector(frontend_url, backend_url):
       
    66     frontend = context.socket(zmq.XREP)
       
    67     frontend.setsockopt(zmq.IDENTITY, "broker")
       
    68     backend = context.socket(zmq.XREP)
       
    69     info("Binding broker frontend to %s", frontend_url)
       
    70     frontend.bind(frontend_url)
       
    71     info("Binding broker backend to %s", backend_url)
       
    72     backend.bind(backend_url)
       
    73     collector("broker", frontend, backend)
       
    74 
       
    75 def proxy_collector(frontend_url, backend_url):
       
    76     frontend = context.socket(zmq.XREQ)
       
    77     frontend.setsockopt(zmq.IDENTITY, "proxy")
       
    78     backend = context.socket(zmq.XREP)
       
    79     info("Connecting proxy frontend to %s", frontend_url)
       
    80     frontend.connect(frontend_url)
       
    81     info("Binding proxy backend to %s", backend_url)
       
    82     # Sending presence to frontend.
       
    83     backend.bind(backend_url)
       
    84     frontend.send_multipart(["", READY])
       
    85     collector("proxy", frontend, backend)
       
    86 
       
    87 def worker(socket, workload, failure_rate = 0):
       
    88     while True:
       
    89         info("Worker is ready")
       
    90         socket.send_multipart(["",READY])
       
    91         request = socket.recv_multipart()
       
    92         info("Worker receive request %r", request)
       
    93         delim = request.index("")
       
    94         address = request[:delim+1]
       
    95         timeout = request[delim+1]
       
    96         if timeout == "STOP":
       
    97             info("worker is terminating")
       
    98             return True
       
    99         request = request[delim+2:]
       
   100         assert request[0] == "REQUEST"
       
   101         if failure_rate and random.randrange(failure_rate) == 0:
       
   102             info("worker failed")
       
   103             return False
       
   104         time.sleep(workload)
       
   105         info("worker send reply")
       
   106         socket.send_multipart(address + [request[1], "DONE"])
       
   107 
       
   108 def connect_worker(url, workload, failure_rate = 0):
       
   109     while True:
       
   110         socket = context.socket(zmq.XREQ)
       
   111         info("Connecting worker to %s", url)
       
   112         socket.connect(url)
       
   113         if worker(socket, workload, failure_rate):
       
   114             return
     5 
   115 
     6 stop = False
   116 stop = False
     7 READY = "READY"
       
     8 context = zmq.Context()
       
     9 
       
    10 def collector(frontend, backend):
       
    11   poller = zmq.Poller()
       
    12   poller.register(backend, zmq.POLLIN)
       
    13   backends = set()
       
    14   while not stop:
       
    15     frontend.send_multipart(["",READY])
       
    16     poller.register(frontend, zmq.POLLIN)
       
    17     for socket, event in poller.poll(100):
       
    18       request = socket.recv_multipart()
       
    19       if socket is backend:
       
    20         if request[2] == READY:
       
    21           backends.add(request[0])
       
    22       else:
       
    23         timeout = int(request[request.index("")+1])
       
    24         recipients = backends
       
    25         backends = set()
       
    26         for dest in recipients:
       
    27           backend.send_multipart([dest] + request)
       
    28         poller.unregister(frontend)
       
    29         try:
       
    30           while not stop and recipients:
       
    31             for socket, event in poller.poll(timeout):
       
    32               reply = socket.recv_multipart()
       
    33               if reply[2] == READY:
       
    34                 backends.add(reply[0])
       
    35                 recipients.discard(reply[0])
       
    36               else:
       
    37                 frontend.send_multipart(reply[2:])
       
    38         except ZMQError:
       
    39           pass
       
    40 
       
    41 def broker_collector(frontend_url, backend_url):
       
    42   frontend = context.socket(zmq.XREP)
       
    43   backend = context.socket(zmq.XREP)
       
    44   frontend.bind(frontend_url)
       
    45   backend.bind(backend_url)
       
    46   collector(frontend, backend)
       
    47   
       
    48 def proxy_collector(frontend_url, backend_url):
       
    49   frontend = context.socket(zmq.XREP)
       
    50   backend = context.socket(zmq.XREP)
       
    51   frontend.connect(frontend_url)
       
    52   backend.bind(backend_url)
       
    53   collector(frontend, backend)
       
    54   
       
    55 def worker(socket, workload, failure_rate = 0):
       
    56   while not stop:
       
    57     socket.send_multipart(["",READY])
       
    58     request = socket.recv_multipart()
       
    59     delim = request.index("")
       
    60     timeout = request[delim+1]
       
    61     request = request[delim+2:]
       
    62     assert request[0] == "REQUEST"
       
    63     if failure_rate and random.randrange(failure_rate) == 0:
       
    64       return
       
    65     time.sleep(workload)
       
    66     socket.send_multipart(request[:delim+1] + ["DONE"])
       
    67 
       
    68 def connect_worker(url, workload, failure_rate = 0):
       
    69   while not stop:
       
    70     socket = context.socket(zmq.XREQ)
       
    71     socket.connect(url)
       
    72     worker(socket, workload, failure_rate)
       
    73 
   117 
    74 def requester(socket, timeout = -1):
   118 def requester(socket, timeout = -1):
    75   while not stop:
   119     while not stop:
    76     socket.send_multipart(["", str(timeout), "REQUEST"])
   120         i = str(counter.next())
    77     results = 0
   121         info("Requester send request %s", i)
    78     while True:
   122         socket.send_multipart(["", str(timeout), "REQUEST", i])
    79       reply = socket.recv_multipart()
   123         results = 0
    80       if reply == ["",READY]:
   124         while True:
    81         break
   125             reply = socket.recv_multipart()
    82       results += 1
   126             debug("requester received reply %r", reply)
    83     print results, "results received"
   127             if reply == ["",READY]:
    84   socket.send_multipart(["", "STOP"])
   128                 break
       
   129             assert reply[1] == i
       
   130             results += 1
       
   131         info("requester received %d results", results)
       
   132         # time.sleep(1)
       
   133     info("requester is terminating")
       
   134     socket.send_multipart(["", "STOP"])
    85 
   135 
    86 def connect_requester(url, timeout):
   136 def connect_requester(url, timeout):
    87   socket = context.socket(zmq.XREQ)
   137     socket = context.socket(zmq.XREQ)
    88   socket.connect(url)
   138     info("Connecting requester to %s", url)
    89   requester(socket, timeout)
   139     socket.connect(url)
       
   140     requester(socket, timeout)
    90 
   141 
    91 if __name__ == "__main__":
   142 if __name__ == "__main__":
    92   feurl = "inproc://frontend"
   143     logging.getLogger().setLevel(logging.INFO)
    93   beurl = "inproc://backend"
   144     feurl = "inproc://frontend"
    94   brokers = []
   145     beurl = "inproc://backend"
    95   broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
   146     brokers = []
    96   broker.start()
   147     broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
    97   brokers.append(broker)
   148     broker.start()
    98   time.sleep(1)
   149     brokers.append(broker)
    99   senders = []
   150     time.sleep(2)
   100   for sender in xrange(2):
   151     senders = []
   101     sender = threading.Thread(target = connect_requester, args = (feurl,1000))
   152     for sender in xrange(5):
   102     sender.start()
   153         sender = threading.Thread(target = connect_requester, args = (feurl,5000))
   103     senders.append(sender)
   154         sender.start()
   104   proxies = []
   155         senders.append(sender)
   105   proxy_urls = []
   156     proxies = []
   106   for proxy in xrange(2):
   157     proxy_urls = []
   107     url = "inproc://proxy_be#%d" % (proxy,)
   158     for proxy in xrange(1):
   108     proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
   159         url = "inproc://proxy_be#%d" % (proxy,)
   109     proxy.start()
   160         proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
   110     proxies.append(proxy)
   161         proxy.start()
   111     proxy_urls.append(url)
   162         proxies.append(proxy)
   112   time.sleep(1)
   163         proxy_urls.append(url)
   113   workers = []
   164     time.sleep(2)
   114   for url in proxy_urls:
   165     workers = []
   115     for work in xrange(5):
   166     for url in proxy_urls:
   116       work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
   167         for work in xrange(1):
   117       work.start()
   168             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   118       workers.append(work)
   169             work.start()
   119   time.sleep(10)
   170             workers.append(work)
   120   stop = True
   171     time.sleep(10)
   121   for thread in senders + brokers + proxies + workers:
   172     stop = True
   122     thread.join()
   173     info("Joining thread")
   123     
   174     for thread in senders + brokers + proxies + workers:
       
   175         thread.join()
       
   176