# HG changeset patch # User Fabien Ninoles # Date 1269205613 14400 # Node ID a00ae018daf8833c349e1338a7bb6ca0c3b973ba # Parent ee10f7cde07d942aa2cb9eb2582bf375a95d613a Separate thread pool in another file and fix some issues. diff -r ee10f7cde07d -r a00ae018daf8 scheduler.py --- a/scheduler.py Sun Mar 21 16:50:13 2010 -0400 +++ b/scheduler.py Sun Mar 21 17:06:53 2010 -0400 @@ -3,53 +3,7 @@ import logging import threading import Queue - -## -# A basic worker pool... should be replaced with a resizable one. -class ThreadPool(object): - - class _Worker(threading.Thread): - def __init__(self, uid, pool): - threading.Thread.__init__(self, name = "Worker%d" % uid) - self.pool = pool - def run(self): - logging.info("%s started", self.getName()) - while True: - job = self.pool.jobs.get() - if job: - logging.debug("%s is running %r", self.getName(), job) - try: - job() - except: - logging.exception("Job %r return an exception", job) - logging.debug("Job %r completed.", job) - else: - self.pool.jobs.put(job) - break - logging.info("%s stopped", self.getName()) - - def __init__(self, size): - super(ThreadPool, self).__init__() - self.jobs = Queue.Queue() - self._workers = [self._Worker(x+1, self) for x in xrange(size)] - - def Start(self): - logging.info("Starting Workers pool") - for worker in self._workers: - worker.start() - logging.info("Workers started") - - def Stop(self, now = False): - logging.info("Stopping Workers") - if now: - self.jobs = Queue.Queue() - self.jobs.put(None) - for worker in self._workers: - worker.join() - logging.info("Workers stopped") - - def AddJob(self, job): - self.jobs.put(job) +from threadpool import ThreadPool ## # A task, representing a series of linked pairs of callback and error @@ -189,7 +143,7 @@ dst = Task() def CreateJobCallback(result): def Job(): - cb(result) + return cb(result) self.AddJob(dst, Job) src.AddCallback(CreateJobCallback) return dst @@ -212,15 +166,15 @@ if __name__ == '__main__': from time import sleep - logging.getLogger().setLevel(logging.DEBUG) + logging.getLogger().setLevel(logging.INFO) # This function is a sample and shouldn't know about the scheduler count = 0 - def AsyncCall(): + def AsyncCall(name, seconds): global count count += 1 - def Initialize(name, sleep): + def Initialize(name, seconds): print "Here", name - return name, sleep + return name, seconds def Blocking(args): name, time = args print name, "goes to bed" @@ -232,7 +186,7 @@ print name, "wakes up!" count -= 1 - dinit = Task(Initialize, "Toto", 5) + dinit = Task(Initialize, name, seconds) # How can I remove the scheduler from the picture ? # The only way I can see is to have a special kind of Task # and a suspended queue... May be this will also be more clean @@ -245,9 +199,10 @@ logging.info("Starting scheduler with 10 workers") StartScheduler(10) - dasync = AsyncCall() logging.info("Adding asynccall task") - scheduler.AddTask(dasync) + for x in xrange(int(sys.argv[1])): + dasync = AsyncCall("Toto%d" % (x+1), (x % 10)/10.0) + scheduler.AddTask(dasync) while count > 0: logging.debug("Count = %d", count) sleep(1) diff -r ee10f7cde07d -r a00ae018daf8 threadpool.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/threadpool.py Sun Mar 21 17:06:53 2010 -0400 @@ -0,0 +1,55 @@ +#!/usr/bin/env python2.6 + +import threading +import logging +import Queue + +class _Worker(threading.Thread): + def __init__(self, uid, pool): + threading.Thread.__init__(self, name = "Worker%d" % uid) + self.pool = pool + def run(self): + logging.info("%s started", self.getName()) + while True: + job = self.pool.jobs.get() + if job: + logging.debug("%s is running %r", self.getName(), job) + try: + job() + except: + logging.exception("Job %r return an exception", job) + logging.debug("Job %r completed.", job) + else: + self.pool.jobs.put(job) + break + logging.info("%s stopped", self.getName()) + + +## +# A basic worker pool... should be replaced with a resizable one. +class ThreadPool(object): + Worker = _Worker + + def __init__(self, size): + super(ThreadPool, self).__init__() + self.jobs = Queue.Queue() + self._workers = [self.Worker(x+1, self) for x in xrange(size)] + + def Start(self): + logging.info("Starting Workers pool") + for worker in self._workers: + worker.start() + logging.info("Workers started") + + def Stop(self, now = False): + logging.info("Stopping Workers") + if now: + self.jobs = Queue.Queue() + self.jobs.put(None) + for worker in self._workers: + worker.join() + logging.info("Workers stopped") + + def AddJob(self, job): + self.jobs.put(job) +