threadpool.py
changeset 2 a00ae018daf8
equal deleted inserted replaced
1:ee10f7cde07d 2:a00ae018daf8
       
     1 #!/usr/bin/env python2.6
       
     2 
       
     3 import threading
       
     4 import logging
       
     5 import Queue
       
     6 
       
     7 class _Worker(threading.Thread):
       
     8     def __init__(self, uid, pool):
       
     9         threading.Thread.__init__(self, name = "Worker%d" % uid)
       
    10         self.pool = pool
       
    11     def run(self):
       
    12         logging.info("%s started", self.getName())
       
    13         while True:
       
    14             job = self.pool.jobs.get()
       
    15             if job:
       
    16                 logging.debug("%s is running %r", self.getName(), job)
       
    17                 try:
       
    18                     job()
       
    19                 except:
       
    20                     logging.exception("Job %r return an exception", job)
       
    21                 logging.debug("Job %r completed.", job)
       
    22             else:
       
    23                 self.pool.jobs.put(job)
       
    24                 break
       
    25         logging.info("%s stopped", self.getName())
       
    26 
       
    27 
       
    28 ##
       
    29 # A basic worker pool... should be replaced with a resizable one.
       
    30 class ThreadPool(object):
       
    31     Worker = _Worker
       
    32     
       
    33     def __init__(self, size):
       
    34         super(ThreadPool, self).__init__()
       
    35         self.jobs = Queue.Queue()
       
    36         self._workers = [self.Worker(x+1, self) for x in xrange(size)]
       
    37 
       
    38     def Start(self):
       
    39         logging.info("Starting Workers pool")
       
    40         for worker in self._workers:
       
    41             worker.start()
       
    42         logging.info("Workers started")
       
    43 
       
    44     def Stop(self, now = False):
       
    45         logging.info("Stopping Workers")
       
    46         if now:
       
    47             self.jobs = Queue.Queue()
       
    48         self.jobs.put(None)
       
    49         for worker in self._workers:
       
    50             worker.join()
       
    51         logging.info("Workers stopped")
       
    52 
       
    53     def AddJob(self, job):
       
    54         self.jobs.put(job)
       
    55