|
1 #!/usr/bin/env python2.6 |
|
2 |
|
3 import threading |
|
4 import logging |
|
5 import Queue |
|
6 |
|
7 class _Worker(threading.Thread): |
|
8 def __init__(self, uid, pool): |
|
9 threading.Thread.__init__(self, name = "Worker%d" % uid) |
|
10 self.pool = pool |
|
11 def run(self): |
|
12 logging.info("%s started", self.getName()) |
|
13 while True: |
|
14 job = self.pool.jobs.get() |
|
15 if job: |
|
16 logging.debug("%s is running %r", self.getName(), job) |
|
17 try: |
|
18 job() |
|
19 except: |
|
20 logging.exception("Job %r return an exception", job) |
|
21 logging.debug("Job %r completed.", job) |
|
22 else: |
|
23 self.pool.jobs.put(job) |
|
24 break |
|
25 logging.info("%s stopped", self.getName()) |
|
26 |
|
27 |
|
28 ## |
|
29 # A basic worker pool... should be replaced with a resizable one. |
|
30 class ThreadPool(object): |
|
31 Worker = _Worker |
|
32 |
|
33 def __init__(self, size): |
|
34 super(ThreadPool, self).__init__() |
|
35 self.jobs = Queue.Queue() |
|
36 self._workers = [self.Worker(x+1, self) for x in xrange(size)] |
|
37 |
|
38 def Start(self): |
|
39 logging.info("Starting Workers pool") |
|
40 for worker in self._workers: |
|
41 worker.start() |
|
42 logging.info("Workers started") |
|
43 |
|
44 def Stop(self, now = False): |
|
45 logging.info("Stopping Workers") |
|
46 if now: |
|
47 self.jobs = Queue.Queue() |
|
48 self.jobs.put(None) |
|
49 for worker in self._workers: |
|
50 worker.join() |
|
51 logging.info("Workers stopped") |
|
52 |
|
53 def AddJob(self, job): |
|
54 self.jobs.put(job) |
|
55 |