author | Fabien Ninoles <fabien@tzone.org> |
Sun, 08 May 2011 22:14:10 -0400 | |
changeset 1 | 48d514cc3309 |
parent 0 | 57d81f2bf26f |
child 2 | 2744eb2a589b |
permissions | -rw-r--r-- |
0 | 1 |
import random |
2 |
import time |
|
3 |
import threading |
|
4 |
import zmq |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
5 |
import logging |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
6 |
from logging import debug, info |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
7 |
from itertools import count |
0 | 8 |
|
9 |
# Marker frame announcing readiness: workers and proxies send it to ask for
# work, and a collector sends it upstream when a gathering round is over.
READY = "READY"
# ZeroMQ context from which every socket in this module is created.
context = zmq.Context()
# Shared source of increasing request ids, consumed by requester().
counter = count()
0 | 12 |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
13 |
def collector(name, frontend, backend):
    """Fan each frontend request out to all ready backends and gather replies.

    A backend announces itself by sending a message whose third frame is
    READY; the first frame of that message is remembered and later used as
    the leading (routing) frame when sending to it.  When a request arrives
    on ``frontend`` it is copied to every backend currently marked ready.
    Replies are relayed to the requester until every recipient has sent
    READY again or the request's time budget is exhausted, after which a
    final READY frame is sent to the requester to close the round.

    A frontend request is laid out as ``[address..., "", timeout, ...]``.
    ``timeout`` is either a number of milliseconds in text form or the
    literal "STOP"; a STOP request is forwarded to the ready backends and
    then makes this function return.

    The timeout is a budget for the whole round: the deadline is fixed when
    the request is dispatched, so a steady trickle of replies cannot extend
    the wait.  A negative timeout is handed to ``poll()`` unchanged.

    name: label used in log messages only.
    frontend: socket on which requests arrive and replies are sent back.
    backend: socket used to reach the backends.
    """
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()
    info("collector %s is ready with %r backends", name, len(backends))
    while True:
        # The frontend is unregistered while replies are being gathered, so
        # it is registered again at the top of every pass.
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
                # Outside a gathering round only READY announcements matter;
                # anything else is a late reply nobody is waiting for.
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    debug("collector %s discard reply %r", name, request)
            else:
                # The first empty frame ends the requester's address stack.
                delim = request.index("")
                address_stack = request[:delim+1]
                timeout = request[delim+1]
                if timeout != "STOP":
                    debug("collector %s has new work to do in %s ms", name, timeout)
                # Every ready backend receives the request and has to
                # announce READY again before it is offered more work.
                recipients = backends
                backends = set()
                debug("collector %s send requests to %r", name, recipients)
                for dest in recipients:
                    backend.send_multipart([dest] + request[delim:])
                if timeout == "STOP":
                    info("collector %s is terminating", name)
                    return
                timeout = int(timeout)
                # Fix the deadline once so that each reply does not restart
                # the clock.  time.time() is used because the rest of the
                # file targets Python 2, which has no monotonic clock.
                if timeout < 0:
                    deadline = None
                else:
                    deadline = time.time() + timeout / 1000.0
                poller.unregister(frontend)
                while recipients:
                    if deadline is None:
                        remaining = timeout
                    else:
                        remaining = max(0, int((deadline - time.time()) * 1000))
                    events = poller.poll(remaining)
                    if not events:
                        info("collector %s has a timeout with %r", name, recipients)
                        break
                    for socket, event in events:
                        reply = socket.recv_multipart()
                        if reply[2] == READY:
                            # The backend is done with this request.
                            debug("%r is ready on %s", reply[0], name)
                            backends.add(reply[0])
                            recipients.discard(reply[0])
                        elif reply[0] in recipients:
                            debug("collector %s forward reply", name)
                            # Swap the backend's envelope for the requester's.
                            frontend.send_multipart(address_stack + reply[2:])
                        else:
                            debug("collector %s discard reply %r", name, reply)
                # Tell the requester that no more replies will follow.
                frontend.send_multipart(address_stack + [READY])
                info("collector %s is ready with %r backends", name, len(backends))
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
63 |
|
0 | 64 |
|
65 |
def broker_collector(frontend_url, backend_url):
    """Run a collector named "broker" with both endpoints bound.

    frontend_url: endpoint to bind for the request side.
    backend_url: endpoint to bind for the backend side.

    Blocks until collector() returns.  Both sockets are closed on the way
    out, including when binding or the collector raises.
    """
    frontend = context.socket(zmq.XREP)
    backend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, "broker")
        info("Binding broker frontend to %s", frontend_url)
        frontend.bind(frontend_url)
        info("Binding broker backend to %s", backend_url)
        backend.bind(backend_url)
        collector("broker", frontend, backend)
    finally:
        # Close explicitly instead of relying on garbage collection.
        backend.close()
        frontend.close()
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
74 |
|
0 | 75 |
def proxy_collector(frontend_url, backend_url):
    """Run a collector named "proxy" between an upstream peer and backends.

    frontend_url: endpoint the proxy connects to for incoming requests.
    backend_url: endpoint the proxy binds for its own backends.

    Before entering collector() the proxy sends a READY message upstream.
    Blocks until collector() returns.  Both sockets are closed on the way
    out, including when connecting, binding or the collector raises.
    """
    frontend = context.socket(zmq.XREQ)
    backend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, "proxy")
        info("Connecting proxy frontend to %s", frontend_url)
        frontend.connect(frontend_url)
        info("Binding proxy backend to %s", backend_url)
        backend.bind(backend_url)
        # Sending presence to frontend.
        frontend.send_multipart(["", READY])
        collector("proxy", frontend, backend)
    finally:
        # Close explicitly instead of relying on garbage collection.
        backend.close()
        frontend.close()
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
86 |
|
0 | 87 |
def worker(socket, workload, failure_rate = 0):
    """Serve requests on ``socket`` until stopped or a failure is simulated.

    Every round sends a READY message, waits for one request and, unless the
    request ends the loop, sleeps for ``workload`` seconds before replying
    with the request id followed by "DONE".

    socket: connected socket used for both sending and receiving.
    workload: number of seconds handed to time.sleep() per request.
    failure_rate: when non-zero, each request has a 1-in-``failure_rate``
        chance of making the worker quit without replying.

    Returns True after a "STOP" request and False after a simulated failure.
    """
    while True:
        info("Worker is ready")
        socket.send_multipart(["", READY])
        frames = socket.recv_multipart()
        info("Worker receive request %r", frames)
        # The first empty frame separates the return envelope from the body.
        split = frames.index("")
        envelope = frames[:split + 1]
        command = frames[split + 1]
        if command == "STOP":
            info("worker is terminating")
            return True
        body = frames[split + 2:]
        assert body[0] == "REQUEST"
        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False
        time.sleep(workload)
        info("worker send reply")
        answer = envelope + [body[1], "DONE"]
        socket.send_multipart(answer)
0 | 107 |
|
108 |
def connect_worker(url, workload, failure_rate = 0):
    """Keep a worker connected to ``url`` until it is told to stop.

    A fresh socket is created for every attempt.  When worker() returns
    False (a simulated failure) the socket is closed and a new one is
    connected; when it returns True this function returns.

    url: endpoint to connect to.
    workload: forwarded to worker().
    failure_rate: forwarded to worker().
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            finished = worker(socket, workload, failure_rate)
        finally:
            # Release the socket before a retry creates the next one,
            # instead of relying on garbage collection.
            socket.close()
        if finished:
            return
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
115 |
|
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
116 |
# Module-level flag polled by requester(); the main script sets it to True
# to make every requester finish its current round and shut down.
stop = False


def requester(socket, timeout = -1):
    """Send numbered requests on ``socket`` until the ``stop`` flag is set.

    Each round takes the next id from the shared ``counter``, sends
    ``["", str(timeout), "REQUEST", id]`` and then counts replies until a
    ``["", READY]`` message marks the end of the round.  Once ``stop`` is
    true, a ``["", "STOP"]`` message is sent and the function returns.

    socket: connected socket used for both sending and receiving.
    timeout: value placed in the request's timeout frame; the collector
        reads it as a number of milliseconds.
    """
    while not stop:
        # The built-in next() replaces the Python-2-only counter.next().
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            # Sanity check: every reply must echo the id of this request.
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
    info("requester is terminating")
    socket.send_multipart(["", "STOP"])
0 | 135 |
|
136 |
def connect_requester(url, timeout):
    """Connect a new socket to ``url`` and run requester() on it.

    url: endpoint to connect to.
    timeout: forwarded to requester().

    Blocks until requester() returns.  The socket is closed on the way out,
    including when connecting or the requester raises.
    """
    socket = context.socket(zmq.XREQ)
    try:
        info("Connecting requester to %s", url)
        socket.connect(url)
        requester(socket, timeout)
    finally:
        # Close explicitly instead of relying on garbage collection.
        socket.close()
0 | 141 |
|
142 |
if __name__ == "__main__":
    # Demo: one broker, five requesters, one proxy and one worker per proxy,
    # all running as threads and talking over inproc endpoints.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    brokers = []
    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before anything connects.
    time.sleep(2)

    senders = []
    for _ in range(5):
        sender_thread = threading.Thread(target = connect_requester, args = (feurl, 5000))
        sender_thread.start()
        senders.append(sender_thread)

    proxies = []
    proxy_urls = []
    for proxy_index in range(1):
        url = "inproc://proxy_be#%d" % (proxy_index,)
        proxy_thread = threading.Thread(target = proxy_collector, args = (beurl, url))
        proxy_thread.start()
        proxies.append(proxy_thread)
        proxy_urls.append(url)
    # Presumably gives the proxies time to bind before workers connect.
    time.sleep(2)

    workers = []
    for url in proxy_urls:
        for _ in range(1):
            worker_thread = threading.Thread(target = connect_worker, args = (url, 1, 0))
            worker_thread.start()
            workers.append(worker_thread)

    # Let the system run for a while, then ask the requesters to wind down.
    time.sleep(10)
    stop = True
    info("Joining thread")
    for thread in senders + brokers + proxies + workers:
        thread.join()
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
176 |