threadpool.py
changeset 2 a00ae018daf8
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/threadpool.py	Sun Mar 21 17:06:53 2010 -0400
@@ -0,0 +1,55 @@
+#!/usr/bin/env python2.6
+
+import threading
+import logging
+import Queue
+
+class _Worker(threading.Thread):
+    def __init__(self, uid, pool):
+        threading.Thread.__init__(self, name = "Worker%d" % uid)
+        self.pool = pool
+    def run(self):
+        logging.info("%s started", self.getName())
+        while True:
+            job = self.pool.jobs.get()
+            if job:
+                logging.debug("%s is running %r", self.getName(), job)
+                try:
+                    job()
+                except:
+                    logging.exception("Job %r return an exception", job)
+                logging.debug("Job %r completed.", job)
+            else:
+                self.pool.jobs.put(job)
+                break
+        logging.info("%s stopped", self.getName())
+
+
+##
+# A basic worker pool... should be replaced with a resizable one.
+class ThreadPool(object):
+    Worker = _Worker
+    
+    def __init__(self, size):
+        super(ThreadPool, self).__init__()
+        self.jobs = Queue.Queue()
+        self._workers = [self.Worker(x+1, self) for x in xrange(size)]
+
+    def Start(self):
+        logging.info("Starting Workers pool")
+        for worker in self._workers:
+            worker.start()
+        logging.info("Workers started")
+
+    def Stop(self, now = False):
+        logging.info("Stopping Workers")
+        if now:
+            self.jobs = Queue.Queue()
+        self.jobs.put(None)
+        for worker in self._workers:
+            worker.join()
+        logging.info("Workers stopped")
+
+    def AddJob(self, job):
+        self.jobs.put(job)
+