1 #!/usr/bin/env python2.6 |
1 #!/usr/bin/env python2.6 |
2 import sys |
2 import sys |
3 import logging |
3 import logging |
4 import threading |
4 import threading |
5 import Queue |
5 import Queue |
6 |
6 from threadpool import ThreadPool |
7 ## |
|
8 # A basic worker pool... should be replaced with a resizable one. |
|
class ThreadPool(object):
    """Fixed-size pool of worker threads fed from one shared FIFO queue.

    Jobs are zero-argument callables added with AddJob().  A ``None`` entry
    in the queue is the shutdown sentinel: the worker that receives it puts
    it back (so every other worker sees it too) and exits its loop.
    """

    class _Worker(threading.Thread):
        """Thread that runs jobs from its pool's queue until it gets None."""

        def __init__(self, uid, pool):
            """Name the thread "Worker<uid>" and remember the owning pool."""
            threading.Thread.__init__(self, name="Worker%d" % uid)
            self.pool = pool

        def run(self):
            """Run queued jobs until the None sentinel is received."""
            logging.info("%s started", self.getName())
            while True:
                job = self.pool.jobs.get()
                # Only None means "stop"; a callable that happens to be
                # falsy (e.g. an empty container with __call__) is a job.
                if job is None:
                    # Re-queue the sentinel so the remaining workers stop.
                    self.pool.jobs.put(job)
                    break
                logging.debug("%s is running %r", self.getName(), job)
                try:
                    job()
                except:
                    # Deliberately broad: whatever a job raises, it is
                    # logged and the worker stays alive for the next job.
                    logging.exception("Job %r return an exception", job)
                else:
                    # Report completion only when the job did not raise.
                    logging.debug("Job %r completed.", job)
            logging.info("%s stopped", self.getName())

    def __init__(self, size):
        """Create the job queue and `size` workers (not started yet)."""
        super(ThreadPool, self).__init__()
        self.jobs = Queue.Queue()
        self._workers = [self._Worker(x + 1, self) for x in range(size)]

    def Start(self):
        """Start every worker thread."""
        logging.info("Starting Workers pool")
        for worker in self._workers:
            worker.start()
        logging.info("Workers started")

    def Stop(self, now=False):
        """Queue the shutdown sentinel and wait for all workers to exit.

        With `now` false, jobs already queued run before the workers stop.
        With `now` true, jobs still waiting in the queue are discarded
        first; a job that is already running is not interrupted.
        """
        logging.info("Stopping Workers")
        if now:
            # Empty the existing queue in place.  Rebinding self.jobs to a
            # new Queue would strand workers blocked in get() on the old
            # one: they would never see the sentinel and join() would hang.
            try:
                while True:
                    self.jobs.get_nowait()
            except Queue.Empty:
                pass
        self.jobs.put(None)
        for worker in self._workers:
            worker.join()
        logging.info("Workers stopped")

    def AddJob(self, job):
        """Queue `job`, a zero-argument callable, for a worker to run."""
        self.jobs.put(job)
|
53 |
7 |
54 ## |
8 ## |
55 # A task, representing a series of linked pairs of callback and error |
9 # A task, representing a series of linked pairs of callback and error |
56 # handler. The concept is similar to Twisted, but you need to put all your |
10 # handler. The concept is similar to Twisted, but you need to put all your |
57 # callbacks on the task before giving it to the Scheduler. For this |
11 # callbacks on the task before giving it to the Scheduler. For this |
187 # the current task suspend until the job is finished, then resume. |
141 # the current task suspend until the job is finished, then resume. |
def CreateCallbackAsJob(self, src, cb):
    """Chain `cb` after `src` so that it runs as a job, and return a new Task.

    A callback is added to `src`; when it fires with `result`, it submits a
    job that calls ``cb(result)`` through ``self.AddJob(dst, Job)``.

    NOTE(review): AddJob is not visible here -- presumably it runs Job on a
    worker and resumes `dst` with Job's return value; confirm.

    src -- the Task whose result is passed to `cb`.
    cb  -- one-argument callable executed inside the job.
    Returns the new Task `dst` that is handed to AddJob with the job.
    """
    dst = Task()

    def CreateJobCallback(result):
        def Job():
            # Return cb's value instead of dropping it, so whoever runs
            # the job can pass it on (e.g. to callbacks added on dst).
            return cb(result)
        self.AddJob(dst, Job)

    src.AddCallback(CreateJobCallback)
    return dst
196 |
150 |
197 # The global scheduler |
151 # The global scheduler |
210 scheduler.Stop(now) |
164 scheduler.Stop(now) |
211 scheduler = None |
165 scheduler = None |
212 |
166 |
213 if __name__ == '__main__': |
167 if __name__ == '__main__': |
214 from time import sleep |
168 from time import sleep |
215 logging.getLogger().setLevel(logging.DEBUG) |
169 logging.getLogger().setLevel(logging.INFO) |
216 # This function is a sample and shouldn't know about the scheduler |
170 # This function is a sample and shouldn't know about the scheduler |
217 count = 0 |
171 count = 0 |
218 def AsyncCall(): |
172 def AsyncCall(name, seconds): |
219 global count |
173 global count |
220 count += 1 |
174 count += 1 |
221 def Initialize(name, sleep): |
175 def Initialize(name, seconds): |
222 print "Here", name |
176 print "Here", name |
223 return name, sleep |
177 return name, seconds |
224 def Blocking(args): |
178 def Blocking(args): |
225 name, time = args |
179 name, time = args |
226 print name, "goes to bed" |
180 print name, "goes to bed" |
227 sleep(time) |
181 sleep(time) |
228 print name, ": ZZZ..." |
182 print name, ": ZZZ..." |
230 def Finalize(name): |
184 def Finalize(name): |
231 global count |
185 global count |
232 print name, "wakes up!" |
186 print name, "wakes up!" |
233 count -= 1 |
187 count -= 1 |
234 |
188 |
235 dinit = Task(Initialize, "Toto", 5) |
189 dinit = Task(Initialize, name, seconds) |
236 # How can I remove the scheduler from the picture ? |
190 # How can I remove the scheduler from the picture ? |
237 # The only way I can see is to have a special kind of Task |
191 # The only way I can see is to have a special kind of Task |
238 # and a suspended queue... May be this will also be more clean |
192 # and a suspended queue... May be this will also be more clean |
239 dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) |
193 dfinal = scheduler.CreateCallbackAsJob(dinit, Blocking) |
240 dfinal.AddCallback(Finalize) |
194 dfinal.AddCallback(Finalize) |
243 # its execution. |
197 # its execution. |
244 return dinit |
198 return dinit |
245 |
199 |
246 logging.info("Starting scheduler with 10 workers") |
200 logging.info("Starting scheduler with 10 workers") |
247 StartScheduler(10) |
201 StartScheduler(10) |
248 dasync = AsyncCall() |
|
249 logging.info("Adding asynccall task") |
202 logging.info("Adding asynccall task") |
250 scheduler.AddTask(dasync) |
203 for x in xrange(int(sys.argv[1])): |
|
204 dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) |
|
205 scheduler.AddTask(dasync) |
251 while count > 0: |
206 while count > 0: |
252 logging.debug("Count = %d", count) |
207 logging.debug("Count = %d", count) |
253 sleep(1) |
208 sleep(1) |
254 logging.info("Stopping scheduler") |
209 logging.info("Stopping scheduler") |
255 StopScheduler() |
210 StopScheduler() |