author | Fabien Ninoles <fabien@tzone.org> |
Tue, 10 May 2011 00:21:11 -0400 | |
changeset 6 | fdf7da5a5d21 |
parent 5 | 55f26e7ee45e |
permissions | -rw-r--r-- |
0 | 1 |
import random |
2 |
import time |
|
3 |
import threading |
|
4 |
import zmq |
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
5 |
import logging |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
6 |
from logging import debug, info |
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
7 |
from itertools import count |
0 | 8 |
|
9 |
# Payload frame used across this file to announce "ready for (more) work".
READY = "READY"

# Single ZeroMQ context; every socket in this file is created from it.
context = zmq.Context()

# Module-wide counter; requester() draws its request ids from it.
counter = count()
0 | 12 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
13 |
def checkzmqerror(func):
    """Decorate *func* so that a ``zmq.ZMQError`` ends the call quietly.

    The wrapper forwards every positional and keyword argument to *func*
    and returns whatever *func* returns.  If *func* raises
    ``zmq.ZMQError``, the error is logged at INFO level and the wrapper
    returns ``None`` instead of propagating it.  Any other exception
    propagates unchanged.
    """
    import functools  # local import so this block needs no new file-level import

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Hand the result back to the caller instead of dropping it.
            return func(*args, **kwargs)
        except zmq.ZMQError as err:
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
            return None
    return wrapper
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
20 |
|
6
fdf7da5a5d21
Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
21 |
def collector(name, frontend, backend, timeout):
    """Fan each frontend request out to all ready backends and relay replies.

    Backends register by sending a message whose frame at index 2 equals
    ``READY``; frame 0 of that message is kept as the backend's identity.
    When a request arrives on *frontend* it is sent to every backend that
    is currently registered.  Replies from those backends are forwarded to
    *frontend* until each of them has sent ``READY`` again or the deadline
    passes.  A final ``READY`` message is then sent on *frontend*.

    Args:
        name: Label used in the log messages.
        frontend: Socket on which requests arrive and replies are sent.
        backend: Socket used to talk to the backends.
        timeout: Maximum time to wait for the replies to one request, in
            milliseconds (it is divided by 1000.0 to get seconds).

    This function loops forever; it only ends when a socket call raises.
    """
    backends = set()
    debug("collector %s is ready with %r backends", name, len(backends))
    while True:
        # A fresh poller is built on every pass rather than reused.
        poller = zmq.Poller()
        poller.register(backend, zmq.POLLIN)
        poller.register(frontend, zmq.POLLIN)
        for socket, _event in poller.poll():
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)

            if socket is backend:
                # Outside of a request round only READY announcements matter.
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    debug("collector %s discard reply %r", name, request)
                continue

            # Message from the frontend: everything up to and including the
            # first empty frame is kept as the return address.
            delim = request.index("")
            address_stack = request[:delim + 1]
            debug("collector %s has new work to do", name)

            # Every currently registered backend gets the request; the
            # registry starts over and refills as backends report READY.
            recipients = backends
            backends = set()
            debug("collector %s send requests to %r", name, recipients)
            for dest in recipients:
                backend.send_multipart([dest] + request[delim:])

            # While collecting replies only the backend socket is polled.
            reply_poller = zmq.Poller()
            reply_poller.register(backend, zmq.POLLIN)
            start = time.time()
            deadline = start + timeout / 1000.0
            while recipients:
                debug("%r: collector %s wait on on %r", start, name, recipients)
                # Poller.poll() expects milliseconds, while the deadline is
                # tracked in seconds.  Round up so the loop does not spin
                # on zero-length polls just before the deadline.
                remaining_ms = max(0, int((deadline - time.time()) * 1000) + 1)
                for ready_socket, _reply_event in reply_poller.poll(remaining_ms):
                    reply = ready_socket.recv_multipart()
                    if reply[2] == READY:
                        # This backend is done and available again.
                        debug("%r is ready on %s", reply[0], name)
                        backends.add(reply[0])
                        recipients.discard(reply[0])
                    elif reply[0] in recipients:
                        debug("collector %s forward reply", name)
                        frontend.send_multipart(address_stack + reply[2:])
                    else:
                        debug("collector %s discard reply %r", name, reply)
                end = time.time()
                if recipients and end >= deadline:
                    info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
                    break

            # Tell the frontend peer that this request round is over.
            frontend.send_multipart(address_stack + [READY])
            debug("collector %s is ready with %r backends", name, len(backends))
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
71 |
|
0 | 72 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
73 |
@checkzmqerror
def broker_collector(frontend_url, backend_url, timeout):
    """Run a collector named "broker" between two bound XREP sockets.

    Args:
        frontend_url: Endpoint the frontend socket binds to.
        backend_url: Endpoint the backend socket binds to; it is also set
            as the frontend socket's IDENTITY.
        timeout: Per-request timeout handed to ``collector``.

    Both sockets are closed when the collector stops, whatever the reason,
    so that terminating the shared context is not held up by sockets left
    open here.
    """
    frontend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Binding broker frontend to %s", frontend_url)
            frontend.bind(frontend_url)
            info("Binding broker backend to %s", backend_url)
            backend.bind(backend_url)
            collector("broker", frontend, backend, timeout)
        finally:
            backend.close()
    finally:
        frontend.close()
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
83 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
84 |
@checkzmqerror
def proxy_collector(frontend_url, backend_url, timeout):
    """Run a collector named "proxy" between an upstream peer and workers.

    The frontend is an XREQ socket that connects to *frontend_url*; the
    backend is an XREP socket bound to *backend_url*.  Before entering the
    collector loop a ``["", READY]`` message is sent on the frontend.

    Args:
        frontend_url: Endpoint the frontend socket connects to.
        backend_url: Endpoint the backend socket binds to; it is also set
            as the frontend socket's IDENTITY.
        timeout: Per-request timeout handed to ``collector``.

    Both sockets are closed when the collector stops, whatever the reason,
    so that terminating the shared context is not held up by sockets left
    open here.
    """
    frontend = context.socket(zmq.XREQ)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Connecting proxy frontend to %s", frontend_url)
            frontend.connect(frontend_url)
            info("Binding proxy backend to %s", backend_url)
            backend.bind(backend_url)
            # Sending presence to frontend.
            frontend.send_multipart(["", READY])
            collector("proxy", frontend, backend, timeout)
        finally:
            backend.close()
    finally:
        frontend.close()
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
96 |
|
0 | 97 |
def worker(socket, workload, failure_rate = 0):
    """Serve requests on *socket* until a simulated failure occurs.

    Each round announces ``["", READY]``, waits for one message, sleeps
    between ``workload`` and ``2 * workload`` seconds, and answers with the
    received address frames followed by the request's second payload frame
    and ``"DONE"``.

    When *failure_rate* is non-zero, each request has a 1-in-*failure_rate*
    chance of making the worker log a failure and return ``False`` without
    replying.
    """
    while True:
        debug("Worker is ready")
        socket.send_multipart(["", READY])

        frames = socket.recv_multipart()
        debug("Worker receive request %r", frames)

        # Split at the first empty frame: address part (delimiter included)
        # on one side, payload on the other.
        payload_start = frames.index("") + 1
        address = frames[:payload_start]
        payload = frames[payload_start:]
        assert payload[0] == "REQUEST"

        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False

        delay = workload * (1 + random.random())
        time.sleep(delay)
        debug("worker send reply")
        socket.send_multipart(address + [payload[1], "DONE"])
0 | 113 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
114 |
@checkzmqerror
def connect_worker(url, workload, failure_rate = 0):
    """Keep a worker running against *url*, reconnecting after each failure.

    Every pass creates a new XREQ socket, connects it to *url* and runs
    ``worker`` on it.  When ``worker`` returns (a simulated failure) or
    raises, the socket is closed explicitly before a new one is created,
    so failed workers do not leave sockets open on the shared context.

    Args:
        url: Endpoint to connect to.
        workload: Base processing time passed through to ``worker``.
        failure_rate: Failure odds passed through to ``worker``.
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            socket.close()
0 | 121 |
|
6
fdf7da5a5d21
Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
122 |
def requester(socket):
    """Send numbered requests on *socket* forever and count their replies.

    Each request is ``["", "REQUEST", <id>]`` with the id drawn from the
    module-level ``counter``.  Replies are read until a ``["", READY]``
    message arrives; every other reply must carry the request id in its
    second frame.  The number of replies is logged, then the next request
    is sent.
    """
    while True:
        # The builtin next() works on both Python 2.6+ and Python 3.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                # End-of-round marker: no more replies for this request.
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
0 | 136 |
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
137 |
@checkzmqerror
def connect_requester(url):
    """Connect an XREQ socket to *url* and run ``requester`` on it.

    The socket is closed when ``requester`` stops, whatever the reason, so
    that terminating the shared context is not held up by a socket left
    open here.
    """
    socket = context.socket(zmq.XREQ)
    try:
        info("Connecting requester to %s", url)
        socket.connect(url)
        requester(socket)
    finally:
        socket.close()
0 | 143 |
|
144 |
if __name__ == "__main__":
    # Demo driver: one broker, 10 requesters, 5 proxies and 5 workers per
    # proxy, all running as threads over inproc endpoints.  After 20 seconds
    # the context is terminated and every thread is joined.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"
    # NOTE(review): `workload` is assigned but never read; the worker threads
    # below are started with a literal 3 instead -- confirm which is intended.
    workload = 2.5
    # Timeouts are in milliseconds (collector() divides them by 1000.0).
    broker_timeout = 5000
    proxy_timeout = 5000

    brokers = []
    broker = threading.Thread(target=broker_collector, args=(feurl, beurl, broker_timeout))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before peers connect -- TODO confirm.
    time.sleep(2)

    senders = []
    for _ in range(10):
        sender = threading.Thread(target=connect_requester, args=(feurl,))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for index in range(5):
        url = "inproc://proxy_be#%d" % (index,)
        proxy = threading.Thread(target=proxy_collector, args=(beurl, url, proxy_timeout))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    # Presumably gives the proxies time to bind before workers connect -- TODO confirm.
    time.sleep(2)

    workers = []
    for url in proxy_urls:
        for _ in range(5):
            work = threading.Thread(target=connect_worker, args=(url, 3, 10))
            work.start()
            workers.append(work)

    time.sleep(20)
    info("Joining thread")
    # Terminate the context first, then wait for every thread to finish.
    context.term()
    for thread in senders + brokers + proxies + workers:
        thread.join()
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
181 |