8 |
8 |
9 READY = "READY" |
9 READY = "READY" |
10 context = zmq.Context() |
10 context = zmq.Context() |
11 counter = count() |
11 counter = count() |
12 |
12 |
|
13 def checkzmqerror(func): |
|
14 def wrapper(*args, **kwargs): |
|
15 try: |
|
16 func(*args, **kwargs) |
|
17 except zmq.ZMQError, err: |
|
18 info("%r(*%r, **%r) is terminating", func, args, kwargs) |
|
19 return wrapper |
|
20 |
13 def collector(name, frontend, backend): |
21 def collector(name, frontend, backend): |
14 poller = zmq.Poller() |
|
15 poller.register(backend, zmq.POLLIN) |
|
16 backends = set() |
22 backends = set() |
17 info("collector %s is ready with %r backends", name, len(backends)) |
23 info("collector %s is ready with %r backends", name, len(backends)) |
|
24 dropped = 0 |
18 while True: |
25 while True: |
|
26 poller = zmq.Poller() |
|
27 poller.register(backend, zmq.POLLIN) |
19 poller.register(frontend, zmq.POLLIN) |
28 poller.register(frontend, zmq.POLLIN) |
20 for socket, event in poller.poll(100): |
29 for socket, event in poller.poll(100): |
21 request = socket.recv_multipart() |
30 request = socket.recv_multipart() |
22 debug("collector %s received request %r", name, request) |
31 debug("collector %s received request %r", name, request) |
23 if socket is backend: |
32 if socket is backend: |
28 debug("collector %s discard reply %r", name, request) |
37 debug("collector %s discard reply %r", name, request) |
29 else: |
38 else: |
30 delim = request.index("") |
39 delim = request.index("") |
31 address_stack = request[:delim+1] |
40 address_stack = request[:delim+1] |
32 timeout = request[delim+1] |
41 timeout = request[delim+1] |
33 if timeout != "STOP": |
42 debug("collector %s has new work to do in %s ms", name, timeout) |
34 debug("collector %s has new work to do in %s ms", name, timeout) |
|
35 recipients = backends |
43 recipients = backends |
36 backends = set() |
44 backends = set() |
37 debug("collector %s send requests to %r", name, recipients) |
45 debug("collector %s send requests to %r", name, recipients) |
38 for dest in recipients: |
46 for dest in recipients: |
39 backend.send_multipart([dest] + request[delim:]) |
47 backend.send_multipart([dest] + request[delim:]) |
40 if timeout == "STOP": |
|
41 info("collector %s is terminating", name) |
|
42 return |
|
43 timeout = int(timeout) |
48 timeout = int(timeout) |
44 poller.unregister(frontend) |
49 poller = zmq.Poller() |
|
50 poller.register(backend, zmq.POLLIN) |
45 while recipients: |
51 while recipients: |
|
52 start = time.time() |
|
53 debug("%r: collector %s wait %r on %r", start, name, timeout, recipients) |
46 events = poller.poll(timeout) |
54 events = poller.poll(timeout) |
47 if not events: |
55 if not events: |
48 info("collector %s has a timeout with %r", name, recipients) |
56 end = time.time() |
49 break |
57 if (end-start)*1000 < timeout: |
|
58 info("no event but timeout: %r", events) |
|
59 else: |
|
60 dropped += 1 |
|
61 debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout) |
|
62 break |
50 for socket, event in events: |
63 for socket, event in events: |
51 reply = socket.recv_multipart() |
64 reply = socket.recv_multipart() |
52 if reply[2] == READY: |
65 if reply[2] == READY: |
53 debug("%r is ready on %s", reply[0], name) |
66 debug("%r is ready on %s", reply[0], name) |
54 backends.add(reply[0]) |
67 backends.add(reply[0]) |
60 debug("collector %s discard reply %r", name, reply) |
73 debug("collector %s discard reply %r", name, reply) |
61 frontend.send_multipart(address_stack + [READY]) |
74 frontend.send_multipart(address_stack + [READY]) |
62 info("collector %s is ready with %r backends", name, len(backends)) |
75 info("collector %s is ready with %r backends", name, len(backends)) |
63 |
76 |
64 |
77 |
|
78 @checkzmqerror |
65 def broker_collector(frontend_url, backend_url): |
79 def broker_collector(frontend_url, backend_url): |
66 frontend = context.socket(zmq.XREP) |
80 frontend = context.socket(zmq.XREP) |
67 frontend.setsockopt(zmq.IDENTITY, "broker") |
81 frontend.setsockopt(zmq.IDENTITY, "broker") |
68 backend = context.socket(zmq.XREP) |
82 backend = context.socket(zmq.XREP) |
69 info("Binding broker frontend to %s", frontend_url) |
83 info("Binding broker frontend to %s", frontend_url) |
70 frontend.bind(frontend_url) |
84 frontend.bind(frontend_url) |
71 info("Binding broker backend to %s", backend_url) |
85 info("Binding broker backend to %s", backend_url) |
72 backend.bind(backend_url) |
86 backend.bind(backend_url) |
73 collector("broker", frontend, backend) |
87 collector("broker", frontend, backend) |
74 |
88 |
|
89 @checkzmqerror |
75 def proxy_collector(frontend_url, backend_url): |
90 def proxy_collector(frontend_url, backend_url): |
76 frontend = context.socket(zmq.XREQ) |
91 frontend = context.socket(zmq.XREQ) |
77 frontend.setsockopt(zmq.IDENTITY, "proxy") |
92 frontend.setsockopt(zmq.IDENTITY, "proxy") |
78 backend = context.socket(zmq.XREP) |
93 backend = context.socket(zmq.XREP) |
79 info("Connecting proxy frontend to %s", frontend_url) |
94 info("Connecting proxy frontend to %s", frontend_url) |
87 def worker(socket, workload, failure_rate = 0): |
102 def worker(socket, workload, failure_rate = 0): |
88 while True: |
103 while True: |
89 info("Worker is ready") |
104 info("Worker is ready") |
90 socket.send_multipart(["",READY]) |
105 socket.send_multipart(["",READY]) |
91 request = socket.recv_multipart() |
106 request = socket.recv_multipart() |
92 info("Worker receive request %r", request) |
107 debug("Worker receive request %r", request) |
93 delim = request.index("") |
108 delim = request.index("") |
94 address = request[:delim+1] |
109 address = request[:delim+1] |
95 timeout = request[delim+1] |
110 timeout = request[delim+1] |
96 if timeout == "STOP": |
|
97 info("worker is terminating") |
|
98 return True |
|
99 request = request[delim+2:] |
111 request = request[delim+2:] |
100 assert request[0] == "REQUEST" |
112 assert request[0] == "REQUEST" |
101 if failure_rate and random.randrange(failure_rate) == 0: |
113 if failure_rate and random.randrange(failure_rate) == 0: |
102 info("worker failed") |
114 info("worker failed") |
103 return False |
115 return False |
104 time.sleep(workload) |
116 time.sleep(workload) |
105 info("worker send reply") |
117 debug("worker send reply") |
106 socket.send_multipart(address + [request[1], "DONE"]) |
118 socket.send_multipart(address + [request[1], "DONE"]) |
107 |
119 |
|
120 @checkzmqerror |
108 def connect_worker(url, workload, failure_rate = 0): |
121 def connect_worker(url, workload, failure_rate = 0): |
109 while True: |
122 while True: |
110 socket = context.socket(zmq.XREQ) |
123 socket = context.socket(zmq.XREQ) |
111 info("Connecting worker to %s", url) |
124 info("Connecting worker to %s", url) |
112 socket.connect(url) |
125 socket.connect(url) |
113 if worker(socket, workload, failure_rate): |
126 worker(socket, workload, failure_rate) |
114 return |
|
115 |
|
116 stop = False |
|
117 |
127 |
118 def requester(socket, timeout = -1): |
128 def requester(socket, timeout = -1): |
119 while not stop: |
129 while True: |
120 i = str(counter.next()) |
130 i = str(counter.next()) |
121 info("Requester send request %s", i) |
131 info("Requester send request %s", i) |
122 socket.send_multipart(["", str(timeout), "REQUEST", i]) |
132 socket.send_multipart(["", str(timeout), "REQUEST", i]) |
123 results = 0 |
133 results = 0 |
124 while True: |
134 while True: |
128 break |
138 break |
129 assert reply[1] == i |
139 assert reply[1] == i |
130 results += 1 |
140 results += 1 |
131 info("requester received %d results", results) |
141 info("requester received %d results", results) |
132 # time.sleep(1) |
142 # time.sleep(1) |
133 info("requester is terminating") |
|
134 socket.send_multipart(["", "STOP"]) |
|
135 |
143 |
|
144 @checkzmqerror |
136 def connect_requester(url, timeout): |
145 def connect_requester(url, timeout): |
137 socket = context.socket(zmq.XREQ) |
146 socket = context.socket(zmq.XREQ) |
138 info("Connecting requester to %s", url) |
147 info("Connecting requester to %s", url) |
139 socket.connect(url) |
148 socket.connect(url) |
140 requester(socket, timeout) |
149 requester(socket, timeout) |