author | Fabien Ninoles <fabien@tzone.org> |
Sun, 08 May 2011 22:47:41 -0400 | |
changeset 3 | 197572da88ea |
parent 2 | 2744eb2a589b |
child 4 | 4b5a51cb5fc7 |
permissions | -rw-r--r-- |
0 | 1 |
import random |
2 |
import time |
|
3 |
import threading |
|
4 |
import zmq |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
5 |
import logging |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
6 |
from logging import debug, info |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
7 |
from itertools import count |
0 | 8 |
|
9 |
READY = "READY" |
|
10 |
context = zmq.Context() |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
11 |
counter = count() |
0 | 12 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
13 |
def checkzmqerror(func): |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
14 |
def wrapper(*args, **kwargs): |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
15 |
try: |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
16 |
func(*args, **kwargs) |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
17 |
except zmq.ZMQError, err: |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
18 |
info("%r(*%r, **%r) is terminating", func, args, kwargs) |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
19 |
return wrapper |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
20 |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
21 |
def collector(name, frontend, backend): |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
22 |
backends = set() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
23 |
info("collector %s is ready with %r backends", name, len(backends)) |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
24 |
dropped = 0 |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
25 |
while True: |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
26 |
poller = zmq.Poller() |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
27 |
poller.register(backend, zmq.POLLIN) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
28 |
poller.register(frontend, zmq.POLLIN) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
29 |
for socket, event in poller.poll(100): |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
30 |
request = socket.recv_multipart() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
31 |
debug("collector %s received request %r", name, request) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
32 |
if socket is backend: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
33 |
if request[2] == READY: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
34 |
debug("collector %s has new backend: %r", name, request[0]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
35 |
backends.add(request[0]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
36 |
else: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
37 |
debug("collector %s discard reply %r", name, request) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
38 |
else: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
39 |
delim = request.index("") |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
40 |
address_stack = request[:delim+1] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
41 |
timeout = request[delim+1] |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
42 |
debug("collector %s has new work to do in %s ms", name, timeout) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
43 |
recipients = backends |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
44 |
backends = set() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
45 |
debug("collector %s send requests to %r", name, recipients) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
46 |
for dest in recipients: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
47 |
backend.send_multipart([dest] + request[delim:]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
48 |
timeout = int(timeout) |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
49 |
poller = zmq.Poller() |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
50 |
poller.register(backend, zmq.POLLIN) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
51 |
while recipients: |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
52 |
start = time.time() |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
53 |
debug("%r: collector %s wait %r on %r", start, name, timeout, recipients) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
54 |
events = poller.poll(timeout) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
55 |
if not events: |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
56 |
end = time.time() |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
57 |
if (end-start)*1000 < timeout: |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
58 |
info("no event but timeout: %r", events) |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
59 |
else: |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
60 |
dropped += 1 |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
61 |
debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout) |
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
62 |
break |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
63 |
for socket, event in events: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
64 |
reply = socket.recv_multipart() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
65 |
if reply[2] == READY: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
66 |
debug("%r is ready on %s", reply[0], name) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
67 |
backends.add(reply[0]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
68 |
recipients.discard(reply[0]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
69 |
elif reply[0] in recipients: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
70 |
debug("collector %s forward reply", name) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
71 |
frontend.send_multipart(address_stack + reply[2:]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
72 |
else: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
73 |
debug("collector %s discard reply %r", name, reply) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
74 |
frontend.send_multipart(address_stack + [READY]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
75 |
info("collector %s is ready with %r backends", name, len(backends)) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
76 |
|
0 | 77 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
78 |
@checkzmqerror |
0 | 79 |
def broker_collector(frontend_url, backend_url): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
80 |
frontend = context.socket(zmq.XREP) |
3 | 81 |
frontend.setsockopt(zmq.IDENTITY, backend_url) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
82 |
backend = context.socket(zmq.XREP) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
83 |
info("Binding broker frontend to %s", frontend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
84 |
frontend.bind(frontend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
85 |
info("Binding broker backend to %s", backend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
86 |
backend.bind(backend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
87 |
collector("broker", frontend, backend) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
88 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
89 |
@checkzmqerror |
0 | 90 |
def proxy_collector(frontend_url, backend_url): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
91 |
frontend = context.socket(zmq.XREQ) |
3 | 92 |
frontend.setsockopt(zmq.IDENTITY, backend_url) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
93 |
backend = context.socket(zmq.XREP) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
94 |
info("Connecting proxy frontend to %s", frontend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
95 |
frontend.connect(frontend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
96 |
info("Binding proxy backend to %s", backend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
97 |
# Sending presence to frontend. |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
98 |
backend.bind(backend_url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
99 |
frontend.send_multipart(["", READY]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
100 |
collector("proxy", frontend, backend) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
101 |
|
0 | 102 |
def worker(socket, workload, failure_rate = 0): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
103 |
while True: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
104 |
info("Worker is ready") |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
105 |
socket.send_multipart(["",READY]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
106 |
request = socket.recv_multipart() |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
107 |
debug("Worker receive request %r", request) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
108 |
delim = request.index("") |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
109 |
address = request[:delim+1] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
110 |
timeout = request[delim+1] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
111 |
request = request[delim+2:] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
112 |
assert request[0] == "REQUEST" |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
113 |
if failure_rate and random.randrange(failure_rate) == 0: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
114 |
info("worker failed") |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
115 |
return False |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
116 |
time.sleep(workload) |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
117 |
debug("worker send reply") |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
118 |
socket.send_multipart(address + [request[1], "DONE"]) |
0 | 119 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
120 |
@checkzmqerror |
0 | 121 |
def connect_worker(url, workload, failure_rate = 0): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
122 |
while True: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
123 |
socket = context.socket(zmq.XREQ) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
124 |
info("Connecting worker to %s", url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
125 |
socket.connect(url) |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
126 |
worker(socket, workload, failure_rate) |
0 | 127 |
|
128 |
def requester(socket, timeout = -1): |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
129 |
while True: |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
130 |
i = str(counter.next()) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
131 |
info("Requester send request %s", i) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
132 |
socket.send_multipart(["", str(timeout), "REQUEST", i]) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
133 |
results = 0 |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
134 |
while True: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
135 |
reply = socket.recv_multipart() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
136 |
debug("requester received reply %r", reply) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
137 |
if reply == ["",READY]: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
138 |
break |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
139 |
assert reply[1] == i |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
140 |
results += 1 |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
141 |
info("requester received %d results", results) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
142 |
# time.sleep(1) |
0 | 143 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
144 |
@checkzmqerror |
0 | 145 |
def connect_requester(url, timeout): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
146 |
socket = context.socket(zmq.XREQ) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
147 |
info("Connecting requester to %s", url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
148 |
socket.connect(url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
149 |
requester(socket, timeout) |
0 | 150 |
|
151 |
if __name__ == "__main__": |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
152 |
logging.getLogger().setLevel(logging.INFO) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
153 |
feurl = "inproc://frontend" |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
154 |
beurl = "inproc://backend" |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
155 |
brokers = [] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
156 |
broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
157 |
broker.start() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
158 |
brokers.append(broker) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
159 |
time.sleep(2) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
160 |
senders = [] |
3 | 161 |
for sender in xrange(10): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
162 |
sender = threading.Thread(target = connect_requester, args = (feurl,5000)) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
163 |
sender.start() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
164 |
senders.append(sender) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
165 |
proxies = [] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
166 |
proxy_urls = [] |
3 | 167 |
for proxy in xrange(5): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
168 |
url = "inproc://proxy_be#%d" % (proxy,) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
169 |
proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
170 |
proxy.start() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
171 |
proxies.append(proxy) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
172 |
proxy_urls.append(url) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
173 |
time.sleep(2) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
174 |
workers = [] |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
175 |
for url in proxy_urls: |
3 | 176 |
for work in xrange(5): |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
177 |
work = threading.Thread(target = connect_worker, args = (url, 1, 0)) |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
178 |
work.start() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
179 |
workers.append(work) |
3 | 180 |
time.sleep(20) |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
181 |
info("Joining thread") |
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
182 |
context.term() |
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
183 |
for thread in senders + brokers + proxies + workers: |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
184 |
thread.join() |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
185 |